fix: 修复浏览器池任务丢失和统计错误 bug
问题:
1. 当浏览器创建失败时,failed_tasks 增加但 total_tasks 不增加,导致统计显示 "0/5" 这种不合理数据。
2. 浏览器创建失败时任务直接丢失,没有重新分配给其他 Worker。

修复:
- 添加本地浏览器创建重试(最多 2 次)
- 失败任务根据 retry_count 决定是否重新入队
  - retry_count < 1 时重新入队,让其他 Worker 处理
  - retry_count >= 1 时才真正失败并计入统计
- 任务字典新增 retry_count 字段,初始化为 0
- 添加回归测试用例

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -170,9 +170,36 @@ class BrowserWorker(threading.Thread):
|
|||||||
break
|
break
|
||||||
|
|
||||||
# 按需创建或确保浏览器可用
|
# 按需创建或确保浏览器可用
|
||||||
if not self._ensure_browser():
|
browser_ready = False
|
||||||
|
for attempt in range(2):
|
||||||
|
if self._ensure_browser():
|
||||||
|
browser_ready = True
|
||||||
|
break
|
||||||
|
if attempt < 1:
|
||||||
|
self.log("浏览器创建失败,重试...")
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
if not browser_ready:
|
||||||
|
retry_count = int(task.get("retry_count", 0) or 0) if isinstance(task, dict) else 0
|
||||||
|
if retry_count < 1 and isinstance(task, dict):
|
||||||
|
task["retry_count"] = retry_count + 1
|
||||||
|
try:
|
||||||
|
self.task_queue.put(task, timeout=1)
|
||||||
|
self.log("浏览器不可用,任务重新入队")
|
||||||
|
except queue.Full:
|
||||||
|
self.log("任务队列已满,无法重新入队,任务失败")
|
||||||
|
callback = task.get("callback")
|
||||||
|
if callable(callback):
|
||||||
|
callback(None, "浏览器不可用")
|
||||||
|
self.total_tasks += 1
|
||||||
|
self.failed_tasks += 1
|
||||||
|
continue
|
||||||
|
|
||||||
self.log("浏览器不可用,任务失败")
|
self.log("浏览器不可用,任务失败")
|
||||||
task['callback'](None, "浏览器不可用")
|
callback = task.get("callback") if isinstance(task, dict) else None
|
||||||
|
if callable(callback):
|
||||||
|
callback(None, "浏览器不可用")
|
||||||
|
self.total_tasks += 1
|
||||||
self.failed_tasks += 1
|
self.failed_tasks += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -319,7 +346,8 @@ class BrowserWorkerPool:
|
|||||||
'func': task_func,
|
'func': task_func,
|
||||||
'args': args,
|
'args': args,
|
||||||
'kwargs': kwargs,
|
'kwargs': kwargs,
|
||||||
'callback': callback
|
'callback': callback,
|
||||||
|
'retry_count': 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
75
tests/test_browser_pool_worker.py
Normal file
75
tests/test_browser_pool_worker.py
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import queue
|
||||||
|
|
||||||
|
from browser_pool_worker import BrowserWorker
|
||||||
|
|
||||||
|
|
||||||
|
class _AlwaysFailEnsureWorker(BrowserWorker):
    """Test double whose browser can never be made available.

    Counts every `_ensure_browser` call in `ensure_calls` and clears
    `self.running` from the second call onward (presumably so the worker's
    run loop exits after one task -- confirm against `BrowserWorker.run`).
    """

    def __init__(self, *, worker_id: int, task_queue: queue.Queue):
        """Build the worker with pre-warming disabled and a zeroed call counter."""
        super().__init__(
            worker_id=worker_id,
            task_queue=task_queue,
            pre_warm=False,
        )
        self.ensure_calls = 0

    def _ensure_browser(self) -> bool:  # noqa: D401 - matching base naming
        """Record the attempt and always report that no browser is available."""
        self.ensure_calls += 1
        stop_requested = self.ensure_calls >= 2
        if stop_requested:
            self.running = False
        return False

    def _close_browser(self):
        """Drop the browser reference without performing any real teardown."""
        self.browser_instance = None
|
||||||
|
|
||||||
|
|
||||||
|
def test_requeue_task_when_browser_unavailable():
    """A fresh task (retry_count 0) is put back on the queue, not failed."""
    pending: queue.Queue = queue.Queue()
    outcomes: list[tuple[object, object]] = []

    def record(result, error):
        outcomes.append((result, error))

    job = dict(
        func=lambda *_args, **_kwargs: None,
        args=(),
        kwargs={},
        callback=record,
        retry_count=0,
    )

    worker = _AlwaysFailEnsureWorker(worker_id=1, task_queue=pending)
    worker.start()
    pending.put(job)
    worker.join(timeout=5)

    assert not worker.is_alive()
    # The worker tries to create the browser locally at most twice.
    assert worker.ensure_calls == 2
    # The first failed assignment re-queues the task; no failure callback yet.
    assert not outcomes

    returned = pending.get_nowait()
    assert returned["retry_count"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_fail_task_after_second_assignment():
    """A task that was already re-queued once is failed and counted."""
    pending: queue.Queue = queue.Queue()
    outcomes: list[tuple[object, object]] = []

    def record(result, error):
        outcomes.append((result, error))

    job = dict(
        func=lambda *_args, **_kwargs: None,
        args=(),
        kwargs={},
        callback=record,
        retry_count=1,  # already reassigned once
    )

    worker = _AlwaysFailEnsureWorker(worker_id=1, task_queue=pending)
    worker.start()
    pending.put(job)
    worker.join(timeout=5)

    assert not worker.is_alive()
    expected_outcomes = [(None, "浏览器不可用")]
    assert outcomes == expected_outcomes
    assert worker.total_tasks == 1
    assert worker.failed_tasks == 1
|
||||||
|
|
||||||
Reference in New Issue
Block a user