869 lines
36 KiB
Python
869 lines
36 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
|
||
import heapq
|
||
import os
|
||
import threading
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor, wait
|
||
from dataclasses import dataclass
|
||
|
||
import database
|
||
import email_service
|
||
from api_browser import APIBrowser
|
||
from app_config import get_config
|
||
from app_logger import LoggerAdapter, get_logger
|
||
from services.checkpoints import get_checkpoint_mgr
|
||
from services.client_log import log_to_client
|
||
from services.proxy import get_proxy_from_api
|
||
from services.runtime import get_socketio
|
||
from services.screenshots import take_screenshot_for_account
|
||
from services.state import (
|
||
safe_get_account,
|
||
safe_get_task,
|
||
safe_remove_task,
|
||
safe_remove_task_status,
|
||
safe_set_task,
|
||
safe_set_task_status,
|
||
safe_update_task_status,
|
||
)
|
||
from services.task_batches import _batch_task_record_result, _get_batch_id_from_source
|
||
from task_checkpoint import TaskStage
|
||
|
||
# Module-level logger and application configuration object.
logger = get_logger("app")
config = get_config()

# VIP priority queues (used for visualization/debugging only).
# Both lists hold the account ids of tasks that are still waiting in the scheduler.
vip_task_queue: list[str] = []  # task queue of VIP users
normal_task_queue: list[str] = []  # task queue of regular users
task_queue_lock = threading.Lock()  # guards the two debug lists above

# Concurrency defaults (after startup they are overridden by the system
# configuration, which then calls update_limits).
max_concurrent_per_account = config.MAX_CONCURRENT_PER_ACCOUNT
max_concurrent_global = config.MAX_CONCURRENT_GLOBAL
|
||
|
||
|
||
def _emit(event: str, data: object, *, room: str | None = None) -> None:
|
||
try:
|
||
socketio = get_socketio()
|
||
socketio.emit(event, data, room=room)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
@dataclass
class _TaskRequest:
    """One queued task request, as created by ``TaskScheduler.submit_task``."""

    # Owner of the task; used for the per-user concurrency accounting.
    user_id: int
    # Account the task runs for; also the key of the scheduler's pending map.
    account_id: str
    # Forwarded unchanged to run_task.
    browse_type: str
    enable_screenshot: bool
    source: str
    retry_count: int
    # time.time() at submission; second component of the heap ordering key.
    submitted_at: float
    # VIP tasks are pushed with priority 0, all others with priority 1.
    is_vip: bool
    # Scheduler-wide increasing counter; last component of the heap ordering key.
    seq: int
    # Set by cancel_pending_task; canceled entries are dropped when popped from the heap.
    canceled: bool = False
    # Optional zero-argument callable invoked after run_task returns or raises.
    done_callback: object = None
