ws连接出错,只有查看log才能正常运行的情况修改

main
LAPTOP-S9HJSOEB\昊天 2 years ago
parent 2e05f00cdf
commit bce04b6899

@ -1 +1 @@
{"args":["127.0.0.1"],"argsString":"127.0.0.1","cpuProp":0,"cpuTime":0,"enable":false,"name":"ping","path":"","permanentlyDelete":false,"port":10,"ramProp":0,"reNumber":0,"script":"ping ","sessionNumber":0,"type":2} {"args":["127.0.0.1"],"argsString":"127.0.0.1","cpuProp":0,"cpuTime":0,"enable":true,"name":"ping","path":"","permanentlyDelete":false,"port":10,"ramProp":0,"reNumber":0,"script":"ping ","sessionNumber":0,"type":2}

@ -0,0 +1,47 @@
package com.leaper.pm2java.entity;
public class CircularList<T> {
private Node head; // 头结点
private int size; // 当前大小
private class Node {
private T value; // 当前节点值
private Node next; // 下一个节点
public Node(T value) {
this.value = value;
next = null;
}
}
public CircularList() {
head = null;
size = 150;
}
public void add(T value) {
Node newNode = new Node(value);
if (isFull()) {
head = newNode;
} else {
newNode.next = head;
head = newNode;
}
size++;
}
public T get(int index) {
if (index < 0 || index >= size) {
throw new IndexOutOfBoundsException("Index out of range");
}
Node current = head;
for (int i = 0; i < index; i++) {
current = current.next;
}
return current.value;
}
public boolean isFull() {
return size == Integer.MAX_VALUE;
}
}

@ -34,6 +34,9 @@ public class Pm2JavaServiceImpl implements Pm2JavaService {
@Resource @Resource
WebSocket webSocket; WebSocket webSocket;
@Resource
ProcessBuilderUtil processBuilderUtil;
//现存的进程记录 //现存的进程记录
public static Map<String, Process> processBuilderMap = new ConcurrentHashMap<>(); public static Map<String, Process> processBuilderMap = new ConcurrentHashMap<>();
@ -262,20 +265,21 @@ public class Pm2JavaServiceImpl implements Pm2JavaService {
} }
// 创建 ProcessBuilder 对象,并设置要执行的命令 // 创建 ProcessBuilder 对象,并设置要执行的命令
ProcessBuilder pb = new ProcessBuilder(cmd); ProcessBuilder pb = new ProcessBuilder(cmd);
if(appConfigEntity.getPath()!= null &&!"".equals(appConfigEntity.getPath())){
pb.directory(new File(appConfigEntity.getPath())); pb.directory(new File(appConfigEntity.getPath()));
}
// 将标准输出和错误输出合并 // 将标准输出和错误输出合并
pb.redirectErrorStream(true); pb.redirectErrorStream(true);
pb.environment().put("LANG", "zh_CN.UTF-8"); pb.environment().put("LANG", "zh_CN.UTF-8");
Process process = pb.start();
//关闭时允许您在JVM关闭之前执行特定的代码
// 启动命令并获取 Process 对象 // 启动命令并获取 Process 对象
processBuilderMap.put(appConfigEntity.getName(), process); Process process = pb.start();
processBuilderMap.put(appConfigEntity.getName(), process); processBuilderMap.put(appConfigEntity.getName(), process);
appServerInfoRecordMap.get(appConfigEntity.getName()).setOnOff(true); appServerInfoRecordMap.get(appConfigEntity.getName()).setOnOff(true);
appServerInfoRecordMap.get(appConfigEntity.getName()).setSelectTime(LocalDateTime.now()); appServerInfoRecordMap.get(appConfigEntity.getName()).setSelectTime(LocalDateTime.now());
log.info("{}服务启动", appConfigEntity.getName()); log.info("{}服务启动", appConfigEntity.getName());
//ProcessBuilderUtil.getProcessLogs(processBuilderMap.get(appConfigEntity.getName())); processBuilderUtil.getProcessLogs(processBuilderMap.get(appConfigEntity.getName()),appConfigEntity.getName());
// 等待命令执行完成 // 等待命令执行完成
int exitCode = process.waitFor(); int exitCode = process.waitFor();
System.out.println(); System.out.println();

