Ver código fonte

feat(netty): 添加音频雷达水位服务的Netty服务器实现

- 新增AudioApplication作为音频服务启动类
- 实现MessageHandler消息处理器处理设备通信协议
- 添加NettyServer配置类管理服务器生命周期
- 创建NettyServerThread实现TCP服务器启动逻辑
- 支持音频数据接收存储和数据库入库功能
- 实现雷达和水位监测数据的协议解析处理
- 集成hutool工具库进行数据转换和日期处理
- 添加设备连接管理维护客户端状态信息
林仔 1 mês atrás
pai
commit
0d28d3b1cb

+ 15 - 0
audio-service/src/main/java/com/zksy/audio/AudioApplication.java

@@ -0,0 +1,15 @@
+package com.zksy.audio;
+
+
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@MapperScan(basePackages = "com.zksy.audio.mapper")
+@SpringBootApplication(scanBasePackages = {"com.zksy.audio","com.zksy.api"})
+public class AudioApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(AudioApplication.class, args);
+        System.out.println("音频启动成功");
+    }
+}

+ 40 - 0
audio-service/src/main/java/com/zksy/audio/config/NettyServer.java

@@ -0,0 +1,40 @@
+package com.zksy.audio.config;
+
+import io.netty.channel.EventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+@Slf4j
+@Component
+public class NettyServer {
+
+	private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
+	public static EventLoopGroup acceptor;
+	public static EventLoopGroup worker;
+
+	@Autowired
+	private NettyServerThread nettyServerThread;
+
+	@PostConstruct
+	public void init() {
+		new Thread(() -> nettyServerThread.startServer()).start();
+		System.out.println("nettyServer启动");
+		logger.info("nettyServer启动");
+	}
+
+	@PreDestroy
+	public void exit() {
+		if (acceptor != null) {
+			acceptor.shutdownGracefully();
+		}
+		if (worker != null) {
+			worker.shutdownGracefully();
+		}
+	}
+}

+ 77 - 0
audio-service/src/main/java/com/zksy/audio/config/NettyServerThread.java

@@ -0,0 +1,77 @@
+package com.zksy.audio.config;
+
+import com.zksy.audio.utils.MessageHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class NettyServerThread {
+	@Value("${netty.port:8313}")
+	private int port;
+	private static Logger logger = LoggerFactory.getLogger(NettyServerThread.class);
+	private final MessageHandler messageHandler;
+	@Autowired
+	public NettyServerThread(MessageHandler messageHandler) {
+		this.messageHandler = messageHandler;
+	}
+	public void startServer() {
+		System.out.println("Netty服务启动端口号" + port);
+		EventLoopGroup acceptor = new NioEventLoopGroup();
+		EventLoopGroup worker = new NioEventLoopGroup();
+		NettyServer.acceptor = acceptor;
+		NettyServer.worker = worker;
+		ServerBootstrap bootstrap = new ServerBootstrap();
+
+		// 添加boss和worker组
+		bootstrap.group(acceptor, worker);
+		//这句是指定允许等待accept的最大连接数量,我只需要连一个客户端,这里就关掉了,java默认是50个
+		// bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
+		bootstrap.option(ChannelOption.TCP_NODELAY, true);
+		// 用于构造socketchannel工厂
+		bootstrap.channel(NioServerSocketChannel.class);
+
+		/**
+		 * 传入自定义客户端Handle(处理消息)
+		 */
+		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+			@Override
+			public void initChannel(SocketChannel ch) throws Exception {
+				logger.info("来自" + ch.remoteAddress() + "的新连接接入");
+				// 注册handler
+				ch.pipeline().addLast(new ReadTimeoutHandler(3600));// 超时时间,1小时内没有从通道(Channel)读取到任何数据
+				ch.pipeline().addLast(messageHandler);
+			}
+		});
+
+		// 绑定端口,开始接收进来的连接
+		ChannelFuture f;
+		try {
+			f = bootstrap.bind(port).sync();
+			// 等待服务器 socket 关闭 。
+			f.channel().closeFuture().sync();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} finally {
+			if (acceptor != null) {
+				acceptor.shutdownGracefully();
+			}
+			if (worker != null) {
+				worker.shutdownGracefully();
+			}
+		}
+	}
+}

+ 367 - 0
audio-service/src/main/java/com/zksy/audio/utils/MessageHandler.java

