邵洋 1 年間 前
コミット
22d1ffed12

+ 15 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/Fliter/TopicFliter.java

@@ -0,0 +1,15 @@
+package com.zksy.pole.MQTTServer.Fliter;
+
+/**通配符 使用Trie树数据构建
+ * @author ShaoYang
+ * @date 2024/03/13 17:17
+ */
+public class TopicFliter {
+    //测试
+    public static void main(String[] args) {
+        Trie trie=new Trie();
+
+
+    }
+
+}

+ 59 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/Fliter/Trie.java

@@ -0,0 +1,59 @@
+package com.zksy.pole.MQTTServer.Fliter;
+
+/**定义Tire树数据结构
+ * @author ShaoYang
+ * @date 2024/03/13 17:38
+ */
+//字典树的java实现
+public class Trie {
+    private TrieNode root;
+
+    Trie() {
+        root = new TrieNode();
+    }
+
+    public void insert(String word) {
+        TrieNode current = root;
+        current = current.getChildren().computeIfAbsent(word, c -> new TrieNode());
+        current.setEndOfWord(true);
+    }
+
+    boolean containsNode(String word) {
+        TrieNode current = root;
+
+        for (int i = 0; i < word.length(); i++) {
+            char ch = word.charAt(i);
+            TrieNode node = current.getChildren().get(ch);
+            if (node == null) {
+                return false;
+            }
+            current = node;
+        }
+        return current.isEndOfWord();
+    }
+
+    private boolean delete(TrieNode current, String word, int index) {
+        if (index == word.length()) {
+            if (!current.isEndOfWord()) {
+                return false;
+            }
+            current.setEndOfWord(false);
+            return current.getChildren().isEmpty();
+        }
+        char ch = word.charAt(index);
+        TrieNode node = current.getChildren().get(ch);
+        if (node == null) {
+            return false;
+        }
+        boolean shouldDeleteCurrentNode = delete(node, word, index + 1) && !node.isEndOfWord();
+
+        if (shouldDeleteCurrentNode) {
+            current.getChildren().remove(ch);
+            return current.getChildren().isEmpty();
+        }
+        return false;
+    }
+
+}
+
+

+ 28 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/Fliter/TrieNode.java

@@ -0,0 +1,28 @@
+package com.zksy.pole.MQTTServer.Fliter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author ShaoYang
+ * @date 2024/03/13 17:55
+ */
+
+class TrieNode {
+    //key 为订阅的节点 ,例如订阅地址,testA/testB/testc  testA、testB、testc都算作某个订阅节点
+    private final Map<String, TrieNode> children = new HashMap<>();
+    private boolean endOfWord;
+
+    Map<String, TrieNode> getChildren() {
+        return children;
+    }
+
+    boolean isEndOfWord() {
+        return endOfWord;
+    }
+
+    void setEndOfWord(boolean endOfWord) {
+        this.endOfWord = endOfWord;
+    }
+}
+

+ 329 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/callBack/BootNettyMqttMsgBack.java

