#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Screenshot worker thread pool (worker-pool pattern running screenshot tasks concurrently)."""

import os
import threading
import queue
import time
from typing import Callable, Optional, Dict, Any


def _env_int(name: str, default: int, minimum: int = 0) -> int:
    """Read an integer setting from the environment.

    Falls back to ``default`` when the variable is unset, blank or not a valid
    integer (instead of raising ValueError at import time), and clamps the
    result to be at least ``minimum``.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        value = default
    else:
        try:
            value = int(raw)
        except ValueError:
            value = default
    return max(minimum, value)


# Configurable constants (previously magic numbers).
# Idle seconds after which a worker releases its execution environment (default 5 minutes).
BROWSER_IDLE_TIMEOUT = _env_int('BROWSER_IDLE_TIMEOUT', 300)
# Seconds each blocking queue get waits; bounds how fast workers notice stop()/idle release.
# Clamped to >= 1: 0 would busy-spin the worker loop and a negative value makes
# queue.get() raise ValueError on every iteration.
TASK_QUEUE_TIMEOUT = _env_int('TASK_QUEUE_TIMEOUT', 10, minimum=1)
# Maximum task queue length (0 means unbounded).
TASK_QUEUE_MAXSIZE = _env_int('BROWSER_TASK_QUEUE_MAXSIZE', 200)
# Maximum reuses of one execution environment before it is recycled (0 means unlimited).
BROWSER_MAX_USE_COUNT = _env_int('BROWSER_MAX_USE_COUNT', 0)


class BrowserWorker(threading.Thread):
    """Screenshot worker thread; each worker owns its own execution environment.

    The execution environment is a placeholder dict (see ``_create_browser``)
    passed as the first argument to every task function. It is created on
    demand (or eagerly when ``pre_warm`` is set), released after
    BROWSER_IDLE_TIMEOUT seconds without activity, and recycled after
    BROWSER_MAX_USE_COUNT uses when that limit is positive.

    Every item taken from ``task_queue`` is acknowledged with
    ``task_queue.task_done()`` exactly once, so ``task_queue.join()`` returns
    only after dequeued tasks have actually finished.
    """

    def __init__(
        self,
        worker_id: int,
        task_queue: queue.Queue,
        log_callback: Optional[Callable] = None,
        pre_warm: bool = False,
    ):
        super().__init__(daemon=True)
        self.worker_id = worker_id
        self.task_queue = task_queue
        self.log_callback = log_callback
        self.browser_instance: Optional[Dict[str, Any]] = None
        self.running = True
        # False while a dequeued item is being handled.
        self.idle = True
        self.total_tasks = 0
        self.failed_tasks = 0
        # When True, the run loop creates the environment before waiting for work.
        self.pre_warm = pre_warm
        # Wall-clock time of the last environment creation or task completion.
        self.last_activity_ts = 0.0

    def log(self, message: str):
        """Emit a log line, prefixed with this worker's id."""
        if self.log_callback:
            self.log_callback(f"[Worker-{self.worker_id}] {message}")
        else:
            print(f"[截图池][Worker-{self.worker_id}] {message}")

    def _create_browser(self):
        """Create the (placeholder) execution environment; no real browser is started."""
        created_at = time.time()
        self.browser_instance = {
            'created_at': created_at,
            'use_count': 0,
            'worker_id': self.worker_id,
        }
        self.last_activity_ts = created_at
        self.log("截图执行环境就绪")
        return True

    def _close_browser(self):
        """Release the execution environment, if any."""
        if self.browser_instance:
            self.log(f"执行环境已释放(共处理{self.browser_instance.get('use_count', 0)}个任务)")
        self.browser_instance = None

    def _check_browser_health(self) -> bool:
        """Return True when an execution environment is present."""
        return bool(self.browser_instance)

    def _ensure_browser(self) -> bool:
        """Make sure an execution environment is available, recreating it if needed."""
        if self._check_browser_health():
            return True
        self.log("执行环境不可用,尝试重新创建...")
        self._close_browser()
        return self._create_browser()

    def _safe_callback(self, callback: Optional[Callable], result: Any, error: Optional[str]) -> None:
        """Invoke a task callback while isolating the worker from callback errors.

        A raising callback must neither be reported as a task failure (which
        previously invoked the callback a second time) nor break the loop.
        """
        if not callable(callback):
            return
        try:
            callback(result, error)
        except Exception as e:
            self.log(f"回调执行失败: {e}")

    def _release_if_idle(self) -> None:
        """Release the environment once it has been idle longer than BROWSER_IDLE_TIMEOUT."""
        if self.browser_instance and self.last_activity_ts > 0:
            idle_time = time.time() - self.last_activity_ts
            if idle_time > BROWSER_IDLE_TIMEOUT:
                self.log(f"空闲{int(idle_time)}秒,释放执行环境")
                self._close_browser()

    def _acquire_browser(self) -> bool:
        """Ensure an execution environment exists, retrying creation once after 0.5s."""
        for attempt in range(2):
            if self._ensure_browser():
                return True
            if attempt < 1:
                self.log("执行环境创建失败,重试...")
                time.sleep(0.5)
        return False

    def _handle_unavailable(self, task: Any) -> None:
        """Requeue a task once when no environment could be created, otherwise fail it."""
        is_dict = isinstance(task, dict)
        retry_count = int(task.get("retry_count", 0) or 0) if is_dict else 0
        if is_dict and retry_count < 1:
            task["retry_count"] = retry_count + 1
            try:
                self.task_queue.put(task, timeout=1)
                self.log("执行环境不可用,任务重新入队")
            except queue.Full:
                self.log("任务队列已满,无法重新入队,任务失败")
                self._safe_callback(task.get("callback"), None, "执行环境不可用")
                self.total_tasks += 1
                self.failed_tasks += 1
            return

        self.log("执行环境不可用,任务失败")
        self._safe_callback(task.get("callback") if is_dict else None, None, "执行环境不可用")
        self.total_tasks += 1
        self.failed_tasks += 1

    def _execute_task(self, task: Dict[str, Any]) -> None:
        """Run one task against the current environment and report exactly one outcome."""
        task_func = task.get('func')
        task_args = task.get('args', ())
        task_kwargs = task.get('kwargs', {})
        callback = task.get('callback')

        self.total_tasks += 1
        self.browser_instance['use_count'] += 1
        self.log(f"开始执行任务(第{self.browser_instance['use_count']}次执行)")

        # Only the task function is guarded here; the callback runs outside this
        # try so its own errors are never mistaken for a task failure.
        try:
            # The execution environment is passed as the task's first argument.
            result = task_func(self.browser_instance, *task_args, **task_kwargs)
        except Exception as e:
            self.log(f"任务执行失败: {e}")
            self._safe_callback(callback, None, str(e))
            self.failed_tasks += 1
            self.last_activity_ts = time.time()
            # After a failure, verify the environment is still usable.
            if not self._check_browser_health():
                self.log("任务失败导致执行环境异常,将在下次任务前重建")
                self._close_browser()
        else:
            self._safe_callback(callback, result, None)
            self.log("任务执行成功")
            self.last_activity_ts = time.time()

        # Periodically recycle the environment to release accumulated resources.
        if self.browser_instance and BROWSER_MAX_USE_COUNT > 0:
            if self.browser_instance.get('use_count', 0) >= BROWSER_MAX_USE_COUNT:
                self.log(f"执行环境已复用{self.browser_instance['use_count']}次,重启释放资源")
                self._close_browser()

    def _process_task(self, task: Any) -> None:
        """Acquire an environment (on demand) and execute the task, or handle unavailability."""
        if not self._acquire_browser():
            self._handle_unavailable(task)
            return
        self._execute_task(task)

    def run(self):
        """Worker main loop: environments are started on demand (or pre-warmed)."""
        if self.pre_warm:
            self.log("Worker启动(预热模式,启动即准备执行环境)")
        else:
            self.log("Worker启动(按需模式,等待任务时不占用资源)")

        if self.pre_warm and not self.browser_instance:
            self._create_browser()
        self.pre_warm = False

        while self.running:
            try:
                # Allow warm-up requests made after start (e.g. BrowserWorkerPool.warmup).
                # The flag is cleared even if an environment already exists, so a
                # satisfied request cannot linger and recreate a released environment.
                if self.pre_warm:
                    if not self.browser_instance:
                        self._create_browser()
                    self.pre_warm = False

                # Bounded wait so the loop can observe stop() and idle release.
                self.idle = True
                try:
                    task = self.task_queue.get(timeout=TASK_QUEUE_TIMEOUT)
                except queue.Empty:
                    self._release_if_idle()
                    continue

                self.idle = False
                try:
                    if task is None:  # None is the stop sentinel
                        self.log("收到停止信号")
                        break
                    self._process_task(task)
                finally:
                    # Exactly one task_done() per successful get(), including the
                    # sentinel and requeued tasks, so Queue.join() stays accurate.
                    self.task_queue.task_done()
            except Exception as e:
                self.log(f"Worker出错: {e}")
                time.sleep(1)

        # Release resources on exit.
        self._close_browser()
        self.log(f"Worker停止(总任务:{self.total_tasks}, 失败:{self.failed_tasks})")

    def stop(self):
        """Ask the worker to stop; it exits after its current queue wait or task."""
        self.running = False


