| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- package com.zksy.pole.MQTTServer.callBack;
- import com.alibaba.fastjson.JSONObject;
- import com.zksy.pole.MQTTServer.config.MQTTServerProperties;
- import com.zksy.pole.MQTTServer.server.MQTTServer;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.Channel;
- import io.netty.handler.codec.mqtt.*;
- import lombok.RequiredArgsConstructor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
- import javax.crypto.Mac;
- import javax.crypto.spec.SecretKeySpec;
- import java.nio.charset.StandardCharsets;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Set;
- import java.util.stream.Collectors;
- import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
- import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
- import io.netty.handler.codec.*;
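/**
 * Builds and writes the broker-side MQTT reply packets (CONNACK, PUBACK, PUBREC, PUBCOMP,
 * SUBACK, UNSUBACK, PINGRESP) to a Netty channel, and checks CONNECT credentials against
 * the configured username and password.
 *
 * Original header note: 大黄 ("Da Huang", presumably the author's handle).
 */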
- @Component
- @RequiredArgsConstructor
- public class BootNettyMqttMsgBack {
- private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
- private final MQTTServerProperties MQTTserverProperties;
- /**
- * 确认连接请求
- * @param channel
- * @param mqttMessage
- */
- public void connack (Channel channel, MqttMessage mqttMessage) {
- MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
- MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
- MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
- // 构建返回报文, 可变报头
- MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
- // 构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
- // 构建CONNACK消息体
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
- //log.info("back--"+connAck.toString());
- log.debug("设备上线,channelId:{}", channel.id());
- MQTTdeviceAdd(channel);
- channel.writeAndFlush(connAck);
- }
- public void disconnack (Channel channel, MqttMessage mqttMessage) {
- MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
- MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
- MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
- // 构建返回报文, 可变报头
- MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());
- // 构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
- // 构建CONNACK消息体
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
- //log.info("back--"+connAck.toString());
- channel.writeAndFlush(connAck);
- log.debug("设备下线,channelId:{}", channel.id());
- MQTTdeviceRemove(channel);
- }
- /**
- * 根据qos发布确认
- * @param channel
- * @param mqttMessage
- */
- public void puback (Channel channel, MqttMessage mqttMessage) throws InterruptedException {
- MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
- MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
- Object obj= mqttMessage.variableHeader();
- MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
- String topicName=variableHeader.topicName();
- System.err.println("topicName========"+topicName);
- MqttQoS qos = mqttFixedHeaderInfo.qosLevel();
- //注意: readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置 读指针
- byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
- mqttPublishMessage.payload().readBytes(headBytes);
- String data = new String(headBytes);
- System.out.println("收到数据-->"+data);
- //重置读取的指针
- mqttPublishMessage.payload().resetReaderIndex();
- //(qos= 0最多一次的传输,1至少一次的传输,2: 只有一次的传输)
- switch (qos) {
- case AT_MOST_ONCE: // 至多一次
- // 构建自定义应答消息
- MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
- MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/40005274", 0); // 使用0作为packetId,因为这里不需要确认
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("flag", 1);
- jsonObject.put("cmd", 1111);
- byte[] customPayload = jsonObject.toString().getBytes(StandardCharsets.UTF_8);
- ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
- MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
- // 发送自定义应答消息给网关
- if (channel != null) {
- channel.writeAndFlush(customMessage);
- } else {
- log.warn("网关通道未找到,无法发送应答消息");
- }
- break;
- // QoS 1: 发送PUBACK
- case AT_LEAST_ONCE: // 至少一次
- // 构建返回报文, 可变报头
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
- // 构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
- // 构建PUBACK消息体
- MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
- log.info("back--"+pubAck.toString());
- channel.writeAndFlush(pubAck);
- //推送到订阅的客户端
- subscribSend(mqttMessage,channel);
- break;
- // QoS 2: 发送PUBREC
- case EXACTLY_ONCE: // 刚好一次
- // 构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
- // 构建返回报文, 可变报头
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
- MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
- //服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
- // 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
- //接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
- int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
- if(!MQTTServer.mqttMessageIdMap.containsKey(mqttMessageId)){
- //不存在此消息,将此消息暂存 //todo 这里可以换成redis做缓存
- MQTTServer.mqttMessageIdMap.put(mqttMessageId, mqttMessage);
- log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
- }else{
- //重复发送消息,直接返回
- log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
- return;
- }
- channel.writeAndFlush(mqttMessageBack);
- break;
- default:
- break;
- }
- }
- /**
- * 发布完成 qos2
- * @param channel
- * @param mqttMessage
- */
- public void pubcomp (Channel channel, MqttMessage mqttMessage) {
- System.out.println("==========发布完成==========");
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- // 构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0);
- // 构建返回报文, 可变报头
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
- MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
- //log.info("back--"+mqttMessageBack.toString());
- channel.writeAndFlush(mqttMessageBack);
- }
- /**
- * 订阅确认
- * @param channel
- * @param mqttMessage
- */
- public void suback(Channel channel, MqttMessage mqttMessage) {
- System.out.println("=============客户端订阅主题===================");
- MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
- MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
- // 构建返回报文, 可变报头
- MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
- Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
- //log.info(topics.toString());
- List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
- for (int i = 0; i < topics.size(); i++) {
- grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
- System.out.println("topics=========================="+topics);
- }
- // 构建返回报文 有效负载
- MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
- // 构建返回报文 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
- // 构建返回报文 订阅确认
- MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
- channel.writeAndFlush(subAck);
- }
- /**
- * 取消订阅确认
- * @param channel
- * @param mqttMessage
- */
- public void unsuback(Channel channel, MqttMessage mqttMessage) {
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- // 构建返回报文 可变报头
- MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
- // 构建返回报文 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
- // 构建返回报文 取消订阅确认
- MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
- channel.writeAndFlush(unSubAck);
- }
- /**
- * 心跳响应
- * @param channel
- * @param mqttMessage
- */
- public void pingresp (Channel channel, MqttMessage mqttMessage) {
- System.out.println("-----------心跳响应-------------");
- // 心跳响应报文 11010000 00000000 固定报文
- MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
- MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
- channel.writeAndFlush(mqttMessageBack);
- }
- /**
- * 订阅推送
- */
- public void subscribSend(MqttMessage mqttMessage,Channel channel){
- MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
- Object obj=mqttMessage.variableHeader();
- MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
- String topicName=variableHeader.topicName();
- System.err.println("topicName========"+topicName);
- int packetId=variableHeader.packetId();
- //固定消息头 注意此处的消息类型PUBLISH mqtt协议
- MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
- //可变消息头
- MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader("down/light/ack/40005274",0);
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("flag","1");
- jsonObject.put("cmd","1111");
- byte[] bytes = jsonObject.toString().getBytes();
- ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
- //推送消息体
- MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, byteBuf);
- log.info("推送地址————》"+mqttPublishVariableHeader);
- //订阅次此topic的Mqtt客户端搜到此消息,
- Channel channelSub= MQTTServer.MQTTdeviceChannelGroup.find(channel.id());
- //writeAndFlush会将ByteBuf的引用释放,refCnt会减去1,使用retain加1
- if(channelSub!=null) {
- mqttPublishMessageResult.retain();
- channelSub.writeAndFlush(mqttPublishMessageResult);
- System.out.println("channelSub="+channelSub+"-----------mqttPublishMessageResult="+mqttPublishMessageResult);
- }
- mqttPublishMessageResult.release();
- }
- /**
- * 用户鉴权
- */
- public boolean authentication(MqttConnectPayload payload){
- //todo 这里只是使用了最直接的账号密码鉴权,这里可以进行diy,
- // 例如客户端用sha256算法加密(设备名(username)+系统时间戳+设备密钥(password))得到加密密钥
- // 服务器根据设备名和设备密钥再进行同样的操作,再比较服务端和客户端加密的密钥是否一致
- // 加密算法放下面了
- log.warn("clientId"+payload.clientIdentifier());
- String username=MQTTserverProperties.getUsername();
- String password=MQTTserverProperties.getPassword();
- //无账号或者无密码通过
- if(stringEmptyCheck(password)||stringEmptyCheck(username)){
- return true;
- }else {
- //消息中账号密码为空
- if(payload.passwordInBytes()==null||payload.userName()==null){
- return false;
- }
- String passwordAuthen=new String(payload.passwordInBytes());
- String usernameAuthen=payload.userName();
- log.warn("username:{},password:{}",usernameAuthen,passwordAuthen);
- if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){
- return true;
- }else {
- return false;
- }
- }
- }
- /**
- * sha256_HMAC加密
- * @param message 设备名+时间戳
- * @param secret 设备秘钥
- * @return 加密密钥字符串
- */
- public String hmacSHA256(String secret, String message) throws Exception {
- String hash = "";
- Mac hmacSha256 = Mac.getInstance("HmacSHA256");
- SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(), "HmacSHA256");
- hmacSha256.init(secret_key);
- byte[] bytes = hmacSha256.doFinal(message.getBytes());
- hash = byteArrayToHexString(bytes);
- return hash;
- }
- /**
- * 将加密后的字节数组转换成字符串
- *
- * @param b 字节数组
- * @return 字符串
- */
- public String byteArrayToHexString(byte[] b) {
- StringBuilder hs = new StringBuilder();
- String stmp;
- for (int n = 0; b!=null && n < b.length; n++) {
- stmp = Integer.toHexString(b[n] & 0XFF);
- if (stmp.length() == 1)
- hs.append('0');
- hs.append(stmp);
- }
- return hs.toString().toLowerCase();
- }
- //判断字符字符为空
- private boolean stringEmptyCheck(String str){
- if(str==null||"".equals(str)){
- return true;
- }else {
- return false;
- }
- }
- }
|