From bce04b6899be98fca885c1a5611b7f421c95043d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?LAPTOP-S9HJSOEB=5C=E6=98=8A=E5=A4=A9?= Date: Wed, 22 Nov 2023 13:11:11 +0800 Subject: [PATCH] =?UTF-8?q?ws=E8=BF=9E=E6=8E=A5=E5=87=BA=E9=94=99=EF=BC=8C?= =?UTF-8?q?=E5=8F=AA=E6=9C=89=E6=9F=A5=E7=9C=8Blog=E6=89=8D=E8=83=BD?= =?UTF-8?q?=E6=AD=A3=E5=B8=B8=E8=BF=90=E8=A1=8C=E7=9A=84=E6=83=85=E5=86=B5?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- appConf/ping.appConf | 2 +- .../leaper/pm2java/entity/CircularList.java | 47 +++++++++++++++++++ .../service/impl/Pm2JavaServiceImpl.java | 14 ++++-- .../pm2java/util/ProcessBuilderUtil.java | 32 +++++-------- .../leaper/pm2java/websocket/WebSocket.java | 36 +++++++------- 5 files changed, 85 insertions(+), 46 deletions(-) create mode 100644 src/main/java/com/leaper/pm2java/entity/CircularList.java diff --git a/appConf/ping.appConf b/appConf/ping.appConf index 7b6d6d0..dbd7d80 100644 --- a/appConf/ping.appConf +++ b/appConf/ping.appConf @@ -1 +1 @@ -{"args":["127.0.0.1"],"argsString":"127.0.0.1","cpuProp":0,"cpuTime":0,"enable":false,"name":"ping","path":"","permanentlyDelete":false,"port":10,"ramProp":0,"reNumber":0,"script":"ping ","sessionNumber":0,"type":2} \ No newline at end of file +{"args":["127.0.0.1"],"argsString":"127.0.0.1","cpuProp":0,"cpuTime":0,"enable":true,"name":"ping","path":"","permanentlyDelete":false,"port":10,"ramProp":0,"reNumber":0,"script":"ping ","sessionNumber":0,"type":2} \ No newline at end of file diff --git a/src/main/java/com/leaper/pm2java/entity/CircularList.java b/src/main/java/com/leaper/pm2java/entity/CircularList.java new file mode 100644 index 0000000..63a7723 --- /dev/null +++ b/src/main/java/com/leaper/pm2java/entity/CircularList.java @@ -0,0 +1,47 @@ +package com.leaper.pm2java.entity; + +public class CircularList { + private Node head; // 头结点 + private int size; // 当前大小 + + private class Node { + private T value; // 当前节点值 + private Node next; // 下一个节点 + + public Node(T value) { + this.value = value; + next = null; + } + } + + public CircularList() { + head = null; + size = 150; + } + + public void add(T value) { + Node newNode = new Node(value); + if (isFull()) { + head = newNode; + } else { + newNode.next = head; + head = newNode; + } + size++; + } + + public T get(int index) { + if (index < 0 || index >= size) { + throw new IndexOutOfBoundsException("Index out of range"); + } + Node current = head; + for (int i = 0; i < index; i++) { + current = current.next; + } + return current.value; + } + + public boolean isFull() { + return size == Integer.MAX_VALUE; + } +} diff --git a/src/main/java/com/leaper/pm2java/service/impl/Pm2JavaServiceImpl.java b/src/main/java/com/leaper/pm2java/service/impl/Pm2JavaServiceImpl.java index 5fab792..7ed4570 100644 --- a/src/main/java/com/leaper/pm2java/service/impl/Pm2JavaServiceImpl.java +++ b/src/main/java/com/leaper/pm2java/service/impl/Pm2JavaServiceImpl.java @@ -34,6 +34,9 @@ public class Pm2JavaServiceImpl implements Pm2JavaService { @Resource WebSocket webSocket; + + @Resource + ProcessBuilderUtil processBuilderUtil; //现存的进程记录 public static Map processBuilderMap = new ConcurrentHashMap<>(); @@ -262,20 +265,21 @@ public class Pm2JavaServiceImpl implements Pm2JavaService { } // 创建 ProcessBuilder 对象,并设置要执行的命令 ProcessBuilder pb = new ProcessBuilder(cmd); - pb.directory(new File(appConfigEntity.getPath())); + if(appConfigEntity.getPath()!= null &&!"".equals(appConfigEntity.getPath())){ + pb.directory(new File(appConfigEntity.getPath())); + + } // 将标准输出和错误输出合并 pb.redirectErrorStream(true); pb.environment().put("LANG", "zh_CN.UTF-8"); - Process process = pb.start(); - //关闭时允许您在JVM关闭之前执行特定的代码 // 启动命令并获取 Process 对象 - processBuilderMap.put(appConfigEntity.getName(), process); + Process process = pb.start(); processBuilderMap.put(appConfigEntity.getName(), process); appServerInfoRecordMap.get(appConfigEntity.getName()).setOnOff(true); appServerInfoRecordMap.get(appConfigEntity.getName()).setSelectTime(LocalDateTime.now()); log.info("{}服务启动", appConfigEntity.getName()); - //ProcessBuilderUtil.getProcessLogs(processBuilderMap.get(appConfigEntity.getName())); + processBuilderUtil.getProcessLogs(processBuilderMap.get(appConfigEntity.getName()),appConfigEntity.getName()); // 等待命令执行完成 int exitCode = process.waitFor(); System.out.println(); diff --git a/src/main/java/com/leaper/pm2java/util/ProcessBuilderUtil.java b/src/main/java/com/leaper/pm2java/util/ProcessBuilderUtil.java index b6af67a..bc6661a 100644 --- a/src/main/java/com/leaper/pm2java/util/ProcessBuilderUtil.java +++ b/src/main/java/com/leaper/pm2java/util/ProcessBuilderUtil.java @@ -9,36 +9,26 @@ import javax.annotation.Resource; import java.io.*; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Component public class ProcessBuilderUtil { + @Resource + WebSocket webSocket; + ExecutorService executor = Executors.newFixedThreadPool(10); - public static Process getProcessLogs(Process process){ + public Process getProcessLogs(Process process,String appName){ - try { - // 获取命令的输出流 - InputStream inputStream = process.getInputStream(); - - // 读取输出流内容 - BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "gbk")); - String line; - StringBuilder output = new StringBuilder(); - while ((line = reader.readLine()) != null) { - - System.out.println(new String(line.getBytes(), StandardCharsets.UTF_8)); - } - - // 输出命令执行的返回信息 - System.out.println("命令执行结果:" + output.toString()); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } + executor.submit(() -> { + webSocket.wsLogs(appName); + }); return process; } + public static Long getPID(Process process){ try { process = new ProcessBuilder("yourCommand").start(); // 启动一个新进程 diff --git a/src/main/java/com/leaper/pm2java/websocket/WebSocket.java b/src/main/java/com/leaper/pm2java/websocket/WebSocket.java index 6aa8637..88bf4ca 100644 --- a/src/main/java/com/leaper/pm2java/websocket/WebSocket.java +++ b/src/main/java/com/leaper/pm2java/websocket/WebSocket.java @@ -24,8 +24,6 @@ import lombok.extern.slf4j.Slf4j; @ServerEndpoint("/pm2-api/ws/{userId}/{appName}") // 接口路径 ws://localhost:8087/webSocket/userId; public class WebSocket { - @Resource - ProcessBuilderUtil processBuilderUtil; //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; /** @@ -42,7 +40,7 @@ public class WebSocket { private static ConcurrentHashMap sessionPool = new ConcurrentHashMap(); //队列来存给ws发送的信息 - private static Map> messageQueue = new HashMap<>(); + public static Map> messageQueue = new ConcurrentHashMap<>(); ExecutorService executor = Executors.newFixedThreadPool(10); @@ -58,16 +56,14 @@ public class WebSocket { webSockets.add(this); sessionPool.put(userId, session); executor.submit(() -> { - wsLogs(appName, userId); - }); - messageQueue.put(userId,new LinkedBlockingDeque<>()); - executor.submit(() -> { - while (messageQueue.containsKey(userId)) { - if(!messageQueue.get(userId).isEmpty()) { + while (sessionPool.containsKey(userId)) { + if(messageQueue.get(userId)!=null) { try { - String message = messageQueue.get(userId).take(); // 删除并返回队列中的第一个元素 - sendOneMessageBasic(userId, message); - } catch (InterruptedException e) { + String message = messageQueue.get(userId).poll(); // 删除并返回队列中的第一个元素 + if (message != null && !"".equals(message)) { + sendOneMessageBasic(userId, message); + } + } catch (Exception e) { e.printStackTrace(); } } @@ -82,11 +78,9 @@ public class WebSocket { return sessionPool.keySet(); } - public void wsLogs(String appName, String userId) { + public void wsLogs(String appName) { try { - while (sessionPool.containsKey(userId)) { - Process process = Pm2JavaServiceImpl.processBuilderMap.get(appName); if(process != null) { // 获取命令的输出流 @@ -94,15 +88,19 @@ public class WebSocket { // 读取输出流内容 BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "gbk")); String line; - StringBuilder output = new StringBuilder(); while (process.isAlive()) { if ((line = reader.readLine()) != null) { - - messageQueue.get(userId).add(new String(line.getBytes())); + //System.out.println(line); + for(String userId : sessionPool.keySet()) { + if (messageQueue.get(userId) == null){ + ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue<>(); + messageQueue.put(userId,concurrentLinkedQueue); + } + messageQueue.get(userId).add(new String(line.getBytes())); + } } } } - } } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } catch (IOException e) {