package com.zksy.pole.MQTTServer.handler; /** * @author ShaoYang * @date 2024/03/13 15:46 */ import com.zksy.pole.MQTTServer.server.MQTTServer; import io.netty.channel.*; import io.netty.handler.codec.mqtt.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; import java.util.List; import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove; /** * 消息处理,单例启动 * * @author ShaoYang */ @Slf4j @ChannelHandler.Sharable @Component public class MQTTMessageHandler extends ChannelInboundHandlerAdapter { @Autowired private com.zksy.pole.MQTTServer.callBack.BootNettyMqttMsgBack BootNettyMqttMsgBack; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg) { System.err.println(msg); MqttMessage mqttMessage = (MqttMessage) msg; log.info("info--"+mqttMessage.toString()); MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); Channel channel = ctx.channel(); if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){ //用户鉴权(配置文件中配置账号和密码,如果没有默认) log.warn("正在尝试鉴权"); boolean authentag= BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload()); if(!authentag){ return; } // 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接 if(MQTTServer.MQTTdeviceChannelGroup.contains(channel)){ //移除次设备channel和topic BootNettyMqttMsgBack.disconnack(channel,mqttMessage); } // to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息 BootNettyMqttMsgBack.connack(channel, mqttMessage); } //对于没有鉴权的设备,请求不处理 if(!MQTTServer.MQTTdeviceChannelGroup.contains(channel)){ log.warn(channel.id()+"无鉴权操作"); return; } /*Map variableHeader = (Map) mqttMessage.variableHeader(); List channelIdList = new ArrayList<>(); channelIdList.add(channel.id()); subscribeMap.put(variableHeader.get("topicName"),channelIdList);*/ switch (mqttFixedHeader.messageType()){ case PUBLISH: // 客户端发布消息 // PUBACK报文是对QoS 1等级的PUBLISH报文的响应 BootNettyMqttMsgBack.puback(channel, mqttMessage); break; // PUBREL Qos2级别消息,客户端返回 case PUBREL: // PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应 //服务端收到pubrel之后,正式将消息投递给上层应用层。 MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader(); if(MQTTServer.mqttMessageIdMap.containsKey(VariableHeader.messageId())) { log.warn("移除消息缓存-->消息id"+VariableHeader.messageId()); BootNettyMqttMsgBack.subscribSend(MQTTServer.mqttMessageIdMap.get(VariableHeader.messageId()),channel); BootNettyMqttMsgBack.pubcomp(channel, mqttMessage); MQTTServer.mqttMessageIdMap.remove(VariableHeader.messageId()); }else { //后续多次收到REL消息,制作comp响应 BootNettyMqttMsgBack.pubcomp(channel, mqttMessage); } break; case SUBSCRIBE: // 客户端订阅主题 // 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。 // 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。 // SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端 // to do BootNettyMqttMsgBack.suback(channel, mqttMessage); MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();; for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) { String topicname=SubscribePayload.topicSubscriptions().get(i).topicName(); boolean tag= MQTTServer.subscribeMap.containsKey(topicname); if(tag){ List channelIds= MQTTServer.subscribeMap.get(topicname); if(!channelIds.contains(channel.id())) { channelIds.add(channel.id()); }else { log.warn(channel.id()+"重复订阅"); } MQTTServer.subscribeMap.put(topicname, channelIds); }else { List channelIds=new ArrayList<>(); channelIds.add(channel.id()); MQTTServer.subscribeMap.put(topicname,channelIds); } log.info(channel.id()+"订阅地址————》"+topicname); } break; case UNSUBSCRIBE: // 客户端取消订阅 // 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题 // to do BootNettyMqttMsgBack.unsuback(channel, mqttMessage); Object Unsubscribe=mqttMessage.payload(); MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe; int len=unsubscribePayload.topics().size(); for (int i = 0; i < len; i++) { String topicname=unsubscribePayload.topics().get(i); boolean tag= MQTTServer.subscribeMap.containsKey(topicname); if(tag){ List channelIds= MQTTServer.subscribeMap.get(topicname); channelIds.remove(channel.id()); MQTTServer.subscribeMap.put(topicname,channelIds); }else { log.error("不存在订阅地址——>"+topicname); } log.info(channel.id()+"取消订阅地址————》"+topicname); } break; case PINGREQ: // 客户端发起心跳 // 客户端发送PINGREQ报文给服务端的 // 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着 // 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开 BootNettyMqttMsgBack.pingresp(channel, mqttMessage); break; case DISCONNECT: // 客户端主动断开连接 log.debug("设备下线,channelId:{}", channel.id()); MQTTdeviceRemove(channel); // DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0 // to do break; default: break; } } else { return; } } /** * 从客户端收到新的数据、读取完成时调用 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { } /** * 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行 */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); } /** * 客户端与服务端 断连时执行 channelInactive方法之后执行 */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.warn(ctx.channel().id()+"连接断开"); MQTTdeviceRemove(ctx.channel()); super.channelUnregistered(ctx); } /** * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel channel = ctx.channel(); log.warn(channel.id()+"连接异常断开。。。。。。。"); MQTTdeviceRemove(ctx.channel()); super.exceptionCaught(ctx, cause); if(channel.isActive()){ ctx.close(); } } @Override public void channelInactive(ChannelHandlerContext ctx) { log.debug("\n"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } /** * 服务端 当读超时时 会调用这个方法 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); ctx.close(); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { super.channelWritabilityChanged(ctx); } }