// MessageHandler.java
package com.zksy.pressure.utils;

import cn.hutool.core.lang.UUID;
import com.zksy.common.exception.InvalidMessageException;
import com.zksy.pressure.domain.FirefightingPressure;
import com.zksy.pressure.service.FirefightingPressureService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

  17. @ChannelHandler.Sharable
  18. @Slf4j
  19. @Component
  20. public class MessageHandler extends ChannelInboundHandlerAdapter {
  21. private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
  22. private final FirefightingPressureService firefightingPressureService;
  23. @Autowired
  24. public MessageHandler(FirefightingPressureService firefightingPressureService) {
  25. this.firefightingPressureService = firefightingPressureService;
  26. }
  27. @Override
  28. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  29. super.channelActive(ctx);
  30. //sendDataToDevice(ctx);
  31. }
  32. @Override
  33. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  34. try {
  35. ByteBuf msgByteBuf = (ByteBuf) msg;
  36. if (msgByteBuf == null || !msgByteBuf.isReadable()) {
  37. logger.warn("接收到无效的消息");
  38. return;
  39. }
  40. logger.info("接收到 {} 字节的数据,来自: {}", msgByteBuf.readableBytes(), ctx.channel().remoteAddress());
  41. byte[] msgBytes = new byte[msgByteBuf.readableBytes()];
  42. msgByteBuf.readBytes(msgBytes);
  43. String result = printHexBinary(msgBytes);
  44. String msgString = result.replaceAll("\\s", "");
  45. String CRCString = msgString.substring(msgString.length() - 4);
  46. String bodyResult = msgString.substring(0, msgString.length() - 4);
  47. String codeString = DataCheckUtil.crc16Tall(bodyResult);
  48. System.out.println("校验码:" + CRCString + "-----------生成的校验码:" + codeString);
  49. if (!CRCString.equals(codeString)) {
  50. throw new InvalidMessageException("数据校验不成功");
  51. } else {
  52. logger.info("成功!!!接收到 {} 字节的数据,来自: {}", msgString, ctx.channel().remoteAddress());
  53. FirefightingPressure resultData = DataParser.parseMessage(msgString);
  54. resultData.setId(UUID.randomUUID().toString());
  55. // 更新设备最后一次接收数据的时间
  56. //String addressCode = resultData.getAddressCode();
  57. //DeviceOfflineCheckTask.deviceLastReceiveTimeMap.put(addressCode, new Date());
  58. firefightingPressureService.save(resultData);
  59. }
  60. } catch (InvalidMessageException e) {
  61. logger.error("数据入库失败: {}", e.getMessage());
  62. ctx.writeAndFlush(Unpooled.copiedBuffer("数据入库失败".getBytes()));
  63. }
  64. }
  65. private String printHexBinary(byte[] bytes) {
  66. StringBuilder sb = new StringBuilder();
  67. for (byte b : bytes) {
  68. sb.append(String.format("%02X ", b));
  69. }
  70. System.out.println("Received raw data: " + sb.toString());
  71. logger.info("Received raw data: " + sb.toString());
  72. return sb.toString();
  73. }
  74. @Override
  75. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  76. if (cause instanceof ReadTimeoutException) {
  77. logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
  78. } else {
  79. cause.printStackTrace();
  80. logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
  81. ctx.close();
  82. }
  83. }
  84. @Override
  85. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  86. ctx.flush();
  87. }
  88. @Override
  89. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  90. logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
  91. ctx.fireChannelUnregistered();
  92. }
  93. // 主动向设备发送数据
  94. public void sendDataToDevice(ChannelHandlerContext ctx) {
  95. try {
  96. // 将字符串形式的十六进制数据转换为字节数组
  97. String hexData = "01 03 00 00 00 01 84 0A ";
  98. String[] hexArray = hexData.split(" ");
  99. byte[] dataBytes = new byte[hexArray.length];
  100. for (int i = 0; i < hexArray.length; i++) {
  101. dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
  102. }
  103. ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
  104. ctx.writeAndFlush(byteBuf);
  105. logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
  106. } catch (Exception e) {
  107. logger.error("发送数据失败: {}", e.getMessage());
  108. }
  109. }
  110. }