|
|
@@ -1,5 +1,6 @@
|
|
|
package com.zksy.server;
|
|
|
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.zksy.domain.dto.ChatMessageDto;
|
|
|
import com.zksy.service.ChatService;
|
|
|
@@ -34,6 +35,10 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebS
|
|
|
|
|
|
private static final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
|
|
|
+ // 用户 -> 上次提示客服分配的时间
|
|
|
+ private static final ConcurrentHashMap<String, Long> userLastAllocationNotice = new ConcurrentHashMap<>();
|
|
|
+ // 允许重复提示的间隔(毫秒)
|
|
|
+ private static final long ALLOCATION_NOTICE_INTERVAL = 5 * 60 * 1000; // 5分钟
|
|
|
private static final int USER_TIMEOUT = 180; // 秒
|
|
|
private static final int ADMIN_TIMEOUT = 300; // 秒
|
|
|
|
|
|
@@ -73,47 +78,111 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebS
|
|
|
|
|
|
@Override
|
|
|
public void handlerRemoved(ChannelHandlerContext ctx) {
|
|
|
- userChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
|
|
|
- adminChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
|
|
|
+ handleChannelDisconnect(ctx);
|
|
|
System.out.println("连接断开:" + ctx.channel().id().asShortText());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
- userChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
|
|
|
- adminChannels.entrySet().removeIf(entry -> entry.getValue().equals(ctx.channel()));
|
|
|
+ handleChannelDisconnect(ctx);
|
|
|
super.channelInactive(ctx);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 处理通道断开连接 - 统一处理用户和客服的离线逻辑
|
|
|
+ */
|
|
|
+ private void handleChannelDisconnect(ChannelHandlerContext ctx) {
|
|
|
+ // 先检查是否是用户连接断开
|
|
|
+ String userId = getUserIdByChannel(ctx.channel(), userChannels);
|
|
|
+ if (userId != null) {
|
|
|
+ userChannels.remove(userId);
|
|
|
+ chatService.onUserOffline(userId);
|
|
|
+ // 通知所有客服该用户离线
|
|
|
+ notifyUserOfflineToAdmins(userId);
|
|
|
+ System.out.println("用户 " + userId + " 连接断开,已标记为离线");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 再检查是否是客服连接断开
|
|
|
+ String adminId = getUserIdByChannel(ctx.channel(), adminChannels);
|
|
|
+ if (adminId != null) {
|
|
|
+ adminChannels.remove(adminId);
|
|
|
+ chatService.onAdminOffline(adminId);
|
|
|
+ // 通知所有客服该客服离线
|
|
|
+ notifyAdminOfflineToOtherAdmins(adminId);
|
|
|
+ System.out.println("客服 " + adminId + " 连接断开,已标记为离线");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据通道获取用户ID
|
|
|
+ */
|
|
|
+ private String getUserIdByChannel(Channel channel, ConcurrentHashMap<String, Channel> channels) {
|
|
|
+ return channels.entrySet().stream()
|
|
|
+ .filter(entry -> entry.getValue().equals(channel))
|
|
|
+ .map(Map.Entry::getKey)
|
|
|
+ .findFirst()
|
|
|
+ .orElse(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 通知所有客服用户离线
|
|
|
+ */
|
|
|
+ private void notifyUserOfflineToAdmins(String userId) {
|
|
|
+ try {
|
|
|
+ ChatMessageDto offlineMsg = new ChatMessageDto();
|
|
|
+ offlineMsg.setFrom(userId);
|
|
|
+ offlineMsg.setTo(null);
|
|
|
+ offlineMsg.setType("clientOffline");
|
|
|
+ offlineMsg.setContent(userId);
|
|
|
+ offlineMsg.setRole("system");
|
|
|
+ offlineMsg.setTimestamp(System.currentTimeMillis());
|
|
|
+
|
|
|
+ String offlineJson = objectMapper.writeValueAsString(offlineMsg);
|
|
|
+ for (Channel adminChannel : adminChannels.values()) {
|
|
|
+ if (adminChannel != null && adminChannel.isActive()) {
|
|
|
+ adminChannel.writeAndFlush(new TextWebSocketFrame(offlineJson));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.err.println("通知用户离线失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 通知其他客服该客服离线
|
|
|
+ */
|
|
|
+ private void notifyAdminOfflineToOtherAdmins(String adminId) {
|
|
|
+ try {
|
|
|
+ ChatMessageDto offlineMsg = new ChatMessageDto();
|
|
|
+ offlineMsg.setFrom(adminId);
|
|
|
+ offlineMsg.setTo(null);
|
|
|
+ offlineMsg.setType("clientOffline");
|
|
|
+ offlineMsg.setContent(adminId);
|
|
|
+ offlineMsg.setRole("system");
|
|
|
+ offlineMsg.setTimestamp(System.currentTimeMillis());
|
|
|
+
|
|
|
+ String offlineJson = objectMapper.writeValueAsString(offlineMsg);
|
|
|
+ for (Channel adminChannel : adminChannels.values()) {
|
|
|
+ if (adminChannel != null && adminChannel.isActive()) {
|
|
|
+ adminChannel.writeAndFlush(new TextWebSocketFrame(offlineJson));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.err.println("通知客服离线失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
|
|
|
String payload = msg.text();
|
|
|
- System.out.println("收到消息:" + payload);
|
|
|
|
|
|
ChatMessageDto chatMsg = objectMapper.readValue(payload, ChatMessageDto.class);
|
|
|
+ System.out.println("收到消息: " + chatMsg);
|
|
|
|
|
|
switch (chatMsg.getType()) {
|
|
|
case "login":
|
|
|
- if ("admin".equals(chatMsg.getRole())) {
|
|
|
- adminChannels.put(chatMsg.getFrom(), ctx.channel());
|
|
|
- chatService.onAdminLogin(chatMsg.getFrom());
|
|
|
- // 避免重复添加 IdleStateHandler
|
|
|
- if (ctx.pipeline().get("idleStateHandler") != null) {
|
|
|
- ctx.pipeline().remove("idleStateHandler");
|
|
|
- }
|
|
|
- ctx.pipeline().addBefore("handler", "idleStateHandler",
|
|
|
- new IdleStateHandler(ADMIN_TIMEOUT, 0, 0));
|
|
|
- System.out.println("客服上线:" + chatMsg.getFrom());
|
|
|
- } else {
|
|
|
- userChannels.put(chatMsg.getFrom(), ctx.channel());
|
|
|
- chatService.onUserLogin(chatMsg.getFrom());
|
|
|
- if (ctx.pipeline().get("idleStateHandler") != null) {
|
|
|
- ctx.pipeline().remove("idleStateHandler");
|
|
|
- }
|
|
|
- ctx.pipeline().addBefore("handler", "idleStateHandler",
|
|
|
- new IdleStateHandler(USER_TIMEOUT, 0, 0));
|
|
|
- System.out.println("用户上线:" + chatMsg.getFrom());
|
|
|
- }
|
|
|
+ handleLogin(ctx, chatMsg);
|
|
|
break;
|
|
|
|
|
|
case "chat":
|
|
|
@@ -122,59 +191,112 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebS
|
|
|
|
|
|
case "heartbeat":
|
|
|
// 心跳包不需要额外操作,IdleStateHandler 会自动刷新
|
|
|
+ handleHeartBeat(chatMsg);
|
|
|
break;
|
|
|
case "logout":
|
|
|
- if ("admin".equals(chatMsg.getRole())) {
|
|
|
- adminChannels.remove(chatMsg.getFrom());
|
|
|
- chatService.onAdminOffline(chatMsg.getFrom());
|
|
|
- } else {
|
|
|
- userChannels.remove(chatMsg.getFrom());
|
|
|
- chatService.onUserOffline(chatMsg.getFrom());
|
|
|
- }
|
|
|
- System.out.println(chatMsg.getRole() + " " + chatMsg.getFrom() + " 已下线");
|
|
|
+ handleLogout(chatMsg);
|
|
|
+ break;
|
|
|
+ case "read":
|
|
|
+ handleRead(ctx,chatMsg);
|
|
|
break;
|
|
|
+ case "typing":
|
|
|
+ case "typingStop":
|
|
|
+ handleTyping(chatMsg);
|
|
|
+ break;
|
|
|
+// case "status":
|
|
|
+// // 这个通常是服务端主动推送,不建议客户端直接发
|
|
|
+// // 如果你希望客户端也能触发,可以在这里做转发
|
|
|
+// handleStatus(chatMsg);
|
|
|
+// break;
|
|
|
|
|
|
default:
|
|
|
System.out.println("未知消息类型: " + chatMsg.getType());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void handleRead(ChannelHandlerContext ctx, ChatMessageDto chatMsg) {
|
|
|
+ chatService.onReadMessage(chatMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+// private void handleRead(ChatMessageDto chatMsg) {
|
|
|
+// chatService.onReadMessage(chatMsg);
|
|
|
+// }
|
|
|
+
|
|
|
+ private void handleHeartBeat(ChatMessageDto chatMsg) {
|
|
|
+ chatService.onHeartbeat(chatMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleLogout(ChatMessageDto chatMsg) throws JsonProcessingException {
|
|
|
+ if ("admin".equals(chatMsg.getRole())) {
|
|
|
+ adminChannels.remove(chatMsg.getFrom());
|
|
|
+ chatService.onAdminOffline(chatMsg.getFrom());
|
|
|
+ for (Channel userChannel : userChannels.values()) {
|
|
|
+ userChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(
|
|
|
+ new ChatMessageDto(chatMsg.getFrom(), null, "clientOffline",
|
|
|
+ chatMsg.getFrom(), "system", System.currentTimeMillis(), null, null,null))));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ userChannels.remove(chatMsg.getFrom());
|
|
|
+ chatService.onUserOffline(chatMsg.getFrom());
|
|
|
+ for (Channel adminChannel : adminChannels.values()) {
|
|
|
+ adminChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(
|
|
|
+ new ChatMessageDto(chatMsg.getFrom(), null, "clientOffline",
|
|
|
+ chatMsg.getFrom(), "system", System.currentTimeMillis(), null, null,null))));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ System.out.println(chatMsg.getRole() + " " + chatMsg.getFrom() + " 已下线");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleLogin(ChannelHandlerContext ctx, ChatMessageDto chatMsg) throws JsonProcessingException {
|
|
|
+ if ("admin".equals(chatMsg.getRole())) {
|
|
|
+ if (!chatService.isClientService(chatMsg.getFrom())) {
|
|
|
+ ctx.close();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ adminChannels.put(chatMsg.getFrom(), ctx.channel());
|
|
|
+ chatService.onAdminLogin(chatMsg.getFrom());
|
|
|
+ // 避免重复添加 IdleStateHandler
|
|
|
+ if (ctx.pipeline().get("idleStateHandler") != null) {
|
|
|
+ ctx.pipeline().remove("idleStateHandler");
|
|
|
+ }
|
|
|
+ ctx.pipeline().addBefore("handler", "idleStateHandler",
|
|
|
+ new IdleStateHandler(ADMIN_TIMEOUT, 0, 0));
|
|
|
+ System.out.println("客服上线:" + chatMsg.getFrom());
|
|
|
+ } else {
|
|
|
+ userChannels.put(chatMsg.getFrom(), ctx.channel());
|
|
|
+ chatService.onUserLogin(chatMsg.getFrom());
|
|
|
+
|
|
|
+ for (Channel adminChannel : adminChannels.values()) {
|
|
|
+ adminChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(
|
|
|
+ new ChatMessageDto(chatMsg.getFrom(), null, "clientOnline",
|
|
|
+ chatMsg.getFrom(), "system", System.currentTimeMillis(), null, null,null))));
|
|
|
+ }
|
|
|
+ if (ctx.pipeline().get("idleStateHandler") != null) {
|
|
|
+ ctx.pipeline().remove("idleStateHandler");
|
|
|
+ }
|
|
|
+ ctx.pipeline().addBefore("handler", "idleStateHandler",
|
|
|
+ new IdleStateHandler(USER_TIMEOUT, 0, 0));
|
|
|
+ System.out.println("用户上线:" + chatMsg.getFrom());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
|
|
if (evt instanceof IdleStateEvent) {
|
|
|
- boolean isUser = userChannels.containsValue(ctx.channel());
|
|
|
- boolean isAdmin = adminChannels.containsValue(ctx.channel());
|
|
|
- if (isUser) {
|
|
|
- String userId = userChannels.entrySet().stream()
|
|
|
- .filter(e -> e.getValue().equals(ctx.channel()))
|
|
|
- .map(Map.Entry::getKey)
|
|
|
- .findFirst()
|
|
|
- .orElse(null);
|
|
|
- chatService.onUserOffline(userId);
|
|
|
- System.out.println("用户超时,下线:" + ctx.channel().id().asShortText());
|
|
|
- userChannels.entrySet().removeIf(e -> e.getValue().equals(ctx.channel()));
|
|
|
- ctx.close();
|
|
|
- } else if (isAdmin) {
|
|
|
- String adminId = adminChannels.entrySet().stream()
|
|
|
- .filter(e -> e.getValue().equals(ctx.channel()))
|
|
|
- .map(Map.Entry::getKey)
|
|
|
- .findFirst()
|
|
|
- .orElse(null);
|
|
|
- if (adminId != null) {
|
|
|
- chatService.onAdminOffline(adminId);
|
|
|
- }
|
|
|
- System.out.println("客服超时,下线:" + ctx.channel().id().asShortText());
|
|
|
- adminChannels.entrySet().removeIf(e -> e.getValue().equals(ctx.channel()));
|
|
|
- ctx.close();
|
|
|
- }
|
|
|
+ System.out.println("连接超时,开始处理离线逻辑:" + ctx.channel().id().asShortText());
|
|
|
+ // 使用统一的断开处理逻辑
|
|
|
+ handleChannelDisconnect(ctx);
|
|
|
+ // 关闭连接
|
|
|
+ ctx.close();
|
|
|
} else {
|
|
|
super.userEventTriggered(ctx, evt);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
-
|
|
|
private void handleChat(ChatMessageDto chatMsg) throws Exception {
|
|
|
+ chatMsg.setTimestamp(System.currentTimeMillis());
|
|
|
+ chatService.onHeartbeat(chatMsg);
|
|
|
if ("user".equals(chatMsg.getRole())) {
|
|
|
// 先确定目标客服(如果没有则分配一个)
|
|
|
String targetAdminId = chatMsg.getTo();
|
|
|
@@ -186,9 +308,31 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebS
|
|
|
}
|
|
|
|
|
|
if (targetAdminId != null) {
|
|
|
- // 先落库(会话 + 消息)—— onUserMessage 里会根据 chatMsg.from/chatMsg.to 建会话并保存消息
|
|
|
- chatService.onUserMessage(chatMsg);
|
|
|
+ // 通知用户xxxx客服为您服务
|
|
|
+ // 只在第一次分配客服,或者超过一定时间没提示时,才发提示
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ Long lastNoticeTime = userLastAllocationNotice.get(chatMsg.getFrom());
|
|
|
|
|
|
+ boolean shouldNotice = (lastNoticeTime == null) || (now - lastNoticeTime > ALLOCATION_NOTICE_INTERVAL);
|
|
|
+
|
|
|
+ if (shouldNotice) {
|
|
|
+ Channel userChannel = userChannels.get(chatMsg.getFrom());
|
|
|
+ if (userChannel != null && userChannel.isActive()) {
|
|
|
+ userChannel.writeAndFlush(new TextWebSocketFrame(
|
|
|
+ objectMapper.writeValueAsString(
|
|
|
+ new ChatMessageDto(targetAdminId, chatMsg.getFrom(), "adminAllocation",
|
|
|
+ "客服" + targetAdminId + "号为您服务", "system",
|
|
|
+ now, null, null, null))
|
|
|
+ ));
|
|
|
+ userLastAllocationNotice.put(chatMsg.getFrom(), now); // 更新最后提示时间
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 先落库(会话 + 消息)—— onUserMessage 里会根据 chatMsg.from/chatMsg.to 建会话并保存消息
|
|
|
+ Map<String, Long> result = chatService.onUserMessage(chatMsg);
|
|
|
+ Long sessionId = result.get("sessionId");
|
|
|
+ Long messageId = result.get("messageId");
|
|
|
+ chatMsg.setSessionId(sessionId);
|
|
|
+ chatMsg.setMessageId(messageId);
|
|
|
// 再尝试推送给客服(异步,并带回调处理失败)
|
|
|
Channel targetAdmin = adminChannels.get(targetAdminId);
|
|
|
if (targetAdmin != null && targetAdmin.isActive()) {
|
|
|
@@ -218,14 +362,17 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebS
|
|
|
systemMsg.setFrom("robot");
|
|
|
systemMsg.setTo(chatMsg.getFrom());
|
|
|
systemMsg.setContent("当前没有客服在线,您可以留下您的联系方式,我们会尽快联系您。");
|
|
|
- systemMsg.setTimestamp(System.currentTimeMillis());
|
|
|
userChannel.writeAndFlush(new TextWebSocketFrame(objectMapper.writeValueAsString(systemMsg)));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
} else if ("admin".equals(chatMsg.getRole())) {
|
|
|
// 客服发消息:先落库,再推送给用户(已有失败回调)
|
|
|
- chatService.onAdminMessage(chatMsg);
|
|
|
+ Map<String, Long> result = chatService.onAdminMessage(chatMsg);
|
|
|
+ Long sessionId = result.get("sessionId");
|
|
|
+ Long messageId = result.get("messageId");
|
|
|
+ chatMsg.setSessionId(sessionId);
|
|
|
+ chatMsg.setMessageId(messageId);
|
|
|
Channel targetUser = userChannels.get(chatMsg.getTo());
|
|
|
if (targetUser != null && targetUser.isActive()) {
|
|
|
targetUser.writeAndFlush(new TextWebSocketFrame(
|
|
|
@@ -244,6 +391,27 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebS
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void handleTyping(ChatMessageDto chatMsg) throws JsonProcessingException {
|
|
|
+ // 只做转发,不落库
|
|
|
+ if ("user".equals(chatMsg.getRole())) {
|
|
|
+ // 转发给目标客服
|
|
|
+ Channel adminChannel = adminChannels.get(chatMsg.getTo());
|
|
|
+ if (adminChannel != null && adminChannel.isActive()) {
|
|
|
+ adminChannel.writeAndFlush(new TextWebSocketFrame(
|
|
|
+ objectMapper.writeValueAsString(chatMsg)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ } else if ("admin".equals(chatMsg.getRole())) {
|
|
|
+ // 转发给目标用户
|
|
|
+ Channel userChannel = userChannels.get(chatMsg.getTo());
|
|
|
+ if (userChannel != null && userChannel.isActive()) {
|
|
|
+ userChannel.writeAndFlush(new TextWebSocketFrame(
|
|
|
+ objectMapper.writeValueAsString(chatMsg)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 从客服列表中选择一个客服(轮询)
|
|
|
*/
|
|
|
@@ -264,4 +432,4 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebS
|
|
|
ctx.close();
|
|
|
}
|
|
|
|
|
|
-}
|
|
|
+}
|