@@ -0,0 +1,367 @@
+package com.zksy.audio.utils;
+
+import cn.hutool.core.convert.Convert;
+import cn.hutool.core.date.DateField;
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.lang.Validator;
+import cn.hutool.core.thread.ThreadUtil;
+import cn.hutool.core.util.StrUtil;
+import com.zksy.audio.domain.ChannelInfo;
+import com.zksy.audio.domain.NoiseInfo;
+import com.zksy.audio.service.NoiseInfoService;
+import com.zksy.audio.utils.CrtHelper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * 消息处理器
+ * 负责处理各种请求和响应
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class MessageHandler extends ChannelInboundHandlerAdapter {
+
+	public ConcurrentMap<ChannelId, ChannelInfo> channelGroup = new ConcurrentHashMap<>();
+
+	public ConcurrentMap<String, ChannelInfo> deviceGroup = new ConcurrentHashMap<>();
+
+	@Value("${wlds.fileUploadBaseDir}")
+	private String fileUploadBaseDir;
+
+	private final static String head = "524946462440010057415645666D742010000000010001000020000000400000020010006461746100400100";
+	private final NoiseInfoService service;
+	@Autowired
+	public MessageHandler(NoiseInfoService noiseInfoService) {
+		this.service = noiseInfoService;
+	}
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+		//log.info("==收到客户端:{}|{}发送的消息:{}============:", ctx.channel().id().asLongText(), ctx.channel().remoteAddress(), msg);
+		try {
+			ByteBuf byteBuffer = (ByteBuf) msg;
+			if (byteBuffer == null) {
+				return;
+			}
+			int readableLength = byteBuffer.readableBytes();
+			byte[] arrayWithOutHead = new byte[readableLength];//去掉包头标识的数据
+			byteBuffer.getBytes(byteBuffer.readerIndex(), arrayWithOutHead);
+
+			String hex = CrtHelper.byteToHexStr1(arrayWithOutHead);
+			log.info("收到消息:{}", hex);
+			double power = 0;
+			String deviceNo;
+			double gain = 0;
+
+			byte[] bytes;
+			String asciiString;
+			String path;
+
+			ChannelInfo info;
+
+			ChannelInfo channelInfo = new ChannelInfo("puqi", ctx.channel());
+
+			if (!channelGroup.containsKey(ctx.channel().id())) {
+				// 加入客户端
+				channelGroup.put(ctx.channel().id(), channelInfo);
+			}
+
+			info = channelGroup.get(ctx.channel().id());
+
+			// 申报身份
+			if (hex.startsWith("242424")) {
+				String reativePath = "";
+				String[] hexArray = Arrays.stream(hex.split("0D0A")).filter(StrUtil::isNotEmpty).toArray(String[]::new);
+				log.info("hexArray.length:{}", hexArray.length);
+				if (hexArray.length >= 1) {
+					bytes = CrtHelper.strToToHexByte1(hexArray[0]);
+					asciiString = new String(bytes, StandardCharsets.US_ASCII);
+					deviceNo = asciiString.replaceAll("\\$\\$\\$", "").replaceAll("\r\n", "").trim();
+					path = getPathName(deviceNo, reativePath);
+
+					info.setDeviceNo(deviceNo);
+					info.setPath(path);
+					info.setFirst(true);
+					info.setFilePath(path);
+					info.setLength(0);
+
+					deviceGroup.put(deviceNo, info);
+					channelGroup.put(ctx.channel().id(), info);
+					info = channelGroup.get(ctx.channel().id());
+				}
+				if (hexArray.length >= 2) {
+					bytes = CrtHelper.strToToHexByte1(hexArray[1]);
+					asciiString = new String(bytes, StandardCharsets.US_ASCII);
+					asciiString = asciiString.replaceAll("VBAT:", "").replaceAll("V", "").replaceAll("\r\n", "").trim();
+					if (Validator.isNumber(asciiString)) {
+						power = Convert.toDouble(asciiString);
+					}
+				}
+				if (hexArray.length >= 3) {
+					String gainHex = hexArray[3];
+					bytes = CrtHelper.strToToHexByte1(gainHex);
+					asciiString = new String(bytes, StandardCharsets.US_ASCII);
+					asciiString = asciiString.replaceAll("Gain:", "").replaceAll("\r\n", "");
+					if (Validator.isNumber(asciiString)) {
+						gain = Convert.toDouble(asciiString);
+					}
+				}
+
+				channelGroup.get(ctx.channel().id()).setPower(power);
+				channelGroup.get(ctx.channel().id()).setGain(gain);
+
+				if (hexArray.length >= 4) {
+					String videoHex = hexArray[3];
+					info = channelGroup.get(ctx.channel().id());
+					path = info.getPath();
+					info.setLength(info.getLength() + (videoHex.length() / 2));
+					String finalPath1 = path;
+					ThreadUtil.execAsync(() -> {
+						saveHexStringToFile(finalPath1, videoHex);
+					});
+				}
+
+			}
+			// 申报电量
+			else if (hex.startsWith("56424154")) {
+				if (!channelGroup.containsKey(ctx.channel().id())) {
+					return;
+				}
+				if (StrUtil.isEmpty(channelGroup.get(ctx.channel().id()).getDeviceNo())) {
+					return;
+				}
+				String[] hexArray = Arrays.stream(hex.split("0D0A")).filter(StrUtil::isNotEmpty).toArray(String[]::new);
+				if (hexArray.length >= 1) {
+					bytes = CrtHelper.strToToHexByte1(hexArray[0]);
+					asciiString = new String(bytes, StandardCharsets.US_ASCII);
+					asciiString = asciiString.replaceAll("VBAT:", "").replaceAll("V", "").replaceAll("\r\n", "").trim();
+					if (Validator.isNumber(asciiString)) {
+						power = Double.parseDouble(asciiString);
+					}
+				}
+				if (hexArray.length >= 2) {
+					String gainHex = hexArray[1];
+					bytes = CrtHelper.strToToHexByte1(gainHex);
+					asciiString = new String(bytes, StandardCharsets.US_ASCII);
+					asciiString = asciiString.replaceAll("Gain:", "").replaceAll("\r\n", "");
+					if (Validator.isNumber(asciiString)) {
+						gain = Double.parseDouble(asciiString);
+					}
+				}
+				channelGroup.get(ctx.channel().id()).setGain(gain);
+				channelGroup.get(ctx.channel().id()).setPower(power);
+
+				if (hexArray.length >= 3) {
+					String videoHex = hexArray[2];
+					info = channelGroup.get(ctx.channel().id());
+					path = info.getPath();
+					info.setLength(info.getLength() + (videoHex.length() / 2));
+					String finalPath1 = path;
+					ThreadUtil.execAsync(() -> {
+						saveHexStringToFile(finalPath1, videoHex);
+					});
+				}
+			}
+			// 增益值
+			else if (hex.startsWith("4761696E")) {
+				if (!channelGroup.containsKey(ctx.channel().id())) {
+					return;
+				}
+				if (StrUtil.isEmpty(channelGroup.get(ctx.channel().id()).getDeviceNo())) {
+					return;
+				}
+				String[] hexArray = Arrays.stream(hex.split("0D0A")).filter(StrUtil::isNotEmpty).toArray(String[]::new);
+				if (hexArray.length >= 1) {
+					String gainHex = hexArray[0];
+					bytes = CrtHelper.strToToHexByte1(gainHex);
+					asciiString = new String(bytes, StandardCharsets.US_ASCII);
+					asciiString = asciiString.replaceAll("Gain:", "").replaceAll("\r\n", "");
+					if (Validator.isNumber(asciiString)) {
+						gain = Double.parseDouble(asciiString);
+					}
+				}
+				channelGroup.get(ctx.channel().id()).setGain(gain);
+				if (hexArray.length >= 2) {
+					String videoHex = hexArray[1];
+					info = channelGroup.get(ctx.channel().id());
+					path = info.getPath();
+					info.setLength(info.getLength() + (videoHex.length() / 2));
+					String finalPath1 = path;
+					ThreadUtil.execAsync(() -> {
+						saveHexStringToFile(finalPath1, videoHex);
+					});
+				}
+			} else if (hex.startsWith("4F4B")) {
+				bytes = CrtHelper.strToToHexByte1(hex);
+				asciiString = new String(bytes, StandardCharsets.US_ASCII);
+				log.info("==收到客户端:{},GPS信息:{}", ctx.channel().id().asLongText(), asciiString);
+				info = channelGroup.get(ctx.channel().id());
+				String pattern = ",(\\d+\\.\\d+),\\d+\\.\\d+";
+				Pattern regex = Pattern.compile(pattern);
+				Matcher match = regex.matcher(asciiString);
+				if (match.matches()) {
+					// 提取经纬度,注意第一个捕获组是纬度,第二个捕获组(如果有的话)是经度
+					// 但由于我们只关心第一个逗号后的经纬度,所以可以直接拆分
+					String[] coords = match.group(0).replaceFirst(",", "").split(",");
+					if (coords.length == 2) {
+						double lat = Double.parseDouble(coords[0]);
+						double lon = Double.parseDouble(coords[1]);
+						log.info("经纬度:{},{}", lat, lon);
+						//-----------
+					}
+				} else {
+					log.info("==收到客户端:{}的设备调试信息:{}", ctx.channel().id().asLongText(), asciiString);
+
+				}
+			} else {
+				if (!channelGroup.containsKey(ctx.channel().id())) {
+					return;
+				}
+				if (StrUtil.isEmpty(channelGroup.get(ctx.channel().id()).getDeviceNo())) {
+					return;
+				}
+				info = channelGroup.get(ctx.channel().id());
+				path = channelGroup.get(ctx.channel().id()).getPath();
+				String finalPath1 = path;
+				String headHex;
+				byte[] length = new byte[4];
+				if (info.getFirst() && !hex.startsWith("52494646") ){
+					//用于没带头文件的音频数据
+					headHex = head + hex;
+					long videoMaxSize = 80 * 1024 + 44;
+					info.setVideoMaxSize(videoMaxSize);
+
+				} else {
+					headHex = hex;
+					if (hex.startsWith("52494646")){
+						//24001900  ====> 163840
+						length[0] = (byte) Integer.parseInt(hex.substring(14, 16), 16);
+						length[1] = (byte) Integer.parseInt(hex.substring(12, 14), 16);
+						length[2] = (byte) Integer.parseInt(hex.substring(10, 12), 16);
+						length[3] = (byte) Integer.parseInt(hex.substring(8, 10), 16);
+						long videoMaxSize = (Long.parseLong(CrtHelper.byteToHexStr1(length), 16) - 36) / 10 + 44;
+						info.setVideoMaxSize(videoMaxSize);
+					}
+				}
+				info.setFirst(false);
+				info.setLength(info.getLength() + headHex.length() / 2);
+				ThreadUtil.execAsync(() -> {
+					saveHexStringToFile(finalPath1, headHex);
+				});
+				log.info("======收到客户端总字节{}-----videoMaxSize:{}",info.getLength(),info.getVideoMaxSize());
+				// 如果接收音频文件字节大于头文件标识长度;
+				if (info.getLength() >= info.getVideoMaxSize()) {
+					if (info.getGatheTime() == null) {
+						info.setGatheTime(DateUtil.date());
+					}
+					// TODO: 2023/9/5 音频文件信息入库
+					int rms = DataParser.calculateRMS(fileUploadBaseDir+path);
+					double centerFrequency = DataParser.calcCenterFrequency(fileUploadBaseDir+path);
+					NoiseInfo noise = new NoiseInfo();
+					noise.setEncode(info.getDeviceNo());
+					noise.setPower(info.getPower());
+					noise.setGain(info.getGain());
+					noise.setRms(rms);
+					noise.setCenterFrequency(centerFrequency);
+					noise.setFilepath(info.getFilePath());
+					noise.setTs(DateUtil.format(DateUtil.offset(info.getGatheTime(), DateField.HOUR, -8),
+							DatePattern.NORM_DATETIME_PATTERN));
+					//log.info(JsonUtils.toJsonString(noise));
+					service.save(noise);
+					//设备默认上报三次音频数据, 数据接收完成回复设备Ok,停止上报
+					byte[] cmdBytes = "OK\r\n".getBytes(StandardCharsets.US_ASCII);
+					ctx.channel().writeAndFlush(Unpooled.copiedBuffer(cmdBytes));
+				}
+
+			}
+
+		} catch (Exception e) {
+			log.error(e.getMessage(), e);
+		}
+
+	}
+
+	private String getPathName(String deviceNo, String tempPath) {
+		String ext = ".wav";
+		tempPath = "/devices/" + deviceNo + "/" + DateUtil.format(DateUtil.date(), DatePattern.PURE_DATE_FORMAT);
+		File desc = new File(fileUploadBaseDir, tempPath);
+		if (!desc.exists()) {
+			if (!desc.getParentFile().exists()) {
+				desc.getParentFile().mkdirs();
+			}
+		}
+		String fileName = deviceNo + "_" + DateUtil.format(DateUtil.date(), DatePattern.PURE_DATETIME_FORMAT) + ext;
+		return tempPath + "/" + fileName;
+	}
+
+	private void saveHexStringToFile(String filePath, String hexString) {
+		if (StrUtil.isEmpty(hexString)) {
+			return;
+		}
+		log.info("音频字节:{}",hexString);
+		byte[] bytes = CrtHelper.strToToHexByte1(hexString);
+		try {
+			FileUtils.writeByteArrayToFile(new File(fileUploadBaseDir + filePath), bytes, true);
+		} catch (IOException e) {
+		}
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		super.channelActive(ctx);
+		log.info("==客户端:{},连接成功", ctx.channel().remoteAddress());
+	}
+
+	@Override
+	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+		ctx.flush();
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		log.error("TCP异常:{}", cause.getMessage());
+		super.exceptionCaught(ctx, cause);
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		log.info("==客户端:{},掉线了", ctx.channel().remoteAddress());
+		ChannelInfo channelInfo = channelGroup.remove(ctx.channel().id());
+		if (channelInfo != null && StrUtil.isNotEmpty(channelInfo.getDeviceNo())) {
+			deviceGroup.remove(channelInfo.getDeviceNo());
+		}
+		super.channelInactive(ctx);
+	}
+
+	@Override
+	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+		log.info("==客户端:{},掉线了,ChannelUnregistered", ctx.channel().remoteAddress());
+		super.channelUnregistered(ctx);
+	}
+
+	@Override
+	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+		log.info("==服务端:{},下线了", ctx.toString());
+		super.handlerRemoved(ctx);
+	}
+}

