Bladeren bron

feat(environment-hj212-service): 添加 Netty 服务配置和数据处理逻辑

- 新增 NettyConfig 类用于配置 Netty服务
- 添加 NtServer 类实现 Netty 服务启动和关闭逻辑
- 实现 ResponseEncoder 和 ServerHandler 类处理数据编码和业务逻辑
- 新增多个实体类和 Mapper 接口用于数据持久化
- 实现数据校验和解析工具类
- 添加 Spring Boot 应用启动类和相关配置文件
- 编写 Dockerfile 用于构建 Docker 镜像
林仔 1 jaar geleden
bovenliggende
commit
a0d1b831d1
24 gewijzigde bestanden met toevoegingen van 1126 en 0 verwijderingen
  1. 9 0
      environment-hj212-service/Dockerfile
  2. 82 0
      environment-hj212-service/pom.xml
  3. 21 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/EnvironmentHJ212Application.java
  4. 14 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/NettyConfig.java
  5. 98 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/NtServer.java
  6. 26 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/ResponseEncoder.java
  7. 92 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/ServerHandler.java
  8. 64 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/domain/po/MinuteDataInfo.java
  9. 93 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/domain/po/PsMonitorSz.java
  10. 13 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/enums/DataSourceEnum.java
  11. 8 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/mapper/MinuteDataInfoMapper.java
  12. 17 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/mapper/PsMonitorSzMapper.java
  13. 18 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/MinuteDataInfoService.java
  14. 19 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/PsMonitorSzService.java
  15. 19 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/impl/MinuteDataInfoServiceImpl.java
  16. 56 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/impl/PsMonitorSzServiceImpl.java
  17. 69 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/utils/DataCheckUtil.java
  18. 27 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/utils/DateTimeUtil.java
  19. 253 0
      environment-hj212-service/src/main/java/com/zksy/environmentHJ212/utils/MonitorDatasynch.java
  20. 20 0
      environment-hj212-service/src/main/resources/application-dev.yaml
  21. 20 0
      environment-hj212-service/src/main/resources/application-prod.yaml
  22. 31 0
      environment-hj212-service/src/main/resources/bootstrap.yaml
  23. 26 0
      environment-hj212-service/src/main/resources/mapper/MinuteDataInfoMapper.xml
  24. 31 0
      environment-hj212-service/src/main/resources/mapper/PsMonitorSzMapper.xml

+ 9 - 0
environment-hj212-service/Dockerfile

@@ -0,0 +1,9 @@
+# 基础镜像
+FROM openjdk:11.0-jre-buster
+# 设定时区
+ENV TZ=Asia/Shanghai
+RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
+# 拷贝jar包
+COPY environment-hj212-service.jar /app.jar
+# 入口
+ENTRYPOINT ["java", "-Xms256m", "-Xmx512m", "-jar", "/app.jar"]

+ 82 - 0
environment-hj212-service/pom.xml

@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.zksy</groupId>
+        <artifactId>dh-server-micro</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <groupId>org.example</groupId>
+    <artifactId>environment-hj212-service</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <!--common-->
+        <dependency>
+            <groupId>com.zksy</groupId>
+            <artifactId>zksy-common</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <!--web-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <!--数据库-->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <!--mybatis-->
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-boot-starter</artifactId>
+        </dependency>
+        <!--nacos 服务注册发现-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
+        </dependency>
+        <!--负载均衡-->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
+        </dependency>
+        <!--统一配置管理-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
+        </dependency>
+        <!--加载bootstrap-->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-bootstrap</artifactId>
+        </dependency>
+        <!--redis-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+        <!--sentinel-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <finalName>${project.artifactId}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 21 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/EnvironmentHJ212Application.java

@@ -0,0 +1,21 @@
+package com.zksy.environment;
+
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * @author Administrator
+ * @version 1.0
+ * @project dh-server-micro
+ * @description
+ * @date 2024/8/14 10:59:19
+ */
+@MapperScan("com.zksy.environmentHJ212.mapper")
+@SpringBootApplication
+public class EnvironmentHJ212Application {
+    public static void main(String[] args) {
+        SpringApplication.run(EnvironmentHJ212Application.class,args);
+        System.out.println("环保服务启动成功");
+    }
+}

+ 14 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/NettyConfig.java

@@ -0,0 +1,14 @@
+package com.zksy.environmentHJ212.config;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+public class NettyConfig {
+
+    /**
+     * 存储每一个客户端接入进来时的channel对象
+     */
+    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+}

+ 98 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/NtServer.java

