MQTT

MQTT ( Message Queuing Telemetry Transport).它是一种轻量级的发布-订阅网络协议,用于在设备之间传输消息。 MQTT 专为硬件资源有限或网络带宽有限的远程位置连接而设计,非常适合 machine-to-machine (M2M) 通信和物联网 (IoT) 应用。

历史:

Andy Stanford-Clark (IBM ) 和 Arlen Nipper(当时在Eurotech, Inc.工作)于 1999 年编写了该协议的第一个版本。 它用于监控SCADA工业控制系统内的石油管道。目标是建立一个带宽高效、重量轻且消耗很少电池电量的协议,因为这些设备是通过卫星链路连接的,而卫星链路在当时是极其昂贵的。

从历史上看,“MQTT”中的“MQ”来自IBM MQ(当时的“MQSeries”)产品线,代表“消息队列”,其实他并没有队列。

涉及几个版本: v3.1, 3.1.1 ,5.0。应用比较广泛的是3.1.1版本。MQTT-SN(传感器网络的 MQTT)是主要协议的变体,针对非 TCP/IP 网络上的电池供电嵌入式设备,例如Zigbee。

from wiki  https://en.wikipedia.org/wiki/MQTT#cite_note-6

 

优势及缺点

1. 优势

  • 效率和低带宽使用:MQTT消息小且需要最小的带宽,使得该协议成为具有有限处理能力的IoT设备和带宽受限网络环境的理想选择。
  • 轻量级协议:其简单性和低代码占用使其易于在具有有限内存和处理能力的设备上实施,例如微控制器和小型传感器。
  • 解耦通信:发布-订阅模型允许设备和服务器之间进行解耦通信。发布者和订阅者不需要彼此了解,提高了系统的可扩展性和灵活性。
  • 服务质量等级:MQTT支持三个级别的服务质量(QoS)进行消息传递,使用户可以根据应用程序要求选择更快的传递但可靠性较低,或者传递速度较慢但保证传递。
  • 保留消息:MQTT代理可以保留主题上的消息,确保新订阅者立即收到最后发布的消息,即使在他们订阅之前就已发送。
  • Last Will 和 Testament (LWT)功能:如果设备意外断开连接,允许设备发布消息到指定的主题,使得设备断开连接的监控和通知更加方便。
  • 安全性:支持使用SSL/TLS进行消息加密的安全通信,除了应用层安全实践,如身份验证和授权。

2. 缺点

  • 依赖代理:中心代理架构可能成为失败的单一点。如果代理关闭,整个消息系统将受到影响。
  • 安全开销:虽然MQTT支持SSL/TLS,但实施这些安全措施可能会增加复杂性和开销,特别是在非常受限的设备上。
  • 服务质量开销:更高的QoS级别(1和2)确保消息传递,但引入了额外的开销,这可能影响性能,特别是在受限网络上。
  • 消息载荷:MQTT不强制执行任何载荷格式,这意味着消息的解释取决于发布者和订阅者之间的协议。这种灵活性很强大,但如果不仔细管理,也可能导致不一致。
  • 代理性能和可扩展性:MQTT系统的性能和可扩展性可能严重依赖于代理的能力。大量的消息或大量的客户端需要一个健壮、配置良好的代理。

其中MQTT的payload格式是没有要求的, 可以是二进制, 名为字符串, json/xml或者自定义个格式, 对于业务的哦通讯协议要双方协定好。

 

使用场景

1. 物联网(IoT)

MQTT 在 IoT 中得到广泛使用,用于将各种设备、传感器和执行器连接到互联网或其他网络。这使得可以远程监控和控制设备,如智能家电、环境传感器和工业机器。

  • 智能家居:远程控制照明、供暖、安全系统和其他家用电器。
  • 农业:监控土壤湿度和温度传感器,以优化灌溉系统。

2. 工业自动化

在工业环境中,MQTT 促进了生产线、机器和系统的实时监控和控制。它使预测性维护、运营效率和工厂楼层不同系统的集成成为可能。

  • 制造业:连接机器以跟踪生产指标、运行状态,并进行预测性维护。
  • 能源管理:实时监控和控制能源消耗,以优化使用并降低成本。

3. 遥测

MQTT 最初是为遥测设计的,允许从各种来源高效地传输指标、测量和状态数据到服务器或云平台进行分析。

  • 车辆遥测:实时收集车辆数据,用于车队管理、预测性维护和驾驶员行为分析。
  • 环境监测:从远程传感器传输空气质量、水位和其他环境因素的数据。

4. 医疗保健

在医疗保健中,MQTT 可用于患者监控系统,将可穿戴设备和医疗设备连接到医疗提供商,以实时共享数据、分析和警报。

  • 远程患者监控:从可穿戴设备发送患者生命体征到医疗专业人员进行持续监控。
  • 资产跟踪:监控医疗设施内医疗设备的位置和状态。

5. 零售和供应链

MQTT 使得可以实时跟踪货物,管理库存,并通过连接的设备和系统提升客户体验。

  • 库存管理:实时监控库存水平,进行自动订购,减少短缺或过度库存。
  • 供应链可见性:跟踪货运和物流运营以提高效率和透明度。

