|
|
@@ -1,5 +1,7 @@
|
|
|
package com.zksy.pole.MQTTServer.callBack;
|
|
|
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
|
import com.zksy.pole.MQTTServer.config.MQTTServerProperties;
|
|
|
@@ -90,68 +92,67 @@ public class BootNettyMqttMsgBack {
|
|
|
mqttPublishMessage.payload().readBytes(headBytes);
|
|
|
String data = new String(headBytes);
|
|
|
System.out.println("收到数据-->"+data);
|
|
|
- handleCmdFactory.HandleCmd(data);
|
|
|
+ String jsonRes = handleCmdFactory.HandleCmd(data);
|
|
|
//重置读取的指针
|
|
|
mqttPublishMessage.payload().resetReaderIndex();
|
|
|
//(qos= 0最多一次的传输,1至少一次的传输,2: 只有一次的传输)
|
|
|
- switch (qos) {
|
|
|
- case AT_MOST_ONCE: // 至多一次
|
|
|
- // 构建自定义应答消息
|
|
|
- MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
|
|
|
- MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/40005274", 0); // 使用0作为packetId,因为这里不需要确认
|
|
|
- JSONObject jsonObject = new JSONObject();
|
|
|
- jsonObject.put("flag", 1);
|
|
|
- jsonObject.put("cmd", 1111);
|
|
|
- byte[] customPayload = jsonObject.toString().getBytes(StandardCharsets.UTF_8);
|
|
|
- ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
|
|
|
- MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
|
|
|
+ if(StrUtil.isNotBlank(jsonRes)){
|
|
|
+ switch (qos) {
|
|
|
+ case AT_MOST_ONCE: // 至多一次
|
|
|
+ // 构建自定义应答消息
|
|
|
+ MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
|
|
|
+ MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/40005274", 0); // 使用0作为packetId,因为这里不需要确认
|
|
|
+ byte[] customPayload = jsonRes.getBytes(StandardCharsets.UTF_8);
|
|
|
+ ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
|
|
|
+ MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
|
|
|
|
|
|
- // 发送自定义应答消息给网关
|
|
|
- if (channel != null) {
|
|
|
- channel.writeAndFlush(customMessage);
|
|
|
- } else {
|
|
|
- log.warn("网关通道未找到,无法发送应答消息");
|
|
|
- }
|
|
|
- break;
|
|
|
- // QoS 1: 发送PUBACK
|
|
|
- case AT_LEAST_ONCE: // 至少一次
|
|
|
- // 构建返回报文, 可变报头
|
|
|
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
|
|
|
- // 构建返回报文, 固定报头
|
|
|
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
|
|
|
- // 构建PUBACK消息体
|
|
|
- MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
|
|
|
- log.info("back--"+pubAck.toString());
|
|
|
- channel.writeAndFlush(pubAck);
|
|
|
- //推送到订阅的客户端
|
|
|
- subscribSend(mqttMessage,channel);
|
|
|
- break;
|
|
|
- // QoS 2: 发送PUBREC
|
|
|
- case EXACTLY_ONCE: // 刚好一次
|
|
|
- // 构建返回报文, 固定报头
|
|
|
- MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
|
|
|
- // 构建返回报文, 可变报头
|
|
|
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
|
|
|
- MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
|
|
|
- //服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
|
|
|
- // 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
|
|
|
- //接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
|
|
|
- int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
|
|
|
- if(!MQTTServer.mqttMessageIdMap.containsKey(mqttMessageId)){
|
|
|
- //不存在此消息,将此消息暂存 //todo 这里可以换成redis做缓存
|
|
|
- MQTTServer.mqttMessageIdMap.put(mqttMessageId, mqttMessage);
|
|
|
- log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
|
|
|
- }else{
|
|
|
- //重复发送消息,直接返回
|
|
|
- log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
|
|
|
- return;
|
|
|
- }
|
|
|
+ // 发送自定义应答消息给网关
|
|
|
+ if (channel != null) {
|
|
|
+ channel.writeAndFlush(customMessage);
|
|
|
+ } else {
|
|
|
+ log.warn("网关通道未找到,无法发送应答消息");
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ // QoS 1: 发送PUBACK
|
|
|
+ case AT_LEAST_ONCE: // 至少一次
|
|
|
+ // 构建返回报文, 可变报头
|
|
|
+ MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
|
|
|
+ // 构建返回报文, 固定报头
|
|
|
+ MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
|
|
|
+ // 构建PUBACK消息体
|
|
|
+ MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
|
|
|
+ log.info("back--"+pubAck.toString());
|
|
|
+ channel.writeAndFlush(pubAck);
|
|
|
+ //推送到订阅的客户端
|
|
|
+ subscribSend(mqttMessage,channel);
|
|
|
+ break;
|
|
|
+ // QoS 2: 发送PUBREC
|
|
|
+ case EXACTLY_ONCE: // 刚好一次
|
|
|
+ // 构建返回报文, 固定报头
|
|
|
+ MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
|
|
|
+ // 构建返回报文, 可变报头
|
|
|
+ MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
|
|
|
+ MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
|
|
|
+ //服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
|
|
|
+ // 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
|
|
|
+ //接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
|
|
|
+ int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
|
|
|
+ if(!MQTTServer.mqttMessageIdMap.containsKey(mqttMessageId)){
|
|
|
+ //不存在此消息,将此消息暂存 //todo 这里可以换成redis做缓存
|
|
|
+ MQTTServer.mqttMessageIdMap.put(mqttMessageId, mqttMessage);
|
|
|
+ log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
|
|
|
+ }else{
|
|
|
+ //重复发送消息,直接返回
|
|
|
+ log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- channel.writeAndFlush(mqttMessageBack);
|
|
|
- break;
|
|
|
- default:
|
|
|
- break;
|
|
|
- }
|
|
|
+ channel.writeAndFlush(mqttMessageBack);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|