Parcourir la source

设备编号动态传值

邵洋 il y a 1 an
Parent
commit
b678e55de1

+ 10 - 5
pole-service/src/main/java/com/zksy/pole/MQTTServer/callBack/BootNettyMqttMsgBack.java

@@ -91,6 +91,12 @@ public class BootNettyMqttMsgBack {
 		//注意:	readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置	读指针
 		//注意:	readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置	读指针
         byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
         byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
         mqttPublishMessage.payload().readBytes(headBytes);
         mqttPublishMessage.payload().readBytes(headBytes);
+		Object obj=mqttMessage.variableHeader();
+		MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
+		String topicName=variableHeader.topicName();
+		int lastIndex = topicName.lastIndexOf('/');
+		String lightNums = topicName.substring(lastIndex + 1);
+		System.err.println("topicName========"+topicName);
         String data = new String(headBytes);
         String data = new String(headBytes);
         log.info("收到数据-->"+data);
         log.info("收到数据-->"+data);
 		String jsonRes = handleCmdFactory.HandleCmd(data);
 		String jsonRes = handleCmdFactory.HandleCmd(data);
@@ -102,7 +108,7 @@ public class BootNettyMqttMsgBack {
 				case AT_MOST_ONCE: 		//	至多一次
 				case AT_MOST_ONCE: 		//	至多一次
 					// 构建自定义应答消息
 					// 构建自定义应答消息
 					MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
 					MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
-					MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/40005274", 0); // 使用0作为packetId,因为这里不需要确认
+					MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/"+lightNums, 0); // 使用0作为packetId,因为这里不需要确认
 					byte[] customPayload = jsonRes.getBytes(StandardCharsets.UTF_8);
 					byte[] customPayload = jsonRes.getBytes(StandardCharsets.UTF_8);
 					ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
 					ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
 					MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
 					MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
@@ -233,16 +239,15 @@ public class BootNettyMqttMsgBack {
 	 * 订阅推送
 	 * 订阅推送
 	 */
 	 */
 	public  void subscribSend(MqttMessage mqttMessage,Channel channel){
 	public  void subscribSend(MqttMessage mqttMessage,Channel channel){
-		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
 		Object obj=mqttMessage.variableHeader();
 		Object obj=mqttMessage.variableHeader();
 		MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
 		MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
 		String topicName=variableHeader.topicName();
 		String topicName=variableHeader.topicName();
-		System.err.println("topicName========"+topicName);
-		int packetId=variableHeader.packetId();
+		int lastIndex = topicName.lastIndexOf('/');
+		String lightNums = topicName.substring(lastIndex + 1);
 		//固定消息头 注意此处的消息类型PUBLISH mqtt协议
 		//固定消息头 注意此处的消息类型PUBLISH mqtt协议
 		MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
 		MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
 		//可变消息头
 		//可变消息头
-		MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader("down/light/ack/40005274",0);
+		MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader("down/light/ack/"+lightNums,0);
 		JSONObject jsonObject = new JSONObject();
 		JSONObject jsonObject = new JSONObject();
 		jsonObject.put("flag","1");
 		jsonObject.put("flag","1");
 		jsonObject.put("cmd","1111");
 		jsonObject.put("cmd","1111");

+ 1 - 1
pole-service/src/main/java/com/zksy/pole/MQTTServer/handler/MQTTMessageHandler.java

@@ -88,7 +88,7 @@ public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {
                     //	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
                     //	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
                     // 	to do
                     // 	to do
                     BootNettyMqttMsgBack.suback(channel, mqttMessage);
                     BootNettyMqttMsgBack.suback(channel, mqttMessage);
-                    MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;
+                    MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();
                     for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
                     for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
                         String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
                         String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
                         boolean tag= MQTTServer.subscribeMap.containsKey(topicname);
                         boolean tag= MQTTServer.subscribeMap.containsKey(topicname);