Thingsboard 的 MQTT 设备协议
thingsboard 官网: https://thingsboard.io/
thingsboard GitHub: https://github.com/thingsboard/thingsboard
thingsboard 提供的体验地址: http://demo.thingsboard.io/
BY Thingsboard team
以下内容是在原文基础上演绎的译文. 除非另行注明, 页面上所有内容采用知识共享 - 署名 (CC BY 2.5 AU) 协议共享.
MQTT 基础知识
MQTT 是一种轻量级的发布 - 订阅消息传递协议, 可能使其最适合各种物联网设备. 您可以在此处找到有关 MQTT 的更多信息.
ThingsBoard 服务器节点充当 MQTT Broker, 支持 QoS 级别 0(最多一次)和 1(至少一次)以及一组预定义主题.
客户端库设置
您可以在 web 上找到大量 MQTT 客户端库. 本文中的示例将基于 Mosquitto,MQTT.JS 和 Paho, 要设置其中一个工具.
键值格式
默认情况下, ThingsBoard 支持 JSON 中的键值内容. Key 始终是一个字符串, 而 value 可以是 string,boolean,double 或 long. 也可以使用自定义二进制格式或某些序列化框架. 有关详细信息, 请参阅物模型. 例如:
{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}
遥测上传 API
为了将遥测数据发布到 ThingsBoard 服务器节点, 请将 PUBLISH 消息发送到以下主题:
v1/devices/me/telemetry
最简单的支持数据格式是:
{"key1":"value1", "key2":"value2"}
要么
[{"key1":"value1"}, {"key2":"value2"}]
请注意, 在这种情况下, 服务器端时间戳将分配给上传的数据!
如果您的设备能够获取客户端时间戳, 您可以使用以下格式:
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
在上面的示例中, 我们假设 "1451649600512" 是具有毫秒精度的 unix 时间戳. 例如, 值'1451649600512'对应于'Fri,2016 年 1 月 1 日 12:00:00.512 GMT'
属性 API
ThingsBoard 属性 API 允许设备
将客户端设备属性上载到服务器.
将属性更新发布到服务器
要将客户端设备属性发布到 ThingsBoard 服务器节点, 请将 PUBLISH 消息发送到以下主题:
v1/devices/me/attributes
更多请看上文给出的连接.
Thingsboard 的 MQTT 传输协议架构
因为 Thingsboard 最新 release, 是基于微服务架构, 不利用单独理解代码.
Thingsboard 源代码: https://github.com/thingsboard/thingsboard/tree/release-2.0/transport/mqtt
本文基于上面源代码后, 剔除相关的安全验证和处理之后搭建简易的讲解项目:
https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-MQTT
MQTT 框架
因为 Thingsboard 是一个 JVM 技术栈的 PaaS 平台, 所以使用的是基于 Java 通讯框架的 Netty, 如果有对 Netty 不太熟悉的同学, 可以参考我之前搭建的 Netty 实践学习案例: https://github.com/sanshengshui/netty-learning-example
项目结构
.
├── IOT-Guide-MQTT.iml
├── pom.xml
└── src
└── main
└── java
└── com
└── sanshengshui
└── mqtt
├── adapter
│ └── JsonMqttAdaptor.java // MQTT JSON 转换器, 在跟 Thingsboard 学习 IOT - 物模型有所讲解
├── IOTMqttServer.java // MQTT 服务
├── MqttTopicMatcher.java
├── MqttTopics.java
├── MqttTransportHandler.java //MQTT 处理类
└── MqttTransportServerInitializer.java
项目代码讲解
- IOTMqttServer
- private static final int PORT = 1884;
- private static final String leakDetectorLevel = "DISABLED";
- private static final Integer bossGroupThreadCount = 1;
- private static final Integer workerGroupThreadCount = 12;
- private static final Integer maxPayloadSize = 65536;
- public static void main(String[] args) throws Exception {
- ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
- EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
- EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup,workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new MqttTransportServerInitializer(maxPayloadSize));
- ChannelFuture f = b.bind(PORT);
- f.channel().closeFuture().sync();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
第 8 行, 设置服务端 Netty 内存读写泄漏级别, 缺省条件下为: DISABLED
第 10 行和第 11 行, 设置 boss 线程组和 work 线程组的线程数量. 默认情况下, boss 线程组的线程数量为 1,work 线程组的数量为运行服务机器内核数量的 2 倍.
第 15 行, 通过创建 ServerBootstrap 对象, 在第 16 行设置使用 EventLoopGroup.
在 17 和 19 行, 设置要被实例化的 NioServerSockerChannel 类, 并设置最大的负载内容数量.
最后我们通过 shutdowGracefully()函数优雅的关闭 bossGroup 和 workGroup.
- MqttTransportHandler#processMqttMsg()
- private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
- address = (InetSocketAddress) ctx.channel().remoteAddress();
- if (msg.fixedHeader() == null) {
- processDisconnect(ctx);
- return;
- }
- switch (msg.fixedHeader().messageType()) {
- case CONNECT:
- processConnect(ctx, (MqttConnectMessage) msg);
- break;
- case PUBLISH:
- processPublish(ctx, (MqttPublishMessage) msg);
- break;
- case SUBSCRIBE:
- processSubscribe(ctx, (MqttSubscribeMessage) msg);
- break;
- case UNSUBSCRIBE:
- processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
- break;
- case PINGREQ:
- if (checkConnected(ctx)) {
- ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0)));
- }
- break;
- case DISCONNECT:
- if (checkConnected(ctx)) {
- processDisconnect(ctx);
- }
- break;
- default:
- break;
- }
- }
第3行, 通过判断消息的固定头部是否为空, 如果空; 则通过 processDisconnect(ctx)将设备连接关闭.
- processDisconnect(channelHandlerContext ctx)
- private void processDisconnect(ChannelHandlerContext ctx) {
- ctx.close(); // 关闭 socket 通道
- }
第 8 行, 通过判断固定头部的 MQTT 消息类型, 针对不同消息做相应的处理.
MqttTransportHandler#PublishDevicePublish
以下是对发布消息进行相关的解读, 更多消息类型的处理类, 大家请参考我上面的 IOT-Guide-MQTT 进行阅读.
- private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
- try {
- if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { // 如果主题为 v1/devices/me/attributes
- JsonMqttAdaptor.convertToMsg(POST_TELEMETRY_REQUEST, mqttMsg);
- } else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
- JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
- } else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
- JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg);
- }
- } catch (AdaptorException e) {
- }
- }
我上面的代码仅是对消息的主题进行判断, 然后对主题内的内容进行物模型的解析, 得到相关属性或者遥测数据的获得.
演示效果
我们通过 Paho 或者 MQTT.JS 和服务进行连接, 发布消息到以下主题:
v1/devices/me/telemetry
简易的数据格式如下:
{"key1":"value1", "key2":"value2"}
Paho 图示:
服务器控制台打印数据:
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xf2bfb3a8] REGISTERED
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xf2bfb3a8] BIND: 0.0.0.0/0.0.0.0:1884
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] ACTIVE
七月 24, 2019 1:37:22 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] RECEIVED: [id: 0xe08abd12, L:/127.0.0.1:1884 - R:/127.0.0.1:48816]
key= 1563946708305
属性名 = temperature 属性值 = 38
属性名 = humidity 属性值 = 60
如上所示, 希望大家对 Thingsboard 的 IOT 架构 - MQTT 设备协议这块有所了解!
来源: https://www.cnblogs.com/sanshengshui/p/11237695.html