471 lines
17 KiB
Python
Executable File
471 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""截图线程池管理 - 工作线程池模式(并发执行截图任务)"""
|
||
|
||
import os
|
||
import threading
|
||
import queue
|
||
import time
|
||
from typing import Callable, Optional, Dict, Any
|
||
|
||
def _env_int(name: str, default: int, minimum: Optional[int] = None) -> int:
    """Read an integer tunable from the environment.

    Args:
        name: environment variable name.
        default: value used when the variable is unset, blank, or not a valid
            integer (a malformed value is reported on stdout instead of making
            this module fail at import time).
        minimum: optional lower bound; smaller values are clamped up to it.

    Returns:
        The parsed (and possibly clamped) integer.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        value = int(raw.strip())
    except ValueError:
        print(f"[截图池] 环境变量 {name}={raw!r} 不是有效整数,使用默认值 {default}")
        return default
    if minimum is not None and value < minimum:
        print(f"[截图池] 环境变量 {name}={value} 小于最小值 {minimum},使用 {minimum}")
        return minimum
    return value


# Tunables read from the environment (previously hard-coded magic numbers).
# Seconds an idle execution environment is kept before being released (default 5 minutes).
BROWSER_IDLE_TIMEOUT = _env_int('BROWSER_IDLE_TIMEOUT', 300)
# Seconds a worker blocks on queue.get before re-checking stop/idle state.
# Must be >= 1: 0 would make the worker loop spin, a negative value makes queue.get raise.
TASK_QUEUE_TIMEOUT = _env_int('TASK_QUEUE_TIMEOUT', 10, minimum=1)
# Maximum number of queued tasks (<= 0 means unbounded).
TASK_QUEUE_MAXSIZE = _env_int('BROWSER_TASK_QUEUE_MAXSIZE', 200)
# Maximum tasks per execution environment before it is recycled (0 means unlimited).
BROWSER_MAX_USE_COUNT = _env_int('BROWSER_MAX_USE_COUNT', 0)
|
||
|
||
|
||
class BrowserWorker(threading.Thread):
    """Screenshot worker thread; each worker owns its own execution environment.

    Workers consume task dicts from a shared ``queue.Queue``. A task has the keys
    ``func``, ``args``, ``kwargs``, ``callback`` and ``retry_count`` (the shape
    built by ``BrowserWorkerPool.submit_task``); ``None`` is the stop sentinel.
    The "execution environment" is a bookkeeping dict (creation time, use count,
    worker id). It is created lazily for the first task, or eagerly when
    ``pre_warm`` is set.
    """

    def __init__(
        self,
        worker_id: int,
        task_queue: queue.Queue,
        log_callback: Optional[Callable] = None,
        pre_warm: bool = False,
    ):
        super().__init__(daemon=True)
        self.worker_id = worker_id
        self.task_queue = task_queue
        self.log_callback = log_callback
        self.browser_instance = None  # environment dict while one exists, else None
        self.running = True  # cleared by stop(); checked once per loop iteration
        self.idle = True  # False while a dequeued task is being handled
        self.total_tasks = 0
        self.failed_tasks = 0
        self.pre_warm = pre_warm  # may also be set later from another thread to request a warm-up
        self.last_activity_ts = 0.0  # time.time() of last environment creation / task completion

    def log(self, message: str):
        """Emit a log line, prefixed with this worker's id."""
        if self.log_callback:
            self.log_callback(f"[Worker-{self.worker_id}] {message}")
        else:
            print(f"[截图池][Worker-{self.worker_id}] {message}")

    def _create_browser(self):
        """Create the execution environment (a placeholder dict; no real browser is started)."""
        created_at = time.time()
        self.browser_instance = {
            'created_at': created_at,
            'use_count': 0,
            'worker_id': self.worker_id,
        }
        self.last_activity_ts = created_at
        self.log("截图执行环境就绪")
        return True

    def _close_browser(self):
        """Release the execution environment, if any."""
        if self.browser_instance:
            self.log(f"执行环境已释放(共处理{self.browser_instance.get('use_count', 0)}个任务)")
        self.browser_instance = None

    def _check_browser_health(self) -> bool:
        """Return True when an execution environment exists."""
        return bool(self.browser_instance)

    def _ensure_browser(self) -> bool:
        """Make sure an execution environment exists, recreating it if needed."""
        if self._check_browser_health():
            return True
        self.log("执行环境不可用,尝试重新创建...")
        self._close_browser()
        return self._create_browser()

    def _invoke_callback(self, callback, result, error) -> None:
        """Call a task callback as ``callback(result, error)``, shielding the worker.

        Non-callable callbacks (e.g. ``None``) are skipped. An exception raised by
        the callback is logged and swallowed, so a failing success-callback is
        never re-invoked with an error and cannot disturb the worker loop.
        """
        if not callable(callback):
            return
        try:
            callback(result, error)
        except Exception as e:
            self.log(f"任务回调执行失败: {e}")

    def _maybe_prewarm(self) -> None:
        """Create the environment when a pre-warm was requested and none exists yet."""
        if self.pre_warm and not self.browser_instance:
            self._create_browser()
            self.pre_warm = False

    def _release_if_idle(self) -> None:
        """Release the environment after more than BROWSER_IDLE_TIMEOUT seconds without activity."""
        if self.browser_instance and self.last_activity_ts > 0:
            idle_time = time.time() - self.last_activity_ts
            if idle_time > BROWSER_IDLE_TIMEOUT:
                self.log(f"空闲{int(idle_time)}秒,释放执行环境")
                self._close_browser()

    def _acquire_browser(self) -> bool:
        """Ensure an environment is ready, retrying once after 0.5 s; return success."""
        for attempt in range(2):
            if self._ensure_browser():
                return True
            if attempt < 1:
                self.log("执行环境创建失败,重试...")
                time.sleep(0.5)
        return False

    def _handle_browser_unavailable(self, task: Dict[str, Any]) -> None:
        """Requeue a task once when no environment is available; otherwise fail it."""
        retry_count = int(task.get("retry_count", 0) or 0)
        if retry_count < 1:
            task["retry_count"] = retry_count + 1
            try:
                self.task_queue.put(task, timeout=1)
                self.log("执行环境不可用,任务重新入队")
                return
            except queue.Full:
                self.log("任务队列已满,无法重新入队,任务失败")
        else:
            self.log("执行环境不可用,任务失败")
        self._invoke_callback(task.get("callback"), None, "执行环境不可用")
        self.total_tasks += 1
        self.failed_tasks += 1

    def _execute_task(self, task: Dict[str, Any]) -> None:
        """Run one task against the current environment and report via its callback.

        Exceptions from the task function are reported as ``callback(None, str(e))``
        and counted as failures. Exceptions from the callback itself are only
        logged (see ``_invoke_callback``).
        """
        task_func = task.get('func')
        task_args = task.get('args', ())
        task_kwargs = task.get('kwargs', {})
        callback = task.get('callback')

        self.total_tasks += 1
        self.browser_instance['use_count'] += 1
        self.log(f"开始执行任务(第{self.browser_instance['use_count']}次执行)")

        try:
            # The environment dict is passed as the task function's first argument.
            result = task_func(self.browser_instance, *task_args, **task_kwargs)
        except Exception as e:
            self.log(f"任务执行失败: {e}")
            self._invoke_callback(callback, None, str(e))
            self.failed_tasks += 1
            self.last_activity_ts = time.time()
            # After a failure, drop an unhealthy environment so the next task rebuilds it.
            if not self._check_browser_health():
                self.log("任务失败导致执行环境异常,将在下次任务前重建")
                self._close_browser()
        else:
            # Callback runs outside the task's try: its failures are not task failures.
            self._invoke_callback(callback, result, None)
            self.log("任务执行成功")
            self.last_activity_ts = time.time()

    def _recycle_if_worn(self) -> None:
        """Recycle the environment once it reached BROWSER_MAX_USE_COUNT uses (0 = never)."""
        if self.browser_instance and BROWSER_MAX_USE_COUNT > 0:
            if self.browser_instance.get('use_count', 0) >= BROWSER_MAX_USE_COUNT:
                self.log(f"执行环境已复用{self.browser_instance['use_count']}次,重启释放资源")
                self._close_browser()

    def run(self):
        """Worker main loop: process tasks until a stop sentinel or stop() is seen."""
        if self.pre_warm:
            self.log("Worker启动(预热模式,启动即准备执行环境)")
        else:
            self.log("Worker启动(按需模式,等待任务时不占用资源)")

        self._maybe_prewarm()

        while self.running:
            try:
                # A warm-up may also be requested after start (e.g. by the pool's warmup()).
                self._maybe_prewarm()

                # Block with a timeout so stop() and idle release are noticed periodically.
                self.idle = True
                try:
                    task = self.task_queue.get(timeout=TASK_QUEUE_TIMEOUT)
                except queue.Empty:
                    self._release_if_idle()
                    continue

                self.idle = False

                if task is None:  # stop sentinel
                    self.log("收到停止信号")
                    break

                if not isinstance(task, dict):
                    self.log(f"收到无效任务(类型 {type(task).__name__}),已丢弃")
                    continue

                if not self._acquire_browser():
                    self._handle_browser_unavailable(task)
                    continue

                self._execute_task(task)
                self._recycle_if_worn()

            except Exception as e:
                self.log(f"Worker出错: {e}")
                time.sleep(1)

        # Release resources on exit.
        self._close_browser()
        self.log(f"Worker停止(总任务:{self.total_tasks}, 失败:{self.failed_tasks})")

    def stop(self):
        """Ask the worker to exit; it notices at the top of its next loop iteration."""
        self.running = False
