// MessageHandler.java (9.9 KB)
  package com.zksy.radar.utils;

  import com.zksy.api.domain.AlarmData;
  import com.zksy.api.domain.WarningThreshold;
  import com.zksy.api.service.AlarmDataService;
  import com.zksy.api.service.WarningThresholdService;
  import com.zksy.common.exception.InvalidMessageException;
  import com.zksy.radar.domain.RadarData;
  import com.zksy.radar.service.RadarDataService;
  import com.zksy.radar.utils.DataParser;
  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.Unpooled;
  import io.netty.channel.ChannelHandler;
  import io.netty.channel.ChannelHandlerContext;
  import io.netty.channel.ChannelInboundHandlerAdapter;
  import io.netty.handler.timeout.ReadTimeoutException;
  import lombok.extern.slf4j.Slf4j;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.stereotype.Component;

  import java.math.BigDecimal;
  import java.nio.charset.StandardCharsets;
  import java.time.LocalDateTime;
  import java.util.concurrent.TimeUnit;

  23. @ChannelHandler.Sharable
  24. @Slf4j
  25. @Component
  26. public class MessageHandler extends ChannelInboundHandlerAdapter {
  27. private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
  28. private final RadarDataService service;
  29. private final WarningThresholdService warningThresholdService;
  30. private final AlarmDataService alarmDataService;
  31. @Autowired
  32. public MessageHandler(RadarDataService RadarDataService, WarningThresholdService warningThresholdService, AlarmDataService alarmDataService) {
  33. this.service = RadarDataService;
  34. this.warningThresholdService = warningThresholdService;
  35. this.alarmDataService = alarmDataService;
  36. }
  37. @Override
  38. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  39. super.channelActive(ctx);
  40. //sendDataToDevice(ctx);
  41. }
  42. @Override
  43. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  44. ByteBuf in = (ByteBuf) msg;
  45. byte[] msgBytes = new byte[in.readableBytes()];
  46. in.readBytes(msgBytes);
  47. in.release();
  48. try {
  49. logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
  50. // 提取帧类型
  51. byte frameType = msgBytes[6];
  52. logger.debug("帧类型: 0x{}", String.format("%02X", frameType));
  53. // 1. 完整协议校验
  54. DataParser.validateMessage(msgBytes);
  55. logger.info("数据帧校验通过");
  56. // 2. 数据解析
  57. RadarData resultData = DataParser.parseMessage(msgBytes);
  58. if (resultData.getSystemIdentifier() == null) {
  59. resultData.setSystemIdentifier("123456");
  60. }
  61. // 3. 根据帧类型处理
  62. if (frameType == 0x31) {
  63. if (msgBytes.length > 34) {
  64. byte flagByte = msgBytes[34];
  65. String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
  66. boolean isEnd = (flagByte & 0x08) != 0;
  67. logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
  68. String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
  69. } else {
  70. logger.warn("数据长度不足,无法检测结束位");
  71. }
  72. // 入库
  73. service.save(resultData);
  74. logger.info("上报历史记录数据入库成功: {}", resultData);
  75. // 瞬时流量和流速告警入库
  76. handleRadarAlarm(resultData);
  77. // 如果结束位=1,可以根据需要在此处理关闭逻辑
  78. if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
  79. logger.info("检测到结束标志帧,执行结束处理逻辑...");
  80. byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
  81. ctx.writeAndFlush(Unpooled.copiedBuffer(response));
  82. Thread.sleep(1000);// 休眠1秒
  83. logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response));
  84. byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
  85. ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
  86. logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
  87. }else{
  88. // 回复自定义应答帧
  89. byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
  90. ctx.writeAndFlush(Unpooled.copiedBuffer(response));
  91. logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
  92. }
  93. } else if (frameType == 0x34) {
  94. /*logger.info("收到结束通讯帧,准备发送回应包");
  95. byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes);
  96. ctx.writeAndFlush(Unpooled.copiedBuffer(response));
  97. logger.debug("已发送结束通讯回应包");*/
  98. } else {
  99. logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
  100. }
  101. } catch (InvalidMessageException e) {
  102. logger.error("数据校验失败,不入库: {}", e.getMessage());
  103. sendErrorResponse(ctx, "数据校验失败");
  104. } catch (Exception e) {
  105. logger.error("数据解析或入库异常", e);
  106. sendErrorResponse(ctx, "数据处理异常");
  107. }
  108. }
  109. // 工具方法:发送错误响应
  110. private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
  111. ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
  112. }
  113. private String printHexBinary(byte[] bytes) {
  114. StringBuilder sb = new StringBuilder();
  115. for (byte b : bytes) {
  116. sb.append(String.format("%02X ", b));
  117. }
  118. System.out.println("Received raw data: " + sb.toString());
  119. logger.info("Received raw data: " + sb.toString());
  120. return sb.toString();
  121. }
  122. @Override
  123. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  124. if (cause instanceof ReadTimeoutException) {
  125. logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
  126. } else {
  127. cause.printStackTrace();
  128. logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
  129. ctx.close();
  130. }
  131. }
  132. @Override
  133. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  134. ctx.flush();
  135. }
  136. @Override
  137. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  138. logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
  139. ctx.fireChannelUnregistered();
  140. }
  141. // 主动向设备发送数据
  142. public void sendDataToDevice(ChannelHandlerContext ctx) {
  143. try {
  144. // 将字符串形式的十六进制数据转换为字节数组
  145. String hexData = "01 03 00 00 00 01 84 0A ";
  146. String[] hexArray = hexData.split(" ");
  147. byte[] dataBytes = new byte[hexArray.length];
  148. for (int i = 0; i < hexArray.length; i++) {
  149. dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
  150. }
  151. ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
  152. ctx.writeAndFlush(byteBuf);
  153. logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
  154. } catch (Exception e) {
  155. logger.error("发送数据失败: {}", e.getMessage());
  156. }
  157. }
  158. /**
  159. * 处理雷达设备告警
  160. */
  161. private void handleRadarAlarm(RadarData resultData) {
  162. try {
  163. // 设备编码从设备数据获取
  164. String deviceCode = resultData.getDeviceCode();
  165. if (deviceCode == null) {
  166. deviceCode = resultData.getSystemIdentifier();
  167. }
  168. // 1. 处理瞬时流量告警
  169. if (resultData.getMeter1InstantFlow() != null) {
  170. handleAlarm(deviceCode, "WARN_INSTANT_FLOW", "瞬时流量",
  171. resultData.getMeter1InstantFlow());
  172. }
  173. // 2. 处理流速告警
  174. if (resultData.getFlowSpeed() != null) {
  175. handleAlarm(deviceCode, "WARN_FLOW_SPEED", "流速",
  176. resultData.getFlowSpeed());
  177. }
  178. } catch (Exception e) {
  179. logger.error("雷达告警处理异常", e);
  180. }
  181. }
  182. /**
  183. * 处理单个告警
  184. */
  185. private void handleAlarm(String deviceCode, String warningCode, String warningType, String valueStr) {
  186. try {
  187. // 解析实际值
  188. Double actualValue = null;
  189. try {
  190. actualValue = Double.parseDouble(valueStr);
  191. } catch (NumberFormatException e) {
  192. logger.warn("无法解析告警值: {}", valueStr);
  193. return;
  194. }
  195. // 查询预警阈值表
  196. WarningThreshold threshold = null;
  197. try {
  198. threshold = warningThresholdService.getWarningThresholdByDeviceAndCode(deviceCode, warningCode);
  199. } catch (Exception e) {
  200. logger.error("查询预警阈值失败: deviceCode={}, warningCode={}", deviceCode, warningCode, e);
  201. }
  202. // 使用阈值表中的预警类型(如果有的话)
  203. if (threshold != null && threshold.getWarningType() != null) {
  204. warningType = threshold.getWarningType();
  205. }
  206. // 获取最小值和最大值
  207. Double minValue = threshold != null ? threshold.getMinValue() : null;
  208. Double maxValue = threshold != null ? threshold.getMaxValue() : null;
  209. String remark = threshold != null ? threshold.getRemark() : null;
  210. // 判断是否触发报警
  211. boolean isOverThreshold = false;
  212. if (minValue != null && actualValue <= minValue) {
  213. isOverThreshold = true;
  214. }
  215. if (maxValue != null && actualValue >= maxValue) {
  216. isOverThreshold = true;
  217. }
  218. if (isOverThreshold) {
  219. logger.warn("设备{}触发{}告警(当前值:{},最小值:{},最大值:{})",
  220. deviceCode, warningType, actualValue, minValue, maxValue);
  221. // 保存到 alarm_data 表
  222. saveAlarmData(deviceCode, warningType, warningCode,
  223. minValue != null ? BigDecimal.valueOf(minValue) : null,
  224. maxValue != null ? BigDecimal.valueOf(maxValue) : null,
  225. BigDecimal.valueOf(actualValue), remark);
  226. }
  227. } catch (Exception e) {
  228. logger.error("处理告警异常", e);
  229. }
  230. }
  231. /**
  232. * 保存告警数据到 alarm_data 表
  233. */
  234. private void saveAlarmData(String deviceCode, String warningType, String warningCode,
  235. BigDecimal minValue, BigDecimal maxValue, BigDecimal actualValue, String remark) {
  236. try {
  237. AlarmData alarmData = new AlarmData();
  238. alarmData.setDeviceCode(deviceCode);
  239. alarmData.setWarningType(warningType);
  240. alarmData.setWarningCode(warningCode);
  241. alarmData.setMinValue(minValue);
  242. alarmData.setMaxValue(maxValue);
  243. alarmData.setActualValue(actualValue);
  244. alarmData.setAlarmStatus(0);
  245. alarmData.setAlarmTime(LocalDateTime.now());
  246. alarmData.setRemark(remark);
  247. alarmData.setCreateTime(LocalDateTime.now());
  248. alarmDataService.saveAlarmData(alarmData);
  249. logger.info("告警数据入库成功: deviceCode={}, warningCode={}", deviceCode, warningCode);
  250. } catch (Exception e) {
  251. logger.error("保存告警数据失败", e);
  252. }
  253. }
  254. }