- 新增依赖检测模块:启动时自动检测wkhtmltoimage和Playwright Chromium - 新增依赖安装对话框:缺失时提示用户一键下载安装 - 修复选项记忆功能:浏览类型、自动截图、自动上传选项现在会保存 - 优化KDocs登录检测:未登录时自动切换到金山文档页面并显示二维码 - 简化日志输出:移除debug信息,保留用户友好的状态提示 - 新增账号变化信号:账号管理页面的修改会自动同步到浏览任务页面 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
324 lines
10 KiB
Python
324 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
截图模块 - 精简版
|
||
使用wkhtmltoimage进行网页截图
|
||
移除了线程池、复杂重试逻辑,保持简单
|
||
"""
|
||
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
from datetime import datetime
|
||
from typing import Optional, Callable, List, Tuple
|
||
from dataclasses import dataclass
|
||
|
||
from .api_browser import APIBrowser, get_cookie_jar_path, is_cookie_jar_fresh
|
||
|
||
|
||
@dataclass
|
||
class ScreenshotResult:
|
||
"""截图结果"""
|
||
success: bool
|
||
filename: str = ""
|
||
filepath: str = ""
|
||
error_message: str = ""
|
||
|
||
|
||
def _resolve_wkhtmltoimage_path() -> Optional[str]:
|
||
"""查找wkhtmltoimage路径"""
|
||
from config import get_config
|
||
config = get_config()
|
||
|
||
# 优先使用配置的路径
|
||
custom_path = config.screenshot.wkhtmltoimage_path
|
||
if custom_path and os.path.exists(custom_path):
|
||
return custom_path
|
||
|
||
# 先尝试PATH
|
||
found = shutil.which("wkhtmltoimage")
|
||
if found:
|
||
return found
|
||
|
||
# Windows默认安装路径
|
||
win_paths = [
|
||
r"C:\Program Files\wkhtmltopdf\bin\wkhtmltoimage.exe",
|
||
r"C:\Program Files (x86)\wkhtmltopdf\bin\wkhtmltoimage.exe",
|
||
os.path.expandvars(r"%ProgramFiles%\wkhtmltopdf\bin\wkhtmltoimage.exe"),
|
||
os.path.expandvars(r"%ProgramFiles(x86)%\wkhtmltopdf\bin\wkhtmltoimage.exe"),
|
||
]
|
||
for p in win_paths:
|
||
if os.path.exists(p):
|
||
return p
|
||
|
||
return None
|
||
|
||
|
||
def _read_cookie_pairs(cookies_path: str) -> List[Tuple[str, str]]:
|
||
"""读取cookie文件"""
|
||
if not cookies_path or not os.path.exists(cookies_path):
|
||
return []
|
||
|
||
pairs = []
|
||
try:
|
||
with open(cookies_path, "r", encoding="utf-8", errors="ignore") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
parts = line.split("\t")
|
||
if len(parts) < 7:
|
||
continue
|
||
name = parts[5].strip()
|
||
value = parts[6].strip()
|
||
if name:
|
||
pairs.append((name, value))
|
||
except Exception:
|
||
return []
|
||
return pairs
|
||
|
||
|
||
def _select_cookie_pairs(pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
|
||
"""选择关键cookie"""
|
||
preferred_names = {"ASP.NET_SessionId", ".ASPXAUTH"}
|
||
preferred = [(name, value) for name, value in pairs if name in preferred_names and value]
|
||
if preferred:
|
||
return preferred
|
||
return [(name, value) for name, value in pairs if name and value and name.isascii() and value.isascii()]
|
||
|
||
|
||
def take_screenshot_wkhtmltoimage(
|
||
url: str,
|
||
output_path: str,
|
||
cookies_path: Optional[str] = None,
|
||
proxy_server: Optional[str] = None,
|
||
run_script: Optional[str] = None,
|
||
window_status: Optional[str] = None,
|
||
log_callback: Optional[Callable] = None,
|
||
) -> bool:
|
||
"""
|
||
使用wkhtmltoimage截图
|
||
|
||
Args:
|
||
url: 要截图的URL
|
||
output_path: 输出文件路径
|
||
cookies_path: cookie文件路径
|
||
proxy_server: 代理服务器
|
||
run_script: 运行的JavaScript脚本
|
||
window_status: 等待的window.status值
|
||
log_callback: 日志回调
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
from config import get_config
|
||
config = get_config()
|
||
screenshot_config = config.screenshot
|
||
|
||
wkhtmltoimage_path = _resolve_wkhtmltoimage_path()
|
||
if not wkhtmltoimage_path:
|
||
if log_callback:
|
||
log_callback("wkhtmltoimage 未安装或不在 PATH 中")
|
||
return False
|
||
|
||
ext = os.path.splitext(output_path)[1].lower()
|
||
image_format = "jpg" if ext in (".jpg", ".jpeg") else "png"
|
||
|
||
cmd = [
|
||
wkhtmltoimage_path,
|
||
"--format", image_format,
|
||
"--width", str(screenshot_config.width),
|
||
"--disable-smart-width",
|
||
"--javascript-delay", str(screenshot_config.js_delay_ms),
|
||
"--load-error-handling", "ignore",
|
||
"--enable-local-file-access",
|
||
"--encoding", "utf-8",
|
||
]
|
||
|
||
# User-Agent
|
||
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
|
||
cmd.extend(["--custom-header", "User-Agent", ua, "--custom-header-propagation"])
|
||
|
||
# 图片质量
|
||
if image_format in ("jpg", "jpeg"):
|
||
cmd.extend(["--quality", str(screenshot_config.quality)])
|
||
|
||
# 高度
|
||
if screenshot_config.height > 0:
|
||
cmd.extend(["--height", str(screenshot_config.height)])
|
||
|
||
# 自定义脚本
|
||
if run_script:
|
||
cmd.extend(["--run-script", run_script])
|
||
if window_status:
|
||
cmd.extend(["--window-status", window_status])
|
||
|
||
# Cookies
|
||
if cookies_path:
|
||
cookie_pairs = _select_cookie_pairs(_read_cookie_pairs(cookies_path))
|
||
if cookie_pairs:
|
||
for name, value in cookie_pairs:
|
||
cmd.extend(["--cookie", name, value])
|
||
else:
|
||
cmd.extend(["--cookie-jar", cookies_path])
|
||
|
||
# 代理
|
||
if proxy_server:
|
||
cmd.extend(["--proxy", proxy_server])
|
||
|
||
cmd.extend([url, output_path])
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=screenshot_config.timeout_seconds
|
||
)
|
||
if result.returncode != 0:
|
||
if log_callback:
|
||
err_msg = (result.stderr or result.stdout or "").strip()
|
||
log_callback(f"wkhtmltoimage 截图失败: {err_msg[:200]}")
|
||
return False
|
||
return True
|
||
except subprocess.TimeoutExpired:
|
||
if log_callback:
|
||
log_callback("wkhtmltoimage 截图超时")
|
||
return False
|
||
except Exception as e:
|
||
if log_callback:
|
||
log_callback(f"wkhtmltoimage 截图异常: {e}")
|
||
return False
|
||
|
||
|
||
def take_screenshot(
|
||
username: str,
|
||
password: str,
|
||
browse_type: str = "应读",
|
||
remark: str = "",
|
||
log_callback: Optional[Callable] = None,
|
||
proxy_config: Optional[dict] = None,
|
||
) -> ScreenshotResult:
|
||
"""
|
||
为账号执行完整的截图流程
|
||
|
||
Args:
|
||
username: 用户名
|
||
password: 密码
|
||
browse_type: 浏览类型
|
||
remark: 账号备注(用于文件名)
|
||
log_callback: 日志回调
|
||
proxy_config: 代理配置
|
||
|
||
Returns:
|
||
截图结果
|
||
"""
|
||
from config import get_config, SCREENSHOTS_DIR
|
||
config = get_config()
|
||
|
||
result = ScreenshotResult(success=False)
|
||
|
||
def log(msg: str):
|
||
if log_callback:
|
||
log_callback(msg)
|
||
|
||
# 确保截图目录存在
|
||
SCREENSHOTS_DIR.mkdir(exist_ok=True)
|
||
|
||
# 获取或刷新cookies
|
||
cookie_path = get_cookie_jar_path(username)
|
||
proxy_server = proxy_config.get("server") if proxy_config else None
|
||
|
||
if not is_cookie_jar_fresh(cookie_path):
|
||
log("正在登录获取Cookie...")
|
||
with APIBrowser(log_callback=log, proxy_config=proxy_config) as browser:
|
||
if not browser.login(username, password):
|
||
result.error_message = "登录失败"
|
||
return result
|
||
if not browser.save_cookies_for_screenshot(username):
|
||
result.error_message = "保存Cookie失败"
|
||
return result
|
||
|
||
log(f"导航到 '{browse_type}' 页面...")
|
||
|
||
# 构建截图URL
|
||
from urllib.parse import urlsplit
|
||
parsed = urlsplit(config.zsgl.login_url)
|
||
base = f"{parsed.scheme}://{parsed.netloc}"
|
||
|
||
bz = 0 # 应读
|
||
target_url = f"{base}/admin/center.aspx?bz={bz}"
|
||
index_url = f"{base}/admin/index.aspx"
|
||
|
||
# 生成文件名
|
||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
account_name = remark if remark else username
|
||
screenshot_filename = f"{account_name}_{browse_type}_{timestamp}.jpg"
|
||
screenshot_path = str(SCREENSHOTS_DIR / screenshot_filename)
|
||
|
||
# 构建JavaScript注入脚本(用于正确显示页面)
|
||
run_script = (
|
||
"(function(){"
|
||
"function done(){window.status='ready';}"
|
||
"function ensureNav(){try{if(typeof loadMenuTree==='function'){loadMenuTree(true);}}catch(e){}}"
|
||
"function expandMenu(){"
|
||
"try{var body=document.body;if(body&&body.classList.contains('lay-mini')){body.classList.remove('lay-mini');}}catch(e){}"
|
||
"try{if(typeof mainPageResize==='function'){mainPageResize();}}catch(e){}"
|
||
"}"
|
||
"function navReady(){"
|
||
"try{var nav=document.getElementById('sidebar-nav');return nav && nav.querySelectorAll('a').length>0;}catch(e){return false;}"
|
||
"}"
|
||
"function frameReady(){"
|
||
"try{var f=document.getElementById('mainframe');return f && f.contentDocument && f.contentDocument.readyState==='complete';}catch(e){return false;}"
|
||
"}"
|
||
"function check(){"
|
||
"if(navReady() && frameReady()){done();return;}"
|
||
"setTimeout(check,300);"
|
||
"}"
|
||
"var f=document.getElementById('mainframe');"
|
||
"ensureNav();"
|
||
"expandMenu();"
|
||
"if(!f){done();return;}"
|
||
f"f.src='{target_url}';"
|
||
"f.onload=function(){ensureNav();expandMenu();setTimeout(check,300);};"
|
||
"setTimeout(check,5000);"
|
||
"})();"
|
||
)
|
||
|
||
# 尝试截图(先尝试完整页面,失败则直接截目标页)
|
||
log("正在截图...")
|
||
|
||
cookies_for_shot = cookie_path if is_cookie_jar_fresh(cookie_path) else None
|
||
|
||
success = take_screenshot_wkhtmltoimage(
|
||
index_url,
|
||
screenshot_path,
|
||
cookies_path=cookies_for_shot,
|
||
proxy_server=proxy_server,
|
||
run_script=run_script,
|
||
window_status="ready",
|
||
log_callback=log,
|
||
)
|
||
|
||
if not success:
|
||
# 备选:直接截目标页
|
||
log("尝试直接截图目标页...")
|
||
success = take_screenshot_wkhtmltoimage(
|
||
target_url,
|
||
screenshot_path,
|
||
cookies_path=cookies_for_shot,
|
||
proxy_server=proxy_server,
|
||
log_callback=log,
|
||
)
|
||
|
||
if success and os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000:
|
||
result.success = True
|
||
result.filename = screenshot_filename
|
||
result.filepath = screenshot_path
|
||
else:
|
||
result.error_message = "截图失败"
|
||
if os.path.exists(screenshot_path):
|
||
os.remove(screenshot_path)
|
||
|
||
return result
|