Kaynağa Gözat

feat(firefighting-pressure-service): 添加消防压力传感器数据解析和处理功能

- 新增 DataCheckUtil 类实现 CRC16 校验
- 添加 DataParser 类解析压力传感器数据
- 实现 DeviceOfflineCheckTask 定时任务检查设备离线状态
- 新增 MessageHandler 处理与设备的通信
- 添加 NettyServer 和 NettyServerThread 实现 Netty服务器功能
- 新增应用配置文件和数据库映射文件
- 实现 FirefightingPressure 实体类和相关 Mapper、Service
林仔 8 ay önce
ebeveyn
işleme
0d28090157
16 değiştirilmiş dosya ile 1021 ekleme ve 0 silme
  1. 87 0
      firefighting-pressure-service/pom.xml
  2. 16 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/PressureApplication.java
  3. 41 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/config/NettyServer.java
  4. 77 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/config/NettyServerThread.java
  5. 198 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/domain/FirefightingPressure.java
  6. 18 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/mapper/FirefightingPressureMapper.java
  7. 13 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/service/FirefightingPressureService.java
  8. 22 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/service/impl/FirefightingPressureServiceImpl.java
  9. 89 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/DataCheckUtil.java
  10. 191 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/DataParser.java
  11. 36 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/DeviceOfflineCheckTask.java
  12. 118 0
      firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/MessageHandler.java
  13. 17 0
      firefighting-pressure-service/src/main/resources/application-dev.yaml
  14. 11 0
      firefighting-pressure-service/src/main/resources/application-prod.yaml
  15. 35 0
      firefighting-pressure-service/src/main/resources/bootstrap.yaml
  16. 52 0
      firefighting-pressure-service/src/main/resources/mapper/FirefightingPressureMapper.xml

+ 87 - 0
firefighting-pressure-service/pom.xml

@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.zksy</groupId>
+        <artifactId>pipe-ner-server</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <groupId>org.example</groupId>
+    <artifactId>firefighting-pressure-service</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <!--common-->
+        <dependency>
+            <groupId>com.zksy</groupId>
+            <artifactId>zk-common</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <!--数据库-->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <!--mybatis-->
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-boot-starter</artifactId>
+        </dependency>
+        <!--nacos 服务注册发现-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
+        </dependency>
+        <!--负载均衡-->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
+        </dependency>
+        <!--统一配置管理-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
+        </dependency>
+        <!--加载bootstrap-->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-bootstrap</artifactId>
+        </dependency>
+        <!--sentinel-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
+        </dependency>
+        <!--netty依赖-->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <!--websocket-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>${project.artifactId}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 16 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/PressureApplication.java

@@ -0,0 +1,16 @@
+package com.zksy.pressure;
+
+
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+@MapperScan(basePackages = "com.zksy.pressure.mapper")
+@SpringBootApplication(scanBasePackages = {"com.zksy.pressure","com.zksy.api"})
+public class PressureApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(PressureApplication.class, args);
+        System.out.println("消防压力传感器启动成功");
+    }
+}

+ 41 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/config/NettyServer.java

@@ -0,0 +1,41 @@
+package com.zksy.pressure.config;
+
+
+import io.netty.channel.EventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+@Slf4j
+@Component
+public class NettyServer {
+
+	private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
+	public static EventLoopGroup acceptor;
+	public static EventLoopGroup worker;
+
+	@Autowired
+	private NettyServerThread nettyServerThread;
+
+	@PostConstruct
+	public void init() {
+		new Thread(() -> nettyServerThread.startServer()).start();
+		System.out.println("nettyServer启动");
+		logger.info("nettyServer启动");
+	}
+
+	@PreDestroy
+	public void exit() {
+		if (acceptor != null) {
+			acceptor.shutdownGracefully();
+		}
+		if (worker != null) {
+			worker.shutdownGracefully();
+		}
+	}
+}

+ 77 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/config/NettyServerThread.java