@@ -0,0 +1,98 @@
+package com.zksy.environmentHJ212.config;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+@Slf4j
+@Component
+public class NtServer {
+
+    @Value("${netty.port:20011}")
+    private int port;
+
+    private ServerSocketChannel serverSocketChannel;
+
+    private EventLoopGroup boss;
+
+    private EventLoopGroup worker;
+
+    @PostConstruct()
+    private void startServer() {
+        log.info("进入程序....");
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                //服务端要建立两个group,一个负责接收客户端的连接,一个负责处理数据传输
+                //连接处理group
+                EventLoopGroup boss = new NioEventLoopGroup(1);
+                //事件处理group
+                EventLoopGroup worker = new NioEventLoopGroup();
+                ServerBootstrap bootstrap = new ServerBootstrap();
+                // 绑定处理group
+                bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
+                        //保持连接数
+                        .option(ChannelOption.SO_BACKLOG, 300)
+                        //有数据立即发送
+                        .option(ChannelOption.TCP_NODELAY, true)
+                        //保持连接
+                        .childOption(ChannelOption.SO_KEEPALIVE, true)
+                        //处理新连接
+                        .childHandler(new ChannelInitializer<SocketChannel>() {
+                            @Override
+                            protected void initChannel(SocketChannel sc) throws Exception {
+                                // 增加任务处理
+                                ChannelPipeline p = sc.pipeline();
+                                p.addLast(new ServerHandler());
+                            }
+                        });
+
+                //绑定端口,同步等待成功
+                ChannelFuture future;
+                try {
+                    future = bootstrap.bind(port).sync();
+                    if (future.isSuccess()) {
+                        serverSocketChannel = (ServerSocketChannel) future.channel();
+                        log.info("服务端启动成功,端口:" + port);
+                    } else {
+                        log.info("服务端启动失败!");
+                    }
+
+                    //等待服务监听端口关闭,就是由于这里会将线程阻塞,导致无法发送信息,所以开了线程
+                    future.channel().closeFuture().sync();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    //退出,释放线程池资源
+                    boss.shutdownGracefully();
+                    worker.shutdownGracefully();
+                }
+            }
+        });
+        thread.start();
+    }
+
+    /**
+     * 释放资源
+     * @throws InterruptedException
+     */
+    @PreDestroy
+    public void destroy() throws InterruptedException {
+        if(boss != null){
+            boss.shutdownGracefully().sync();
+        }
+        if(worker != null){
+            worker.shutdownGracefully().sync();
+        }
+    }
+
+}

+ 26 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/ResponseEncoder.java

@@ -0,0 +1,26 @@
+package com.zksy.environmentHJ212.config;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @author Administrator
+ * @version 1.0
+ * @project datasynch
+ * @description 数据编码器
+ * @date 2023/11/16 14:02:56
+ */
+public class ResponseEncoder extends MessageToByteEncoder<String> {
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
+        byte[] content = msg.getBytes(StandardCharsets.UTF_8); // 将字符串转换为字节数组
+        int length = content.length;
+
+        out.writeInt(length); // 写入响应消息的长度
+        out.writeBytes(content); // 写入响应消息的内容
+    }
+}

+ 92 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/config/ServerHandler.java

