BootNettyMqttMsgBack.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. package com.zksy.pole.MQTTServer.callBack;
  2. import cn.hutool.core.util.StrUtil;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.zksy.pole.MQTTServer.config.MQTTServerProperties;
  5. import com.zksy.pole.MQTTServer.server.MQTTServer;
  6. import com.zksy.pole.factory.HandleCmdFactory;
  7. import io.netty.buffer.ByteBuf;
  8. import io.netty.buffer.Unpooled;
  9. import io.netty.channel.Channel;
  10. import io.netty.handler.codec.mqtt.*;
  11. import lombok.RequiredArgsConstructor;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.stereotype.Component;
  16. import javax.crypto.Mac;
  17. import javax.crypto.spec.SecretKeySpec;
  18. import java.nio.charset.StandardCharsets;
  19. import java.util.ArrayList;
  20. import java.util.List;
  21. import java.util.Set;
  22. import java.util.stream.Collectors;
  23. import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
  24. import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
  25. import io.netty.handler.codec.*;
  26. /**
  27. * 大黄
  28. */
  29. @Component
  30. @RequiredArgsConstructor
  31. public class BootNettyMqttMsgBack {
  32. private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
  33. private final MQTTServerProperties MQTTserverProperties;
  34. /**
  35. * 确认连接请求
  36. * @param channel
  37. * @param mqttMessage
  38. */
  39. public void connack (Channel channel, MqttMessage mqttMessage) {
  40. MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
  41. MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
  42. MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
  43. // 构建返回报文, 可变报头
  44. MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
  45. // 构建返回报文, 固定报头
  46. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  47. // 构建CONNACK消息体
  48. MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
  49. //log.info("back--"+connAck.toString());
  50. log.debug("设备上线,channelId:{}", channel.id());
  51. MQTTdeviceAdd(channel);
  52. channel.writeAndFlush(connAck);
  53. }
  54. public void disconnack (Channel channel, MqttMessage mqttMessage) {
  55. MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
  56. MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
  57. MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
  58. // 构建返回报文, 可变报头
  59. MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());
  60. // 构建返回报文, 固定报头
  61. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  62. // 构建CONNACK消息体
  63. MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
  64. //log.info("back--"+connAck.toString());
  65. channel.writeAndFlush(connAck);
  66. log.debug("设备下线,channelId:{}", channel.id());
  67. MQTTdeviceRemove(channel);
  68. }
  69. /**
  70. * 根据qos发布确认
  71. * @param channel
  72. * @param mqttMessage
  73. */
  74. @Autowired
  75. private HandleCmdFactory handleCmdFactory;
  76. public void puback (Channel channel, MqttMessage mqttMessage) throws InterruptedException {
  77. MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
  78. MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
  79. MqttQoS qos = mqttFixedHeaderInfo.qosLevel();
  80. //注意: readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置 读指针
  81. byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
  82. mqttPublishMessage.payload().readBytes(headBytes);
  83. Object obj=mqttMessage.variableHeader();
  84. MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
  85. String topicName=variableHeader.topicName();
  86. int lastIndex = topicName.lastIndexOf('/');
  87. String lightNums = topicName.substring(lastIndex + 1);
  88. System.err.println("topicName========"+topicName);
  89. String data = new String(headBytes);
  90. log.info("收到数据-->"+data);
  91. String jsonRes = handleCmdFactory.HandleCmd(data);
  92. //重置读取的指针
  93. mqttPublishMessage.payload().resetReaderIndex();
  94. //(qos= 0最多一次的传输,1至少一次的传输,2: 只有一次的传输)
  95. if(StrUtil.isNotBlank(jsonRes)){
  96. switch (qos) {
  97. case AT_MOST_ONCE: // 至多一次
  98. // 构建自定义应答消息
  99. MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
  100. MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/"+lightNums, 0); // 使用0作为packetId,因为这里不需要确认
  101. byte[] customPayload = jsonRes.getBytes(StandardCharsets.UTF_8);
  102. ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
  103. MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
  104. // 发送自定义应答消息给网关
  105. if (channel != null) {
  106. channel.writeAndFlush(customMessage);
  107. } else {
  108. log.warn("网关通道未找到,无法发送应答消息");
  109. }
  110. break;
  111. // QoS 1: 发送PUBACK
  112. case AT_LEAST_ONCE: // 至少一次
  113. // 构建返回报文, 可变报头
  114. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
  115. // 构建返回报文, 固定报头
  116. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
  117. // 构建PUBACK消息体
  118. MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
  119. log.info("back--"+pubAck.toString());
  120. channel.writeAndFlush(pubAck);
  121. //推送到订阅的客户端
  122. subscribSend(mqttMessage,channel);
  123. break;
  124. // QoS 2: 发送PUBREC
  125. case EXACTLY_ONCE: // 刚好一次
  126. // 构建返回报文, 固定报头
  127. MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
  128. // 构建返回报文, 可变报头
  129. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
  130. MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
  131. //服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
  132. // 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
  133. //接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
  134. int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
  135. if(!MQTTServer.mqttMessageIdMap.containsKey(mqttMessageId)){
  136. //不存在此消息,将此消息暂存 //todo 这里可以换成redis做缓存
  137. MQTTServer.mqttMessageIdMap.put(mqttMessageId, mqttMessage);
  138. log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
  139. }else{
  140. //重复发送消息,直接返回
  141. log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
  142. return;
  143. }
  144. channel.writeAndFlush(mqttMessageBack);
  145. break;
  146. default:
  147. break;
  148. }
  149. }
  150. }
  151. /**
  152. * 发布完成 qos2
  153. * @param channel
  154. * @param mqttMessage
  155. */
  156. public void pubcomp (Channel channel, MqttMessage mqttMessage) {
  157. System.out.println("==========发布完成==========");
  158. MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
  159. // 构建返回报文, 固定报头
  160. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0);
  161. // 构建返回报文, 可变报头
  162. MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  163. MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
  164. //log.info("back--"+mqttMessageBack.toString());
  165. channel.writeAndFlush(mqttMessageBack);
  166. }
  167. /**
  168. * 订阅确认
  169. * @param channel
  170. * @param mqttMessage
  171. */
  172. public void suback(Channel channel, MqttMessage mqttMessage) {
  173. System.out.println("=============客户端订阅主题===================");
  174. MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
  175. MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
  176. // 构建返回报文, 可变报头
  177. MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  178. Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
  179. //log.info(topics.toString());
  180. List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
  181. for (int i = 0; i < topics.size(); i++) {
  182. grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
  183. System.out.println("topics=========================="+topics);
  184. }
  185. // 构建返回报文 有效负载
  186. MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
  187. // 构建返回报文 固定报头
  188. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
  189. // 构建返回报文 订阅确认
  190. MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
  191. channel.writeAndFlush(subAck);
  192. }
  193. /**
  194. * 取消订阅确认
  195. * @param channel
  196. * @param mqttMessage
  197. */
  198. public void unsuback(Channel channel, MqttMessage mqttMessage) {
  199. MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
  200. // 构建返回报文 可变报头
  201. MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
  202. // 构建返回报文 固定报头
  203. MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
  204. // 构建返回报文 取消订阅确认
  205. MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
  206. channel.writeAndFlush(unSubAck);
  207. }
  208. /**
  209. * 心跳响应
  210. * @param channel
  211. * @param mqttMessage
  212. */
  213. public void pingresp (Channel channel, MqttMessage mqttMessage) {
  214. System.out.println("-----------心跳响应-------------");
  215. // 心跳响应报文 11010000 00000000 固定报文
  216. MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
  217. MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
  218. channel.writeAndFlush(mqttMessageBack);
  219. }
  220. /**
  221. * 订阅推送
  222. */
  223. public void subscribSend(MqttMessage mqttMessage,Channel channel){
  224. Object obj=mqttMessage.variableHeader();
  225. MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
  226. String topicName=variableHeader.topicName();
  227. int lastIndex = topicName.lastIndexOf('/');
  228. String lightNums = topicName.substring(lastIndex + 1);
  229. //固定消息头 注意此处的消息类型PUBLISH mqtt协议
  230. MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
  231. //可变消息头
  232. MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader("down/light/ack/"+lightNums,0);
  233. JSONObject jsonObject = new JSONObject();
  234. jsonObject.put("flag","1");
  235. jsonObject.put("cmd","1111");
  236. byte[] bytes = jsonObject.toString().getBytes();
  237. ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
  238. //推送消息体
  239. MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, byteBuf);
  240. log.info("推送地址————》"+mqttPublishVariableHeader);
  241. //订阅次此topic的Mqtt客户端搜到此消息,
  242. Channel channelSub= MQTTServer.MQTTdeviceChannelGroup.find(channel.id());
  243. //writeAndFlush会将ByteBuf的引用释放,refCnt会减去1,使用retain加1
  244. if(channelSub!=null) {
  245. mqttPublishMessageResult.retain();
  246. channelSub.writeAndFlush(mqttPublishMessageResult);
  247. System.out.println("channelSub="+channelSub+"-----------mqttPublishMessageResult="+mqttPublishMessageResult);
  248. }
  249. mqttPublishMessageResult.release();
  250. }
  251. /**
  252. * 用户鉴权
  253. */
  254. public boolean authentication(MqttConnectPayload payload){
  255. //todo 这里只是使用了最直接的账号密码鉴权,这里可以进行diy,
  256. // 例如客户端用sha256算法加密(设备名(username)+系统时间戳+设备密钥(password))得到加密密钥
  257. // 服务器根据设备名和设备密钥再进行同样的操作,再比较服务端和客户端加密的密钥是否一致
  258. // 加密算法放下面了
  259. log.warn("clientId"+payload.clientIdentifier());
  260. String username=MQTTserverProperties.getUsername();
  261. String password=MQTTserverProperties.getPassword();
  262. //无账号或者无密码通过
  263. if(stringEmptyCheck(password)||stringEmptyCheck(username)){
  264. return true;
  265. }else {
  266. //消息中账号密码为空
  267. if(payload.passwordInBytes()==null||payload.userName()==null){
  268. return false;
  269. }
  270. String passwordAuthen=new String(payload.passwordInBytes());
  271. String usernameAuthen=payload.userName();
  272. log.warn("username:{},password:{}",usernameAuthen,passwordAuthen);
  273. if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){
  274. return true;
  275. }else {
  276. return false;
  277. }
  278. }
  279. }
  280. /**
  281. * sha256_HMAC加密
  282. * @param message 设备名+时间戳
  283. * @param secret 设备秘钥
  284. * @return 加密密钥字符串
  285. */
  286. public String hmacSHA256(String secret, String message) throws Exception {
  287. String hash = "";
  288. Mac hmacSha256 = Mac.getInstance("HmacSHA256");
  289. SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(), "HmacSHA256");
  290. hmacSha256.init(secret_key);
  291. byte[] bytes = hmacSha256.doFinal(message.getBytes());
  292. hash = byteArrayToHexString(bytes);
  293. return hash;
  294. }
  295. /**
  296. * 将加密后的字节数组转换成字符串
  297. *
  298. * @param b 字节数组
  299. * @return 字符串
  300. */
  301. public String byteArrayToHexString(byte[] b) {
  302. StringBuilder hs = new StringBuilder();
  303. String stmp;
  304. for (int n = 0; b!=null && n < b.length; n++) {
  305. stmp = Integer.toHexString(b[n] & 0XFF);
  306. if (stmp.length() == 1)
  307. hs.append('0');
  308. hs.append(stmp);
  309. }
  310. return hs.toString().toLowerCase();
  311. }
  312. //判断字符字符为空
  313. private boolean stringEmptyCheck(String str){
  314. if(str==null||"".equals(str)){
  315. return true;
  316. }else {
  317. return false;
  318. }
  319. }
  320. }