| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package com.zksy.pole.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.zksy.pole.MQTTServer.request.MqttRequest;
import com.zksy.pole.MQTTServer.server.MQTTServer;
import com.zksy.pole.domain.dto.*;
import com.zksy.pole.service.InstructionIssuanceServer;
import com.zksy.pole.utils.LockUtils;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
- /**
- * @author Administrator
- * @version 1.0
- * @project dh-server-micro
- * @description 指令下发实现层
- * @date 2024/8/28 17:41:04
- */
- @Service
- @Slf4j
- public class InstructionIssuanceServerImpl implements InstructionIssuanceServer {
- private static final Map<String,Object> msgMap= new HashMap<>();
- @Autowired
- private LockUtils lockUtils;
- @Autowired
- private RedisTemplate<String, String> redisTemplate;
- @Override
- public String instructionIssuance(ReportEnvironmentalDataDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("light_nums",dto.getLightNums());
- fields.put("brightness",dto.getBrightness());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String equipmentStatus(BaseDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("light_nums",dto.getLightNums());
- sendMqttMessage(dto,fields);
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String scenarioStrategy(ScenarioStrategyDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("light_nums",dto.getLightNums());
- fields.put("id",dto.getId());
- fields.put("type",dto.getType());
- fields.put("status",dto.getStatus());
- fields.put("start_time",dto.getStart_time());
- fields.put("end_time",dto.getEnd_time());
- fields.put("params",dto.getParams());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String delScenarioStrategy(DelScenarioStrategyDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("light_nums",dto.getLightNums());
- fields.put("id",dto.getId());
- fields.put("tag",dto.getTag());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String setGatewayTime(SetGatewayTimeDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("timestamp",dto.getTimestamp());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String reRegistered(BaseDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("package_id",null);
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String relayControl(RelayControlDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("params",dto.getParams());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String relayTiming(RelayTimingDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("index",dto.getIndex());
- fields.put("id",dto.getId());
- fields.put("type",dto.getType());
- fields.put("status",dto.getStatus());
- fields.put("start_time",dto.getStatus());
- fields.put("end_time",dto.getStartTime());
- fields.put("params",dto.getEndTime());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String delRelayTiming(DelRelayTimingDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("index",dto.getIndex());
- fields.put("id",dto.getId());
- fields.put("tag",dto.getTag());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- @Override
- public String queryRelayTiming(QueryRelayTimingDto dto) {
- Map<String, Object> fields = new HashMap<>();
- fields.put("index",dto.getIndex());
- String receipt = getReceipt(dto, fields);
- return receipt;
- }
- private void sendMqttMessage(BaseDto dto,Map<String, Object> additionalFields) {
- MqttQoS qos = MqttQoS.AT_MOST_ONCE;
- String topic = "down/light/" + dto.getLightNums();
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("cmd", dto.getCmd());
- if (dto.getPackageId() != null && !dto.getPackageId().isEmpty()) {
- jsonObject.put("package_id", dto.getPackageId());
- }
- if (additionalFields != null && !additionalFields.isEmpty()) {
- additionalFields.forEach((key, value) -> jsonObject.put(key, value));
- }
- writeAndFlush(qos, topic, jsonObject);
- }
- public void writeAndFlush(MqttQoS qos,String topic,JSONObject jsonObject){
- MqttRequest request = new MqttRequest((jsonObject.toString().getBytes()));
- MqttPublishMessage pubMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
- new MqttFixedHeader(MqttMessageType.PUBLISH,request.isDup(),qos,request.isRetained(),0),
- new MqttPublishVariableHeader(topic, 0),
- Unpooled.buffer().writeBytes(request.getPayload()));
- msgMap.put(pubMessage.variableHeader().packetId()+"",pubMessage.variableHeader().packetId()+"");
- if(MQTTServer.subscribeMap.containsKey(topic)){
- List<ChannelId> channelList= MQTTServer.subscribeMap.get(topic);
- for (int i = 0; i < channelList.size(); i++) {
- //订阅次此topic的Mqtt客户端搜到此消息,
- Channel channel= MQTTServer.MQTTdeviceChannelGroup.find(channelList.get(i));
- if(channel!=null) {
- pubMessage.retain();
- channel.writeAndFlush(pubMessage);
- }
- }
- pubMessage.release();
- }
- }
- private String getReceipt(BaseDto dto, Map<String, Object> fields) {
- lockUtils.lock("lock:cmd:" + dto.getCmd());
- sendMqttMessage(dto, fields);
- long startTime = System.currentTimeMillis();
- long timeoutMillis = 20000;
- while (lockUtils.isLocked("lock:cmd:" + dto.getCmd())){
- try {
- // System.out.println("当前锁住了");
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("线程被中断");
- }
- // 检查是否超时
- if (System.currentTimeMillis() - startTime > timeoutMillis) {
- log.error("等待锁超时,方法失败");
- redisTemplate.delete("lock:cmd:" + dto.getCmd());
- throw new RuntimeException("等待超时");
- }
- }
- return redisTemplate.opsForValue().get("cmd:" + dto.getCmd());
- }
- }
|