diff --git a/api_browser.py b/api_browser.py index 57047dc..a5e4f79 100755 --- a/api_browser.py +++ b/api_browser.py @@ -71,6 +71,7 @@ class APIBrowser: self.log_callback = log_callback self.stop_flag = False self._closed = False # 防止重复关闭 + self._first_request = True self.last_total_records = 0 # 设置代理 @@ -132,7 +133,12 @@ class APIBrowser: def _request_with_retry(self, method, url, max_retries=3, retry_delay=1, **kwargs): """带重试机制的请求方法""" - kwargs.setdefault('timeout', _API_REQUEST_TIMEOUT_SECONDS) + # 首次请求使用更长超时(10秒),后续使用配置的超时 + if self._first_request: + kwargs.setdefault('timeout', 10.0) + self._first_request = False + else: + kwargs.setdefault('timeout', _API_REQUEST_TIMEOUT_SECONDS) last_error = None for attempt in range(1, max_retries + 1): @@ -518,3 +524,28 @@ class APIBrowser: """Context manager支持 - 退出""" self.close() return False # 不抑制异常 + + +def warmup_api_connection(proxy_config: Optional[dict] = None, log_callback: Optional[Callable] = None): + """预热 API 连接 - 建立 TCP/TLS 连接池""" + + def log(msg: str): + if log_callback: + log_callback(msg) + else: + print(f"[API预热] {msg}") + + log("正在预热 API 连接...") + try: + session = requests.Session() + if proxy_config and proxy_config.get("server"): + session.proxies = {"http": proxy_config["server"], "https": proxy_config["server"]} + + # 发送一个轻量级请求建立连接 + resp = session.get(f"{BASE_URL}/admin/login.aspx", timeout=10, allow_redirects=False) + log(f"✓ API 连接预热完成 (status={resp.status_code})") + session.close() + return True + except Exception as e: + log(f"API 连接预热失败: {e}") + return False diff --git a/app.py b/app.py index 6c4c447..08e107f 100644 --- a/app.py +++ b/app.py @@ -277,6 +277,21 @@ if __name__ == "__main__": except Exception as e: logger.warning(f"警告: 截图线程池初始化失败: {e}") + # 预热 API 连接(后台进行,不阻塞启动) + logger.info("预热 API 连接...") + try: + from api_browser import warmup_api_connection + import threading + + threading.Thread( + target=warmup_api_connection, + kwargs={"log_callback": lambda msg: logger.info(msg)}, + daemon=True, + name="api-warmup", + ).start() + except Exception as e: + logger.warning(f"API 预热失败: {e}") + socketio.run( app, host=config.SERVER_HOST, diff --git a/browser_pool_worker.py b/browser_pool_worker.py index 3875884..97043ed 100755 --- a/browser_pool_worker.py +++ b/browser_pool_worker.py @@ -35,19 +35,26 @@ TASK_QUEUE_MAXSIZE = int(os.environ.get('BROWSER_TASK_QUEUE_MAXSIZE', '200')) # BROWSER_MAX_USE_COUNT = int(os.environ.get('BROWSER_MAX_USE_COUNT', '0')) # 每个浏览器最大复用次数(0表示不限制) -class BrowserWorker(threading.Thread): - """浏览器工作线程 - 每个worker维护自己的浏览器""" - - def __init__(self, worker_id: int, task_queue: queue.Queue, log_callback: Optional[Callable] = None): - super().__init__(daemon=True) - self.worker_id = worker_id - self.task_queue = task_queue - self.log_callback = log_callback - self.browser_instance = None - self.running = True - self.idle = True - self.total_tasks = 0 - self.failed_tasks = 0 +class BrowserWorker(threading.Thread): + """浏览器工作线程 - 每个worker维护自己的浏览器""" + + def __init__( + self, + worker_id: int, + task_queue: queue.Queue, + log_callback: Optional[Callable] = None, + pre_warm: bool = False, + ): + super().__init__(daemon=True) + self.worker_id = worker_id + self.task_queue = task_queue + self.log_callback = log_callback + self.browser_instance = None + self.running = True + self.idle = True + self.total_tasks = 0 + self.failed_tasks = 0 + self.pre_warm = pre_warm def log(self, message: str): """日志输出""" @@ -122,25 +129,39 @@ class BrowserWorker(threading.Thread): self._close_browser() return self._create_browser() - def run(self): - """工作线程主循环 - 按需启动浏览器模式""" - self.log("Worker启动(按需模式,等待任务时不占用浏览器资源)") - last_task_time = 0 - - while self.running: - try: - # 从队列获取任务(带超时,以便能响应停止信号和空闲检查) - self.idle = True - try: - task = self.task_queue.get(timeout=TASK_QUEUE_TIMEOUT) - except queue.Empty: - # 检查是否需要关闭空闲的浏览器 - if self.browser_instance and last_task_time > 0: - idle_time = time.time() - last_task_time - if idle_time > BROWSER_IDLE_TIMEOUT: - self.log(f"空闲{int(idle_time)}秒,关闭浏览器释放资源") - self._close_browser() - continue + def run(self): + """工作线程主循环 - 按需启动浏览器模式""" + if self.pre_warm: + self.log("Worker启动(预热模式,启动即创建浏览器)") + else: + self.log("Worker启动(按需模式,等待任务时不占用浏览器资源)") + + last_activity_time = 0 + if self.pre_warm and not self.browser_instance: + if self._create_browser(): + last_activity_time = time.time() + self.pre_warm = False + + while self.running: + try: + # 允许运行中触发预热(例如池在初始化后调用 warmup) + if self.pre_warm and not self.browser_instance: + if self._create_browser(): + last_activity_time = time.time() + self.pre_warm = False + + # 从队列获取任务(带超时,以便能响应停止信号和空闲检查) + self.idle = True + try: + task = self.task_queue.get(timeout=TASK_QUEUE_TIMEOUT) + except queue.Empty: + # 检查是否需要关闭空闲的浏览器 + if self.browser_instance and last_activity_time > 0: + idle_time = time.time() - last_activity_time + if idle_time > BROWSER_IDLE_TIMEOUT: + self.log(f"空闲{int(idle_time)}秒,关闭浏览器释放资源") + self._close_browser() + continue self.idle = False @@ -171,13 +192,13 @@ class BrowserWorker(threading.Thread): result = task_func(self.browser_instance, *task_args, **task_kwargs) callback(result, None) self.log(f"任务执行成功") - last_task_time = time.time() + last_activity_time = time.time() except Exception as e: self.log(f"任务执行失败: {e}") callback(None, str(e)) self.failed_tasks += 1 - last_task_time = time.time() + last_activity_time = time.time() # 任务失败后,检查浏览器健康 if not self._check_browser_health(): @@ -223,7 +244,7 @@ class BrowserWorkerPool: print(f"[浏览器池] {message}") def initialize(self): - """初始化工作线程池(按需模式,启动时不创建浏览器)""" + """初始化工作线程池(按需模式,默认预热1个浏览器)""" with self.lock: if self.initialized: return @@ -231,18 +252,52 @@ class BrowserWorkerPool: _apply_nest_asyncio_once() self.log(f"正在初始化工作线程池({self.pool_size}个worker,按需启动浏览器)...") - - for i in range(self.pool_size): - worker = BrowserWorker( - worker_id=i + 1, - task_queue=self.task_queue, - log_callback=self.log_callback - ) - worker.start() - self.workers.append(worker) - - self.initialized = True - self.log(f"✓ 工作线程池初始化完成({self.pool_size}个worker就绪,浏览器将在有任务时按需启动)") + + for i in range(self.pool_size): + worker = BrowserWorker( + worker_id=i + 1, + task_queue=self.task_queue, + log_callback=self.log_callback, + pre_warm=(i < 1), + ) + worker.start() + self.workers.append(worker) + + self.initialized = True + self.log(f"✓ 工作线程池初始化完成({self.pool_size}个worker就绪,浏览器将在有任务时按需启动)") + + # 初始化完成后,默认预热1个浏览器,降低容器重启后前几批任务的冷启动开销 + self.warmup(1) + + def warmup(self, count: int = 1) -> int: + """预热浏览器池 - 预创建指定数量的浏览器""" + if count <= 0: + return 0 + + if not self.initialized: + self.log("警告:线程池未初始化,无法预热") + return 0 + + with self.lock: + target_workers = list(self.workers[: min(count, len(self.workers))]) + + self.log(f"预热浏览器池(预创建{len(target_workers)}个浏览器)...") + + for worker in target_workers: + if not worker.browser_instance: + worker.pre_warm = True + + # 等待预热完成(最多等待20秒,避免阻塞过久) + deadline = time.time() + 20 + while time.time() < deadline: + warmed = sum(1 for w in target_workers if w.browser_instance) + if warmed >= len(target_workers): + break + time.sleep(0.1) + + warmed = sum(1 for w in target_workers if w.browser_instance) + self.log(f"✓ 浏览器池预热完成({warmed}个浏览器就绪)") + return warmed def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool: """