@@ -0,0 +1,92 @@
+package com.zksy.environmentHJ212.config;
+
+import com.zksy.environmentHJ212.utils.MonitorDatasynch;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.time.LocalDateTime;
+
+@Slf4j
+@Component
+public class ServerHandler extends ChannelInboundHandlerAdapter {
+    private ServerHandler serverHandler;
+
+    @Autowired
+    private MonitorDatasynch monitorDatasynch;
+
+    @PostConstruct
+    public void init() {
+        log.info("init()方法");
+        serverHandler = this;
+        serverHandler.monitorDatasynch = this.monitorDatasynch;
+    }
+
+    /**
+     * 客户端与服务端创建连接的时候调用
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        log.info("客户端与服务端连接开始...");
+        NettyConfig.group.add(ctx.channel());
+        /**
+         * 往客户端发送消息
+         */
+        /*String helo = "##0128QN=20231030111800029;ST=22;CN=2041;PW=123456;MN=010000A8900016F007;Flag=5;CP=&&BeginTime=20231029000000,EndTime=20231029000000&&";
+        ByteBuf byteBuf = Unpooled.wrappedBuffer(helo.getBytes());
+        ctx.channel().writeAndFlush(byteBuf);
+        System.out.println("首次连接完成!"+byteBuf);*/
+    }
+
+    /**
+     * 客户端与服务端断开连接时调用
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        log.info("客户端与服务端连接关闭...");
+        NettyConfig.group.remove(ctx.channel());
+    }
+
+    /**
+     * 服务端接收客户端发送过来的数据结束之后调用
+     */
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        ctx.flush();
+        log.info("信息接收完毕...");
+    }
+
+    /**
+     * 工程出现异常的时候调用
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+    /**
+     * 服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception {
+        try {
+            ByteBuf buf = (ByteBuf) info;
+            byte[] req = new byte[buf.readableBytes()];
+            buf.readBytes(req);
+            String msg = new String(req);
+            System.out.println(msg+"==============");
+            log.info("==========数据已接收,接收时间:==========" + LocalDateTime.now());
+            String pushMonitorDataStr = monitorDatasynch.pushMonitorData(channelHandlerContext,msg);
+        } catch (Exception e) {
+            log.info("==========数据入库失败==========");
+            e.printStackTrace();
+        } finally {
+            channelHandlerContext.flush();
+        }
+    }
+}

+ 64 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/domain/po/MinuteDataInfo.java

@@ -0,0 +1,64 @@
+package com.zksy.environmentHJ212.domain.po;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.extension.activerecord.Model;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+
+/**
+ * 分钟数据 实体类
+ * @author sy
+ * @since 2024-10-30
+ */
+@Data
+@TableName("minute_data_info")
+@EqualsAndHashCode(callSuper = false)
+@ApiModel(value = "分钟数据")
+public class MinuteDataInfo extends Model<MinuteDataInfo> {
+
+    private static final long serialVersionUID=1L;
+
+    @ApiModelProperty(value = "id")
+    private String id;
+
+    @ApiModelProperty(value = "数据时间")
+    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime dataTime;
+
+    @ApiModelProperty(value = "监测因子编码")
+    private String code;
+
+    @ApiModelProperty(value = "设备唯一标识")
+    private String mn;
+
+    @ApiModelProperty(value = "拆分包及应答标志")
+    private String flag;
+
+    @ApiModelProperty(value = "平均监测值")
+    private BigDecimal dataAvgValue;
+
+    @ApiModelProperty(value = "最小监测值")
+    private BigDecimal dataMinValue;
+
+    @ApiModelProperty(value = "最大监测值")
+    private BigDecimal dataMaxValue;
+
+    @ApiModelProperty(value = "累计监测值")
+    private BigDecimal dataCouValue;
+
+    @ApiModelProperty(value = "数据标识")
+    private String dataFlag;
+
+    @ApiModelProperty(value = "创建时间")
+    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
+    private LocalDateTime createTime;
+
+    @ApiModelProperty(value = "备注")
+    private String remark;
+}

+ 93 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/domain/po/PsMonitorSz.java

@@ -0,0 +1,93 @@
+package com.zksy.environmentHJ212.domain.po;
+
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.extension.activerecord.Model;
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.time.LocalDateTime;
+
+/**
+ * <p>
+ * 实时数据
+ * </p>
+ *
+ * @author Javen
+ * @since 2024-06-08
+ */
+@Data
+@TableName("ps_monitor_sz")
+@EqualsAndHashCode(callSuper = false)
+@ApiModel(value = "实时数据")
+public class PsMonitorSz extends Model<PsMonitorSz> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 检测数据编号
+     */
+    @TableId(value = "FID")
+    private String fid;
+
+    /**
+     * 监测点标识码
+     */
+    private Integer monitorid;
+
+    /**
+     * 采样时间。格式: yyyy-MM-dd HH, mm, 现场采样的时间
+     */
+    private LocalDateTime sampleTime;
+
+    /**
+     * 温度。单位:摄氏度
+     */
+    private Double temp;
+
+    /**
+     * 湿度。单位:%
+     */
+    private Double humidity;
+
+    /**
+     * 数据获取时间
+     */
+    private LocalDateTime recordTime;
+
+    /**
+     * 填报单位
+     */
+    private String reporUnit;
+
+    /**
+     * 填报日期
+     */
+    private LocalDateTime reportDate;
+
+    /**
+     * 备注
+     */
+    private String remark;
+
+    /**
+     * 命令编码
+     */
+    private Double cn;
+
+    /**
+     * 系统编码
+     */
+    private Double st;
+
+    /**
+     * 设备唯一标志
+     */
+    private String mn;
+
+    /**
+     * 发送数据的IP
+     */
+    private String ipport;
+}

+ 13 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/enums/DataSourceEnum.java

