package com.zksy.pole.MQTTServer.callBack; import com.alibaba.fastjson.JSONObject; import com.zksy.pole.MQTTServer.config.MQTTServerProperties; import com.zksy.pole.MQTTServer.server.MQTTServer; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.*; import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd; import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove; import io.netty.handler.codec.*; /** * 大黄 */ @Component @RequiredArgsConstructor public class BootNettyMqttMsgBack { private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class); private final MQTTServerProperties MQTTserverProperties; /** * 确认连接请求 * @param channel * @param mqttMessage */ public void connack (Channel channel, MqttMessage mqttMessage) { MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader(); // 构建返回报文, 可变报头 MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession()); // 构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); // 构建CONNACK消息体 MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack); //log.info("back--"+connAck.toString()); log.debug("设备上线,channelId:{}", channel.id()); MQTTdeviceAdd(channel); channel.writeAndFlush(connAck); } public void disconnack (Channel channel, MqttMessage mqttMessage) { MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader(); // 构建返回报文, 可变报头 MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession()); // 构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); // 构建CONNACK消息体 MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack); //log.info("back--"+connAck.toString()); channel.writeAndFlush(connAck); log.debug("设备下线,channelId:{}", channel.id()); MQTTdeviceRemove(channel); } /** * 根据qos发布确认 * @param channel * @param mqttMessage */ public void puback (Channel channel, MqttMessage mqttMessage) throws InterruptedException { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader(); Object obj= mqttMessage.variableHeader(); MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj; String topicName=variableHeader.topicName(); System.err.println("topicName========"+topicName); MqttQoS qos = mqttFixedHeaderInfo.qosLevel(); //注意: readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置 读指针 byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()]; mqttPublishMessage.payload().readBytes(headBytes); String data = new String(headBytes); System.out.println("收到数据-->"+data); //重置读取的指针 mqttPublishMessage.payload().resetReaderIndex(); //(qos= 0最多一次的传输,1至少一次的传输,2: 只有一次的传输) switch (qos) { case AT_MOST_ONCE: // 至多一次 // 构建自定义应答消息 MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02); MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/40005274", 0); // 使用0作为packetId,因为这里不需要确认 JSONObject jsonObject = new JSONObject(); jsonObject.put("flag", 1); jsonObject.put("cmd", 1111); byte[] customPayload = jsonObject.toString().getBytes(StandardCharsets.UTF_8); ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload); MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf); // 发送自定义应答消息给网关 if (channel != null) { channel.writeAndFlush(customMessage); } else { log.warn("网关通道未找到,无法发送应答消息"); } break; // QoS 1: 发送PUBACK case AT_LEAST_ONCE: // 至少一次 // 构建返回报文, 可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); // 构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); // 构建PUBACK消息体 MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack); log.info("back--"+pubAck.toString()); channel.writeAndFlush(pubAck); //推送到订阅的客户端 subscribSend(mqttMessage,channel); break; // QoS 2: 发送PUBREC case EXACTLY_ONCE: // 刚好一次 // 构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02); // 构建返回报文, 可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2); //服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态, // 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。 //接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。 int mqttMessageId=mqttPublishMessage.variableHeader().packetId(); if(!MQTTServer.mqttMessageIdMap.containsKey(mqttMessageId)){ //不存在此消息,将此消息暂存 //todo 这里可以换成redis做缓存 MQTTServer.mqttMessageIdMap.put(mqttMessageId, mqttMessage); log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存"); }else{ //重复发送消息,直接返回 log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup()); return; } channel.writeAndFlush(mqttMessageBack); break; default: break; } } /** * 发布完成 qos2 * @param channel * @param mqttMessage */ public void pubcomp (Channel channel, MqttMessage mqttMessage) { System.out.println("==========发布完成=========="); MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); // 构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0); // 构建返回报文, 可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack); //log.info("back--"+mqttMessageBack.toString()); channel.writeAndFlush(mqttMessageBack); } /** * 订阅确认 * @param channel * @param mqttMessage */ public void suback(Channel channel, MqttMessage mqttMessage) { System.out.println("=============客户端订阅主题==================="); MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage; MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); // 构建返回报文, 可变报头 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); Set topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet()); //log.info(topics.toString()); List grantedQoSLevels = new ArrayList<>(topics.size()); for (int i = 0; i < topics.size(); i++) { grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value()); System.out.println("topics=========================="+topics); } // 构建返回报文 有效负载 MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels); // 构建返回报文 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size()); // 构建返回报文 订阅确认 MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack); channel.writeAndFlush(subAck); } /** * 取消订阅确认 * @param channel * @param mqttMessage */ public void unsuback(Channel channel, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); // 构建返回报文 可变报头 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); // 构建返回报文 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2); // 构建返回报文 取消订阅确认 MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack); channel.writeAndFlush(unSubAck); } /** * 心跳响应 * @param channel * @param mqttMessage */ public void pingresp (Channel channel, MqttMessage mqttMessage) { System.out.println("-----------心跳响应-------------"); // 心跳响应报文 11010000 00000000 固定报文 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage mqttMessageBack = new MqttMessage(fixedHeader); channel.writeAndFlush(mqttMessageBack); } /** * 订阅推送 */ public void subscribSend(MqttMessage mqttMessage,Channel channel){ MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; Object obj=mqttMessage.variableHeader(); MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj; String topicName=variableHeader.topicName(); System.err.println("topicName========"+topicName); int packetId=variableHeader.packetId(); //固定消息头 注意此处的消息类型PUBLISH mqtt协议 MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0); //可变消息头 MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader("down/light/ack/40005274",0); JSONObject jsonObject = new JSONObject(); jsonObject.put("flag","1"); jsonObject.put("cmd","1111"); byte[] bytes = jsonObject.toString().getBytes(); ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); //推送消息体 MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, byteBuf); log.info("推送地址————》"+mqttPublishVariableHeader); //订阅次此topic的Mqtt客户端搜到此消息, Channel channelSub= MQTTServer.MQTTdeviceChannelGroup.find(channel.id()); //writeAndFlush会将ByteBuf的引用释放,refCnt会减去1,使用retain加1 if(channelSub!=null) { mqttPublishMessageResult.retain(); channelSub.writeAndFlush(mqttPublishMessageResult); System.out.println("channelSub="+channelSub+"-----------mqttPublishMessageResult="+mqttPublishMessageResult); } mqttPublishMessageResult.release(); } /** * 用户鉴权 */ public boolean authentication(MqttConnectPayload payload){ //todo 这里只是使用了最直接的账号密码鉴权,这里可以进行diy, // 例如客户端用sha256算法加密(设备名(username)+系统时间戳+设备密钥(password))得到加密密钥 // 服务器根据设备名和设备密钥再进行同样的操作,再比较服务端和客户端加密的密钥是否一致 // 加密算法放下面了 log.warn("clientId"+payload.clientIdentifier()); String username=MQTTserverProperties.getUsername(); String password=MQTTserverProperties.getPassword(); //无账号或者无密码通过 if(stringEmptyCheck(password)||stringEmptyCheck(username)){ return true; }else { //消息中账号密码为空 if(payload.passwordInBytes()==null||payload.userName()==null){ return false; } String passwordAuthen=new String(payload.passwordInBytes()); String usernameAuthen=payload.userName(); log.warn("username:{},password:{}",usernameAuthen,passwordAuthen); if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){ return true; }else { return false; } } } /** * sha256_HMAC加密 * @param message 设备名+时间戳 * @param secret 设备秘钥 * @return 加密密钥字符串 */ public String hmacSHA256(String secret, String message) throws Exception { String hash = ""; Mac hmacSha256 = Mac.getInstance("HmacSHA256"); SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(), "HmacSHA256"); hmacSha256.init(secret_key); byte[] bytes = hmacSha256.doFinal(message.getBytes()); hash = byteArrayToHexString(bytes); return hash; } /** * 将加密后的字节数组转换成字符串 * * @param b 字节数组 * @return 字符串 */ public String byteArrayToHexString(byte[] b) { StringBuilder hs = new StringBuilder(); String stmp; for (int n = 0; b!=null && n < b.length; n++) { stmp = Integer.toHexString(b[n] & 0XFF); if (stmp.length() == 1) hs.append('0'); hs.append(stmp); } return hs.toString().toLowerCase(); } //判断字符字符为空 private boolean stringEmptyCheck(String str){ if(str==null||"".equals(str)){ return true; }else { return false; } } }