MessageHandler.java 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package com.zksy.electricity.utils;
  2. import com.zksy.common.exception.InvalidMessageException;
  3. import com.zksy.electricity.config.NettyServer;
  4. import com.zksy.electricity.domain.MessageParseResult;
  5. import com.zksy.electricity.service.MessageParseResultService;
  6. import io.netty.buffer.ByteBuf;
  7. import io.netty.buffer.Unpooled;
  8. import io.netty.channel.ChannelHandlerContext;
  9. import io.netty.channel.ChannelInboundHandlerAdapter;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import java.math.BigInteger;
  15. @Slf4j
  16. public class MessageHandler extends ChannelInboundHandlerAdapter {
  17. private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
  18. @Autowired
  19. private MessageParseResultService messageParseResultService;
  20. public MessageHandler() {
  21. this.messageParseResultService = SpringContextUtil.getBean(MessageParseResultService.class);
  22. }
  23. /**
  24. * 本方法用于读取客户端发送的信息
  25. *
  26. * @param ctx
  27. * @param msg
  28. * @throws Exception
  29. */
  30. @Override
  31. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  32. try {
  33. ByteBuf msgByteBuf = (ByteBuf) msg;
  34. if (msgByteBuf == null || !msgByteBuf.isReadable()) {
  35. logger.warn("接收到无效的消息");
  36. return;
  37. }
  38. // 安全的日志记录
  39. logger.info("接收到 {} 字节的数据", msgByteBuf.readableBytes());
  40. byte[] msgBytes = new byte[msgByteBuf.readableBytes()];
  41. msgByteBuf.readBytes(msgBytes);
  42. String msgString = printHexBinary(msgBytes);
  43. if (!validateMessage(msgString)) {
  44. throw new InvalidMessageException("数据校验不成功");
  45. }else{
  46. MessageParseResult result = DataParser.parseMessage(msgString);
  47. messageParseResultService.saveMessageParseResult(result);
  48. }
  49. } catch (InvalidMessageException e) {
  50. logger.error("数据校验失败: {}", e.getMessage());
  51. ctx.writeAndFlush(Unpooled.copiedBuffer("数据校验失败".getBytes()));
  52. }
  53. }
  54. // 校验消息的方法
  55. private static boolean validateMessage(String msgString) {
  56. // 去除首尾可能存在的空格
  57. msgString = msgString.trim();
  58. // 检查是否以 FE FE FE FE 开头,以 16 结尾
  59. if (!msgString.startsWith("FE FE FE FE") || !msgString.endsWith("16")) {
  60. return false;
  61. }
  62. // 找到第一个 68 的位置
  63. int startIndex = msgString.indexOf("68");
  64. if (startIndex == -1) {
  65. return false;
  66. }
  67. // 找到结束码 16 的位置
  68. int endIndex = msgString.lastIndexOf("16");
  69. // 提取从第一个 68 到校验和之前的数据
  70. String dataToSum = msgString.substring(startIndex, msgString.lastIndexOf(" ", endIndex - 3));
  71. String[] hexArray = dataToSum.split(" ");
  72. // 计算校验和
  73. BigInteger sum = BigInteger.ZERO;
  74. for (String hex : hexArray) {
  75. sum = sum.add(new BigInteger(hex, 16));
  76. }
  77. // 取低 8 位,也就是十六进制的后两位
  78. sum = sum.and(new BigInteger("FF", 16));
  79. // 先找到校验和前面的空格位置
  80. int checksumStart = msgString.lastIndexOf(" ", endIndex - 3) + 1;
  81. // 校验和结束位置应该是 endIndex,因为 substring 方法的结束索引不包含在截取范围内
  82. int checksumEnd = endIndex;
  83. String checksumHex = msgString.substring(checksumStart, checksumEnd).trim();
  84. BigInteger expectedChecksum = new BigInteger(checksumHex, 16);
  85. // 比较计算的校验和与消息中的校验和
  86. return sum.equals(expectedChecksum);
  87. }
  88. private String printHexBinary(byte[] bytes) {
  89. StringBuilder sb = new StringBuilder();
  90. for (byte b : bytes) {
  91. sb.append(String.format("%02X ", b));
  92. }
  93. System.out.println("Received raw data: " + sb.toString());
  94. logger.info("Received raw data: " + sb.toString());
  95. return sb.toString();
  96. }
  97. /**
  98. * 本方法用作处理异常
  99. *
  100. * @param ctx
  101. * @param cause
  102. * @throws Exception
  103. */
  104. @Override
  105. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  106. if (cause.getClass() == io.netty.handler.timeout.ReadTimeoutException.class) {
  107. logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接超时断开");
  108. } else {
  109. cause.printStackTrace();
  110. logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接异常断开");
  111. ctx.close();
  112. }
  113. NettyServer.sc= null;
  114. }
  115. /**
  116. * 信息获取完毕后操作
  117. *
  118. * @param ctx
  119. * @throws Exception
  120. */
  121. @Override
  122. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  123. ctx.flush();
  124. }
  125. /**
  126. * 断开连接时操作
  127. *
  128. * @param ctx
  129. * @throws Exception
  130. */
  131. @Override
  132. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  133. if (NettyServer.sc!= null) {
  134. logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接主动断开");
  135. NettyServer.sc= null;
  136. }
  137. ctx.fireChannelUnregistered();
  138. }
  139. /**
  140. * 根据信息具体操作的业务方法
  141. *
  142. * @param msgBytes
  143. * @param ctx
  144. */
  145. private void handler(byte[] msgBytes, ChannelHandlerContext ctx) {
  146. // 解析接收到的数据
  147. // 回复客户端
  148. /*String responseMessage = "Message received and processed.";
  149. ByteBuf echo = ctx.alloc().buffer();
  150. echo.writeBytes(responseMessage.getBytes(io.netty.util.CharsetUtil.UTF_8));
  151. ctx.writeAndFlush(echo);*/
  152. }
  153. }