package com.zksy.pole.service.impl; import com.alibaba.fastjson.JSONObject; import com.zksy.pole.MQTTServer.request.MqttRequest; import com.zksy.pole.MQTTServer.server.MQTTServer; import com.zksy.pole.domain.dto.*; import com.zksy.pole.service.InstructionIssuanceServer; import com.zksy.pole.utils.LockUtils; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import io.netty.handler.codec.mqtt.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author Administrator * @version 1.0 * @project dh-server-micro * @description 指令下发实现层 * @date 2024/8/28 17:41:04 */ @Service @Slf4j public class InstructionIssuanceServerImpl implements InstructionIssuanceServer { private static final Map msgMap= new HashMap<>(); @Autowired private LockUtils lockUtils; @Autowired private RedisTemplate redisTemplate; @Override public String instructionIssuance(ReportEnvironmentalDataDto dto) { Map fields = new HashMap<>(); fields.put("light_nums",dto.getLightNums()); fields.put("brightness",dto.getBrightness()); String receipt = getReceipt(dto, fields); return receipt; } @Override public String equipmentStatus(BaseDto dto) { Map fields = new HashMap<>(); fields.put("light_nums",dto.getLightNums()); sendMqttMessage(dto,fields); String receipt = getReceipt(dto, fields); return receipt; } @Override public String scenarioStrategy(ScenarioStrategyDto dto) { Map fields = new HashMap<>(); fields.put("light_nums",dto.getLightNums()); fields.put("id",dto.getId()); fields.put("type",dto.getType()); fields.put("status",dto.getStatus()); fields.put("start_time",dto.getStart_time()); fields.put("end_time",dto.getEnd_time()); fields.put("params",dto.getParams()); String receipt = getReceipt(dto, fields); return receipt; } @Override public String delScenarioStrategy(DelScenarioStrategyDto dto) { Map fields = new HashMap<>(); fields.put("light_nums",dto.getLightNums()); fields.put("id",dto.getId()); fields.put("tag",dto.getTag()); String receipt = getReceipt(dto, fields); return receipt; } @Override public String setGatewayTime(SetGatewayTimeDto dto) { Map fields = new HashMap<>(); fields.put("timestamp",dto.getTimestamp()); String receipt = getReceipt(dto, fields); return receipt; } @Override public String reRegistered(BaseDto dto) { Map fields = new HashMap<>(); fields.put("package_id",null); String receipt = getReceipt(dto, fields); return receipt; } @Override public String relayControl(RelayControlDto dto) { Map fields = new HashMap<>(); fields.put("params",dto.getParams()); String receipt = getReceipt(dto, fields); return receipt; } @Override public String relayTiming(RelayTimingDto dto) { Map fields = new HashMap<>(); fields.put("index",dto.getIndex()); fields.put("id",dto.getId()); fields.put("type",dto.getType()); fields.put("status",dto.getStatus()); fields.put("start_time",dto.getStatus()); fields.put("end_time",dto.getStartTime()); fields.put("params",dto.getEndTime()); String receipt = getReceipt(dto, fields); return receipt; } @Override public String delRelayTiming(DelRelayTimingDto dto) { Map fields = new HashMap<>(); fields.put("index",dto.getIndex()); fields.put("id",dto.getId()); fields.put("tag",dto.getTag()); String receipt = getReceipt(dto, fields); return receipt; } @Override public String queryRelayTiming(QueryRelayTimingDto dto) { Map fields = new HashMap<>(); fields.put("index",dto.getIndex()); String receipt = getReceipt(dto, fields); return receipt; } private void sendMqttMessage(BaseDto dto,Map additionalFields) { MqttQoS qos = MqttQoS.AT_MOST_ONCE; String topic = "down/light/" + dto.getLightNums(); JSONObject jsonObject = new JSONObject(); jsonObject.put("cmd", dto.getCmd()); if (dto.getPackageId() != null && !dto.getPackageId().isEmpty()) { jsonObject.put("package_id", dto.getPackageId()); } if (additionalFields != null && !additionalFields.isEmpty()) { additionalFields.forEach((key, value) -> jsonObject.put(key, value)); } writeAndFlush(qos, topic, jsonObject); } public void writeAndFlush(MqttQoS qos,String topic,JSONObject jsonObject){ MqttRequest request = new MqttRequest((jsonObject.toString().getBytes())); MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.PUBLISH,request.isDup(),qos,request.isRetained(),0), new MqttPublishVariableHeader(topic, 0), Unpooled.buffer().writeBytes(request.getPayload())); msgMap.put(pubMessage.variableHeader().packetId()+"",pubMessage.variableHeader().packetId()+""); if(MQTTServer.subscribeMap.containsKey(topic)){ List channelList= MQTTServer.subscribeMap.get(topic); for (int i = 0; i < channelList.size(); i++) { //订阅次此topic的Mqtt客户端搜到此消息, Channel channel= MQTTServer.MQTTdeviceChannelGroup.find(channelList.get(i)); if(channel!=null) { pubMessage.retain(); channel.writeAndFlush(pubMessage); } } pubMessage.release(); } } private String getReceipt(BaseDto dto, Map fields) { lockUtils.lock("lock:cmd:" + dto.getCmd()); sendMqttMessage(dto, fields); long startTime = System.currentTimeMillis(); long timeoutMillis = 20000; while (lockUtils.isLocked("lock:cmd:" + dto.getCmd())){ try { // System.out.println("当前锁住了"); Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("线程被中断"); } // 检查是否超时 if (System.currentTimeMillis() - startTime > timeoutMillis) { log.error("等待锁超时,方法失败"); redisTemplate.delete("lock:cmd:" + dto.getCmd()); throw new RuntimeException("等待超时"); } } return redisTemplate.opsForValue().get("cmd:" + dto.getCmd()); } }