@@ -0,0 +1,77 @@
+package com.zksy.pressure.config;
+
+import com.zksy.pressure.utils.MessageHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class NettyServerThread {
+	@Value("${netty.port:20014}")
+	private int port;
+	private static Logger logger = LoggerFactory.getLogger(NettyServerThread.class);
+	private final MessageHandler messageHandler;
+	@Autowired
+	public NettyServerThread(MessageHandler messageHandler) {
+		this.messageHandler = messageHandler;
+	}
+	public void startServer() {
+		System.out.println("Netty服务启动端口号" + port);
+		EventLoopGroup acceptor = new NioEventLoopGroup();
+		EventLoopGroup worker = new NioEventLoopGroup();
+		NettyServer.acceptor = acceptor;
+		NettyServer.worker = worker;
+		ServerBootstrap bootstrap = new ServerBootstrap();
+
+		// 添加boss和worker组
+		bootstrap.group(acceptor, worker);
+		//这句是指定允许等待accept的最大连接数量,我只需要连一个客户端,这里就关掉了,java默认是50个
+		// bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
+		bootstrap.option(ChannelOption.TCP_NODELAY, true);
+		// 用于构造socketchannel工厂
+		bootstrap.channel(NioServerSocketChannel.class);
+
+		/**
+		 * 传入自定义客户端Handle(处理消息)
+		 */
+		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+			@Override
+			public void initChannel(SocketChannel ch) throws Exception {
+				logger.info("来自" + ch.remoteAddress() + "的新连接接入");
+				// 注册handler
+				ch.pipeline().addLast(new ReadTimeoutHandler(3600));// 超时时间,1小时内没有从通道(Channel)读取到任何数据
+				ch.pipeline().addLast(messageHandler);
+			}
+		});
+
+		// 绑定端口,开始接收进来的连接
+		ChannelFuture f;
+		try {
+			f = bootstrap.bind(port).sync();
+			// 等待服务器 socket 关闭 。
+			f.channel().closeFuture().sync();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} finally {
+			if (acceptor != null) {
+				acceptor.shutdownGracefully();
+			}
+			if (worker != null) {
+				worker.shutdownGracefully();
+			}
+		}
+	}
+}

+ 198 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/domain/FirefightingPressure.java

@@ -0,0 +1,198 @@
+package com.zksy.pressure.domain;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import lombok.Data;
+
+/**
+ * 消防压力表
+ * @TableName firefighting_pressure
+ */
+@TableName(value ="firefighting_pressure")
+@Data
+public class FirefightingPressure implements Serializable {
+    /**
+     * 主键
+     */
+    @TableId(value = "id", type = IdType.ASSIGN_UUID)
+    private String id;
+
+    /**
+     * 中心站地址
+     */
+    @TableField(value = "central_station")
+    private String centralStation;
+
+    /**
+     * 遥测站地址
+     */
+    @TableField(value = "telemetering_station")
+    private String telemeteringStation;
+
+    /**
+     * 密码
+     */
+    @TableField(value = "password")
+    private String password;
+
+    /**
+     * 功能码
+     */
+    @TableField(value = "function_code")
+    private String functionCode;
+
+    /**
+     * 流水号
+     */
+    @TableField(value = "serial_number")
+    private String serialNumber;
+
+    /**
+     * 发报时间
+     */
+    @TableField(value = "sending_time")
+    private LocalDateTime sendingTime;
+
+    /**
+     * 观测时间
+     */
+    @TableField(value = "observed_time")
+    private LocalDateTime observedTime;
+
+    /**
+     * GPRS信号质量
+     */
+    @TableField(value = "gprs_signal")
+    private Double gprsSignal;
+
+    /**
+     * 主机电池电压
+     */
+    @TableField(value = "host_voltage")
+    private Double hostVoltage;
+
+    /**
+     * 经度
+     */
+    @TableField(value = "longitude")
+    private BigDecimal longitude;
+
+    /**
+     * 纬度
+     */
+    @TableField(value = "latitude")
+    private BigDecimal latitude;
+
+    /**
+     * 压力值
+     */
+    @TableField(value = "pressure_value")
+    private Double pressureValue;
+
+    /**
+     * 保留
+     */
+    @TableField(value = "D31_D12")
+    private Integer d31D12;
+
+    /**
+     * 压力变幅报警
+     */
+    @TableField(value = "D14")
+    private Integer d14;
+
+    /**
+     * 压力下下限报警
+     */
+    @TableField(value = "D13")
+    private Integer d13;
+
+    /**
+     * 压力上上限报警
+     */
+    @TableField(value = "D12")
+    private Integer d12;
+
+    /**
+     * 压力下线报警
+     */
+    @TableField(value = "D11")
+    private Integer d11;
+
+    /**
+     * 压力上限报警
+     */
+    @TableField(value = "D10")
+    private Integer d10;
+
+    /**
+     * 传感器状态
+     */
+    @TableField(value = "D9")
+    private Integer d9;
+
+    /**
+     * 保留
+     */
+    @TableField(value = "D8")
+    private Integer d8;
+
+    /**
+     * 保留
+     */
+    @TableField(value = "D7")
+    private Integer d7;
+
+    /**
+     * 保留
+     */
+    @TableField(value = "D6")
+    private Integer d6;
+
+    /**
+     * 保留(6245保留,6242产品的取水报警),1-报警0-正常
+     */
+    @TableField(value = "D5")
+    private Integer d5;
+
+    /**
+     * 保留
+     */
+    @TableField(value = "D4")
+    private Integer d4;
+
+    /**
+     * 保留
+     */
+    @TableField(value = "D3")
+    private Integer d3;
+
+    /**
+     * 保留(6245保留,6242产品的取旋转报警),1-报警0-正常
+     */
+    @TableField(value = "D2")
+    private Integer d2;
+
+    /**
+     * 倾斜报警1-报警,0-正常
+     */
+    @TableField(value = "D1")
+    private Integer d1;
+
+    /**
+     * 保留
+     */
+    @TableField(value = "D0")
+    private Integer d0;
+
+    @TableField(value = "create_time")
+    private LocalDateTime createTime;
+
+    @TableField(exist = false)
+    private static final long serialVersionUID = 1L;
+}

