BootNettyMqttMsgBack.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package com.zksy.pole.MQTTServer.callBack;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.zksy.pole.MQTTServer.config.MQTTServerProperties;
  4. import com.zksy.pole.MQTTServer.server.MQTTServer;
  5. import io.netty.buffer.ByteBuf;
  6. import io.netty.buffer.Unpooled;
  7. import io.netty.channel.Channel;
  8. import io.netty.handler.codec.mqtt.*;
  9. import lombok.RequiredArgsConstructor;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.stereotype.Component;
  13. import javax.crypto.Mac;
  14. import javax.crypto.spec.SecretKeySpec;
  15. import java.nio.charset.StandardCharsets;
  16. import java.util.ArrayList;
  17. import java.util.List;
  18. import java.util.Set;
  19. import java.util.stream.Collectors;
  20. import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
  21. import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
  22. import io.netty.handler.codec.*;
  23. /**
  24. * 大黄
  25. */
  26. @Component
  27. @RequiredArgsConstructor
  28. public class BootNettyMqttMsgBack {
  29. private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
  30. private final MQTTServerProperties MQTTserverProperties;
  31. /**
  32. * 确认连接请求
  33. * @param channel
  34. * @param mqttMessage
  35. */
  36. public void connack (Channel channel, MqttMessage mqttMessage) {
  37. MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
  38. MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
  39. MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
  40. // 构建返回报文, 可变报头
  41. MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
  42. // 构建返回报文, 固定报头
  43. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  44. // 构建CONNACK消息体
  45. MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
  46. //log.info("back--"+connAck.toString());
  47. log.debug("设备上线,channelId:{}", channel.id());
  48. MQTTdeviceAdd(channel);
  49. channel.writeAndFlush(connAck);
  50. }
  51. public void disconnack (Channel channel, MqttMessage mqttMessage) {
  52. MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
  53. MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
  54. MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
  55. // 构建返回报文, 可变报头
  56. MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());
  57. // 构建返回报文, 固定报头
  58. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  59. // 构建CONNACK消息体
  60. MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
  61. //log.info("back--"+connAck.toString());
  62. channel.writeAndFlush(connAck);
  63. log.debug("设备下线,channelId:{}", channel.id());
  64. MQTTdeviceRemove(channel);
  65. }
  66. /**
  67. * 根据qos发布确认
  68. * @param channel
  69. * @param mqttMessage
  70. */
  71. public void puback (Channel channel, MqttMessage mqttMessage) throws InterruptedException {
  72. MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
  73. MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
  74. Object obj= mqttMessage.variableHeader();
  75. MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
  76. String topicName=variableHeader.topicName();
  77. System.err.println("topicName========"+topicName);
  78. MqttQoS qos = mqttFixedHeaderInfo.qosLevel();
  79. //注意: readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置 读指针
  80. byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
  81. mqttPublishMessage.payload().readBytes(headBytes);
  82. String data = new String(headBytes);
  83. System.out.println("收到数据-->"+data);
  84. //重置读取的指针
  85. mqttPublishMessage.payload().resetReaderIndex();
  86. //(qos= 0最多一次的传输,1至少一次的传输,2: 只有一次的传输)
  87. switch (qos) {
  88. case AT_MOST_ONCE: // 至多一次
  89. // 构建自定义应答消息
  90. MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
  91. MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/40005274", 0); // 使用0作为packetId,因为这里不需要确认
  92. JSONObject jsonObject = new JSONObject();
  93. jsonObject.put("flag", 1);
  94. jsonObject.put("cmd", 1111);
  95. byte[] customPayload = jsonObject.toString().getBytes(StandardCharsets.UTF_8);
  96. ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
  97. MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
  98. // 发送自定义应答消息给网关
  99. if (channel != null) {
  100. channel.writeAndFlush(customMessage);
  101. } else {
  102. log.warn("网关通道未找到,无法发送应答消息");
  103. }
  104. break;
  105. // QoS 1: 发送PUBACK
  106. case AT_LEAST_ONCE: // 至少一次
  107. // 构建返回报文, 可变报头
  108. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
  109. // 构建返回报文, 固定报头
  110. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  111. // 构建PUBACK消息体
  112. MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
  113. log.info("back--"+pubAck.toString());
  114. channel.writeAndFlush(pubAck);
  115. //推送到订阅的客户端
  116. subscribSend(mqttMessage,channel);
  117. break;
  118. // QoS 2: 发送PUBREC
  119. case EXACTLY_ONCE: // 刚好一次
  120. // 构建返回报文, 固定报头
  121. MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
  122. // 构建返回报文, 可变报头
  123. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
  124. MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
  125. //服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
  126. // 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
  127. //接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
  128. int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
  129. if(!MQTTServer.mqttMessageIdMap.containsKey(mqttMessageId)){
  130. //不存在此消息,将此消息暂存 //todo 这里可以换成redis做缓存
  131. MQTTServer.mqttMessageIdMap.put(mqttMessageId, mqttMessage);
  132. log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
  133. }else{
  134. //重复发送消息,直接返回
  135. log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
  136. return;
  137. }
  138. channel.writeAndFlush(mqttMessageBack);
  139. break;
  140. default:
  141. break;
  142. }
  143. }
  144. /**
  145. * 发布完成 qos2
  146. * @param channel
  147. * @param mqttMessage
  148. */
  149. public void pubcomp (Channel channel, MqttMessage mqttMessage) {
  150. System.out.println("==========发布完成==========");
  151. MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
  152. // 构建返回报文, 固定报头
  153. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0);
  154. // 构建返回报文, 可变报头
  155. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  156. MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
  157. //log.info("back--"+mqttMessageBack.toString());
  158. channel.writeAndFlush(mqttMessageBack);
  159. }
  160. /**
  161. * 订阅确认
  162. * @param channel
  163. * @param mqttMessage
  164. */
  165. public void suback(Channel channel, MqttMessage mqttMessage) {
  166. System.out.println("=============客户端订阅主题===================");
  167. MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
  168. MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
  169. // 构建返回报文, 可变报头
  170. MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  171. Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
  172. //log.info(topics.toString());
  173. List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
  174. for (int i = 0; i < topics.size(); i++) {
  175. grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
  176. System.out.println("topics=========================="+topics);
  177. }
  178. // 构建返回报文 有效负载
  179. MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
  180. // 构建返回报文 固定报头
  181. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
  182. // 构建返回报文 订阅确认
  183. MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
  184. channel.writeAndFlush(subAck);
  185. }
  186. /**
  187. * 取消订阅确认
  188. * @param channel
  189. * @param mqttMessage
  190. */
  191. public void unsuback(Channel channel, MqttMessage mqttMessage) {
  192. MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
  193. // 构建返回报文 可变报头
  194. MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  195. // 构建返回报文 固定报头
  196. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
  197. // 构建返回报文 取消订阅确认
  198. MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
  199. channel.writeAndFlush(unSubAck);
  200. }
  201. /**
  202. * 心跳响应
  203. * @param channel
  204. * @param mqttMessage
  205. */
  206. public void pingresp (Channel channel, MqttMessage mqttMessage) {
  207. System.out.println("-----------心跳响应-------------");
  208. // 心跳响应报文 11010000 00000000 固定报文
  209. MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
  210. MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
  211. channel.writeAndFlush(mqttMessageBack);
  212. }
  213. /**
  214. * 订阅推送
  215. */
  216. public void subscribSend(MqttMessage mqttMessage,Channel channel){
  217. MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
  218. Object obj=mqttMessage.variableHeader();
  219. MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
  220. String topicName=variableHeader.topicName();
  221. System.err.println("topicName========"+topicName);
  222. int packetId=variableHeader.packetId();
  223. //固定消息头 注意此处的消息类型PUBLISH mqtt协议
  224. MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
  225. //可变消息头
  226. MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader("down/light/ack/40005274",0);
  227. JSONObject jsonObject = new JSONObject();
  228. jsonObject.put("flag","1");
  229. jsonObject.put("cmd","1111");
  230. byte[] bytes = jsonObject.toString().getBytes();
  231. ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
  232. //推送消息体
  233. MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, byteBuf);
  234. log.info("推送地址————》"+mqttPublishVariableHeader);
  235. //订阅次此topic的Mqtt客户端搜到此消息,
  236. Channel channelSub= MQTTServer.MQTTdeviceChannelGroup.find(channel.id());
  237. //writeAndFlush会将ByteBuf的引用释放,refCnt会减去1,使用retain加1
  238. if(channelSub!=null) {
  239. mqttPublishMessageResult.retain();
  240. channelSub.writeAndFlush(mqttPublishMessageResult);
  241. System.out.println("channelSub="+channelSub+"-----------mqttPublishMessageResult="+mqttPublishMessageResult);
  242. }
  243. mqttPublishMessageResult.release();
  244. }
  245. /**
  246. * 用户鉴权
  247. */
  248. public boolean authentication(MqttConnectPayload payload){
  249. //todo 这里只是使用了最直接的账号密码鉴权,这里可以进行diy,
  250. // 例如客户端用sha256算法加密(设备名(username)+系统时间戳+设备密钥(password))得到加密密钥
  251. // 服务器根据设备名和设备密钥再进行同样的操作,再比较服务端和客户端加密的密钥是否一致
  252. // 加密算法放下面了
  253. log.warn("clientId"+payload.clientIdentifier());
  254. String username=MQTTserverProperties.getUsername();
  255. String password=MQTTserverProperties.getPassword();
  256. //无账号或者无密码通过
  257. if(stringEmptyCheck(password)||stringEmptyCheck(username)){
  258. return true;
  259. }else {
  260. //消息中账号密码为空
  261. if(payload.passwordInBytes()==null||payload.userName()==null){
  262. return false;
  263. }
  264. String passwordAuthen=new String(payload.passwordInBytes());
  265. String usernameAuthen=payload.userName();
  266. log.warn("username:{},password:{}",usernameAuthen,passwordAuthen);
  267. if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){
  268. return true;
  269. }else {
  270. return false;
  271. }
  272. }
  273. }
  274. /**
  275. * sha256_HMAC加密
  276. * @param message 设备名+时间戳
  277. * @param secret 设备秘钥
  278. * @return 加密密钥字符串
  279. */
  280. public String hmacSHA256(String secret, String message) throws Exception {
  281. String hash = "";
  282. Mac hmacSha256 = Mac.getInstance("HmacSHA256");
  283. SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(), "HmacSHA256");
  284. hmacSha256.init(secret_key);
  285. byte[] bytes = hmacSha256.doFinal(message.getBytes());
  286. hash = byteArrayToHexString(bytes);
  287. return hash;
  288. }
  289. /**
  290. * 将加密后的字节数组转换成字符串
  291. *
  292. * @param b 字节数组
  293. * @return 字符串
  294. */
  295. public String byteArrayToHexString(byte[] b) {
  296. StringBuilder hs = new StringBuilder();
  297. String stmp;
  298. for (int n = 0; b!=null && n < b.length; n++) {
  299. stmp = Integer.toHexString(b[n] & 0XFF);
  300. if (stmp.length() == 1)
  301. hs.append('0');
  302. hs.append(stmp);
  303. }
  304. return hs.toString().toLowerCase();
  305. }
  306. //判断字符字符为空
  307. private boolean stringEmptyCheck(String str){
  308. if(str==null||"".equals(str)){
  309. return true;
  310. }else {
  311. return false;
  312. }
  313. }
  314. }