@@ -0,0 +1,329 @@
+package com.zksy.pole.MQTTServer.callBack;
+
+import com.alibaba.fastjson.JSONObject;
+
+import com.zksy.pole.MQTTServer.config.MQTTServerProperties;
+import com.zksy.pole.MQTTServer.server.MQTTServer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.mqtt.*;
+import lombok.RequiredArgsConstructor;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
+import static com.zksy.pole.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
+import io.netty.handler.codec.*;
+/**
+ * 大黄
+ */
+@Component
+@RequiredArgsConstructor
+public class BootNettyMqttMsgBack {
+
+	private static final Logger log =  LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
+	private final MQTTServerProperties MQTTserverProperties;
+
+	/**
+	 * 	确认连接请求
+	 * @param channel
+	 * @param mqttMessage
+	 */
+	public void connack (Channel channel, MqttMessage mqttMessage) {
+		MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
+		MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
+		MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
+
+		//	构建返回报文, 可变报头
+		MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
+		//	构建返回报文, 固定报头
+		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
+		//	构建CONNACK消息体
+		MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
+		//log.info("back--"+connAck.toString());
+		log.debug("设备上线,channelId:{}", channel.id());
+		MQTTdeviceAdd(channel);
+		channel.writeAndFlush(connAck);
+	}
+	public void disconnack (Channel channel, MqttMessage mqttMessage) {
+		MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
+		MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
+		MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
+		//	构建返回报文, 可变报头
+		MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());
+		//	构建返回报文, 固定报头
+		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
+		//	构建CONNACK消息体
+		MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
+		//log.info("back--"+connAck.toString());
+		channel.writeAndFlush(connAck);
+		log.debug("设备下线,channelId:{}", channel.id());
+		MQTTdeviceRemove(channel);
+	}
+
+	/**
+	 * 	根据qos发布确认
+	 * @param channel
+	 * @param mqttMessage
+	 */
+	public  void puback (Channel channel, MqttMessage mqttMessage) throws InterruptedException {
+		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
+		MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
+		MqttQoS qos =  mqttFixedHeaderInfo.qosLevel();
+		//注意:	readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置	读指针
+        byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
+        mqttPublishMessage.payload().readBytes(headBytes);
+        String data = new String(headBytes);
+        System.out.println("收到数据-->"+data);
+		//重置读取的指针
+		mqttPublishMessage.payload().resetReaderIndex();
+		//(qos= 0最多一次的传输,1至少一次的传输,2: 只有一次的传输)
+        switch (qos) {
+	        case AT_MOST_ONCE: 		//	至多一次
+				// 构建自定义应答消息
+				MqttFixedHeader customFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
+				MqttPublishVariableHeader customVariableHeader = new MqttPublishVariableHeader("down/light/ack/40005274", 0); // 使用0作为packetId,因为这里不需要确认
+				JSONObject jsonObject = new JSONObject();
+				jsonObject.put("flag", 1);
+				jsonObject.put("cmd", 1111);
+				byte[] customPayload = jsonObject.toString().getBytes(StandardCharsets.UTF_8);
+				ByteBuf customByteBuf = Unpooled.wrappedBuffer(customPayload);
+				MqttPublishMessage customMessage = new MqttPublishMessage(customFixedHeader, customVariableHeader, customByteBuf);
+
+				// 发送自定义应答消息给网关
+				if (channel != null) {
+					channel.writeAndFlush(customMessage);
+				} else {
+					log.warn("网关通道未找到,无法发送应答消息");
+				}
+	            break;
+			// QoS 1: 发送PUBACK
+	        case AT_LEAST_ONCE:		//	至少一次
+	    		//	构建返回报文, 可变报头
+	    		MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
+	    		//	构建返回报文, 固定报头
+	    		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
+	    		//	构建PUBACK消息体
+	    		MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
+	    		log.info("back--"+pubAck.toString());
+	    		channel.writeAndFlush(pubAck);
+				//推送到订阅的客户端
+				subscribSend(mqttMessage,channel);
+	            break;
+			// QoS 2: 发送PUBREC
+	        case EXACTLY_ONCE:		//	刚好一次
+	            //	构建返回报文, 固定报头
+	        	MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
+	            //	构建返回报文, 可变报头
+	            MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
+	            MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
+				//服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
+				// 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
+				//接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
+				int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
+				if(!MQTTServer.mqttMessageIdMap.containsKey(mqttMessageId)){
+					//不存在此消息,将此消息暂存 //todo 这里可以换成redis做缓存
+					MQTTServer.mqttMessageIdMap.put(mqttMessageId, mqttMessage);
+					log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
+				}else{
+					//重复发送消息,直接返回
+					log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
+					return;
+				}
+
+	    		channel.writeAndFlush(mqttMessageBack);
+	            break;
+			default:
+				break;
+        }
+	}
+
+	/**
+	 * 	发布完成 qos2
+	 * @param channel
+	 * @param mqttMessage
+	 */
+	public  void pubcomp (Channel channel, MqttMessage mqttMessage) {
+		System.out.println("==========发布完成==========");
+        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
+        //	构建返回报文, 固定报头
+    	MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0);
+        //	构建返回报文, 可变报头
+        MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
+        MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
+		//log.info("back--"+mqttMessageBack.toString());
+		channel.writeAndFlush(mqttMessageBack);
+	}
+
+	/**
+	 * 	订阅确认
+	 * @param channel
+	 * @param mqttMessage
+	 */
+	public  void suback(Channel channel, MqttMessage mqttMessage) {
+		System.out.println("=============客户端订阅主题===================");
+		MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
+		MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
+		//	构建返回报文, 可变报头
+		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
+		Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
+		//log.info(topics.toString());
+		List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
+		for (int i = 0; i < topics.size(); i++) {
+			grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
+			System.out.println("topics=========================="+topics);
+		}
+		//	构建返回报文	有效负载
+		MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
+		//	构建返回报文	固定报头
+		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
+		//	构建返回报文	订阅确认
+		MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
+		channel.writeAndFlush(subAck);
+	}
+
+	/**
+	 * 	取消订阅确认
+	 * @param channel
+	 * @param mqttMessage
+	 */
+	public  void unsuback(Channel channel, MqttMessage mqttMessage) {
+		MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
+		//	构建返回报文	可变报头
+		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
+		//	构建返回报文	固定报头
+		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
+		//	构建返回报文	取消订阅确认
+		MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
+		channel.writeAndFlush(unSubAck);
+	}
+
+	/**
+	 * 	心跳响应
+	 * @param channel
+	 * @param mqttMessage
+	 */
+	public  void pingresp (Channel channel, MqttMessage mqttMessage) {
+		System.out.println("-----------心跳响应-------------");
+		//	心跳响应报文	11010000 00000000  固定报文
+		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
+		MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
+		channel.writeAndFlush(mqttMessageBack);
+	}
+
+	/**
+	 * 订阅推送
+	 */
+	public  void subscribSend(MqttMessage mqttMessage,Channel channel){
+		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
+		Object obj=mqttMessage.variableHeader();
+		MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
+		String topicName=variableHeader.topicName();
+		System.err.println("topicName========"+topicName);
+		int packetId=variableHeader.packetId();
+		//固定消息头 注意此处的消息类型PUBLISH mqtt协议
+		MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_MOST_ONCE,false,0);
+		//可变消息头
+		MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader("down/light/ack/40005274",0);
+		JSONObject jsonObject = new JSONObject();
+		jsonObject.put("flag","1");
+		jsonObject.put("cmd","1111");
+		byte[] bytes = jsonObject.toString().getBytes();
+		ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
+		//推送消息体
+		MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, byteBuf);
+		log.info("推送地址————》"+mqttPublishVariableHeader);
+				//订阅次此topic的Mqtt客户端搜到此消息,
+				Channel channelSub= MQTTServer.MQTTdeviceChannelGroup.find(channel.id());
+				//writeAndFlush会将ByteBuf的引用释放,refCnt会减去1,使用retain加1
+				if(channelSub!=null) {
+					mqttPublishMessageResult.retain();
+					channelSub.writeAndFlush(mqttPublishMessageResult);
+					System.out.println("channelSub="+channelSub+"-----------mqttPublishMessageResult="+mqttPublishMessageResult);
+				}
+			mqttPublishMessageResult.release();
+	}
+
+	/**
+	 * 用户鉴权
+	 */
+	public boolean authentication(MqttConnectPayload payload){
+		//todo 这里只是使用了最直接的账号密码鉴权,这里可以进行diy,
+		// 例如客户端用sha256算法加密(设备名(username)+系统时间戳+设备密钥(password))得到加密密钥
+		// 服务器根据设备名和设备密钥再进行同样的操作,再比较服务端和客户端加密的密钥是否一致
+		// 加密算法放下面了
+		log.warn("clientId"+payload.clientIdentifier());
+		String username=MQTTserverProperties.getUsername();
+		String password=MQTTserverProperties.getPassword();
+		//无账号或者无密码通过
+		if(stringEmptyCheck(password)||stringEmptyCheck(username)){
+			return true;
+		}else {
+			//消息中账号密码为空
+			if(payload.passwordInBytes()==null||payload.userName()==null){
+				return false;
+			}
+			String passwordAuthen=new String(payload.passwordInBytes());
+			String usernameAuthen=payload.userName();
+			log.warn("username:{},password:{}",usernameAuthen,passwordAuthen);
+			if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){
+				return true;
+			}else {
+				return false;
+			}
+		}
+	}
+	/**
+	 * sha256_HMAC加密
+	 * @param message 设备名+时间戳
+	 * @param secret  设备秘钥
+	 * @return 加密密钥字符串
+	 */
+	public String hmacSHA256(String secret, String message) throws Exception {
+		String hash = "";
+		Mac hmacSha256 = Mac.getInstance("HmacSHA256");
+		SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(), "HmacSHA256");
+		hmacSha256.init(secret_key);
+		byte[] bytes = hmacSha256.doFinal(message.getBytes());
+		hash = byteArrayToHexString(bytes);
+		return hash;
+	}
+	/**
+	 * 将加密后的字节数组转换成字符串
+	 *
+	 * @param b 字节数组
+	 * @return 字符串
+	 */
+	public  String byteArrayToHexString(byte[] b) {
+		StringBuilder hs = new StringBuilder();
+		String stmp;
+		for (int n = 0; b!=null && n < b.length; n++) {
+			stmp = Integer.toHexString(b[n] & 0XFF);
+			if (stmp.length() == 1)
+				hs.append('0');
+			hs.append(stmp);
+		}
+		return hs.toString().toLowerCase();
+	}
+
+	//判断字符字符为空
+	private boolean stringEmptyCheck(String str){
+		if(str==null||"".equals(str)){
+			return true;
+		}else {
+			return false;
+		}
+	}
+
+}

