From 9ece9c6fadecda13ccb7e2a42c23fc04ee378692 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?LAPTOP-S9HJSOEB=5C=E6=98=8A=E5=A4=A9?= Date: Thu, 9 Nov 2023 10:36:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8=E9=98=9F=E5=88=97=E5=B0=86?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BF=A1=E6=81=AF=E5=AD=98=E5=85=A5=E9=98=9F?= =?UTF-8?q?=E5=88=97=E5=90=8E=EF=BC=8C=E7=84=B6=E5=90=8E=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E4=BC=A0=E8=BE=93=E5=88=B0=E5=89=8D=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../leaper/pm2java/websocket/WebSocket.java | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/leaper/pm2java/websocket/WebSocket.java b/src/main/java/com/leaper/pm2java/websocket/WebSocket.java index 3f3c8b6..6aa8637 100644 --- a/src/main/java/com/leaper/pm2java/websocket/WebSocket.java +++ b/src/main/java/com/leaper/pm2java/websocket/WebSocket.java @@ -3,12 +3,10 @@ package com.leaper.pm2java.websocket; import java.io.*; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import javax.annotation.Resource; import javax.websocket.*; @@ -43,6 +41,9 @@ public class WebSocket { // 用来存在线连接用户信息 private static ConcurrentHashMap sessionPool = new ConcurrentHashMap(); + //队列来存给ws发送的信息 + private static Map> messageQueue = new HashMap<>(); + ExecutorService executor = Executors.newFixedThreadPool(10); /** @@ -59,6 +60,19 @@ public class WebSocket { executor.submit(() -> { wsLogs(appName, userId); }); + messageQueue.put(userId,new LinkedBlockingDeque<>()); + executor.submit(() -> { + while (messageQueue.containsKey(userId)) { + if(!messageQueue.get(userId).isEmpty()) { + try { + String message = messageQueue.get(userId).take(); // 删除并返回队列中的第一个元素 + sendOneMessageBasic(userId, message); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }); log.info("【websocket消息】有新的连接,总数为:" + webSockets.size()); } catch (Exception e) { } @@ -83,7 +97,8 @@ public class WebSocket { StringBuilder output = new StringBuilder(); while (process.isAlive()) { if ((line = reader.readLine()) != null) { - sendOneMessage(userId, new String(line.getBytes())); + + messageQueue.get(userId).add(new String(line.getBytes())); } } } @@ -107,6 +122,7 @@ public class WebSocket { try { webSockets.remove(this); sessionPool.remove(this.userId); + messageQueue.remove(userId); log.info("【websocket消息】连接断开,总数为:" + webSockets.size()); } catch (Exception e) { } @@ -161,6 +177,17 @@ public class WebSocket { e.printStackTrace(); } } + } // 此为单点消息 + public void sendOneMessageBasic(String userId, String message) { + Session session = sessionPool.get(userId); + if (session != null && session.isOpen()) { + try { + log.info("【websocket消息】 单点消息:" + message); + session.getBasicRemote().sendText(message); + } catch (Exception e) { + e.printStackTrace(); + } + } } // 此为单点消息(多人)