+ 40 - 0
radar-service/src/main/java/com/zksy/radar/config/NettyServer.java

@@ -0,0 +1,40 @@
+package com.zksy.radar.config;
+
+import io.netty.channel.EventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+@Slf4j
+@Component
+public class NettyServer {
+
+	private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
+	public static EventLoopGroup acceptor;
+	public static EventLoopGroup worker;
+
+	@Autowired
+	private NettyServerThread nettyServerThread;
+
+	@PostConstruct
+	public void init() {
+		new Thread(() -> nettyServerThread.startServer()).start();
+		System.out.println("nettyServer启动");
+		logger.info("nettyServer启动");
+	}
+
+	@PreDestroy
+	public void exit() {
+		if (acceptor != null) {
+			acceptor.shutdownGracefully();
+		}
+		if (worker != null) {
+			worker.shutdownGracefully();
+		}
+	}
+}

+ 77 - 0
radar-service/src/main/java/com/zksy/radar/config/NettyServerThread.java

@@ -0,0 +1,77 @@
+package com.zksy.radar.config;
+
+import com.zksy.radar.utils.MessageHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class NettyServerThread {
+	@Value("${netty.port:8313}")
+	private int port;
+	private static Logger logger = LoggerFactory.getLogger(NettyServerThread.class);
+	private final MessageHandler messageHandler;
+	@Autowired
+	public NettyServerThread(MessageHandler messageHandler) {
+		this.messageHandler = messageHandler;
+	}
+	public void startServer() {
+		System.out.println("Netty服务启动端口号" + port);
+		EventLoopGroup acceptor = new NioEventLoopGroup();
+		EventLoopGroup worker = new NioEventLoopGroup();
+		NettyServer.acceptor = acceptor;
+		NettyServer.worker = worker;
+		ServerBootstrap bootstrap = new ServerBootstrap();
+
+		// 添加boss和worker组
+		bootstrap.group(acceptor, worker);
+		//这句是指定允许等待accept的最大连接数量,我只需要连一个客户端,这里就关掉了,java默认是50个
+		// bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
+		bootstrap.option(ChannelOption.TCP_NODELAY, true);
+		// 用于构造socketchannel工厂
+		bootstrap.channel(NioServerSocketChannel.class);
+
+		/**
+		 * 传入自定义客户端Handle(处理消息)
+		 */
+		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+			@Override
+			public void initChannel(SocketChannel ch) throws Exception {
+				logger.info("来自" + ch.remoteAddress() + "的新连接接入");
+				// 注册handler
+				ch.pipeline().addLast(new ReadTimeoutHandler(3600));// 超时时间,1小时内没有从通道(Channel)读取到任何数据
+				ch.pipeline().addLast(messageHandler);
+			}
+		});
+
+		// 绑定端口,开始接收进来的连接
+		ChannelFuture f;
+		try {
+			f = bootstrap.bind(port).sync();
+			// 等待服务器 socket 关闭 。
+			f.channel().closeFuture().sync();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} finally {
+			if (acceptor != null) {
+				acceptor.shutdownGracefully();
+			}
+			if (worker != null) {
+				worker.shutdownGracefully();
+			}
+		}
+	}
+}

