| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package com.zksy.electricity.utils;
- import com.zksy.common.exception.InvalidMessageException;
- import com.zksy.electricity.config.NettyServer;
- import com.zksy.electricity.domain.MessageParseResult;
- import com.zksy.electricity.service.MessageParseResultService;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import java.math.BigInteger;
- @Slf4j
- public class MessageHandler extends ChannelInboundHandlerAdapter {
- private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
- @Autowired
- private MessageParseResultService messageParseResultService;
- public MessageHandler() {
- this.messageParseResultService = SpringContextUtil.getBean(MessageParseResultService.class);
- }
- /**
- * 本方法用于读取客户端发送的信息
- *
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- try {
- ByteBuf msgByteBuf = (ByteBuf) msg;
- if (msgByteBuf == null || !msgByteBuf.isReadable()) {
- logger.warn("接收到无效的消息");
- return;
- }
- // 安全的日志记录
- logger.info("接收到 {} 字节的数据", msgByteBuf.readableBytes());
- byte[] msgBytes = new byte[msgByteBuf.readableBytes()];
- msgByteBuf.readBytes(msgBytes);
- String msgString = printHexBinary(msgBytes);
- if (!validateMessage(msgString)) {
- throw new InvalidMessageException("数据校验不成功");
- }else{
- MessageParseResult result = DataParser.parseMessage(msgString);
- messageParseResultService.saveMessageParseResult(result);
- }
- } catch (InvalidMessageException e) {
- logger.error("数据校验失败: {}", e.getMessage());
- ctx.writeAndFlush(Unpooled.copiedBuffer("数据校验失败".getBytes()));
- }
- }
- // 校验消息的方法
- private static boolean validateMessage(String msgString) {
- // 去除首尾可能存在的空格
- msgString = msgString.trim();
- // 检查是否以 FE FE FE FE 开头,以 16 结尾
- if (!msgString.startsWith("FE FE FE FE") || !msgString.endsWith("16")) {
- return false;
- }
- // 找到第一个 68 的位置
- int startIndex = msgString.indexOf("68");
- if (startIndex == -1) {
- return false;
- }
- // 找到结束码 16 的位置
- int endIndex = msgString.lastIndexOf("16");
- // 提取从第一个 68 到校验和之前的数据
- String dataToSum = msgString.substring(startIndex, msgString.lastIndexOf(" ", endIndex - 3));
- String[] hexArray = dataToSum.split(" ");
- // 计算校验和
- BigInteger sum = BigInteger.ZERO;
- for (String hex : hexArray) {
- sum = sum.add(new BigInteger(hex, 16));
- }
- // 取低 8 位,也就是十六进制的后两位
- sum = sum.and(new BigInteger("FF", 16));
- // 先找到校验和前面的空格位置
- int checksumStart = msgString.lastIndexOf(" ", endIndex - 3) + 1;
- // 校验和结束位置应该是 endIndex,因为 substring 方法的结束索引不包含在截取范围内
- int checksumEnd = endIndex;
- String checksumHex = msgString.substring(checksumStart, checksumEnd).trim();
- BigInteger expectedChecksum = new BigInteger(checksumHex, 16);
- // 比较计算的校验和与消息中的校验和
- return sum.equals(expectedChecksum);
- }
- private String printHexBinary(byte[] bytes) {
- StringBuilder sb = new StringBuilder();
- for (byte b : bytes) {
- sb.append(String.format("%02X ", b));
- }
- System.out.println("Received raw data: " + sb.toString());
- logger.info("Received raw data: " + sb.toString());
- return sb.toString();
- }
- /**
- * 本方法用作处理异常
- *
- * @param ctx
- * @param cause
- * @throws Exception
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- if (cause.getClass() == io.netty.handler.timeout.ReadTimeoutException.class) {
- logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接超时断开");
- } else {
- cause.printStackTrace();
- logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接异常断开");
- ctx.close();
- }
- NettyServer.sc= null;
- }
- /**
- * 信息获取完毕后操作
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- /**
- * 断开连接时操作
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- if (NettyServer.sc!= null) {
- logger.info("来自" + NettyServer.sc.remoteAddress() + "的连接主动断开");
- NettyServer.sc= null;
- }
- ctx.fireChannelUnregistered();
- }
- /**
- * 根据信息具体操作的业务方法
- *
- * @param msgBytes
- * @param ctx
- */
- private void handler(byte[] msgBytes, ChannelHandlerContext ctx) {
- // 解析接收到的数据
- // 回复客户端
- /*String responseMessage = "Message received and processed.";
- ByteBuf echo = ctx.alloc().buffer();
- echo.writeBytes(responseMessage.getBytes(io.netty.util.CharsetUtil.UTF_8));
- ctx.writeAndFlush(echo);*/
- }
- }
|