@@ -0,0 +1,13 @@
+package com.zksy.environment.enums;
+
+public enum DataSourceEnum {
+    DB1("db1");
+    private String value;
+    DataSourceEnum(String value){
+        this.value = value;
+    }
+    public String getValue(){
+        return this.value;
+    }
+
+}

+ 8 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/mapper/MinuteDataInfoMapper.java

@@ -0,0 +1,8 @@
+package com.zksy.environmentHJ212.mapper;
+
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.zksy.environmentHJ212.domain.po.MinuteDataInfo;
+
+public interface MinuteDataInfoMapper extends BaseMapper<MinuteDataInfo> {
+}

+ 17 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/mapper/PsMonitorSzMapper.java

@@ -0,0 +1,17 @@
+package com.zksy.environmentHJ212.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.zksy.environmentHJ212.domain.po.PsMonitorSz;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * <p>
+ * 表A.0.26 水质检测数据,属性表,关联监测点 Mapper 接口
+ * </p>
+ *
+ * @author Javen
+ * @since 2021-06-08
+ */
+public interface PsMonitorSzMapper extends BaseMapper<PsMonitorSz> {
+    Integer selectMonitorId(@Param("mn") String mn);
+}

+ 18 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/MinuteDataInfoService.java

@@ -0,0 +1,18 @@
+package com.zksy.environmentHJ212.service;
+
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.zksy.environmentHJ212.domain.po.MinuteDataInfo;
+
+/**
+ * <p>
+ * 分钟数据 服务类
+ * </p>
+ *
+ * @author Javen
+ * @since 2021-06-08
+ */
+public interface MinuteDataInfoService extends IService<MinuteDataInfo> {
+
+    void saveMonitordata(String oldDataStr,String[] dataArr);
+}

+ 19 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/PsMonitorSzService.java

@@ -0,0 +1,19 @@
+package com.zksy.environmentHJ212.service;
+
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.zksy.environmentHJ212.domain.po.PsMonitorSz;
+
+/**
+ * <p>
+ * 表A.0.26 水质检测数据,属性表,关联监测点 服务类
+ * </p>
+ *
+ * @author Javen
+ * @since 2021-06-08
+ */
+public interface PsMonitorSzService extends IService<PsMonitorSz> {
+    Integer selectMonitorId(String mn);
+
+    void pushMonitorData(String oldDataStr,String[] dataArr,String ipPort);
+}

+ 19 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/impl/MinuteDataInfoServiceImpl.java

@@ -0,0 +1,19 @@
+package com.zksy.environmentHJ212.service.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.zksy.environmentHJ212.domain.po.MinuteDataInfo;
+import com.zksy.environmentHJ212.mapper.MinuteDataInfoMapper;
+import com.zksy.environmentHJ212.service.MinuteDataInfoService;
+import com.zksy.environmentHJ212.utils.MonitorDatasynch;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+@Service
+public class MinuteDataInfoServiceImpl extends ServiceImpl<MinuteDataInfoMapper, MinuteDataInfo> implements MinuteDataInfoService {
+    @Override
+    public void saveMonitordata(String oldDataStr,String[] dataArr) {
+        List<MinuteDataInfo> minuteDataInfos = MonitorDatasynch.minuteDataInfoData(oldDataStr,dataArr);
+        System.out.println("minuteDataInfos=="+minuteDataInfos);
+        super.saveBatch(minuteDataInfos);
+    }
+}

+ 56 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/service/impl/PsMonitorSzServiceImpl.java

@@ -0,0 +1,56 @@
+package com.zksy.environmentHJ212.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.zksy.environmentHJ212.domain.po.PsMonitorSz;
+import com.zksy.environmentHJ212.mapper.PsMonitorSzMapper;
+import com.zksy.environmentHJ212.service.PsMonitorSzService;
+import com.zksy.environmentHJ212.utils.MonitorDatasynch;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * <p>
+ * 表A.0.26 水质检测数据,属性表,关联监测点 服务实现类
+ * </p>
+ *
+ * @author Javen
+ * @since 2021-06-08
+ */
+@Slf4j
+@Service
+public class PsMonitorSzServiceImpl extends ServiceImpl<PsMonitorSzMapper, PsMonitorSz> implements PsMonitorSzService {
+    @Override
+    public Integer selectMonitorId(String mn) {
+        return this.baseMapper.selectMonitorId(mn);
+    }
+
+    @Override
+    public void pushMonitorData(String oldDataStr, String[] dataArr, String ipPort) {
+        Map map = new HashMap<>();
+        Map keyMap = new HashMap();
+        PsMonitorSz monitordata = null;
+        MonitorDatasynch.setPsMonitorSz(map, dataArr, keyMap, "-Rtd",monitordata);
+        if (map.size() > 0) {
+            monitordata = JSON.parseObject(JSON.toJSONString(map), PsMonitorSz.class);
+            monitordata.setFid(UUID.randomUUID().toString().replace("-", ""));
+            // 入库时间
+            monitordata.setRecordTime(LocalDateTime.now());
+            monitordata.setRemark(oldDataStr);
+            log.info("处理数据");
+
+            monitordata.setIpport(ipPort);
+            if (monitordata != null && monitordata.getSampleTime() != null) {
+                //Integer monitorId = serverHandler.psMonitorSzService.selectMonitorId(psMonitorSz.getMn());
+                monitordata.setMonitorid(123456789);
+                // 添加数据
+                this.baseMapper.insert(monitordata);
+            }
+        }
+    }
+}

