|
@@ -0,0 +1,234 @@
|
|
|
|
|
+package com.zksy.infrared.utils;
|
|
|
|
|
+
|
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
|
|
|
|
+import com.zksy.common.exception.InvalidMessageException;
|
|
|
|
|
+import com.zksy.infrared.domain.InfraredReadingMeter;
|
|
|
|
|
+import com.zksy.infrared.service.InfraredReadingMeterService;
|
|
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
|
|
+import io.netty.buffer.Unpooled;
|
|
|
|
|
+import io.netty.channel.ChannelHandler;
|
|
|
|
|
+import io.netty.channel.ChannelHandlerContext;
|
|
|
|
|
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
|
|
+import io.netty.handler.timeout.ReadTimeoutException;
|
|
|
|
|
+import io.netty.util.Attribute;
|
|
|
|
|
+import io.netty.util.AttributeKey;
|
|
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+import java.util.HashMap;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+
|
|
|
|
|
+@ChannelHandler.Sharable
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Component
|
|
|
|
|
+public class MessageHandler extends ChannelInboundHandlerAdapter {
|
|
|
|
|
+ private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);
|
|
|
|
|
+ private final InfraredReadingMeterService infraredReadingMeterService;
|
|
|
|
|
+ private final AttributeKey<CommunicationState> STATE_KEY = AttributeKey.valueOf("communicationState");
|
|
|
|
|
+ private final AttributeKey<Long> RECORD_ID_KEY = AttributeKey.valueOf("recordId");
|
|
|
|
|
+ private static final String INITIAL_COMMAND = "01 03 00 33 00 01 74 05";
|
|
|
|
|
+ private static final String METER_NUMBER_COMMAND = "01 03 00 03 00 03 F5 CB";
|
|
|
|
|
+
|
|
|
|
|
+ // 通信状态枚举
|
|
|
|
|
+ private enum CommunicationState {
|
|
|
|
|
+ INIT, // 初始状态
|
|
|
|
|
+ WAITING_FOR_DATA, // 等待数据返回
|
|
|
|
|
+ WAITING_FOR_METER_NUMBER // 等待表号返回
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Autowired
|
|
|
|
|
+ public MessageHandler(InfraredReadingMeterService infraredReadingMeterService) {
|
|
|
|
|
+ this.infraredReadingMeterService = infraredReadingMeterService;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
|
+ super.channelActive(ctx);
|
|
|
|
|
+ ctx.channel().attr(STATE_KEY).set(CommunicationState.INIT);
|
|
|
|
|
+ ctx.channel().attr(RECORD_ID_KEY).set(null);
|
|
|
|
|
+ sendDataToDevice(ctx, INITIAL_COMMAND);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
|
|
|
|
+ Attribute<CommunicationState> stateAttr = ctx.channel().attr(STATE_KEY);
|
|
|
|
|
+ CommunicationState state = stateAttr.get() != null ? stateAttr.get() : CommunicationState.INIT;
|
|
|
|
|
+ Attribute<Long> recordIdAttr = ctx.channel().attr(RECORD_ID_KEY);
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ ByteBuf msgByteBuf = (ByteBuf) msg;
|
|
|
|
|
+ if (msgByteBuf == null || !msgByteBuf.isReadable()) {
|
|
|
|
|
+ logger.warn("接收到无效的消息");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("接收到 {} 字节的数据,来自: {}", msgByteBuf.readableBytes(), ctx.channel().remoteAddress());
|
|
|
|
|
+ byte[] msgBytes = new byte[msgByteBuf.readableBytes()];
|
|
|
|
|
+ msgByteBuf.readBytes(msgBytes);
|
|
|
|
|
+ String result = printHexBinary(msgBytes);
|
|
|
|
|
+ String msgString = result.replaceAll("\\s", "");
|
|
|
|
|
+ if("7777772E7573722E636E".equals(msgString)){
|
|
|
|
|
+ logger.warn("接收到无效的消息");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 数据校验
|
|
|
|
|
+ String CRCString = msgString.substring(msgString.length() - 4);
|
|
|
|
|
+ String bodyResult = msgString.substring(0, msgString.length() - 4);
|
|
|
|
|
+ String codeString = DataCheckUtil.crc16(bodyResult);
|
|
|
|
|
+ logger.info("校验码:{}-----------生成的校验码:{}", CRCString, codeString);
|
|
|
|
|
+
|
|
|
|
|
+ if (!CRCString.equals(codeString)) {
|
|
|
|
|
+ throw new InvalidMessageException("数据校验不成功");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ switch (state) {
|
|
|
|
|
+ case INIT:
|
|
|
|
|
+ processInitialResponse(ctx, msgString, stateAttr);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case WAITING_FOR_DATA:
|
|
|
|
|
+ processDeviceData(ctx, msgString, stateAttr, recordIdAttr);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case WAITING_FOR_METER_NUMBER:
|
|
|
|
|
+ processMeterNumber(ctx, msgString, recordIdAttr);
|
|
|
|
|
+ stateAttr.set(CommunicationState.INIT); // 处理完表号后回到初始状态
|
|
|
|
|
+ break;
|
|
|
|
|
+ default:
|
|
|
|
|
+ logger.warn("未知通信状态: {}", state);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (InvalidMessageException e) {
|
|
|
|
|
+ logger.error("数据处理失败: {}", e.getMessage());
|
|
|
|
|
+ ctx.writeAndFlush(Unpooled.copiedBuffer("数据处理失败".getBytes()));
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ logger.error("处理消息时发生未知异常", e);
|
|
|
|
|
+ ctx.close();
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ ReferenceCountUtil.release(msg);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void processInitialResponse(ChannelHandlerContext ctx, String msgString,
|
|
|
|
|
+ Attribute<CommunicationState> stateAttr) throws InvalidMessageException {
|
|
|
|
|
+ String agreement = msgString.substring(6, msgString.length() - 4);
|
|
|
|
|
+ switch (agreement) {
|
|
|
|
|
+ case "0000":
|
|
|
|
|
+ sendDataToDevice(ctx, "01 03 03 E8 00 02 44 7B");
|
|
|
|
|
+ stateAttr.set(CommunicationState.WAITING_FOR_DATA);
|
|
|
|
|
+ logger.info("协议匹配0000,已发送指令并更新通信状态");
|
|
|
|
|
+ break;
|
|
|
|
|
+ case "0001":
|
|
|
|
|
+ sendDataToDevice(ctx, "01 03 07 D0 00 02 C4 86");
|
|
|
|
|
+ stateAttr.set(CommunicationState.WAITING_FOR_DATA);
|
|
|
|
|
+ logger.info("协议匹配0001,已发送指令并更新通信状态");
|
|
|
|
|
+ break;
|
|
|
|
|
+ case "0002":
|
|
|
|
|
+ sendDataToDevice(ctx, "01 03 0B B8 00 02 46 0A");
|
|
|
|
|
+ stateAttr.set(CommunicationState.WAITING_FOR_DATA);
|
|
|
|
|
+ logger.info("协议匹配0002,已发送指令并更新通信状态");
|
|
|
|
|
+ break;
|
|
|
|
|
+ default:
|
|
|
|
|
+ throw new InvalidMessageException("数据协议不匹配,未知协议类型: " + agreement);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void processDeviceData(ChannelHandlerContext ctx, String msgString,
|
|
|
|
|
+ Attribute<CommunicationState> stateAttr,
|
|
|
|
|
+ Attribute<Long> recordIdAttr) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ InfraredReadingMeter resultData = DataParser.parseMessage(msgString);
|
|
|
|
|
+ boolean savedData = infraredReadingMeterService.save(resultData);
|
|
|
|
|
+ if (savedData) {
|
|
|
|
|
+ recordIdAttr.set(resultData.getId());
|
|
|
|
|
+ logger.info("已保存设备数据,ID: {}", resultData.getId());
|
|
|
|
|
+ // 发送获取表号的指令
|
|
|
|
|
+ sendDataToDevice(ctx, METER_NUMBER_COMMAND);
|
|
|
|
|
+ stateAttr.set(CommunicationState.WAITING_FOR_METER_NUMBER);
|
|
|
|
|
+ logger.info("已发送获取表号指令,等待表号返回");
|
|
|
|
|
+ } else {
|
|
|
|
|
+ logger.info("失败");
|
|
|
|
|
+ stateAttr.set(CommunicationState.INIT);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ logger.error("处理设备数据时出错: {}", e.getMessage(), e);
|
|
|
|
|
+ stateAttr.set(CommunicationState.INIT); // 出错后重置状态
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private void processMeterNumber(ChannelHandlerContext ctx, String msgString,
|
|
|
|
|
+ Attribute<Long> recordIdAttr) {
|
|
|
|
|
+ Long recordId = recordIdAttr.get();
|
|
|
|
|
+ if (recordId == null) {
|
|
|
|
|
+ logger.error("没有找到关联的记录ID,无法更新表号");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 解析表号
|
|
|
|
|
+ String meterNumber = DataParser.parseMeterNumber(msgString);
|
|
|
|
|
+ if (meterNumber != null && !meterNumber.isEmpty()) {
|
|
|
|
|
+ LambdaUpdateWrapper<InfraredReadingMeter> infraredReadingMeterLambdaQueryWrapper = new LambdaUpdateWrapper<>();
|
|
|
|
|
+ infraredReadingMeterLambdaQueryWrapper.eq(InfraredReadingMeter::getId, recordId);
|
|
|
|
|
+ infraredReadingMeterLambdaQueryWrapper.set(InfraredReadingMeter::getElectricNumber, meterNumber);
|
|
|
|
|
+ infraredReadingMeterService.update(infraredReadingMeterLambdaQueryWrapper);
|
|
|
|
|
+ logger.info("已更新记录ID {} 的表号为: {}", recordId, meterNumber);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ logger.warn("无法从消息中提取有效表号: {}", msgString);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ logger.error("处理表号时出错: {}", e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ recordIdAttr.set(null); // 处理完后清空记录ID
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private String printHexBinary(byte[] bytes) {
|
|
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
|
|
+ for (byte b : bytes) {
|
|
|
|
|
+ sb.append(String.format("%02X ", b));
|
|
|
|
|
+ }
|
|
|
|
|
+ logger.info("Received raw data: {}", sb.toString());
|
|
|
|
|
+ return sb.toString();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
|
|
+ if (cause instanceof ReadTimeoutException) {
|
|
|
|
|
+ logger.info("来自{}的连接超时断开", ctx.channel().remoteAddress());
|
|
|
|
|
+ } else {
|
|
|
|
|
+ logger.error("来自{}的连接异常断开", ctx.channel().remoteAddress(), cause);
|
|
|
|
|
+ }
|
|
|
|
|
+ ctx.close();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
|
+ ctx.flush();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
|
+ logger.info("来自{}的连接主动断开", ctx.channel().remoteAddress());
|
|
|
|
|
+ ctx.channel().attr(STATE_KEY).set(null);
|
|
|
|
|
+ ctx.channel().attr(RECORD_ID_KEY).set(null);
|
|
|
|
|
+ ctx.fireChannelUnregistered();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 主动向设备发送数据
|
|
|
|
|
+ public void sendDataToDevice(ChannelHandlerContext ctx, String hexData) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String[] hexArray = hexData.split(" ");
|
|
|
|
|
+ byte[] dataBytes = new byte[hexArray.length];
|
|
|
|
|
+ for (int i = 0; i < hexArray.length; i++) {
|
|
|
|
|
+ dataBytes[i] = (byte) Integer.parseInt(hexArray[i], 16);
|
|
|
|
|
+ }
|
|
|
|
|
+ ByteBuf byteBuf = Unpooled.copiedBuffer(dataBytes);
|
|
|
|
|
+ ctx.writeAndFlush(byteBuf);
|
|
|
|
|
+ logger.info("已向设备发送数据: {}", printHexBinary(dataBytes));
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ logger.error("发送数据失败: {}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|