InstructionIssuanceServerImpl.java 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package com.zksy.pole.service.impl;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.zksy.pole.MQTTServer.request.MqttRequest;
  4. import com.zksy.pole.MQTTServer.server.MQTTServer;
  5. import com.zksy.pole.domain.dto.*;
  6. import com.zksy.pole.service.InstructionIssuanceServer;
  7. import com.zksy.pole.utils.LockUtils;
  8. import io.netty.buffer.Unpooled;
  9. import io.netty.channel.Channel;
  10. import io.netty.channel.ChannelId;
  11. import io.netty.handler.codec.mqtt.*;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.data.redis.core.RedisTemplate;
  15. import org.springframework.stereotype.Service;
  16. import java.util.HashMap;
  17. import java.util.List;
  18. import java.util.Map;
  19. /**
  20. * @author Administrator
  21. * @version 1.0
  22. * @project dh-server-micro
  23. * @description 指令下发实现层
  24. * @date 2024/8/28 17:41:04
  25. */
  26. @Service
  27. @Slf4j
  28. public class InstructionIssuanceServerImpl implements InstructionIssuanceServer {
  29. private static final Map<String,Object> msgMap= new HashMap<>();
  30. @Autowired
  31. private LockUtils lockUtils;
  32. @Autowired
  33. private RedisTemplate<String, String> redisTemplate;
  34. @Override
  35. public String instructionIssuance(ReportEnvironmentalDataDto dto) {
  36. Map<String, Object> fields = new HashMap<>();
  37. fields.put("light_nums",dto.getLightNums());
  38. fields.put("brightness",dto.getBrightness());
  39. String receipt = getReceipt(dto, fields);
  40. return receipt;
  41. }
  42. @Override
  43. public String equipmentStatus(BaseDto dto) {
  44. Map<String, Object> fields = new HashMap<>();
  45. fields.put("light_nums",dto.getLightNums());
  46. sendMqttMessage(dto,fields);
  47. String receipt = getReceipt(dto, fields);
  48. return receipt;
  49. }
  50. @Override
  51. public String scenarioStrategy(ScenarioStrategyDto dto) {
  52. Map<String, Object> fields = new HashMap<>();
  53. fields.put("light_nums",dto.getLightNums());
  54. fields.put("id",dto.getId());
  55. fields.put("type",dto.getType());
  56. fields.put("status",dto.getStatus());
  57. fields.put("start_time",dto.getStart_time());
  58. fields.put("end_time",dto.getEnd_time());
  59. fields.put("params",dto.getParams());
  60. String receipt = getReceipt(dto, fields);
  61. return receipt;
  62. }
  63. @Override
  64. public String delScenarioStrategy(DelScenarioStrategyDto dto) {
  65. Map<String, Object> fields = new HashMap<>();
  66. fields.put("light_nums",dto.getLightNums());
  67. fields.put("id",dto.getId());
  68. fields.put("tag",dto.getTag());
  69. String receipt = getReceipt(dto, fields);
  70. return receipt;
  71. }
  72. @Override
  73. public String setGatewayTime(SetGatewayTimeDto dto) {
  74. Map<String, Object> fields = new HashMap<>();
  75. fields.put("timestamp",dto.getTimestamp());
  76. String receipt = getReceipt(dto, fields);
  77. return receipt;
  78. }
  79. @Override
  80. public String reRegistered(BaseDto dto) {
  81. Map<String, Object> fields = new HashMap<>();
  82. fields.put("package_id",null);
  83. String receipt = getReceipt(dto, fields);
  84. return receipt;
  85. }
  86. @Override
  87. public String relayControl(RelayControlDto dto) {
  88. Map<String, Object> fields = new HashMap<>();
  89. fields.put("params",dto.getParams());
  90. String receipt = getReceipt(dto, fields);
  91. return receipt;
  92. }
  93. @Override
  94. public String relayTiming(RelayTimingDto dto) {
  95. Map<String, Object> fields = new HashMap<>();
  96. fields.put("index",dto.getIndex());
  97. fields.put("id",dto.getId());
  98. fields.put("type",dto.getType());
  99. fields.put("status",dto.getStatus());
  100. fields.put("start_time",dto.getStatus());
  101. fields.put("end_time",dto.getStartTime());
  102. fields.put("params",dto.getEndTime());
  103. String receipt = getReceipt(dto, fields);
  104. return receipt;
  105. }
  106. @Override
  107. public String delRelayTiming(DelRelayTimingDto dto) {
  108. Map<String, Object> fields = new HashMap<>();
  109. fields.put("index",dto.getIndex());
  110. fields.put("id",dto.getId());
  111. fields.put("tag",dto.getTag());
  112. String receipt = getReceipt(dto, fields);
  113. return receipt;
  114. }
  115. @Override
  116. public String queryRelayTiming(QueryRelayTimingDto dto) {
  117. Map<String, Object> fields = new HashMap<>();
  118. fields.put("index",dto.getIndex());
  119. String receipt = getReceipt(dto, fields);
  120. return receipt;
  121. }
  122. private void sendMqttMessage(BaseDto dto,Map<String, Object> additionalFields) {
  123. MqttQoS qos = MqttQoS.AT_MOST_ONCE;
  124. String topic = "down/light/" + dto.getLightNums();
  125. JSONObject jsonObject = new JSONObject();
  126. jsonObject.put("cmd", dto.getCmd());
  127. if (dto.getPackageId() != null && !dto.getPackageId().isEmpty()) {
  128. jsonObject.put("package_id", dto.getPackageId());
  129. }
  130. if (additionalFields != null && !additionalFields.isEmpty()) {
  131. additionalFields.forEach((key, value) -> jsonObject.put(key, value));
  132. }
  133. writeAndFlush(qos, topic, jsonObject);
  134. }
  135. public void writeAndFlush(MqttQoS qos,String topic,JSONObject jsonObject){
  136. MqttRequest request = new MqttRequest((jsonObject.toString().getBytes()));
  137. MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
  138. new MqttFixedHeader(MqttMessageType.PUBLISH,request.isDup(),qos,request.isRetained(),0),
  139. new MqttPublishVariableHeader(topic, 0),
  140. Unpooled.buffer().writeBytes(request.getPayload()));
  141. msgMap.put(pubMessage.variableHeader().packetId()+"",pubMessage.variableHeader().packetId()+"");
  142. if(MQTTServer.subscribeMap.containsKey(topic)){
  143. List<ChannelId> channelList= MQTTServer.subscribeMap.get(topic);
  144. for (int i = 0; i < channelList.size(); i++) {
  145. //订阅次此topic的Mqtt客户端搜到此消息,
  146. Channel channel= MQTTServer.MQTTdeviceChannelGroup.find(channelList.get(i));
  147. if(channel!=null) {
  148. pubMessage.retain();
  149. channel.writeAndFlush(pubMessage);
  150. }
  151. }
  152. pubMessage.release();
  153. }
  154. }
  155. private String getReceipt(BaseDto dto, Map<String, Object> fields) {
  156. lockUtils.lock("lock:cmd:" + dto.getCmd());
  157. sendMqttMessage(dto, fields);
  158. long startTime = System.currentTimeMillis();
  159. long timeoutMillis = 20000;
  160. while (lockUtils.isLocked("lock:cmd:" + dto.getCmd())){
  161. try {
  162. // System.out.println("当前锁住了");
  163. Thread.sleep(100);
  164. } catch (InterruptedException e) {
  165. Thread.currentThread().interrupt();
  166. log.error("线程被中断");
  167. }
  168. // 检查是否超时
  169. if (System.currentTimeMillis() - startTime > timeoutMillis) {
  170. log.error("等待锁超时,方法失败");
  171. redisTemplate.delete("lock:cmd:" + dto.getCmd());
  172. throw new RuntimeException("等待超时");
  173. }
  174. }
  175. return redisTemplate.opsForValue().get("cmd:" + dto.getCmd());
  176. }
  177. }