+ 167 - 0
radar-service/src/main/java/com/zksy/radar/utils/MessageHandler.java

@@ -0,0 +1,167 @@
+package com.zksy.radar.utils;
+
+import com.zksy.common.exception.InvalidMessageException;
+import com.zksy.radar.domain.RadarData;
+import com.zksy.radar.service.RadarDataService;
+import com.zksy.radar.utils.DataParser;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.ReadTimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@ChannelHandler.Sharable
+@Slf4j
+@Component
+public class MessageHandler extends ChannelInboundHandlerAdapter {
+	private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
+	private final RadarDataService service;
+	@Autowired
+	public MessageHandler(RadarDataService RadarDataService) {
+		this.service = RadarDataService;
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		super.channelActive(ctx);
+		//sendDataToDevice(ctx);
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) {
+		ByteBuf in = (ByteBuf) msg;
+		byte[] msgBytes = new byte[in.readableBytes()];
+		in.readBytes(msgBytes);
+		in.release();
+
+		try {
+			logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
+
+			// 提取帧类型
+			byte frameType = msgBytes[6];
+			logger.debug("帧类型: 0x{}", String.format("%02X", frameType));
+
+			// 1. 完整协议校验
+			DataParser.validateMessage(msgBytes);
+			logger.info("数据帧校验通过");
+
+			// 2. 数据解析
+			RadarData resultData = DataParser.parseMessage(msgBytes);
+			if (resultData.getSystemIdentifier() == null) {
+				resultData.setSystemIdentifier("123456");
+			}
+
+			// 3. 根据帧类型处理
+			if (frameType == 0x31) {
+				if (msgBytes.length > 34) {
+					byte flagByte = msgBytes[34];
+					String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
+					boolean isEnd = (flagByte & 0x08) != 0;
+
+					logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
+							String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
+				} else {
+					logger.warn("数据长度不足,无法检测结束位");
+				}
+				// 入库
+				service.save(resultData);
+				logger.info("上报历史记录数据入库成功: {}", resultData);
+
+				// 如果结束位=1,可以根据需要在此处理关闭逻辑
+				if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
+					logger.info("检测到结束标志帧,执行结束处理逻辑...");
+					byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+					Thread.sleep(1000);// 休眠1秒
+					logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response));
+					byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
+					logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
+				}else{
+					// 回复自定义应答帧
+					byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+					logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
+				}
+
+			} else if (frameType == 0x34) {
+				/*logger.info("收到结束通讯帧,准备发送回应包");
+				byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes);
+				ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+				logger.debug("已发送结束通讯回应包");*/
+
+			} else {
+				logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
+			}
+
+		} catch (InvalidMessageException e) {
+			logger.error("数据校验失败,不入库: {}", e.getMessage());
+			sendErrorResponse(ctx, "数据校验失败");
+
+		} catch (Exception e) {
+			logger.error("数据解析或入库异常", e);
+			sendErrorResponse(ctx, "数据处理异常");
+		}
+	}
+
+
+	// 工具方法:发送错误响应
+	private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
+		ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
+	}
+	private String printHexBinary(byte[] bytes) {
+		StringBuilder sb = new StringBuilder();
+		for (byte b : bytes) {
+			sb.append(String.format("%02X ", b));
+		}
+		System.out.println("Received raw data: " + sb.toString());
+		logger.info("Received raw data: " + sb.toString());
+		return sb.toString();
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		if (cause instanceof ReadTimeoutException) {
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
+		} else {
+			cause.printStackTrace();
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
+			ctx.close();
+		}
+	}
+
+	@Override
+	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+		ctx.flush();
+	}
+
+	@Override
+	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+		logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
+		ctx.fireChannelUnregistered();
+	}
+
+	// 主动向设备发送数据
+	public void sendDataToDevice(ChannelHandlerContext ctx) {
+		try {
+			// 将字符串形式的十六进制数据转换为字节数组
+			String hexData = "01 03 00 00 00 01 84 0A ";
+			String[] hexArray = hexData.split(" ");
+			byte[] dataBytes = new byte[hexArray.length];
+			for (int i = 0; i < hexArray.length; i++) {
+				dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
+			}
+			ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
+			ctx.writeAndFlush(byteBuf);
+			logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
+		} catch (Exception e) {
+			logger.error("发送数据失败: {}", e.getMessage());
+		}
+	}
+}

