#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import os import shutil import subprocess import time import database import email_service from api_browser import APIBrowser, get_cookie_jar_path, is_cookie_jar_fresh from app_config import get_config from app_logger import get_logger from browser_pool_worker import get_browser_worker_pool from services.client_log import log_to_client from services.runtime import get_socketio from services.state import safe_get_account, safe_remove_task_status, safe_update_task_status from services.task_batches import _batch_task_record_result, _get_batch_id_from_source from services.time_utils import get_beijing_now logger = get_logger("app") config = get_config() SCREENSHOTS_DIR = config.SCREENSHOTS_DIR os.makedirs(SCREENSHOTS_DIR, exist_ok=True) _WKHTMLTOIMAGE_TIMEOUT_SECONDS = int(os.environ.get("WKHTMLTOIMAGE_TIMEOUT_SECONDS", "60")) _WKHTMLTOIMAGE_JS_DELAY_MS = int(os.environ.get("WKHTMLTOIMAGE_JS_DELAY_MS", "3000")) _WKHTMLTOIMAGE_WIDTH = int(os.environ.get("WKHTMLTOIMAGE_WIDTH", "1920")) _WKHTMLTOIMAGE_HEIGHT = int(os.environ.get("WKHTMLTOIMAGE_HEIGHT", "1080")) _WKHTMLTOIMAGE_QUALITY = int(os.environ.get("WKHTMLTOIMAGE_QUALITY", "95")) _WKHTMLTOIMAGE_ZOOM = float(os.environ.get("WKHTMLTOIMAGE_ZOOM", "1.0")) _WKHTMLTOIMAGE_FULL_PAGE = str(os.environ.get("WKHTMLTOIMAGE_FULL_PAGE", "")).strip().lower() in ( "1", "true", "yes", "on", ) _env_crop_w = os.environ.get("WKHTMLTOIMAGE_CROP_WIDTH") _env_crop_h = os.environ.get("WKHTMLTOIMAGE_CROP_HEIGHT") _WKHTMLTOIMAGE_CROP_WIDTH = int(_env_crop_w) if _env_crop_w is not None else _WKHTMLTOIMAGE_WIDTH _WKHTMLTOIMAGE_CROP_HEIGHT = ( int(_env_crop_h) if _env_crop_h is not None else (_WKHTMLTOIMAGE_HEIGHT if _WKHTMLTOIMAGE_HEIGHT > 0 else 0) ) _WKHTMLTOIMAGE_CROP_X = int(os.environ.get("WKHTMLTOIMAGE_CROP_X", "0")) _WKHTMLTOIMAGE_CROP_Y = int(os.environ.get("WKHTMLTOIMAGE_CROP_Y", "0")) _WKHTMLTOIMAGE_UA = os.environ.get( "WKHTMLTOIMAGE_USER_AGENT", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", ) def _resolve_wkhtmltoimage_path() -> str | None: return os.environ.get("WKHTMLTOIMAGE_PATH") or shutil.which("wkhtmltoimage") def _read_cookie_pairs(cookies_path: str) -> list[tuple[str, str]]: if not cookies_path or not os.path.exists(cookies_path): return [] pairs = [] try: with open(cookies_path, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = line.split("\t") if len(parts) < 7: continue name = parts[5].strip() value = parts[6].strip() if name: pairs.append((name, value)) except Exception: return [] return pairs def _select_cookie_pairs(pairs: list[tuple[str, str]]) -> list[tuple[str, str]]: preferred_names = {"ASP.NET_SessionId", ".ASPXAUTH"} preferred = [(name, value) for name, value in pairs if name in preferred_names and value] if preferred: return preferred return [(name, value) for name, value in pairs if name and value and name.isascii() and value.isascii()] def _ensure_login_cookies(account, proxy_config, log_callback) -> bool: """确保有可用的登录 cookies(通过 API 登录刷新)""" try: with APIBrowser(log_callback=log_callback, proxy_config=proxy_config) as api_browser: if not api_browser.login(account.username, account.password): return False return api_browser.save_cookies_for_screenshot(account.username) except Exception: return False def take_screenshot_wkhtmltoimage( url: str, output_path: str, cookies_path: str | None = None, proxy_server: str | None = None, run_script: str | None = None, window_status: str | None = None, log_callback=None, ) -> bool: wkhtmltoimage_path = _resolve_wkhtmltoimage_path() if not wkhtmltoimage_path: if log_callback: log_callback("wkhtmltoimage 未安装或不在 PATH 中") return False ext = os.path.splitext(output_path)[1].lower() image_format = "jpg" if ext in (".jpg", ".jpeg") else "png" cmd = [ wkhtmltoimage_path, "--format", image_format, "--width", str(_WKHTMLTOIMAGE_WIDTH), "--disable-smart-width", "--javascript-delay", str(_WKHTMLTOIMAGE_JS_DELAY_MS), "--load-error-handling", "ignore", "--enable-local-file-access", "--encoding", "utf-8", ] if _WKHTMLTOIMAGE_UA: cmd.extend(["--custom-header", "User-Agent", _WKHTMLTOIMAGE_UA, "--custom-header-propagation"]) if image_format in ("jpg", "jpeg"): cmd.extend(["--quality", str(_WKHTMLTOIMAGE_QUALITY)]) if _WKHTMLTOIMAGE_HEIGHT > 0 and not _WKHTMLTOIMAGE_FULL_PAGE: cmd.extend(["--height", str(_WKHTMLTOIMAGE_HEIGHT)]) if abs(_WKHTMLTOIMAGE_ZOOM - 1.0) > 1e-6: cmd.extend(["--zoom", str(_WKHTMLTOIMAGE_ZOOM)]) if not _WKHTMLTOIMAGE_FULL_PAGE and (_WKHTMLTOIMAGE_CROP_WIDTH > 0 or _WKHTMLTOIMAGE_CROP_HEIGHT > 0): cmd.extend(["--crop-x", str(_WKHTMLTOIMAGE_CROP_X), "--crop-y", str(_WKHTMLTOIMAGE_CROP_Y)]) if _WKHTMLTOIMAGE_CROP_WIDTH > 0: cmd.extend(["--crop-w", str(_WKHTMLTOIMAGE_CROP_WIDTH)]) if _WKHTMLTOIMAGE_CROP_HEIGHT > 0: cmd.extend(["--crop-h", str(_WKHTMLTOIMAGE_CROP_HEIGHT)]) if run_script: cmd.extend(["--run-script", run_script]) if window_status: cmd.extend(["--window-status", window_status]) if cookies_path: cookie_pairs = _select_cookie_pairs(_read_cookie_pairs(cookies_path)) if cookie_pairs: for name, value in cookie_pairs: cmd.extend(["--cookie", name, value]) else: cmd.extend(["--cookie-jar", cookies_path]) if proxy_server: cmd.extend(["--proxy", proxy_server]) cmd.extend([url, output_path]) try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=_WKHTMLTOIMAGE_TIMEOUT_SECONDS) if result.returncode != 0: if log_callback: err_msg = (result.stderr or result.stdout or "").strip() log_callback(f"wkhtmltoimage 截图失败: {err_msg[:200]}") return False return True except subprocess.TimeoutExpired: if log_callback: log_callback("wkhtmltoimage 截图超时") return False except Exception as e: if log_callback: log_callback(f"wkhtmltoimage 截图异常: {e}") return False def _emit(event: str, data: object, *, room: str | None = None) -> None: try: socketio = get_socketio() socketio.emit(event, data, room=room) except Exception: # runtime 未初始化时(如测试/离线脚本),忽略推送 pass def take_screenshot_for_account( user_id, account_id, browse_type="应读", source="manual", task_start_time=None, browse_result=None, ): """为账号任务完成后截图(使用截图线程池并发执行)""" account = safe_get_account(user_id, account_id) if not account: return # 以本次调用的 browse_type 为准(避免 last_browse_type 被刷新/重载导致截图页面不一致) if browse_type: account.last_browse_type = browse_type # 标记账号正在截图(防止重复提交截图任务) account.is_running = True def screenshot_task(browser_instance, user_id, account_id, account, browse_type, source, task_start_time, browse_result): """在worker线程中执行的截图任务""" # ✅ 获得worker后,立即更新状态为"截图中" acc = safe_get_account(user_id, account_id) if acc: acc.status = "截图中" safe_update_task_status(account_id, {"status": "运行中", "detail_status": "正在截图"}) _emit("account_update", acc.to_dict(), room=f"user_{user_id}") max_retries = 3 proxy_config = account.proxy_config if hasattr(account, "proxy_config") else None proxy_server = proxy_config.get("server") if proxy_config else None cookie_path = get_cookie_jar_path(account.username) for attempt in range(1, max_retries + 1): try: safe_update_task_status( account_id, {"detail_status": f"正在截图{f' (第{attempt}次)' if attempt > 1 else ''}"}, ) if attempt > 1: log_to_client(f"🔄 第 {attempt} 次截图尝试...", user_id, account_id) worker_id = browser_instance.get("worker_id", "?") if isinstance(browser_instance, dict) else "?" use_count = browser_instance.get("use_count", 0) if isinstance(browser_instance, dict) else 0 log_to_client( f"使用Worker-{worker_id}执行截图(已执行{use_count}次)", user_id, account_id, ) def custom_log(message: str): log_to_client(message, user_id, account_id) if not is_cookie_jar_fresh(cookie_path) or attempt > 1: log_to_client("正在刷新登录态...", user_id, account_id) if not _ensure_login_cookies(account, proxy_config, custom_log): log_to_client("截图登录失败", user_id, account_id) if attempt < max_retries: log_to_client("将重试...", user_id, account_id) time.sleep(2) continue log_to_client("❌ 截图失败: 登录失败", user_id, account_id) return {"success": False, "error": "登录失败"} log_to_client(f"导航到 '{browse_type}' 页面...", user_id, account_id) from urllib.parse import urlsplit parsed = urlsplit(config.ZSGL_LOGIN_URL) base = f"{parsed.scheme}://{parsed.netloc}" if "注册前" in str(browse_type): bz = 0 else: bz = 2 # 应读 target_url = f"{base}/admin/center.aspx?bz={bz}" index_url = config.ZSGL_INDEX_URL or f"{base}/admin/index.aspx" run_script = ( "(function(){" "var f=document.getElementById('mainframe');" "function done(){window.status='ready';}" "function fit(){" "if(!f){done();return;}" "try{" "var doc=f.contentDocument||f.contentWindow.document;" "if(doc&&doc.body&&doc.documentElement){" "doc.body.style.height='auto';" "doc.documentElement.style.height='auto';" "doc.body.style.overflow='visible';" "doc.documentElement.style.overflow='visible';" "var h=Math.max(doc.body.scrollHeight,doc.documentElement.scrollHeight);" "if(h&&h>0){" "f.style.height=h+'px';" "var extra=0;" "var topBar=document.querySelector('.main-top');" "if(topBar){extra+=topBar.offsetHeight||0;}" "var container=document.querySelector('.main-container');" "if(container){extra+=container.offsetTop||0;}" "var total=h+extra+20;" "document.body.style.height=total+'px';" "document.documentElement.style.height=total+'px';" "}" "}" "}catch(e){}" "done();" "}" "if(!f){fit();return;}" f"f.src='{target_url}';" "f.onload=function(){setTimeout(fit,500);};" "setTimeout(fit,4000);" "})();" ) timestamp = get_beijing_now().strftime("%Y%m%d_%H%M%S") user_info = database.get_user_by_id(user_id) username_prefix = user_info["username"] if user_info else f"user{user_id}" login_account = account.remark if account.remark else account.username screenshot_filename = f"{username_prefix}_{login_account}_{browse_type}_{timestamp}.jpg" screenshot_path = os.path.join(SCREENSHOTS_DIR, screenshot_filename) cookies_for_shot = cookie_path if is_cookie_jar_fresh(cookie_path) else None if take_screenshot_wkhtmltoimage( index_url, screenshot_path, cookies_path=cookies_for_shot, proxy_server=proxy_server, run_script=run_script, window_status="ready", log_callback=custom_log, ) or take_screenshot_wkhtmltoimage( target_url, screenshot_path, cookies_path=cookies_for_shot, proxy_server=proxy_server, log_callback=custom_log, ): if os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000: log_to_client(f"✓ 截图成功: {screenshot_filename}", user_id, account_id) return {"success": True, "filename": screenshot_filename} log_to_client("截图文件异常,将重试", user_id, account_id) if os.path.exists(screenshot_path): os.remove(screenshot_path) else: log_to_client("截图保存失败", user_id, account_id) if attempt < max_retries: log_to_client("将重试...", user_id, account_id) time.sleep(2) except Exception as e: log_to_client(f"截图出错: {str(e)}", user_id, account_id) if attempt < max_retries: log_to_client("将重试...", user_id, account_id) time.sleep(2) return {"success": False, "error": "截图失败,已重试3次"} def screenshot_callback(result, error): """截图完成回调""" try: account.is_running = False account.status = "未开始" safe_remove_task_status(account_id) _emit("account_update", account.to_dict(), room=f"user_{user_id}") if error: log_to_client(f"❌ 截图失败: {error}", user_id, account_id) elif not result or not result.get("success"): error_msg = result.get("error", "未知错误") if result else "未知错误" log_to_client(f"❌ 截图失败: {error_msg}", user_id, account_id) if task_start_time and browse_result: import time as time_module total_elapsed = int(time_module.time() - task_start_time) database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status="success", total_items=browse_result.get("total_items", 0), total_attachments=browse_result.get("total_attachments", 0), duration=total_elapsed, source=source, ) try: batch_id = _get_batch_id_from_source(source) screenshot_path = None if result and result.get("success") and result.get("filename"): screenshot_path = os.path.join(SCREENSHOTS_DIR, result["filename"]) account_name = account.remark if account.remark else account.username if batch_id: _batch_task_record_result( batch_id=batch_id, account_name=account_name, screenshot_path=screenshot_path, total_items=browse_result.get("total_items", 0), total_attachments=browse_result.get("total_attachments", 0), ) elif source and source.startswith("user_scheduled"): user_info = database.get_user_by_id(user_id) if user_info and user_info.get("email") and database.get_user_email_notify(user_id): email_service.send_task_complete_email_async( user_id=user_id, email=user_info["email"], username=user_info["username"], account_name=account_name, browse_type=browse_type, total_items=browse_result.get("total_items", 0), total_attachments=browse_result.get("total_attachments", 0), screenshot_path=screenshot_path, log_callback=lambda msg: log_to_client(msg, user_id, account_id), ) except Exception as email_error: logger.warning(f"发送任务完成邮件失败: {email_error}") except Exception as e: logger.error(f"截图回调出错: {e}") pool = get_browser_worker_pool() submitted = pool.submit_task( screenshot_task, screenshot_callback, user_id, account_id, account, browse_type, source, task_start_time, browse_result, ) if not submitted: screenshot_callback(None, "截图队列已满,请稍后重试")