python 使用mqtt

安装

mqtt库

pip3 install paho-mqtt

安装mqtt代理

Mosquitto 是一款开源的 MQTT 消息代理(broker)软件,支持 MQTT v3.1、v3.1.1 和 v5.0 协议,跨平台且轻量级,所以选择它作为代理软件。

# Install mosquitto
apt-get install mosquitto

# Check the mosquitto service status
service mosquitto status

# Start / stop the service
service mosquitto start
service mosquitto stop

demo1

实现简单的连接订阅功能

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    """Print the connection result code ``rc``.

    Intended to be installed as the client's ``on_connect`` callback; the
    remaining parameters are accepted but not used.
    """
    print(f"Connected with result code: {rc}")

def on_message(client, userdata, msg):
    """Print the topic and payload of a received message, space-separated.

    The payload is rendered with ``str()``, so a ``bytes`` payload appears
    in its ``b'...'`` form.
    """
    fields = (msg.topic, str(msg.payload))
    print(" ".join(fields))

# Build the client and register the two callbacks defined above.
client = mqtt.Client()
client.on_message = on_message
client.on_connect = on_connect
# Connect to the local broker; 600 is the keepalive interval.
client.connect(host='127.0.0.1', port=1883, keepalive=600)
client.subscribe(topic='xzw', qos=0)
# Block here, repeatedly running the network loop to keep the connection.
client.loop_forever()
  • loop():处理网络收发并维持心跳,用来保持客户端与服务器的连接。比如keepalive参数为60秒,那么必须在60秒内调用一次loop()(它会在需要时发送心跳包),否则服务器会认为客户端已掉线并断开连接,之后就无法继续发布或者接收消息。
  • loop_start()会启动一个后台线程来重复调用loop(),这样就不需要自己定期调用loop()维持心跳了;对应的停止方法是loop_stop()。
  • loop_forever()用来保持无穷阻塞调用loop()

测试

使用mqtt测试工具mqttfx测试

39

demo2

把mqtt封装成一个类

import paho.mqtt.client as mqtt
import threading
import time

class mqtt_thread(threading.Thread):
    """Thread that owns an MQTT client and runs its network loop.

    The constructor creates the client, installs the module-level
    ``on_connect`` / ``on_message`` callbacks and connects to the broker at
    127.0.0.1:1883. ``run`` then blocks in the client's ``loop_forever``.
    """

    def __init__(self):
        super().__init__()
        self.client = mqtt_client = mqtt.Client()
        mqtt_client.on_connect = on_connect
        mqtt_client.on_message = on_message
        # 600 is the keepalive interval.
        mqtt_client.connect('127.0.0.1', 1883, 600)

    def run(self):
        """Thread body: hand control to the client's ``loop_forever``."""
        self.client.loop_forever()

    def publish(self, topic, msg, qos, retain):
        """Forward a publish request to the underlying client.

        The four arguments are passed through positionally and unchanged;
        the client's return value is discarded.
        """
        self.client.publish(topic, msg, qos, retain)

def on_connect(client, userdata, flags, rc):
    """Subscribe to topic ``xzw`` at QoS 0, then print the result code.

    Because the subscription is issued from this callback, it is repeated
    every time the callback runs.
    """
    client.subscribe('xzw', qos=0)
    print(f"Connected with result code: {rc}")

def on_message(client, userdata, msg):
    """Print ``"<topic> <payload>"`` for a received message.

    The payload goes through ``str()``, so ``bytes`` payloads are shown in
    their ``b'...'`` form.
    """
    topic = msg.topic
    payload_text = str(msg.payload)
    print(topic + " " + payload_text)
    
    
if __name__ == '__main__':
    # Start the MQTT worker thread, then publish a greeting on "xzw"
    # every 1.5 seconds; the loop never exits on its own.
    mqttc = mqtt_thread()
    mqttc.start()
    while True:
        mqttc.publish(topic="xzw", msg="Hello xiazhongwei", qos=0, retain=False)
        time.sleep(1.5)

  • 使用 loop_start() 或 loop_forever() 时,连接断开后客户端会自动重连;把订阅放在 on_connect 回调里,可以确保每次重连后都会自动重新订阅。

测试

40

扩展

这是之前项目写过的代码

import datetime
import json
import threading
import time

import numpy as np
import paho.mqtt.client as mqtt

# Module-level state shared between alg_api and the MQTT callbacks.
au_result = {}
uid = ""
url = ""
# state: 0 - keep (no change), 1 - turn on, 2 - turn off
state = 0