class BrowserWorkerPool:
    """Pool of screenshot worker threads sharing one task queue."""

    def __init__(self, pool_size: int = 3, log_callback: Optional[Callable] = None):
        self.pool_size = pool_size
        self.log_callback = log_callback
        maxsize = TASK_QUEUE_MAXSIZE if TASK_QUEUE_MAXSIZE > 0 else 0
        self.task_queue = queue.Queue(maxsize=maxsize)
        self.workers = []
        self.initialized = False
        self.lock = threading.Lock()

    def log(self, message: str):
        """Emit a pool-level log line."""
        if self.log_callback:
            self.log_callback(message)
        else:
            print(f"[截图池] {message}")

    def initialize(self):
        """Start the worker threads (on-demand mode; the first worker pre-warms)."""
        with self.lock:
            if self.initialized:
                return
            self.log(f"正在初始化截图线程池({self.pool_size}个worker,按需启动执行环境)...")
            for i in range(self.pool_size):
                worker = BrowserWorker(
                    worker_id=i + 1,
                    task_queue=self.task_queue,
                    log_callback=self.log_callback,
                    pre_warm=(i < 1),
                )
                worker.start()
                self.workers.append(worker)
            self.initialized = True
            self.log(f"✓ 截图线程池初始化完成({self.pool_size}个worker就绪,执行环境将在有任务时按需启动)")

        # Pre-warm one environment to cut cold-start cost for the first tasks.
        # Must run outside self.lock: warmup() acquires the (non-reentrant) lock itself.
        self.warmup(1)

    def warmup(self, count: int = 1) -> int:
        """Ask up to ``count`` workers to pre-create their environment.

        Waits at most 20 seconds and returns how many of the targeted workers
        have an environment ready.
        """
        if count <= 0:
            return 0
        if not self.initialized:
            self.log("警告:线程池未初始化,无法预热")
            return 0

        with self.lock:
            target_workers = list(self.workers[: min(count, len(self.workers))])
        self.log(f"预热截图线程池(预创建{len(target_workers)}个执行环境)...")
        for worker in target_workers:
            if not worker.browser_instance:
                worker.pre_warm = True

        # Bounded wait (monotonic clock, immune to wall-clock jumps).
        deadline = time.monotonic() + 20
        while time.monotonic() < deadline:
            if all(w.browser_instance for w in target_workers):
                break
            time.sleep(0.1)

        warmed = sum(1 for w in target_workers if w.browser_instance)
        self.log(f"✓ 截图线程池预热完成({warmed}个执行环境就绪)")
        return warmed

    def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool:
        """Submit a task to the queue.

        Args:
            task_func: Called as ``task_func(browser_instance, *args, **kwargs)``.
            callback: Called once as ``callback(result, error)``; ``error`` is
                None on success and a message string on failure.
            *args, **kwargs: Extra arguments forwarded to ``task_func``.

        Returns:
            True if the task was queued, False if the pool is not initialized or
            the queue stayed full for 1 second.
        """
        if not self.initialized:
            self.log("警告:线程池未初始化")
            return False

        task = {
            'func': task_func,
            'args': args,
            'kwargs': kwargs,
            'callback': callback,
            'retry_count': 0,
        }
        try:
            self.task_queue.put(task, timeout=1)
            return True
        except queue.Full:
            self.log(f"警告:任务队列已满(maxsize={self.task_queue.maxsize}),拒绝提交任务")
            return False

    def get_stats(self) -> Dict[str, Any]:
        """Return aggregate and per-worker statistics for the pool."""
        workers = list(self.workers or [])
        idle_count = sum(1 for w in workers if getattr(w, "idle", False))
        total_tasks = sum(int(getattr(w, "total_tasks", 0) or 0) for w in workers)
        failed_tasks = sum(int(getattr(w, "failed_tasks", 0) or 0) for w in workers)

        worker_details = []
        for w in workers:
            browser_instance = getattr(w, "browser_instance", None)
            browser_use_count = 0
            browser_created_at = None
            if isinstance(browser_instance, dict):
                browser_use_count = int(browser_instance.get("use_count", 0) or 0)
                browser_created_at = browser_instance.get("created_at")
            worker_details.append(
                {
                    "worker_id": getattr(w, "worker_id", None),
                    "idle": bool(getattr(w, "idle", False)),
                    "has_browser": bool(browser_instance),
                    "total_tasks": int(getattr(w, "total_tasks", 0) or 0),
                    "failed_tasks": int(getattr(w, "failed_tasks", 0) or 0),
                    "browser_use_count": browser_use_count,
                    "browser_created_at": browser_created_at,
                    "last_active_ts": float(getattr(w, "last_activity_ts", 0) or 0),
                    "thread_alive": bool(getattr(w, "is_alive", lambda: False)()),
                }
            )

        return {
            'pool_size': self.pool_size,
            'idle_workers': idle_count,
            'busy_workers': max(0, len(workers) - idle_count),
            'queue_size': self.task_queue.qsize(),
            'total_tasks': total_tasks,
            'failed_tasks': failed_tasks,
            'success_rate': f"{(total_tasks - failed_tasks) / total_tasks * 100:.1f}%" if total_tasks > 0 else "N/A",
            'workers': worker_details,
            'timestamp': time.time(),
        }

    def wait_for_completion(self, timeout: Optional[float] = None):
        """Wait until every queued task has been fully processed.

        Workers call ``task_queue.task_done()`` for each item they dequeue, so
        this returns only after in-flight tasks finish, not merely once the
        queue is empty.

        Args:
            timeout: Maximum seconds to wait. ``None`` or ``0`` waits
                indefinitely (as before); a negative value does not wait.

        Returns:
            True when all tasks completed, False on timeout. On timeout the
            helper thread blocked in ``Queue.join()`` keeps waiting (it is a
            daemon thread) until the queue drains.
        """
        done = threading.Event()

        def _join_queue() -> None:
            self.task_queue.join()
            done.set()

        threading.Thread(target=_join_queue, daemon=True).start()

        if not timeout:
            done.wait()
            return True
        if done.wait(max(0.0, timeout)):
            return True
        self.log("等待超时")
        return False

    def shutdown(self):
        """Send one stop sentinel per worker, join them (10s each) and reset the pool."""
        self.log("正在关闭工作线程池...")

        # One None sentinel per worker; queued tasks ahead of them still run (FIFO).
        for _ in self.workers:
            self.task_queue.put(None)

        for worker in self.workers:
            worker.join(timeout=10)

        self.workers.clear()
        self.initialized = False
        self.log("✓ 工作线程池已关闭")