6. 智能城市

MQTT 支持智能城市解决方案的开发,包括交通管理、公共交通和公用事业服务,提高效率、安全性和市民体验。

  • 交通管理:实时分析交通流量,优化信号时间,减少拥堵。
  • 公共交通:监控公共汽车和火车的位置和状态,用于实时乘客信息系统。

 

技术组成的部分和关键点

  1. MQTT客户端:
  • 发布消息给代理或订阅接收消息的设备或软件应用。客户端可以是小型传感器、移动设备或服务器应用。

2. MQTT代理:(BROKER)

  • 一个接收所有客户端消息,然后将这些消息路由到适当的接收客户端的服务器。代理负责管理客户端的会话和订阅。

3. 主题:(TOPIC)

  • 代理用来为每个连接的客户端过滤消息的字符串。主题由一个或多个主题级别组成,由斜杠分隔。客户端可以订阅特定主题,或使用通配符一次订阅多个主题。

4. 消息:

  • 在客户端和代理之间发送的数据。MQTT中的消息是轻量级的,可以包含任何格式的数据,二进制、文本或其他。

 

MQTT的关键点

  1. 发布/订阅模型:
  • MQTT使用解耦的发布/订阅消息模式,提供一对多消息分发和应用程序的解耦。

2. 服务质量(QoS)级别:

  • MQTT为消息传递定义了三个QoS级别:
    • QoS 0(最多一次):最大努力交付,无需确认。
    • QoS 1(至少一次):确保消息至少被传递一次,可能有重复。
    • QoS 2(仅一次):保证每条消息只被对方接收一次。

3. 轻量级协议:

  • 为低带宽和不稳定的网络设计,适合计算资源和功率有限的物联网设备。

4. 保留消息:

  • 代理可以在主题上保留消息,任何订阅该主题的客户端都会立即接收到保留的消息,即使该消息是在订阅之前发布的。

5. Last Will and Testament(LWT):

  • 允许客户端在不正常断开连接时,发布最后一条消息到指定的主题,这可以用来通知其他客户端断开连接的情况。

6. 安全性:

  • 虽然MQTT本身没有定义任何安全机制,但通常会在TLS/SSL上运行,以实现在互联网上的安全通信。身份验证可以通过传统的用户名/密码机制或更高级的方法,如客户端证书,来实现。

7.会话持久性:

  • MQTT支持持久会话,这意味着客户端可以订阅主题并断开连接。当它使用相同的客户端ID重新连接时,它可以接收到所有在它断开连接时发布的消息。

在使用nginx代理通过clientid hash完成会话保持,就是为了支持这个特性,mqtt中使用 clean Session这个标识位,为true每个连接断开时清理所有订阅和未发送消息, false则需要保留

Figure 3.4 – Connect Flag bits
Bit 7 6 5 4 3 2 1 0
User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
byte 8 X X X X X X X 0

Clean Session Flag

  • 在连接到 MQTT 代理时,客户端会设置一个标志 Clean Session。这个标志告诉代理是否应该保持会话状态。
  • 如果 Clean Session 设置为 true,那么代理会在客户端断开连接时清除所有的订阅和未发送的消息,每次连接都是一个全新的开始。
  • 如果 Clean Session 设置为 false,代理则需要保持这些状态信息,直到客户端再次以相同的 ClientID 连接。这允许客户端即使在断开连接后也能恢复其会话,包括它的订阅和那些在断开期间到达的消息(取决于消息的服务质量 QoS 级别)

from chatgpt

一些实现

  1. MQTT代理:

MQTT协议中的中心枢纽是代理,负责将消息分发给客户端。MQTT代理的示例包括:

  • Mosquitto: 一个开源的MQTT代理,轻量级且适用于从低功耗单板计算机到全服务器的所有设备。
  • RabbitMQ: RabbitMQ是一种更全面的消息代理解决方案,支持多种消息协议,包括作为插件的MQTT。
  • AWS IoT Core: 一个托管的云服务,支持MQTT等协议,提供IoT设备与云之间的安全双向通信。

2. 客户端库:

为不同的编程语言提供了各种客户端库,使设备能够轻松实现MQTT进行通信。一些值得注意的例子包括:

  • Paho: 一个Eclipse项目,提供可扩展和强大的客户端库,使用各种语言,如Python,Java和C++。
  • MQTT.js: 一种适用于Node.js的流行MQTT客户端库,适用于服务器端和基于浏览器的应用程序。

3. IoT平台:

许多IoT平台将MQTT作为主要通信方式,因为它适合IoT应用。例子包括:

  • ThingSpeak: 一个IoT平台,提供使用MQTT从传感器收集、分析和采取行动的应用程序和API。
  • Home Assistant: 一个开源的家庭自动化平台,使用MQTT等协议集成各种IoT设备。

