|
|
@@ -0,0 +1,62 @@
|
|
|
+package com.zksy.pole.service.impl;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.zksy.pole.MQTTServer.request.MqttRequest;
|
|
|
+import com.zksy.pole.MQTTServer.server.MQTTServer;
|
|
|
+import com.zksy.pole.domain.dto.ReportEnvironmentalDataDto;
|
|
|
+import com.zksy.pole.service.InstructionIssuanceServer;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelId;
|
|
|
+import io.netty.handler.codec.mqtt.*;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author Administrator
|
|
|
+ * @version 1.0
|
|
|
+ * @project dh-server-micro
|
|
|
+ * @description 指令下发实现层
|
|
|
+ * @date 2024/8/28 17:41:04
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class InstructionIssuanceServerImpl implements InstructionIssuanceServer {
|
|
|
+ private static final Map<String,Object> msgMap= new HashMap<>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String instructionIssuance(ReportEnvironmentalDataDto dto) {
|
|
|
+ MqttQoS qos = MqttQoS.AT_MOST_ONCE;;
|
|
|
+ String topic = "down/light/"+dto.getLightNums();
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
+ jsonObject.put("cmd",dto.getCmd());
|
|
|
+ jsonObject.put("light_nums",dto.getLightNums());
|
|
|
+ jsonObject.put("brightness",dto.getBrightness());
|
|
|
+ jsonObject.put("package_id","1050");
|
|
|
+ writeAndFlush(qos,topic,jsonObject);
|
|
|
+ return "ok";
|
|
|
+ }
|
|
|
+
|
|
|
+ public void writeAndFlush(MqttQoS qos,String topic,JSONObject jsonObject){
|
|
|
+ MqttRequest request = new MqttRequest((jsonObject.toString().getBytes()));
|
|
|
+ MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
|
|
|
+ new MqttFixedHeader(MqttMessageType.PUBLISH,request.isDup(),qos,request.isRetained(),0),
|
|
|
+ new MqttPublishVariableHeader(topic, 0),
|
|
|
+ Unpooled.buffer().writeBytes(request.getPayload()));
|
|
|
+ msgMap.put(pubMessage.variableHeader().packetId()+"",pubMessage.variableHeader().packetId()+"");
|
|
|
+ if(MQTTServer.subscribeMap.containsKey(topic)){
|
|
|
+ List<ChannelId> channelList= MQTTServer.subscribeMap.get(topic);
|
|
|
+ for (int i = 0; i < channelList.size(); i++) {
|
|
|
+ //订阅次此topic的Mqtt客户端搜到此消息,
|
|
|
+ Channel channel= MQTTServer.MQTTdeviceChannelGroup.find(channelList.get(i));
|
|
|
+ if(channel!=null) {
|
|
|
+ pubMessage.retain();
|
|
|
+ channel.writeAndFlush(pubMessage);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pubMessage.release();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|