Переглянути джерело

feat(telemetry): 初始化遥测服务模块

- 添加遥测服务基础配置文件 (application-dev.yaml, application-prod.yaml, bootstrap.yaml)- 配置数据库、Redis及Nacos相关连接参数- 引入Netty服务端实现,支持设备数据接收与处理
- 实现消息处理器MessageHandler,完成数据解析与入库逻辑
- 添加协议校验、自定义应答帧构建功能
- 集成Spring Boot Starter Web及相关中间件依赖- 设置Maven项目结构与依赖管理
林仔 7 місяців тому
батько
коміт
42ecc7215d

+ 72 - 0
telemetry-service/pom.xml

@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.zksy</groupId>
+        <artifactId>pipe-ner-server</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <groupId>com.zksy</groupId>
+    <artifactId>telemetry-service</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencies>
+        <!--common-->
+        <dependency>
+            <groupId>com.zksy</groupId>
+            <artifactId>zk-common</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <!--数据库-->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <!--mybatis-->
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-boot-starter</artifactId>
+        </dependency>
+        <!--nacos 服务注册发现-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
+        </dependency>
+        <!--负载均衡-->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
+        </dependency>
+        <!--统一配置管理-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
+        </dependency>
+        <!--加载bootstrap-->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-bootstrap</artifactId>
+        </dependency>
+        <!--sentinel-->
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
+        </dependency>
+        <!--netty依赖-->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+    </dependencies>
+</project>

+ 40 - 0
telemetry-service/src/main/java/com/zksy/telemetry/config/NettyServer.java

@@ -0,0 +1,40 @@
+package com.zksy.telemetry.config;
+
+import io.netty.channel.EventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+@Slf4j
+@Component
+public class NettyServer {
+
+	private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
+	public static EventLoopGroup acceptor;
+	public static EventLoopGroup worker;
+
+	@Autowired
+	private NettyServerThread nettyServerThread;
+
+	@PostConstruct
+	public void init() {
+		new Thread(() -> nettyServerThread.startServer()).start();
+		System.out.println("nettyServer启动");
+		logger.info("nettyServer启动");
+	}
+
+	@PreDestroy
+	public void exit() {
+		if (acceptor != null) {
+			acceptor.shutdownGracefully();
+		}
+		if (worker != null) {
+			worker.shutdownGracefully();
+		}
+	}
+}

+ 77 - 0
telemetry-service/src/main/java/com/zksy/telemetry/config/NettyServerThread.java

@@ -0,0 +1,77 @@
+package com.zksy.telemetry.config;
+
+import com.zksy.telemetry.utils.MessageHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class NettyServerThread {
+	@Value("${netty.port:8313}")
+	private int port;
+	private static Logger logger = LoggerFactory.getLogger(NettyServerThread.class);
+	private final MessageHandler messageHandler;
+	@Autowired
+	public NettyServerThread(MessageHandler messageHandler) {
+		this.messageHandler = messageHandler;
+	}
+	public void startServer() {
+		System.out.println("Netty服务启动端口号" + port);
+		EventLoopGroup acceptor = new NioEventLoopGroup();
+		EventLoopGroup worker = new NioEventLoopGroup();
+		NettyServer.acceptor = acceptor;
+		NettyServer.worker = worker;
+		ServerBootstrap bootstrap = new ServerBootstrap();
+
+		// 添加boss和worker组
+		bootstrap.group(acceptor, worker);
+		//这句是指定允许等待accept的最大连接数量,我只需要连一个客户端,这里就关掉了,java默认是50个
+		// bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
+		bootstrap.option(ChannelOption.TCP_NODELAY, true);
+		// 用于构造socketchannel工厂
+		bootstrap.channel(NioServerSocketChannel.class);
+
+		/**
+		 * 传入自定义客户端Handle(处理消息)
+		 */
+		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+			@Override
+			public void initChannel(SocketChannel ch) throws Exception {
+				logger.info("来自" + ch.remoteAddress() + "的新连接接入");
+				// 注册handler
+				ch.pipeline().addLast(new ReadTimeoutHandler(3600));// 超时时间,1小时内没有从通道(Channel)读取到任何数据
+				ch.pipeline().addLast(messageHandler);
+			}
+		});
+
+		// 绑定端口,开始接收进来的连接
+		ChannelFuture f;
+		try {
+			f = bootstrap.bind(port).sync();
+			// 等待服务器 socket 关闭 。
+			f.channel().closeFuture().sync();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} finally {
+			if (acceptor != null) {
+				acceptor.shutdownGracefully();
+			}
+			if (worker != null) {
+				worker.shutdownGracefully();
+			}
+		}
+	}
+}

+ 165 - 0
telemetry-service/src/main/java/com/zksy/telemetry/utils/MessageHandler.java

