diff --git a/src/main/java/com/leaper/pm2java/websocket/WebSocket.java b/src/main/java/com/leaper/pm2java/websocket/WebSocket.java index 3f3c8b6..6aa8637 100644 --- a/src/main/java/com/leaper/pm2java/websocket/WebSocket.java +++ b/src/main/java/com/leaper/pm2java/websocket/WebSocket.java @@ -3,12 +3,10 @@ package com.leaper.pm2java.websocket; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import javax.annotation.Resource; import javax.websocket.*; @@ -43,6 +41,9 @@ public class WebSocket { // 用来存在线连接用户信息 private static ConcurrentHashMap sessionPool = new ConcurrentHashMap(); + //队列来存给ws发送的信息 + private static Map> messageQueue = new HashMap<>(); + ExecutorService executor = Executors.newFixedThreadPool(10); /** @@ -59,6 +60,19 @@ public class WebSocket { executor.submit(() -> { wsLogs(appName, userId); }); + messageQueue.put(userId,new LinkedBlockingDeque<>()); + executor.submit(() -> { + while (messageQueue.containsKey(userId)) { + if(!messageQueue.get(userId).isEmpty()) { + try { + String message = messageQueue.get(userId).take(); // 删除并返回队列中的第一个元素 + sendOneMessageBasic(userId, message); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }); log.info("【websocket消息】有新的连接,总数为:" + webSockets.size()); } catch (Exception e) { } @@ -83,7 +97,8 @@ public class WebSocket { StringBuilder output = new StringBuilder(); while (process.isAlive()) { if ((line = reader.readLine()) != null) { - sendOneMessage(userId, new String(line.getBytes())); + + messageQueue.get(userId).add(new String(line.getBytes())); } } } @@ -107,6 +122,7 @@ public class WebSocket { try { webSockets.remove(this); sessionPool.remove(this.userId); + messageQueue.remove(userId); log.info("【websocket消息】连接断开,总数为:" + webSockets.size()); } catch (Exception e) { } @@ -161,6 +177,17 @@ public class WebSocket { e.printStackTrace(); } } + } // 此为单点消息 + public void sendOneMessageBasic(String userId, String message) { + Session session = sessionPool.get(userId); + if (session != null && session.isOpen()) { + try { + log.info("【websocket消息】 单点消息:" + message); + session.getBasicRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } } // 此为单点消息(多人)