+ 69 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/utils/DataCheckUtil.java

@@ -0,0 +1,69 @@
+package com.zksy.environmentHJ212.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * @Description
+ * @Date 2021- 06-04-上午 9:02
+ * @auther Javen
+ */
+public class DataCheckUtil {
+    /**
+     * 进行CRC16数据传输协议校验
+     * @param puchMsg
+     * @param usDataLen
+     * @return
+     */
+    public static String crc16_Checkout(String puchMsg, int usDataLen){
+        int i,j,crc_reg,check;
+
+        crc_reg=0xFFFF;
+        char[] chars = puchMsg.toCharArray();
+        for(i=0;i<usDataLen;i++){
+            crc_reg=(crc_reg>>8)^chars[i];
+            for(j=0;j<8;j++) {
+                check = crc_reg&0x0001;
+                crc_reg>>=1;
+                if(check==0x0001) {
+                    crc_reg^=0xA001;
+                }
+            }
+        }
+
+        String code = as10to16(crc_reg);
+        return code;
+    }
+
+    /**
+     * 将10进制转16进制
+     * @param number
+     * @return
+     */
+    public static String as10to16(int number) {
+        int i = 0;
+        StringBuffer dataCode = new StringBuffer();
+        char[] crc16Char = new char[100];
+        if(number == 0){
+            System.out.print(0);
+        } else {
+            while(number!=0) {
+                int t=number%16;
+                if(t >=0 && t<10) {
+                    crc16Char[i] = (char)(t+'0');
+                    i++;
+                } else {
+                    crc16Char[i] = (char)(t+'A'-10);
+                    i++;
+                }
+                number=number/16;
+            }
+
+            for (int j=i-1;j>=0;j--) {
+                dataCode.append(crc16Char[j]);
+            }
+        }
+        //长度补齐4位
+        String paddedString = StringUtils.leftPad(String.valueOf(dataCode), 4, '0');
+        return paddedString;
+    }
+}

+ 27 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/utils/DateTimeUtil.java

