package com.zksy.radar.utils;

import com.zksy.api.domain.AlarmData;
import com.zksy.api.domain.WarningThreshold;
import com.zksy.api.service.AlarmDataService;
import com.zksy.api.service.WarningThresholdService;
import com.zksy.common.exception.InvalidMessageException;
import com.zksy.radar.domain.RadarData;
import com.zksy.radar.service.RadarDataService;
import com.zksy.radar.utils.DataParser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

/**
 * Netty inbound handler for radar device frames.
 *
 * <p>For every inbound frame the handler validates it with {@link DataParser#validateMessage},
 * parses it into a {@link RadarData}, persists report frames (type {@code 0x31}), evaluates the
 * instant-flow and flow-speed readings against the configured warning thresholds, and writes the
 * reply frame(s) built by {@code ProtocolUtils} back to the device.
 *
 * <p>The handler is {@link ChannelHandler.Sharable}; it keeps no per-connection state.
 */
@ChannelHandler.Sharable
@Slf4j
@Component
public class MessageHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);

    /** Offset of the frame-type byte inside a frame. */
    private static final int FRAME_TYPE_INDEX = 6;
    /** Frame type that carries a data report which is persisted and acknowledged. */
    private static final byte FRAME_TYPE_REPORT = 0x31;
    /** Frame type that is accepted but currently not answered. */
    private static final byte FRAME_TYPE_END_OF_COMMUNICATION = 0x34;
    /** Offset of the flag byte inspected for the end marker (diagnostic logging only). */
    private static final int END_FLAG_INDEX = 34;
    /** Mask applied to the flag byte to read the end marker. */
    private static final int END_FLAG_MASK = 0x08;
    /** Identifier used when the parsed frame carries no system identifier. */
    private static final String DEFAULT_SYSTEM_IDENTIFIER = "123456";
    /** Delay between the custom reply and the end reply for the last packet. */
    private static final long END_REPLY_DELAY_SECONDS = 1L;

    private final RadarDataService service;
    private final WarningThresholdService warningThresholdService;
    private final AlarmDataService alarmDataService;

    /**
     * Creates the handler with its collaborating services.
     *
     * @param radarDataService        persists parsed radar frames
     * @param warningThresholdService looks up warning thresholds per device and warning code
     * @param alarmDataService        persists triggered alarms
     */
    @Autowired
    public MessageHandler(RadarDataService radarDataService,
                          WarningThresholdService warningThresholdService,
                          AlarmDataService alarmDataService) {
        this.service = radarDataService;
        this.warningThresholdService = warningThresholdService;
        this.alarmDataService = alarmDataService;
    }

    /**
     * Propagates the channel-active event. {@link #sendDataToDevice} is intentionally not invoked
     * on connect.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Handles one inbound frame: copy it out of the buffer, validate, parse, dispatch by frame type.
     *
     * <p>Messages that are not a {@link ByteBuf} are passed on to the next handler. Validation
     * failures and any other processing error are logged and answered with an error response;
     * nothing is rethrown.
     *
     * @param ctx channel context used for replies
     * @param msg inbound message, expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof ByteBuf)) {
            // Not ours to consume; let the rest of the pipeline deal with it.
            ctx.fireChannelRead(msg);
            return;
        }

        ByteBuf in = (ByteBuf) msg;
        byte[] msgBytes;
        try {
            msgBytes = new byte[in.readableBytes()];
            in.readBytes(msgBytes);
        } finally {
            // Release in finally so the buffer is returned even if the copy fails.
            in.release();
        }

        try {
            if (logger.isDebugEnabled()) {
                logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
            }

            // A frame too short to contain the frame-type byte cannot be valid.
            if (msgBytes.length <= FRAME_TYPE_INDEX) {
                logger.error("数据校验失败,不入库: {}", "数据帧长度不足(" + msgBytes.length + "字节)");
                sendErrorResponse(ctx, "数据校验失败");
                return;
            }

            // Extract the frame type.
            byte frameType = msgBytes[FRAME_TYPE_INDEX];
            logger.debug("帧类型: 0x{}", String.format("%02X", frameType));

            // 1. Full protocol validation.
            DataParser.validateMessage(msgBytes);
            logger.info("数据帧校验通过");

            // 2. Parse the payload.
            RadarData resultData = DataParser.parseMessage(msgBytes);
            if (resultData.getSystemIdentifier() == null) {
                resultData.setSystemIdentifier(DEFAULT_SYSTEM_IDENTIFIER);
            }

            // 3. Dispatch by frame type.
            if (frameType == FRAME_TYPE_REPORT) {
                handleReportFrame(ctx, msgBytes, resultData);
            } else if (frameType == FRAME_TYPE_END_OF_COMMUNICATION) {
                // Intentionally no reply: the shutdown acknowledgement
                // (ProtocolUtils.buildShutdownAckPacket) is currently disabled.
            } else {
                logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
            }
        } catch (InvalidMessageException e) {
            logger.error("数据校验失败,不入库: {}", e.getMessage());
            sendErrorResponse(ctx, "数据校验失败");
        } catch (Exception e) {
            logger.error("数据解析或入库异常", e);
            sendErrorResponse(ctx, "数据处理异常");
        }
    }

    /** Persists a report frame, evaluates alarms for it and sends the matching reply frame(s). */
    private void handleReportFrame(ChannelHandlerContext ctx, byte[] msgBytes, RadarData resultData) {
        logEndFlag(msgBytes);

        // Persist the record.
        service.save(resultData);
        logger.info("上报历史记录数据入库成功: {}", resultData);

        // Instant-flow and flow-speed alarms.
        handleRadarAlarm(resultData);

        // Every report frame is answered with the custom reply frame.
        byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
        ctx.writeAndFlush(Unpooled.copiedBuffer(response));

        if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
            logger.info("检测到结束标志帧,执行结束处理逻辑...");
            if (logger.isDebugEnabled()) {
                logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response));
            }
            scheduleEndReply(ctx, msgBytes);
        } else if (logger.isDebugEnabled()) {
            logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
        }
    }

    /**
     * Sends the end reply after {@link #END_REPLY_DELAY_SECONDS}.
     *
     * <p>The delay is scheduled on the channel's executor rather than slept: sleeping here would
     * block the event loop and with it every other channel served by the same thread.
     */
    private void scheduleEndReply(ChannelHandlerContext ctx, byte[] msgBytes) {
        ctx.executor().schedule(() -> {
            try {
                byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
                ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
                if (logger.isDebugEnabled()) {
                    logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
                }
            } catch (Exception e) {
                logger.error("数据解析或入库异常", e);
                sendErrorResponse(ctx, "数据处理异常");
            }
        }, END_REPLY_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /** Logs the end-marker bit of the flag byte for diagnostics; has no effect on processing. */
    private void logEndFlag(byte[] msgBytes) {
        if (msgBytes.length <= END_FLAG_INDEX) {
            logger.warn("数据长度不足,无法检测结束位");
            return;
        }
        if (!logger.isDebugEnabled()) {
            return;
        }
        byte flagByte = msgBytes[END_FLAG_INDEX];
        String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
        boolean isEnd = (flagByte & END_FLAG_MASK) != 0;
        logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
                String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
    }

    /**
     * Writes a plain-text error message back to the peer.
     *
     * <p>NOTE(review): the text is encoded with the platform default charset, exactly as before;
     * confirm which encoding the device expects before pinning one.
     */
    private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
        ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
    }

    /**
     * Formats bytes as upper-case hex pairs, each followed by a space (for example {@code "01 0A "}).
     *
     * <p>This is a pure formatter: it neither prints nor logs, so it can be used for outgoing
     * frames as well as incoming ones.
     *
     * @param bytes bytes to format
     * @return the hex representation; empty for an empty array
     */
    private String printHexBinary(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 3);
        for (byte b : bytes) {
            sb.append(String.format("%02X ", b));
        }
        return sb.toString();
    }

    /**
     * Logs pipeline errors. A read timeout is only logged; any other error is logged with its
     * stack trace and the channel is closed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            logger.info("来自{}的连接超时断开", ctx.channel().remoteAddress());
        } else {
            logger.error("来自{}的连接异常断开", ctx.channel().remoteAddress(), cause);
            ctx.close();
        }
    }

    /** Flushes any pending writes once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Logs the disconnect and forwards the event to the next handler. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        logger.info("来自{}的连接主动断开", ctx.channel().remoteAddress());
        ctx.fireChannelUnregistered();
    }

    /**
     * Actively sends a fixed command frame ({@code 01 03 00 00 00 01 84 0A}) to the device.
     * Failures are logged and not rethrown.
     *
     * @param ctx channel context to write to
     */
    public void sendDataToDevice(ChannelHandlerContext ctx) {
        try {
            // Convert the space-separated hex string into bytes.
            String hexData = "01 03 00 00 00 01 84 0A ";
            String[] hexArray = hexData.split(" ");
            byte[] dataBytes = new byte[hexArray.length];
            for (int i = 0; i < hexArray.length; i++) {
                dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
            }
            ctx.writeAndFlush(Unpooled.copiedBuffer(dataBytes));
            logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
        } catch (Exception e) {
            logger.error("发送数据失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Evaluates the alarm-relevant readings of a parsed frame.
     *
     * <p>The device code falls back to the system identifier when the frame has none. Errors are
     * logged and never propagated, so alarm evaluation cannot fail frame processing.
     */
    private void handleRadarAlarm(RadarData resultData) {
        try {
            String deviceCode = resultData.getDeviceCode();
            if (deviceCode == null) {
                deviceCode = resultData.getSystemIdentifier();
            }

            // 1. Instant-flow alarm.
            if (resultData.getMeter1InstantFlow() != null) {
                handleAlarm(deviceCode, "WARN_INSTANT_FLOW", "瞬时流量", resultData.getMeter1InstantFlow());
            }

            // 2. Flow-speed alarm.
            if (resultData.getFlowSpeed() != null) {
                handleAlarm(deviceCode, "WARN_FLOW_SPEED", "流速", resultData.getFlowSpeed());
            }
        } catch (Exception e) {
            logger.error("雷达告警处理异常", e);
        }
    }

    /**
     * Checks one reading against its threshold and stores an alarm when it is out of range.
     *
     * <p>An alarm is raised when the value is less than or equal to the configured minimum or
     * greater than or equal to the configured maximum. Without a threshold row, or when the value
     * is missing, unparsable or not finite, nothing is stored.
     *
     * @param deviceCode  device the reading belongs to
     * @param warningCode threshold lookup key
     * @param warningType default display name, overridden by the threshold's own type when present
     * @param valueStr    reading as text
     */
    private void handleAlarm(String deviceCode, String warningCode, String warningType, String valueStr) {
        try {
            // Parse the reading; null would otherwise surface as a NullPointerException.
            if (valueStr == null) {
                logger.warn("无法解析告警值: {}", valueStr);
                return;
            }
            double actualValue;
            try {
                actualValue = Double.parseDouble(valueStr);
            } catch (NumberFormatException e) {
                logger.warn("无法解析告警值: {}", valueStr);
                return;
            }
            // parseDouble accepts "NaN"/"Infinity", which BigDecimal.valueOf cannot represent.
            if (!Double.isFinite(actualValue)) {
                logger.warn("无法解析告警值: {}", valueStr);
                return;
            }

            // Look up the threshold row.
            WarningThreshold threshold = null;
            try {
                threshold = warningThresholdService.getWarningThresholdByDeviceAndCode(deviceCode, warningCode);
            } catch (Exception e) {
                logger.error("查询预警阈值失败: deviceCode={}, warningCode={}", deviceCode, warningCode, e);
            }
            if (threshold == null) {
                // No bounds to compare against, so no alarm can be triggered.
                return;
            }

            // Prefer the warning type configured on the threshold row.
            if (threshold.getWarningType() != null) {
                warningType = threshold.getWarningType();
            }

            Double minValue = threshold.getMinValue();
            Double maxValue = threshold.getMaxValue();
            String remark = threshold.getRemark();

            boolean belowMin = minValue != null && actualValue <= minValue;
            boolean aboveMax = maxValue != null && actualValue >= maxValue;
            if (!belowMin && !aboveMax) {
                return;
            }

            logger.warn("设备{}触发{}告警(当前值:{},最小值:{},最大值:{})",
                    deviceCode, warningType, actualValue, minValue, maxValue);

            // Persist into the alarm_data table.
            saveAlarmData(deviceCode, warningType, warningCode,
                    minValue != null ? BigDecimal.valueOf(minValue) : null,
                    maxValue != null ? BigDecimal.valueOf(maxValue) : null,
                    BigDecimal.valueOf(actualValue),
                    remark);
        } catch (Exception e) {
            logger.error("处理告警异常", e);
        }
    }

    /**
     * Stores one alarm row with status {@code 0}; alarm time and create time share one timestamp.
     * Failures are logged and not rethrown.
     */
    private void saveAlarmData(String deviceCode, String warningType, String warningCode,
                               BigDecimal minValue, BigDecimal maxValue, BigDecimal actualValue,
                               String remark) {
        try {
            LocalDateTime now = LocalDateTime.now();

            AlarmData alarmData = new AlarmData();
            alarmData.setDeviceCode(deviceCode);
            alarmData.setWarningType(warningType);
            alarmData.setWarningCode(warningCode);
            alarmData.setMinValue(minValue);
            alarmData.setMaxValue(maxValue);
            alarmData.setActualValue(actualValue);
            alarmData.setAlarmStatus(0);
            alarmData.setAlarmTime(now);
            alarmData.setRemark(remark);
            alarmData.setCreateTime(now);

            alarmDataService.saveAlarmData(alarmData);
            logger.info("告警数据入库成功: deviceCode={}, warningCode={}", deviceCode, warningCode);
        } catch (Exception e) {
            logger.error("保存告警数据失败", e);
        }
    }
}