You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
yudao/yolo-service-manager.py

162 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
YOLO服务管理器
用于启动停止重启YOLO socket服务
"""
import subprocess
import sys
import os
import time
import signal
import argparse
import psutil
from pathlib import Path
class YoloServiceManager:
def __init__(self, host='localhost', port=9999, model_path=None):
self.host = host
self.port = port
self.model_path = model_path
self.pid_file = Path('yolo_service.pid')
def start(self):
"""启动YOLO服务"""
if self.is_running():
print(f"YOLO服务已在运行 (PID: {self.get_pid()})")
return
print(f"启动YOLO服务 {self.host}:{self.port}")
cmd = [
sys.executable, 'yolo_service.py',
'--host', self.host,
'--port', str(self.port)
]
if self.model_path and Path(self.model_path).exists():
cmd.extend(['--model', self.model_path])
try:
# 启动进程
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=os.getcwd()
)
# 保存PID
with open(self.pid_file, 'w') as f:
f.write(str(process.pid))
print(f"YOLO服务已启动 (PID: {process.pid})")
# 等待服务启动
time.sleep(2)
if self.is_service_alive():
print("服务启动成功")
else:
print("警告: 服务可能未正常启动,请检查日志")
except Exception as e:
print(f"启动服务失败: {e}")
if self.pid_file.exists():
self.pid_file.unlink()
def stop(self):
"""停止YOLO服务"""
if not self.is_running():
print("YOLO服务未运行")
return
pid = self.get_pid()
try:
# 尝试优雅停止
os.kill(pid, signal.SIGTERM)
time.sleep(2)
# 如果还在运行,强制停止
if psutil.pid_exists(pid):
os.kill(pid, signal.SIGKILL)
time.sleep(1)
print(f"YOLO服务已停止 (PID: {pid})")
except ProcessLookupError:
print(f"进程 {pid} 不存在")
except Exception as e:
print(f"停止服务失败: {e}")
finally:
if self.pid_file.exists():
self.pid_file.unlink()
def restart(self):
"""重启YOLO服务"""
self.stop()
time.sleep(1)
self.start()
def status(self):
"""查看服务状态"""
if self.is_running():
pid = self.get_pid()
if self.is_service_alive():
print(f"YOLO服务运行中 (PID: {pid})")
return True
else:
print(f"YOLO服务进程存在但不可用 (PID: {pid})")
return False
else:
print("YOLO服务未运行")
return False
def is_running(self):
"""检查服务是否在运行"""
return self.pid_file.exists() and self.get_pid() > 0
def get_pid(self):
"""获取服务PID"""
if self.pid_file.exists():
try:
with open(self.pid_file, 'r') as f:
return int(f.read().strip())
except:
return 0
return 0
def is_service_alive(self):
"""检查服务是否可用"""
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex((self.host, self.port))
sock.close()
return result == 0
except:
return False
def main():
parser = argparse.ArgumentParser(description='YOLO服务管理器')
parser.add_argument('command', choices=['start', 'stop', 'restart', 'status'],
help='命令: start, stop, restart, status')
parser.add_argument('--host', default='localhost', help='服务主机地址')
parser.add_argument('--port', type=int, default=9999, help='服务端口')
parser.add_argument('--model', help='预加载模型路径')
args = parser.parse_args()
manager = YoloServiceManager(args.host, args.port, args.model)
if args.command == 'start':
manager.start()
elif args.command == 'stop':
manager.stop()
elif args.command == 'restart':
manager.restart()
elif args.command == 'status':
manager.status()
if __name__ == '__main__':
main()