+ 18 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/mapper/FirefightingPressureMapper.java

@@ -0,0 +1,18 @@
+package com.zksy.pressure.mapper;
+
+import com.zksy.pressure.domain.FirefightingPressure;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+* @author Administrator
+* @description 针对表【firefighting_pressure(消防压力表)】的数据库操作Mapper
+* @createDate 2025-09-17 11:34:57
+* @Entity com.zksy.pressure.domain.FirefightingPressure
+*/
+public interface FirefightingPressureMapper extends BaseMapper<FirefightingPressure> {
+
+}
+
+
+
+

+ 13 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/service/FirefightingPressureService.java

@@ -0,0 +1,13 @@
+package com.zksy.pressure.service;
+
+import com.zksy.pressure.domain.FirefightingPressure;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+* @author Administrator
+* @description 针对表【firefighting_pressure(消防压力表)】的数据库操作Service
+* @createDate 2025-09-17 11:34:57
+*/
+public interface FirefightingPressureService extends IService<FirefightingPressure> {
+
+}

+ 22 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/service/impl/FirefightingPressureServiceImpl.java

@@ -0,0 +1,22 @@
+package com.zksy.pressure.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.zksy.pressure.domain.FirefightingPressure;
+import com.zksy.pressure.service.FirefightingPressureService;
+import com.zksy.pressure.mapper.FirefightingPressureMapper;
+import org.springframework.stereotype.Service;
+
+/**
+* @author Administrator
+* @description 针对表【firefighting_pressure(消防压力表)】的数据库操作Service实现
+* @createDate 2025-09-17 11:34:57
+*/
+@Service
+public class FirefightingPressureServiceImpl extends ServiceImpl<FirefightingPressureMapper, FirefightingPressure>
+    implements FirefightingPressureService{
+
+}
+
+
+
+

+ 89 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/DataCheckUtil.java

