| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- package com.zksy.radar.utils;
- import com.zksy.api.domain.AlarmData;
- import com.zksy.api.domain.WarningThreshold;
- import com.zksy.api.service.AlarmDataService;
- import com.zksy.api.service.WarningThresholdService;
- import com.zksy.common.exception.InvalidMessageException;
- import com.zksy.radar.domain.RadarData;
- import com.zksy.radar.service.RadarDataService;
- import com.zksy.radar.utils.DataParser;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.timeout.ReadTimeoutException;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.math.BigDecimal;
- import java.time.LocalDateTime;
- @ChannelHandler.Sharable
- @Slf4j
- @Component
- public class MessageHandler extends ChannelInboundHandlerAdapter {
- private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
- private final RadarDataService service;
- private final WarningThresholdService warningThresholdService;
- private final AlarmDataService alarmDataService;
- @Autowired
- public MessageHandler(RadarDataService RadarDataService, WarningThresholdService warningThresholdService, AlarmDataService alarmDataService) {
- this.service = RadarDataService;
- this.warningThresholdService = warningThresholdService;
- this.alarmDataService = alarmDataService;
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- //sendDataToDevice(ctx);
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- ByteBuf in = (ByteBuf) msg;
- byte[] msgBytes = new byte[in.readableBytes()];
- in.readBytes(msgBytes);
- in.release();
- try {
- logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
- // 提取帧类型
- byte frameType = msgBytes[6];
- logger.debug("帧类型: 0x{}", String.format("%02X", frameType));
- // 1. 完整协议校验
- DataParser.validateMessage(msgBytes);
- logger.info("数据帧校验通过");
- // 2. 数据解析
- RadarData resultData = DataParser.parseMessage(msgBytes);
- if (resultData.getSystemIdentifier() == null) {
- resultData.setSystemIdentifier("123456");
- }
- // 3. 根据帧类型处理
- if (frameType == 0x31) {
- if (msgBytes.length > 34) {
- byte flagByte = msgBytes[34];
- String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
- boolean isEnd = (flagByte & 0x08) != 0;
- logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
- String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
- } else {
- logger.warn("数据长度不足,无法检测结束位");
- }
- // 入库
- service.save(resultData);
- logger.info("上报历史记录数据入库成功: {}", resultData);
-
- // 瞬时流量和流速告警入库
- handleRadarAlarm(resultData);
- // 如果结束位=1,可以根据需要在此处理关闭逻辑
- if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
- logger.info("检测到结束标志帧,执行结束处理逻辑...");
- byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(response));
- Thread.sleep(1000);// 休眠1秒
- logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response));
- byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
- logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
- }else{
- // 回复自定义应答帧
- byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(response));
- logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
- }
- } else if (frameType == 0x34) {
- /*logger.info("收到结束通讯帧,准备发送回应包");
- byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(response));
- logger.debug("已发送结束通讯回应包");*/
- } else {
- logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
- }
- } catch (InvalidMessageException e) {
- logger.error("数据校验失败,不入库: {}", e.getMessage());
- sendErrorResponse(ctx, "数据校验失败");
- } catch (Exception e) {
- logger.error("数据解析或入库异常", e);
- sendErrorResponse(ctx, "数据处理异常");
- }
- }
- // 工具方法:发送错误响应
- private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
- ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
- }
- private String printHexBinary(byte[] bytes) {
- StringBuilder sb = new StringBuilder();
- for (byte b : bytes) {
- sb.append(String.format("%02X ", b));
- }
- System.out.println("Received raw data: " + sb.toString());
- logger.info("Received raw data: " + sb.toString());
- return sb.toString();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (cause instanceof ReadTimeoutException) {
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
- } else {
- cause.printStackTrace();
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
- ctx.close();
- }
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
- ctx.fireChannelUnregistered();
- }
- // 主动向设备发送数据
- public void sendDataToDevice(ChannelHandlerContext ctx) {
- try {
- // 将字符串形式的十六进制数据转换为字节数组
- String hexData = "01 03 00 00 00 01 84 0A ";
- String[] hexArray = hexData.split(" ");
- byte[] dataBytes = new byte[hexArray.length];
- for (int i = 0; i < hexArray.length; i++) {
- dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
- }
- ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
- ctx.writeAndFlush(byteBuf);
- logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
- } catch (Exception e) {
- logger.error("发送数据失败: {}", e.getMessage());
- }
- }
- /**
- * 处理雷达设备告警
- */
- private void handleRadarAlarm(RadarData resultData) {
- try {
- // 设备编码从设备数据获取
- String deviceCode = resultData.getDeviceCode();
- if (deviceCode == null) {
- deviceCode = resultData.getSystemIdentifier();
- }
- // 1. 处理瞬时流量告警
- if (resultData.getMeter1InstantFlow() != null) {
- handleAlarm(deviceCode, "WARN_INSTANT_FLOW", "瞬时流量",
- resultData.getMeter1InstantFlow());
- }
- // 2. 处理流速告警
- if (resultData.getFlowSpeed() != null) {
- handleAlarm(deviceCode, "WARN_FLOW_SPEED", "流速",
- resultData.getFlowSpeed());
- }
- } catch (Exception e) {
- logger.error("雷达告警处理异常", e);
- }
- }
- /**
- * 处理单个告警
- */
- private void handleAlarm(String deviceCode, String warningCode, String warningType, String valueStr) {
- try {
- // 解析实际值
- Double actualValue = null;
- try {
- actualValue = Double.parseDouble(valueStr);
- } catch (NumberFormatException e) {
- logger.warn("无法解析告警值: {}", valueStr);
- return;
- }
- // 查询预警阈值表
- WarningThreshold threshold = null;
- try {
- threshold = warningThresholdService.getWarningThresholdByDeviceAndCode(deviceCode, warningCode);
- } catch (Exception e) {
- logger.error("查询预警阈值失败: deviceCode={}, warningCode={}", deviceCode, warningCode, e);
- }
- // 使用阈值表中的预警类型(如果有的话)
- if (threshold != null && threshold.getWarningType() != null) {
- warningType = threshold.getWarningType();
- }
- // 获取最小值和最大值
- Double minValue = threshold != null ? threshold.getMinValue() : null;
- Double maxValue = threshold != null ? threshold.getMaxValue() : null;
- String remark = threshold != null ? threshold.getRemark() : null;
- // 判断是否触发报警
- boolean isOverThreshold = false;
- if (minValue != null && actualValue <= minValue) {
- isOverThreshold = true;
- }
- if (maxValue != null && actualValue >= maxValue) {
- isOverThreshold = true;
- }
- if (isOverThreshold) {
- logger.warn("设备{}触发{}告警(当前值:{},最小值:{},最大值:{})",
- deviceCode, warningType, actualValue, minValue, maxValue);
- // 保存到 alarm_data 表
- saveAlarmData(deviceCode, warningType, warningCode,
- minValue != null ? BigDecimal.valueOf(minValue) : null,
- maxValue != null ? BigDecimal.valueOf(maxValue) : null,
- BigDecimal.valueOf(actualValue), remark);
- }
- } catch (Exception e) {
- logger.error("处理告警异常", e);
- }
- }
- /**
- * 保存告警数据到 alarm_data 表
- */
- private void saveAlarmData(String deviceCode, String warningType, String warningCode,
- BigDecimal minValue, BigDecimal maxValue, BigDecimal actualValue, String remark) {
- try {
- AlarmData alarmData = new AlarmData();
- alarmData.setDeviceCode(deviceCode);
- alarmData.setWarningType(warningType);
- alarmData.setWarningCode(warningCode);
- alarmData.setMinValue(minValue);
- alarmData.setMaxValue(maxValue);
- alarmData.setActualValue(actualValue);
- alarmData.setAlarmStatus(0);
- alarmData.setAlarmTime(LocalDateTime.now());
- alarmData.setRemark(remark);
- alarmData.setCreateTime(LocalDateTime.now());
- alarmDataService.saveAlarmData(alarmData);
- logger.info("告警数据入库成功: deviceCode={}, warningCode={}", deviceCode, warningCode);
- } catch (Exception e) {
- logger.error("保存告警数据失败", e);
- }
- }
- }
|