class alg_api(threading.Thread):
    """Background thread that runs an MQTT client for the audio algorithm.

    The client is created in the constructor, so ``publish`` can be called
    as soon as the object exists. ``run`` connects to the broker and then
    blocks in the client's ``loop_forever``.

    The control state lives in the module-level ``uid``, ``state`` and
    ``url`` variables, which the module-level ``on_message`` callback
    updates. All instances therefore share one state.
    """

    # Port used when the broker address does not specify one.
    _DEFAULT_PORT = 1883

    def __init__(self, mqtt_addr, id):
        """Store the broker address and register the device id.

        Args:
            mqtt_addr: Broker address such as ``"tcp://127.0.0.1:1883"``.
                A bare ``"host:port"`` and other schemes are also accepted.
            id: Identifier written to the module-level ``uid``;
                ``on_message`` compares it with the ``uid`` field of
                incoming messages. (The name shadows the builtin but is
                kept for backward compatibility with keyword callers.)
        """
        global uid
        threading.Thread.__init__(self)
        self.addr = mqtt_addr
        uid = id
        # Create the client here rather than in run(): otherwise publish()
        # raises AttributeError when called before the thread body has run.
        self.client = mqtt.Client()
        self.client.on_connect = on_connect
        self.client.on_message = on_message

    @staticmethod
    def _parse_addr(addr):
        """Split a broker address into a ``(host, port)`` tuple.

        An optional ``scheme://`` prefix of any length is dropped, as is a
        trailing ``/``. When no port is given, ``_DEFAULT_PORT`` is used.

        Raises:
            ValueError: If the port part is not an integer.
        """
        _, sep, rest = addr.partition("://")
        endpoint = (rest if sep else addr).rstrip("/")
        host, colon, port_text = endpoint.rpartition(":")
        if not colon:
            return endpoint, alg_api._DEFAULT_PORT
        return host, int(port_text)

    def run(self):
        """Thread body: connect to the broker and run the network loop."""
        host, port = self._parse_addr(self.addr)
        self.client.connect(host, port, 600)  # 600 is the keepalive interval
        self.client.loop_forever()

    def publish(self, topic, msg, qos, retain):
        """Publish ``msg`` on ``topic`` and return the client's result."""
        return self.client.publish(topic, msg, qos, retain)

    def get_state(self):
        """Return the module-level ``state`` (0 keep, 1 on, 2 off)."""
        return state

    def set_state(self, value):
        """Overwrite the module-level ``state`` and print the new value."""
        global state
        state = value
        print("set state",state,value)

    def get_url(self):
        """Return the module-level ``url`` last stored by ``on_message``."""
        return url

def on_connect(client, userdata, flags, rc):
    """Subscribe to ``audio_control`` at QoS 2, then print the result code.

    The subscription is issued on every invocation, whatever ``rc`` is.
    """
    client.subscribe('audio_control', qos=2)
    print(f"Connected with result code: {rc}")


def on_message(client, userdata, msg):
    """Update the shared ``state`` / ``url`` from an ``audio_control`` message.

    The payload is parsed as JSON. Messages on another topic, or whose
    ``"uid"`` field differs from the module-level ``uid``, are ignored.
    For a matching message, a truthy ``"state"`` stores the message's
    ``"url"`` in ``url`` and sets ``state`` to 1; a falsy ``"state"`` sets
    ``state`` to 2.

    Any error (invalid JSON, missing key, ...) is printed and swallowed so
    that a malformed message cannot propagate out of the callback.
    """
    global state, url
    try:
        command = json.loads(msg.payload)
        if msg.topic != "audio_control" or command["uid"] != uid:
            return
        if command["state"]:
            # Read the URL first: if it is missing, state stays untouched.
            url = command["url"]
            state = 1
        else:
            state = 2
    except Exception as e:
        # The original print("%s", e) emitted a literal "%s" before the
        # message; print the error itself instead.
        print(e)
        
        
# JSON encoder with support for NumPy values and date/time objects
class MyEncoder(json.JSONEncoder):
    """JSON encoder that also serialises NumPy and ``datetime`` values.

    Conversions applied by ``default``:

    * NumPy integer scalars  -> ``int``
    * NumPy floating scalars -> ``float``
    * NumPy boolean scalars  -> ``bool``
    * ``numpy.ndarray``      -> nested lists via ``tolist()``
    * ``datetime.date`` / ``datetime.datetime`` / ``datetime.time``
      -> ``str(obj)``

    Anything else is delegated to ``json.JSONEncoder.default``, which
    raises ``TypeError``.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for ``obj``.

        Raises:
            TypeError: If ``obj`` is of an unsupported type.
        """
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # The previous check passed the `time` *module* to isinstance(),
        # which raises TypeError for every object that reaches it. Test
        # against the actual date/time classes instead.
        if isinstance(obj, (datetime.date, datetime.time)):
            return str(obj)
        return super().default(obj)

发送json格式的数据

# Assemble the result message and publish it as indented JSON on the
# "audio_result" topic (QoS 0, not retained).
aidio_result = {
    "uid": uid,
    "audioString": "[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]",
}
payload = json.dumps(aidio_result, ensure_ascii=False, indent=4, cls=MyEncoder)

api.publish("audio_result", payload, 0, False)

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×