@@ -0,0 +1,89 @@
+package com.zksy.pressure.utils;
+
+/**
+ * @Description
+ * @Date 2025- 03-06-上午 9:02
+ * @auther tuDouSi
+ */
+public class DataCheckUtil {
+    /**
+     *
+     * @description: crc16校验,输入一个数据,返回一个数组.
+     * @param str
+     * @return 返回两个校验码,低字节在前,高字节在后
+     *
+     */
+    public static String crc16(String str) {
+        int[] data = hexStringToIntArray(str);
+        int xda, xdapoly;
+        int i, j, xdabit;
+        xda = 0xFFFF;
+        xdapoly = 0xA001;
+        for (i = 0; i < data.length; i++) {
+            xda ^= data[i];
+            for (j = 0; j < 8; j++) {
+                xdabit = (int) (xda & 0x01);
+                xda >>= 1;
+                if (xdabit == 1) {
+                    xda ^= xdapoly;
+                }
+            }
+        }
+        byte[] temdata = new byte[2];
+        temdata[0] = (byte) (xda & 0xFF);
+        temdata[1] = (byte) (xda >> 8);
+        return bytesToHexString(temdata);
+    }
+    /**
+     *
+     * @description: crc16校验,
+     * @param str
+     * @return 返回两个校验码,高字节在前,低字节在后
+     *
+     */
+    public static String crc16Tall(String str) {
+        int[] data = hexStringToIntArray(str);
+        int xda, xdapoly;
+        int i, j, xdabit;
+        xda = 0xFFFF;
+        xdapoly = 0xA001;
+        for (i = 0; i < data.length; i++) {
+            xda ^= data[i];
+            for (j = 0; j < 8; j++) {
+                xdabit = (int) (xda & 0x01);
+                xda >>= 1;
+                if (xdabit == 1) {
+                    xda ^= xdapoly;
+                }
+            }
+        }
+
+        // 关键修改:先放高位字节,再放低位字节
+        byte[] temdata = new byte[2];
+        temdata[0] = (byte) (xda >> 8);  // 高位字节
+        temdata[1] = (byte) (xda & 0xFF); // 低位字节
+
+        return bytesToHexString(temdata);
+    }
+    // 将十六进制字符串转换为整数数组
+    private static int[] hexStringToIntArray(String str) {
+        if (str.length() % 2 != 0) {
+            throw new IllegalArgumentException("Hex string length must be even");
+        }
+
+        int[] result = new int[str.length() / 2];
+        for (int i = 0; i < str.length(); i += 2) {
+            result[i / 2] = Integer.parseInt(str.substring(i, i + 2), 16);
+        }
+        return result;
+    }
+
+    // 将字节数组转换为十六进制字符串
+    private static String bytesToHexString(byte[] bytes) {
+        StringBuilder sb = new StringBuilder();
+        for (byte b : bytes) {
+            sb.append(String.format("%02X", b));
+        }
+        return sb.toString();
+    }
+}

+ 191 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/DataParser.java