+ 48 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/channel/MQTTDeviceManerger.java

@@ -0,0 +1,48 @@
+package com.zksy.pole.MQTTServer.channel;
+
+import com.zksy.pole.MQTTServer.server.MQTTServer;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author ShaoYang
+ * @date 2024/03/13 16:53
+ */
+@Slf4j
+public class MQTTDeviceManerger {
+    /**
+     * 设备接入
+     */
+    public static void MQTTdeviceAdd(Channel channel) {
+        if(!MQTTServer.MQTTdeviceChannelGroup.contains(channel)) {
+            MQTTServer.MQTTdeviceChannelGroup.add(channel);
+        }
+    }
+    /**
+     * 设备移除和和订阅的topic
+     */
+    public static void MQTTdeviceRemove(Channel channel) {
+        if(MQTTServer.MQTTdeviceChannelGroup.contains(channel)) {
+            MQTTServer.MQTTdeviceChannelGroup.remove(channel);
+            MQTTremoveDeviceChannelId(channel.id());
+            //移除topic中的这个设备的chanelid
+            for (Map.Entry<String, List<ChannelId>> listEntry : MQTTServer.subscribeMap.entrySet()) {
+                try {
+                    if (listEntry.getValue().contains(channel.id())) {
+                        listEntry.getValue().remove(channel.id());
+                        log.info(channel.id() + "下线,topic:  " + listEntry.getKey() + "中移除此id");
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+    public static void MQTTremoveDeviceChannelId(ChannelId channelId) {
+        MQTTServer.MQTTdeviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
+    }
+}

+ 38 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/channel/MqttChannelInit.java

@@ -0,0 +1,38 @@
+package com.zksy.pole.MQTTServer.channel;
+
+/**
+ * 通道初始化
+ * @author ShaoYang
+ * @date 2024/03/13 15:51
+ */
+
+import com.zksy.supernetty.MQTTServer.handler.MQTTMessageHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.mqtt.MqttDecoder;
+import io.netty.handler.codec.mqtt.MqttEncoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.RequiredArgsConstructor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+@Component
+@RequiredArgsConstructor
+public class MqttChannelInit extends ChannelInitializer<SocketChannel> {
+    @Autowired
+    private  MQTTMessageHandler MQTTmessageHandler;
+
+    @Override
+    protected void initChannel(SocketChannel channel) {
+        channel.pipeline()
+                // 心跳时间
+                .addLast("idle", new IdleStateHandler(600, 600, 1200, TimeUnit.SECONDS))
+                .addLast("encoder", MqttEncoder.INSTANCE)
+                .addLast("decoder", new MqttDecoder())
+                .addLast(MQTTmessageHandler);
+    }
+
+}
+

+ 38 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/config/MQTTServerProperties.java

@@ -0,0 +1,38 @@
+package com.zksy.pole.MQTTServer.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 读取YML中的服务配置
+ *
+ * /**
+ *  * @author ShaoYang
+ *  * @date 2024/03/13 15:44
+ *
+ */
+@Configuration
+@ConfigurationProperties(prefix = MQTTServerProperties.MQTTPREFIX)
+@Data
+public class MQTTServerProperties {
+
+    public static final String MQTTPREFIX = "netty.mqtt";
+
+    /**
+     * 服务器端口
+     */
+    private Integer port;
+
+    /**
+     * mqtt服务器用户名
+     */
+    private String username;
+
+    /**
+     * mqtt服务器密码
+     */
+    private String password;
+
+
+}

+ 218 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/handler/MQTTMessageHandler.java

@@ -0,0 +1,218 @@
+package com.zksy.pole.MQTTServer.handler;
+
+/**
+ * @author ShaoYang
+ * @date 2024/03/13 15:46
+ */
+
+import com.zksy.pole.MQTTServer.server.MQTTServer;
+import io.netty.channel.*;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * 消息处理,单例启动
+ *
+ * @author ShaoYang
+ */
+@Slf4j
+@ChannelHandler.Sharable
+@Component
+public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {
+    @Autowired
+    private com.zksy.pole.MQTTServer.callBack.BootNettyMqttMsgBack BootNettyMqttMsgBack;
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (null != msg) {
+            System.err.println(msg);
+            MqttMessage mqttMessage = (MqttMessage) msg;
+            log.info("info--"+mqttMessage.toString());
+            MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
+            Channel channel = ctx.channel();
+            if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
+                //用户鉴权(配置文件中配置账号和密码,如果没有默认)
+                log.warn("正在尝试鉴权");
+               boolean authentag=  BootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());
+               if(!authentag){
+                    return;
+               }
+                //	在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
+                if(MQTTServer.MQTTdeviceChannelGroup.contains(channel)){
+                    //移除次设备channel和topic
+                    BootNettyMqttMsgBack.disconnack(channel,mqttMessage);
+                }
+                //	to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
+                BootNettyMqttMsgBack.connack(channel, mqttMessage);
+            }
+            //对于没有鉴权的设备,请求不处理
+            if(!MQTTServer.MQTTdeviceChannelGroup.contains(channel)){
+                log.warn(channel.id()+"无鉴权操作");
+                return;
+            }
+            /*Map<String,String> variableHeader = (Map<String,String>) mqttMessage.variableHeader();
+            List<ChannelId> channelIdList = new ArrayList<>();
+            channelIdList.add(channel.id());
+            subscribeMap.put(variableHeader.get("topicName"),channelIdList);*/
+            switch (mqttFixedHeader.messageType()){
+                case PUBLISH:		//	客户端发布消息
+                    //	PUBACK报文是对QoS 1等级的PUBLISH报文的响应
+                    BootNettyMqttMsgBack.puback(channel, mqttMessage);
+                    MQTTServer.subscribeMap.clear();
+                    break;
+                // PUBREL	Qos2级别消息,客户端返回
+                case PUBREL:
+                    //	PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应
+                    //服务端收到pubrel之后,正式将消息投递给上层应用层。
+                    MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();
+                    if(MQTTServer.mqttMessageIdMap.containsKey(VariableHeader.messageId())) {
+                        log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());
+                        BootNettyMqttMsgBack.subscribSend(MQTTServer.mqttMessageIdMap.get(VariableHeader.messageId()),channel);
+                        BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
+                        MQTTServer.mqttMessageIdMap.remove(VariableHeader.messageId());
+                    }else {
+                        //后续多次收到REL消息,制作comp响应
+                        BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
+                    }
+                    break;
+                case SUBSCRIBE:		//	客户端订阅主题
+                    //	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
+                    //	为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
+                    //	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
+                    // 	to do
+                    BootNettyMqttMsgBack.suback(channel, mqttMessage);
+                    MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;
+                    for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
+                        String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
+                        boolean tag= MQTTServer.subscribeMap.containsKey(topicname);
+                        if(tag){
+                            List<ChannelId> channelIds= MQTTServer.subscribeMap.get(topicname);
+                            if(!channelIds.contains(channel.id())) {
+                                channelIds.add(channel.id());
+                            }else {
+                                log.warn(channel.id()+"重复订阅");
+                            }
+                            MQTTServer.subscribeMap.put(topicname, channelIds);
+                        }else {
+                            List<ChannelId> channelIds=new ArrayList<>();
+                            channelIds.add(channel.id());
+                            MQTTServer.subscribeMap.put(topicname,channelIds);
+                        }
+                        log.info(channel.id()+"订阅地址————》"+topicname);
+                    }
+
+
+                    break;
+                case UNSUBSCRIBE:	//	客户端取消订阅
+                    //	客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
+                    //	to do
+                    BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
+                    Object Unsubscribe=mqttMessage.payload();
+                    MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;
+                    int len=unsubscribePayload.topics().size();
+                    for (int i = 0; i < len; i++) {
+                        String topicname=unsubscribePayload.topics().get(i);
+                        boolean tag= MQTTServer.subscribeMap.containsKey(topicname);
+                        if(tag){
+                            List<ChannelId> channelIds= MQTTServer.subscribeMap.get(topicname);
+                            channelIds.remove(channel.id());
+                            MQTTServer.subscribeMap.put(topicname,channelIds);
+                        }else {
+                           log.error("不存在订阅地址——>"+topicname);
+                        }
+                        log.info(channel.id()+"取消订阅地址————》"+topicname);
+                    }
+
+                    break;
+                case PINGREQ:		//	客户端发起心跳
+                    //	客户端发送PINGREQ报文给服务端的
+                    //	在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
+                    //	请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
+                    BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
+                    break;
+                case DISCONNECT:	//	客户端主动断开连接
+                    log.debug("设备下线,channelId:{}", channel.id());
+                    MQTTdeviceRemove(channel);
+                    //	DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
+                    //	to do
+                    break;
+                default:
+                    break;
+            }
+        }
+        else {
+            return;
+        }
+    }
+
+    /**
+     * 	从客户端收到新的数据、读取完成时调用
+     */
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
+    }
+
+    /**
+     * 	客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
+     */
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+        super.channelRegistered(ctx);
+    }
+
+    /**
+     * 	客户端与服务端 断连时执行 channelInactive方法之后执行
+     */
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+        log.warn(ctx.channel().id()+"连接断开");
+        MQTTdeviceRemove(ctx.channel());
+        super.channelUnregistered(ctx);
+
+    }
+
+    /**
+     * 	当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        Channel channel = ctx.channel();
+        log.warn(channel.id()+"连接异常断开。。。。。。。");
+        MQTTdeviceRemove(ctx.channel());
+        super.exceptionCaught(ctx, cause);
+        if(channel.isActive()){
+            ctx.close();
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        log.debug("\n");
+
+
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+    }
+    /**
+     * 	服务端 当读超时时 会调用这个方法
+     */
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        super.userEventTriggered(ctx, evt);
+        ctx.close();
+    }
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        super.channelWritabilityChanged(ctx);
+    }
+
+}