@ -9,36 +9,26 @@ import javax.annotation.Resource;
import java.io.*; import java.io.*;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component @Component
public class ProcessBuilderUtil { public class ProcessBuilderUtil {
@Resource
WebSocket webSocket;
ExecutorService executor = Executors.newFixedThreadPool(10);
public static Process getProcessLogs(Process process){ public Process getProcessLogs(Process process,String appName){
try { executor.submit(() -> {
// 获取命令的输出流 webSocket.wsLogs(appName);
InputStream inputStream = process.getInputStream(); });
// 读取输出流内容
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "gbk"));
String line;
StringBuilder output = new StringBuilder();
while ((line = reader.readLine()) != null) {
System.out.println(new String(line.getBytes(), StandardCharsets.UTF_8));
}
// 输出命令执行的返回信息
System.out.println("命令执行结果:" + output.toString());
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
return process; return process;
} }
public static Long getPID(Process process){ public static Long getPID(Process process){
try { try {
process = new ProcessBuilder("yourCommand").start(); // 启动一个新进程 process = new ProcessBuilder("yourCommand").start(); // 启动一个新进程

@ -24,8 +24,6 @@ import lombok.extern.slf4j.Slf4j;
@ServerEndpoint("/pm2-api/ws/{userId}/{appName}") // 接口路径 ws://localhost:8087/webSocket/userId; @ServerEndpoint("/pm2-api/ws/{userId}/{appName}") // 接口路径 ws://localhost:8087/webSocket/userId;
public class WebSocket { public class WebSocket {
@Resource
ProcessBuilderUtil processBuilderUtil;
//与某个客户端的连接会话,需要通过它来给客户端发送数据 //与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session; private Session session;
/** /**
@ -42,7 +40,7 @@ public class WebSocket {
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>(); private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<String, Session>();
//队列来存给ws发送的信息 //队列来存给ws发送的信息
private static Map<String, BlockingQueue<String>> messageQueue = new HashMap<>(); public static Map<String, ConcurrentLinkedQueue<String>> messageQueue = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(10); ExecutorService executor = Executors.newFixedThreadPool(10);
@ -58,16 +56,14 @@ public class WebSocket {
webSockets.add(this); webSockets.add(this);
sessionPool.put(userId, session); sessionPool.put(userId, session);
executor.submit(() -> { executor.submit(() -> {
wsLogs(appName, userId); while (sessionPool.containsKey(userId)) {
}); if(messageQueue.get(userId)!=null) {
messageQueue.put(userId,new LinkedBlockingDeque<>());
executor.submit(() -> {
while (messageQueue.containsKey(userId)) {
if(!messageQueue.get(userId).isEmpty()) {
try { try {
String message = messageQueue.get(userId).take(); // 删除并返回队列中的第一个元素 String message = messageQueue.get(userId).poll(); // 删除并返回队列中的第一个元素
if (message != null && !"".equals(message)) {
sendOneMessageBasic(userId, message); sendOneMessageBasic(userId, message);
} catch (InterruptedException e) { }
} catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@ -82,11 +78,9 @@ public class WebSocket {
return sessionPool.keySet(); return sessionPool.keySet();
} }
public void wsLogs(String appName, String userId) { public void wsLogs(String appName) {
try { try {
while (sessionPool.containsKey(userId)) {
Process process = Pm2JavaServiceImpl.processBuilderMap.get(appName); Process process = Pm2JavaServiceImpl.processBuilderMap.get(appName);
if(process != null) { if(process != null) {
// 获取命令的输出流 // 获取命令的输出流
@ -94,10 +88,14 @@ public class WebSocket {
// 读取输出流内容 // 读取输出流内容
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "gbk")); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "gbk"));
String line; String line;
StringBuilder output = new StringBuilder();
while (process.isAlive()) { while (process.isAlive()) {
if ((line = reader.readLine()) != null) { if ((line = reader.readLine()) != null) {
//System.out.println(line);
for(String userId : sessionPool.keySet()) {
if (messageQueue.get(userId) == null){
ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
messageQueue.put(userId,concurrentLinkedQueue);
}
messageQueue.get(userId).add(new String(line.getBytes())); messageQueue.get(userId).add(new String(line.getBytes()));
} }
} }

Loading…
Cancel
Save