|
||
|
||
|
||
class TaskScheduler:
    """Global task scheduler: tasks wait in a queue instead of each getting its own thread.

    A single dispatcher thread pops runnable tasks from a priority heap
    (VIP first, then submission time, then sequence number) and hands them to
    a ``ThreadPoolExecutor``. ``max_global`` caps the number of tasks running
    at once, ``max_per_user`` caps the tasks running for one user, and
    ``max_queue_size`` caps the number of waiting tasks. All queue and counter
    state is accessed while holding ``self._cond``.
    """

    def __init__(self, max_global: int, max_per_user: int, max_queue_size: int = 1000):
        """Create the scheduler and start its dispatcher thread.

        Every limit is coerced with ``int()`` and clamped to at least 1.
        """
        self.max_global = max(1, int(max_global))
        self.max_per_user = max(1, int(max_per_user))
        self.max_queue_size = max(1, int(max_queue_size))

        self._cond = threading.Condition()
        self._pending = []  # heap: (priority, submitted_at, seq, task)
        self._pending_by_account = {}  # {account_id: task}
        self._seq = 0

        self._running_global = 0
        self._running_by_user = {}  # {user_id: running_count}

        self._executor_max_workers = self.max_global
        self._executor = ThreadPoolExecutor(max_workers=self._executor_max_workers, thread_name_prefix="TaskWorker")
        self._old_executors = []

        self._futures_lock = threading.Lock()
        self._active_futures = set()

        self._running = True
        self._dispatcher_thread = threading.Thread(target=self._dispatch_loop, daemon=True, name="TaskDispatcher")
        self._dispatcher_thread.start()

    @staticmethod
    def _priority_of(task) -> int:
        """Return the heap priority of a task: 0 for VIP, 1 otherwise."""
        return 0 if task.is_vip else 1

    @staticmethod
    def _remove_from_debug_queues(account_id) -> None:
        """Drop one occurrence of ``account_id`` from each visualization queue."""
        with task_queue_lock:
            if account_id in vip_task_queue:
                vip_task_queue.remove(account_id)
            if account_id in normal_task_queue:
                normal_task_queue.remove(account_id)

    def _release_slot(self, user_id) -> None:
        """Give back one global and one per-user running slot and wake waiters."""
        with self._cond:
            self._running_global = max(0, self._running_global - 1)
            remaining = self._running_by_user.get(user_id, 1) - 1
            if remaining > 0:
                self._running_by_user[user_id] = remaining
            else:
                # Drop empty counters so the dict does not grow with every user ever seen.
                self._running_by_user.pop(user_id, None)
            self._cond.notify_all()

    def _track_future(self, future) -> None:
        """Remember a submitted future until it completes (used by shutdown)."""
        with self._futures_lock:
            self._active_futures.add(future)
        try:
            future.add_done_callback(self._untrack_future)
        except Exception:
            pass

    def _untrack_future(self, future) -> None:
        """Done-callback: forget a finished future."""
        with self._futures_lock:
            self._active_futures.discard(future)

    def shutdown(self, timeout: float = 5.0):
        """Stop the scheduler (used for cleanup on process exit).

        Stops the dispatcher, then waits up to ``timeout`` seconds for tasks
        that were already submitted, and finally shuts the executors down
        without blocking. Tasks still waiting in the queue are not started.
        """
        with self._cond:
            self._running = False
            self._cond.notify_all()

        try:
            self._dispatcher_thread.join(timeout=timeout)
        except Exception:
            pass

        # Wait for already-submitted tasks to finish (at most `timeout` seconds) so
        # leftover active tasks do not interfere with later scheduling/tests.
        try:
            # monotonic(): the deadline must not move if the wall clock is adjusted.
            deadline = time.monotonic() + max(0.0, float(timeout or 0))
            while True:
                with self._futures_lock:
                    unfinished = [f for f in self._active_futures if not f.done()]
                if not unfinished:
                    break
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                wait(unfinished, timeout=remaining)
        except Exception:
            pass

        for executor in [self._executor, *self._old_executors]:
            try:
                executor.shutdown(wait=False)
            except Exception:
                pass

    def update_limits(
        self,
        max_global: int | None = None,
        max_per_user: int | None = None,
        max_queue_size: int | None = None,
    ):
        """Update concurrency/queue limits at runtime (running tasks are unaffected).

        ``None`` leaves a limit unchanged. When ``max_global`` grows beyond the
        current executor size, a larger executor replaces it; the previous one
        is shut down without waiting, so work it already accepted still runs.
        """
        with self._cond:
            if max_per_user is not None:
                self.max_per_user = max(1, int(max_per_user))
            if max_queue_size is not None:
                self.max_queue_size = max(1, int(max_queue_size))

            if max_global is not None:
                new_max_global = max(1, int(max_global))
                self.max_global = new_max_global
                if new_max_global > self._executor_max_workers:
                    retired = self._executor
                    self._old_executors.append(retired)
                    self._executor_max_workers = new_max_global
                    self._executor = ThreadPoolExecutor(
                        max_workers=self._executor_max_workers, thread_name_prefix="TaskWorker"
                    )
                    try:
                        retired.shutdown(wait=False)
                    except Exception:
                        pass

            self._cond.notify_all()

    def submit_task(
        self,
        user_id: int,
        account_id: str,
        browse_type: str,
        enable_screenshot: bool = True,
        source: str = "manual",
        retry_count: int = 0,
        is_vip: bool | None = None,
        done_callback=None,
    ):
        """Put a task into the queue.

        Args:
            is_vip: When ``None`` the VIP flag is looked up via
                ``database.is_user_vip`` (falling back to ``False`` on error).
            done_callback: Optional zero-argument callable invoked after the
                task has run.

        Returns:
            ``(ok, message)``; ``ok`` is ``False`` when arguments are missing,
            the scheduler is stopped, the queue is full, or the account already
            has a queued or running task.
        """
        if not user_id or not account_id:
            return False, "参数错误"

        submitted_at = time.time()
        if is_vip is None:
            try:
                is_vip = bool(database.is_user_vip(user_id))
            except Exception:
                is_vip = False
        else:
            is_vip = bool(is_vip)

        with self._cond:
            if not self._running:
                return False, "调度器未运行"
            if len(self._pending_by_account) >= self.max_queue_size:
                return False, "任务队列已满,请稍后再试"
            if account_id in self._pending_by_account:
                return False, "任务已在队列中"
            if safe_get_task(account_id) is not None:
                return False, "任务已在运行中"

            self._seq += 1
            task = _TaskRequest(
                user_id=user_id,
                account_id=account_id,
                browse_type=browse_type,
                enable_screenshot=bool(enable_screenshot),
                source=source,
                retry_count=int(retry_count or 0),
                submitted_at=submitted_at,
                is_vip=is_vip,
                seq=self._seq,
                done_callback=done_callback,
            )
            self._pending_by_account[account_id] = task
            # seq is unique, so tuple comparison never reaches the task object itself.
            heapq.heappush(self._pending, (self._priority_of(task), task.submitted_at, task.seq, task))
            self._cond.notify_all()

        # For visualization/debugging: record the queue entry.
        with task_queue_lock:
            if is_vip:
                vip_task_queue.append(account_id)
            else:
                normal_task_queue.append(account_id)

        return True, "已加入队列"

    def cancel_pending_task(self, user_id: int, account_id: str) -> bool:
        """Cancel a queued task that has not started (running tasks are stopped via should_stop).

        Returns ``True`` when a pending task for ``account_id`` was found and
        canceled, ``False`` otherwise.

        NOTE(review): ``user_id`` is only used for the account lookup below; it
        is not compared with the owner of the queued task - confirm callers
        authorize the request themselves.
        """
        with self._cond:
            canceled_task = self._pending_by_account.pop(account_id, None)
            if not canceled_task:
                return False
            # The heap entry stays in place; the dispatcher discards it when popped.
            canceled_task.canceled = True
            self._cond.notify_all()

        # Remove from the visualization queues.
        self._remove_from_debug_queues(account_id)

        # Batch tasks: a cancellation must also advance the completion count so
        # the batch cache does not stay resident forever.
        try:
            batch_id = _get_batch_id_from_source(canceled_task.source)
            if batch_id:
                acc = safe_get_account(user_id, account_id)
                if acc:
                    account_name = acc.remark if acc.remark else acc.username
                else:
                    account_name = account_id
                _batch_task_record_result(
                    batch_id=batch_id,
                    account_name=account_name,
                    screenshot_path=None,
                    total_items=0,
                    total_attachments=0,
                )
        except Exception as e:
            # Best effort: the cancellation itself already succeeded.
            logger.warning(f"cancel_pending_task: failed to record batch result for {account_id}: {e}")

        return True

    def _abort_dispatch(self, task) -> None:
        """Undo the bookkeeping for a task the executor refused to accept.

        Releases the reserved slots and, best effort, clears the account's
        queued state the same way ``submit_account_task`` does when a
        submission is rejected, so the account does not stay flagged as running.
        """
        self._release_slot(task.user_id)
        try:
            account = safe_get_account(task.user_id, task.account_id)
            if account:
                account.is_running = False
                account.status = "未开始"
            safe_remove_task_status(task.account_id)
            if account:
                _emit("account_update", account.to_dict(), room=f"user_{task.user_id}")
        except Exception as e:
            logger.warning(f"TaskDispatcher: failed to reset account {task.account_id}: {e}")

    def _dispatch_loop(self):
        """Dispatcher thread body: move runnable tasks from the heap to the executor."""
        while True:
            with self._cond:
                if not self._running:
                    return

                if not self._pending or self._running_global >= self.max_global:
                    self._cond.wait(timeout=0.5)
                    continue

                task = self._pop_next_runnable_locked()
                if task is None:
                    self._cond.wait(timeout=0.5)
                    continue

                # Reserve the slots before leaving the lock so limits are never exceeded.
                self._running_global += 1
                self._running_by_user[task.user_id] = self._running_by_user.get(task.user_id, 0) + 1

            # Remove from the queue (visualization).
            self._remove_from_debug_queues(task.account_id)

            try:
                future = self._executor.submit(self._run_task_wrapper, task)
            except Exception as e:
                # The wrapper will never run, so nobody else releases the slots.
                logger.warning(f"TaskDispatcher: could not start task for account {task.account_id}: {e}")
                self._abort_dispatch(task)
                continue

            # From here on the wrapper's `finally` owns the slot release; a failure
            # in the bookkeeping below must not release the slots a second time.
            try:
                self._track_future(future)
                safe_set_task(task.account_id, future)
                if future.done():
                    # The task finished before it was registered: its own cleanup
                    # already ran, so remove the stale entry ourselves.
                    safe_remove_task(task.account_id)
            except Exception as e:
                logger.warning(f"TaskDispatcher: failed to register task for account {task.account_id}: {e}")

    def _pop_next_runnable_locked(self):
        """Pop the next *runnable* task while holding the lock.

        Tasks of users who are already at ``max_per_user`` are skipped and
        pushed back, so a blocked VIP task does not hold up regular tasks.
        Canceled or superseded heap entries are discarded. Returns ``None``
        when nothing can run right now.
        """
        if not self._pending:
            return None

        skipped = []
        selected = None

        while self._pending:
            _, _, _, task = heapq.heappop(self._pending)

            if task.canceled:
                continue
            if self._pending_by_account.get(task.account_id) is not task:
                continue

            if self._running_by_user.get(task.user_id, 0) >= self.max_per_user:
                skipped.append(task)
                continue

            selected = task
            break

        for t in skipped:
            heapq.heappush(self._pending, (self._priority_of(t), t.submitted_at, t.seq, t))

        if selected is None:
            return None

        self._pending_by_account.pop(selected.account_id, None)
        return selected

    def _run_task_wrapper(self, task: _TaskRequest):
        """Worker entry point: run the task, then always clean up.

        An exception from ``run_task`` is logged and re-raised so it is still
        stored on the future. The done callback, the task registry cleanup and
        the slot release each run even if an earlier cleanup step fails.
        """
        try:
            run_task(
                user_id=task.user_id,
                account_id=task.account_id,
                browse_type=task.browse_type,
                enable_screenshot=task.enable_screenshot,
                source=task.source,
                retry_count=task.retry_count,
            )
        except Exception as e:
            logger.warning(f"TaskWorker: run_task raised for account {task.account_id}: {e}")
            raise
        finally:
            try:
                if callable(task.done_callback):
                    task.done_callback()
            except Exception as e:
                logger.warning(f"TaskWorker: done_callback failed for account {task.account_id}: {e}")
            try:
                safe_remove_task(task.account_id)
            finally:
                # Must run even if the registry cleanup fails, otherwise the
                # concurrency slot would be lost for good.
                self._release_slot(task.user_id)
