|
|
@@ -0,0 +1,121 @@
|
|
|
+package com.zksy.gas.utils;
|
|
|
+
|
|
|
+import cn.hutool.core.lang.UUID;
|
|
|
+import com.zksy.common.exception.InvalidMessageException;
|
|
|
+import com.zksy.gas.domain.GasMonitorData;
|
|
|
+import com.zksy.gas.service.GasMonitorDataService;
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
+import io.netty.channel.ChannelHandler;
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
+import io.netty.handler.timeout.ReadTimeoutException;
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+@ChannelHandler.Sharable
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class MessageHandler extends ChannelInboundHandlerAdapter {
|
|
|
+ private static Logger logger = LoggerFactory.getLogger(MessageHandler.class);
|
|
|
+ private final GasMonitorDataService service;
|
|
|
+ @Autowired
|
|
|
+ public MessageHandler(GasMonitorDataService firefightingPressureService) {
|
|
|
+ this.service = firefightingPressureService;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ super.channelActive(ctx);
|
|
|
+ //sendDataToDevice(ctx);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
|
|
+ try {
|
|
|
+ ByteBuf msgByteBuf = (ByteBuf) msg;
|
|
|
+ if (msgByteBuf == null || !msgByteBuf.isReadable()) {
|
|
|
+ logger.warn("接收到无效的消息");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ int readable = msgByteBuf.readableBytes();
|
|
|
+ byte[] msgBytes = new byte[readable];
|
|
|
+ msgByteBuf.readBytes(msgBytes);
|
|
|
+ logger.info("接收到 {} 字节的数据: {}, 来自: {}",
|
|
|
+ readable, ProtocolUtils.bytesToHex(msgBytes), ctx.channel().remoteAddress());
|
|
|
+ //计算校验和
|
|
|
+ int check = DataCheckUtil.calcChecksum(msgBytes, 2, msgBytes.length - 3);
|
|
|
+ int checksum = msgBytes[msgBytes.length - 2] & 0xFF;
|
|
|
+
|
|
|
+ if (check != checksum) {
|
|
|
+ throw new InvalidMessageException("数据校验失败,本地计算=" + check + ",报文携带=" + checksum);
|
|
|
+ }
|
|
|
+ GasMonitorData resultData = DataParser.parseMessage(msgBytes);
|
|
|
+ resultData.setId(UUID.randomUUID().toString());
|
|
|
+ service.save(resultData);
|
|
|
+ logger.info("数据解析入库成功: {}", resultData);
|
|
|
+ } catch (InvalidMessageException e) {
|
|
|
+ logger.error("数据校验失败: {}", e.getMessage());
|
|
|
+ ctx.writeAndFlush(Unpooled.copiedBuffer("数据校验失败".getBytes()));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("数据解析/入库异常", e);
|
|
|
+ ctx.writeAndFlush(Unpooled.copiedBuffer("数据处理异常".getBytes()));
|
|
|
+ } finally {
|
|
|
+ ReferenceCountUtil.release(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private String printHexBinary(byte[] bytes) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ for (byte b : bytes) {
|
|
|
+ sb.append(String.format("%02X ", b));
|
|
|
+ }
|
|
|
+ System.out.println("Received raw data: " + sb.toString());
|
|
|
+ logger.info("Received raw data: " + sb.toString());
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
+ if (cause instanceof ReadTimeoutException) {
|
|
|
+ logger.info("来自" + ctx.channel().remoteAddress() + "的连接超时断开");
|
|
|
+ } else {
|
|
|
+ cause.printStackTrace();
|
|
|
+ logger.info("来自" + ctx.channel().remoteAddress() + "的连接异常断开");
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ ctx.flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ logger.info("来自" + ctx.channel().remoteAddress() + "的连接主动断开");
|
|
|
+ ctx.fireChannelUnregistered();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 主动向设备发送数据
|
|
|
+ public void sendDataToDevice(ChannelHandlerContext ctx) {
|
|
|
+ try {
|
|
|
+ // 将字符串形式的十六进制数据转换为字节数组
|
|
|
+ String hexData = "01 03 00 00 00 01 84 0A ";
|
|
|
+ String[] hexArray = hexData.split(" ");
|
|
|
+ byte[] dataBytes = new byte[hexArray.length];
|
|
|
+ for (int i = 0; i < hexArray.length; i++) {
|
|
|
+ dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
|
|
|
+ }
|
|
|
+ ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
|
|
|
+ ctx.writeAndFlush(byteBuf);
|
|
|
+ logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("发送数据失败: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|