# Global singleton pool.
_global_pool: Optional[BrowserWorkerPool] = None
_pool_lock = threading.Lock()


def get_browser_worker_pool(pool_size: int = 3, log_callback: Optional[Callable] = None) -> BrowserWorkerPool:
    """Return the global pool, creating and initializing it on first use.

    ``pool_size``/``log_callback`` only apply when the pool is first created;
    use resize_browser_worker_pool to change the size later.
    """
    global _global_pool
    with _pool_lock:
        if _global_pool is None:
            _global_pool = BrowserWorkerPool(pool_size=pool_size, log_callback=log_callback)
            _global_pool.initialize()
        return _global_pool


def init_browser_worker_pool(pool_size: int = 3, log_callback: Optional[Callable] = None):
    """Initialize the global pool (no-op if it already exists)."""
    get_browser_worker_pool(pool_size=pool_size, log_callback=log_callback)


def _shutdown_pool_when_idle(pool: BrowserWorkerPool) -> None:
    """Best-effort: wait (up to 60s) for a retired pool to drain, then shut it down.

    Errors are logged through the pool's logger instead of being silently
    dropped, but never propagate (this runs on a background thread).
    """
    try:
        pool.wait_for_completion(timeout=60)
    except Exception as e:
        _log_best_effort(pool, f"旧线程池关闭出错: {e}")
    try:
        pool.shutdown()
    except Exception as e:
        _log_best_effort(pool, f"旧线程池关闭出错: {e}")


