Преглед на файлове

feat(chat): 实现 WebSocket 消息处理器

- 添加用户与客服的连接管理逻辑
- 实现消息类型处理:登录、聊天、心跳、登出- 添加连接超时检测与自动清理机制- 实现客服轮询分配策略- 添加消息发送失败回调处理
- 支持系统消息推送(如无客服在线提示)
nahida преди 8 месеца
родител
ревизия
ebdd0e946a
променени са 1 файла, в които са добавени 267 реда и са изтрити 0 реда
  1. 267 0
      zksy-ws/src/main/java/com/zksy/server/WebSocketServerHandler.java

+ 267 - 0
zksy-ws/src/main/java/com/zksy/server/WebSocketServerHandler.java

@@ -0,0 +1,267 @@
+package com.zksy.server;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.zksy.domain.dto.ChatMessageDto;
+import com.zksy.service.ChatService;
+import io.netty.channel.*;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * WebSocket 消息处理器
+ */
+public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+
+    private final ChatService chatService;
+
+    public WebSocketServerHandler(ChatService chatService) {
+        this.chatService = chatService;
+    }
+
+    // 保存用户连接:userId -> Channel
+    private static final ConcurrentHashMap<String, Channel> userChannels = new ConcurrentHashMap<>();
+    // 客服连接:adminId -> Channel
+    private static final ConcurrentHashMap<String, Channel> adminChannels = new ConcurrentHashMap<>();
+    // 轮询下标
+    private static final AtomicInteger adminIndex = new AtomicInteger(0);
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int USER_TIMEOUT = 180;   // 秒
+    private static final int ADMIN_TIMEOUT = 300; // 秒
+
+    // 定时巡检线程池
+    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+    static {
+        // 每 30 秒巡检一次
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                checkChannels(userChannels, "用户");
+                checkChannels(adminChannels, "客服");
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }, 30, 30, TimeUnit.SECONDS);
+    }
+
+    private static void checkChannels(ConcurrentHashMap<String, Channel> channels, String role) {
+        for (Map.Entry<String, Channel> entry : channels.entrySet()) {
+            String id = entry.getKey();
+            Channel channel = entry.getValue();
+            if (channel == null || !channel.isActive()) {
+                channels.remove(id);
+                if (channel != null) {
+                    channel.close();
+                }
+                System.out.println(role + " " + id + " 连接失效,已清理");
+            }
+        }
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        System.out.println("新连接加入:" + ctx.channel().id().asShortText());
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) {
+        userChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
+        adminChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
+        System.out.println("连接断开:" + ctx.channel().id().asShortText());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        userChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
+        adminChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
+        super.channelInactive(ctx);
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+        String payload = msg.text();
+        System.out.println("收到消息:" + payload);
+
+        ChatMessageDto chatMsg = objectMapper.readValue(payload, ChatMessageDto.class);
+
+        switch (chatMsg.getType()) {
+            case "login":
+                if ("admin".equals(chatMsg.getRole())) {
+                    adminChannels.put(chatMsg.getFrom(), ctx.channel());
+                    chatService.onAdminLogin(chatMsg.getFrom());
+                    // 避免重复添加 IdleStateHandler
+                    if (ctx.pipeline().get("idleStateHandler") != null) {
+                        ctx.pipeline().remove("idleStateHandler");
+                    }
+                    ctx.pipeline().addBefore("handler", "idleStateHandler",
+                            new IdleStateHandler(ADMIN_TIMEOUT, 0, 0));
+                    System.out.println("客服上线:" + chatMsg.getFrom());
+                } else {
+                    userChannels.put(chatMsg.getFrom(), ctx.channel());
+                    chatService.onUserLogin(chatMsg.getFrom());
+                    if (ctx.pipeline().get("idleStateHandler") != null) {
+                        ctx.pipeline().remove("idleStateHandler");
+                    }
+                    ctx.pipeline().addBefore("handler", "idleStateHandler",
+                            new IdleStateHandler(USER_TIMEOUT, 0, 0));
+                    System.out.println("用户上线:" + chatMsg.getFrom());
+                }
+                break;
+
+            case "chat":
+                handleChat(chatMsg);
+                break;
+
+            case "heartbeat":
+                // 心跳包不需要额外操作,IdleStateHandler 会自动刷新
+                break;
+            case "logout":
+                if ("admin".equals(chatMsg.getRole())) {
+                    adminChannels.remove(chatMsg.getFrom());
+                    chatService.onAdminOffline(chatMsg.getFrom());
+                } else {
+                    userChannels.remove(chatMsg.getFrom());
+                    chatService.onUserOffline(chatMsg.getFrom());
+                }
+                System.out.println(chatMsg.getRole() + " " + chatMsg.getFrom() + " 已下线");
+                break;
+
+            default:
+                System.out.println("未知消息类型: " + chatMsg.getType());
+        }
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            boolean isUser = userChannels.containsValue(ctx.channel());
+            boolean isAdmin = adminChannels.containsValue(ctx.channel());
+            if (isUser) {
+                String userId = userChannels.entrySet().stream()
+                        .filter(e -> e.getValue().equals(ctx.channel()))
+                        .map(Map.Entry::getKey)
+                        .findFirst()
+                        .orElse(null);
+                chatService.onUserOffline(userId);
+                System.out.println("用户超时,下线:" + ctx.channel().id().asShortText());
+                userChannels.entrySet().removeIf(e -> e.getValue().equals(ctx.channel()));
+                ctx.close();
+            } else if (isAdmin) {
+                String adminId = adminChannels.entrySet().stream()
+                        .filter(e -> e.getValue().equals(ctx.channel()))
+                        .map(Map.Entry::getKey)
+                        .findFirst()
+                        .orElse(null);
+                if (adminId != null) {
+                    chatService.onAdminOffline(adminId);
+                }
+                System.out.println("客服超时,下线:" + ctx.channel().id().asShortText());
+                adminChannels.entrySet().removeIf(e -> e.getValue().equals(ctx.channel()));
+                ctx.close();
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
+
+
+    private void handleChat(ChatMessageDto chatMsg) throws Exception {
+        if ("user".equals(chatMsg.getRole())) {
+            // 先确定目标客服(如果没有则分配一个)
+            String targetAdminId = chatMsg.getTo();
+            if (targetAdminId == null || targetAdminId.isEmpty()) {
+                targetAdminId = selectAdmin();
+                if (targetAdminId != null) {
+                    chatMsg.setTo(targetAdminId); // 先补充上客服ID —— 非常关键
+                }
+            }
+
+            if (targetAdminId != null) {
+                // 先落库(会话 + 消息)—— onUserMessage 里会根据 chatMsg.from/chatMsg.to 建会话并保存消息
+                chatService.onUserMessage(chatMsg);
+
+                // 再尝试推送给客服(异步,并带回调处理失败)
+                Channel targetAdmin = adminChannels.get(targetAdminId);
+                if (targetAdmin != null && targetAdmin.isActive()) {
+                    targetAdmin.writeAndFlush(new TextWebSocketFrame(
+                            objectMapper.writeValueAsString(chatMsg)
+                    )).addListener(future -> {
+                        if (!future.isSuccess()) {
+                            Throwable cause = future.cause();
+                            cause.printStackTrace();
+                            // 如果你实现了 onMessageSendFailure,可以在这里调用它做状态更新/记录
+                            chatService.onMessageSendFailure(chatMsg);
+                        }
+                    });
+                } else {
+                    // 客服不在线:消息已经入库(status=sent),这里可以选择把状态标记为待投递或记录未读
+                    System.out.println("客服 " + targetAdminId + " 不在线,消息已保存为离线/未读");
+                }
+
+            } else {
+                // 没有可用客服
+                chatService.onNoAvailableAdmin(chatMsg);
+                Channel userChannel = userChannels.get(chatMsg.getFrom());
+                if (userChannel != null && userChannel.isActive()) {
+                    ChatMessageDto systemMsg = new ChatMessageDto();
+                    systemMsg.setType("chat");   // 或 "system" / "error",前后端约定即可
+                    systemMsg.setRole("robot");
+                    systemMsg.setFrom("robot");
+                    systemMsg.setTo(chatMsg.getFrom());
+                    systemMsg.setContent("当前没有客服在线,您可以留下您的联系方式,我们会尽快联系您。");
+                    systemMsg.setTimestamp(System.currentTimeMillis());
+                    userChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(systemMsg)));
+                }
+            }
+
+        } else if ("admin".equals(chatMsg.getRole())) {
+            // 客服发消息:先落库,再推送给用户(已有失败回调)
+            chatService.onAdminMessage(chatMsg);
+            Channel targetUser = userChannels.get(chatMsg.getTo());
+            if (targetUser != null && targetUser.isActive()) {
+                targetUser.writeAndFlush(new TextWebSocketFrame(
+                        objectMapper.writeValueAsString(chatMsg)
+                )).addListener(future -> {
+                    if (!future.isSuccess()) {
+                        Throwable cause = future.cause();
+                        System.err.println("消息发送失败,目标用户:" + chatMsg.getTo());
+                        cause.printStackTrace();
+                        chatService.onMessageSendFailure(chatMsg);
+                    }
+                });
+            } else {
+                System.out.println("用户 " + chatMsg.getTo() + " 不在线,可以存未读消息");
+            }
+        }
+    }
+
+    /**
+     * 从客服列表中选择一个客服(轮询)
+     */
+    private String selectAdmin() {
+        if (adminChannels.isEmpty()) {
+            return null;
+        }
+        // 取客服ID列表
+        List<String> adminIds = new ArrayList<>(adminChannels.keySet());
+        int index = Math.abs(adminIndex.getAndIncrement() % adminIds.size());
+        return adminIds.get(index);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        System.err.println("WebSocket 异常:" + cause.getMessage());
+        cause.printStackTrace();
+        ctx.close();
+    }
+
+}