@@ -0,0 +1,191 @@
+package com.zksy.pressure.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.zksy.pressure.domain.FirefightingPressure;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class DataParser {
+    private static Logger logger = LoggerFactory.getLogger(DataParser.class);
+
+    public static FirefightingPressure parseMessage(String msgString) {
+        DateTimeFormatter formatterSend = DateTimeFormatter.ofPattern("yyMMddHHmmss");
+        DateTimeFormatter formatterObs = DateTimeFormatter.ofPattern("yyMMddHHmm");
+        FirefightingPressure result = new FirefightingPressure();
+        try {
+            List<String> dataParts = new ArrayList<>();
+            for (int i = 0; i < msgString.length(); i += 2) {
+                int end = Math.min(i + 2, msgString.length());
+                dataParts.add(msgString.substring(i, end));
+            }
+
+            // 固定头部字段
+            getStringList(dataParts, 2); // 帧头7E7E
+            result.setCentralStation(getStringList(dataParts, 1));
+            result.setTelemeteringStation(getStringList(dataParts, 5));
+            result.setPassword(getStringList(dataParts, 2));
+            result.setFunctionCode(getStringList(dataParts, 1));
+            getStringList(dataParts, 3); // 长度+起始符
+            result.setSerialNumber(getStringList(dataParts, 2));
+
+            // 发报时间
+            String sendTimeHex = getStringList(dataParts, 6);
+            LocalDateTime sendTime = LocalDateTime.parse(sendTimeHex, formatterSend);
+            result.setSendingTime(sendTime);
+
+            getStringList(dataParts, 2); // F1F1
+            getStringList(dataParts, 5); // 遥测站地址
+            getStringList(dataParts, 1); // 48
+            getStringList(dataParts, 2); // F0F0
+
+            // 观测时间
+            String obsTimeHex = getStringList(dataParts, 5);
+            LocalDateTime obsTime = LocalDateTime.parse(obsTimeHex, formatterObs);
+            result.setObservedTime(obsTime);
+
+            // ========== TLV数据区解析 ==========
+            while (dataParts.size() > 3) { // 保证能到达结束符03
+                String tag1 = shiftFromList(dataParts);
+                if ("03".equals(tag1)) break; // 报文结束符
+                String tag2 = shiftFromList(dataParts);
+                String tag = tag1 + tag2;
+
+                // 长度字节
+                String lenByteHex = shiftFromList(dataParts);
+                int lenByte = Integer.parseInt(lenByteHex, 16);
+                int dataLen = (lenByte >> 3) & 0x1F;
+                int decimal = lenByte & 0x07;
+
+                StringBuilder valueHex = new StringBuilder();
+                for (int i = 0; i < dataLen; i++) {
+                    valueHex.append(shiftFromList(dataParts));
+                }
+
+                // 解析具体数据
+                switch (tag.toUpperCase()) {
+                    case "FF01": // GPRS信号质量
+                        result.setGprsSignal(parseValue(valueHex.toString(), decimal));
+                        break;
+                    case "FF03": // 主机电池电压
+                        result.setHostVoltage(parseValue(valueHex.toString(), decimal));
+                        break;
+                    case "FF0E": // 经度
+                        result.setLongitude(new BigDecimal(parseValue(valueHex.toString(), decimal)));
+                        break;
+                    case "FF0F": // 纬度
+                        result.setLatitude(new BigDecimal(parseValue(valueHex.toString(), decimal)));
+                        break;
+                    case "581A": // 压力值(固定2字节,2位小数)
+                        if (valueHex.length() == 0) {
+                            valueHex.append(shiftFromList(dataParts));
+                            valueHex.append(shiftFromList(dataParts));
+                        }
+                        result.setPressureValue(parseValue(valueHex.toString(), 2));
+                        break;
+                    case "4520": // 开关量(固定4字节)
+                        if (valueHex.length() == 0) {
+                            for (int i = 0; i < 4; i++) {
+                                valueHex.append(shiftFromList(dataParts));
+                            }
+                        }
+                        parseSwitchStatus(result, valueHex.toString());
+                        break;
+                    default:
+                        logger.warn("未识别的Tag: {} 值: {}", tag, valueHex);
+                        break;
+                }
+            }
+
+            result.setCreateTime(LocalDateTime.now());
+
+        } catch (Exception e) {
+            logger.error("解析消息时出错", e);
+        }
+        return result;
+    }
+
+    /**
+     * 把HEX转成数值,考虑小数位
+     */
+    private static Double parseValue(String hex, int decimal) {
+        if (hex == null || hex.isEmpty()) return null;
+        long value = Long.parseLong(hex, 16);
+        return value / Math.pow(10, decimal);
+    }
+
+    /**
+     * 解析开关量D31~D0
+     */
+    private static void parseSwitchStatus(FirefightingPressure result, String hex) {
+        // 小端模式
+        byte[] bytes = new BigInteger(hex, 16).toByteArray();
+        ByteBuffer buffer = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+        buffer.put(bytes, Math.max(0, bytes.length - 4), Math.min(4, bytes.length));
+        buffer.flip();
+        long status = buffer.getInt() & 0xFFFFFFFFL;
+
+        result.setD0((int) (status & 1));
+        result.setD1((int) ((status >> 1) & 1));
+        result.setD2((int) ((status >> 2) & 1));
+        result.setD3((int) ((status >> 3) & 1));
+        result.setD4((int) ((status >> 4) & 1));
+        result.setD5((int) ((status >> 5) & 1));
+        result.setD6((int) ((status >> 6) & 1));
+        result.setD7((int) ((status >> 7) & 1));
+        result.setD8((int) ((status >> 8) & 1));
+        result.setD9((int) ((status >> 9) & 1));
+        result.setD10((int) ((status >> 10) & 1));
+        result.setD11((int) ((status >> 11) & 1));
+        result.setD12((int) ((status >> 12) & 1));
+        result.setD13((int) ((status >> 13) & 1));
+        result.setD14((int) ((status >> 14) & 1));
+    }
+
+    public static String shiftFromList(List<String> list) {
+        if (list == null || list.isEmpty()) {
+            return null;
+        }
+        return list.remove(0);
+    }
+
+    private static String getStringList(List<String> dataParts, Integer q) {
+        StringBuilder res1 = new StringBuilder();
+        for (int i = 0; i < q; i++) {
+            res1.append(shiftFromList(dataParts));
+        }
+        return res1.toString();
+    }
+
+    public static void main(String[] args) throws Exception {
+        // 样例报文
+        String hexMsg = "7E7E0113021000071234E6003A020111250911163950F1F1130210000748F0F02509111638"
+                + "FF01100031"
+                + "FF03120370"
+                + "FF0E2500000000"
+                + "FF0F2500000000"
+                + "581A000280"
+                + "452002280000"
+                + "039E83";
+
+        FirefightingPressure pressure = DataParser.parseMessage(hexMsg);
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.findAndRegisterModules();
+        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); // 时间转ISO字符串
+        String jsonResult = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(pressure);
+
+        System.out.println("解析结果:\n" + jsonResult);
+    }
+}

