package com.zksy.water.utils; import com.zksy.common.exception.InvalidMessageException; import com.zksy.water.domain.WaterMonitorData; import com.zksy.water.service.WaterMonitorDataService; import com.zksy.water.utils.DataParser; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.ReadTimeoutException; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @ChannelHandler.Sharable @Slf4j @Component public class MessageHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(MessageHandler.class); private final WaterMonitorDataService service; @Autowired public MessageHandler(WaterMonitorDataService waterMonitorDataService) { this.service = waterMonitorDataService; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); //sendDataToDevice(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; byte[] msgBytes = new byte[in.readableBytes()]; in.readBytes(msgBytes); in.release(); try { logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes)); // 提取帧类型 byte frameType = msgBytes[6]; logger.debug("帧类型: 0x{}", String.format("%02X", frameType)); // 1. 完整协议校验 DataParser.validateMessage(msgBytes); logger.info("数据帧校验通过"); // 2. 数据解析 WaterMonitorData resultData = DataParser.parseMessage(msgBytes); if (resultData.getSystemIdentifier() == null) { resultData.setSystemIdentifier("123456"); } // 3. 根据帧类型处理 if (frameType == 0x31) { if (msgBytes.length > 34) { byte flagByte = msgBytes[34]; String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0'); boolean isEnd = (flagByte & 0x08) != 0; logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}", String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)"); } else { logger.warn("数据长度不足,无法检测结束位"); } // 入库 service.save(resultData); logger.info("上报历史记录数据入库成功: {}", resultData); // 如果结束位=1,可以根据需要在此处理关闭逻辑 if (Boolean.TRUE.equals(resultData.getIsLastPacket())) { logger.info("检测到结束标志帧,执行结束处理逻辑..."); byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes); ctx.writeAndFlush(Unpooled.copiedBuffer(response)); Thread.sleep(1000);// 休眠1秒 logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response)); byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes); ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse)); logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse)); }else{ // 回复自定义应答帧 byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes); ctx.writeAndFlush(Unpooled.copiedBuffer(response)); logger.debug("已发送上报自定义回应包: {}", printHexBinary(response)); } } else if (frameType == 0x34) { /*logger.info("收到结束通讯帧,准备发送回应包"); byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes); ctx.writeAndFlush(Unpooled.copiedBuffer(response)); logger.debug("已发送结束通讯回应包");*/ } else { logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType)); } } catch (InvalidMessageException e) { logger.error("数据校验失败,不入库: {}", e.getMessage()); sendErrorResponse(ctx, "数据校验失败"); } catch (Exception e) { logger.error("数据解析或入库异常", e); sendErrorResponse(ctx, "数据处理异常"); } } // 工具方法:发送错误响应 private void sendErrorResponse(ChannelHandlerContext ctx, String msg) { ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes())); } private String printHexBinary(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) { sb.append(String.format("%02X ", b)); } System.out.println("Received raw data: " + sb.toString()); logger.info("Received raw data: " + sb.toString()); return sb.toString(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ReadTimeoutException) { logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开"); } else { cause.printStackTrace(); logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开"); ctx.close(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开"); ctx.fireChannelUnregistered(); } // 主动向设备发送数据 public void sendDataToDevice(ChannelHandlerContext ctx) { try { // 将字符串形式的十六进制数据转换为字节数组 String hexData = "01 03 00 00 00 01 84 0A "; String[] hexArray = hexData.split(" "); byte[] dataBytes = new byte[hexArray.length]; for (int i = 0; i < hexArray.length; i++) { dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16); } ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes); ctx.writeAndFlush(byteBuf); logger.info("已向设备发送数据: {}", printHexBinary(dataBytes)); } catch (Exception e) { logger.error("发送数据失败: {}", e.getMessage()); } } }