+ 40 - 0
water-level-service/src/main/java/com/zksy/water/config/NettyServer.java

@@ -0,0 +1,40 @@
+package com.zksy.water.config;
+
+import io.netty.channel.EventLoopGroup;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+@Slf4j
+@Component
+public class NettyServer {
+
+	private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
+	public static EventLoopGroup acceptor;
+	public static EventLoopGroup worker;
+
+	@Autowired
+	private NettyServerThread nettyServerThread;
+
+	@PostConstruct
+	public void init() {
+		new Thread(() -> nettyServerThread.startServer()).start();
+		System.out.println("nettyServer启动");
+		logger.info("nettyServer启动");
+	}
+
+	@PreDestroy
+	public void exit() {
+		if (acceptor != null) {
+			acceptor.shutdownGracefully();
+		}
+		if (worker != null) {
+			worker.shutdownGracefully();
+		}
+	}
+}

+ 77 - 0
water-level-service/src/main/java/com/zksy/water/config/NettyServerThread.java

@@ -0,0 +1,77 @@
+package com.zksy.water.config;
+
+import com.zksy.water.utils.MessageHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class NettyServerThread {
+	@Value("${netty.port:8313}")
+	private int port;
+	private static Logger logger = LoggerFactory.getLogger(NettyServerThread.class);
+	private final MessageHandler messageHandler;
+	@Autowired
+	public NettyServerThread(MessageHandler messageHandler) {
+		this.messageHandler = messageHandler;
+	}
+	public void startServer() {
+		System.out.println("Netty服务启动端口号" + port);
+		EventLoopGroup acceptor = new NioEventLoopGroup();
+		EventLoopGroup worker = new NioEventLoopGroup();
+		NettyServer.acceptor = acceptor;
+		NettyServer.worker = worker;
+		ServerBootstrap bootstrap = new ServerBootstrap();
+
+		// 添加boss和worker组
+		bootstrap.group(acceptor, worker);
+		//这句是指定允许等待accept的最大连接数量,我只需要连一个客户端,这里就关掉了,java默认是50个
+		// bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
+		bootstrap.option(ChannelOption.TCP_NODELAY, true);
+		// 用于构造socketchannel工厂
+		bootstrap.channel(NioServerSocketChannel.class);
+
+		/**
+		 * 传入自定义客户端Handle(处理消息)
+		 */
+		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+			@Override
+			public void initChannel(SocketChannel ch) throws Exception {
+				logger.info("来自" + ch.remoteAddress() + "的新连接接入");
+				// 注册handler
+				ch.pipeline().addLast(new ReadTimeoutHandler(3600));// 超时时间,1小时内没有从通道(Channel)读取到任何数据
+				ch.pipeline().addLast(messageHandler);
+			}
+		});
+
+		// 绑定端口,开始接收进来的连接
+		ChannelFuture f;
+		try {
+			f = bootstrap.bind(port).sync();
+			// 等待服务器 socket 关闭 。
+			f.channel().closeFuture().sync();
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} finally {
+			if (acceptor != null) {
+				acceptor.shutdownGracefully();
+			}
+			if (worker != null) {
+				worker.shutdownGracefully();
+			}
+		}
+	}
+}