@@ -0,0 +1,165 @@
+package com.zksy.telemetry.utils;
+
+import com.zksy.common.exception.InvalidMessageException;
+import com.zksy.telemetry.domain.TelemetryData;
+import com.zksy.telemetry.service.TelemetryDataService;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.util.ReferenceCountUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@ChannelHandler.Sharable
+@Slf4j
+@Component
+public class MessageHandler extends ChannelInboundHandlerAdapter {
+	private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
+	private final TelemetryDataService service;
+	@Autowired
+	public MessageHandler(TelemetryDataService telemetryDataService) {
+		this.service = telemetryDataService;
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		super.channelActive(ctx);
+		//sendDataToDevice(ctx);
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) {
+		ByteBuf in = (ByteBuf) msg;
+		byte[] msgBytes = new byte[in.readableBytes()];
+		in.readBytes(msgBytes);
+		in.release();
+
+		try {
+			logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
+
+			// 提取帧类型
+			byte frameType = msgBytes[6];
+			logger.debug("帧类型: 0x{}", String.format("%02X", frameType));
+
+			// 1. 完整协议校验
+			DataParser.validateMessage(msgBytes);
+			logger.info("数据帧校验通过");
+
+			// 2. 数据解析
+			TelemetryData resultData = DataParser.parseMessage(msgBytes);
+			if (resultData.getSystemIdentifier() == null) {
+				resultData.setSystemIdentifier("123456");
+			}
+
+			// 3. 根据帧类型处理
+			if (frameType == 0x31) {
+				if (msgBytes.length > 34) {
+					byte flagByte = msgBytes[34];
+					String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
+					boolean isEnd = (flagByte & 0b00001000) != 0;
+
+					logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
+							String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
+
+					resultData.setIsLastPacket(isEnd);
+				} else {
+					logger.warn("数据长度不足,无法检测结束位");
+				}
+				// 入库
+				service.save(resultData);
+				logger.info("上报历史记录数据入库成功: {}", resultData);
+
+				// 如果结束位=1,可以根据需要在此处理关闭逻辑
+				if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
+					logger.info("检测到结束标志帧,执行结束处理逻辑...");
+					byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
+					logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
+				}else{
+					// 回复自定义应答帧
+					byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+					logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
+				}
+
+			} else if (frameType == 0x34) {
+				logger.info("收到结束通讯帧,准备发送回应包");
+				byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes);
+				ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+				logger.debug("已发送结束通讯回应包");
+
+			} else {
+				logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
+			}
+
+		} catch (InvalidMessageException e) {
+			logger.error("数据校验失败,不入库: {}", e.getMessage());
+			sendErrorResponse(ctx, "数据校验失败");
+
+		} catch (Exception e) {
+			logger.error("数据解析或入库异常", e);
+			sendErrorResponse(ctx, "数据处理异常");
+		}
+	}
+
+
+	// 工具方法:发送错误响应
+	private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
+		ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
+	}
+	private String printHexBinary(byte[] bytes) {
+		StringBuilder sb = new StringBuilder();
+		for (byte b : bytes) {
+			sb.append(String.format("%02X ", b));
+		}
+		System.out.println("Received raw data: " + sb.toString());
+		logger.info("Received raw data: " + sb.toString());
+		return sb.toString();
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		if (cause instanceof ReadTimeoutException) {
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
+		} else {
+			cause.printStackTrace();
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
+			ctx.close();
+		}
+	}
+
+	@Override
+	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+		ctx.flush();
+	}
+
+	@Override
+	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+		logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
+		ctx.fireChannelUnregistered();
+	}
+
+	// 主动向设备发送数据
+	public void sendDataToDevice(ChannelHandlerContext ctx) {
+		try {
+			// 将字符串形式的十六进制数据转换为字节数组
+			String hexData = "01 03 00 00 00 01 84 0A ";
+			String[] hexArray = hexData.split(" ");
+			byte[] dataBytes = new byte[hexArray.length];
+			for (int i = 0; i < hexArray.length; i++) {
+				dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
+			}
+			ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
+			ctx.writeAndFlush(byteBuf);
+			logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
+		} catch (Exception e) {
+			logger.error("发送数据失败: {}", e.getMessage());
+		}
+	}
+}

+ 17 - 0
telemetry-service/src/main/resources/application-dev.yaml

@@ -0,0 +1,17 @@
+zksy:
+  db:
+    host: 192.168.110.30
+    un: root
+    pw: 123
+    port: 3307
+    database: pipe-ner
+spring:
+  redis:
+    host: 192.168.110.30
+    port: 6379
+  cloud:
+    nacos:
+      discovery:
+        server-addr: 192.168.110.30:8848
+      config:
+        server-addr: 192.168.110.30:8848

+ 11 - 0
telemetry-service/src/main/resources/application-prod.yaml

@@ -0,0 +1,11 @@
+zksy:
+  db:
+    host: 47.107.107.47
+    un: root
+    pw: d$3%#*(jnhUDGHB]z0x876c~
+    port: 3306
+    database: pipe-ner
+spring:
+  redis:
+    host: 47.107.107.47
+    port: 6379

+ 35 - 0
telemetry-service/src/main/resources/bootstrap.yaml

@@ -0,0 +1,35 @@
+spring:
+  application:
+    name: telemetry-service
+  profiles:
+    active: dev
+  servlet:
+    multipart:
+      max-file-size: 100MB
+      max-request-size: 100MB
+  main:
+    allow-bean-definition-overriding: true
+  cloud:
+    sentinel:
+      transport:
+#        dashboard: 172.16.102.52:8090
+        dashboard: 192.168.110.30:8090
+      http-method-specify: true
+    nacos:
+      discovery:
+#        server-addr: 172.16.102.52:8848
+#        namespace: 99a7b129-faed-4b32-bcbf-54acd7ffd372
+         server-addr: 192.168.110.30:8848
+         namespace: 9ce240d6-ae6b-45a3-8566-e75ac25c9d71
+      config:
+#        server-addr: 172.16.102.52:8848
+#        namespace: 99a7b129-faed-4b32-bcbf-54acd7ffd372
+        server-addr: 192.168.110.30:8848
+        namespace: 9ce240d6-ae6b-45a3-8566-e75ac25c9d71
+        file-extension: yaml
+        shared-configs:
+          - dataId: telemetry-service.yaml
+          - dataId: zksy-shared-jdbc.yaml
+          - dataId: zksy-shared-log.yaml
+netty:
+  port: 8313