From 578489cb0b2bf3d08436dcd8069d3c9d35863e28 Mon Sep 17 00:00:00 2001 From: yuyx <237899745@qq.com> Date: Thu, 11 Dec 2025 23:21:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=88=AA=E5=9B=BE=E6=89=93=E5=8C=85=E5=8F=91=E9=80=81=E9=82=AE?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 添加批次任务跟踪机制,收集同一定时任务的所有账号截图 2. 等所有账号执行完成后,将截图打包成ZIP发送一封邮件 3. 邮件包含任务执行详情表格和统计信息 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- app.py | 111 +++++++++++++++++++++++------- email_service.py | 171 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 258 insertions(+), 24 deletions(-) diff --git a/app.py b/app.py index bdf89fc..a503cb6 100755 --- a/app.py +++ b/app.py @@ -128,6 +128,10 @@ log_cache = {} # {user_id: [logs]} 每个用户独立的日志缓存 log_cache_lock = threading.Lock() # 保护 log_cache 字典 log_cache_total_count = 0 # 全局日志总数,防止无限增长 +# 批次任务截图收集 - 用于定时任务打包发送邮件 +batch_task_screenshots = {} # {batch_id: {'user_id': x, 'browse_type': y, 'screenshots': [{'account_name': a, 'path': p, 'items': n, 'attachments': m}], 'total_accounts': n, 'completed': n}} +batch_task_lock = threading.Lock() + # 日志缓存限制 MAX_LOGS_PER_USER = config.MAX_LOGS_PER_USER # 每个用户最多100条 MAX_TOTAL_LOGS = config.MAX_TOTAL_LOGS # 全局最多1000条,防止内存泄漏 @@ -2546,27 +2550,45 @@ def take_screenshot_for_account(user_id, account_id, browse_type="应读", sourc source=source ) - # 发送任务完成邮件通知 + # 处理邮件通知 - 批次任务收集截图,非批次任务直接发送 try: - user_info = database.get_user_by_id(user_id) - # 检查用户是否开启了邮件通知 - if user_info and user_info.get('email') and database.get_user_email_notify(user_id): - screenshot_path = None - if result and result.get('success') and result.get('filename'): - screenshot_path = os.path.join(SCREENSHOTS_DIR, result['filename']) + # 检查是否是批次任务 (source格式: user_scheduled:batch_xxx) + batch_id = None + if source and source.startswith('user_scheduled:batch_'): + batch_id = source.split(':', 1)[1] - account_name = account.remark if account.remark else account.username - email_service.send_task_complete_email_async( - user_id=user_id, - email=user_info['email'], - username=user_info['username'], - account_name=account_name, - browse_type=browse_type, - total_items=browse_result.get('total_items', 0), - total_attachments=browse_result.get('total_attachments', 0), - screenshot_path=screenshot_path, - log_callback=lambda msg: log_to_client(msg, user_id, account_id) - ) + screenshot_path = None + if result and result.get('success') and result.get('filename'): + screenshot_path = os.path.join(SCREENSHOTS_DIR, result['filename']) + + account_name = account.remark if account.remark else account.username + + if batch_id: + # 批次任务:收集截图信息,等待统一发送 + with batch_task_lock: + if batch_id in batch_task_screenshots: + batch_task_screenshots[batch_id]['screenshots'].append({ + 'account_name': account_name, + 'path': screenshot_path, + 'items': browse_result.get('total_items', 0), + 'attachments': browse_result.get('total_attachments', 0) + }) + batch_task_screenshots[batch_id]['completed'] += 1 + else: + # 非批次任务:直接发送邮件(保持原有逻辑) + user_info = database.get_user_by_id(user_id) + if user_info and user_info.get('email') and database.get_user_email_notify(user_id): + email_service.send_task_complete_email_async( + user_id=user_id, + email=user_info['email'], + username=user_info['username'], + account_name=account_name, + browse_type=browse_type, + total_items=browse_result.get('total_items', 0), + total_attachments=browse_result.get('total_attachments', 0), + screenshot_path=screenshot_path, + log_callback=lambda msg: log_to_client(msg, user_id, account_id) + ) except Exception as email_error: logger.warning(f"发送任务完成邮件失败: {email_error}") @@ -3683,6 +3705,7 @@ def scheduled_task_worker(): # 创建执行日志 import time as time_mod + import uuid execution_start_time = time_mod.time() log_id = database.create_schedule_execution_log( schedule_id=schedule_id, @@ -3690,6 +3713,18 @@ def scheduled_task_worker(): schedule_name=schedule_config.get('name', '未命名任务') ) + # 创建批次ID用于收集截图 + batch_id = f"batch_{uuid.uuid4().hex[:12]}" + with batch_task_lock: + batch_task_screenshots[batch_id] = { + 'user_id': user_id, + 'browse_type': browse_type, + 'schedule_name': schedule_config.get('name', '未命名任务'), + 'screenshots': [], + 'total_accounts': 0, + 'completed': 0 + } + started_count = 0 skipped_count = 0 task_threads = [] # 收集所有启动的任务线程 @@ -3710,9 +3745,11 @@ def scheduled_task_worker(): account.should_stop = False account.status = "排队中" + # 传递批次ID,格式: user_scheduled:batch_xxx + task_source = f"user_scheduled:{batch_id}" thread = threading.Thread( target=run_task, - args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'), + args=(user_id, account_id, browse_type, enable_screenshot, task_source), daemon=True ) thread.start() @@ -3722,13 +3759,18 @@ def scheduled_task_worker(): socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') + # 更新批次的总账号数 + with batch_task_lock: + if batch_id in batch_task_screenshots: + batch_task_screenshots[batch_id]['total_accounts'] = started_count + # 更新最后执行时间 database.update_schedule_last_run(schedule_id) - print(f"[用户定时任务] 已启动 {started_count} 个账号,跳过 {skipped_count} 个账号") + print(f"[用户定时任务] 已启动 {started_count} 个账号,跳过 {skipped_count} 个账号,批次ID: {batch_id}") - # 启动监控线程,等待所有任务完成后更新日志 - def wait_and_update_log(threads, start_time, lid, total, success, sid): + # 启动监控线程,等待所有任务完成后更新日志并发送打包邮件 + def wait_and_update_log(threads, start_time, lid, total, success, sid, bid): for t in threads: t.join() # 等待每个任务完成 execution_duration = int(time_mod.time() - start_time) @@ -3742,10 +3784,31 @@ def scheduled_task_worker(): ) print(f"[用户定时任务] 任务#{sid}执行完成,耗时{execution_duration}秒") + # 发送打包邮件 + try: + with batch_task_lock: + batch_info = batch_task_screenshots.pop(bid, None) + + if batch_info and batch_info['screenshots']: + batch_user_id = batch_info['user_id'] + user_info = database.get_user_by_id(batch_user_id) + if user_info and user_info.get('email') and database.get_user_email_notify(batch_user_id): + email_service.send_batch_task_complete_email_async( + user_id=batch_user_id, + email=user_info['email'], + username=user_info['username'], + schedule_name=batch_info['schedule_name'], + browse_type=batch_info['browse_type'], + screenshots=batch_info['screenshots'] + ) + print(f"[用户定时任务] 批次邮件已发送,包含 {len(batch_info['screenshots'])} 个账号截图") + except Exception as email_err: + print(f"[用户定时任务] 发送批次邮件失败: {email_err}") + if task_threads: monitor_thread = threading.Thread( target=wait_and_update_log, - args=(task_threads, execution_start_time, log_id, len(account_ids), started_count, schedule_id), + args=(task_threads, execution_start_time, log_id, len(account_ids), started_count, schedule_id, batch_id), daemon=True ) monitor_thread.start() diff --git a/email_service.py b/email_service.py index 4890021..ecdb661 100644 --- a/email_service.py +++ b/email_service.py @@ -1943,6 +1943,177 @@ def send_task_complete_email_async( thread.start() +def send_batch_task_complete_email( + user_id: int, + email: str, + username: str, + schedule_name: str, + browse_type: str, + screenshots: List[Dict[str, Any]] +) -> Dict[str, Any]: + """ + 发送批次任务完成通知邮件(多账号截图打包) + + Args: + user_id: 用户ID + email: 收件人邮箱 + username: 用户名 + schedule_name: 定时任务名称 + browse_type: 浏览类型 + screenshots: 截图列表 [{'account_name': x, 'path': y, 'items': n, 'attachments': m}, ...] + + Returns: + {'success': bool, 'error': str} + """ + # 检查邮件功能是否启用 + settings = get_email_settings() + if not settings.get('enabled', False): + return {'success': False, 'error': '邮件功能未启用'} + + if not settings.get('task_notify_enabled', False): + return {'success': False, 'error': '任务通知功能未启用'} + + if not email: + return {'success': False, 'error': '用户未设置邮箱'} + + if not screenshots: + return {'success': False, 'error': '没有截图需要发送'} + + # 获取完成时间 + complete_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # 统计信息 + total_items_sum = sum(s.get('items', 0) for s in screenshots) + total_attachments_sum = sum(s.get('attachments', 0) for s in screenshots) + account_count = len(screenshots) + + # 构建账号详情HTML + accounts_html = "" + for s in screenshots: + accounts_html += f""" + + {s.get('account_name', '未知')} + {s.get('items', 0)} + {s.get('attachments', 0)} + + """ + + # 构建HTML邮件内容 + html_content = f""" + + +
+

