#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Post-task screenshot capture for accounts.

Screenshots are taken on the shared browser worker pool so that an already
running browser is reused instead of launching a new one per capture.
"""
from __future__ import annotations

import os
import time
from contextlib import suppress
from urllib.parse import urlsplit

import database
import email_service
from app_config import get_config
from app_logger import get_logger
from browser_pool_worker import get_browser_worker_pool
from playwright_automation import PlaywrightAutomation
from services.browser_manager import get_browser_manager
from services.client_log import log_to_client
from services.runtime import get_socketio
from services.state import safe_get_account, safe_remove_task_status, safe_update_task_status
from services.task_batches import _batch_task_record_result, _get_batch_id_from_source
from services.time_utils import get_beijing_now

logger = get_logger("app")
config = get_config()

SCREENSHOTS_DIR = config.SCREENSHOTS_DIR
os.makedirs(SCREENSHOTS_DIR, exist_ok=True)

# Tunables for a single screenshot job.
_MAX_SCREENSHOT_ATTEMPTS = 3
_RETRY_DELAY_SECONDS = 2
_PAGE_SETTLE_SECONDS = 2
_MIN_SCREENSHOT_BYTES = 1000  # files at or below this size are treated as broken
_GOTO_TIMEOUT_MS = 60000
_LOAD_STATE_TIMEOUT_MS = 30000
_SELECTOR_TIMEOUT_MS = 20000


def _emit(event: str, data: object, *, room: str | None = None) -> None:
    """Push a Socket.IO event, silently doing nothing if the push fails.

    Args:
        event: Event name.
        data: Payload handed to ``socketio.emit``.
        room: Optional room to restrict the push to.
    """
    try:
        socketio = get_socketio()
        socketio.emit(event, data, room=room)
    except Exception:
        # Deliberate best-effort: when the runtime is not initialised
        # (e.g. tests / offline scripts) the push is simply skipped.
        pass


def _wait_for_list_page(target) -> None:
    """Best-effort wait for *target* (frame or page) to finish rendering."""
    # Either wait may time out; the screenshot is attempted regardless.
    with suppress(Exception):
        target.wait_for_load_state("networkidle", timeout=_LOAD_STATE_TIMEOUT_MS)
    with suppress(Exception):
        target.wait_for_selector("table.ltable", timeout=_SELECTOR_TIMEOUT_MS)


def _navigate_directly(automation, browse_type) -> None:
    """Open the page for *browse_type* through its ``bz`` URL parameter.

    The content iframe is preferred so the surrounding frame (side menu /
    top bar) stays in the screenshot; if no iframe can be obtained the main
    page itself is navigated.

    Raises:
        RuntimeError: The navigation ended on a URL without ``center.aspx``.
        Exception: Anything raised by the underlying ``goto`` call.
    """
    parsed = urlsplit(config.ZSGL_LOGIN_URL)
    base = f"{parsed.scheme}://{parsed.netloc}"
    bz = 0 if "注册前" in str(browse_type) else 2  # every other type uses bz=2
    target_url = f"{base}/admin/center.aspx?bz={bz}"

    try:
        iframe = automation.get_iframe_safe(retry=True, max_retries=5)
    except Exception:
        iframe = None

    if iframe:
        target, error_tag = iframe, "unexpected_iframe_url"
    else:
        # Fallback: no iframe available, navigate the main page instead.
        target, error_tag = automation.main_page, "unexpected_url"

    target.goto(target_url, timeout=_GOTO_TIMEOUT_MS)
    current_url = getattr(target, "url", "") or ""
    if "center.aspx" not in current_url:
        raise RuntimeError(f"{error_tag}:{current_url}")
    _wait_for_list_page(target)


def _release_context(automation) -> None:
    """Close the per-attempt browser context; the pooled browser stays open."""
    if automation is None:
        return
    try:
        if automation.context:
            automation.context.close()
            automation.context = None
            automation.page = None
    except Exception as e:
        logger.debug(f"关闭context时出错: {e}")


def take_screenshot_for_account(
    user_id,
    account_id,
    browse_type="应读",
    source="manual",
    task_start_time=None,
    browse_result=None,
):
    """Queue a screenshot job for an account on the browser worker pool.

    The account is flagged as running, the job is submitted to the pool, and
    a completion callback resets the account state. When both
    ``task_start_time`` and ``browse_result`` are provided the callback also
    writes a task log and then records a batch result or sends a completion
    e-mail, depending on ``source``.

    Args:
        user_id: Owner of the account.
        account_id: Account to capture.
        browse_type: Which page to capture; also stored on the account as
            ``last_browse_type`` when truthy.
        source: Origin of the task; used to look up a batch id and to detect
            ``user_scheduled`` runs.
        task_start_time: Epoch seconds at which the browse task started.
        browse_result: Mapping read for ``total_items`` / ``total_attachments``.

    Returns:
        None. Returns immediately if the account cannot be found.
    """
    account = safe_get_account(user_id, account_id)
    if not account:
        return

    # Trust this call's browse_type: last_browse_type may have been refreshed
    # or reloaded in the meantime, which would capture the wrong page.
    if browse_type:
        account.last_browse_type = browse_type

    # Mark the account busy so a second screenshot job is not submitted.
    account.is_running = True

    def screenshot_task(
        browser_instance, user_id, account_id, account, browse_type, source, task_start_time, browse_result
    ):
        """Run the capture inside a pool worker, retrying on failure.

        Returns:
            ``{"success": True, "filename": ...}`` on success, otherwise
            ``{"success": False, "error": ...}``.
        """
        # A worker has been acquired: surface the "capturing" state now.
        acc = safe_get_account(user_id, account_id)
        if acc:
            acc.status = "截图中"
            safe_update_task_status(account_id, {"status": "运行中", "detail_status": "正在截图"})
            _emit("account_update", acc.to_dict(), room=f"user_{user_id}")

        max_retries = _MAX_SCREENSHOT_ATTEMPTS

        for attempt in range(1, max_retries + 1):
            automation = None
            try:
                retry_suffix = f" (第{attempt}次)" if attempt > 1 else ""
                safe_update_task_status(account_id, {"detail_status": f"正在截图{retry_suffix}"})

                if attempt > 1:
                    log_to_client(f"🔄 第 {attempt} 次截图尝试...", user_id, account_id)

                log_to_client(
                    f"使用Worker-{browser_instance['worker_id']}的浏览器(已使用{browser_instance['use_count']}次)",
                    user_id,
                    account_id,
                )

                proxy_config = getattr(account, "proxy_config", None)
                automation = PlaywrightAutomation(get_browser_manager(), account_id, proxy_config=proxy_config)
                # Reuse the worker's live Playwright/browser pair.
                automation.playwright = browser_instance["playwright"]
                automation.browser = browser_instance["browser"]

                def custom_log(message: str):
                    log_to_client(message, user_id, account_id)

                automation.log = custom_log

                log_to_client("登录中...", user_id, account_id)
                login_result = automation.quick_login(account.username, account.password, account.remember)
                if not login_result["success"]:
                    error_message = login_result.get("message", "截图登录失败")
                    log_to_client(f"截图登录失败: {error_message}", user_id, account_id)
                    if attempt < max_retries:
                        log_to_client("将重试...", user_id, account_id)
                        time.sleep(_RETRY_DELAY_SECONDS)
                        continue
                    log_to_client("❌ 截图失败: 登录失败", user_id, account_id)
                    return {"success": False, "error": "登录失败"}

                log_to_client(f"导航到 '{browse_type}' 页面...", user_id, account_id)

                # Prefer jumping straight to the page via the bz parameter:
                # it is more stable than clicking the page's own buttons.
                navigated = False
                try:
                    _navigate_directly(automation, browse_type)
                    navigated = True
                except Exception as nav_error:
                    log_to_client(
                        f"直达页面失败,将尝试按钮切换: {str(nav_error)[:120]}", user_id, account_id
                    )

                # Compatibility fallback: switch pages through the buttons.
                if not navigated:
                    result = automation.browse_content(
                        navigate_only=True,
                        browse_type=browse_type,
                        auto_next_page=False,
                        auto_view_attachments=False,
                        interval=0,
                        should_stop_callback=None,
                    )
                    if not result.success and result.error_message:
                        log_to_client(f"导航警告: {result.error_message}", user_id, account_id)

                # Pause so the page can settle before capturing.
                time.sleep(_PAGE_SETTLE_SECONDS)

                timestamp = get_beijing_now().strftime("%Y%m%d_%H%M%S")

                user_info = database.get_user_by_id(user_id)
                username_prefix = user_info["username"] if user_info else f"user{user_id}"
                login_account = account.remark if account.remark else account.username
                # NOTE(review): remark/username are interpolated into the file
                # name unsanitised — confirm they cannot contain path separators.
                screenshot_filename = f"{username_prefix}_{login_account}_{browse_type}_{timestamp}.jpg"
                screenshot_path = os.path.join(SCREENSHOTS_DIR, screenshot_filename)

                if automation.take_screenshot(screenshot_path):
                    if (
                        os.path.exists(screenshot_path)
                        and os.path.getsize(screenshot_path) > _MIN_SCREENSHOT_BYTES
                    ):
                        log_to_client(f"✓ 截图成功: {screenshot_filename}", user_id, account_id)
                        return {"success": True, "filename": screenshot_filename}
                    # Missing or suspiciously small file: discard and retry.
                    log_to_client("截图文件异常,将重试", user_id, account_id)
                    if os.path.exists(screenshot_path):
                        os.remove(screenshot_path)
                else:
                    log_to_client("截图保存失败", user_id, account_id)

                if attempt < max_retries:
                    log_to_client("将重试...", user_id, account_id)
                    time.sleep(_RETRY_DELAY_SECONDS)

            except Exception as e:
                log_to_client(f"截图出错: {str(e)}", user_id, account_id)
                if attempt < max_retries:
                    log_to_client("将重试...", user_id, account_id)
                    time.sleep(_RETRY_DELAY_SECONDS)

            finally:
                # Runs on every exit path (return / continue / exception).
                _release_context(automation)

        return {"success": False, "error": f"截图失败,已重试{max_retries}次"}

    def screenshot_callback(result, error):
        """Reset account state and record the outcome once the job ends.

        Args:
            result: Dict returned by ``screenshot_task`` (or ``None``).
            error: Error description when the job did not complete, else falsy.
        """
        try:
            account.is_running = False
            account.status = "未开始"

            safe_remove_task_status(account_id)

            _emit("account_update", account.to_dict(), room=f"user_{user_id}")

            if error:
                log_to_client(f"❌ 截图失败: {error}", user_id, account_id)
            elif not result or not result.get("success"):
                error_msg = result.get("error", "未知错误") if result else "未知错误"
                log_to_client(f"❌ 截图失败: {error_msg}", user_id, account_id)

            if task_start_time and browse_result:
                total_elapsed = int(time.time() - task_start_time)
                # The log is written with status "success" whether or not the
                # screenshot itself succeeded.
                database.create_task_log(
                    user_id=user_id,
                    account_id=account_id,
                    username=account.username,
                    browse_type=browse_type,
                    status="success",
                    total_items=browse_result.get("total_items", 0),
                    total_attachments=browse_result.get("total_attachments", 0),
                    duration=total_elapsed,
                    source=source,
                )

                try:
                    batch_id = _get_batch_id_from_source(source)

                    screenshot_path = None
                    if result and result.get("success") and result.get("filename"):
                        screenshot_path = os.path.join(SCREENSHOTS_DIR, result["filename"])

                    account_name = account.remark if account.remark else account.username

                    if batch_id:
                        _batch_task_record_result(
                            batch_id=batch_id,
                            account_name=account_name,
                            screenshot_path=screenshot_path,
                            total_items=browse_result.get("total_items", 0),
                            total_attachments=browse_result.get("total_attachments", 0),
                        )
                    elif source and source.startswith("user_scheduled"):
                        user_info = database.get_user_by_id(user_id)
                        if user_info and user_info.get("email") and database.get_user_email_notify(user_id):
                            email_service.send_task_complete_email_async(
                                user_id=user_id,
                                email=user_info["email"],
                                username=user_info["username"],
                                account_name=account_name,
                                browse_type=browse_type,
                                total_items=browse_result.get("total_items", 0),
                                total_attachments=browse_result.get("total_attachments", 0),
                                screenshot_path=screenshot_path,
                                log_callback=lambda msg: log_to_client(msg, user_id, account_id),
                            )
                except Exception as email_error:
                    # Notification problems must not break the state reset.
                    logger.warning(f"发送任务完成邮件失败: {email_error}")

        except Exception as e:
            logger.error(f"截图回调出错: {e}")

    pool = get_browser_worker_pool()
    submitted = pool.submit_task(
        screenshot_task,
        screenshot_callback,
        user_id,
        account_id,
        account,
        browse_type,
        source,
        task_start_time,
        browse_result,
    )

    if not submitted:
        # Submission was refused: run the callback directly so the account
        # does not stay flagged as running.
        screenshot_callback(None, "截图队列已满,请稍后重试")