@@ -0,0 +1,27 @@
+package com.zksy.environmentHJ212.utils;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * @Description
+ * @Date 2021- 06-08-下午 16:38
+ * @auther Javen
+ */
+public class DateTimeUtil {
+    /**
+     * 获取现在时间
+     *
+     * @return返回时间格式
+     */
+    public static LocalDateTime parseDateTime(String str) {
+        DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+        LocalDateTime ldt = null;
+        try {
+            ldt = LocalDateTime.parse(str, format);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return ldt;
+    }
+}

+ 253 - 0
environment-hj212-service/src/main/java/com/zksy/environmentHJ212/utils/MonitorDatasynch.java

@@ -0,0 +1,253 @@
+package com.zksy.environmentHJ212.utils;
+
+import com.zksy.environmentHJ212.domain.po.MinuteDataInfo;
+import com.zksy.environmentHJ212.domain.po.PsMonitorSz;
+import com.zksy.environmentHJ212.service.MinuteDataInfoService;
+import com.zksy.environmentHJ212.service.PsMonitorSzService;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.net.InetSocketAddress;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE;
+
+/**
+ * @Description
+ * @Date 2021- 06-01-下午 14:37
+ * @auther Javen
+ */
+@Service
+@Slf4j
+@Scope(SCOPE_PROTOTYPE)
+public class MonitorDatasynch {
+
+    private final PsMonitorSzService psMonitorSzService;
+
+    private final MinuteDataInfoService minuteDataInfoService;
+
+    @Autowired
+    public MonitorDatasynch(PsMonitorSzService psMonitorSzService, MinuteDataInfoService minuteDataInfoService) {
+        this.psMonitorSzService = psMonitorSzService;
+        this.minuteDataInfoService = minuteDataInfoService;
+    }
+    private static LinkedHashMap<String, String> fieldMap = new LinkedHashMap<>();
+    static {
+        fieldMap.put("a01001", "TEMP");// 温度
+        fieldMap.put("a01002", "HUMIDITY");// 湿度
+        fieldMap.put("DataTime", "SAMPLETIME");// 采样时间
+        fieldMap.put("CN", "CN");// 命令编码
+        fieldMap.put("ST", "ST");// 系统编码
+        fieldMap.put("MN", "MN");// 设备唯一标志
+    }
+    /**
+     * 获取水质同步数据,数据有效性校验,解析
+     */
+    public String pushMonitorData(ChannelHandlerContext channelHandlerContext, String oldDataStr){
+        log.info("==========接收到数据:" +  oldDataStr + "==========");
+        MinuteDataInfo monitordata = null;
+        String response = null;
+        if (oldDataStr.length() > 6) {
+            String lenstr = oldDataStr.substring(2, 6);
+            // 获取数据长度
+            int codeLen = Integer.parseInt(lenstr);
+            log.info("数据主体长度:" + codeLen);
+            // 获取数据正文
+            String dataStr = oldDataStr.substring(6, oldDataStr.length()-6);
+            log.info("数据主体正文:" + dataStr);
+            // 获取数据CRC16检验码
+            String code = oldDataStr.substring(oldDataStr.length()-6, oldDataStr.length()-2);
+            log.info(code);
+            // 通过数据生成CRC16校验码
+            String nowCode = DataCheckUtil.crc16_Checkout(dataStr, codeLen);
+            log.info("==========自带的验证码:" + code + "新生成的验证码:" + nowCode);
+            // 前后crc16校验码比对(true-数据传输无问题,false-数据传输出现问题)
+            if (code.equals(nowCode)) {
+                try {
+                    String[] dataArr = dataStr.split(";");
+                    List<String> collect = Arrays.asList(dataArr).stream().filter(item -> item.contains("CN=")).collect(Collectors.toList());
+                    String cn = "";
+                    if (collect != null && collect.size() > 0) {
+                        cn = collect.get(0);
+                    }
+                    // 判断是实时数据(2011)
+                    if ("2011".equals(cn.substring(3))) {
+                        String host = ((InetSocketAddress)channelHandlerContext.channel().remoteAddress()).getAddress().getHostAddress();
+                        int port = ((InetSocketAddress)channelHandlerContext.channel().remoteAddress()).getPort();
+                        String ipPort = host+":"+port;
+                        psMonitorSzService.pushMonitorData(oldDataStr,dataArr,ipPort);
+                    } else if ("2051".equals(cn.substring(3)) || "2061".equals(cn.substring(3))){
+                        //分钟,小时数据
+                        minuteDataInfoService.saveMonitordata(oldDataStr,dataArr);
+                    }else{
+                        System.err.println("报警"+oldDataStr);
+                    }
+
+                } catch (Exception e) {
+                    log.info("==========数据解析错误==========", oldDataStr);
+                    e.printStackTrace();
+                }
+            } else {
+                log.info("==========数据传输错误,验证不匹配==========", oldDataStr);
+            }
+        } else {
+            log.info("==========数据长度格式不正确==========");
+        }
+        return response;
+    }
+
+    public static PsMonitorSz setPsMonitorSz(Map map, String[] dataArr, Map keyMap, String resultStr, PsMonitorSz monitordata) {
+        Pattern pattern = Pattern.compile("^[aw]{1}[0-9]{1,5}$");
+        fieldMap.keySet().forEach(item -> {
+            String aa = item;
+            if (pattern.matcher(item).matches()) {
+                aa = item + resultStr;
+            } else {
+                aa = item;
+            }
+            keyMap.put(aa, fieldMap.get(item));
+        });
+        for (String str : dataArr) {
+            // 判断对应值是有包含&&符号,如有就替换为空字符
+            if (str.contains("&&")) {
+                str = str.replace("&&", "");
+            }
+            List<String> dtList = new ArrayList<>();
+            if (str.contains(",")) {
+                // 通过,符号截取同项目不同数据值的数据。
+                String[] split = str.split(",");
+                dtList = Arrays.asList(split);
+            } else {
+                dtList.add(str);
+            }
+            for (String item : dtList) {
+                if (item.contains("=")) {
+                    int index = item.indexOf("=");
+                    int lastIndex = item.lastIndexOf("=");
+                    // 判断是否有多个数据值
+                    if (index == lastIndex) {
+                        // 判断=符号,是否是最后一个字符
+                        if (index + 1 != item.length()) {
+                            String key = item.substring(0, index);// 011-Rtd
+                            String val = item.substring(index + 1); // 35.570
+                            // 替换字段键
+                            if (keyMap.containsKey(key)) {
+                                if ("DataTime".equals(key)) {
+                                    map.put(keyMap.get(key), DateTimeUtil.parseDateTime(val));
+                                } else {
+                                    map.put(keyMap.get(key), val);
+                                }
+                            }
+                        }
+                    } else {
+                        // 判断=符号,是否是最后一个字符
+                        if (lastIndex + 1 != item.length()) {
+                            String key = item.substring(index + 1, lastIndex);
+                            String val = item.substring(lastIndex + 1);
+                            // 替换字段键
+                            if (keyMap.containsKey(key)) {
+                                if ("DataTime".equals(key)) {
+                                    map.put(keyMap.get(key), DateTimeUtil.parseDateTime(val));
+                                } else {
+                                    map.put(keyMap.get(key), val);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return monitordata;
+    }
+
+    public static List<MinuteDataInfo> minuteDataInfoData(String oldDataStr,String[] dataArr) {
+        Pattern pattern = Pattern.compile("^[aw]{1}[0-9]{1,5}$");
+        ArrayList<String> arrayList = new ArrayList<>();
+        List<MinuteDataInfo> minuteDataInfolist = new ArrayList<>();
+        for (String param : dataArr) {
+            System.out.println("param = " + param);
+            String String = "&&";
+            boolean contains = param.contains(String);
+            if (contains) {
+                param = param.replace(String, ",");
+
+            }
+            if (param.contains(",")) {
+                arrayList.add(param);
+            }
+
+        }
+        ArrayList<String> objects = new ArrayList<>();
+        for (String entity : arrayList) {
+            String substring = null;
+            if (entity.contains("-")) {
+                substring = entity.substring(0, entity.indexOf("-"));
+            }
+            if (StringUtils.isBlank(substring)) {
+                continue;
+            }
+            if (entity.startsWith(substring)) {
+                objects.add(substring);
+                String[] split = entity.split(",");
+
+                MinuteDataInfo minuteDataInfo = new MinuteDataInfo();
+                minuteDataInfo.setRemark(oldDataStr);
+                for (String parameters : split) {
+
+                    String[] split1 = parameters.split("=");
+                    minuteDataInfo.setCode(substring);
+                    if (parameters.startsWith(substring + "-Cou=")) {
+
+                        minuteDataInfo.setDataCouValue(new BigDecimal(split1[1]));
+                    }
+                    if (parameters.startsWith(substring + "-Min=")) {
+                        minuteDataInfo.setDataMinValue(new BigDecimal(split1[1]));
+                    }
+                    if (parameters.startsWith(substring + "-Avg=")) {
+                        minuteDataInfo.setDataAvgValue(new BigDecimal(split1[1]));
+                    }
+                    if (parameters.startsWith(substring + "-Max=")) {
+                        minuteDataInfo.setDataMaxValue(new BigDecimal(split1[1]));
+                    }
+                    if (parameters.startsWith(substring + "-Flag=")) {
+                        minuteDataInfo.setDataFlag(split1[1]);
+                    }
+                    minuteDataInfo.setCreateTime(LocalDateTime.now());
+                    minuteDataInfo.setId(UUID.randomUUID().toString().replace("-", ""));
+                    for (String comParams : dataArr) {
+                        if (comParams.contains("&&") && !comParams.contains(",")) {
+                            String replace = comParams.replace("&&", ",");
+                            String[] split2 = replace.split(",");
+                            if (split2.length>1) {
+                                String dateString = split2[1];
+                                if (dateString.startsWith("DataTime=")) {
+                                    minuteDataInfo.setDataTime(DateTimeUtil.parseDateTime(dateString.split("=")[1]));
+                                }
+                            }
+                        }
+                        String[] comsplit1 = comParams.split("=");
+                        String paraname = comsplit1[0];
+                        String value = comsplit1[1];
+                        if (paraname.equals("MN")) {
+                            minuteDataInfo.setMn(value);
+                        }
+                        if (paraname.equals("Flag")) {
+                            minuteDataInfo.setFlag(value);
+                        }
+                    }
+                }
+                minuteDataInfolist.add(minuteDataInfo);
+            }
+        }
+        return minuteDataInfolist;
+    }
+}

+ 20 - 0
environment-hj212-service/src/main/resources/application-dev.yaml

@@ -0,0 +1,20 @@
+zksy:
+  db:
+    host: 192.168.110.30
+    un: root
+    pw: 123
+    port: 3307
+    database: zhongkeshengyang
+spring:
+  redis:
+    host: 192.168.110.30
+    port: 6379
+minio:
+  endpoint: http://192.168.110.30:9000
+  accessKey: minio
+  secretKey: minio123
+  bucket: zksy-file
+  readPath: http://192.168.110.30:9000
+netty:
+  # 端口号
+  port: 8085

+ 20 - 0
environment-hj212-service/src/main/resources/application-prod.yaml

@@ -0,0 +1,20 @@
+zksy:
+  db:
+    host: 172.16.102.51
+    un: root
+    pw: d$3%#*(jnhUDGHB]z0x876c~
+    port: 3306
+    database: zhongkeshengyang
+spring:
+  redis:
+    host: 172.16.102.52
+    port: 6379
+minio:
+  endpoint: http://172.16.102.52:9000
+  accessKey: minio
+  secretKey: minio123
+  bucket: test
+  readPath: http://172.16.102.52:9000
+netty:
+  # 端口号
+  port: 8085

+ 31 - 0
environment-hj212-service/src/main/resources/bootstrap.yaml

@@ -0,0 +1,31 @@
+spring:
+  application:
+    name: environment-hj212-service
+  profiles:
+    active: dev
+  main:
+    allow-bean-definition-overriding: true
+  cloud:
+    sentinel:
+      transport:
+        dashboard: 192.168.110.30:8090
+      http-method-specify: true
+    nacos:
+      discovery:
+        server-addr: 192.168.110.30:8848
+      config:
+        server-addr: 192.168.110.30:8848
+        file-extension: yaml
+        shared-configs:
+          - dataId: environment-hj212-service.yaml
+          - dataId: zksy-shared-jdbc.yaml
+          - dataId: zksy-shared-log.yaml
+  redis:
+    host: 192.168.110.30
+    port: 6379
+netty:
+  # 端口号
+  port: 20011
+
+
+

+ 26 - 0
environment-hj212-service/src/main/resources/mapper/MinuteDataInfoMapper.xml

@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.zksy.environmentHJ212.mapper.MinuteDataInfoMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.zksy.environmentHJ212.domain.po.MinuteDataInfo">
+        <id column="id" property="id" />
+        <result column="data_time" property="dataTime" />
+        <result column="code" property="code" />
+        <result column="mn" property="mn" />
+        <result column="flag" property="flag" />
+        <result column="data_avg_value" property="dataAvgValue" />
+        <result column="data_min_value" property="dataMinValue" />
+        <result column="data_max_value" property="dataMaxValue" />
+        <result column="data_cou_value" property="dataCouValue" />
+        <result column="data_flag" property="dataCouValue" />
+        <result column="create_time" property="dataFlag" />
+        <result column="remark" property="remark" />
+    </resultMap>
+
+    <!-- 通用查询结果列 -->
+    <sql id="Base_Column_List">
+        id, data_time, code, mn, flag, data_avg_value, data_min_value, data_max_value, data_cou_value, data_flag, create_time, remark
+    </sql>
+
+</mapper>

+ 31 - 0
environment-hj212-service/src/main/resources/mapper/PsMonitorSzMapper.xml

@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.zksy.environmentHJ212.mapper.PsMonitorSzMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="BaseResultMap" type="com.zksy.environmentHJ212.domain.po.PsMonitorSz">
+        <id column="FID" property="fid" />
+        <result column="MONITORID" property="monitorid" />
+        <result column="SAMPLE_TIME" property="sampleTime" />
+        <result column="TEMP" property="temp" />
+        <result column="HUMIDITY" property="humidity" />
+        <result column="RECORD_TIME" property="recordTime" />
+        <result column="REPOR_UNIT" property="reporUnit" />
+        <result column="REPORT_DATE" property="reportDate" />
+        <result column="REMARK" property="remark" />
+        <result column="CN" property="cn" />
+        <result column="ST" property="st" />
+        <result column="MN" property="mn" />
+        <result column="IPPORT" property="ipport" />
+    </resultMap>
+
+    <!-- 通用查询结果列 -->
+    <sql id="Base_Column_List">
+        FID, MONITORID, SAMPLE_TIME, TEMP, HUMIDITY, RECORD_TIME, REPOR_UNIT, REPORT_DATE, REMARK, CN, ST, MN
+    </sql>
+
+    <select id="selectMonitorId" resultType="java.lang.Integer">
+        select monitorid from ps_monitor_sz where mn=#{mn}
+    </select>
+
+</mapper>