定时任务完成通知

+

您好,{username}!

+

您的定时任务 {schedule_name} 已完成执行。

+ +
+

浏览类型:{browse_type}

+

执行账号:{account_count} 个

+

总浏览条目:{total_items_sum} 条

+

总附件数量:{total_attachments_sum} 个

+

完成时间:{complete_time}

+
+ +

账号执行详情

+ + + + + + + {accounts_html} +
账号名称浏览条目附件数量
+ +

+ 截图已打包为ZIP附件,请查收。 +

+
+ + + """ + + # 收集所有截图文件 + screenshot_files = [] + for s in screenshots: + if s.get('path') and os.path.exists(s['path']): + try: + with open(s['path'], 'rb') as f: + screenshot_files.append({ + 'filename': f"{s.get('account_name', 'screenshot')}_{os.path.basename(s['path'])}", + 'data': f.read() + }) + except Exception as e: + print(f"[邮件] 读取截图文件失败: {e}") + + # 如果有截图,打包成ZIP + zip_data = None + zip_filename = None + if screenshot_files: + try: + zip_buffer = BytesIO() + with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: + for sf in screenshot_files: + zf.writestr(sf['filename'], sf['data']) + zip_data = zip_buffer.getvalue() + zip_filename = f"screenshots_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" + except Exception as e: + print(f"[邮件] 打包截图失败: {e}") + + # 发送邮件 + attachments = [] + if zip_data and zip_filename: + attachments.append({ + 'filename': zip_filename, + 'data': zip_data, + 'mime_type': 'application/zip' + }) + + result = send_email( + to=email, + subject=f'【自动化学习】定时任务完成 - {schedule_name}', + html_content=html_content, + attachments=attachments + ) + + if result['success']: + # 记录发送日志 + log_email_send( + email_type='batch_task_complete', + to_email=email, + subject=f'定时任务完成 - {schedule_name}', + success=True + ) + return {'success': True} + else: + log_email_send( + email_type='batch_task_complete', + to_email=email, + subject=f'定时任务完成 - {schedule_name}', + success=False, + error=result.get('error', '') + ) + return {'success': False, 'error': result.get('error', '发送失败')} + + +def send_batch_task_complete_email_async( + user_id: int, + email: str, + username: str, + schedule_name: str, + browse_type: str, + screenshots: List[Dict[str, Any]] +): + """异步发送批次任务完成通知邮件""" + import threading + thread = threading.Thread( + target=send_batch_task_complete_email, + args=(user_id, email, username, schedule_name, browse_type, screenshots), + daemon=True + ) + thread.start() + + # ============ 初始化 ============ def init_email_service():