+ 36 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/DeviceOfflineCheckTask.java

@@ -0,0 +1,36 @@
+package com.zksy.pressure.utils;/*
+package com.zksy.gasTransmitter.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class DeviceOfflineCheckTask {
+    private static final Logger logger = LoggerFactory.getLogger(DeviceOfflineCheckTask.class);
+
+    // 用于存储设备编号及其最后一次接收数据的时间
+    public static ConcurrentHashMap<String, Date> deviceLastReceiveTimeMap = new ConcurrentHashMap<>();
+    @Autowired
+    private BaseDevicesService baseDevicesService;
+
+    // 定时检查设备是否离线
+    @Scheduled(fixedRate = 24 * 60 * 60 * 1000) // 每24小时执行一次
+    public void checkDeviceOffline() {
+        Date now = new Date();
+        for (Map.Entry<String, Date> entry : deviceLastReceiveTimeMap.entrySet()) {
+            long diff = now.getTime() - entry.getValue().getTime();
+            // 如果设备在 23 小时内没有接收数据,则认为设备离线
+            if (diff > 23 * 60 * 60 * 1000) {
+                baseDevicesService.getByDeviceNumberStatus(entry.getKey(), 1, 0);
+                logger.info("设备 {} 已离线", entry.getKey());
+            }
+        }
+    }
+}*/

+ 118 - 0
firefighting-pressure-service/src/main/java/com/zksy/pressure/utils/MessageHandler.java

