使用队列将日志信息存入队列后,然后同步传输到前端

main
LAPTOP-S9HJSOEB\昊天 2 years ago
parent 6d3f46f6f4
commit 9ece9c6fad

@ -3,12 +3,10 @@ package com.leaper.pm2java.websocket;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import javax.annotation.Resource;
import javax.websocket.*;
@ -43,6 +41,9 @@ public class WebSocket {
// 用来存在线连接用户信息
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();
//队列来存给ws发送的信息
private static Map<String, BlockingQueue<String>> messageQueue = new HashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(10);
/**
@ -59,6 +60,19 @@ public class WebSocket {
executor.submit(() -> {
wsLogs(appName, userId);
});
messageQueue.put(userId,new LinkedBlockingDeque<>());
executor.submit(() -> {
while (messageQueue.containsKey(userId)) {
if(!messageQueue.get(userId).isEmpty()) {
try {
String message = messageQueue.get(userId).take(); // 删除并返回队列中的第一个元素
sendOneMessageBasic(userId, message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
log.info("【websocket消息】有新的连接总数为:" + webSockets.size());
} catch (Exception e) {
}
@ -83,7 +97,8 @@ public class WebSocket {
StringBuilder output = new StringBuilder();
while (process.isAlive()) {
if ((line = reader.readLine()) != null) {
sendOneMessage(userId, new String(line.getBytes()));
messageQueue.get(userId).add(new String(line.getBytes()));
}
}
}
@ -107,6 +122,7 @@ public class WebSocket {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
messageQueue.remove(userId);
log.info("【websocket消息】连接断开总数为:" + webSockets.size());
} catch (Exception e) {
}
@ -161,6 +177,17 @@ public class WebSocket {
e.printStackTrace();
}
}
} // 此为单点消息
public void sendOneMessageBasic(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息(多人)

Loading…
Cancel
Save