def _log_best_effort(pool: BrowserWorkerPool, message: str) -> None:
    """Log via the pool, ignoring failures of the logger itself (cleanup path only)."""
    try:
        pool.log(message)
    except Exception:
        # The logger itself failed; there is nowhere left to report this.
        pass


def resize_browser_worker_pool(pool_size: int, log_callback: Optional[Callable] = None) -> bool:
    """Resize the global pool: new tasks go to a fresh pool, the old one shuts down when idle.

    Returns False when the current pool already has the requested size,
    True when a new pool was created.
    """
    global _global_pool
    try:
        target_size = max(1, int(pool_size))
    except Exception:
        # Deliberately lenient: any unusable size falls back to one worker.
        target_size = 1

    with _pool_lock:
        old_pool = _global_pool
        if old_pool and int(getattr(old_pool, "pool_size", 0) or 0) == target_size:
            return False
        effective_log_callback = log_callback or (getattr(old_pool, "log_callback", None) if old_pool else None)
        _global_pool = BrowserWorkerPool(pool_size=target_size, log_callback=effective_log_callback)
        _global_pool.initialize()

    if old_pool:
        threading.Thread(target=_shutdown_pool_when_idle, args=(old_pool,), daemon=True).start()
    return True


def shutdown_browser_worker_pool():
    """Shut down and discard the global pool, if any."""
    global _global_pool
    with _pool_lock:
        if _global_pool:
            _global_pool.shutdown()
            _global_pool = None


if __name__ == '__main__':
    # Manual smoke test.
    print("测试截图工作线程池...")

    def test_task(browser_instance, url: str, task_id: int):
        """Demo task: pretend to capture ``url``."""
        print(f"[Task-{task_id}] 开始访问: {url}")
        time.sleep(2)  # simulate screenshot latency
        return {'task_id': task_id, 'url': url, 'status': 'success'}

    def test_callback(result, error):
        """Demo callback printing the outcome."""
        if error:
            print(f"任务失败: {error}")
        else:
            print(f"任务成功: {result}")

    # Pool with 2 workers.
    pool = BrowserWorkerPool(pool_size=2)
    pool.initialize()

    # Submit 4 tasks.
    for i in range(4):
        pool.submit_task(test_task, test_callback, f"https://example.com/{i}", i + 1)

    print("\n任务已提交,等待完成...")
    pool.wait_for_completion()

    print("\n统计信息:", pool.get_stats())

    pool.shutdown()
    print("\n测试完成!")