mqtt sub (mqtt订阅主题)
接下来我们试一下订阅主题,使用多通配符来测试
先测试 “” 通配符,””比较特殊,应用不能使用”$”开头的系统主题!
按规范服务端不能将 $ 字符开头的主题名匹配通配符 (#或+) 开头的主题过滤器
由于应用不能使用”**”开头的系统主题所以 “broker.emqx.io” 不允许订阅 “**SYS”, 但是为了测试我把我的mqtt服务器设置为允许订阅,下图示例使用我的mqtt服务器来进行测试。 如果您测试的时候发现订阅失败请查看您的mqtt服务器是否允许应用订阅”$”开头的系统主题
我使用emqx服务端,当订阅 “$SYS/#” 时会触发保留消息打印地址、应用名、版本等信息
RyanMqtt订阅结果
mqttx订阅结果
再来测试 “/“、”#”、”+” 通配符
为了快速我就在一个主题里面使用多个通配符,发送符合不同通配符的消息来进行测试
订阅主题:testdown/+/nihao/#
下图中红框发送的是符合订阅主题通配符的,黑框是不符合订阅主题通配符的。
可以看到RyanMqtt可以准确的接收到通配符消息。
mqtt unsub (mqtt取消订阅主题)
取消订阅主题,取消没订阅的主题时会自动忽略。
可以看到取消订阅后再发送消息,RyanMqtt就不会收到了
下图第一个红框为第一次取消可以触发回调,第二次取消就没有任何响应。
mqtt listsub (mqtt获取已订阅主题)
打印结果执行顺序为:
打印已订阅主题 —> 订阅”testdown/+/nihao/#”主题 —> 打印已订阅主题 —> 订阅”testdown2”主题 —> 打印已订阅主题 —> 取消订阅”testdown/+/nihao/#”主题 —> 打印已订阅主题
mqtt listack (打印ack链表,辅助功能)
ack链表包含发送qos1 / qos2 的ack报文、接收qos1 / qos2 的ack报文、订阅 / 取消订阅主题的ack报文。
根据上面的描述可以知道 ack链表 通常都应该为空。只有在上诉情况下才会存在,但是碍于篇幅这里无法进行测试了,等下一篇文章测试qos1 / qos2消息稳定性时再进行展示
mqtt listmsg (打印msg链表,辅助功能)
msg链表保存着订阅主题的信息,接收消息、取消订阅的时候都会操作msg链表。
所以listmsg和listsub是一摸一样的操作,结果自然也就一样,这里就不展示了
mqtt data (打印测试信息用户自定义的)
此接口我用来测试qos消息稳定性,没有实际意义。
4、将RyanMqtt添加到自己项目代码里,不使用msh示例
上面我们使用msh示例来进行RyanMqtt的测试,但在项目中我们肯定不会通过msh来操作mqtt,所以我们将根据msh示例来将RyanMqtt添加到代码里
首先我们思考下mqtt的执行流程,根据示例来看我们需要
处理订阅消息
连接mqtt服务器 —> 订阅主题 —> 收到订阅主题的消息 —> 在回调函数里面消费消息(调函数的执行环境是mqtt客户端的线程,所以非常不建议在回调函数里面做复杂逻辑操作,一是会阻塞mqtt线程运行,二是可能会导致mqtt线程爆栈。还是看使用场景如果需要串行处理mqtt消息在回调里面使用是挺好的)
发布消息
连接mqtt服务器 —> 发布主题消息(qos1 / qos2会有发送成功或者超过重发次数的回调)
重连逻辑
配置mqtt客户端自动重连 —> 连接服务器
不配置mqtt客户端自动重连 —> 连接服务器 —> 获取mqtt客户端状态(断连状态手动调用重连函数)
根据上面的处理方式我们来进行代码编写,为了方便我都放在main函数了
这是原始main函数,只设置了netdev状态变更回调
先添加头文件,如下图
1、先添加连接服务器函数,这里直接将msh示例中的connect函数复制到main.c,并处理报错地方(自行处理都很简单,都是资源未定义),图片放不下我就不截图了
2、再添加订阅主题函数,订阅主要要等mqtt连接成功后。所以不管有没有使能clearSession,都非常推荐在连接成功回调函数中订阅主题 如下图所示
3、再添加发布消息,这里简单起见直接加个循环,每10秒发送一个hello。(mqtt没有连接成功时也可以发布但是mqtt客户端不会进行处理,会直接丢弃不管qos等级) 如下图所示
4、消费订阅主题的消息,这里是直接打印出来消息的主题、报文id、载荷长度、载荷指针,推荐通过消息队列发送到别的线程进行处理,或者您如果知道在回调函数处理的副作用的话可以在回调函数中直接处理
注:载荷指针最后一位没有 “�”,可能会存在脏数据,需要用户手动处理。下图给出一种处理方案
或者mqtt消息一般为json,可以使用RyanJson / cJSON来处理json数据,都可以自动识别尾部脏数据
5、这样我们就添加完成了,烧录进行测试
如下面两个图所示,每秒上传一次消息,接收到消息后进行打印出来。结果如我们所想
5、总结
这篇文章简单介绍了RyanMqtt的使用,包括msh示例和添加到自己的工程代码里面。但文章碍于篇幅限制介绍的始终很浅,想深入的了解还是要看代码,RyanMqtt注释都为中文。
接下来应该还会写两篇文章介绍RyanMqtt,一篇进行RyanMqtt的qos1 / qos2消息等级的稳定性测试,另一篇介绍下RyanMqtt移植指南
后面看要不要详细介绍一下mqtt协议,RyanMqtt代码仓库docs/下有mqtt3.1.1协议中文版本pdf,一般来说看文档就可以了。
以及mqtt5.0,5.0增加了很多激动人心的特性,在考虑要不要适配一下,看大家需求了,可能遥遥无期哈哈哈哈哈。