python实现mqtt发布订阅消息(windows)
一、消息队列服务器
这里我用到activemq-5.16.2,可到官网下载 http://activemq.apache.org/
1. 若遇到点击apache-activemq-5.16.2\bin\activemq.bat 出现闪退,64位系统请点击apache-activemq-5.16.2\bin\win64\activemq.bat,启动mqtt服务器
2. ActiveMQ mqtt默认端口为1883,测试服务器ip为192.168.1.103
3.在apache-activemq-5.16.2\conf\activemq.xml中,在
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="UserName1" password="PassWord1" groups="users"/> <authenticationUser username="UserName2" password="PassWord2" groups="users"/> <authenticationUser username="test2" password="654321" groups="users,admins"/> users> simpleAuthenticationPlugin> plugins>
如下图
二、mqtt通信过程图如下,接下来用python程序,模拟客户端1和客户端2的通信过程
三、代码
封装Mqtt客户端
import paho.mqtt.client as mqtt import logging class MqttClient(mqtt.Client): def initClient(self, mqttServer, mqttPort, username, password, timeout=10000): logging.basicConfig(level=logging.DEBUG) self.mqttServer = mqttServer self.mqttPort = mqttPort self.username_pw_set(username, password=password) self.connect(self.mqttServer, self.mqttPort, timeout) # keeplive仅为10000秒 self.on_connect = self.on_connect def getClient(self): return self.client def on_connect(self, client, userdata, flags, rc): linkAddr = client.mqttServer + ":" + str(client.mqttPort) if rc == 0: logging.info("与mqtt服务器:" + linkAddr + "连接成功") elif rc == 1: logging.error("协议版本错误") elif rc == 2: logging.error("无效的客户端标识") elif rc == 3: logging.error("服务端无法使用") elif rc == 4: logging.error("与mqtt服务器连接失败: 错误的用户名或密码 ") elif rc == 5: logging.error("登录用户未经授权 ") else: logging.error("与mqtt服务器:%s 连接返回异常结果:%s " % (linkAddr, str(rc))) def on_subscribe(self, client, userdata, mid, granted_qos): logging.info("订阅成功: " + str(mid) + " " + str(granted_qos)) def on_publish(self, client, userdata, mid): logging.info("OnPublish, mid: " + str(mid))
客户端1代码
#!/usr/bin/python import datetime import logging import time from mqttService.mqttClient import MqttClient # ------------------客户端1--------------------# # 订阅主题 devTopic = '/devices/dev1' responseDevTopic = '/7.0/dev1' # 接收客户端2响应信息 def on_message(client, userdata, message): curtime = datetime.datetime.now() strcurtime = curtime.strftime("%Y-%m-%d %H:%M:%S") logging.info("%s: 接收 %s 响应信息:主题:%s 内容:%s" % ( strcurtime, mqttServer + ":" + str(mqttPort), message.topic, str(message.payload, encoding="utf-8"))) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) username = "UserName1" password = "PassWord1" # 服务器地址 mqttServer = "192.168.1.103" # 通信端口 mqttPort = 1883 client = MqttClient() client.initClient(mqttServer, mqttPort, username, password) client.subscribe(responseDevTopic, qos=0) # 订阅主题 client.on_message = on_message msg = 'hello world' for i in range(1000): client.publish(devTopic, payload=msg, qos=0) # 发布信息 time.sleep(4) client.loop_forever() # 持续连接
客户端2代码
#!/usr/bin/python import datetime import logging from mqttService.mqttClient import MqttClient # ------------------客户端2------------------- # 订阅主题 devTopic = '/devices/#' # 发布主题 responseDevTopicPrefix = '/7.0/' def on_message(client, userdata, message): curtime = datetime.datetime.now() strcurtime = curtime.strftime("%Y-%m-%d %H:%M:%S") recvMsg = message.payload # 获取发送设备的客户端1标识 topicArr = str(message.topic).split("/") topicArr = [item for item in filter(lambda x: x != '', topicArr)] # 去除空串 deviceId = topicArr[1] logging.info("%s: 接收硬件装置 %s 信息:主题: %s 内容: %s" % (strcurtime, deviceId, message.topic, recvMsg)) # 发送响应回客户端1 client.publish(responseDevTopicPrefix + deviceId, payload="收到数据" + recvMsg, qos=0) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) mqttAddr = '192.168.1.103' mqttPort = 1883 username = "UserName2" password = "PassWord2" # 与客户端1通信 client = MqttClient() client.initClient(mqttAddr, mqttPort, username, password) client.subscribe(devTopic, qos=0) client.on_message = on_message client.loop_start() # 开始监听 # 阻塞主程序 while True: pass