4. MQTT服务(云):

  • AWS IoT Core
    • 一种托管的云服务,让连接的设备可以轻松且安全地与云应用程序和其他设备进行交互。
    • 支持MQTT,HTTP和WebSockets,提供安全的设备连接和数据加密,并与其他AWS服务集成以进行处理和存储。
  • Microsoft Azure IoT Hub
    • 一种托管在云中的服务,作为IoT应用程序和其管理的设备之间双向通信的中央消息中心。
    • 支持MQTT,AMQP和HTTP,设备到云和云到设备的消息,并包括每个设备的身份验证,内置的声明性消息路由和与Azure服务的集成。
  • Google Cloud IoT Core
    • 一种完全托管的服务,可让您轻松且安全地连接,管理并从数以百万计的全球分散的设备中获取数据。
    • 与Google Cloud服务集成,支持MQTT和HTTP协议,并使用TLS上的非对称密钥认证提供端到端的安全性。

 

协议标准

  • 3.1.1  http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
  • 5.0  http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
  • 5.0 报文说明
    • 01 https://www.emqx.com/en/blog/mqtt-5-0-control-packets-01-connect-connack#sample-packets
    • 02 https://www.emqx.com/en/blog/mqtt-5-0-control-packets-02-publish-puback#sample-packets
    • 03 https://www.emqx.com/en/blog/mqtt-5-0-control-packets-03-subscribe-unsubscribe
    • 04 https://www.emqx.com/en/blog/mqtt-5-0-control-packets-04-pingreq-pingresp
    • 05 https://www.emqx.com/en/blog/mqtt-5-0-control-packets-05-disconnect
    • 06 https://www.emqx.com/en/blog/mqtt-5-0-control-packets-06-auth

mqtt规范属于通讯规范, 没有业务属性,其中长度类型 Variable Byte Integer,类似IC卡中tlv结构中长度的定义通过最高位判断长度使用字节。

nginx与MQTT

nginx仅作代理, 可以通过解析connect 报文中的clientid 完成会话保持,让clientid指向相同的broker,其中报文解析nginx的 NGX_STREAM_PREREAD_PHASE 中完成的。

NGINX Plus for the IoT: Load Balancing MQTT

Optimizing MQTT Deployments in Enterprise Environments with NGINX Plus

苦于是商业版本,所以自己写了个模块试验了一下

https://github.com/weida/nginx_mqtt_preread_module

可以按照说明编一个nginx版本。

 

测试

测试完成本地订阅发布

sudo yum install mosquitto
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

mosquitto_sub -h localhost -t test/topic
mosquitto_pub -h localhost -t test/topic -m "Hello MQTT"

通过nginx代理配置端口 2833

不过测试中发现个问题v5.0版本,clientid用程序发送时是空的,

mosquitto_pub -h localhost -p 2883 -t "test/topic" -u "garlic" -P "a" -m "testwill" --will-payload "caowei" --will-topic "test/topic"  --will-qos "2" -V mqttv311
mosquitto_pub -h localhost -p 2883 -t "test/topic" -u "garlic" -P "a" -m "testwill" --will-payload "caowei" --will-topic "test/topic"  --will-qos "2" -V mqttv31
mosquitto_pub -h localhost -p 2883 -t "test/topic" -u "garlic" -P "a" -m "testwill" --will-payload "caowei" --will-topic "test/topic"  --will-qos "2" -V mqttv5
127.0.0.1 [17/Mar/2024:17:40:34 +0800] clientid: [mosq-BSeQOKRbRkdnw5Nc7y]  username: [garlic] password: [a] protcol:  [MQTT]  version: [4]
127.0.0.1 [17/Mar/2024:17:41:03 +0800] clientid: [mosq-eo12KZjlxPBcbGpesL]  username: [garlic] password: [a] protcol:  [MQIsdp]  version: [3]
127.0.0.1 [17/Mar/2024:17:41:24 +0800] clientid: []  username: [garlic] password: [a] protcol:  [MQTT]  version: [5]

 

手工指定一下就可以了

mosquitto_pub -h localhost -p 2883 -t "test/topic" -u "garlic" -P "a" -m "testwill" --will-payload "caowei" --will-topic "test/topic"  --will-qos "2" -V mqttv5 -i "garlic"
。。。
127.0.0.1 [17/Mar/2024:17:43:23 +0800] clientid: [garlic]  username: [garlic] password: [a] protcol:  [MQTT]  version: [5]

 

通讯方式

mqtt使用长连接方式,减少了建链和断链的开销,ibm CICS中间件也是这个选取这种方式,当使用CTG网关链接时可以配置连接时长,长时间没有链接会断开重连。

mqtt 使用 Keep Alive机制,使用一个时间间隔,客户端在这个时间间隔内如果没有向服务器发送任何控制包(如 PUBLISH、SUBSCRIBE 等),就必须发送一个 PINGREQ 消息。服务器收到 PINGREQ 后,应回复一个 PINGRESP。如果客户端在设定的时间内未收到 PINGRESP,通常会认为连接已断开,并可能尝试重新连接。

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("some/topic")

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# 设置 Keep Alive 为 60 秒
client.connect("mqtt.example.com", 1883, 60)

client.loop_forever()

 

图片及引用

from chatgpt

Comments are closed.