// MessageHandler.java
package com.zksy.gas.utils;

import cn.hutool.core.lang.UUID;
import com.zksy.common.exception.InvalidMessageException;
import com.zksy.gas.domain.GasMonitorData;
import com.zksy.gas.service.GasMonitorDataService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
  19. @ChannelHandler.Sharable
  20. @Slf4j
  21. @Component
  22. public class MessageHandler extends ChannelInboundHandlerAdapter {
  23. private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
  24. private final GasMonitorDataService service;
  25. @Autowired
  26. public MessageHandler(GasMonitorDataService firefightingPressureService) {
  27. this.service = firefightingPressureService;
  28. }
  29. @Override
  30. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  31. super.channelActive(ctx);
  32. //sendDataToDevice(ctx);
  33. }
  34. @Override
  35. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  36. try {
  37. ByteBuf msgByteBuf = (ByteBuf) msg;
  38. if (msgByteBuf == null || !msgByteBuf.isReadable()) {
  39. logger.warn("接收到无效的消息");
  40. return;
  41. }
  42. int readable = msgByteBuf.readableBytes();
  43. byte[] msgBytes = new byte[readable];
  44. msgByteBuf.readBytes(msgBytes);
  45. logger.info("接收到 {} 字节的数据: {}, 来自: {}",
  46. readable, ProtocolUtils.bytesToHex(msgBytes), ctx.channel().remoteAddress());
  47. int checksumTotalLength = 2; // 末尾2字节校验码(如ED34)
  48. int bodyStartOffset = 16; // 校验和07的主体起始索引
  49. int fullDataStartOffset = 3; // 校验和16的完整数据起始索引
  50. // 校验报文长度合法性
  51. if (readable < fullDataStartOffset + checksumTotalLength) {
  52. throw new InvalidMessageException("报文长度不足,无法完成校验");
  53. }
  54. int bodyEndOffset = msgBytes.length - checksumTotalLength; // 避开末尾2字节校验码
  55. int bodyLength = bodyEndOffset - bodyStartOffset;
  56. if (bodyLength <= 0) {
  57. throw new InvalidMessageException("主体数据长度无效,无法计算校验和07");
  58. }
  59. int checksum07 = DataCheckUtil.calcSumOnly(msgBytes, bodyStartOffset, bodyLength);
  60. int fullDataEndOffset = msgBytes.length - 1; // 包含倒数第二个字节
  61. int fullDataLength = fullDataEndOffset - fullDataStartOffset;
  62. if (fullDataLength <= 0) {
  63. throw new InvalidMessageException("完整数据长度无效,无法计算校验和16");
  64. }
  65. int checksum16 = DataCheckUtil.calcChecksum(msgBytes, fullDataStartOffset, fullDataLength);
  66. int calculatedChecksum = (checksum07 << 8) | checksum16;
  67. int receivedChecksum = ((msgBytes[msgBytes.length - 2] & 0xFF) << 8)
  68. | (msgBytes[msgBytes.length - 1] & 0xFF);
  69. // 打印调试信息,确认拼接结果
  70. logger.debug("校验和07(高8位,仅求和): {}", Integer.toHexString(checksum07).toUpperCase());
  71. logger.debug("校验和16(低8位,FF-求和): {}", Integer.toHexString(checksum16).toUpperCase());
  72. logger.debug("修改后拼接校验码: {}", Integer.toHexString(calculatedChecksum).toUpperCase());
  73. logger.debug("报文携带校验码: {}", Integer.toHexString(receivedChecksum).toUpperCase());
  74. if (calculatedChecksum != receivedChecksum) {
  75. throw new InvalidMessageException(
  76. String.format("数据校验失败:本地计算(07+16)=%04X(07=%02X, 16=%02X),报文携带=%04X",
  77. calculatedChecksum, checksum07, checksum16, receivedChecksum));
  78. }
  79. com.zksy.gas.domain.GasMonitorData resultData = DataParser.parseMessage(msgBytes);
  80. resultData.setId(java.util.UUID.randomUUID().toString());
  81. service.save(resultData);
  82. logger.info("数据解析入库成功: {}", resultData);
  83. } catch (InvalidMessageException e) {
  84. logger.error("数据校验失败: {}", e.getMessage());
  85. ctx.writeAndFlush(Unpooled.copiedBuffer("数据校验失败".getBytes()));
  86. } catch (Exception e) {
  87. logger.error("数据解析/入库异常", e);
  88. ctx.writeAndFlush(Unpooled.copiedBuffer("数据处理异常".getBytes()));
  89. } finally {
  90. ReferenceCountUtil.release(msg);
  91. }
  92. }
  93. private String printHexBinary(byte[] bytes) {
  94. StringBuilder sb = new StringBuilder();
  95. for (byte b : bytes) {
  96. sb.append(String.format("%02X ", b));
  97. }
  98. System.out.println("Received raw data: " + sb.toString());
  99. logger.info("Received raw data: " + sb.toString());
  100. return sb.toString();
  101. }
  102. @Override
  103. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  104. if (cause instanceof ReadTimeoutException) {
  105. logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
  106. } else {
  107. cause.printStackTrace();
  108. logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
  109. ctx.close();
  110. }
  111. }
  112. @Override
  113. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  114. ctx.flush();
  115. }
  116. @Override
  117. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  118. logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
  119. ctx.fireChannelUnregistered();
  120. }
  121. // 主动向设备发送数据
  122. public void sendDataToDevice(ChannelHandlerContext ctx) {
  123. try {
  124. // 将字符串形式的十六进制数据转换为字节数组
  125. String hexData = "01 03 00 00 00 01 84 0A ";
  126. String[] hexArray = hexData.split(" ");
  127. byte[] dataBytes = new byte[hexArray.length];
  128. for (int i = 0; i < hexArray.length; i++) {
  129. dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
  130. }
  131. ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
  132. ctx.writeAndFlush(byteBuf);
  133. logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
  134. } catch (Exception e) {
  135. logger.error("发送数据失败: {}", e.getMessage());
  136. }
  137. }
  138. }