|
||
|
||
|
||
class BrowserWorkerPool:
    """Screenshot worker pool: ``pool_size`` BrowserWorker threads sharing one task queue."""

    def __init__(self, pool_size: int = 3, log_callback: Optional[Callable] = None):
        self.pool_size = pool_size
        self.log_callback = log_callback
        maxsize = TASK_QUEUE_MAXSIZE if TASK_QUEUE_MAXSIZE > 0 else 0  # 0 = unbounded queue
        self.task_queue = queue.Queue(maxsize=maxsize)
        self.workers = []
        self.initialized = False
        self.lock = threading.Lock()  # guards initialization and snapshots of self.workers

    def log(self, message: str):
        """Emit a pool-level log line."""
        if self.log_callback:
            self.log_callback(message)
        else:
            print(f"[截图池] {message}")

    def initialize(self):
        """Start the worker threads (on-demand mode; worker 1 pre-warms its environment).

        Idempotent: a second call on an initialized pool returns immediately.
        """
        with self.lock:
            if self.initialized:
                return

            self.log(f"正在初始化截图线程池({self.pool_size}个worker,按需启动执行环境)...")

            for i in range(self.pool_size):
                worker = BrowserWorker(
                    worker_id=i + 1,
                    task_queue=self.task_queue,
                    log_callback=self.log_callback,
                    pre_warm=(i < 1),
                )
                worker.start()
                self.workers.append(worker)

            self.initialized = True
            self.log(f"✓ 截图线程池初始化完成({self.pool_size}个worker就绪,执行环境将在有任务时按需启动)")

        # Warm one environment to cut cold-start latency for the first tasks.
        # Must run outside self.lock: warmup() acquires it too (non-reentrant Lock).
        self.warmup(1)

    def warmup(self, count: int = 1) -> int:
        """Ask the first ``count`` workers to pre-create their environments.

        Waits at most 20 seconds for them to become ready.

        Returns:
            The number of targeted workers that have an environment when this returns.
        """
        if count <= 0:
            return 0

        if not self.initialized:
            self.log("警告:线程池未初始化,无法预热")
            return 0

        with self.lock:
            target_workers = list(self.workers[: min(count, len(self.workers))])

        self.log(f"预热截图线程池(预创建{len(target_workers)}个执行环境)...")

        for worker in target_workers:
            if not worker.browser_instance:
                worker.pre_warm = True

        # Bounded wait (monotonic clock, immune to wall-clock jumps).
        deadline = time.monotonic() + 20
        while time.monotonic() < deadline:
            warmed = sum(1 for w in target_workers if w.browser_instance)
            if warmed >= len(target_workers):
                break
            time.sleep(0.1)

        warmed = sum(1 for w in target_workers if w.browser_instance)
        self.log(f"✓ 截图线程池预热完成({warmed}个执行环境就绪)")
        return warmed

    def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool:
        """
        Enqueue a task.

        Args:
            task_func: task function, called as func(browser_instance, *args, **kwargs)
            callback: result callback, called as callback(result, error)
            *args, **kwargs: extra arguments forwarded to task_func

        Returns:
            True if the task was queued; False if the pool is not initialized or
            the queue stayed full for 1 second.
        """
        if not self.initialized:
            self.log("警告:线程池未初始化")
            return False

        task = {
            'func': task_func,
            'args': args,
            'kwargs': kwargs,
            'callback': callback,
            'retry_count': 0,
        }

        try:
            self.task_queue.put(task, timeout=1)
            return True
        except queue.Full:
            self.log(f"警告:任务队列已满(maxsize={self.task_queue.maxsize}),拒绝提交任务")
            return False

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of pool and per-worker statistics."""
        workers = list(self.workers or [])
        idle_count = sum(1 for w in workers if getattr(w, "idle", False))
        total_tasks = sum(int(getattr(w, "total_tasks", 0) or 0) for w in workers)
        failed_tasks = sum(int(getattr(w, "failed_tasks", 0) or 0) for w in workers)

        worker_details = []
        for w in workers:
            browser_instance = getattr(w, "browser_instance", None)
            browser_use_count = 0
            browser_created_at = None
            if isinstance(browser_instance, dict):
                browser_use_count = int(browser_instance.get("use_count", 0) or 0)
                browser_created_at = browser_instance.get("created_at")

            worker_details.append(
                {
                    "worker_id": getattr(w, "worker_id", None),
                    "idle": bool(getattr(w, "idle", False)),
                    "has_browser": bool(browser_instance),
                    "total_tasks": int(getattr(w, "total_tasks", 0) or 0),
                    "failed_tasks": int(getattr(w, "failed_tasks", 0) or 0),
                    "browser_use_count": browser_use_count,
                    "browser_created_at": browser_created_at,
                    "last_active_ts": float(getattr(w, "last_activity_ts", 0) or 0),
                    "thread_alive": bool(getattr(w, "is_alive", lambda: False)()),
                }
            )

        return {
            'pool_size': self.pool_size,
            'idle_workers': idle_count,
            'busy_workers': max(0, len(workers) - idle_count),
            'queue_size': self.task_queue.qsize(),
            'total_tasks': total_tasks,
            'failed_tasks': failed_tasks,
            'success_rate': f"{(total_tasks - failed_tasks) / total_tasks * 100:.1f}%" if total_tasks > 0 else "N/A",
            'workers': worker_details,
            'timestamp': time.time(),
        }

    def _is_quiescent(self) -> bool:
        """Return True when no task is queued and no live worker is handling one."""
        if not self.task_queue.empty():
            return False
        for worker in list(self.workers):
            if getattr(worker, "idle", True):
                continue
            # A dead worker (e.g. stopped after a sentinel) cannot finish anything.
            if worker.is_alive():
                return False
        return True

    def wait_for_completion(self, timeout: Optional[float] = None, poll_interval: float = 0.5) -> bool:
        """Wait until all queued and in-flight tasks have finished.

        Args:
            timeout: maximum seconds to wait; ``None`` waits indefinitely,
                ``0`` performs a single check.
            poll_interval: seconds between checks.

        Returns:
            True once the pool is quiet, False on timeout (after logging "等待超时").
        """
        deadline = None if timeout is None else time.monotonic() + max(0.0, float(timeout))
        quiet_checks = 0
        while True:
            # Require two consecutive quiet observations: a worker may have just
            # dequeued a task but not yet flagged itself busy.
            if self._is_quiescent():
                quiet_checks += 1
                if quiet_checks >= 2:
                    return True
            else:
                quiet_checks = 0

            if deadline is not None and time.monotonic() >= deadline:
                if quiet_checks:
                    return True
                self.log("等待超时")
                return False

            time.sleep(poll_interval)

    def shutdown(self):
        """Stop all workers and mark the pool uninitialized.

        One ``None`` stop sentinel per worker is queued behind pending tasks, so
        queued work is drained first. If the bounded queue stays full for 5 s,
        the workers are told to stop directly instead of blocking forever.
        Each worker is joined for at most 10 seconds.
        """
        self.log("正在关闭工作线程池...")

        with self.lock:
            workers = list(self.workers)

        # Send stop signals.
        for _ in workers:
            try:
                self.task_queue.put(None, timeout=5)
            except queue.Full:
                self.log("任务队列已满,直接通知worker停止")
                for worker in workers:
                    worker.stop()
                break

        # Wait for the workers to exit.
        for worker in workers:
            worker.join(timeout=10)

        with self.lock:
            self.workers.clear()
            self.initialized = False
        self.log("✓ 工作线程池已关闭")
|
||
|
||
|
||
# Process-wide singleton state. _pool_lock serializes creation, replacement
# and teardown of the shared pool held in _global_pool.
_pool_lock = threading.Lock()
_global_pool: Optional[BrowserWorkerPool] = None
|
||
|
||
|
||
def get_browser_worker_pool(pool_size: int = 3, log_callback: Optional[Callable] = None) -> BrowserWorkerPool:
    """Return the process-wide screenshot pool, creating and initializing it on first use.

    Once a pool exists it is returned as-is; ``pool_size`` and ``log_callback``
    only take effect on the call that creates it.
    """
    global _global_pool

    with _pool_lock:
        if _global_pool is not None:
            return _global_pool
        fresh_pool = BrowserWorkerPool(pool_size=pool_size, log_callback=log_callback)
        _global_pool = fresh_pool
        fresh_pool.initialize()
        return fresh_pool
|
||
|
||
|
||
def init_browser_worker_pool(pool_size: int = 3, log_callback: Optional[Callable] = None):
    """Eagerly create the global screenshot pool; does nothing if it already exists."""
    get_browser_worker_pool(log_callback=log_callback, pool_size=pool_size)
|
||
|
||
|
||
def _shutdown_pool_when_idle(pool: "BrowserWorkerPool") -> None:
    """Drain a retired pool (waiting at most 60 s) and then shut it down.

    Runs on a detached daemon thread, so it never raises. Failures are reported
    through ``pool.log`` instead of being silently discarded.
    """
    def _report(stage: str, exc: BaseException) -> None:
        try:
            pool.log(f"旧线程池{stage}失败: {exc}")
        except Exception:
            pass  # logging is best-effort; nothing else can be done on this thread

    try:
        pool.wait_for_completion(timeout=60)
    except Exception as exc:
        _report("等待任务完成", exc)

    # Shut down even if waiting failed, so the old workers do not linger.
    try:
        pool.shutdown()
    except Exception as exc:
        _report("关闭", exc)
|
||
|
||
|
||
def resize_browser_worker_pool(pool_size: int, log_callback: Optional[Callable] = None) -> bool:
    """Replace the global pool with one of a different worker count.

    New submissions go to the new pool immediately. The previous pool is
    drained and shut down on a background daemon thread.

    Args:
        pool_size: desired worker count; values that cannot be converted with
            int() become 1, and anything below 1 is raised to 1.
        log_callback: logger for the new pool; falls back to the old pool's.

    Returns:
        False if the current pool already has the requested size, else True.
    """
    global _global_pool

    try:
        desired = int(pool_size)
    except Exception:
        desired = 1
    desired = max(1, desired)

    with _pool_lock:
        previous = _global_pool
        inherited_callback = None
        if previous:
            if int(getattr(previous, "pool_size", 0) or 0) == desired:
                return False
            inherited_callback = getattr(previous, "log_callback", None)
        _global_pool = BrowserWorkerPool(pool_size=desired, log_callback=log_callback or inherited_callback)
        _global_pool.initialize()

    if previous:
        retire_thread = threading.Thread(target=_shutdown_pool_when_idle, args=(previous,), daemon=True)
        retire_thread.start()

    return True
|
||
|
||
|
||
def shutdown_browser_worker_pool():
    """Shut down the global screenshot pool (if any) and forget it."""
    global _global_pool

    with _pool_lock:
        current = _global_pool
        if not current:
            return
        current.shutdown()
        _global_pool = None
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual smoke test: push a few fake screenshot jobs through a 2-worker pool.
    print("测试截图工作线程池...")

    def _demo_job(browser_instance, url: str, task_id: int):
        """Pretend to visit *url*: log, sleep 2 s, and report success."""
        print(f"[Task-{task_id}] 开始访问: {url}")
        time.sleep(2)  # simulate screenshot latency
        return {'task_id': task_id, 'url': url, 'status': 'success'}

    def _demo_done(result, error):
        """Print the outcome of one demo job."""
        outcome = f"任务失败: {error}" if error else f"任务成功: {result}"
        print(outcome)

    demo_pool = BrowserWorkerPool(pool_size=2)
    demo_pool.initialize()

    # Four jobs on two workers, so two of them have to queue.
    for job_no in range(1, 5):
        demo_pool.submit_task(_demo_job, _demo_done, f"https://example.com/{job_no - 1}", job_no)

    print("\n任务已提交,等待完成...")
    demo_pool.wait_for_completion()

    print("\n统计信息:", demo_pool.get_stats())

    demo_pool.shutdown()
    print("\n测试完成!")
|