+ 167 - 0
water-level-service/src/main/java/com/zksy/water/utils/MessageHandler.java

@@ -0,0 +1,167 @@
+package com.zksy.water.utils;
+
+import com.zksy.common.exception.InvalidMessageException;
+import com.zksy.water.domain.WaterMonitorData;
+import com.zksy.water.service.WaterMonitorDataService;
+import com.zksy.water.utils.DataParser;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.ReadTimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@ChannelHandler.Sharable
+@Slf4j
+@Component
+public class MessageHandler extends ChannelInboundHandlerAdapter {
+	private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
+	private final WaterMonitorDataService service;
+	@Autowired
+	public MessageHandler(WaterMonitorDataService waterMonitorDataService) {
+		this.service = waterMonitorDataService;
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+		super.channelActive(ctx);
+		//sendDataToDevice(ctx);
+	}
+
+	@Override
+	public void channelRead(ChannelHandlerContext ctx, Object msg) {
+		ByteBuf in = (ByteBuf) msg;
+		byte[] msgBytes = new byte[in.readableBytes()];
+		in.readBytes(msgBytes);
+		in.release();
+
+		try {
+			logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
+
+			// 提取帧类型
+			byte frameType = msgBytes[6];
+			logger.debug("帧类型: 0x{}", String.format("%02X", frameType));
+
+			// 1. 完整协议校验
+			DataParser.validateMessage(msgBytes);
+			logger.info("数据帧校验通过");
+
+			// 2. 数据解析
+			WaterMonitorData resultData = DataParser.parseMessage(msgBytes);
+			if (resultData.getSystemIdentifier() == null) {
+				resultData.setSystemIdentifier("123456");
+			}
+
+			// 3. 根据帧类型处理
+			if (frameType == 0x31) {
+				if (msgBytes.length > 34) {
+					byte flagByte = msgBytes[34];
+					String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
+					boolean isEnd = (flagByte & 0x08) != 0;
+
+					logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
+							String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
+				} else {
+					logger.warn("数据长度不足,无法检测结束位");
+				}
+				// 入库
+				service.save(resultData);
+				logger.info("上报历史记录数据入库成功: {}", resultData);
+
+				// 如果结束位=1,可以根据需要在此处理关闭逻辑
+				if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
+					logger.info("检测到结束标志帧,执行结束处理逻辑...");
+					byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+					Thread.sleep(1000);// 休眠1秒
+					logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response));
+					byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
+					logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
+				}else{
+					// 回复自定义应答帧
+					byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
+					ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+					logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
+				}
+
+			} else if (frameType == 0x34) {
+				/*logger.info("收到结束通讯帧,准备发送回应包");
+				byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes);
+				ctx.writeAndFlush(Unpooled.copiedBuffer(response));
+				logger.debug("已发送结束通讯回应包");*/
+
+			} else {
+				logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
+			}
+
+		} catch (InvalidMessageException e) {
+			logger.error("数据校验失败,不入库: {}", e.getMessage());
+			sendErrorResponse(ctx, "数据校验失败");
+
+		} catch (Exception e) {
+			logger.error("数据解析或入库异常", e);
+			sendErrorResponse(ctx, "数据处理异常");
+		}
+	}
+
+
+	// 工具方法:发送错误响应
+	private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
+		ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
+	}
+	private String printHexBinary(byte[] bytes) {
+		StringBuilder sb = new StringBuilder();
+		for (byte b : bytes) {
+			sb.append(String.format("%02X ", b));
+		}
+		System.out.println("Received raw data: " + sb.toString());
+		logger.info("Received raw data: " + sb.toString());
+		return sb.toString();
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		if (cause instanceof ReadTimeoutException) {
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
+		} else {
+			cause.printStackTrace();
+			logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
+			ctx.close();
+		}
+	}
+
+	@Override
+	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+		ctx.flush();
+	}
+
+	@Override
+	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+		logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
+		ctx.fireChannelUnregistered();
+	}
+
+	// 主动向设备发送数据
+	public void sendDataToDevice(ChannelHandlerContext ctx) {
+		try {
+			// 将字符串形式的十六进制数据转换为字节数组
+			String hexData = "01 03 00 00 00 01 84 0A ";
+			String[] hexArray = hexData.split(" ");
+			byte[] dataBytes = new byte[hexArray.length];
+			for (int i = 0; i < hexArray.length; i++) {
+				dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
+			}
+			ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
+			ctx.writeAndFlush(byteBuf);
+			logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
+		} catch (Exception e) {
+			logger.error("发送数据失败: {}", e.getMessage());
+		}
+	}
+}