| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- package com.zksy.pole.MQTTServer.handler;
- /**
- * @author ShaoYang
- * @date 2024/03/13 15:46
- */
- import com.zksy.pole.MQTTServer.server.MQTTServer;
- import io.netty.channel.*;
- import io.netty.handler.codec.mqtt.*;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
- /**
- * 消息处理,单例启动
- *
- * @author ShaoYang
- */
- @Slf4j
- @ChannelHandler.Sharable
- @Component
- public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {
- @Autowired
- private com.zksy.pole.MQTTServer.callBack.BootNettyMqttMsgBack BootNettyMqttMsgBack;
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (null != msg) {
- System.err.println(msg);
- MqttMessage mqttMessage = (MqttMessage) msg;
- log.info("info--"+mqttMessage.toString());
- MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
- Channel channel = ctx.channel();
- if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
- //用户鉴权(配置文件中配置账号和密码,如果没有默认)
- log.warn("正在尝试鉴权");
- boolean authentag= BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());
- if(!authentag){
- return;
- }
- // 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
- if(MQTTServer.MQTTdeviceChannelGroup.contains(channel)){
- //移除次设备channel和topic
- BootNettyMqttMsgBack.disconnack(channel,mqttMessage);
- }
- // to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
- BootNettyMqttMsgBack.connack(channel, mqttMessage);
- }
- //对于没有鉴权的设备,请求不处理
- if(!MQTTServer.MQTTdeviceChannelGroup.contains(channel)){
- log.warn(channel.id()+"无鉴权操作");
- return;
- }
- /*Map<String,String> variableHeader = (Map<String,String>) mqttMessage.variableHeader();
- List<ChannelId> channelIdList = new ArrayList<>();
- channelIdList.add(channel.id());
- subscribeMap.put(variableHeader.get("topicName"),channelIdList);*/
- switch (mqttFixedHeader.messageType()){
- case PUBLISH: // 客户端发布消息
- // PUBACK报文是对QoS 1等级的PUBLISH报文的响应
- BootNettyMqttMsgBack.puback(channel, mqttMessage);
- break;
- // PUBREL Qos2级别消息,客户端返回
- case PUBREL:
- // PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应
- //服务端收到pubrel之后,正式将消息投递给上层应用层。
- MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();
- if(MQTTServer.mqttMessageIdMap.containsKey(VariableHeader.messageId())) {
- log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());
- BootNettyMqttMsgBack.subscribSend(MQTTServer.mqttMessageIdMap.get(VariableHeader.messageId()),channel);
- BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
- MQTTServer.mqttMessageIdMap.remove(VariableHeader.messageId());
- }else {
- //后续多次收到REL消息,制作comp响应
- BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
- }
- break;
- case SUBSCRIBE: // 客户端订阅主题
- // 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
- // 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
- // SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
- // to do
- BootNettyMqttMsgBack.suback(channel, mqttMessage);
- MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;
- for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
- String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
- boolean tag= MQTTServer.subscribeMap.containsKey(topicname);
- if(tag){
- List<ChannelId> channelIds= MQTTServer.subscribeMap.get(topicname);
- if(!channelIds.contains(channel.id())) {
- channelIds.add(channel.id());
- }else {
- log.warn(channel.id()+"重复订阅");
- }
- MQTTServer.subscribeMap.put(topicname, channelIds);
- }else {
- List<ChannelId> channelIds=new ArrayList<>();
- channelIds.add(channel.id());
- MQTTServer.subscribeMap.put(topicname,channelIds);
- }
- log.info(channel.id()+"订阅地址————》"+topicname);
- }
- break;
- case UNSUBSCRIBE: // 客户端取消订阅
- // 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
- // to do
- BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
- Object Unsubscribe=mqttMessage.payload();
- MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;
- int len=unsubscribePayload.topics().size();
- for (int i = 0; i < len; i++) {
- String topicname=unsubscribePayload.topics().get(i);
- boolean tag= MQTTServer.subscribeMap.containsKey(topicname);
- if(tag){
- List<ChannelId> channelIds= MQTTServer.subscribeMap.get(topicname);
- channelIds.remove(channel.id());
- MQTTServer.subscribeMap.put(topicname,channelIds);
- }else {
- log.error("不存在订阅地址——>"+topicname);
- }
- log.info(channel.id()+"取消订阅地址————》"+topicname);
- }
- break;
- case PINGREQ: // 客户端发起心跳
- // 客户端发送PINGREQ报文给服务端的
- // 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
- // 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
- BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
- break;
- case DISCONNECT: // 客户端主动断开连接
- log.debug("设备下线,channelId:{}", channel.id());
- MQTTdeviceRemove(channel);
- // DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
- // to do
- break;
- default:
- break;
- }
- }
- else {
- return;
- }
- }
- /**
- * 从客户端收到新的数据、读取完成时调用
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
- }
- /**
- * 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
- */
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- super.channelRegistered(ctx);
- }
- /**
- * 客户端与服务端 断连时执行 channelInactive方法之后执行
- */
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- log.warn(ctx.channel().id()+"连接断开");
- MQTTdeviceRemove(ctx.channel());
- super.channelUnregistered(ctx);
- }
- /**
- * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- Channel channel = ctx.channel();
- log.warn(channel.id()+"连接异常断开。。。。。。。");
- MQTTdeviceRemove(ctx.channel());
- super.exceptionCaught(ctx, cause);
- if(channel.isActive()){
- ctx.close();
- }
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) {
- log.debug("\n");
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- }
- /**
- * 服务端 当读超时时 会调用这个方法
- */
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- super.userEventTriggered(ctx, evt);
- ctx.close();
- }
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- super.channelWritabilityChanged(ctx);
- }
- }
|