| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package com.zksy.gas.utils;
- import cn.hutool.core.lang.UUID;
- import com.zksy.common.exception.InvalidMessageException;
- import com.zksy.gas.domain.GasMonitorData;
- import com.zksy.gas.service.GasMonitorDataService;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.timeout.ReadTimeoutException;
- import io.netty.util.ReferenceCountUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.util.Arrays;
- @ChannelHandler.Sharable
- @Slf4j
- @Component
- public class MessageHandler extends ChannelInboundHandlerAdapter {
- private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
- private final GasMonitorDataService service;
- @Autowired
- public MessageHandler(GasMonitorDataService firefightingPressureService) {
- this.service = firefightingPressureService;
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- //sendDataToDevice(ctx);
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- try {
- ByteBuf msgByteBuf = (ByteBuf) msg;
- if (msgByteBuf == null || !msgByteBuf.isReadable()) {
- logger.warn("接收到无效的消息");
- return;
- }
- int readable = msgByteBuf.readableBytes();
- byte[] msgBytes = new byte[readable];
- msgByteBuf.readBytes(msgBytes);
- logger.info("接收到 {} 字节的数据: {}, 来自: {}",
- readable, ProtocolUtils.bytesToHex(msgBytes), ctx.channel().remoteAddress());
- int checksumTotalLength = 2; // 末尾2字节校验码(如ED34)
- int bodyStartOffset = 16; // 校验和07的主体起始索引
- int fullDataStartOffset = 3; // 校验和16的完整数据起始索引
- // 校验报文长度合法性
- if (readable < fullDataStartOffset + checksumTotalLength) {
- throw new InvalidMessageException("报文长度不足,无法完成校验");
- }
- int bodyEndOffset = msgBytes.length - checksumTotalLength; // 避开末尾2字节校验码
- int bodyLength = bodyEndOffset - bodyStartOffset;
- if (bodyLength <= 0) {
- throw new InvalidMessageException("主体数据长度无效,无法计算校验和07");
- }
- int checksum07 = DataCheckUtil.calcSumOnly(msgBytes, bodyStartOffset, bodyLength);
- int fullDataEndOffset = msgBytes.length - 1; // 包含倒数第二个字节
- int fullDataLength = fullDataEndOffset - fullDataStartOffset;
- if (fullDataLength <= 0) {
- throw new InvalidMessageException("完整数据长度无效,无法计算校验和16");
- }
- int checksum16 = DataCheckUtil.calcChecksum(msgBytes, fullDataStartOffset, fullDataLength);
- int calculatedChecksum = (checksum07 << 8) | checksum16;
- int receivedChecksum = ((msgBytes[msgBytes.length - 2] & 0xFF) << 8)
- | (msgBytes[msgBytes.length - 1] & 0xFF);
- // 打印调试信息,确认拼接结果
- logger.debug("校验和07(高8位,仅求和): {}", Integer.toHexString(checksum07).toUpperCase());
- logger.debug("校验和16(低8位,FF-求和): {}", Integer.toHexString(checksum16).toUpperCase());
- logger.debug("修改后拼接校验码: {}", Integer.toHexString(calculatedChecksum).toUpperCase());
- logger.debug("报文携带校验码: {}", Integer.toHexString(receivedChecksum).toUpperCase());
- if (calculatedChecksum != receivedChecksum) {
- throw new InvalidMessageException(
- String.format("数据校验失败:本地计算(07+16)=%04X(07=%02X, 16=%02X),报文携带=%04X",
- calculatedChecksum, checksum07, checksum16, receivedChecksum));
- }
- com.zksy.gas.domain.GasMonitorData resultData = DataParser.parseMessage(msgBytes);
- resultData.setId(java.util.UUID.randomUUID().toString());
- service.save(resultData);
- logger.info("数据解析入库成功: {}", resultData);
- } catch (InvalidMessageException e) {
- logger.error("数据校验失败: {}", e.getMessage());
- ctx.writeAndFlush(Unpooled.copiedBuffer("数据校验失败".getBytes()));
- } catch (Exception e) {
- logger.error("数据解析/入库异常", e);
- ctx.writeAndFlush(Unpooled.copiedBuffer("数据处理异常".getBytes()));
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
- private String printHexBinary(byte[] bytes) {
- StringBuilder sb = new StringBuilder();
- for (byte b : bytes) {
- sb.append(String.format("%02X ", b));
- }
- System.out.println("Received raw data: " + sb.toString());
- logger.info("Received raw data: " + sb.toString());
- return sb.toString();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (cause instanceof ReadTimeoutException) {
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
- } else {
- cause.printStackTrace();
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
- ctx.close();
- }
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
- ctx.fireChannelUnregistered();
- }
- // 主动向设备发送数据
- public void sendDataToDevice(ChannelHandlerContext ctx) {
- try {
- // 将字符串形式的十六进制数据转换为字节数组
- String hexData = "01 03 00 00 00 01 84 0A ";
- String[] hexArray = hexData.split(" ");
- byte[] dataBytes = new byte[hexArray.length];
- for (int i = 0; i < hexArray.length; i++) {
- dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
- }
- ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
- ctx.writeAndFlush(byteBuf);
- logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
- } catch (Exception e) {
- logger.error("发送数据失败: {}", e.getMessage());
- }
- }
- }
|