| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
package com.zksy.radar.utils;

import com.zksy.common.exception.InvalidMessageException;
import com.zksy.radar.domain.RadarData;
import com.zksy.radar.service.RadarDataService;
import com.zksy.radar.utils.DataParser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
- @ChannelHandler.Sharable
- @Slf4j
- @Component
- public class MessageHandler extends ChannelInboundHandlerAdapter {
- private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
- private final RadarDataService service;
- @Autowired
- public MessageHandler(RadarDataService RadarDataService) {
- this.service = RadarDataService;
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- //sendDataToDevice(ctx);
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- ByteBuf in = (ByteBuf) msg;
- byte[] msgBytes = new byte[in.readableBytes()];
- in.readBytes(msgBytes);
- in.release();
- try {
- logger.debug("接收到的原始数据帧: {}", printHexBinary(msgBytes));
- // 提取帧类型
- byte frameType = msgBytes[6];
- logger.debug("帧类型: 0x{}", String.format("%02X", frameType));
- // 1. 完整协议校验
- DataParser.validateMessage(msgBytes);
- logger.info("数据帧校验通过");
- // 2. 数据解析
- RadarData resultData = DataParser.parseMessage(msgBytes);
- if (resultData.getSystemIdentifier() == null) {
- resultData.setSystemIdentifier("123456");
- }
- // 3. 根据帧类型处理
- if (frameType == 0x31) {
- if (msgBytes.length > 34) {
- byte flagByte = msgBytes[34];
- String binary = String.format("%8s", Integer.toBinaryString(flagByte & 0xFF)).replace(' ', '0');
- boolean isEnd = (flagByte & 0x08) != 0;
- logger.debug("结束位检测: 字节=0x{}, 二进制={}, 第5位(结束标识)={}",
- String.format("%02X", flagByte), binary, isEnd ? "1(结束)" : "0(未结束)");
- } else {
- logger.warn("数据长度不足,无法检测结束位");
- }
- // 入库
- service.save(resultData);
- logger.info("上报历史记录数据入库成功: {}", resultData);
- // 如果结束位=1,可以根据需要在此处理关闭逻辑
- if (Boolean.TRUE.equals(resultData.getIsLastPacket())) {
- logger.info("检测到结束标志帧,执行结束处理逻辑...");
- byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(response));
- Thread.sleep(1000);// 休眠1秒
- logger.debug("已发送结束应答帧上报自定义回应包: {}", printHexBinary(response));
- byte[] endResponse = ProtocolUtils.buildEndReplyFrame(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(endResponse));
- logger.debug("已发送结束应答帧: {}", printHexBinary(endResponse));
- }else{
- // 回复自定义应答帧
- byte[] response = ProtocolUtils.buildCustomReplyFrame(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(response));
- logger.debug("已发送上报自定义回应包: {}", printHexBinary(response));
- }
- } else if (frameType == 0x34) {
- /*logger.info("收到结束通讯帧,准备发送回应包");
- byte[] response = ProtocolUtils.buildShutdownAckPacket(msgBytes);
- ctx.writeAndFlush(Unpooled.copiedBuffer(response));
- logger.debug("已发送结束通讯回应包");*/
- } else {
- logger.warn("收到未知帧类型: 0x{}", String.format("%02X", frameType));
- }
- } catch (InvalidMessageException e) {
- logger.error("数据校验失败,不入库: {}", e.getMessage());
- sendErrorResponse(ctx, "数据校验失败");
- } catch (Exception e) {
- logger.error("数据解析或入库异常", e);
- sendErrorResponse(ctx, "数据处理异常");
- }
- }
- // 工具方法:发送错误响应
- private void sendErrorResponse(ChannelHandlerContext ctx, String msg) {
- ctx.writeAndFlush(Unpooled.copiedBuffer(("数据处理失败: " + msg).getBytes()));
- }
- private String printHexBinary(byte[] bytes) {
- StringBuilder sb = new StringBuilder();
- for (byte b : bytes) {
- sb.append(String.format("%02X ", b));
- }
- System.out.println("Received raw data: " + sb.toString());
- logger.info("Received raw data: " + sb.toString());
- return sb.toString();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (cause instanceof ReadTimeoutException) {
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
- } else {
- cause.printStackTrace();
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
- ctx.close();
- }
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
- ctx.fireChannelUnregistered();
- }
- // 主动向设备发送数据
- public void sendDataToDevice(ChannelHandlerContext ctx) {
- try {
- // 将字符串形式的十六进制数据转换为字节数组
- String hexData = "01 03 00 00 00 01 84 0A ";
- String[] hexArray = hexData.split(" ");
- byte[] dataBytes = new byte[hexArray.length];
- for (int i = 0; i < hexArray.length; i++) {
- dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
- }
- ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
- ctx.writeAndFlush(byteBuf);
- logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
- } catch (Exception e) {
- logger.error("发送数据失败: {}", e.getMessage());
- }
- }
- }
|