MQTT介绍与使用

[Python] MQTT介绍与使用

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

特点

  • 开放消息协议,简单易实现
  • 发布订阅模式,一对多消息发布
  • 基于TCP/IP网络连接,提供有序,无损,双向连接。
  • 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
  • 消息QoS支持,可靠传输保证

原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)

  • (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

在这里插入图片描述

MQTT代理

mosquitto是一款开源的MQTT消息代理(服务器)软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。

  • 安装mosquitto
1
sudo apt-get install mosquitto -y
  • 安装mosquitto命令行客户端
1
sudo apt-get install mosquitto-clients -y

使用

发布使用mosquitto_pub命令,订阅使用mosquitto_sub命令,常用参数如下

参数 描述
-h 服务器主机,默认localhost
-t 指定主题
-u 用户名
-P 密码
-i 客户端id,唯一
-m 发布的消息内容

发布

1
mosquitto_sub -h localhost -t "test/#" -u user2 -P 123456 -i "client1"

订阅

1
mosquitto_pub -h localhost -t "test/abc" -u user1 -P 123456 -i "client3" -m "How are you?"

MQTT使用

MQTT 客户端工具

MQTT 客户端工具常用于建立与 MQTT 服务器 的连接,进行主题订阅、消息收发等操作

  • MQTT Explorer 是一个全面且易于使用的 MQTT 客户端,是目前比较流行的 MQTT 桌面测试客户端之一,基于它提供有关 MQTT Topics 的结构化预览展示,并使其在对 MQTT Broker 上的设备/服务的使用变得非常简单。目前基于 CC BY-NC-ND 4.0 协议开源,用户可随意查看源码和使用。

    explorer

    项目地址: Github MQTT-Explorer

    下载地址: https://mqtt-explorer.com/

Python

使用 paho-mqtt 库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能

  • 使用connect()/connect_async() 连接MQTT代理
  • 频繁的调用loop()来维持与MQTT代理之间的流量或者使用loop_start()来设置一个线程为你调用loop()或者在一个阻塞的函数中调用loop_forever()来为你调用loop()
  • 使用subscribe()订阅一个主题(topic)并接受消息(messages)
  • 使用publish()来发送消息
  • 使用disconnect()来断开与MQTT代理的连接

详细见paho-mqtt项目描述

回调 Callbacks

  • on_connect 当代理响应连接请求时调用
  • on_disconnect 当与代理断开连接时调用
  • on_message 当收到关于客户订阅的主题的消息时调用。
  • on_publish 当使用publish()发送的消息已经传输到代理时被调用
  • on_subscribe 当代理响应订阅请求时被调用。
  • on_unsubscribe 当代理响应取消订阅请求时调用
  • on_log 当客户端有日志信息时调用

Client

  • 构造函数
    Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
参数 含义
client_id 连接到代理时使用的唯一客户端ID字符串。 如果client_id长度为零或无,则会随机生成一个。 在这种情况下,clean_session参数必须为True。
clean_session 一个决定客户端类型的布尔值。 如果为True,那么代理将在其断开连接时删除有关此客户端的所有信息。 如果为False,则客户端是持久客户端,当客户端断开连接时,订阅信息和排队消息将被保留。
userdata 用户定义的任何类型的数据作为userdata参数传递给回调函数。 它可能会在稍后使用user_data_set()函数进行更新。
protocol 用于此客户端的MQTT协议的版本。 可以是MQTTv31或MQTTv311或MQTT v5.0。
transport 设置为“websockets”通过WebSockets发送MQTT。 保留默认的“tcp”使用原始TCP。
  • reinitialise

    reinitialise(client_id="", clean_session=True, userdata=None)

    reinitialise()函数将客户端重置为其开始状态,就像它刚刚创建一样。 它采用与Client()构造函数相同的参数。

  • connect
    将客户端连接到代理。 这是一个阻塞函数。

    connect(host, port=1883, keepalive=60, bind_address="")

参数 含义
host 远程代理的主机名或IP地址
port 要连接的服务器主机的网络端口。 默认为1883
keepalive 与代理通信之间允许的最长时间段,如果没有其他消息正在交换,则它将控制客户端向代理发送ping消息的速率
bind_address 假设存在多个接口,将绑定此客户端的本地网络接口的IP地址

Publish

从客户端发送消息给代理
publish(topic, payload=None, qos=0, retain=False)

消息将会发送给代理,并随后从代理发送到订阅匹配主题的任何客户端

参数 含义
topic 该消息发布的主题
payload 要发送的实际消息。如果没有给出,或设置为无,则将使用零长度消息。 传递int或float将导致有效负载转换为表示该数字的字符串。 如果你想发送一个真正的int / float,使用struct.pack()来创建你需要的负载
qos 服务的质量级别
retain 如果设置为True,则该消息将被设置为该主题的“最后已知良好”/保留的消息

返回以下属性和方法的MQTTMessageInfo
rc:发布的结果

内容 含义
MQTT_ERR_SUCCESS 成功
MQTT_ERR_NO_CONN 客户端当前未连接
MQTT_ERR_QUEUE_SIZE 当使用max_queued_messages_set来指示消息既不排队也不发送。

mid:发布请求的消息ID。
如果mid已定义,则可以通过检查on_publish()回调中的mid来跟踪发布请求。
wait_for_publish():函数将阻塞,直到消息发布。 如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),它将引发ValueError。
is_published:如果消息已发布,is_published返回True。 如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE),它将引发ValueError。

如果主题为无,长度为零或无效(包含通配符),qos不是0,1或2之一,或者有效负载长度大于268435455字节,则会引发ValueError。

Subscribe

订阅一个或多个主题

subscribe(topic, qos=0

参数
topic 一个字符串,指定要订阅的订阅主题
qos 期望的服务质量等级。 默认为0。

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import paho.mqtt.client as mqtt
import json

CONN_CONFIG = {
'product': {
'host': 'mqtt.broker.com',
'port': 1883,
'username': 'username',
'password': 'password'
},
'dev': {
'host': '127.0.0.1',
'port': 1883,
'username': 'username',
'password': 'password'
}
}

class MQTTClient:
def __init__(self, client_id, profile='dev', ):
self.config = CONN_CONFIG[profile]
self.client_id = client_id
self.connect_flag = False
self.client = self.__create_client(client_id)
self.__connect()

def __create_client(self, client_id):
client = mqtt.Client(client_id=client_id)
client.username_pw_set(self.config['username'], self.config['password'])
client.on_connect = self.__on_connect
client.on_message = self.__on_message
client.on_publish = self.__on_publish
client.on_disconnect = self.__on_disconnect
return client

def __connect(self):
self.client.connect(self.config['host'], self.config['port'], 60)
self.client.loop_start()

def publish(self, msg):
if not self.connect_flag:
self.__connect()
payload = dict(data=msg)
self.client.publish(f'topic', payload=json.dumps(payload), qos=0)

def __on_connect(self, client, userdata, flags, rc):
self.connect_flag = True
print("connected with result code: " + str(rc))

def __on_message(self, client, userdata, msg):
print(msg.topic + " " + str(msg.payload))

def __on_subscribe(self, client, userdata, mid, granted_qos):
print('on subscribe: mid=' + str(mid))

def __on_publish(self, client, userdata, mid):
print("on publish: mid=" + str(mid))

def __on_disconnect(self, client, userdata, rc):
self.connect_flag = False
print('disconnecting reason ' + str(rc))

def __del__(self):
self.client.disconnect()

参考

MQTT 入门介绍
MQTT中文网
ubuntu下Mosquitto安装及配置
常见MQTT 客户端工具比较
Python paho-mqtt 模块使用和API分析