@@ -0,0 +1,118 @@
+package com.zksy.pressure.utils;
+
+import cn.hutool.core.lang.UUID;
+import com.zksy.common.exception.InvalidMessageException;
+import com.zksy.pressure.domain.FirefightingPressure;
+import com.zksy.pressure.service.FirefightingPressureService;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.ReadTimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+@ChannelHandler.Sharable
+@Slf4j
+@Component
+public class MessageHandler extends ChannelInboundHandlerAdapter {
+	private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
+	private final FirefightingPressureService firefightingPressureService;
+	@Autowired
+	public MessageHandler(FirefightingPressureService firefightingPressureService) {
+		this.firefightingPressureService = firefightingPressureService;
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		super.channelActive(ctx);
+		//sendDataToDevice(ctx);
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) {
+		try {
+			ByteBuf msgByteBuf = (ByteBuf) msg;
+			if (msgByteBuf == null || !msgByteBuf.isReadable()) {
+				logger.warn("接收到无效的消息");
+				return;
+			}
+			logger.info("接收到 {} 字节的数据,来自: {}", msgByteBuf.readableBytes(), ctx.channel().remoteAddress());
+			byte[] msgBytes = new byte[msgByteBuf.readableBytes()];
+			msgByteBuf.readBytes(msgBytes);
+			String result = printHexBinary(msgBytes);
+			String msgString = result.replaceAll("\\s", "");
+			String CRCString = msgString.substring(msgString.length() - 4);
+			String bodyResult = msgString.substring(0, msgString.length() - 4);
+			String codeString = DataCheckUtil.crc16Tall(bodyResult);
+			System.out.println("校验码:" + CRCString + "-----------生成的校验码:" + codeString);
+			if (!CRCString.equals(codeString)) {
+				throw new InvalidMessageException("数据校验不成功");
+			} else {
+				logger.info("成功!!!接收到 {} 字节的数据,来自: {}", msgString, ctx.channel().remoteAddress());
+				FirefightingPressure resultData = DataParser.parseMessage(msgString);
+				resultData.setId(UUID.randomUUID().toString());
+				// 更新设备最后一次接收数据的时间
+				//String addressCode = resultData.getAddressCode();
+				//DeviceOfflineCheckTask.deviceLastReceiveTimeMap.put(addressCode, new Date());
+				firefightingPressureService.save(resultData);
+			}
+		} catch (InvalidMessageException e) {
+			logger.error("数据入库失败: {}", e.getMessage());
+			ctx.writeAndFlush(Unpooled.copiedBuffer("数据入库失败".getBytes()));
+		}
+	}
+
+	private String printHexBinary(byte[] bytes) {
+		StringBuilder sb = new StringBuilder();
+		for (byte b : bytes) {
+			sb.append(String.format("%02X ", b));
+		}
+		System.out.println("Received raw data: " + sb.toString());
+		logger.info("Received raw data: " + sb.toString());
+		return sb.toString();
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		if (cause instanceof ReadTimeoutException) {
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
+		} else {
+			cause.printStackTrace();
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
+			ctx.close();
+		}
+	}
+
+	@Override
+	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+		ctx.flush();
+	}
+
+	@Override
+	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+		logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
+		ctx.fireChannelUnregistered();
+	}
+
+	// 主动向设备发送数据
+	public void sendDataToDevice(ChannelHandlerContext ctx) {
+		try {
+			// 将字符串形式的十六进制数据转换为字节数组
+			String hexData = "01 03 00 00 00 01 84 0A ";
+			String[] hexArray = hexData.split(" ");
+			byte[] dataBytes = new byte[hexArray.length];
+			for (int i = 0; i < hexArray.length; i++) {
+				dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
+			}
+			ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
+			ctx.writeAndFlush(byteBuf);
+			logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
+		} catch (Exception e) {
+			logger.error("发送数据失败: {}", e.getMessage());
+		}
+	}
+}

+ 17 - 0
firefighting-pressure-service/src/main/resources/application-dev.yaml

@@ -0,0 +1,17 @@
+zksy:
+  db:
+    host: 192.168.110.30
+    un: root
+    pw: 123
+    port: 3307
+    database: pipe-ner
+spring:
+  redis:
+    host: 192.168.110.30
+    port: 6379
+  cloud:
+    nacos:
+      discovery:
+        server-addr: 192.168.110.30:8848
+      config:
+        server-addr: 192.168.110.30:8848

+ 11 - 0
firefighting-pressure-service/src/main/resources/application-prod.yaml

@@ -0,0 +1,11 @@
+zksy:
+  db:
+    host: 47.107.107.47
+    un: root
+    pw: d$3%#*(jnhUDGHB]z0x876c~
+    port: 3306
+    database: pipe-ner
+spring:
+  redis:
+    host: 47.107.107.47
+    port: 6379

+ 35 - 0
firefighting-pressure-service/src/main/resources/bootstrap.yaml