|
||
|
||
|
||
# Lazily created scheduler singleton (see get_task_scheduler) and the lock
# that serializes its creation.
_task_scheduler = None
_task_scheduler_lock = threading.Lock()
|
||
|
||
|
||
def get_task_scheduler() -> TaskScheduler:
    """Return the global task scheduler, creating it on first use (singleton).

    The queue capacity comes from the ``TASK_QUEUE_MAXSIZE`` environment
    variable and falls back to 1000 when the value is missing or not an
    integer. Concurrency limits are taken from the module-level defaults.
    """
    global _task_scheduler
    with _task_scheduler_lock:
        if _task_scheduler is not None:
            return _task_scheduler

        try:
            queue_capacity = int(os.environ.get("TASK_QUEUE_MAXSIZE", "1000"))
        except Exception:
            queue_capacity = 1000

        _task_scheduler = TaskScheduler(
            max_global=max_concurrent_global,
            max_per_user=max_concurrent_per_account,
            max_queue_size=queue_capacity,
        )
        return _task_scheduler
|
||
|
||
|
||
def submit_account_task(
    user_id: int,
    account_id: str,
    browse_type: str,
    enable_screenshot: bool = True,
    source: str = "manual",
    retry_count: int = 0,
    done_callback=None,
):
    """Single entry point for queueing an account task.

    Marks the account as queued, publishes the queued task status, then hands
    the task to the global scheduler. If the scheduler rejects it, the account
    and task status are rolled back.

    Returns:
        ``(ok, message)``. ``ok`` is ``False`` when the account does not exist,
        is already running, or the scheduler refused the task.
    """
    account = safe_get_account(user_id, account_id)
    if not account:
        return False, "账号不存在"
    if getattr(account, "is_running", False):
        return False, "任务已在运行中"

    # A failing VIP lookup downgrades the user to regular priority.
    try:
        vip = bool(database.is_user_vip(user_id))
    except Exception:
        vip = False

    room_name = f"user_{user_id}"
    vip_tag = " [VIP优先]" if vip else ""

    # Flag the account as queued before the task becomes visible to the scheduler.
    account.is_running = True
    account.should_stop = False
    account.status = "排队中" + (" (VIP)" if vip else "")

    queued_status = {
        "user_id": user_id,
        "username": account.username,
        "status": "排队中",
        "detail_status": "等待资源" + vip_tag,
        "browse_type": browse_type,
        "start_time": time.time(),
        "source": source,
        "progress": {"items": 0, "attachments": 0},
        "is_vip": vip,
    }
    safe_set_task_status(account_id, queued_status)
    _emit("account_update", account.to_dict(), room=room_name)

    accepted, message = get_task_scheduler().submit_task(
        user_id=user_id,
        account_id=account_id,
        browse_type=browse_type,
        enable_screenshot=enable_screenshot,
        source=source,
        retry_count=retry_count,
        is_vip=vip,
        done_callback=done_callback,
    )
    if accepted:
        log_to_client(message + vip_tag, user_id, account_id)
        return True, message

    # Rejected by the scheduler: undo the queued state set above.
    account.is_running = False
    account.status = "未开始"
    safe_remove_task_status(account_id)
    _emit("account_update", account.to_dict(), room=room_name)
    return False, message