+ 27 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/server/IMQTTServer.java

@@ -0,0 +1,27 @@
+package com.zksy.pole.MQTTServer.server;
+
+import javax.annotation.PreDestroy;
+
+/**
+ * @author ShaoYang
+ * @date 2024/03/13 15:53
+ */
+public interface IMQTTServer {
+
+    /**
+     * 主启动程序,初始化参数
+     *
+     * @throws Exception 初始化异常
+     */
+    void start() throws Exception;
+
+    /**
+     * 优雅的结束服务器
+     *
+     * @throws InterruptedException 提前中断异常
+     */
+    @PreDestroy
+    void destroy() throws InterruptedException;
+}
+
+

+ 94 - 0
pole-service/src/main/java/com/zksy/pole/MQTTServer/server/MQTTServer.java

@@ -0,0 +1,94 @@
+package com.zksy.pole.MQTTServer.server;
+
+
+import com.zksy.pole.MQTTServer.channel.MqttChannelInit;
+import com.zksy.pole.MQTTServer.config.MQTTServerProperties;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 启动 Server
+ *
+ */
+@Component
+@Slf4j
+public class MQTTServer implements IMQTTServer {
+    @Autowired
+    private MqttChannelInit mqttChannelInit;
+    @Autowired
+    private MQTTServerProperties MQTTserverProperties;
+
+    //保存接入的MQTT设备channel
+    public static ChannelGroup MQTTdeviceChannelGroup;
+    //保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据
+    public static ConcurrentHashMap<String, List<ChannelId>> subscribeMap =new ConcurrentHashMap();
+    //保存设备名称和通道编号,向设备发送消息可以通过名称找到通道
+    public static Map<String, ChannelId> MQTTdeviceMap = new ConcurrentHashMap<>();
+    //存放Qos2消息等级的消息ID,这里可使用redis之类的工具做持久化,为了简化开发,使用map暂存
+    public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap();
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    @Override
+    public void start() {
+        log.info("初始化 Mqttserver ...");
+        bossGroup = new NioEventLoopGroup();
+        workerGroup =  new NioEventLoopGroup();
+        MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+        this.MqttServer();
+    }
+    /**
+     * 初始化
+     */
+    private void MqttServer() {
+        try {
+            new ServerBootstrap()
+                    .group(bossGroup, workerGroup)
+                    .channel( NioServerSocketChannel.class )
+                    .localAddress(new InetSocketAddress(MQTTserverProperties.getPort()))
+                    // 配置 编码器、解码器、业务处理
+                    .childHandler(mqttChannelInit)
+                    // tcp缓冲区
+                    .option(ChannelOption.SO_BACKLOG, 128)
+                    // 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
+                    .childOption(ChannelOption.TCP_NODELAY, true)
+                    // 保持长连接
+                    .childOption(ChannelOption.SO_KEEPALIVE, false)
+                    // 绑定端口,开始接收进来的连接
+                    .bind().sync();
+            log.info("MQTT服务启动成功!开始监听端口:{}", MQTTserverProperties.getPort());
+        } catch (Exception e) {
+            e.printStackTrace();
+            bossGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+    /**
+     * 销毁
+     */
+    @PreDestroy
+    @Override
+    public void destroy() {
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+
+}

+ 19 - 0
pole-service/src/main/java/com/zksy/pole/PoleApplication.java

@@ -0,0 +1,19 @@
+package com.zksy.pole;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * @author Administrator
+ * @version 1.0
+ * @project dh-server-micro
+ * @description 灯杆服务
+ * @date 2024/8/28 09:54:31
+ */
+@SpringBootApplication
+public class PoleApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(PoleApplication.class,args);
+        System.out.println("灯杆服务启动成功");
+    }
+}

+ 21 - 0
pole-service/src/main/java/com/zksy/pole/controller/TestController.java

@@ -0,0 +1,21 @@
+package com.zksy.pole.controller;
+
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author Administrator
+ * @version 1.0
+ * @project dh-server-micro
+ * @description
+ * @date 2024/8/28 10:02:54
+ */
+@RestController
+@RequestMapping("/a")
+public class TestController {
+    @GetMapping("/test")
+    public String test(){
+        return "好想你";
+    }
+}

+ 41 - 0
pole-service/src/main/java/com/zksy/pole/startServer/startSrver.java

@@ -0,0 +1,41 @@
+package com.zksy.pole.startServer;
+
+import com.zksy.supernetty.MQTTServer.server.MQTTServer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author ShaoYang
+ * @date 2024/03/13 15:58
+ */
+@Component
+public class startSrver {
+
+    @Autowired
+    private MQTTServer MQTTServer;
+
+    @PostConstruct
+    public void startNetty(){
+        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(
+                3,
+                3,
+                60,
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<Runnable>(3));
+        threadPoolExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    MQTTServer.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+}

+ 29 - 0
pole-service/src/main/resources/application-dev.yaml

@@ -0,0 +1,29 @@
+zksy:
+  db:
+    host: 192.168.110.30
+    un: root
+    pw: 123
+    port: 3307
+    database: zhongkeshengyang
+spring:
+  redis:
+    host: 192.168.110.30
+    port: 6379
+minio:
+  endpoint: http://192.168.110.30:9000
+  accessKey: minio
+  secretKey: minio123
+  bucket: zksy-file
+  readPath: http://192.168.110.30:9000
+netty:
+  mqtt:
+    port: 1884
+    username: admin
+    password: admin
+  server:
+    host: 127.0.0.1
+    port: 1996
+    # 传输模式linux上开启会有更高的性能
+    use-epoll: false
+
+

+ 27 - 0
pole-service/src/main/resources/application-prod.yaml

@@ -0,0 +1,27 @@
+zksy:
+  db:
+    host: 192.168.110.30
+    un: root
+    pw: 123
+    port: 3307
+    database: zhongkeshengyang
+spring:
+  redis:
+    host: 192.168.110.30
+    port: 6379
+minio:
+  endpoint: http://192.168.110.30:9000
+  accessKey: minio
+  secretKey: minio123
+  bucket: test
+  readPath: http://192.168.110.30:9000
+netty:
+  mqtt:
+    port: 1884
+    username: admin
+    password: admin
+  server:
+    host: 127.0.0.1
+    port: 1996
+    # 传输模式linux上开启会有更高的性能
+    use-epoll: false

+ 29 - 0
pole-service/src/main/resources/bootstrap.yaml

@@ -0,0 +1,29 @@
+spring:
+  application:
+    name: pole-service
+  profiles:
+    active: dev
+  main:
+    allow-bean-definition-overriding: true
+  cloud:
+    sentinel:
+      transport:
+        dashboard: 192.168.110.30:8090
+      http-method-specify: true
+    nacos:
+      discovery:
+        server-addr: 192.168.110.30:8848
+      config:
+        server-addr: 192.168.110.30:8848
+        file-extension: yaml
+        shared-configs:
+          - dataId: pole-service.yaml
+          - dataId: zksy-shared-jdbc.yaml
+          - dataId: zksy-shared-log.yaml
+  redis:
+    host: 192.168.110.30
+    port: 6379
+
+
+
+

+ 13 - 0
pom.xml

@@ -14,6 +14,7 @@
         <module>zksy-common</module>
         <module>visualization-service</module>
         <module>environment-data-service</module>
+        <module>pole-service</module>
     </modules>
 
     <parent>
@@ -39,6 +40,8 @@
         <redis.version>3.0.5</redis.version>
         <sentinel.version>2021.0.4.0</sentinel.version>
         <minioutil.version>1.0.0</minioutil.version>
+        <mqtt.version>5.5.9</mqtt.version>
+        <netty.version>4.1.75.Final</netty.version>
     </properties>
 
     <dependencyManagement>
@@ -100,6 +103,16 @@
                 <artifactId>minioutil</artifactId>
                 <version>${minioutil.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.springframework.integration</groupId>
+                <artifactId>spring-integration-mqtt</artifactId>
+                <version>${mqtt.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-all</artifactId>
+                <version>${netty.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>