@@ -0,0 +1,35 @@
+spring:
+  application:
+    name: pressure-service
+  profiles:
+    active: dev
+  servlet:
+    multipart:
+      max-file-size: 100MB
+      max-request-size: 100MB
+  main:
+    allow-bean-definition-overriding: true
+  cloud:
+    sentinel:
+      transport:
+#        dashboard: 172.16.102.52:8090
+        dashboard: 192.168.110.30:8090
+      http-method-specify: true
+    nacos:
+      discovery:
+#        server-addr: 172.16.102.52:8848
+#        namespace: 99a7b129-faed-4b32-bcbf-54acd7ffd372
+         server-addr: 192.168.110.30:8848
+         namespace: 9ce240d6-ae6b-45a3-8566-e75ac25c9d71
+      config:
+#        server-addr: 172.16.102.52:8848
+#        namespace: 99a7b129-faed-4b32-bcbf-54acd7ffd372
+        server-addr: 192.168.110.30:8848
+        namespace: 9ce240d6-ae6b-45a3-8566-e75ac25c9d71
+        file-extension: yaml
+        shared-configs:
+          - dataId: pressure-service.yaml
+          - dataId: zksy-shared-jdbc.yaml
+          - dataId: zksy-shared-log.yaml
+netty:
+  port: 8311

+ 52 - 0
firefighting-pressure-service/src/main/resources/mapper/FirefightingPressureMapper.xml

@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.zksy.pressure.mapper.FirefightingPressureMapper">
+
+    <resultMap id="BaseResultMap" type="com.zksy.pressure.domain.FirefightingPressure">
+            <id property="id" column="id" jdbcType="VARCHAR"/>
+            <result property="centralStation" column="central_station" jdbcType="VARCHAR"/>
+            <result property="telemeteringStation" column="telemetering_station" jdbcType="VARCHAR"/>
+            <result property="password" column="password" jdbcType="VARCHAR"/>
+            <result property="functionCode" column="function_code" jdbcType="VARCHAR"/>
+            <result property="serialNumber" column="serial_number" jdbcType="VARCHAR"/>
+            <result property="sendingTime" column="sending_time" jdbcType="TIMESTAMP"/>
+            <result property="observedTime" column="observed_time" jdbcType="TIMESTAMP"/>
+            <result property="gprsSignal" column="gprs_signal" jdbcType="DOUBLE"/>
+            <result property="hostVoltage" column="host_voltage" jdbcType="DOUBLE"/>
+            <result property="longitude" column="longitude" jdbcType="DECIMAL"/>
+            <result property="latitude" column="latitude" jdbcType="DECIMAL"/>
+            <result property="pressureValue" column="pressure_value" jdbcType="DOUBLE"/>
+            <result property="d31D12" column="D31_D12" jdbcType="TINYINT"/>
+            <result property="d14" column="D14" jdbcType="TINYINT"/>
+            <result property="d13" column="D13" jdbcType="TINYINT"/>
+            <result property="d12" column="D12" jdbcType="TINYINT"/>
+            <result property="d11" column="D11" jdbcType="TINYINT"/>
+            <result property="d10" column="D10" jdbcType="TINYINT"/>
+            <result property="d9" column="D9" jdbcType="TINYINT"/>
+            <result property="d8" column="D8" jdbcType="TINYINT"/>
+            <result property="d7" column="D7" jdbcType="TINYINT"/>
+            <result property="d6" column="D6" jdbcType="TINYINT"/>
+            <result property="d5" column="D5" jdbcType="TINYINT"/>
+            <result property="d4" column="D4" jdbcType="TINYINT"/>
+            <result property="d3" column="D3" jdbcType="TINYINT"/>
+            <result property="d2" column="D2" jdbcType="TINYINT"/>
+            <result property="d1" column="D1" jdbcType="TINYINT"/>
+            <result property="d0" column="D0" jdbcType="TINYINT"/>
+            <result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id,central_station,telemetering_station,
+        password,function_code,serial_number,
+        sending_time,observed_time,gprs_signal,
+        host_voltage,longitude,latitude,
+        pressure_value,D31_D12,D14,
+        D13,D12,D11,
+        D10,D9,D8,
+        D7,D6,D5,
+        D4,D3,D2,
+        D1,D0
+    </sql>
+</mapper>