|
||
|
||
|
||
def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="manual", retry_count=0):
    """Run the automation task for one account.

    Flow: honour a pending stop request, create a checkpoint, then make up to
    three attempts to log in through ``APIBrowser`` and browse ``browse_type``
    (a proxy is fetched per attempt when enabled in the system config). Only
    timeout failures lead to another attempt. Afterwards the account state is
    cleaned up and, on success, either a screenshot thread is started or the
    result is logged directly.

    Args:
        user_id: Owner of the account.
        account_id: Account to run the task for; nothing happens if it is unknown.
        browse_type: Content category passed to ``browse_content``.
        enable_screenshot: Whether to start a screenshot after a successful run.
        source: Origin of the task; also used to derive the batch id.
        retry_count: Current retry round, used by the auto-retry mechanism
            (at most 2 retries).
    """
    MAX_AUTO_RETRY = 2  # maximum number of automatic retries
    LoggerAdapter("app", {"user_id": user_id, "account_id": account_id, "source": source}).debug(
        f"run_task enable_screenshot={enable_screenshot} ({type(enable_screenshot).__name__}), retry={retry_count}"
    )

    account = safe_get_account(user_id, account_id)
    if not account:
        return
    batch_id = _get_batch_id_from_source(source)
    batch_recorded = False

    checkpoint_mgr = get_checkpoint_mgr()
    room = f"user_{user_id}"

    def display_name():
        """Name shown in batch results/emails: the remark if set, else the username."""
        return account.remark if account.remark else account.username

    # Stop requested while the task was still queued: report it and leave.
    if account.should_stop:
        log_to_client("任务已取消", user_id, account_id)
        account.status = "已停止"
        account.is_running = False
        safe_remove_task_status(account_id)
        _emit("account_update", account.to_dict(), room=room)
        if batch_id:
            _batch_task_record_result(
                batch_id=batch_id,
                account_name=display_name(),
                screenshot_path=None,
                total_items=0,
                total_attachments=0,
            )
        return

    # Bound before the try block so the except/finally handlers can always read
    # them, even when checkpoint creation or the first login attempt fails.
    task_start_time = time.time()
    result = None

    def record_task_log(status, total_items, total_attachments, error_message):
        """Write one task-log row with the fields shared by every outcome."""
        database.create_task_log(
            user_id=user_id,
            account_id=account_id,
            username=account.username,
            browse_type=browse_type,
            status=status,
            total_items=total_items,
            total_attachments=total_attachments,
            error_message=error_message,
            duration=int(time.time() - task_start_time),
            source=source,
        )

    def custom_log(message: str):
        log_to_client(message, user_id, account_id)

    def should_stop():
        return account.should_stop

    def close_automation(on_closed=None, on_error=None):
        """Close and forget a leftover browser instance attached to the account."""
        if not account.automation:
            return
        try:
            account.automation.close()
            if on_closed:
                on_closed()
        except Exception as e:
            if on_error:
                on_error(e)
        finally:
            account.automation = None

    try:
        if account.should_stop:
            log_to_client("任务已取消", user_id, account_id)
            account.status = "已停止"
            account.is_running = False
            _emit("account_update", account.to_dict(), room=room)
            return

        task_id = checkpoint_mgr.create_checkpoint(
            user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type
        )
        logger.info(f"[断点] 任务 {task_id} 已创建")

        task_start_time = time.time()

        account.status = "运行中"
        _emit("account_update", account.to_dict(), room=room)
        account.last_browse_type = browse_type

        safe_update_task_status(account_id, {"status": "运行中", "detail_status": "初始化", "start_time": task_start_time})

        max_attempts = 3

        for attempt in range(1, max_attempts + 1):
            try:
                if attempt > 1:
                    log_to_client(f"🔄 第 {attempt} 次尝试(共{max_attempts}次)...", user_id, account_id)

                # A fresh proxy is requested for every attempt.
                proxy_config = None
                system_config = database.get_system_config()
                if system_config.get("proxy_enabled") == 1:
                    proxy_api_url = system_config.get("proxy_api_url", "").strip()
                    if proxy_api_url:
                        log_to_client("正在获取代理IP...", user_id, account_id)
                        proxy_server = get_proxy_from_api(proxy_api_url, max_retries=3)
                        if proxy_server:
                            proxy_config = {"server": proxy_server}
                            log_to_client(f"✓ 将使用代理: {proxy_server}", user_id, account_id)
                            account.proxy_config = proxy_config  # keep the proxy config for the screenshot step
                        else:
                            log_to_client("✗ 代理获取失败,将不使用代理继续", user_id, account_id)
                    else:
                        log_to_client("⚠ 代理已启用但未配置API地址", user_id, account_id)

                checkpoint_mgr.update_stage(task_id, TaskStage.STARTING, progress_percent=10)

                log_to_client("开始登录...", user_id, account_id)
                safe_update_task_status(account_id, {"detail_status": "正在登录"})
                checkpoint_mgr.update_stage(task_id, TaskStage.LOGGING_IN, progress_percent=25)

                with APIBrowser(log_callback=custom_log, proxy_config=proxy_config) as api_browser:
                    if api_browser.login(account.username, account.password):
                        log_to_client("✓ 登录成功!", user_id, account_id)
                        api_browser.save_cookies_for_playwright(account.username)
                        database.reset_account_login_status(account_id)

                        # Accounts without a remark get the real name as remark (best effort).
                        if not account.remark:
                            try:
                                real_name = api_browser.get_real_name()
                                if real_name:
                                    account.remark = real_name
                                    database.update_account_remark(account_id, real_name)
                                    _emit("account_update", account.to_dict(), room=room)
                                    logger.info(f"[自动备注] 账号 {account.username} 自动设置备注为: {real_name}")
                            except Exception as e:
                                logger.warning(f"[自动备注] 获取姓名失败: {e}")

                        safe_update_task_status(account_id, {"detail_status": "正在浏览"})
                        log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id)

                        checkpoint_mgr.update_stage(task_id, TaskStage.BROWSING, progress_percent=50)
                        result = api_browser.browse_content(browse_type=browse_type, should_stop_callback=should_stop)
                    else:
                        error_message = "登录失败"
                        log_to_client(f"❌ {error_message}", user_id, account_id)

                        is_suspended = database.increment_account_login_fail(account_id, error_message)
                        if is_suspended:
                            log_to_client("⚠ 该账号连续3次密码错误,已自动暂停", user_id, account_id)
                            log_to_client("请在前台修改密码后才能继续使用", user_id, account_id)

                        retry_action = checkpoint_mgr.record_error(task_id, error_message)
                        if retry_action == "paused":
                            logger.warning(f"[断点] 任务 {task_id} 已暂停(登录失败)")

                        account.status = "登录失败"
                        account.is_running = False
                        record_task_log("failed", 0, 0, error_message)
                        _emit("account_update", account.to_dict(), room=room)
                        # A failed login is never retried.
                        return

                account.total_items = result.total_items
                account.total_attachments = result.total_attachments

                if result.success:
                    log_to_client(
                        f"浏览完成! 共 {result.total_items} 条内容,{result.total_attachments} 个附件", user_id, account_id
                    )
                    safe_update_task_status(
                        account_id,
                        {
                            "detail_status": "浏览完成",
                            "progress": {"items": result.total_items, "attachments": result.total_attachments},
                        },
                    )
                    account.status = "已完成"
                    checkpoint_mgr.update_stage(task_id, TaskStage.COMPLETING, progress_percent=95)
                    checkpoint_mgr.complete_task(task_id, success=True)
                    logger.info(f"[断点] 任务 {task_id} 已完成")

                    # Without a screenshot step, the result is logged and reported here.
                    if not enable_screenshot:
                        record_task_log("success", result.total_items, result.total_attachments, "")
                        if batch_id:
                            _batch_task_record_result(
                                batch_id=batch_id,
                                account_name=display_name(),
                                screenshot_path=None,
                                total_items=result.total_items,
                                total_attachments=result.total_attachments,
                            )
                            batch_recorded = True
                        elif source and source.startswith("user_scheduled"):
                            try:
                                user_info = database.get_user_by_id(user_id)
                                if user_info and user_info.get("email") and database.get_user_email_notify(user_id):
                                    email_service.send_task_complete_email_async(
                                        user_id=user_id,
                                        email=user_info["email"],
                                        username=user_info["username"],
                                        account_name=display_name(),
                                        browse_type=browse_type,
                                        total_items=result.total_items,
                                        total_attachments=result.total_attachments,
                                        screenshot_path=None,
                                        log_callback=lambda msg: log_to_client(msg, user_id, account_id),
                                    )
                            except Exception as email_error:
                                logger.warning(f"发送任务完成邮件失败: {email_error}")
                    break

                error_msg = result.error_message
                if "Timeout" in error_msg or "timeout" in error_msg:
                    log_to_client(f"⚠ 检测到超时错误: {error_msg}", user_id, account_id)

                    close_automation(
                        on_closed=lambda: log_to_client("已关闭超时的浏览器实例", user_id, account_id),
                        on_error=lambda e: logger.debug(f"关闭超时浏览器实例失败: {e}"),
                    )

                    if attempt < max_attempts:
                        log_to_client(f"⚠ 代理可能速度过慢,将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
                        time.sleep(2)
                        continue
                    log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
                    account.status = "出错"
                    record_task_log(
                        "failed",
                        result.total_items,
                        result.total_attachments,
                        f"重试{max_attempts}次后仍失败: {error_msg}",
                    )
                    break

                # Non-timeout browse failure: no further attempt.
                log_to_client(f"浏览出错: {error_msg}", user_id, account_id)
                account.status = "出错"
                record_task_log("failed", result.total_items, result.total_attachments, error_msg)
                break

            except Exception as retry_error:
                error_msg = str(retry_error)
                close_automation(on_error=lambda e: logger.debug(f"关闭浏览器实例失败: {e}"))

                if "Timeout" in error_msg or "timeout" in error_msg:
                    log_to_client(f"⚠ 执行超时: {error_msg}", user_id, account_id)
                    if attempt < max_attempts:
                        log_to_client(f"⚠ 将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
                        time.sleep(2)
                        continue
                    log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
                    account.status = "出错"
                    record_task_log(
                        "failed",
                        account.total_items,
                        account.total_attachments,
                        f"重试{max_attempts}次后仍失败: {error_msg}",
                    )
                    break

                log_to_client(f"任务执行异常: {error_msg}", user_id, account_id)
                account.status = "出错"
                record_task_log("failed", account.total_items, account.total_attachments, error_msg)
                break

    except Exception as e:
        error_msg = str(e)
        log_to_client(f"任务执行出错: {error_msg}", user_id, account_id)
        account.status = "出错"
        record_task_log("failed", account.total_items, account.total_attachments, error_msg)

    finally:
        account.is_running = False
        screenshot_submitted = False
        # NOTE(review): every status except "已完成" is reset here, so the
        # "登录失败"/"出错" checks further down can never match and the auto-retry
        # branch is currently unreachable. Left unchanged on purpose - enabling
        # auto-retry would change production behaviour; confirm the intent.
        if account.status not in ["已完成"]:
            account.status = "未开始"

        close_automation(
            on_error=lambda e: log_to_client(f"关闭主任务浏览器时出错: {str(e)}", user_id, account_id)
        )

        safe_remove_task(account_id)
        safe_remove_task_status(account_id)

        _emit("account_update", account.to_dict(), room=room)

        if account.status == "已完成" and not account.should_stop:
            if enable_screenshot:
                log_to_client("等待2秒后开始截图...", user_id, account_id)
                account.status = "等待截图"
                _emit("account_update", account.to_dict(), room=room)

                items_done = result.total_items if result else 0
                attachments_done = result.total_attachments if result else 0

                safe_set_task_status(
                    account_id,
                    {
                        "user_id": user_id,
                        "username": account.username,
                        "status": "排队中",
                        "detail_status": "等待截图资源",
                        "browse_type": browse_type,
                        "start_time": time.time(),
                        "source": source,
                        "progress": {
                            "items": items_done,
                            "attachments": attachments_done,
                        },
                    },
                )
                time.sleep(2)
                browse_result_dict = {"total_items": items_done, "total_attachments": attachments_done}
                # The screenshot thread receives the batch id via `source`; the
                # flag keeps the batch from being recorded below as well.
                screenshot_submitted = True
                threading.Thread(
                    target=take_screenshot_for_account,
                    args=(user_id, account_id, browse_type, source, task_start_time, browse_result_dict),
                    daemon=True,
                ).start()
            else:
                account.status = "未开始"
                _emit("account_update", account.to_dict(), room=room)
                log_to_client("截图功能已禁用,跳过截图", user_id, account_id)
        else:
            if account.status not in ["登录失败", "出错"]:
                account.status = "未开始"
                _emit("account_update", account.to_dict(), room=room)
            elif account.status == "出错" and retry_count < MAX_AUTO_RETRY:
                log_to_client(
                    f"⚠ 任务执行失败,5秒后自动重试 ({retry_count + 1}/{MAX_AUTO_RETRY})...", user_id, account_id
                )
                account.status = "等待重试"
                _emit("account_update", account.to_dict(), room=room)

                def delayed_retry_submit():
                    if account.should_stop:
                        return
                    log_to_client(f"🔄 开始第 {retry_count + 1} 次自动重试...", user_id, account_id)
                    ok, msg = submit_account_task(
                        user_id=user_id,
                        account_id=account_id,
                        browse_type=browse_type,
                        enable_screenshot=enable_screenshot,
                        source=source,
                        retry_count=retry_count + 1,
                    )
                    if not ok:
                        log_to_client(f"自动重试提交失败: {msg}", user_id, account_id)

                try:
                    threading.Timer(5, delayed_retry_submit).start()
                except Exception:
                    delayed_retry_submit()

        # Batch bookkeeping for every outcome not already recorded above and
        # not handed over to the screenshot thread or a pending retry.
        if batch_id and (not screenshot_submitted) and (not batch_recorded) and account.status != "等待重试":
            _batch_task_record_result(
                batch_id=batch_id,
                account_name=display_name(),
                screenshot_path=None,
                total_items=getattr(account, "total_items", 0) or 0,
                total_attachments=getattr(account, "total_attachments", 0) or 0,
            )
            batch_recorded = True
|