package com.zksy.pressure.utils;

import cn.hutool.core.lang.UUID;
import com.zksy.common.exception.InvalidMessageException;
import com.zksy.pressure.domain.FirefightingPressure;
import com.zksy.pressure.service.FirefightingPressureService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Netty inbound handler that receives raw frames from devices, verifies the trailing
 * CRC, parses the frame into a {@link FirefightingPressure} record and persists it.
 *
 * <p>The handler is marked {@code @Sharable}; it keeps no per-channel state (only the
 * injected service and a static logger).
 */
@ChannelHandler.Sharable
@Slf4j
@Component
public class MessageHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);

    /** Upper-case hex digits used when rendering bytes as text. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    /** Number of hex characters occupied by the trailing CRC (2 bytes). */
    private static final int CRC_HEX_LENGTH = 4;

    /**
     * Frame written by {@link #sendDataToDevice(ChannelHandlerContext)}: 01 03 00 00 00 01 84 0A.
     * NOTE(review): looks like a Modbus RTU "read holding registers" request - confirm
     * against the device protocol document.
     */
    private static final byte[] POLL_REQUEST = {
        0x01, 0x03, 0x00, 0x00, 0x00, 0x01, (byte) 0x84, 0x0A
    };

    private final FirefightingPressureService firefightingPressureService;

    /**
     * Creates the handler.
     *
     * @param firefightingPressureService service used to persist parsed readings
     */
    @Autowired
    public MessageHandler(FirefightingPressureService firefightingPressureService) {
        this.firefightingPressureService = firefightingPressureService;
    }

    /**
     * Propagates the channel-active event. Active polling is currently disabled; call
     * {@link #sendDataToDevice(ChannelHandlerContext)} here to query the device on connect.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Handles one inbound message: validates the CRC, parses the frame and saves it.
     *
     * <p>Messages that are not {@link ByteBuf}s are forwarded to the next handler untouched.
     * A frame that fails validation is logged and answered with an error payload. Any other
     * exception (for example from parsing or persistence) propagates to
     * {@link #exceptionCaught(ChannelHandlerContext, Throwable)}.
     *
     * @param ctx the channel context the message arrived on
     * @param msg the inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof ByteBuf)) {
            // Not ours to consume; let a later handler (or the pipeline tail) deal with it.
            ctx.fireChannelRead(msg);
            return;
        }
        ByteBuf msgByteBuf = (ByteBuf) msg;
        try {
            if (!msgByteBuf.isReadable()) {
                logger.warn("接收到无效的消息");
                return;
            }
            logger.info("接收到 {} 字节的数据,来自: {}",
                    msgByteBuf.readableBytes(), ctx.channel().remoteAddress());

            byte[] msgBytes = new byte[msgByteBuf.readableBytes()];
            msgByteBuf.readBytes(msgBytes);
            logger.info("Received raw data: {}", toHex(msgBytes, true));

            String msgString = toHex(msgBytes, false);
            if (msgString.length() < CRC_HEX_LENGTH) {
                // Too short to even contain the 2-byte CRC; substring() below would throw.
                throw new InvalidMessageException("数据长度不足");
            }

            int crcStart = msgString.length() - CRC_HEX_LENGTH;
            String receivedCrc = msgString.substring(crcStart);
            String body = msgString.substring(0, crcStart);
            String computedCrc = DataCheckUtil.crc16Tall(body);
            logger.info("校验码:{}-----------生成的校验码:{}", receivedCrc, computedCrc);

            // Hex text is compared case-insensitively so "0a" and "0A" are the same CRC.
            if (!receivedCrc.equalsIgnoreCase(computedCrc)) {
                throw new InvalidMessageException("数据校验不成功");
            }

            logger.info("成功!!!接收到 {} 字节的数据,来自: {}",
                    msgString, ctx.channel().remoteAddress());
            FirefightingPressure resultData = DataParser.parseMessage(msgString);
            resultData.setId(UUID.randomUUID().toString());
            // Update the time the device last reported data (currently disabled):
            // String addressCode = resultData.getAddressCode();
            // DeviceOfflineCheckTask.deviceLastReceiveTimeMap.put(addressCode, new Date());
            firefightingPressureService.save(resultData);
        } catch (InvalidMessageException e) {
            logger.error("数据入库失败: {}", e.getMessage());
            // Explicit charset so the reply bytes do not depend on the JVM's platform default.
            ctx.writeAndFlush(Unpooled.copiedBuffer("数据入库失败", StandardCharsets.UTF_8));
        } finally {
            // ChannelInboundHandlerAdapter does not release inbound buffers automatically;
            // this handler consumes the message, so it must release it to avoid a leak.
            ReferenceCountUtil.release(msgByteBuf);
        }
    }

    /**
     * Renders bytes as upper-case hexadecimal text without logging anything.
     *
     * @param bytes  the bytes to render
     * @param spaced when {@code true}, a single space follows every byte (e.g. {@code "01 0A "});
     *               when {@code false}, the digits are contiguous (e.g. {@code "010A"})
     * @return the hexadecimal representation of {@code bytes}
     */
    private static String toHex(byte[] bytes, boolean spaced) {
        StringBuilder sb = new StringBuilder(bytes.length * (spaced ? 3 : 2));
        for (byte b : bytes) {
            sb.append(HEX_DIGITS[(b >> 4) & 0x0F]).append(HEX_DIGITS[b & 0x0F]);
            if (spaced) {
                sb.append(' ');
            }
        }
        return sb.toString();
    }

    /**
     * Logs pipeline exceptions. A {@link ReadTimeoutException} is only logged; any other
     * exception is logged with its stack trace and the channel is closed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            logger.info("来自{}的连接超时断开", ctx.channel().remoteAddress());
        } else {
            logger.error("来自{}的连接异常断开", ctx.channel().remoteAddress(), cause);
            ctx.close();
        }
    }

    /** Flushes any pending outbound data once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Logs the disconnect and propagates the unregistered event down the pipeline. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        logger.info("来自{}的连接主动断开", ctx.channel().remoteAddress());
        ctx.fireChannelUnregistered();
    }

    /**
     * Actively sends the fixed poll request frame to the device on the given channel.
     * Failures raised while issuing the write are logged and not rethrown.
     *
     * @param ctx the channel context to write to
     */
    public void sendDataToDevice(ChannelHandlerContext ctx) {
        try {
            // copiedBuffer copies the array, so the shared constant is never exposed.
            ByteBuf byteBuf = Unpooled.copiedBuffer(POLL_REQUEST);
            ctx.writeAndFlush(byteBuf);
            logger.info("已向设备发送数据: {}", toHex(POLL_REQUEST, true));
        } catch (Exception e) {
            logger.error("发送数据失败: {}", e.getMessage(), e);
        }
    }
}