package com.zksy.gas.utils; import cn.hutool.core.lang.UUID; import com.zksy.common.exception.InvalidMessageException; import com.zksy.gas.domain.GasMonitorData; import com.zksy.gas.service.GasMonitorDataService; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Arrays; @ChannelHandler.Sharable @Slf4j @Component public class MessageHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(MessageHandler.class); private final GasMonitorDataService service; @Autowired public MessageHandler(GasMonitorDataService firefightingPressureService) { this.service = firefightingPressureService; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); //sendDataToDevice(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf msgByteBuf = (ByteBuf) msg; if (msgByteBuf == null || !msgByteBuf.isReadable()) { logger.warn("接收到无效的消息"); return; } int readable = msgByteBuf.readableBytes(); byte[] msgBytes = new byte[readable]; msgByteBuf.readBytes(msgBytes); logger.info("接收到 {} 字节的数据: {}, 来自: {}", readable, ProtocolUtils.bytesToHex(msgBytes), ctx.channel().remoteAddress()); int checksumTotalLength = 2; // 末尾2字节校验码(如ED34) int bodyStartOffset = 16; // 校验和07的主体起始索引 int fullDataStartOffset = 3; // 校验和16的完整数据起始索引 // 校验报文长度合法性 if (readable < fullDataStartOffset + checksumTotalLength) { throw new InvalidMessageException("报文长度不足,无法完成校验"); } int bodyEndOffset = msgBytes.length - checksumTotalLength; // 避开末尾2字节校验码 int bodyLength = bodyEndOffset - bodyStartOffset; if (bodyLength <= 0) { throw new InvalidMessageException("主体数据长度无效,无法计算校验和07"); } int checksum07 = DataCheckUtil.calcSumOnly(msgBytes, bodyStartOffset, bodyLength); int fullDataEndOffset = msgBytes.length - 1; // 包含倒数第二个字节 int fullDataLength = fullDataEndOffset - fullDataStartOffset; if (fullDataLength <= 0) { throw new InvalidMessageException("完整数据长度无效,无法计算校验和16"); } int checksum16 = DataCheckUtil.calcChecksum(msgBytes, fullDataStartOffset, fullDataLength); int calculatedChecksum = (checksum07 << 8) | checksum16; int receivedChecksum = ((msgBytes[msgBytes.length - 2] & 0xFF) << 8) | (msgBytes[msgBytes.length - 1] & 0xFF); // 打印调试信息,确认拼接结果 logger.debug("校验和07(高8位,仅求和): {}", Integer.toHexString(checksum07).toUpperCase()); logger.debug("校验和16(低8位,FF-求和): {}", Integer.toHexString(checksum16).toUpperCase()); logger.debug("修改后拼接校验码: {}", Integer.toHexString(calculatedChecksum).toUpperCase()); logger.debug("报文携带校验码: {}", Integer.toHexString(receivedChecksum).toUpperCase()); if (calculatedChecksum != receivedChecksum) { throw new InvalidMessageException( String.format("数据校验失败:本地计算(07+16)=%04X(07=%02X, 16=%02X),报文携带=%04X", calculatedChecksum, checksum07, checksum16, receivedChecksum)); } com.zksy.gas.domain.GasMonitorData resultData = DataParser.parseMessage(msgBytes); resultData.setId(java.util.UUID.randomUUID().toString()); service.save(resultData); logger.info("数据解析入库成功: {}", resultData); } catch (InvalidMessageException e) { logger.error("数据校验失败: {}", e.getMessage()); ctx.writeAndFlush(Unpooled.copiedBuffer("数据校验失败".getBytes())); } catch (Exception e) { logger.error("数据解析/入库异常", e); ctx.writeAndFlush(Unpooled.copiedBuffer("数据处理异常".getBytes())); } finally { ReferenceCountUtil.release(msg); } } private String printHexBinary(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) { sb.append(String.format("%02X ", b)); } System.out.println("Received raw data: " + sb.toString()); logger.info("Received raw data: " + sb.toString()); return sb.toString(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ReadTimeoutException) { logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开"); } else { cause.printStackTrace(); logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开"); ctx.close(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开"); ctx.fireChannelUnregistered(); } // 主动向设备发送数据 public void sendDataToDevice(ChannelHandlerContext ctx) { try { // 将字符串形式的十六进制数据转换为字节数组 String hexData = "01 03 00 00 00 01 84 0A "; String[] hexArray = hexData.split(" "); byte[] dataBytes = new byte[hexArray.length]; for (int i = 0; i < hexArray.length; i++) { dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16); } ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes); ctx.writeAndFlush(byteBuf); logger.info("已向设备发送数据: {}", printHexBinary(dataBytes)); } catch (Exception e) { logger.error("发送数据失败: {}", e.getMessage()); } } }