# thread_pool.py import concurrent.futures import threading from typing import Callable, Any class ThreadPoolManager: _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super(ThreadPoolManager, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if not self._initialized: # 创建线程池,最大4个线程 self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) self._initialized = True def submit(self, fn: Callable, *args, **kwargs) -> concurrent.futures.Future: """ 提交任务到线程池执行 :param fn: 要执行的函数 :param args: 函数的位置参数 :param kwargs: 函数的关键字参数 :return: Future对象 """ if kwargs: # 如果有关键字参数,需要包装函数 def wrapper(): return fn(*args, **kwargs) return self.executor.submit(wrapper) else: return self.executor.submit(fn, *args) def shutdown(self, wait=True): """ 关闭线程池 :param wait: 是否等待所有任务完成 """ self.executor.shutdown(wait=wait) # 创建全局线程池管理器实例 _thread_pool_manager = ThreadPoolManager() def get_thread_pool() -> ThreadPoolManager: """ 获取线程池管理器实例 :return: ThreadPoolManager实例 """ return _thread_pool_manager def submit_task(fn: Callable, *args, **kwargs) -> concurrent.futures.Future: """ 提交任务到线程池执行的便捷函数 :param fn: 要执行的函数 :param args: 函数的位置参数 :param kwargs: 函数的关键字参数 :return: Future对象 """ return _thread_pool_manager.submit(fn, *args, **kwargs)