Files
zsglpt/browser_pool_worker.py
Yu Yon b5344cd55e 修复所有bug并添加新功能
- 修复添加账号按钮无反应问题
- 添加账号备注字段(可选)
- 添加账号设置按钮(修改密码/备注)
- 修复用户反馈���能
- 添加定时任务执行日志
- 修复容器重启后账号加载问题
- 修复所有JavaScript语法错误
- 优化账号加载机制(4层保障)

🤖 Generated with Claude Code
2025-12-10 11:19:16 +08:00

348 lines
12 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""浏览器池管理 - 工作线程池模式(真正的浏览器复用)"""
import threading
import queue
import time
from typing import Callable, Optional, Dict, Any
import nest_asyncio
nest_asyncio.apply()
class BrowserWorker(threading.Thread):
"""浏览器工作线程 - 每个worker维护自己的浏览器"""
def __init__(self, worker_id: int, task_queue: queue.Queue, log_callback: Optional[Callable] = None):
super().__init__(daemon=True)
self.worker_id = worker_id
self.task_queue = task_queue
self.log_callback = log_callback
self.browser_instance = None
self.running = True
self.idle = True
self.total_tasks = 0
self.failed_tasks = 0
def log(self, message: str):
"""日志输出"""
if self.log_callback:
self.log_callback(f"[Worker-{self.worker_id}] {message}")
else:
print(f"[浏览器池][Worker-{self.worker_id}] {message}")
def _create_browser(self):
"""创建浏览器实例"""
try:
from playwright.sync_api import sync_playwright
self.log("正在创建浏览器...")
playwright = sync_playwright().start()
browser = playwright.chromium.launch(
headless=True,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu',
]
)
self.browser_instance = {
'playwright': playwright,
'browser': browser,
'created_at': time.time(),
'use_count': 0,
'worker_id': self.worker_id
}
self.log(f"浏览器创建成功")
return True
except Exception as e:
self.log(f"创建浏览器失败: {e}")
return False
def _close_browser(self):
"""关闭浏览器"""
if self.browser_instance:
try:
self.log("正在关闭浏览器...")
if self.browser_instance['browser']:
self.browser_instance['browser'].close()
if self.browser_instance['playwright']:
self.browser_instance['playwright'].stop()
self.log(f"浏览器已关闭(共处理{self.browser_instance['use_count']}个任务)")
except Exception as e:
self.log(f"关闭浏览器时出错: {e}")
finally:
self.browser_instance = None
def _check_browser_health(self) -> bool:
"""检查浏览器是否健康"""
if not self.browser_instance:
return False
try:
return self.browser_instance['browser'].is_connected()
except:
return False
def _ensure_browser(self) -> bool:
"""确保浏览器可用(如果不可用则重新创建)"""
if self._check_browser_health():
return True
# 浏览器不可用,尝试重新创建
self.log("浏览器不可用,尝试重新创建...")
self._close_browser()
return self._create_browser()
def run(self):
"""工作线程主循环"""
self.log("Worker启动")
# 初始创建浏览器
if not self._create_browser():
self.log("初始浏览器创建失败Worker退出")
return
while self.running:
try:
# 从队列获取任务(带超时,以便能响应停止信号)
self.idle = True
task = self.task_queue.get(timeout=1)
self.idle = False
if task is None: # None作为停止信号
self.log("收到停止信号")
break
# 确保浏览器可用
if not self._ensure_browser():
self.log("浏览器不可用,任务失败")
task['callback'](None, "浏览器不可用")
self.failed_tasks += 1
continue
# 执行任务
task_func = task.get('func')
task_args = task.get('args', ())
task_kwargs = task.get('kwargs', {})
callback = task.get('callback')
self.total_tasks += 1
self.browser_instance['use_count'] += 1
self.log(f"开始执行任务(第{self.browser_instance['use_count']}次使用浏览器)")
try:
# 将浏览器实例传递给任务函数
result = task_func(self.browser_instance, *task_args, **task_kwargs)
callback(result, None)
self.log(f"任务执行成功")
except Exception as e:
self.log(f"任务执行失败: {e}")
callback(None, str(e))
self.failed_tasks += 1
# 任务失败后,检查浏览器健康
if not self._check_browser_health():
self.log("任务失败导致浏览器异常,将在下次任务前重建")
self._close_browser()
except queue.Empty:
# 队列为空,继续等待
continue
except Exception as e:
self.log(f"Worker出错: {e}")
time.sleep(1)
# 清理资源
self._close_browser()
self.log(f"Worker停止总任务:{self.total_tasks}, 失败:{self.failed_tasks}")
def stop(self):
"""停止worker"""
self.running = False
class BrowserWorkerPool:
"""浏览器工作线程池"""
def __init__(self, pool_size: int = 3, log_callback: Optional[Callable] = None):
self.pool_size = pool_size
self.log_callback = log_callback
self.task_queue = queue.Queue()
self.workers = []
self.initialized = False
self.lock = threading.Lock()
def log(self, message: str):
"""日志输出"""
if self.log_callback:
self.log_callback(message)
else:
print(f"[浏览器池] {message}")
def initialize(self):
"""初始化工作线程池"""
with self.lock:
if self.initialized:
return
self.log(f"正在初始化工作线程池({self.pool_size}个worker...")
for i in range(self.pool_size):
worker = BrowserWorker(
worker_id=i + 1,
task_queue=self.task_queue,
log_callback=self.log_callback
)
worker.start()
self.workers.append(worker)
# 等待所有worker准备就绪
time.sleep(2)
self.initialized = True
self.log(f"✓ 工作线程池初始化完成({self.pool_size}个worker已就绪")
def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool:
"""
提交任务到队列
Args:
task_func: 任务函数,签名为 func(browser_instance, *args, **kwargs)
callback: 回调函数,签名为 callback(result, error)
*args, **kwargs: 传递给task_func的参数
Returns:
是否成功提交
"""
if not self.initialized:
self.log("警告:线程池未初始化")
return False
task = {
'func': task_func,
'args': args,
'kwargs': kwargs,
'callback': callback
}
self.task_queue.put(task)
return True
def get_stats(self) -> Dict[str, Any]:
"""获取线程池统计信息"""
idle_count = sum(1 for w in self.workers if w.idle)
total_tasks = sum(w.total_tasks for w in self.workers)
failed_tasks = sum(w.failed_tasks for w in self.workers)
return {
'pool_size': self.pool_size,
'idle_workers': idle_count,
'busy_workers': self.pool_size - idle_count,
'queue_size': self.task_queue.qsize(),
'total_tasks': total_tasks,
'failed_tasks': failed_tasks,
'success_rate': f"{(total_tasks - failed_tasks) / total_tasks * 100:.1f}%" if total_tasks > 0 else "N/A"
}
def wait_for_completion(self, timeout: Optional[float] = None):
"""等待所有任务完成"""
start_time = time.time()
while not self.task_queue.empty():
if timeout and (time.time() - start_time) > timeout:
self.log("等待超时")
return False
time.sleep(0.5)
# 再等待一下确保正在执行的任务完成
time.sleep(2)
return True
def shutdown(self):
"""关闭线程池"""
self.log("正在关闭工作线程池...")
# 发送停止信号
for _ in self.workers:
self.task_queue.put(None)
# 等待所有worker停止
for worker in self.workers:
worker.join(timeout=10)
self.workers.clear()
self.initialized = False
self.log("✓ 工作线程池已关闭")
# 全局实例
_global_pool: Optional[BrowserWorkerPool] = None
_pool_lock = threading.Lock()
def get_browser_worker_pool(pool_size: int = 3, log_callback: Optional[Callable] = None) -> BrowserWorkerPool:
"""获取全局浏览器工作线程池(单例)"""
global _global_pool
with _pool_lock:
if _global_pool is None:
_global_pool = BrowserWorkerPool(pool_size=pool_size, log_callback=log_callback)
_global_pool.initialize()
return _global_pool
def init_browser_worker_pool(pool_size: int = 3, log_callback: Optional[Callable] = None):
"""初始化全局浏览器工作线程池"""
get_browser_worker_pool(pool_size=pool_size, log_callback=log_callback)
def shutdown_browser_worker_pool():
"""关闭全局浏览器工作线程池"""
global _global_pool
with _pool_lock:
if _global_pool:
_global_pool.shutdown()
_global_pool = None
if __name__ == '__main__':
# 测试代码
print("测试浏览器工作线程池...")
def test_task(browser_instance, url: str, task_id: int):
"""测试任务访问URL"""
print(f"[Task-{task_id}] 开始访问: {url}")
time.sleep(2) # 模拟截图耗时
return {'task_id': task_id, 'url': url, 'status': 'success'}
def test_callback(result, error):
"""测试回调"""
if error:
print(f"任务失败: {error}")
else:
print(f"任务成功: {result}")
# 创建线程池2个worker
pool = BrowserWorkerPool(pool_size=2)
pool.initialize()
# 提交4个任务
for i in range(4):
pool.submit_task(test_task, test_callback, f"https://example.com/{i}", i + 1)
print("\n任务已提交,等待完成...")
pool.wait_for_completion()
print("\n统计信息:", pool.get_stats())
# 关闭线程池
pool.shutdown()
print("\n测试完成!")