feat: 定时任务截图打包发送邮件

1. 添加批次任务跟踪机制,收集同一定时任务的所有账号截图
2. 等所有账号执行完成后,将截图打包成ZIP发送一封邮件
3. 邮件包含任务执行详情表格和统计信息

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-11 23:21:24 +08:00
parent fc9c264f37
commit 578489cb0b
2 changed files with 258 additions and 24 deletions

111
app.py
View File

@@ -128,6 +128,10 @@ log_cache = {} # {user_id: [logs]} 每个用户独立的日志缓存
log_cache_lock = threading.Lock() # 保护 log_cache 字典
log_cache_total_count = 0 # 全局日志总数,防止无限增长
# 批次任务截图收集 - 用于定时任务打包发送邮件
batch_task_screenshots = {} # {batch_id: {'user_id': x, 'browse_type': y, 'screenshots': [{'account_name': a, 'path': p, 'items': n, 'attachments': m}], 'total_accounts': n, 'completed': n}}
batch_task_lock = threading.Lock()
# 日志缓存限制
MAX_LOGS_PER_USER = config.MAX_LOGS_PER_USER # 每个用户最多100条
MAX_TOTAL_LOGS = config.MAX_TOTAL_LOGS # 全局最多1000条,防止内存泄漏
@@ -2546,27 +2550,45 @@ def take_screenshot_for_account(user_id, account_id, browse_type="应读", sourc
source=source
)
# 发送任务完成邮件通知
# 处理邮件通知 - 批次任务收集截图,非批次任务直接发送
try:
user_info = database.get_user_by_id(user_id)
# 检查用户是否开启了邮件通知
if user_info and user_info.get('email') and database.get_user_email_notify(user_id):
screenshot_path = None
if result and result.get('success') and result.get('filename'):
screenshot_path = os.path.join(SCREENSHOTS_DIR, result['filename'])
# 检查是否是批次任务 (source格式: user_scheduled:batch_xxx)
batch_id = None
if source and source.startswith('user_scheduled:batch_'):
batch_id = source.split(':', 1)[1]
account_name = account.remark if account.remark else account.username
email_service.send_task_complete_email_async(
user_id=user_id,
email=user_info['email'],
username=user_info['username'],
account_name=account_name,
browse_type=browse_type,
total_items=browse_result.get('total_items', 0),
total_attachments=browse_result.get('total_attachments', 0),
screenshot_path=screenshot_path,
log_callback=lambda msg: log_to_client(msg, user_id, account_id)
)
screenshot_path = None
if result and result.get('success') and result.get('filename'):
screenshot_path = os.path.join(SCREENSHOTS_DIR, result['filename'])
account_name = account.remark if account.remark else account.username
if batch_id:
# 批次任务:收集截图信息,等待统一发送
with batch_task_lock:
if batch_id in batch_task_screenshots:
batch_task_screenshots[batch_id]['screenshots'].append({
'account_name': account_name,
'path': screenshot_path,
'items': browse_result.get('total_items', 0),
'attachments': browse_result.get('total_attachments', 0)
})
batch_task_screenshots[batch_id]['completed'] += 1
else:
# 非批次任务:直接发送邮件(保持原有逻辑)
user_info = database.get_user_by_id(user_id)
if user_info and user_info.get('email') and database.get_user_email_notify(user_id):
email_service.send_task_complete_email_async(
user_id=user_id,
email=user_info['email'],
username=user_info['username'],
account_name=account_name,
browse_type=browse_type,
total_items=browse_result.get('total_items', 0),
total_attachments=browse_result.get('total_attachments', 0),
screenshot_path=screenshot_path,
log_callback=lambda msg: log_to_client(msg, user_id, account_id)
)
except Exception as email_error:
logger.warning(f"发送任务完成邮件失败: {email_error}")
@@ -3683,6 +3705,7 @@ def scheduled_task_worker():
# 创建执行日志
import time as time_mod
import uuid
execution_start_time = time_mod.time()
log_id = database.create_schedule_execution_log(
schedule_id=schedule_id,
@@ -3690,6 +3713,18 @@ def scheduled_task_worker():
schedule_name=schedule_config.get('name', '未命名任务')
)
# 创建批次ID用于收集截图
batch_id = f"batch_{uuid.uuid4().hex[:12]}"
with batch_task_lock:
batch_task_screenshots[batch_id] = {
'user_id': user_id,
'browse_type': browse_type,
'schedule_name': schedule_config.get('name', '未命名任务'),
'screenshots': [],
'total_accounts': 0,
'completed': 0
}
started_count = 0
skipped_count = 0
task_threads = [] # 收集所有启动的任务线程
@@ -3710,9 +3745,11 @@ def scheduled_task_worker():
account.should_stop = False
account.status = "排队中"
# 传递批次ID格式: user_scheduled:batch_xxx
task_source = f"user_scheduled:{batch_id}"
thread = threading.Thread(
target=run_task,
args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'),
args=(user_id, account_id, browse_type, enable_screenshot, task_source),
daemon=True
)
thread.start()
@@ -3722,13 +3759,18 @@ def scheduled_task_worker():
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
# 更新批次的总账号数
with batch_task_lock:
if batch_id in batch_task_screenshots:
batch_task_screenshots[batch_id]['total_accounts'] = started_count
# 更新最后执行时间
database.update_schedule_last_run(schedule_id)
print(f"[用户定时任务] 已启动 {started_count} 个账号,跳过 {skipped_count} 个账号")
print(f"[用户定时任务] 已启动 {started_count} 个账号,跳过 {skipped_count} 个账号批次ID: {batch_id}")
# 启动监控线程,等待所有任务完成后更新日志
def wait_and_update_log(threads, start_time, lid, total, success, sid):
# 启动监控线程,等待所有任务完成后更新日志并发送打包邮件
def wait_and_update_log(threads, start_time, lid, total, success, sid, bid):
for t in threads:
t.join() # 等待每个任务完成
execution_duration = int(time_mod.time() - start_time)
@@ -3742,10 +3784,31 @@ def scheduled_task_worker():
)
print(f"[用户定时任务] 任务#{sid}执行完成,耗时{execution_duration}")
# 发送打包邮件
try:
with batch_task_lock:
batch_info = batch_task_screenshots.pop(bid, None)
if batch_info and batch_info['screenshots']:
batch_user_id = batch_info['user_id']
user_info = database.get_user_by_id(batch_user_id)
if user_info and user_info.get('email') and database.get_user_email_notify(batch_user_id):
email_service.send_batch_task_complete_email_async(
user_id=batch_user_id,
email=user_info['email'],
username=user_info['username'],
schedule_name=batch_info['schedule_name'],
browse_type=batch_info['browse_type'],
screenshots=batch_info['screenshots']
)
print(f"[用户定时任务] 批次邮件已发送,包含 {len(batch_info['screenshots'])} 个账号截图")
except Exception as email_err:
print(f"[用户定时任务] 发送批次邮件失败: {email_err}")
if task_threads:
monitor_thread = threading.Thread(
target=wait_and_update_log,
args=(task_threads, execution_start_time, log_id, len(account_ids), started_count, schedule_id),
args=(task_threads, execution_start_time, log_id, len(account_ids), started_count, schedule_id, batch_id),
daemon=True
)
monitor_thread.start()