Files
zsglpt/services/tasks.py
yuyx a8b9f225bd 更新系统页面和更新功能
- 更新 admin-frontend 系统页面和更新 API
- 更新 routes 和 services 中的更新逻辑
- 重新构建前端静态资源
2025-12-15 15:58:12 +08:00

883 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import heapq
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
import database
import email_service
from api_browser import APIBrowser
from app_config import get_config
from app_logger import LoggerAdapter, get_logger
from services.checkpoints import get_checkpoint_mgr
from services.client_log import log_to_client
from services.proxy import get_proxy_from_api
from services.runtime import get_socketio
from services.screenshots import take_screenshot_for_account
from services.state import (
safe_get_account,
safe_get_task,
safe_remove_task,
safe_remove_task_status,
safe_set_task,
safe_set_task_status,
safe_update_task_status,
)
from services.task_batches import _batch_task_record_result, _get_batch_id_from_source
from task_checkpoint import TaskStage
logger = get_logger("app")
config = get_config()
# VIP优先级队列仅用于可视化/调试)
vip_task_queue = [] # VIP用户任务队列
normal_task_queue = [] # 普通用户任务队列
task_queue_lock = threading.Lock()
# 并发默认值(启动后会由系统配置覆盖并调用 update_limits
max_concurrent_per_account = config.MAX_CONCURRENT_PER_ACCOUNT
max_concurrent_global = config.MAX_CONCURRENT_GLOBAL
def _emit(event: str, data: object, *, room: str | None = None) -> None:
try:
socketio = get_socketio()
socketio.emit(event, data, room=room)
except Exception:
pass
@dataclass
class _TaskRequest:
user_id: int
account_id: str
browse_type: str
enable_screenshot: bool
source: str
retry_count: int
submitted_at: float
is_vip: bool
seq: int
canceled: bool = False
done_callback: object = None
class TaskScheduler:
"""全局任务调度器:队列排队,不为每个任务单独创建线程。"""
def __init__(self, max_global: int, max_per_user: int, max_queue_size: int = 1000):
self.max_global = max(1, int(max_global))
self.max_per_user = max(1, int(max_per_user))
self.max_queue_size = max(1, int(max_queue_size))
self._cond = threading.Condition()
self._pending = [] # heap: (priority, submitted_at, seq, task)
self._pending_by_account = {} # {account_id: task}
self._seq = 0
self._known_account_ids = set()
self._running_global = 0
self._running_by_user = {} # {user_id: running_count}
self._executor_max_workers = self.max_global
self._executor = ThreadPoolExecutor(max_workers=self._executor_max_workers, thread_name_prefix="TaskWorker")
self._old_executors = []
self._futures_lock = threading.Lock()
self._active_futures = set()
self._running = True
self._dispatcher_thread = threading.Thread(target=self._dispatch_loop, daemon=True, name="TaskDispatcher")
self._dispatcher_thread.start()
def _track_future(self, future) -> None:
with self._futures_lock:
self._active_futures.add(future)
try:
future.add_done_callback(self._untrack_future)
except Exception:
pass
def _untrack_future(self, future) -> None:
with self._futures_lock:
self._active_futures.discard(future)
def shutdown(self, timeout: float = 5.0):
"""停止调度器(用于进程退出清理)"""
with self._cond:
self._running = False
self._cond.notify_all()
try:
self._dispatcher_thread.join(timeout=timeout)
except Exception:
pass
# 等待已提交的任务收尾(最多等待 timeout 秒),避免遗留 active_task 干扰后续调度/测试
try:
deadline = time.time() + max(0.0, float(timeout or 0))
while True:
with self._futures_lock:
pending = [f for f in self._active_futures if not f.done()]
if not pending:
break
remaining = deadline - time.time()
if remaining <= 0:
break
wait(pending, timeout=remaining)
except Exception:
pass
try:
self._executor.shutdown(wait=False)
except Exception:
pass
for ex in self._old_executors:
try:
ex.shutdown(wait=False)
except Exception:
pass
# 最后兜底:清理本调度器提交过的 active_task避免测试/重启时被“任务已在运行中”误拦截
try:
with self._cond:
known_ids = set(self._known_account_ids) | set(self._pending_by_account.keys())
self._pending.clear()
self._pending_by_account.clear()
self._cond.notify_all()
for account_id in known_ids:
safe_remove_task(account_id)
except Exception:
pass
def update_limits(self, max_global: int = None, max_per_user: int = None, max_queue_size: int = None):
"""动态更新并发/队列上限(不影响已在运行的任务)"""
with self._cond:
if max_per_user is not None:
self.max_per_user = max(1, int(max_per_user))
if max_queue_size is not None:
self.max_queue_size = max(1, int(max_queue_size))
if max_global is not None:
new_max_global = max(1, int(max_global))
self.max_global = new_max_global
if new_max_global > self._executor_max_workers:
self._old_executors.append(self._executor)
self._executor_max_workers = new_max_global
self._executor = ThreadPoolExecutor(
max_workers=self._executor_max_workers, thread_name_prefix="TaskWorker"
)
try:
self._old_executors[-1].shutdown(wait=False)
except Exception:
pass
self._cond.notify_all()
def submit_task(
self,
user_id: int,
account_id: str,
browse_type: str,
enable_screenshot: bool = True,
source: str = "manual",
retry_count: int = 0,
is_vip: bool = None,
done_callback=None,
):
"""提交任务进入队列(返回: (ok, message)"""
if not user_id or not account_id:
return False, "参数错误"
submitted_at = time.time()
if is_vip is None:
try:
is_vip = bool(database.is_user_vip(user_id))
except Exception:
is_vip = False
else:
is_vip = bool(is_vip)
with self._cond:
if not self._running:
return False, "调度器未运行"
if len(self._pending_by_account) >= self.max_queue_size:
return False, "任务队列已满,请稍后再试"
if account_id in self._pending_by_account:
return False, "任务已在队列中"
if safe_get_task(account_id) is not None:
return False, "任务已在运行中"
self._seq += 1
task = _TaskRequest(
user_id=user_id,
account_id=account_id,
browse_type=browse_type,
enable_screenshot=bool(enable_screenshot),
source=source,
retry_count=int(retry_count or 0),
submitted_at=submitted_at,
is_vip=is_vip,
seq=self._seq,
done_callback=done_callback,
)
self._pending_by_account[account_id] = task
self._known_account_ids.add(account_id)
priority = 0 if is_vip else 1
heapq.heappush(self._pending, (priority, task.submitted_at, task.seq, task))
self._cond.notify_all()
# 用于可视化/调试:记录队列
with task_queue_lock:
if is_vip:
vip_task_queue.append(account_id)
else:
normal_task_queue.append(account_id)
return True, "已加入队列"
def cancel_pending_task(self, user_id: int, account_id: str) -> bool:
"""取消尚未开始的排队任务(已运行的任务由 should_stop 控制)"""
canceled_task = None
with self._cond:
task = self._pending_by_account.pop(account_id, None)
if not task:
return False
task.canceled = True
canceled_task = task
self._cond.notify_all()
# 从可视化队列移除
with task_queue_lock:
if account_id in vip_task_queue:
vip_task_queue.remove(account_id)
if account_id in normal_task_queue:
normal_task_queue.remove(account_id)
# 批次任务:取消也要推进完成计数,避免批次缓存常驻
try:
batch_id = _get_batch_id_from_source(canceled_task.source)
if batch_id:
acc = safe_get_account(user_id, account_id)
if acc:
account_name = acc.remark if acc.remark else acc.username
else:
account_name = account_id
_batch_task_record_result(
batch_id=batch_id,
account_name=account_name,
screenshot_path=None,
total_items=0,
total_attachments=0,
)
except Exception:
pass
return True
def _dispatch_loop(self):
while True:
task = None
with self._cond:
if not self._running:
return
if not self._pending or self._running_global >= self.max_global:
self._cond.wait(timeout=0.5)
continue
task = self._pop_next_runnable_locked()
if task is None:
self._cond.wait(timeout=0.5)
continue
self._running_global += 1
self._running_by_user[task.user_id] = self._running_by_user.get(task.user_id, 0) + 1
# 从队列移除(可视化)
with task_queue_lock:
if task.account_id in vip_task_queue:
vip_task_queue.remove(task.account_id)
if task.account_id in normal_task_queue:
normal_task_queue.remove(task.account_id)
try:
future = self._executor.submit(self._run_task_wrapper, task)
self._track_future(future)
safe_set_task(task.account_id, future)
except Exception:
with self._cond:
self._running_global = max(0, self._running_global - 1)
self._running_by_user[task.user_id] = max(0, self._running_by_user.get(task.user_id, 1) - 1)
if self._running_by_user.get(task.user_id) == 0:
self._running_by_user.pop(task.user_id, None)
self._cond.notify_all()
def _pop_next_runnable_locked(self):
"""在锁内从优先队列取出“可运行”的任务避免VIP任务占位阻塞普通任务。"""
if not self._pending:
return None
skipped = []
selected = None
while self._pending:
_, _, _, task = heapq.heappop(self._pending)
if task.canceled:
continue
if self._pending_by_account.get(task.account_id) is not task:
continue
running_for_user = self._running_by_user.get(task.user_id, 0)
if running_for_user >= self.max_per_user:
skipped.append(task)
continue
selected = task
break
for t in skipped:
priority = 0 if t.is_vip else 1
heapq.heappush(self._pending, (priority, t.submitted_at, t.seq, t))
if selected is None:
return None
self._pending_by_account.pop(selected.account_id, None)
return selected
def _run_task_wrapper(self, task: _TaskRequest):
try:
run_task(
user_id=task.user_id,
account_id=task.account_id,
browse_type=task.browse_type,
enable_screenshot=task.enable_screenshot,
source=task.source,
retry_count=task.retry_count,
)
finally:
try:
if callable(task.done_callback):
task.done_callback()
except Exception:
pass
safe_remove_task(task.account_id)
with self._cond:
self._running_global = max(0, self._running_global - 1)
self._running_by_user[task.user_id] = max(0, self._running_by_user.get(task.user_id, 1) - 1)
if self._running_by_user.get(task.user_id) == 0:
self._running_by_user.pop(task.user_id, None)
self._cond.notify_all()
_task_scheduler = None
_task_scheduler_lock = threading.Lock()
def get_task_scheduler() -> TaskScheduler:
"""获取全局任务调度器(单例)"""
global _task_scheduler
with _task_scheduler_lock:
if _task_scheduler is None:
try:
max_queue_size = int(os.environ.get("TASK_QUEUE_MAXSIZE", "1000"))
except Exception:
max_queue_size = 1000
_task_scheduler = TaskScheduler(
max_global=max_concurrent_global,
max_per_user=max_concurrent_per_account,
max_queue_size=max_queue_size,
)
return _task_scheduler
def submit_account_task(
user_id: int,
account_id: str,
browse_type: str,
enable_screenshot: bool = True,
source: str = "manual",
retry_count: int = 0,
done_callback=None,
):
"""统一入口:提交账号任务进入队列"""
account = safe_get_account(user_id, account_id)
if not account:
return False, "账号不存在"
if getattr(account, "is_running", False):
return False, "任务已在运行中"
try:
is_vip_user = bool(database.is_user_vip(user_id))
except Exception:
is_vip_user = False
account.is_running = True
account.should_stop = False
account.status = "排队中" + (" (VIP)" if is_vip_user else "")
safe_set_task_status(
account_id,
{
"user_id": user_id,
"username": account.username,
"status": "排队中",
"detail_status": "等待资源" + (" [VIP优先]" if is_vip_user else ""),
"browse_type": browse_type,
"start_time": time.time(),
"source": source,
"progress": {"items": 0, "attachments": 0},
"is_vip": is_vip_user,
},
)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
scheduler = get_task_scheduler()
ok, message = scheduler.submit_task(
user_id=user_id,
account_id=account_id,
browse_type=browse_type,
enable_screenshot=enable_screenshot,
source=source,
retry_count=retry_count,
is_vip=is_vip_user,
done_callback=done_callback,
)
if not ok:
account.is_running = False
account.status = "未开始"
safe_remove_task_status(account_id)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
return False, message
log_to_client(message + (" [VIP优先]" if is_vip_user else ""), user_id, account_id)
return True, message
def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="manual", retry_count=0):
"""运行自动化任务
Args:
retry_count: 当前重试次数用于自动重试机制最多重试2次
"""
MAX_AUTO_RETRY = 2 # 最大自动重试次数
LoggerAdapter("app", {"user_id": user_id, "account_id": account_id, "source": source}).debug(
f"run_task enable_screenshot={enable_screenshot} ({type(enable_screenshot).__name__}), retry={retry_count}"
)
account = safe_get_account(user_id, account_id)
if not account:
return
batch_id = _get_batch_id_from_source(source)
batch_recorded = False
checkpoint_mgr = get_checkpoint_mgr()
import time as time_module
try:
if account.should_stop:
log_to_client("任务已取消", user_id, account_id)
account.status = "已停止"
account.is_running = False
safe_remove_task_status(account_id)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
if batch_id:
account_name = account.remark if account.remark else account.username
_batch_task_record_result(
batch_id=batch_id,
account_name=account_name,
screenshot_path=None,
total_items=0,
total_attachments=0,
)
return
try:
if account.should_stop:
log_to_client("任务已取消", user_id, account_id)
account.status = "已停止"
account.is_running = False
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
return
task_id = checkpoint_mgr.create_checkpoint(
user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type
)
logger.info(f"[断点] 任务 {task_id} 已创建")
task_start_time = time_module.time()
account.status = "运行中"
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
account.last_browse_type = browse_type
safe_update_task_status(account_id, {"status": "运行中", "detail_status": "初始化", "start_time": task_start_time})
max_attempts = 3
for attempt in range(1, max_attempts + 1):
try:
if attempt > 1:
log_to_client(f"🔄 第 {attempt} 次尝试(共{max_attempts}次)...", user_id, account_id)
proxy_config = None
config = database.get_system_config()
if config.get("proxy_enabled") == 1:
proxy_api_url = config.get("proxy_api_url", "").strip()
if proxy_api_url:
log_to_client("正在获取代理IP...", user_id, account_id)
proxy_server = get_proxy_from_api(proxy_api_url, max_retries=3)
if proxy_server:
proxy_config = {"server": proxy_server}
log_to_client(f"✓ 将使用代理: {proxy_server}", user_id, account_id)
account.proxy_config = proxy_config # 保存代理配置供截图使用
else:
log_to_client("✗ 代理获取失败,将不使用代理继续", user_id, account_id)
else:
log_to_client("⚠ 代理已启用但未配置API地址", user_id, account_id)
checkpoint_mgr.update_stage(task_id, TaskStage.STARTING, progress_percent=10)
def custom_log(message: str):
log_to_client(message, user_id, account_id)
log_to_client("开始登录...", user_id, account_id)
safe_update_task_status(account_id, {"detail_status": "正在登录"})
checkpoint_mgr.update_stage(task_id, TaskStage.LOGGING_IN, progress_percent=25)
with APIBrowser(log_callback=custom_log, proxy_config=proxy_config) as api_browser:
if api_browser.login(account.username, account.password):
log_to_client("✓ 登录成功!", user_id, account_id)
api_browser.save_cookies_for_playwright(account.username)
database.reset_account_login_status(account_id)
if not account.remark:
try:
real_name = api_browser.get_real_name()
if real_name:
account.remark = real_name
database.update_account_remark(account_id, real_name)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
logger.info(f"[自动备注] 账号 {account.username} 自动设置备注为: {real_name}")
except Exception as e:
logger.warning(f"[自动备注] 获取姓名失败: {e}")
safe_update_task_status(account_id, {"detail_status": "正在浏览"})
log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id)
def should_stop():
return account.should_stop
checkpoint_mgr.update_stage(task_id, TaskStage.BROWSING, progress_percent=50)
result = api_browser.browse_content(browse_type=browse_type, should_stop_callback=should_stop)
else:
error_message = "登录失败"
log_to_client(f"{error_message}", user_id, account_id)
is_suspended = database.increment_account_login_fail(account_id, error_message)
if is_suspended:
log_to_client("⚠ 该账号连续3次密码错误已自动暂停", user_id, account_id)
log_to_client("请在前台修改密码后才能继续使用", user_id, account_id)
retry_action = checkpoint_mgr.record_error(task_id, error_message)
if retry_action == "paused":
logger.warning(f"[断点] 任务 {task_id} 已暂停(登录失败)")
account.status = "登录失败"
account.is_running = False
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status="failed",
total_items=0,
total_attachments=0,
error_message=error_message,
duration=int(time_module.time() - task_start_time),
source=source,
)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
return
account.total_items = result.total_items
account.total_attachments = result.total_attachments
if result.success:
log_to_client(
f"浏览完成! 共 {result.total_items} 条内容,{result.total_attachments} 个附件", user_id, account_id
)
safe_update_task_status(
account_id,
{
"detail_status": "浏览完成",
"progress": {"items": result.total_items, "attachments": result.total_attachments},
},
)
account.status = "已完成"
checkpoint_mgr.update_stage(task_id, TaskStage.COMPLETING, progress_percent=95)
checkpoint_mgr.complete_task(task_id, success=True)
logger.info(f"[断点] 任务 {task_id} 已完成")
if not enable_screenshot:
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status="success",
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message="",
duration=int(time_module.time() - task_start_time),
source=source,
)
if batch_id:
account_name = account.remark if account.remark else account.username
_batch_task_record_result(
batch_id=batch_id,
account_name=account_name,
screenshot_path=None,
total_items=result.total_items,
total_attachments=result.total_attachments,
)
batch_recorded = True
elif source and source.startswith("user_scheduled"):
try:
user_info = database.get_user_by_id(user_id)
if user_info and user_info.get("email") and database.get_user_email_notify(user_id):
account_name = account.remark if account.remark else account.username
email_service.send_task_complete_email_async(
user_id=user_id,
email=user_info["email"],
username=user_info["username"],
account_name=account_name,
browse_type=browse_type,
total_items=result.total_items,
total_attachments=result.total_attachments,
screenshot_path=None,
log_callback=lambda msg: log_to_client(msg, user_id, account_id),
)
except Exception as email_error:
logger.warning(f"发送任务完成邮件失败: {email_error}")
break
error_msg = result.error_message
if "Timeout" in error_msg or "timeout" in error_msg:
log_to_client(f"⚠ 检测到超时错误: {error_msg}", user_id, account_id)
if account.automation:
try:
account.automation.close()
log_to_client("已关闭超时的浏览器实例", user_id, account_id)
except Exception as e:
logger.debug(f"关闭超时浏览器实例失败: {e}")
account.automation = None
if attempt < max_attempts:
log_to_client(f"⚠ 代理可能速度过慢将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
time_module.sleep(2)
continue
log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status="failed",
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message=f"重试{max_attempts}次后仍失败: {error_msg}",
duration=int(time_module.time() - task_start_time),
)
break
log_to_client(f"浏览出错: {error_msg}", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status="failed",
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time),
source=source,
)
break
except Exception as retry_error:
error_msg = str(retry_error)
if account.automation:
try:
account.automation.close()
except Exception as e:
logger.debug(f"关闭浏览器实例失败: {e}")
account.automation = None
if "Timeout" in error_msg or "timeout" in error_msg:
log_to_client(f"⚠ 执行超时: {error_msg}", user_id, account_id)
if attempt < max_attempts:
log_to_client(f"⚠ 将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
time_module.sleep(2)
continue
log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status="failed",
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=f"重试{max_attempts}次后仍失败: {error_msg}",
duration=int(time_module.time() - task_start_time),
source=source,
)
break
log_to_client(f"任务执行异常: {error_msg}", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status="failed",
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time),
source=source,
)
break
except Exception as e:
error_msg = str(e)
log_to_client(f"任务执行出错: {error_msg}", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status="failed",
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time),
source=source,
)
finally:
account.is_running = False
screenshot_submitted = False
if account.status not in ["已完成"]:
account.status = "未开始"
if account.automation:
try:
account.automation.close()
except Exception as e:
log_to_client(f"关闭主任务浏览器时出错: {str(e)}", user_id, account_id)
finally:
account.automation = None
safe_remove_task(account_id)
safe_remove_task_status(account_id)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
if account.status == "已完成" and not account.should_stop:
if enable_screenshot:
log_to_client("等待2秒后开始截图...", user_id, account_id)
account.status = "等待截图"
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
import time as time_mod
safe_set_task_status(
account_id,
{
"user_id": user_id,
"username": account.username,
"status": "排队中",
"detail_status": "等待截图资源",
"browse_type": browse_type,
"start_time": time_mod.time(),
"source": source,
"progress": {
"items": result.total_items if result else 0,
"attachments": result.total_attachments if result else 0,
},
},
)
time.sleep(2)
browse_result_dict = {"total_items": result.total_items, "total_attachments": result.total_attachments}
screenshot_submitted = True
threading.Thread(
target=take_screenshot_for_account,
args=(user_id, account_id, browse_type, source, task_start_time, browse_result_dict),
daemon=True,
).start()
else:
account.status = "未开始"
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
log_to_client("截图功能已禁用,跳过截图", user_id, account_id)
else:
if account.status not in ["登录失败", "出错"]:
account.status = "未开始"
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
elif account.status == "出错" and retry_count < MAX_AUTO_RETRY:
log_to_client(
f"⚠ 任务执行失败5秒后自动重试 ({retry_count + 1}/{MAX_AUTO_RETRY})...", user_id, account_id
)
account.status = "等待重试"
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
def delayed_retry_submit():
if account.should_stop:
return
log_to_client(f"🔄 开始第 {retry_count + 1} 次自动重试...", user_id, account_id)
ok, msg = submit_account_task(
user_id=user_id,
account_id=account_id,
browse_type=browse_type,
enable_screenshot=enable_screenshot,
source=source,
retry_count=retry_count + 1,
)
if not ok:
log_to_client(f"自动重试提交失败: {msg}", user_id, account_id)
try:
threading.Timer(5, delayed_retry_submit).start()
except Exception:
delayed_retry_submit()
if batch_id and (not screenshot_submitted) and (not batch_recorded) and account.status != "等待重试":
account_name = account.remark if account.remark else account.username
_batch_task_record_result(
batch_id=batch_id,
account_name=account_name,
screenshot_path=None,
total_items=getattr(account, "total_items", 0) or 0,
total_attachments=getattr(account, "total_attachments", 0) or 0,
)
batch_recorded = True
finally:
pass