package com.zksy.electricity.utils; import com.zksy.common.exception.InvalidMessageException; import com.zksy.electricity.config.NettyServer; import com.zksy.electricity.domain.MessageParseResult; import com.zksy.electricity.service.MessageParseResultService; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.math.BigInteger; @Slf4j public class MessageHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(MessageHandler.class); @Autowired private MessageParseResultService messageParseResultService; public MessageHandler() { this.messageParseResultService = SpringContextUtil.getBean(MessageParseResultService.class); } /** * 本方法用于读取客户端发送的信息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf msgByteBuf = (ByteBuf) msg; if (msgByteBuf == null || !msgByteBuf.isReadable()) { logger.warn("接收到无效的消息"); return; } // 安全的日志记录 logger.info("接收到 {} 字节的数据", msgByteBuf.readableBytes()); byte[] msgBytes = new byte[msgByteBuf.readableBytes()]; msgByteBuf.readBytes(msgBytes); String msgString = printHexBinary(msgBytes); if (!validateMessage(msgString)) { throw new InvalidMessageException("数据校验不成功"); }else{ MessageParseResult result = DataParser.parseMessage(msgString); messageParseResultService.saveMessageParseResult(result); } } catch (InvalidMessageException e) { logger.error("数据校验失败: {}", e.getMessage()); ctx.writeAndFlush(Unpooled.copiedBuffer("数据校验失败".getBytes())); } } // 校验消息的方法 private static boolean validateMessage(String msgString) { // 去除首尾可能存在的空格 msgString = msgString.trim(); // 检查是否以 FE FE FE FE 开头,以 16 结尾 if (!msgString.startsWith("FE FE FE FE") || !msgString.endsWith("16")) { return false; } // 找到第一个 68 的位置 int startIndex = msgString.indexOf("68"); if (startIndex == -1) { return false; } // 找到结束码 16 的位置 int endIndex = msgString.lastIndexOf("16"); // 提取从第一个 68 到校验和之前的数据 String dataToSum = msgString.substring(startIndex, msgString.lastIndexOf(" ", endIndex - 3)); String[] hexArray = dataToSum.split(" "); // 计算校验和 BigInteger sum = BigInteger.ZERO; for (String hex : hexArray) { sum = sum.add(new BigInteger(hex, 16)); } // 取低 8 位,也就是十六进制的后两位 sum = sum.and(new BigInteger("FF", 16)); // 先找到校验和前面的空格位置 int checksumStart = msgString.lastIndexOf(" ", endIndex - 3) + 1; // 校验和结束位置应该是 endIndex,因为 substring 方法的结束索引不包含在截取范围内 int checksumEnd = endIndex; String checksumHex = msgString.substring(checksumStart, checksumEnd).trim(); BigInteger expectedChecksum = new BigInteger(checksumHex, 16); // 比较计算的校验和与消息中的校验和 return sum.equals(expectedChecksum); } private String printHexBinary(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) { sb.append(String.format("%02X ", b)); } System.out.println("Received raw data: " + sb.toString()); logger.info("Received raw data: " + sb.toString()); return sb.toString(); } /** * 本方法用作处理异常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause.getClass() == io.netty.handler.timeout.ReadTimeoutException.class) { logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接超时断开"); } else { cause.printStackTrace(); logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接异常断开"); ctx.close(); } NettyServer.sc= null; } /** * 信息获取完毕后操作 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * 断开连接时操作 * * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { if (NettyServer.sc!= null) { logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接主动断开"); NettyServer.sc= null; } ctx.fireChannelUnregistered(); } /** * 根据信息具体操作的业务方法 * * @param msgBytes * @param ctx */ private void handler(byte[] msgBytes, ChannelHandlerContext ctx) { // 解析接收到的数据 // 回复客户端 /*String responseMessage = "Message received and processed."; ByteBuf echo = ctx.alloc().buffer(); echo.writeBytes(responseMessage.getBytes(io.netty.util.CharsetUtil.UTF_8)); ctx.writeAndFlush(echo);*/ } }