MQTT
2020-05-18 13:55:51 7 举报
AI智能生成
MQTT
作者其他创作
大纲/内容
简介
MQ优点
任务异步处理
应用程序解耦合
消峰
MQTT优点
精简,发布/订阅(Pub/Sub)模式为中心
基于MQTT协议,报文头小,提高传输效率,有效荷载为二进制
Topic层级主题/可通配符,灵活多种QOS,回调信息丰富
Server基于C语言,Client多语言适配,各种设备,支持WebSockets
有遗嘱消息,保留消息
基本结构
Broker:消息topic订阅树管理,及数据的分发
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,订阅MQ的消息。
基础功能
客户端配置
连接参数
BrokerUrl
服务端地址IP:Port/域名
ClientId
客户端ID--是全局唯一值
UserName
用户名
Passwork
密码
AutomaticReconnect
是否自动重连 true:自动、false:不自动
HttpsHostnameVerificationEnabled
是否开启Https验证,在V1.2.1版本出现
KeepAliveInterval
心跳间隔时间 服务端最大检测时间 KeepAliveInterval*1.5
MaxInflight
最大并发数/消息最大积累量
CleanSession
是否清楚缓存信息 true:清理、false:不清理
Will
遗嘱信息
SocketFactory
添加SSL配置信息
心跳机制
心跳创建
MqttAsyncClient:客户端创建的时候将心跳的定时器及心跳内容创建完毕
心跳定时器:PingSender
TimerPingSender:默认
ScheduledExecutorPingSender
心跳内容:PingCommand
new MqttPingReq()
MESSAGE_TYPE_PINGREQ = 12
主要是在CommsReceiver接受MqttConnect的ACK才开始启动定时器检测是否发送心跳
MqttAck
MqttConnack
pingSender.start()
触发机制
心跳的触发主要是若在KeepAliveInterval的时间内有相应的信息接受和发送则不会触发心跳,若无则会发送心跳信息PINGREQ = 12
定时器定时检查活性
pingOutstanding:是否已经发送过
> 0 :已经发送,未回来
lastInboundActivity超时
shutdownConnection:32000
= 0 :未发送心跳
lastOutboundActivity 信息太大或网络问题,超时未发出去. >2*keepAlive
shutdownConnection:32002
lastInboundActivity和lastOutboundActivity在keepAlive时间段内接收和发送都没有,则发送心跳消息
tokenStore.saveToken(token, pingCommand);
keepAlive期间有信息的接受和发送 lastInboundActivity、lastOutboundActivity,则不触发心跳
keepAlive期间无信息接受和发送,则触发心跳
pingOutstanding
发出:+1
接收:-1
持久化方式
persistence
MqttDefaultFilePersistence(默认)
默认本地文件持久化
MemoryPersistence
内存持久化
STL/SSL
无认证
无需认证信息,只需要账号密码认证
单向认证
客户端只需要提供 ca.crt证书
双向认证
客户端需提供
server.crt
server.key
ca.crt
根据提供的地址URI创建正确网络模块的工厂方法
服务端配置
常规配置
allow_zero_length_clientid
true
则代理将为客户端分配客户端ID
false
将断开连接长度为零的客户端ID的客户端
auto_id_prefix
设置前缀:默认'auto-'
per_listener_settings
false
true
每个listener都需要配置参数
check_retain_source
true
broker会检查消息的发布者是否还有权限,若有发送保留信息,若无则不发保留信息
false
broker不会检查消息发布者的是否还有权限,直接发送
max_inflight_bytes
对于Qos1/2,表示最大正在处理的字节数。默认为0。(无最大值),一般不设置
max_inflight_messages
同时传输消息的最大数,QOS>0
max_keepalive
仅适用于MQTT v5客户端。允许的最大值为65535。请勿设置为10以下,单位秒
max_packet_size
仅适用于MQTT v5客户端。最大数据包大小,单位:字节
注:超过设置值,broker会主动断开与该client的连接。
注:超过设置值,broker会主动断开与该client的连接。
max_queued_bytes
最大消息队列的数量,超过这被丢弃,默认0,无最大值
max_queued_messages
超过max_inflight_messages的消息会被缓存到队列中。本项设置队列的大小。默认100
memory_limit
设置代理将分配的最大堆内存字节数,因此对代理使用的内存设置硬限制。默认无限制
message_size_limit
允许的最大发布有效负载大小,缺省值为0,表示接受所有有效的MQTT消息。 MQTT的最大有效负载大小为268435455字节(256M)
persistent_client_expiration
客户端断开超过该时间,broker会删除该会话的信息,客户端将接收不到任何它断开之后到重连这段时间内未收到的消息。
非标准选项,默认设置是永不使持久客户端过期
非标准选项,默认设置是永不使持久客户端过期
pid_file
将进程ID写入文件。默认值为空字符串,这意味着不应写入pid文件。
queue_qos0_messages
设置为true在持久客户端断开连接时将具有QoS 0的消息排队。这些消息包含在max_queued_messages施加的限制中。默认为 false。
retain_available
如果设置为false,则不支持保留的消息。如果此选项设置为false,则发送带有保留位的消息的客户端将被断开连接。默认为true。
set_tcp_nodelay
如果设置为true,则将在客户端套接字上设置TCP_NODELAY选项以禁用Nagle的算法。这具有减少某些消息的等待时间的效果,从而潜在地增加了要发送的TCP数据包的数量。默认为false。
sys_interval
订阅$SYS层次topic树的更新时间,默认10秒
设为0,表示禁用$SYS层次
设为0,表示禁用$SYS层次
upgrade_outgoing_qos
true:改变QOS等级,与订阅者的消息等级匹配。默认false
user
默认为mosquitto
auth_plugin
指定用于身份验证和访问控制的外部模块。这允许创建自定义的用户名/密码和访问控制功能,如mysql
监听器
bind_address
指定的IP地址/主机名上侦听传入的网络连接 单个地址
bind_interface
仅在指定的接口上侦听传入的网络连接。这类似于该 bind_address选项,但是在接口具有多个地址或地址可能更改时很有用
http_dir
当侦听器使用websockets协议时,也可以提供http数据。设置 http_dir到包含您要提供的文件的目录。如果未指定此选项,则将无法进行正常的http连接。
listener
监听器,开放端口
max_connections
允许的最大客户端连接数。这是每个侦听器的设置。默认值是-1,无限连接。与服务进程数有关
protocol
选择监听时要使用的协议,默认mqtt
use_username_as_clientid
将use_username_as_clientid设置为true可以将客户端连接的客户端ID替换为其用户名,默认为false
SSL / TLS
cafile/capath
用于定义包含受PEM编码的CA证书的文件的路径
certfile
PEM编码的服务器证书的路径
keyfile
PEM编码的密钥文件的路径
require_certificate
通过设置 require_certificate为 true,连接到该侦听器的客户端必须提供有效的证书才能进行网络连接
dhparamfile
为了允许使用临时DH密钥交换来提供前向安全性,侦听器必须加载DH参数。
tls_version
配置用于此侦听器的TLS协议的版本。可能的值是 tlsv1.3, tlsv1.2和 tlsv1.1。如果未设置,则使用默认值,即允许所有TLS v1.3,v1.2和v1.1。
use_identity_as_username
如果require_certificate为 true,则可以设置 use_identity_as_username为 true使用客户端证书中的CN值作为用户名。如果为true,则该 password_file选项将不会用于此侦听器。
use_subject_as_username
如果require_certificate为 true,则可以设置 use_subject_as_username为 true使用客户端证书中的完整主题值作为用户名。如果为true,则该 password_file选项将不会用于此侦听器。
消息发布接收流程
发布信息
topic
根据合理规则定义主题
主题Topic
语法/原则
主题至少有一个字符长,不能为空
主题名字是大小写敏感
主题名字可以包含空格
以/开头会产生一个不同的主题
不要在任何主题中包含null
在主题树中,长度被限制于64k内但是在这以内没有限制层级的数
客户端注意不能使用 $ 字符开头的主题,$SYS/ 被广泛用作包含服务器特定信息或控制接口的主题的前缀
主题尽量不要太多主题订阅,会导致broker的订阅树盘大,进而影响到插入查找效率
通配符
使用通配符,允许一次订阅多个主题,不能作用于发布主题
主题层级分隔符/
/ 被用来分割主题树的每一层,并给主题空间提供分等级的结构。当两个通配符在一个主题中出现的时候,主题层次分隔符的使用是很重要的。
多层通配符#
# 是一个匹配主题中任意层次数的通配符,通俗的说就是截至层数,包括#当前层
单层通配符+
+ 只匹配主题的一层,但必须是/后面
$SYS中各主题
$SYS/broker/bytes/received
自服务器启动以来共接收的字节数
$SYS/broker/bytes/sent
自服务器启动以来共发送的字节数
$SYS/broker/clients/connected
当前连接的客户端数量
$SYS/broker/clients/expired
超过有效期被断开连接的客户端数量,有效期通过persistent_client_expiration参数设置
$SYS/broker/clients/disconnected
注册到服务器上的持久连接(clean seesion为假)但当前断开的客户端数量
$SYS/broker/clients/maximum
服务器同一时间连接的最大客户端数量
$SYS/broker/clients/total
有效和无效连接、注册到服务器上的总数
$SYS/broker/messages/inflight
等待确认的Qos>0的消息的数量
$SYS/broker/messages/stored
服务器存储的消息的总数,包括保留消息和持久连接客户端的消息队列中的消息数
message
发布的信息内容
QOS
Qos = 0 最多一次的传输
发布者PUBLISH消息到服务器(broker),发送即丢弃。没有确认消息,也不知道对方是否收到。网络层面,传输压力小
Qos = 1 至少一次的传输
发布者发布消息保存消息,服务器(broker)接收到消息,服务器(broker)PUBLISH到订阅者,服务器(broker)回一个PUBACK信息到发布者让删除消息,然后订阅者接收消息后PUBACK给服务器让删除消息。如果失败了,在一段时间确认信息没有收到,发送方都会将消息头的DUP设置为1,然后再次发送消息,消息最少一次到达服务。例如网络延迟等问题,发布者重复发送消息,订阅者多次订阅重复消息
Qos = 2 只有一次的传输
其实Qos = 2 只是在 1 的基础上做了改掉的赶脚,在发布者PUBLISH到服务器之后多了消息的确认以及多了消息msgID的缓存,重复信息的去重。在服务器PUBLISH到订阅者之后也多了消息的确认。
retained
保留信息,true/false
发布信息保留最新一条
订阅客户端重连则会再次收到
发布空message信息可取消保留信息
发送与接收消息
1、token信息存在tokenStore(Hashtable),发布信息:persistence(持久化),信息等待队列:pendingMessages
2、在CommsSender中while中发出
3、在CommsReceiver读取接收消息
4、在CommsCallback中处理回调
5、在ClientComms中处理与服务器的客户端通信
will遗嘱
创建
遗嘱消息在客户端连接的时候就创建发送到broker中保存了
Will
topic
每个客户端都订阅同一个
message
遗嘱消息
Qos
消息等级-建议:1
retained
是否保留信息,建议不保留false,视情况而定
原理
消息保留到broker
broker下发遗嘱情况
服务器发现一个I/O错误或者网络错误
客户端没有按时发送心跳包
客户端没有在断开连接前发送DISCONNECT包
服务器因为协议错误而断开连接
应用场景
客户端的上下线问题
创建连接设置遗嘱信息主题robot/status,信息:Offline
上线,则发布topic主题robot/status,信息:Online
下线或断开,则broker自主分发主题robot/status,信息:Offline
0 条评论
下一页