From d7d878dc08f1759d36afaf959a500184c497665a Mon Sep 17 00:00:00 2001 From: yuyx <237899745@qq.com> Date: Sat, 13 Dec 2025 17:36:02 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=86=85=E5=AD=98=E6=BA=A2=E5=87=BA?= =?UTF-8?q?=E4=B8=8E=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api_browser.py | 28 +- app.py | 1306 ++++++++++++++++++++++++++------------ browser_pool_worker.py | 87 +-- email_service.py | 104 ++- playwright_automation.py | 83 ++- 5 files changed, 1128 insertions(+), 480 deletions(-) diff --git a/api_browser.py b/api_browser.py index 61551ec..1c6b376 100755 --- a/api_browser.py +++ b/api_browser.py @@ -10,12 +10,27 @@ from bs4 import BeautifulSoup import re import time import atexit +import weakref from typing import Optional, Callable from dataclasses import dataclass BASE_URL = "https://postoa.aidunsoft.com" +_api_browser_instances: "weakref.WeakSet[APIBrowser]" = weakref.WeakSet() + + +def _cleanup_api_browser_instances(): + """进程退出时清理残留的API浏览器实例(弱引用,不阻止GC)""" + for inst in list(_api_browser_instances): + try: + inst.close() + except Exception: + pass + + +atexit.register(_cleanup_api_browser_instances) + @dataclass class APIBrowseResult: @@ -52,8 +67,7 @@ class APIBrowser: else: self.proxy_server = None - # 注册退出清理函数 - atexit.register(self._cleanup_on_exit) + _api_browser_instances.add(self) def log(self, message: str): """记录日志""" @@ -427,14 +441,10 @@ class APIBrowser: self.session.close() except: pass - - def _cleanup_on_exit(self): - """进程退出时的清理函数(由atexit调用)""" - if not self._closed: + finally: try: - self.session.close() - self._closed = True - except: + _api_browser_instances.discard(self) + except Exception: pass def __enter__(self): diff --git a/app.py b/app.py index 99edf5b..aa3daa5 100755 --- a/app.py +++ b/app.py @@ -45,6 +45,9 @@ import sys import secrets # 安全修复: 使用加密安全的随机数生成 from datetime import datetime, timedelta, timezone from functools import wraps +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass +import heapq # 导入数据库模块和核心模块 import database @@ -131,7 +134,7 @@ os.makedirs(SCREENSHOTS_DIR, exist_ok=True) # 全局变量 browser_manager = None user_accounts = {} # {user_id: {account_id: Account对象}} -active_tasks = {} # {account_id: Thread对象} +active_tasks = {} # {account_id: Future/Thread对象} task_status = {} # {account_id: {"user_id": x, "username": y, "status": "排队中/运行中", "detail_status": "具体状态", "browse_type": z, "start_time": t, "source": s, "progress": {...}, "is_vip": bool}} # 线程安全锁 - 修复Bug #12 @@ -151,6 +154,76 @@ log_cache_total_count = 0 # 全局日志总数,防止无限增长 batch_task_screenshots = {} # {batch_id: {'user_id': x, 'browse_type': y, 'screenshots': [{'account_name': a, 'path': p, 'items': n, 'attachments': m}], 'total_accounts': n, 'completed': n}} batch_task_lock = threading.Lock() +# 批次/随机任务内存回收TTL(秒) +BATCH_TASK_EXPIRE_SECONDS = int(os.environ.get('BATCH_TASK_EXPIRE_SECONDS', '21600')) # 默认6小时 +PENDING_RANDOM_EXPIRE_SECONDS = int(os.environ.get('PENDING_RANDOM_EXPIRE_SECONDS', '7200')) # 默认2小时 + + +def _get_batch_id_from_source(source: str): + """从source中提取批次ID(source格式: user_scheduled:batch_xxx)""" + if not source: + return None + if source.startswith('user_scheduled:batch_'): + return source.split(':', 1)[1] + return None + + +def _send_batch_task_email_if_configured(batch_info: dict): + """批次任务:当所有账号完成后发送打包邮件(在锁外调用)""" + try: + batch_user_id = batch_info.get('user_id') + if not batch_user_id: + return + user_info = database.get_user_by_id(batch_user_id) + if not user_info or not user_info.get('email'): + return + if not database.get_user_email_notify(batch_user_id): + return + if not batch_info.get('screenshots'): + return + email_service.send_batch_task_complete_email_async( + user_id=batch_user_id, + email=user_info['email'], + username=user_info['username'], + schedule_name=batch_info.get('schedule_name', '未命名任务'), + browse_type=batch_info.get('browse_type', '应读'), + screenshots=batch_info['screenshots'] + ) + print(f"[批次邮件] 已发送打包邮件,包含 {len(batch_info['screenshots'])} 条记录") + except Exception as e: + print(f"[批次邮件] 发送失败: {e}") + + +def _batch_task_record_result(batch_id: str, account_name: str, screenshot_path: str, total_items: int, total_attachments: int): + """批次任务:记录单账号结果,达到完成条件时触发邮件并回收内存""" + should_send_email = False + batch_info = None + now_ts = time.time() + + with batch_task_lock: + info = batch_task_screenshots.get(batch_id) + if not info: + return + + info['screenshots'].append({ + 'account_name': account_name, + 'path': screenshot_path, + 'items': total_items, + 'attachments': total_attachments + }) + info['completed'] += 1 + info['updated_at'] = now_ts + + total = info.get('total_accounts', 0) or 0 + if total > 0 and info['completed'] >= total: + should_send_email = True + batch_info = batch_task_screenshots.pop(batch_id) + print(f"[批次邮件] 批次 {batch_id} 已完成: {batch_info['completed']}/{batch_info['total_accounts']},准备发送邮件") + + if should_send_email and batch_info: + _send_batch_task_email_if_configured(batch_info) + + # 随机延迟任务存储 pending_random_schedules = {} pending_random_lock = threading.Lock() @@ -181,7 +254,437 @@ max_concurrent_per_account = config.MAX_CONCURRENT_PER_ACCOUNT max_concurrent_global = config.MAX_CONCURRENT_GLOBAL user_semaphores = {} # {user_id: Semaphore} user_semaphores_lock = threading.Lock() # 保护user_semaphores的线程锁 -global_semaphore = threading.Semaphore(max_concurrent_global) + + +class PrioritySemaphore: + """带VIP优先级的信号量(VIP等待时普通任务让行)""" + + def __init__(self, value: int): + self._value = max(0, int(value)) + self._vip_waiters = 0 + self._normal_waiters = 0 + self._cond = threading.Condition() + + def acquire(self, is_vip: bool = False, timeout: float = None) -> bool: + deadline = None if timeout is None else (time.time() + float(timeout)) + with self._cond: + if is_vip: + self._vip_waiters += 1 + else: + self._normal_waiters += 1 + + try: + while True: + can_acquire = self._value > 0 and (is_vip or self._vip_waiters == 0) + if can_acquire: + self._value -= 1 + return True + + if deadline is None: + self._cond.wait() + else: + remaining = deadline - time.time() + if remaining <= 0: + return False + self._cond.wait(timeout=remaining) + finally: + if is_vip: + self._vip_waiters = max(0, self._vip_waiters - 1) + else: + self._normal_waiters = max(0, self._normal_waiters - 1) + self._cond.notify_all() + + def release(self) -> None: + with self._cond: + self._value += 1 + self._cond.notify_all() + + def get_stats(self) -> dict: + with self._cond: + return { + 'value': self._value, + 'vip_waiters': self._vip_waiters, + 'normal_waiters': self._normal_waiters + } + + +global_semaphore = PrioritySemaphore(max_concurrent_global) + +# ==================== 任务调度器(固定Worker + 优先队列) ==================== + +@dataclass +class _TaskRequest: + user_id: int + account_id: str + browse_type: str + enable_screenshot: bool + source: str + retry_count: int + submitted_at: float + is_vip: bool + seq: int + canceled: bool = False + done_callback: object = None + + +class TaskScheduler: + """全局任务调度器:队列排队,不为每个任务单独创建线程。""" + + def __init__(self, max_global: int, max_per_user: int, max_queue_size: int = 1000): + self.max_global = max(1, int(max_global)) + self.max_per_user = max(1, int(max_per_user)) + self.max_queue_size = max(1, int(max_queue_size)) + + self._cond = threading.Condition() + self._pending = [] # heap: (priority, submitted_at, seq, task) + self._pending_by_account = {} # {account_id: task} + self._seq = 0 + + self._running_global = 0 + self._running_by_user = {} # {user_id: running_count} + + self._executor_max_workers = self.max_global + self._executor = ThreadPoolExecutor(max_workers=self._executor_max_workers, thread_name_prefix="TaskWorker") + self._old_executors = [] + + self._running = True + self._dispatcher_thread = threading.Thread( + target=self._dispatch_loop, daemon=True, name="TaskDispatcher" + ) + self._dispatcher_thread.start() + + def shutdown(self, timeout: float = 5.0): + """停止调度器(用于进程退出清理)""" + with self._cond: + self._running = False + self._cond.notify_all() + + try: + self._dispatcher_thread.join(timeout=timeout) + except Exception: + pass + + try: + self._executor.shutdown(wait=False) + except Exception: + pass + + for ex in self._old_executors: + try: + ex.shutdown(wait=False) + except Exception: + pass + + def update_limits(self, max_global: int = None, max_per_user: int = None, max_queue_size: int = None): + """动态更新并发/队列上限(不影响已在运行的任务)""" + with self._cond: + if max_per_user is not None: + self.max_per_user = max(1, int(max_per_user)) + if max_queue_size is not None: + self.max_queue_size = max(1, int(max_queue_size)) + + if max_global is not None: + new_max_global = max(1, int(max_global)) + self.max_global = new_max_global + if new_max_global > self._executor_max_workers: + self._old_executors.append(self._executor) + self._executor_max_workers = new_max_global + self._executor = ThreadPoolExecutor( + max_workers=self._executor_max_workers, thread_name_prefix="TaskWorker" + ) + try: + self._old_executors[-1].shutdown(wait=False) + except Exception: + pass + + self._cond.notify_all() + + def submit_task( + self, + user_id: int, + account_id: str, + browse_type: str, + enable_screenshot: bool = True, + source: str = "manual", + retry_count: int = 0, + is_vip: bool = None, + done_callback=None, + ): + """提交任务进入队列(返回: (ok, message))""" + if not user_id or not account_id: + return False, "参数错误" + + submitted_at = time.time() + if is_vip is None: + try: + is_vip = bool(database.is_user_vip(user_id)) + except Exception: + is_vip = False + else: + is_vip = bool(is_vip) + + with self._cond: + if not self._running: + return False, "调度器未运行" + if len(self._pending_by_account) >= self.max_queue_size: + return False, "任务队列已满,请稍后再试" + if account_id in self._pending_by_account: + return False, "任务已在队列中" + if safe_get_task(account_id) is not None: + return False, "任务已在运行中" + + self._seq += 1 + task = _TaskRequest( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=bool(enable_screenshot), + source=source, + retry_count=int(retry_count or 0), + submitted_at=submitted_at, + is_vip=is_vip, + seq=self._seq, + done_callback=done_callback, + ) + self._pending_by_account[account_id] = task + priority = 0 if is_vip else 1 + heapq.heappush(self._pending, (priority, task.submitted_at, task.seq, task)) + self._cond.notify_all() + + # 用于可视化/调试:记录队列 + with task_queue_lock: + if is_vip: + vip_task_queue.append(account_id) + else: + normal_task_queue.append(account_id) + + return True, "已加入队列" + + def cancel_pending_task(self, user_id: int, account_id: str) -> bool: + """取消尚未开始的排队任务(已运行的任务由 should_stop 控制)""" + canceled_task = None + with self._cond: + task = self._pending_by_account.pop(account_id, None) + if not task: + return False + task.canceled = True + canceled_task = task + self._cond.notify_all() + + # 从可视化队列移除 + with task_queue_lock: + if account_id in vip_task_queue: + vip_task_queue.remove(account_id) + if account_id in normal_task_queue: + normal_task_queue.remove(account_id) + + # 批次任务:取消也要推进完成计数,避免批次缓存常驻 + try: + batch_id = _get_batch_id_from_source(canceled_task.source) + if batch_id: + if user_id in user_accounts and account_id in user_accounts[user_id]: + acc = user_accounts[user_id][account_id] + account_name = acc.remark if acc.remark else acc.username + else: + account_name = account_id + _batch_task_record_result( + batch_id=batch_id, + account_name=account_name, + screenshot_path=None, + total_items=0, + total_attachments=0, + ) + except Exception: + pass + + return True + + def _dispatch_loop(self): + while True: + task = None + with self._cond: + if not self._running: + return + + # 等待队列里有任务 & 有全局容量 + if not self._pending or self._running_global >= self.max_global: + self._cond.wait(timeout=0.5) + continue + + task = self._pop_next_runnable_locked() + if task is None: + self._cond.wait(timeout=0.5) + continue + + # 占用资源 + self._running_global += 1 + self._running_by_user[task.user_id] = self._running_by_user.get(task.user_id, 0) + 1 + + # 从队列移除(可视化) + with task_queue_lock: + if task.account_id in vip_task_queue: + vip_task_queue.remove(task.account_id) + if task.account_id in normal_task_queue: + normal_task_queue.remove(task.account_id) + + try: + future = self._executor.submit(self._run_task_wrapper, task) + safe_set_task(task.account_id, future) + except Exception: + # 提交失败,回滚占用 + with self._cond: + self._running_global = max(0, self._running_global - 1) + self._running_by_user[task.user_id] = max(0, self._running_by_user.get(task.user_id, 1) - 1) + if self._running_by_user.get(task.user_id) == 0: + self._running_by_user.pop(task.user_id, None) + self._cond.notify_all() + + def _pop_next_runnable_locked(self): + """在锁内从优先队列取出“可运行”的任务,避免VIP任务占位阻塞普通任务。""" + if not self._pending: + return None + + skipped = [] + selected = None + + while self._pending: + _, _, _, task = heapq.heappop(self._pending) + + # 已取消/已被移除 + if task.canceled: + continue + if self._pending_by_account.get(task.account_id) is not task: + continue + + # 用户并发限制:当前不可运行则先跳过,不阻塞其他用户任务 + running_for_user = self._running_by_user.get(task.user_id, 0) + if running_for_user >= self.max_per_user: + skipped.append(task) + continue + + selected = task + break + + # 把暂不可运行的任务放回队列 + for t in skipped: + priority = 0 if t.is_vip else 1 + heapq.heappush(self._pending, (priority, t.submitted_at, t.seq, t)) + + if selected is None: + return None + + # 从映射移除,表示已出队(开始执行) + self._pending_by_account.pop(selected.account_id, None) + return selected + + def _run_task_wrapper(self, task: _TaskRequest): + try: + run_task( + user_id=task.user_id, + account_id=task.account_id, + browse_type=task.browse_type, + enable_screenshot=task.enable_screenshot, + source=task.source, + retry_count=task.retry_count, + ) + finally: + try: + if callable(task.done_callback): + task.done_callback() + except Exception: + pass + safe_remove_task(task.account_id) + with self._cond: + self._running_global = max(0, self._running_global - 1) + self._running_by_user[task.user_id] = max(0, self._running_by_user.get(task.user_id, 1) - 1) + if self._running_by_user.get(task.user_id) == 0: + self._running_by_user.pop(task.user_id, None) + self._cond.notify_all() + + +_task_scheduler = None +_task_scheduler_lock = threading.Lock() + + +def get_task_scheduler() -> TaskScheduler: + """获取全局任务调度器(单例)""" + global _task_scheduler + with _task_scheduler_lock: + if _task_scheduler is None: + try: + max_queue_size = int(os.environ.get("TASK_QUEUE_MAXSIZE", "1000")) + except Exception: + max_queue_size = 1000 + _task_scheduler = TaskScheduler( + max_global=max_concurrent_global, + max_per_user=max_concurrent_per_account, + max_queue_size=max_queue_size, + ) + return _task_scheduler + + +def submit_account_task( + user_id: int, + account_id: str, + browse_type: str, + enable_screenshot: bool = True, + source: str = "manual", + retry_count: int = 0, + done_callback=None, +): + """统一入口:提交账号任务进入队列""" + if user_id not in user_accounts or account_id not in user_accounts[user_id]: + return False, "账号不存在" + + account = user_accounts[user_id][account_id] + if getattr(account, 'is_running', False): + return False, "任务已在运行中" + + try: + is_vip_user = bool(database.is_user_vip(user_id)) + except Exception: + is_vip_user = False + + # 先标记排队状态,避免任务极快派发时出现 is_running 仍为 False 的竞态 + account.is_running = True + account.should_stop = False + account.status = "排队中" + (" (VIP)" if is_vip_user else "") + + # 记录任务状态为排队中(用于前端进度/状态展示) + safe_set_task_status(account_id, { + "user_id": user_id, + "username": account.username, + "status": "排队中", + "detail_status": "等待资源" + (" [VIP优先]" if is_vip_user else ""), + "browse_type": browse_type, + "start_time": time.time(), + "source": source, + "progress": {"items": 0, "attachments": 0}, + "is_vip": is_vip_user + }) + socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + + scheduler = get_task_scheduler() + ok, message = scheduler.submit_task( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=enable_screenshot, + source=source, + retry_count=retry_count, + is_vip=is_vip_user, + done_callback=done_callback, + ) + if not ok: + # 回滚状态 + account.is_running = False + account.status = "未开始" + safe_remove_task_status(account_id) + socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + return False, message + + log_to_client(message + (" [VIP优先]" if is_vip_user else ""), user_id, account_id) + return True, message # 截图专用信号量:限制同时进行的截图任务数量(避免资源竞争) # 信号量将在首次使用时初始化,并支持动态更新 @@ -276,7 +779,6 @@ def cleanup_expired_data(): # 5. 清理僵尸进程(防止Chrome子进程变成僵尸) try: import os - import signal # 回收所有已终止的子进程 while True: try: @@ -286,32 +788,31 @@ def cleanup_expired_data(): logger.debug(f"已回收僵尸进程: PID={pid}") except ChildProcessError: break # 没有子进程了 - except Exception as e: + except Exception: pass # 忽略清理错误 - # 6. 清理超时的批量截图任务(超过30分钟未完成的) + # 6. 清理过期的批次截图收集与随机延迟任务(避免字典长期增长) with batch_task_lock: expired_batches = [] - for batch_id, batch_data in list(batch_task_screenshots.items()): - created_time = batch_data.get('created_time', 0) - if created_time and (current_time - created_time) > 1800: # 30分钟 + for batch_id, info in list(batch_task_screenshots.items()): + last_ts = info.get('updated_at') or info.get('created_at') or info.get('created_time') or current_time + if (current_time - last_ts) > BATCH_TASK_EXPIRE_SECONDS: expired_batches.append(batch_id) for batch_id in expired_batches: del batch_task_screenshots[batch_id] if expired_batches: - logger.debug(f"已清理 {len(expired_batches)} 个超时批量截图任务") + logger.debug(f"已清理 {len(expired_batches)} 个过期批次任务缓存") - # 7. 清理过期的随机延迟任务(超过2小时未执行的) with pending_random_lock: - expired_schedules = [] - for schedule_id, schedule_data in list(pending_random_schedules.items()): - created_time = schedule_data.get('created_time', 0) - if created_time and (current_time - created_time) > 7200: # 2小时 - expired_schedules.append(schedule_id) - for schedule_id in expired_schedules: + expired_random = [] + for schedule_id, info in list(pending_random_schedules.items()): + created_at = info.get('created_at') or info.get('created_time') or current_time + if (current_time - created_at) > PENDING_RANDOM_EXPIRE_SECONDS: + expired_random.append(schedule_id) + for schedule_id in expired_random: del pending_random_schedules[schedule_id] - if expired_schedules: - logger.debug(f"已清理 {len(expired_schedules)} 个过期随机延迟任务") + if expired_random: + logger.debug(f"已清理 {len(expired_random)} 个过期随机延迟任务") def start_cleanup_scheduler(): @@ -1957,22 +2458,17 @@ def start_account(account_id): if not init_browser_manager(): return jsonify({"error": "浏览器初始化失败"}), 500 - # 启动任务线程 - account.is_running = True - account.should_stop = False - account.status = "运行中" - - thread = threading.Thread( - target=run_task, - args=(user_id, account_id, browse_type, enable_screenshot, 'manual'), - daemon=True + ok, message = submit_account_task( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=enable_screenshot, + source='manual' ) - thread.start() - active_tasks[account_id] = thread + if not ok: + return jsonify({"error": message}), 400 log_to_client(f"启动任务: {account.username} - {browse_type}", user_id) - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') - return jsonify({"success": True}) @@ -1993,6 +2489,19 @@ def stop_account(account_id): account.should_stop = True account.status = "正在停止" + # 若任务仍在排队,直接取消,避免“排队任务永远不执行也不退出” + try: + scheduler = get_task_scheduler() + if scheduler.cancel_pending_task(user_id=user_id, account_id=account_id): + account.status = "已停止" + account.is_running = False + safe_remove_task_status(account_id) + socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + log_to_client(f"任务已取消: {account.username}", user_id) + return jsonify({"success": True, "canceled": True}) + except Exception: + pass + log_to_client(f"停止任务: {account.username}", user_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') @@ -2020,89 +2529,33 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m return account = user_accounts[user_id][account_id] + batch_id = _get_batch_id_from_source(source) + batch_recorded = False # 导入time模块 import time as time_module # 注意:不在此处记录开始时间,因为要排除排队等待时间 - # 两级并发控制:用户级 + 全局级(VIP优先) - user_sem = get_user_semaphore(user_id) - - # 检查是否VIP用户 - is_vip_user = database.is_user_vip(user_id) - vip_label = " [VIP优先]" if is_vip_user else "" - - # 获取用户级信号量(同一用户的账号排队) - log_to_client(f"等待资源分配...{vip_label}", user_id, account_id) - account.status = "排队中" + (" (VIP)" if is_vip_user else "") - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') - - # 记录任务状态为排队中 - import time as time_mod - task_status[account_id] = { - "user_id": user_id, - "username": account.username, - "status": "排队中", - "detail_status": "等待资源" + vip_label, - "browse_type": browse_type, - "start_time": time_mod.time(), - "source": source, - "progress": {"items": 0, "attachments": 0}, - "is_vip": is_vip_user - } - - # 加入优先级队列 - with task_queue_lock: - if is_vip_user: - vip_task_queue.append(account_id) - else: - normal_task_queue.append(account_id) - - # VIP优先排队机制 - acquired = False - while not acquired: - with task_queue_lock: - # VIP用户直接尝试获取; 普通用户需等VIP队列为空 - can_try = is_vip_user or len(vip_task_queue) == 0 - - if can_try and user_sem.acquire(blocking=False): - acquired = True - with task_queue_lock: - if account_id in vip_task_queue: - vip_task_queue.remove(account_id) - if account_id in normal_task_queue: - normal_task_queue.remove(account_id) - break - - # 检查是否被停止 - if account.should_stop: - with task_queue_lock: - if account_id in vip_task_queue: - vip_task_queue.remove(account_id) - if account_id in normal_task_queue: - normal_task_queue.remove(account_id) - log_to_client(f"任务已取消", user_id, account_id) - account.status = "已停止" - account.is_running = False - if account_id in task_status: - del task_status[account_id] - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') - return - - time_module.sleep(0.3) - try: + # 任务排队/优先级/并发控制已迁移到 TaskScheduler(submit_account_task) # 如果在排队期间被停止,直接返回 if account.should_stop: - log_to_client(f"任务已取消", user_id, account_id) + log_to_client("任务已取消", user_id, account_id) account.status = "已停止" account.is_running = False + safe_remove_task_status(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + if batch_id: + account_name = account.remark if account.remark else account.username + _batch_task_record_result( + batch_id=batch_id, + account_name=account_name, + screenshot_path=None, + total_items=0, + total_attachments=0 + ) return - # 获取全局信号量(防止所有用户同时运行导致资源耗尽) - global_semaphore.acquire() - try: # 再次检查是否被停止 if account.should_stop: @@ -2172,73 +2625,71 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m checkpoint_mgr.update_stage(task_id, TaskStage.LOGGING_IN, progress_percent=25) # 使用 API 方式登录和浏览(不启动浏览器) - api_browser = APIBrowser(log_callback=custom_log, proxy_config=proxy_config) - if api_browser.login(account.username, account.password): - log_to_client(f"✓ 登录成功!", user_id, account_id) - # 登录成功,清除失败计数 - # 保存cookies供截图使用 - api_browser.save_cookies_for_playwright(account.username) - database.reset_account_login_status(account_id) + with APIBrowser(log_callback=custom_log, proxy_config=proxy_config) as api_browser: + if api_browser.login(account.username, account.password): + log_to_client(f"✓ 登录成功!", user_id, account_id) + # 登录成功,清除失败计数 + # 保存cookies供截图使用 + api_browser.save_cookies_for_playwright(account.username) + database.reset_account_login_status(account_id) - # 如果账号没有备注,自动获取真实姓名作为备注 - if not account.remark: - try: - real_name = api_browser.get_real_name() - if real_name: - account.remark = real_name - database.update_account_remark(account_id, real_name) - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') - logger.info(f"[自动备注] 账号 {account.username} 自动设置备注为: {real_name}") - except Exception as e: - logger.warning(f"[自动备注] 获取姓名失败: {e}") + # 如果账号没有备注,自动获取真实姓名作为备注 + if not account.remark: + try: + real_name = api_browser.get_real_name() + if real_name: + account.remark = real_name + database.update_account_remark(account_id, real_name) + socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + logger.info(f"[自动备注] 账号 {account.username} 自动设置备注为: {real_name}") + except Exception as e: + logger.warning(f"[自动备注] 获取姓名失败: {e}") - if account_id in task_status: - task_status[account_id]["detail_status"] = "正在浏览" - log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id) + if account_id in task_status: + task_status[account_id]["detail_status"] = "正在浏览" + log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id) - def should_stop(): - return account.should_stop + def should_stop(): + return account.should_stop - checkpoint_mgr.update_stage(task_id, TaskStage.BROWSING, progress_percent=50) - result = api_browser.browse_content( - browse_type=browse_type, - should_stop_callback=should_stop - ) - # APIBrowseResult和BrowseResult字段完全相同,无需转换 - api_browser.close() - else: - # API 登录失败 - error_message = "登录失败" - log_to_client(f"❌ {error_message}", user_id, account_id) - - # 增加失败计数(假设密码错误) - is_suspended = database.increment_account_login_fail(account_id, error_message) - if is_suspended: - log_to_client(f"⚠ 该账号连续3次密码错误,已自动暂停", user_id, account_id) - log_to_client(f"请在前台修改密码后才能继续使用", user_id, account_id) - - retry_action = checkpoint_mgr.record_error(task_id, error_message) - if retry_action == "paused": - logger.warning(f"[断点] 任务 {task_id} 已暂停(登录失败)") - - account.status = "登录失败" - account.is_running = False - # 记录登录失败日志 - database.create_task_log( - user_id=user_id, - account_id=account_id, - username=account.username, - browse_type=browse_type, - status='failed', - total_items=0, - total_attachments=0, - error_message=error_message, - duration=int(time_module.time() - task_start_time), - source=source - ) - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') - api_browser.close() - return + checkpoint_mgr.update_stage(task_id, TaskStage.BROWSING, progress_percent=50) + result = api_browser.browse_content( + browse_type=browse_type, + should_stop_callback=should_stop + ) + # APIBrowseResult和BrowseResult字段完全相同,无需转换 + else: + # API 登录失败 + error_message = "登录失败" + log_to_client(f"❌ {error_message}", user_id, account_id) + + # 增加失败计数(假设密码错误) + is_suspended = database.increment_account_login_fail(account_id, error_message) + if is_suspended: + log_to_client(f"⚠ 该账号连续3次密码错误,已自动暂停", user_id, account_id) + log_to_client(f"请在前台修改密码后才能继续使用", user_id, account_id) + + retry_action = checkpoint_mgr.record_error(task_id, error_message) + if retry_action == "paused": + logger.warning(f"[断点] 任务 {task_id} 已暂停(登录失败)") + + account.status = "登录失败" + account.is_running = False + # 记录登录失败日志 + database.create_task_log( + user_id=user_id, + account_id=account_id, + username=account.username, + browse_type=browse_type, + status='failed', + total_items=0, + total_attachments=0, + error_message=error_message, + duration=int(time_module.time() - task_start_time), + source=source + ) + socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + return account.total_items = result.total_items account.total_attachments = result.total_attachments @@ -2248,47 +2699,57 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m if account_id in task_status: task_status[account_id]["detail_status"] = "浏览完成" task_status[account_id]["progress"] = {"items": result.total_items, "attachments": result.total_attachments} - account.status = "已完成" - checkpoint_mgr.update_stage(task_id, TaskStage.COMPLETING, progress_percent=95) - checkpoint_mgr.complete_task(task_id, success=True) - logger.info(f"[断点] 任务 {task_id} 已完成") - # 记录成功日志(如果不截图则在此记录,截图时在截图完成后记录) - if not enable_screenshot: - database.create_task_log( - user_id=user_id, - account_id=account_id, - username=account.username, - browse_type=browse_type, - status='success', - total_items=result.total_items, - total_attachments=result.total_attachments, - error_message='', - duration=int(time_module.time() - task_start_time), - source=source - ) - # 发送任务完成邮件通知(不截图时在此发送) - # 只对定时任务发送邮件通知,手动执行不发送 - if source and source.startswith('user_scheduled'): - try: - user_info = database.get_user_by_id(user_id) - # 检查用户是否开启了邮件通知 - if user_info and user_info.get('email') and database.get_user_email_notify(user_id): - account_name = account.remark if account.remark else account.username - email_service.send_task_complete_email_async( - user_id=user_id, - email=user_info['email'], - username=user_info['username'], + account.status = "已完成" + checkpoint_mgr.update_stage(task_id, TaskStage.COMPLETING, progress_percent=95) + checkpoint_mgr.complete_task(task_id, success=True) + logger.info(f"[断点] 任务 {task_id} 已完成") + # 记录成功日志(如果不截图则在此记录,截图时在截图完成后记录) + if not enable_screenshot: + database.create_task_log( + user_id=user_id, + account_id=account_id, + username=account.username, + browse_type=browse_type, + status='success', + total_items=result.total_items, + total_attachments=result.total_attachments, + error_message='', + duration=int(time_module.time() - task_start_time), + source=source + ) + # 批次任务:不截图时也要推进批次状态并最终回收 + if batch_id: + account_name = account.remark if account.remark else account.username + _batch_task_record_result( + batch_id=batch_id, account_name=account_name, - browse_type=browse_type, - total_items=result.total_items, - total_attachments=result.total_attachments, screenshot_path=None, - log_callback=lambda msg: log_to_client(msg, user_id, account_id) + total_items=result.total_items, + total_attachments=result.total_attachments ) - except Exception as email_error: - logger.warning(f"发送任务完成邮件失败: {email_error}") - # 成功则跳出重试循环 - break + batch_recorded = True + # 非批次的定时任务:不截图时按账号发送邮件通知(手动执行不发送) + elif source and source.startswith('user_scheduled'): + try: + user_info = database.get_user_by_id(user_id) + # 检查用户是否开启了邮件通知 + if user_info and user_info.get('email') and database.get_user_email_notify(user_id): + account_name = account.remark if account.remark else account.username + email_service.send_task_complete_email_async( + user_id=user_id, + email=user_info['email'], + username=user_info['username'], + account_name=account_name, + browse_type=browse_type, + total_items=result.total_items, + total_attachments=result.total_attachments, + screenshot_path=None, + log_callback=lambda msg: log_to_client(msg, user_id, account_id) + ) + except Exception as email_error: + logger.warning(f"发送任务完成邮件失败: {email_error}") + # 成功则跳出重试循环 + break else: # 浏览出错,检查是否是超时错误 error_msg = result.error_message @@ -2417,6 +2878,7 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m finally: # 先关闭浏览器,再释放信号量(避免并发创建/关闭浏览器导致资源竞争) account.is_running = False + screenshot_submitted = False # 如果状态不是已完成(需要截图),则重置为未开始 if account.status not in ["已完成"]: account.status = "未开始" @@ -2430,13 +2892,10 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m finally: account.automation = None - # 浏览器关闭后再释放全局信号量,确保新任务创建浏览器时旧浏览器已完全关闭 - global_semaphore.release() + # 任务并发由 TaskScheduler 控制 - if account_id in active_tasks: - del active_tasks[account_id] - if account_id in task_status: - del task_status[account_id] + safe_remove_task(account_id) + safe_remove_task_status(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') @@ -2462,6 +2921,7 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m } time.sleep(2) # 延迟启动截图,确保主任务资源已完全释放 browse_result_dict = {'total_items': result.total_items, 'total_attachments': result.total_attachments} + screenshot_submitted = True threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id, browse_type, source, task_start_time, browse_result_dict), daemon=True).start() else: # 不截图时,重置状态为未开始 @@ -2480,23 +2940,42 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m account.status = "等待重试" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') - def delayed_retry(): - time.sleep(5) - if not account.should_stop: - log_to_client(f"🔄 开始第 {retry_count + 1} 次自动重试...", user_id, account_id) - thread = threading.Thread( - target=run_task, - args=(user_id, account_id, browse_type, enable_screenshot, source, retry_count + 1), - daemon=True - ) - thread.start() - active_tasks[account_id] = thread + def delayed_retry_submit(): + if account.should_stop: + return + log_to_client(f"🔄 开始第 {retry_count + 1} 次自动重试...", user_id, account_id) + ok, msg = submit_account_task( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=enable_screenshot, + source=source, + retry_count=retry_count + 1 + ) + if not ok: + log_to_client(f"自动重试提交失败: {msg}", user_id, account_id) - threading.Thread(target=delayed_retry, daemon=True).start() + try: + threading.Timer(5, delayed_retry_submit).start() + except Exception: + # 兜底:Timer失败则立即提交 + delayed_retry_submit() + + # 批次任务:如果本次不会进入截图回调,则在此推进批次计数,避免批次缓存常驻 + if batch_id and (not screenshot_submitted) and (not batch_recorded) and account.status != "等待重试": + account_name = account.remark if account.remark else account.username + _batch_task_record_result( + batch_id=batch_id, + account_name=account_name, + screenshot_path=None, + total_items=getattr(account, 'total_items', 0) or 0, + total_attachments=getattr(account, 'total_attachments', 0) or 0 + ) + batch_recorded = True finally: - # 释放用户级信号量 - user_sem.release() + # 任务并发由 TaskScheduler 控制,无需释放用户级信号量 + pass def take_screenshot_for_account(user_id, account_id, browse_type="应读", source="manual", task_start_time=None, browse_result=None): @@ -2661,9 +3140,7 @@ def take_screenshot_for_account(user_id, account_id, browse_type="应读", sourc # 处理邮件通知 - 批次任务收集截图,非批次任务直接发送 try: # 检查是否是批次任务 (source格式: user_scheduled:batch_xxx) - batch_id = None - if source and source.startswith('user_scheduled:batch_'): - batch_id = source.split(':', 1)[1] + batch_id = _get_batch_id_from_source(source) screenshot_path = None if result and result.get('success') and result.get('filename'): @@ -2672,41 +3149,13 @@ def take_screenshot_for_account(user_id, account_id, browse_type="应读", sourc account_name = account.remark if account.remark else account.username if batch_id: - # 批次任务:收集截图信息,当所有截图完成时发送打包邮件 - should_send_email = False - batch_info = None - with batch_task_lock: - if batch_id in batch_task_screenshots: - batch_task_screenshots[batch_id]['screenshots'].append({ - 'account_name': account_name, - 'path': screenshot_path, - 'items': browse_result.get('total_items', 0), - 'attachments': browse_result.get('total_attachments', 0) - }) - batch_task_screenshots[batch_id]['completed'] += 1 - # 检查是否所有截图都完成了 - if batch_task_screenshots[batch_id]['completed'] >= batch_task_screenshots[batch_id]['total_accounts']: - should_send_email = True - batch_info = batch_task_screenshots.pop(batch_id) - print(f"[批次邮件] 批次 {batch_id} 所有 {batch_info['total_accounts']} 个截图已完成,准备发送邮件") - - # 在锁外发送邮件 - if should_send_email and batch_info and batch_info['screenshots']: - try: - batch_user_id = batch_info['user_id'] - user_info = database.get_user_by_id(batch_user_id) - if user_info and user_info.get('email') and database.get_user_email_notify(batch_user_id): - email_service.send_batch_task_complete_email_async( - user_id=batch_user_id, - email=user_info['email'], - username=user_info['username'], - schedule_name=batch_info['schedule_name'], - browse_type=batch_info['browse_type'], - screenshots=batch_info['screenshots'] - ) - print(f"[批次邮件] 已发送打包邮件,包含 {len(batch_info['screenshots'])} 个截图") - except Exception as batch_email_err: - print(f"[批次邮件] 发送失败: {batch_email_err}") + _batch_task_record_result( + batch_id=batch_id, + account_name=account_name, + screenshot_path=screenshot_path, + total_items=browse_result.get('total_items', 0), + total_attachments=browse_result.get('total_attachments', 0) + ) elif source and source.startswith('user_scheduled'): # 非批次的定时任务:直接发送邮件(手动执行不发送邮件通知) user_info = database.get_user_by_id(user_id) @@ -2730,11 +3179,13 @@ def take_screenshot_for_account(user_id, account_id, browse_type="应读", sourc # 提交任务到工作线程池 pool = get_browser_worker_pool() - pool.submit_task( + submitted = pool.submit_task( screenshot_task, screenshot_callback, user_id, account_id, account, browse_type, source, task_start_time, browse_result ) + if not submitted: + screenshot_callback(None, "截图队列已满,请稍后重试") def manual_screenshot(account_id): """手动为指定账号截图""" user_id = current_user.id @@ -3262,7 +3713,7 @@ def update_system_config_api(): # 如果修改了并发数,更新全局变量和信号量 if max_concurrent is not None and max_concurrent != max_concurrent_global: max_concurrent_global = max_concurrent - global_semaphore = threading.Semaphore(max_concurrent) + global_semaphore = PrioritySemaphore(max_concurrent) print(f"全局并发数已更新为: {max_concurrent}") # 如果修改了单用户并发数,更新全局变量(已有的信号量会在下次创建时使用新值) @@ -3276,6 +3727,15 @@ def update_system_config_api(): screenshot_semaphore = threading.Semaphore(new_max_screenshot_concurrent) print(f"截图并发数已更新为: {new_max_screenshot_concurrent}") + # 同步更新任务调度器并发参数 + try: + get_task_scheduler().update_limits( + max_global=max_concurrent_global, + max_per_user=max_concurrent_per_account + ) + except Exception: + pass + return jsonify({"message": "系统配置已更新"}) return jsonify({"error": "更新失败"}), 400 @@ -3616,6 +4076,10 @@ def run_scheduled_task(skip_weekday_check=False): skipped_duplicates = 0 executed_accounts = 0 + # 获取系统配置的截图开关(避免每个账号重复读配置) + cfg = database.get_system_config() + enable_screenshot_scheduled = cfg.get("enable_screenshot", 0) == 1 + for user in approved_users: user_id = user['id'] if user_id not in user_accounts: @@ -3649,26 +4113,17 @@ def run_scheduled_task(skip_weekday_check=False): print(f"[定时任务] 启动账号: {account.username} (用户:{user['username']})") - # 启动任务 - account.is_running = True - account.should_stop = False - account.status = "运行中" - - # 获取系统配置的截图开关 - cfg = database.get_system_config() - enable_screenshot_scheduled = cfg.get("enable_screenshot", 0) == 1 - - thread = threading.Thread( - target=run_task, - args=(user_id, account_id, browse_type, enable_screenshot_scheduled, 'scheduled'), - daemon=True + ok, msg = submit_account_task( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=enable_screenshot_scheduled, + source='scheduled' ) - thread.start() - active_tasks[account_id] = thread - executed_accounts += 1 - - # 发送更新到用户 - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + if ok: + executed_accounts += 1 + else: + print(f"[定时任务] 启动失败({account.username}): {msg}") # 间隔启动,避免瞬间并发过高 time.sleep(2) @@ -3681,11 +4136,19 @@ def run_scheduled_task(skip_weekday_check=False): def status_push_worker(): - """后台线程:每秒推送运行中任务的状态更新""" + """后台线程:按间隔推送运行中任务的状态更新(可节流)""" + try: + push_interval = float(os.environ.get('STATUS_PUSH_INTERVAL_SECONDS', '2')) + except Exception: + push_interval = 2.0 + push_interval = max(0.5, push_interval) + while True: try: # 遍历所有运行中的任务状态 - for account_id, status_info in list(task_status.items()): + with task_status_lock: + status_items = list(task_status.items()) + for account_id, status_info in status_items: user_id = status_info.get('user_id') if user_id: # 获取账号对象 @@ -3708,10 +4171,10 @@ def status_push_worker(): 'elapsed_display': account_data.get('elapsed_display', '') } socketio.emit('task_progress', progress_data, room=f'user_{user_id}') - time.sleep(1) # 每秒推送一次 + time.sleep(push_interval) except Exception as e: logger.debug(f"状态推送出错: {e}") - time.sleep(1) + time.sleep(push_interval) def scheduled_task_worker(): @@ -3819,6 +4282,7 @@ def scheduled_task_worker(): random_time = window_start + timedelta(minutes=random_minutes) pending_random_schedules[schedule_id] = { 'scheduled_for': random_time, + 'created_at': time.time(), 'config': schedule_config } print(f"[定时任务] 任务#{schedule_id} 安排随机时间: {random_time.strftime('%H:%M')}") @@ -3890,12 +4354,33 @@ def scheduled_task_worker(): 'schedule_name': schedule_config.get('name', '未命名任务'), 'screenshots': [], 'total_accounts': 0, - 'completed': 0 + 'completed': 0, + 'created_at': time_mod.time(), + 'updated_at': time_mod.time() } started_count = 0 skipped_count = 0 - task_threads = [] # 收集所有启动的任务线程 + completion_lock = threading.Lock() + remaining = {'count': 0, 'done': False} + + def on_browse_done(): + """单账号浏览阶段完成回调:全部完成后更新执行日志""" + with completion_lock: + remaining['count'] -= 1 + if remaining['done'] or remaining['count'] > 0: + return + remaining['done'] = True + execution_duration = int(time_mod.time() - execution_start_time) + database.update_schedule_execution_log( + log_id, + total_accounts=len(account_ids), + success_accounts=started_count, + failed_accounts=len(account_ids) - started_count, + duration_seconds=execution_duration, + status='completed' + ) + print(f"[用户定时任务] 任务#{schedule_id}浏览阶段完成,耗时{execution_duration}秒,等待截图完成后发送邮件") for account_id in account_ids: if user_id not in user_accounts: @@ -3909,59 +4394,49 @@ def scheduled_task_worker(): skipped_count += 1 continue - account.is_running = True - account.should_stop = False - account.status = "排队中" - # 传递批次ID,格式: user_scheduled:batch_xxx task_source = f"user_scheduled:{batch_id}" - thread = threading.Thread( - target=run_task, - args=(user_id, account_id, browse_type, enable_screenshot, task_source), - daemon=True + with completion_lock: + remaining['count'] += 1 + ok, msg = submit_account_task( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=enable_screenshot, + source=task_source, + done_callback=on_browse_done ) - thread.start() - active_tasks[account_id] = thread - task_threads.append(thread) - started_count += 1 + if ok: + started_count += 1 + else: + with completion_lock: + remaining['count'] -= 1 + skipped_count += 1 + print(f"[用户定时任务] 账号 {account.username} 启动失败: {msg}") - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') - - # 更新批次的总账号数 + # 更新批次的总账号数(并处理“任务已提前完成/全部跳过”的场景) + should_send_email = False + batch_info = None with batch_task_lock: - if batch_id in batch_task_screenshots: - batch_task_screenshots[batch_id]['total_accounts'] = started_count + info = batch_task_screenshots.get(batch_id) + if info: + info['total_accounts'] = started_count + info['updated_at'] = time_mod.time() + if started_count <= 0: + batch_task_screenshots.pop(batch_id, None) + elif info.get('completed', 0) >= started_count: + should_send_email = True + batch_info = batch_task_screenshots.pop(batch_id) + + if should_send_email and batch_info: + _send_batch_task_email_if_configured(batch_info) # 更新最后执行时间 database.update_schedule_last_run(schedule_id) print(f"[用户定时任务] 已启动 {started_count} 个账号,跳过 {skipped_count} 个账号,批次ID: {batch_id}") - - # 启动监控线程,等待所有任务完成后更新日志 - # 注意:邮件发送已移至截图回调中,当所有截图完成时自动发送 - def wait_and_update_log(threads, start_time, lid, total, success, sid): - for t in threads: - t.join() # 等待每个任务完成(注意:这只是run_task线程,截图是异步的) - execution_duration = int(time_mod.time() - start_time) - database.update_schedule_execution_log( - lid, - total_accounts=total, - success_accounts=success, - failed_accounts=total - success, - duration_seconds=execution_duration, - status='completed' - ) - print(f"[用户定时任务] 任务#{sid}浏览阶段完成,耗时{execution_duration}秒,等待截图完成后发送邮件") - - if task_threads: - monitor_thread = threading.Thread( - target=wait_and_update_log, - args=(task_threads, execution_start_time, log_id, len(account_ids), started_count, schedule_id), - daemon=True - ) - monitor_thread.start() - else: - # 没有启动任何任务,直接更新日志 + # 没有启动任何任务,直接更新日志 + if started_count <= 0: database.update_schedule_execution_log( log_id, total_accounts=len(account_ids), @@ -3978,20 +4453,28 @@ def scheduled_task_worker(): import traceback traceback.print_exc() - # 用于跟踪配置变化,避免重复打印日志 - last_config_hash = [None] # 使用列表以便在闭包中修改 + try: + config_check_interval = float(os.environ.get('SCHEDULER_CONFIG_CHECK_SECONDS', '30')) + except Exception: + config_check_interval = 30.0 + config_check_interval = max(5.0, config_check_interval) - # 每分钟检查一次配置 - def check_and_schedule(): + schedule_state = {"signature": None} + + # 检查系统配置并按需重建 schedule 任务 + def check_and_schedule(force: bool = False): config = database.get_system_config() + schedule_enabled = bool(config.get('schedule_enabled')) + schedule_time_cst = config.get('schedule_time', '02:00') - # 计算配置哈希,检测配置是否变化 - config_hash = (config.get('schedule_enabled'), config.get('schedule_time', '02:00')) - config_changed = (last_config_hash[0] != config_hash) - is_first_run = (last_config_hash[0] is None) - last_config_hash[0] = config_hash + signature = (schedule_enabled, schedule_time_cst) + config_changed = (schedule_state.get("signature") != signature) + is_first_run = (schedule_state.get("signature") is None) + if (not force) and (not config_changed): + return + schedule_state["signature"] = signature - # 清除旧的任务 + # 清除旧的任务(仅当配置变化时才重建) schedule.clear() # 时区转换函数:将CST时间转换为UTC时间(容器使用UTC) @@ -4028,8 +4511,7 @@ def scheduled_task_worker(): print(f"[定时任务] 已设置SMTP配额重置: 每天 CST 00:00 (UTC {quota_reset_utc_time})") # 如果启用了定时浏览任务,则添加 - if config.get('schedule_enabled'): - schedule_time_cst = config.get('schedule_time', '02:00') + if schedule_enabled: schedule_time_utc = cst_to_utc_time(schedule_time_cst) schedule.every().day.at(schedule_time_utc).do(run_scheduled_task) # 只在首次运行或配置变化时打印 @@ -4039,19 +4521,27 @@ def scheduled_task_worker(): print(f"[定时任务] 浏览任务已禁用") # 初始检查 - check_and_schedule() - last_check = time.time() + check_and_schedule(force=True) + last_config_check = time.time() + last_user_schedule_minute = None while True: try: # 执行待执行的任务 schedule.run_pending() - # 每5秒重新检查一次配置(提高检查频率以确保定时任务准时执行) - if time.time() - last_check > 5: + now_ts = time.time() + # 按间隔检查系统配置(默认30秒;仅在变化时重建 schedule) + if now_ts - last_config_check >= config_check_interval: check_and_schedule() - check_user_schedules() # 检查用户定时任务 - last_check = time.time() + last_config_check = now_ts + + # 用户定时任务:按分钟触发检查(避免高频扫描) + now_beijing = get_beijing_now() + minute_key = now_beijing.strftime('%Y-%m-%d %H:%M') + if minute_key != last_user_schedule_minute: + check_user_schedules() + last_user_schedule_minute = minute_key time.sleep(1) except Exception as e: @@ -4082,12 +4572,18 @@ def checkpoint_resume(task_id): if checkpoint['status'] != 'paused': return jsonify({'success': False, 'message': '任务未暂停'}), 400 if checkpoint_mgr.resume_task(task_id): - import threading - threading.Thread( - target=run_task, - args=(checkpoint['user_id'], checkpoint['account_id'], checkpoint['browse_type'], True, 'resumed'), - daemon=True - ).start() + user_id = checkpoint['user_id'] + if user_id not in user_accounts: + load_user_accounts(user_id) + ok, msg = submit_account_task( + user_id=user_id, + account_id=checkpoint['account_id'], + browse_type=checkpoint['browse_type'], + enable_screenshot=True, + source='resumed' + ) + if not ok: + return jsonify({'success': False, 'message': msg}), 400 return jsonify({'success': True}) return jsonify({'success': False}), 500 except Exception as e: @@ -4465,20 +4961,15 @@ def run_schedule_now_api(schedule_id): if account.is_running: continue - account.is_running = True - account.should_stop = False - account.status = "排队中" - - thread = threading.Thread( - target=run_task, - args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'), - daemon=True + ok, msg = submit_account_task( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=enable_screenshot, + source='user_scheduled' ) - thread.start() - active_tasks[account_id] = thread - started.append(account_id) - - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + if ok: + started.append(account_id) database.update_schedule_last_run(schedule_id) @@ -4554,20 +5045,17 @@ def batch_start_accounts(): failed.append({'id': account_id, 'reason': '已在运行中'}) continue - account.is_running = True - account.should_stop = False - account.status = "排队中" - - thread = threading.Thread( - target=run_task, - args=(user_id, account_id, browse_type, enable_screenshot, 'batch'), - daemon=True + ok, msg = submit_account_task( + user_id=user_id, + account_id=account_id, + browse_type=browse_type, + enable_screenshot=enable_screenshot, + source='batch' ) - thread.start() - active_tasks[account_id] = thread - started.append(account_id) - - socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + if ok: + started.append(account_id) + else: + failed.append({'id': account_id, 'reason': msg}) return jsonify({ "success": True, @@ -4605,6 +5093,16 @@ def batch_stop_accounts(): account.status = "正在停止" stopped.append(account_id) + # 若仍在排队,直接取消 + try: + scheduler = get_task_scheduler() + if scheduler.cancel_pending_task(user_id=user_id, account_id=account_id): + account.status = "已停止" + account.is_running = False + safe_remove_task_status(account_id) + except Exception: + pass + socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({ @@ -4622,24 +5120,22 @@ def cleanup_on_exit(): # 1. 停止所有运行中的任务 print("- 停止运行中的任务...") - for account_id, thread in list(active_tasks.items()): - try: - # 设置停止标志 - if account_id in user_accounts: - for user_id in user_accounts: - if account_id in user_accounts[user_id]: - user_accounts[user_id][account_id].should_stop = True - except Exception: - pass + try: + for uid, accounts in list(user_accounts.items()): + for _, acc in list(accounts.items()): + if getattr(acc, 'is_running', False): + acc.should_stop = True + except Exception: + pass - # 2. 等待所有线程完成(最多等待5秒) - print("- 等待线程退出...") - for account_id, thread in list(active_tasks.items()): - try: - if thread and thread.is_alive(): - thread.join(timeout=2) - except Exception: - pass + # 2. 停止任务调度器(停止派发新任务) + print("- 停止任务调度器...") + try: + global _task_scheduler + if _task_scheduler: + _task_scheduler.shutdown(timeout=5) + except Exception: + pass # 3. 关闭浏览器工作线程池 print("- 关闭浏览器线程池...") @@ -4708,13 +5204,23 @@ if __name__ == '__main__': globals()['max_concurrent_global'] = system_config.get('max_concurrent_global', 2) globals()['max_concurrent_per_account'] = system_config.get('max_concurrent_per_account', 1) - # 重新创建信号量 - globals()['global_semaphore'] = threading.Semaphore(globals()['max_concurrent_global']) + # 重新创建信号量(VIP优先) + globals()['global_semaphore'] = PrioritySemaphore(globals()['max_concurrent_global']) print(f"✓ 已加载并发配置: 全局={globals()['max_concurrent_global']}, 单账号={globals()['max_concurrent_per_account']}") except Exception as e: print(f"警告: 加载并发配置失败,使用默认值: {e}") + # 初始化任务调度器(固定worker + 优先队列) + try: + get_task_scheduler().update_limits( + max_global=globals().get('max_concurrent_global', max_concurrent_global), + max_per_user=globals().get('max_concurrent_per_account', max_concurrent_per_account) + ) + print("✓ 任务调度器已初始化") + except Exception as e: + print(f"警告: 任务调度器初始化失败: {e}") + # 主线程初始化浏览器(Playwright不支持跨线程) print("\n正在初始化浏览器管理器...") init_browser_manager() @@ -4748,5 +5254,3 @@ if __name__ == '__main__': print(f"警告: 截图线程池初始化失败: {e}") socketio.run(app, host=config.SERVER_HOST, port=config.SERVER_PORT, debug=config.DEBUG, allow_unsafe_werkzeug=True) - - diff --git a/browser_pool_worker.py b/browser_pool_worker.py index bd7317e..e9abb06 100755 --- a/browser_pool_worker.py +++ b/browser_pool_worker.py @@ -10,9 +10,11 @@ from typing import Callable, Optional, Dict, Any import nest_asyncio nest_asyncio.apply() -# 安全修复: 将魔法数字提取为可配置常量 -BROWSER_IDLE_TIMEOUT = int(os.environ.get('BROWSER_IDLE_TIMEOUT', '300')) # 空闲超时(秒),默认5分钟 -TASK_QUEUE_TIMEOUT = int(os.environ.get('TASK_QUEUE_TIMEOUT', '10')) # 队列获取超时(秒) +# 安全修复: 将魔法数字提取为可配置常量 +BROWSER_IDLE_TIMEOUT = int(os.environ.get('BROWSER_IDLE_TIMEOUT', '300')) # 空闲超时(秒),默认5分钟 +TASK_QUEUE_TIMEOUT = int(os.environ.get('TASK_QUEUE_TIMEOUT', '10')) # 队列获取超时(秒) +TASK_QUEUE_MAXSIZE = int(os.environ.get('BROWSER_TASK_QUEUE_MAXSIZE', '200')) # 队列最大长度(0表示无限制) +BROWSER_MAX_USE_COUNT = int(os.environ.get('BROWSER_MAX_USE_COUNT', '0')) # 每个浏览器最大复用次数(0表示不限制) class BrowserWorker(threading.Thread): @@ -146,27 +148,33 @@ class BrowserWorker(threading.Thread): self.log(f"开始执行任务(第{self.browser_instance['use_count']}次使用浏览器)") - try: - # 将浏览器实例传递给任务函数 - result = task_func(self.browser_instance, *task_args, **task_kwargs) - callback(result, None) - self.log(f"任务执行成功") - last_task_time = time.time() - - except Exception as e: - self.log(f"任务执行失败: {e}") - callback(None, str(e)) - self.failed_tasks += 1 - last_task_time = time.time() - - # 任务失败后,检查浏览器健康 - if not self._check_browser_health(): - self.log("任务失败导致浏览器异常,将在下次任务前重建") - self._close_browser() - - except Exception as e: - self.log(f"Worker出错: {e}") - time.sleep(1) + try: + # 将浏览器实例传递给任务函数 + result = task_func(self.browser_instance, *task_args, **task_kwargs) + callback(result, None) + self.log(f"任务执行成功") + last_task_time = time.time() + + except Exception as e: + self.log(f"任务执行失败: {e}") + callback(None, str(e)) + self.failed_tasks += 1 + last_task_time = time.time() + + # 任务失败后,检查浏览器健康 + if not self._check_browser_health(): + self.log("任务失败导致浏览器异常,将在下次任务前重建") + self._close_browser() + + # 定期重启浏览器,释放Chromium可能累积的内存 + if self.browser_instance and BROWSER_MAX_USE_COUNT > 0: + if self.browser_instance.get('use_count', 0) >= BROWSER_MAX_USE_COUNT: + self.log(f"浏览器已复用{self.browser_instance['use_count']}次,重启释放资源") + self._close_browser() + + except Exception as e: + self.log(f"Worker出错: {e}") + time.sleep(1) # 清理资源 self._close_browser() @@ -177,16 +185,17 @@ class BrowserWorker(threading.Thread): self.running = False -class BrowserWorkerPool: - """浏览器工作线程池""" - - def __init__(self, pool_size: int = 3, log_callback: Optional[Callable] = None): - self.pool_size = pool_size - self.log_callback = log_callback - self.task_queue = queue.Queue() - self.workers = [] - self.initialized = False - self.lock = threading.Lock() +class BrowserWorkerPool: + """浏览器工作线程池""" + + def __init__(self, pool_size: int = 3, log_callback: Optional[Callable] = None): + self.pool_size = pool_size + self.log_callback = log_callback + maxsize = TASK_QUEUE_MAXSIZE if TASK_QUEUE_MAXSIZE > 0 else 0 + self.task_queue = queue.Queue(maxsize=maxsize) + self.workers = [] + self.initialized = False + self.lock = threading.Lock() def log(self, message: str): """日志输出""" @@ -215,7 +224,7 @@ class BrowserWorkerPool: self.initialized = True self.log(f"✓ 工作线程池初始化完成({self.pool_size}个worker就绪,浏览器将在有任务时按需启动)") - def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool: + def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool: """ 提交任务到队列 @@ -238,8 +247,12 @@ class BrowserWorkerPool: 'callback': callback } - self.task_queue.put(task) - return True + try: + self.task_queue.put(task, timeout=1) + return True + except queue.Full: + self.log(f"警告:任务队列已满(maxsize={self.task_queue.maxsize}),拒绝提交任务") + return False def get_stats(self) -> Dict[str, Any]: """获取线程池统计信息""" diff --git a/email_service.py b/email_service.py index 4e2708f..3904dcc 100644 --- a/email_service.py +++ b/email_service.py @@ -1623,6 +1623,15 @@ class EmailQueue: def _process_task(self, task: Dict): """处理邮件任务""" try: + func = task.get('callable') + if callable(func): + args = task.get('args') or () + kwargs = task.get('kwargs') or {} + result = func(*args, **kwargs) + if task.get('callback'): + task['callback'](result) + return + result = send_email( to_email=task['to_email'], subject=task['subject'], @@ -1670,6 +1679,20 @@ class EmailQueue: print("[邮件服务] 邮件队列已满") return False + def enqueue_callable(self, func: Callable, args=None, kwargs=None, callback: Callable = None) -> bool: + """将可调用任务加入队列(用于复杂邮件/打包等逻辑异步化)""" + try: + self.queue.put({ + 'callable': func, + 'args': args or (), + 'kwargs': kwargs or {}, + 'callback': callback + }, timeout=5) + return True + except queue.Full: + print("[邮件服务] 邮件队列已满") + return False + @property def pending_count(self) -> int: """队列中待处理的任务数""" @@ -1958,14 +1981,14 @@ def send_task_complete_email_async( log_callback: Callable = None ): """异步发送任务完成通知邮件""" - import threading - thread = threading.Thread( - target=send_task_complete_email, + queue = get_email_queue() + ok = queue.enqueue_callable( + send_task_complete_email, args=(user_id, email, username, account_name, browse_type, total_items, total_attachments, screenshot_path, log_callback), - daemon=True ) - thread.start() + if (not ok) and log_callback: + log_callback("[邮件] 邮件队列已满,任务通知未发送") def send_batch_task_complete_email( @@ -2058,32 +2081,55 @@ def send_batch_task_complete_email( """ - # 收集所有截图文件 - screenshot_files = [] + # 收集可用截图文件路径(避免把所有图片一次性读入内存) + screenshot_paths = [] for s in screenshots: - if s.get('path') and os.path.exists(s['path']): - try: - with open(s['path'], 'rb') as f: - screenshot_files.append({ - 'filename': f"{s.get('account_name', 'screenshot')}_{os.path.basename(s['path'])}", - 'data': f.read() - }) - except Exception as e: - print(f"[邮件] 读取截图文件失败: {e}") + path = s.get('path') + if path and os.path.exists(path): + arcname = f"{s.get('account_name', 'screenshot')}_{os.path.basename(path)}" + screenshot_paths.append((path, arcname)) - # 如果有截图,打包成ZIP + # 如果有截图,优先落盘打包ZIP,再按大小决定是否附加(降低内存峰值) zip_data = None zip_filename = None - if screenshot_files: + attachment_note = "" + if screenshot_paths: + import tempfile + zip_path = None try: - zip_buffer = BytesIO() - with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: - for sf in screenshot_files: - zf.writestr(sf['filename'], sf['data']) - zip_data = zip_buffer.getvalue() - zip_filename = f"screenshots_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" + with tempfile.NamedTemporaryFile(prefix="screenshots_", suffix=".zip", delete=False) as tmp: + zip_path = tmp.name + with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: + for file_path, arcname in screenshot_paths: + try: + zf.write(file_path, arcname=arcname) + except Exception as e: + print(f"[邮件] 写入ZIP失败: {e}") + + zip_size = os.path.getsize(zip_path) if zip_path and os.path.exists(zip_path) else 0 + if zip_size <= 0: + attachment_note = "本次无可用截图文件(可能截图失败或文件不存在)。" + elif zip_size > MAX_ATTACHMENT_SIZE: + attachment_note = f"截图打包文件过大({zip_size} bytes),本次不附加附件。" + else: + with open(zip_path, 'rb') as f: + zip_data = f.read() + zip_filename = f"screenshots_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" + attachment_note = "截图已打包为ZIP附件,请查收。" except Exception as e: print(f"[邮件] 打包截图失败: {e}") + attachment_note = "截图打包失败,本次不附加附件。" + finally: + if zip_path and os.path.exists(zip_path): + try: + os.remove(zip_path) + except Exception: + pass + else: + attachment_note = "本次无可用截图文件(可能截图失败或未启用截图)。" + + # 将附件说明写入邮件内容 + html_content = html_content.replace("截图已打包为ZIP附件,请查收。", attachment_note) # 发送邮件 attachments = [] @@ -2132,13 +2178,13 @@ def send_batch_task_complete_email_async( screenshots: List[Dict[str, Any]] ): """异步发送批次任务完成通知邮件""" - import threading - thread = threading.Thread( - target=send_batch_task_complete_email, + queue = get_email_queue() + ok = queue.enqueue_callable( + send_batch_task_complete_email, args=(user_id, email, username, schedule_name, browse_type, screenshots), - daemon=True ) - thread.start() + if not ok: + print("[邮件] 邮件队列已满,批次任务邮件未发送") # ============ 初始化 ============ diff --git a/playwright_automation.py b/playwright_automation.py index 38cd347..45b742f 100755 --- a/playwright_automation.py +++ b/playwright_automation.py @@ -12,6 +12,7 @@ import time import json import threading import atexit +import weakref from typing import Optional, Callable from dataclasses import dataclass from app_config import get_config @@ -28,6 +29,20 @@ else: # 获取配置 config = get_config() +_playwright_automation_instances: "weakref.WeakSet[PlaywrightAutomation]" = weakref.WeakSet() + + +def _cleanup_playwright_automation_instances(): + """进程退出时清理残留的自动化实例(弱引用,不阻止GC)""" + for inst in list(_playwright_automation_instances): + try: + inst._force_cleanup() + except Exception: + pass + + +atexit.register(_cleanup_playwright_automation_instances) + @dataclass class BrowseResult: @@ -151,8 +166,7 @@ class PlaywrightAutomation: self._closed = False # 防止重复关闭 self._lock = threading.Lock() # Bug #13 fix: 保护浏览器资源访问 - # 注册退出清理函数,确保进程异常退出时也能关闭浏览器 - atexit.register(self._cleanup_on_exit) + _playwright_automation_instances.add(self) def log(self, message: str): """记录日志""" @@ -215,6 +229,52 @@ class PlaywrightAutomation: except Exception as e: self.log(f"加载cookies失败: {e}") return False + + def load_cookies_into_current_browser(self, username: str) -> bool: + """在“已连接的现有 browser”上加载 cookies 创建 context(用于浏览器池复用)""" + import os + if not self.browser or not self.browser.is_connected(): + return False + + cookies_path = self.get_cookies_path(username) + if not os.path.exists(cookies_path): + return False + + try: + # 检查cookies文件是否过期(24小时) + import time as time_module + file_age = time_module.time() - os.path.getmtime(cookies_path) + if file_age > 24 * 3600: + self.log("Cookies已过期,需要重新登录") + os.remove(cookies_path) + return False + + with open(cookies_path, 'r', encoding='utf-8') as f: + storage = json.load(f) + + context_options = { + 'viewport': {'width': 1920, 'height': 1080}, + 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', + 'device_scale_factor': 2, + 'storage_state': storage + } + self.context = self.browser.new_context(**context_options) + self.context.set_default_timeout(config.DEFAULT_TIMEOUT) + self.context.set_default_navigation_timeout(config.PAGE_LOAD_TIMEOUT) + self.page = self.context.new_page() + self.main_page = self.page + return True + except Exception as e: + self.log(f"加载cookies失败: {e}") + try: + if self.context: + self.context.close() + except Exception: + pass + self.context = None + self.page = None + self.main_page = None + return False def check_login_state(self) -> bool: """检查当前是否处于登录状态""" @@ -235,9 +295,24 @@ class PlaywrightAutomation: def quick_login(self, username: str, password: str, remember: bool = True): """快速登录 - 使用池中浏览器时直接登录,否则尝试cookies""" - # 如果已有浏览器实例(从池中获取),直接使用该浏览器登录 - # 不尝试加载cookies,因为load_cookies会创建新浏览器覆盖池中的 + # 如果已有浏览器实例(从池中获取),优先尝试复用cookies(避免重复登录/减少耗时) if self.browser and self.browser.is_connected(): + if self.load_cookies_into_current_browser(username): + self.log("使用池中浏览器,尝试使用已保存的登录态...") + if self.check_login_state(): + self.log("✓ 登录态有效,跳过登录") + return {"success": True, "message": "使用已保存的登录态", "used_cookies": True} + else: + self.log("登录态已失效,重新登录") + try: + if self.context: + self.context.close() + except Exception: + pass + self.context = None + self.page = None + self.main_page = None + self.log("使用池中浏览器,直接登录") result = self.login(username, password, remember) if result.get('success'):