MQTTMessageHandler.java 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package com.zksy.pole.MQTTServer.handler;
  2. /**
  3. * @author ShaoYang
  4. * @date 2024/03/13 15:46
  5. */
  6. import com.zksy.pole.MQTTServer.server.MQTTServer;
  7. import io.netty.channel.*;
  8. import io.netty.handler.codec.mqtt.*;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Component;
  12. import java.io.IOException;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
  16. /**
  17. * 消息处理,单例启动
  18. *
  19. * @author ShaoYang
  20. */
  21. @Slf4j
  22. @ChannelHandler.Sharable
  23. @Component
  24. public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {
  25. @Autowired
  26. private com.zksy.pole.MQTTServer.callBack.BootNettyMqttMsgBack BootNettyMqttMsgBack;
  27. @Override
  28. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  29. if (null != msg) {
  30. System.err.println(msg);
  31. MqttMessage mqttMessage = (MqttMessage) msg;
  32. log.info("info--"+mqttMessage.toString());
  33. MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
  34. Channel channel = ctx.channel();
  35. if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
  36. //用户鉴权(配置文件中配置账号和密码,如果没有默认)
  37. log.warn("正在尝试鉴权");
  38. boolean authentag= BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());
  39. if(!authentag){
  40. return;
  41. }
  42. // 在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
  43. if(MQTTServer.MQTTdeviceChannelGroup.contains(channel)){
  44. //移除次设备channel和topic
  45. BootNettyMqttMsgBack.disconnack(channel,mqttMessage);
  46. }
  47. // to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
  48. BootNettyMqttMsgBack.connack(channel, mqttMessage);
  49. }
  50. //对于没有鉴权的设备,请求不处理
  51. if(!MQTTServer.MQTTdeviceChannelGroup.contains(channel)){
  52. log.warn(channel.id()+"无鉴权操作");
  53. return;
  54. }
  55. /*Map<String,String> variableHeader = (Map<String,String>) mqttMessage.variableHeader();
  56. List<ChannelId> channelIdList = new ArrayList<>();
  57. channelIdList.add(channel.id());
  58. subscribeMap.put(variableHeader.get("topicName"),channelIdList);*/
  59. switch (mqttFixedHeader.messageType()){
  60. case PUBLISH: // 客户端发布消息
  61. // PUBACK报文是对QoS 1等级的PUBLISH报文的响应
  62. BootNettyMqttMsgBack.puback(channel, mqttMessage);
  63. break;
  64. // PUBREL Qos2级别消息,客户端返回
  65. case PUBREL:
  66. // PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应
  67. //服务端收到pubrel之后,正式将消息投递给上层应用层。
  68. MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();
  69. if(MQTTServer.mqttMessageIdMap.containsKey(VariableHeader.messageId())) {
  70. log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());
  71. BootNettyMqttMsgBack.subscribSend(MQTTServer.mqttMessageIdMap.get(VariableHeader.messageId()),channel);
  72. BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
  73. MQTTServer.mqttMessageIdMap.remove(VariableHeader.messageId());
  74. }else {
  75. //后续多次收到REL消息,制作comp响应
  76. BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
  77. }
  78. break;
  79. case SUBSCRIBE: // 客户端订阅主题
  80. // 客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
  81. // 为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
  82. // SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
  83. // to do
  84. BootNettyMqttMsgBack.suback(channel, mqttMessage);
  85. MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;
  86. for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
  87. String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
  88. boolean tag= MQTTServer.subscribeMap.containsKey(topicname);
  89. if(tag){
  90. List<ChannelId> channelIds= MQTTServer.subscribeMap.get(topicname);
  91. if(!channelIds.contains(channel.id())) {
  92. channelIds.add(channel.id());
  93. }else {
  94. log.warn(channel.id()+"重复订阅");
  95. }
  96. MQTTServer.subscribeMap.put(topicname, channelIds);
  97. }else {
  98. List<ChannelId> channelIds=new ArrayList<>();
  99. channelIds.add(channel.id());
  100. MQTTServer.subscribeMap.put(topicname,channelIds);
  101. }
  102. log.info(channel.id()+"订阅地址————》"+topicname);
  103. }
  104. break;
  105. case UNSUBSCRIBE: // 客户端取消订阅
  106. // 客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
  107. // to do
  108. BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
  109. Object Unsubscribe=mqttMessage.payload();
  110. MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;
  111. int len=unsubscribePayload.topics().size();
  112. for (int i = 0; i < len; i++) {
  113. String topicname=unsubscribePayload.topics().get(i);
  114. boolean tag= MQTTServer.subscribeMap.containsKey(topicname);
  115. if(tag){
  116. List<ChannelId> channelIds= MQTTServer.subscribeMap.get(topicname);
  117. channelIds.remove(channel.id());
  118. MQTTServer.subscribeMap.put(topicname,channelIds);
  119. }else {
  120. log.error("不存在订阅地址——>"+topicname);
  121. }
  122. log.info(channel.id()+"取消订阅地址————》"+topicname);
  123. }
  124. break;
  125. case PINGREQ: // 客户端发起心跳
  126. // 客户端发送PINGREQ报文给服务端的
  127. // 在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
  128. // 请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
  129. BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
  130. break;
  131. case DISCONNECT: // 客户端主动断开连接
  132. log.debug("设备下线,channelId:{}", channel.id());
  133. MQTTdeviceRemove(channel);
  134. // DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
  135. // to do
  136. break;
  137. default:
  138. break;
  139. }
  140. }
  141. else {
  142. return;
  143. }
  144. }
  145. /**
  146. * 从客户端收到新的数据、读取完成时调用
  147. */
  148. @Override
  149. public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
  150. }
  151. /**
  152. * 客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
  153. */
  154. @Override
  155. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  156. super.channelRegistered(ctx);
  157. }
  158. /**
  159. * 客户端与服务端 断连时执行 channelInactive方法之后执行
  160. */
  161. @Override
  162. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  163. log.warn(ctx.channel().id()+"连接断开");
  164. MQTTdeviceRemove(ctx.channel());
  165. super.channelUnregistered(ctx);
  166. }
  167. /**
  168. * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
  169. */
  170. @Override
  171. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  172. Channel channel = ctx.channel();
  173. log.warn(channel.id()+"连接异常断开。。。。。。。");
  174. MQTTdeviceRemove(ctx.channel());
  175. super.exceptionCaught(ctx, cause);
  176. if(channel.isActive()){
  177. ctx.close();
  178. }
  179. }
  180. @Override
  181. public void channelInactive(ChannelHandlerContext ctx) {
  182. log.debug("\n");
  183. }
  184. @Override
  185. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  186. super.channelActive(ctx);
  187. }
  188. /**
  189. * 服务端 当读超时时 会调用这个方法
  190. */
  191. @Override
  192. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  193. super.userEventTriggered(ctx, evt);
  194. ctx.close();
  195. }
  196. @Override
  197. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  198. super.channelWritabilityChanged(ctx);
  199. }
  200. }