Parcourir la source

请求获取回执

nahida il y a 1 an
Parent
commit
0bad522e63

+ 1 - 5
pole-service/pom.xml

@@ -91,11 +91,7 @@
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
         </dependency>
-        <!--redisson依赖-->
-        <dependency>
-            <groupId>org.redisson</groupId>
-            <artifactId>redisson</artifactId>
-        </dependency>
+
     </dependencies>
 
     <build>

+ 2 - 2
pole-service/src/main/java/com/zksy/pole/MQTTServer/callBack/BootNettyMqttMsgBack.java

@@ -13,6 +13,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.handler.codec.mqtt.*;
 import lombok.RequiredArgsConstructor;
+import org.redisson.api.RedissonClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -91,9 +92,8 @@ public class BootNettyMqttMsgBack {
         byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
         mqttPublishMessage.payload().readBytes(headBytes);
         String data = new String(headBytes);
-        System.out.println("收到数据-->"+data);
+        log.info("收到数据-->"+data);
 		String jsonRes = handleCmdFactory.HandleCmd(data);
-
 		//重置读取的指针
 		mqttPublishMessage.payload().resetReaderIndex();
 		//(qos= 0最多一次的传输,1至少一次的传输,2: 只有一次的传输)

+ 13 - 0
pole-service/src/main/java/com/zksy/pole/factory/HandleCmdFactory.java

@@ -6,14 +6,19 @@ import cn.hutool.json.JSONArray;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import com.zksy.common.exception.CommonException;
+import com.zksy.common.utils.LockUtils;
 import com.zksy.pole.domain.po.*;
 import com.zksy.pole.factory.impl.*;
 import com.zksy.pole.mapper.DgGatewayRegistrationMapper;
 import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
+import java.util.concurrent.TimeUnit;
 
 @Component
 @Slf4j
@@ -36,6 +41,10 @@ public class HandleCmdFactory {
     private HandleDgReportEnvironmentalDataCmd handleDgReportEnvironmentalDataCmd;
     @Autowired
     private HandleDgTimelyControlCmd handleDgTimelyControlCmd;
+    @Autowired
+    private LockUtils lockUtils;
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
 
     public String HandleCmd(String data) {
         JSONObject jsonObject = JSONUtil.parseObj(data);
@@ -49,6 +58,10 @@ public class HandleCmdFactory {
         for (String paramName : paramNames) {
             convertToJsonString(jsonObject, paramName);
         }
+        lockUtils.lock("lock:cmd:" + cmd);
+        lockUtils.forceUnlock("lock:cmd:" + cmd);
+        redisTemplate.delete("lock:cmd:" + cmd);
+        redisTemplate.opsForValue().set("cmd:"+cmd,jsonObject.toString(),20, TimeUnit.SECONDS);
         String s;
         switch (cmd) {
             case 6031:

+ 53 - 25
pole-service/src/main/java/com/zksy/pole/service/impl/InstructionIssuanceServerImpl.java

@@ -1,6 +1,7 @@
 package com.zksy.pole.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
+import com.zksy.common.utils.LockUtils;
 import com.zksy.pole.MQTTServer.request.MqttRequest;
 import com.zksy.pole.MQTTServer.server.MQTTServer;
 import com.zksy.pole.domain.dto.*;
@@ -10,12 +11,17 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import io.netty.handler.codec.mqtt.*;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author Administrator
@@ -25,28 +31,28 @@ import java.util.Map;
  * @date 2024/8/28 17:41:04
  */
 @Service
+@Slf4j
 public class InstructionIssuanceServerImpl implements InstructionIssuanceServer {
     private static final Map<String,Object> msgMap= new HashMap<>();
+    @Autowired
+    private LockUtils lockUtils;
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
     @Override
     public String instructionIssuance(ReportEnvironmentalDataDto dto) {
         Map<String, Object> fields = new HashMap<>();
         fields.put("light_nums",dto.getLightNums());
         fields.put("brightness",dto.getBrightness());
-        sendMqttMessage(dto,fields);
-        System.out.println("dto------>"+dto.getCmd());
-        System.out.println("dto------>"+dto.getBrightness());
-        System.out.println("dto------>"+dto.getLightNums());
-        System.out.println("dto------>"+dto.getPackageId());
-        System.out.println("f------->"+fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
-
     @Override
     public String equipmentStatus(BaseDto dto) {
         Map<String, Object> fields = new HashMap<>();
         fields.put("light_nums",dto.getLightNums());
         sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
@@ -59,8 +65,8 @@ public class InstructionIssuanceServerImpl implements InstructionIssuanceServer
         fields.put("start_time",dto.getStart_time());
         fields.put("end_time",dto.getEnd_time());
         fields.put("params",dto.getParams());
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
@@ -69,32 +75,32 @@ public class InstructionIssuanceServerImpl implements InstructionIssuanceServer
         fields.put("light_nums",dto.getLightNums());
         fields.put("id",dto.getId());
         fields.put("tag",dto.getTag());
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
     public String setGatewayTime(SetGatewayTimeDto dto) {
         Map<String, Object> fields = new HashMap<>();
         fields.put("timestamp",dto.getTimestamp());
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
     public String reRegistered(BaseDto dto) {
         Map<String, Object> fields = new HashMap<>();
         fields.put("package_id",null);
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
     public String relayControl(RelayControlDto dto) {
         Map<String, Object> fields = new HashMap<>();
         fields.put("params",dto.getParams());
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
@@ -107,8 +113,8 @@ public class InstructionIssuanceServerImpl implements InstructionIssuanceServer
         fields.put("start_time",dto.getStatus());
         fields.put("end_time",dto.getStartTime());
         fields.put("params",dto.getEndTime());
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
@@ -117,16 +123,16 @@ public class InstructionIssuanceServerImpl implements InstructionIssuanceServer
         fields.put("index",dto.getIndex());
         fields.put("id",dto.getId());
         fields.put("tag",dto.getTag());
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     @Override
     public String queryRelayTiming(QueryRelayTimingDto dto) {
         Map<String, Object> fields = new HashMap<>();
         fields.put("index",dto.getIndex());
-        sendMqttMessage(dto,fields);
-        return "ok";
+        String receipt = getReceipt(dto, fields);
+        return receipt;
     }
 
     private void sendMqttMessage(BaseDto dto,Map<String, Object> additionalFields) {
@@ -165,4 +171,26 @@ public class InstructionIssuanceServerImpl implements InstructionIssuanceServer
             pubMessage.release();
         }
     }
+    private String getReceipt(BaseDto dto, Map<String, Object> fields) {
+        lockUtils.lock("lock:cmd:" + dto.getCmd());
+        sendMqttMessage(dto, fields);
+        long startTime = System.currentTimeMillis();
+        long timeoutMillis = 20000;
+        while (lockUtils.isLocked("lock:cmd:" + dto.getCmd())){
+            try {
+                System.out.println("当前锁住了");
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.error("线程被中断");
+            }
+            // 检查是否超时
+            if (System.currentTimeMillis() - startTime > timeoutMillis) {
+                log.error("等待锁超时,方法失败");
+                redisTemplate.delete("lock:cmd:" + dto.getCmd());
+                throw new RuntimeException("等待超时");
+            }
+        }
+        return redisTemplate.opsForValue().get("cmd:" + dto.getCmd());
+    }
 }

+ 5 - 0
zksy-common/pom.xml

@@ -96,5 +96,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-data-redis</artifactId>
         </dependency>
+        <!--redisson依赖-->
+        <dependency>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson</artifactId>
+        </dependency>
     </dependencies>
 </project>

+ 1 - 2
pole-service/src/main/java/com/zksy/pole/config/RedissonConfig.java → zksy-common/src/main/java/com/zksy/common/config/RedissonConfig.java

@@ -1,10 +1,9 @@
-package com.zksy.pole.config;
+package com.zksy.common.config;
 
 import org.redisson.Redisson;
 import org.redisson.api.RedissonClient;
 import org.redisson.config.Config;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 

+ 60 - 0
zksy-common/src/main/java/com/zksy/common/utils/LockUtils.java

@@ -0,0 +1,60 @@
+package com.zksy.common.utils;
+
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class LockUtils {
+    @Autowired
+    private RedissonClient redissonClient;
+    /**
+     * 尝试获取锁。
+     *
+     * @param key 锁的键
+     * @return 是否成功获取锁
+     */
+    public boolean lock(String key) {
+        RLock lock = redissonClient.getLock(key);
+        return lock.tryLock();
+    }
+
+    public RLock getLock(String key) {
+        return redissonClient.getLock(key);
+    }
+
+    /**
+     * 释放锁。
+     *
+     * @param key 锁的键
+     * @return 是否成功释放锁
+     */
+    public boolean unlock(String key) {
+        RLock lock = redissonClient.getLock(key);
+        if (lock.isHeldByCurrentThread()) {
+            lock.unlock();
+            return true;
+        }
+        return false;
+    }
+    public boolean forceUnlock(String key) {
+        RLock lock = redissonClient.getLock(key);
+        if (lock.isHeldByCurrentThread()) {
+            lock.forceUnlock();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * 检查锁是否被占用。
+     *
+     * @param key 锁的键
+     * @return 锁是否被占用
+     */
+    public boolean isLocked(String key) {
+        RLock lock = redissonClient.getLock(key);
+        return lock.isLocked();
+    }
+}

+ 3 - 1
zksy-common/src/main/resources/META-INF/spring.factories

@@ -1,4 +1,6 @@
 org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
   com.zksy.common.config.MyBatisConfig,\
   com.zksy.common.config.MvcConfig,\
-  com.zksy.common.config.JsonConfig
+  com.zksy.common.config.JsonConfig,\
+  com.zksy.common.config.RedissonConfig,\
+  com.zksy.common.utils.LockUtils