MessageHandler.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package com.zksy.radar.utils;
  2. import com.zksy.common.exception.InvalidMessageException;
  3. import com.zksy.radar.domain.RadarData;
  4. import com.zksy.radar.service.RadarDataService;
  5. import com.zksy.radar.utils.DataParser;
  6. import io.netty.buffer.ByteBuf;
  7. import io.netty.buffer.Unpooled;
  8. import io.netty.channel.ChannelHandler;
  9. import io.netty.channel.ChannelHandlerContext;
  10. import io.netty.channel.ChannelInboundHandlerAdapter;
  11. import io.netty.handler.timeout.ReadTimeoutException;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.slf4j.Logger;
  14. import org.slf4j.LoggerFactory;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.stereotype.Component;
  17. @ChannelHandler.Sharable
  18. @Slf4j
  19. @Component
  20. public class MessageHandler extends ChannelInboundHandlerAdapter {
  21. private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
  22. private final RadarDataService service;
  23. @Autowired
  24. public MessageHandler(RadarDataService RadarDataService) {
  25. this.service = RadarDataService;
  26. }
  27. @Override
  28. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  29. super.channelActive(ctx);
  30. //sendDataToDevice(ctx);
  31. }
  32. @Override
  33. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  34. ByteBuf in = (ByteBuf) msg;
  35. byte[] msgBytes = new byte[in.readableBytes()];
  36. in.readBytes(msgBytes);
  37. in.release();
  38. try {
  39. logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
  40. // 提取帧类型
  41. byte frameType = msgBytes[6];
  42. logger.debug("帧类型: 0x{}", String.format("%02X", frameType));
  43. // 1. 完整协议校验
  44. DataParser.validateMessage(msgBytes);
  45. logger.info("数据帧校验通过");
  46. // 2. 数据解析
  47. RadarData resultData = DataParser.parseMessage(msgBytes);
  48. if (resultData.getSystemIdentifier() == null) {
  49. resultData.setSystemIdentifier("123456");
  50. }
  51. // 3. 根据帧类型处理
  52. if (frameType == 0x31) {
  53. if (msgBytes.length > 34) {
  54. byte flagByte = msgBytes[34];
  55. String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
  56. boolean isEnd = (flagByte & 0x08) != 0;
  57. logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
  58. String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
  59. } else {
  60. logger.warn("数据长度不足,无法检测结束位");
  61. }
  62. // 入库
  63. service.save(resultData);
  64. logger.info("上报历史记录数据入库成功: {}", resultData);
  65. // 如果结束位=1,可以根据需要在此处理关闭逻辑
  66. if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
  67. logger.info("检测到结束标志帧,执行结束处理逻辑...");
  68. byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
  69. ctx.writeAndFlush(Unpooled.copiedBuffer(response));
  70. Thread.sleep(1000);// 休眠1秒
  71. logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response));
  72. byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
  73. ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
  74. logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
  75. }else{
  76. // 回复自定义应答帧
  77. byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
  78. ctx.writeAndFlush(Unpooled.copiedBuffer(response));
  79. logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
  80. }
  81. } else if (frameType == 0x34) {
  82. /*logger.info("收到结束通讯帧,准备发送回应包");
  83. byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes);
  84. ctx.writeAndFlush(Unpooled.copiedBuffer(response));
  85. logger.debug("已发送结束通讯回应包");*/
  86. } else {
  87. logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
  88. }
  89. } catch (InvalidMessageException e) {
  90. logger.error("数据校验失败,不入库: {}", e.getMessage());
  91. sendErrorResponse(ctx, "数据校验失败");
  92. } catch (Exception e) {
  93. logger.error("数据解析或入库异常", e);
  94. sendErrorResponse(ctx, "数据处理异常");
  95. }
  96. }
  97. // 工具方法:发送错误响应
  98. private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
  99. ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
  100. }
  101. private String printHexBinary(byte[] bytes) {
  102. StringBuilder sb = new StringBuilder();
  103. for (byte b : bytes) {
  104. sb.append(String.format("%02X ", b));
  105. }
  106. System.out.println("Received raw data: " + sb.toString());
  107. logger.info("Received raw data: " + sb.toString());
  108. return sb.toString();
  109. }
  110. @Override
  111. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  112. if (cause instanceof ReadTimeoutException) {
  113. logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
  114. } else {
  115. cause.printStackTrace();
  116. logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
  117. ctx.close();
  118. }
  119. }
  120. @Override
  121. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  122. ctx.flush();
  123. }
  124. @Override
  125. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  126. logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
  127. ctx.fireChannelUnregistered();
  128. }
  129. // 主动向设备发送数据
  130. public void sendDataToDevice(ChannelHandlerContext ctx) {
  131. try {
  132. // 将字符串形式的十六进制数据转换为字节数组
  133. String hexData = "01 03 00 00 00 01 84 0A ";
  134. String[] hexArray = hexData.split(" ");
  135. byte[] dataBytes = new byte[hexArray.length];
  136. for (int i = 0; i < hexArray.length; i++) {
  137. dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
  138. }
  139. ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
  140. ctx.writeAndFlush(byteBuf);
  141. logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
  142. } catch (Exception e) {
  143. logger.error("发送数据失败: {}", e.getMessage());
  144. }
  145. }
  146. }