- 修复添加账号按钮无反应问题
- 添加账号备注字段(可选)
- 添加账号设置按钮(修改密码/备注)
- 修复用户反馈功能
- 添加定时任务执行日志
- 修复容器重启后账号加载问题
- 修复所有JavaScript语法错误
- 优化账号加载机制(4层保障)
🤖 Generated with Claude Code
1379 lines
61 KiB
Python
Executable File
1379 lines
61 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Playwright版本 - 知识管理系统自动化核心
|
||
使用浏览器上下文(Context)实现高性能并发
|
||
"""
|
||
|
||
import os
|
||
from pathlib import Path
|
||
from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright
|
||
import time
|
||
import json
|
||
import threading
|
||
from typing import Optional, Callable
|
||
from dataclasses import dataclass
|
||
from app_config import get_config
|
||
|
||
# Tell Playwright where its browsers are installed. An explicitly provided
# PLAYWRIGHT_BROWSERS_PATH (e.g. the one set for the Docker environment) always
# wins; otherwise fall back to the Windows per-user default used for local
# development and export it so Playwright picks it up.
if 'PLAYWRIGHT_BROWSERS_PATH' not in os.environ:
    BROWSERS_PATH = str(Path.home() / "AppData" / "Local" / "ms-playwright")
    os.environ["PLAYWRIGHT_BROWSERS_PATH"] = BROWSERS_PATH
else:
    BROWSERS_PATH = os.environ["PLAYWRIGHT_BROWSERS_PATH"]
|
||
|
||
# Application configuration object; the timeouts and login URLs used below are
# read from it.
config = get_config()
|
||
|
||
|
||
@dataclass
class BrowseResult:
    """Outcome of one content-browsing run."""

    # True when the run completed without an unrecoverable error.
    success: bool
    # Number of list rows whose title was successfully read.
    total_items: int = 0
    # Number of attachments that were opened and processed.
    total_attachments: int = 0
    # Description of the failure; empty when nothing went wrong.
    error_message: str = ""
|
||
|
||
|
||
class PlaywrightBrowserManager:
    """Playwright browser manager - one independent browser instance per account."""

    def __init__(self, headless: bool = True, log_callback: Optional[Callable] = None):
        """
        Initialise the browser manager.

        Args:
            headless: Whether browsers are launched in headless mode.
            log_callback: Optional logging hook, called as
                ``log_callback(message, account_id)``.
        """
        self.headless = headless
        self.log_callback = log_callback
        self._lock = threading.Lock()

    def log(self, message: str, account_id: Optional[str] = None):
        """Forward *message* to the log callback, if one was supplied."""
        if self.log_callback:
            self.log_callback(message, account_id)

    def create_browser(self, proxy_config=None):
        """Start a Playwright driver and launch a dedicated Chromium process.

        Args:
            proxy_config: Optional dict; when it has a truthy ``'server'`` entry
                the browser is launched through that proxy.

        Returns:
            ``(playwright, browser)`` - the caller owns both and must close them.

        Raises:
            Exception: Whatever Playwright raised; the failure is logged and the
                already-started driver is stopped before re-raising.
        """
        playwright = None
        try:
            playwright = sync_playwright().start()

            launch_options = {
                'headless': self.headless,
                'args': [
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-gpu',
                    '--disable-extensions',
                    '--disable-notifications',
                    '--disable-infobars',
                    '--disable-default-apps',
                    '--disable-background-timer-throttling',
                    '--disable-backgrounding-occluded-windows',
                    '--disable-renderer-backgrounding',
                ]
            }

            if proxy_config and proxy_config.get('server'):
                launch_options['proxy'] = {
                    'server': proxy_config['server']
                }
                self.log(f"使用代理: {proxy_config['server']}")

            browser = playwright.chromium.launch(**launch_options)
            return playwright, browser

        except Exception as e:
            self.log(f"启动浏览器失败: {str(e)}")
            # Do not leave an orphaned driver behind when the launch fails.
            if playwright is not None:
                try:
                    playwright.stop()
                except Exception as stop_error:
                    self.log(f"停止Playwright时出错: {str(stop_error)}")
            raise

    def create_browser_and_context(self, proxy_config=None, storage_state=None):
        """Create a dedicated browser plus one context (fully isolated per account).

        Args:
            proxy_config: Passed through to :meth:`create_browser`.
            storage_state: Optional saved storage state (cookies etc.) to preload
                into the new context.

        Returns:
            ``(playwright, browser, context)``.

        Raises:
            Exception: Propagated from Playwright; the freshly launched browser
                and driver are shut down first so nothing leaks.
        """
        playwright, browser = self.create_browser(proxy_config)

        try:
            context_options = {
                'viewport': {'width': 1920, 'height': 1080},
                'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'device_scale_factor': 2,  # 2x device pixel ratio for sharper text
            }
            if storage_state:
                context_options['storage_state'] = storage_state

            context = browser.new_context(**context_options)

            # Default timeouts come from the application configuration.
            context.set_default_timeout(config.DEFAULT_TIMEOUT)
            context.set_default_navigation_timeout(config.PAGE_LOAD_TIMEOUT)
        except Exception:
            for shutdown in (browser.close, playwright.stop):
                try:
                    shutdown()
                except Exception as cleanup_error:
                    self.log(f"关闭浏览器时出错: {str(cleanup_error)}")
            raise

        return playwright, browser, context
|
||
|
||
|
||
class PlaywrightAutomation:
|
||
"""Playwright自动化操作类"""
|
||
|
||
def __init__(self, browser_manager: PlaywrightBrowserManager, account_id: str, proxy_config: Optional[dict] = None):
    """
    Initialise the automation helper.

    Args:
        browser_manager: Manager used to create browsers/contexts and to log.
        account_id: Account ID attached to every log message.
        proxy_config: Optional proxy settings handed to the browser manager
            whenever a browser is created.
    """
    self.browser_manager = browser_manager
    self.account_id = account_id
    self.proxy_config = proxy_config
    # Playwright handles; all stay None until a login or cookie load creates them.
    self.playwright: Optional[Playwright] = None
    self.browser: Optional[Browser] = None
    self.context: Optional[BrowserContext] = None
    # `page` is the current working target; `main_page` is the top-level page.
    self.page: Optional[Page] = None
    self.main_page: Optional[Page] = None
|
||
|
||
def log(self, message: str):
    """Log *message* through the browser manager, tagged with this account's ID."""
    self.browser_manager.log(message, self.account_id)
|
||
|
||
|
||
# Directory in which per-account saved login state (cookies) files are stored.
COOKIES_DIR = '/app/data/cookies'
|
||
|
||
def get_cookies_path(self, username: str) -> str:
    """Return the cookie-file path for *username*, creating the directory if needed.

    The file name is the MD5 hex digest of the username so that special
    characters in usernames never reach the filesystem. MD5 is used purely for
    naming here, not for security.
    """
    import hashlib
    import os

    os.makedirs(self.COOKIES_DIR, exist_ok=True)
    filename = hashlib.md5(username.encode()).hexdigest() + '.json'
    return os.path.join(self.COOKIES_DIR, filename)
|
||
|
||
def save_cookies(self, username: str) -> bool:
    """Persist the current context's storage state (cookies) for *username*.

    Returns:
        True when the state was written; False when there is no active context
        or when saving failed (the failure is logged, never raised).
    """
    try:
        if not self.context:
            # Nothing to save; report it explicitly rather than falling through.
            return False

        storage = self.context.storage_state()
        cookies_path = self.get_cookies_path(username)
        with open(cookies_path, 'w', encoding='utf-8') as f:
            json.dump(storage, f)
        self.log("Cookies已保存")
        return True
    except Exception as e:
        self.log(f"保存cookies失败: {e}")
        return False
|
||
|
||
def load_cookies(self, username: str) -> bool:
    """Open a new browser context preloaded with *username*'s saved cookies.

    Cookie files older than 24 hours are deleted and treated as missing.

    Returns:
        True when a browser, context and page were created from the saved
        state; False when no usable cookie file exists or loading failed
        (the failure is logged, never raised).
    """
    import os

    cookies_path = self.get_cookies_path(username)
    if not os.path.exists(cookies_path):
        return False

    try:
        # Saved state expires after 24 hours, judged by the file's mtime.
        file_age = time.time() - os.path.getmtime(cookies_path)
        if file_age > 24 * 3600:
            self.log("Cookies已过期,需要重新登录")
            os.remove(cookies_path)
            return False

        with open(cookies_path, 'r', encoding='utf-8') as f:
            storage = json.load(f)

        # Create a browser context that starts with the saved cookies.
        self.playwright, self.browser, self.context = self.browser_manager.create_browser_and_context(
            self.proxy_config,
            storage_state=storage
        )
        self.page = self.context.new_page()
        self.main_page = self.page
        return True
    except Exception as e:
        self.log(f"加载cookies失败: {e}")
        return False
|
||
|
||
def check_login_state(self) -> bool:
    """Check whether the current page session is still logged in.

    Navigates to the admin index page and inspects where the browser ends up:
    if the final URL still contains ``index.aspx`` the session is considered
    valid. Any navigation error is logged and reported as "not logged in".
    """
    try:
        self.page.goto('https://postoa.aidunsoft.com/admin/index.aspx', timeout=15000)
        self.page.wait_for_load_state('networkidle', timeout=10000)
        # Still on the index page => the saved login state was accepted.
        return 'index.aspx' in self.page.url
    except Exception as e:
        self.log(f"检查登录状态失败: {str(e)[:80]}")
        return False
|
||
|
||
def quick_login(self, username: str, password: str, remember: bool = True):
    """Log in as fast as possible, reusing saved cookies when that is safe.

    * If a connected browser is already attached (taken from the pool), log in
      with it directly. Cookies are not loaded in that case because
      ``load_cookies`` would create a new browser and overwrite the pooled one.
    * Otherwise try the saved cookies first and skip the login form entirely
      when they are still valid.
    * If the cookies are missing or stale, dispose of the cookie session and
      perform a normal login.

    Returns:
        The result dict from ``login`` with ``used_cookies`` set to False, or
        ``{"success": True, "message": ..., "used_cookies": True}`` when the
        saved login state was reused.
    """
    if self.browser and self.browser.is_connected():
        self.log("使用池中浏览器,直接登录")
    elif self.load_cookies(username):
        self.log("尝试使用已保存的登录态...")
        if self.check_login_state():
            self.log("✓ 登录态有效,跳过登录")
            return {"success": True, "message": "使用已保存的登录态", "used_cookies": True}

        self.log("登录态已失效,重新登录")
        # Tear down the cookie-based session. Each resource is closed on its
        # own so that one failure cannot leave the others running.
        for resource, closer in ((self.context, 'close'),
                                 (self.browser, 'close'),
                                 (self.playwright, 'stop')):
            if not resource:
                continue
            try:
                getattr(resource, closer)()
            except Exception as e:
                self.log(f"清理失效会话时出错: {str(e)[:80]}")
        # Drop the dead handles so login() starts from a clean slate.
        self.context = None
        self.page = None
        self.main_page = None
        self.browser = None
        self.playwright = None

    # Regular login; persist the cookies on success.
    result = self.login(username, password, remember)
    if result.get('success'):
        self.save_cookies(username)
    result['used_cookies'] = False
    return result
|
||
|
||
|
||
def login(self, username: str, password: str, remember: bool = True) -> dict:
    """
    Log in to the system through the login form.

    Args:
        username: Account user name.
        password: Account password.
        remember: Whether to tick the "remember me" checkbox.

    Returns:
        A dict ``{"success": bool, "error_type": ..., "message": str}``.
        ``error_type`` is None on success, otherwise one of
        ``"password_error"``, ``"login_error"``, ``"network_error"`` or
        ``"exception"``. Exceptions are never propagated.
    """
    try:
        if self.browser and self.browser.is_connected():
            # A browser is already attached (from the pool): only add a context.
            self.log("使用池中浏览器...")
            context_options = {
                'viewport': {'width': 1920, 'height': 1080},
                'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'device_scale_factor': 2,
            }
            self.context = self.browser.new_context(**context_options)
        else:
            # No usable browser yet: create a dedicated browser and context.
            self.playwright, self.browser, self.context = self.browser_manager.create_browser_and_context(self.proxy_config)

        self.page = self.context.new_page()
        self.main_page = self.page

        # Open the login page, retrying once if the first load fails.
        max_retries = 2
        for attempt in range(max_retries):
            try:
                self.page.goto(config.ZSGL_LOGIN_URL, timeout=60000)
                break
            except Exception:
                if attempt < max_retries - 1:
                    self.log(f"页面加载超时,重试中... ({attempt + 1}/{max_retries})")
                    time.sleep(2)
                else:
                    raise

        self.page.fill('#txtUserName', username)
        self.page.fill('#txtPassword', password)

        if remember:
            self.page.check('#chkRemember')

        self.page.click('#btnSubmit')

        # Wait for the post-login redirect to settle (up to 30 seconds).
        self.page.wait_for_load_state('networkidle', timeout=30000)

        current_url = self.page.url
        self.log(f"当前URL: {current_url}")

        if config.ZSGL_INDEX_URL_PATTERN in current_url:
            self.log("登录成功!")
            return {"success": True, "error_type": None, "message": "登录成功"}

        # Not redirected to the index page: look for an on-page error hint.
        error_message = "登录失败"
        error_type = "unknown"  # unknown by default, NOT a password error

        try:
            error_element = self.page.locator('#lblMsg, .error-message, [class*="error"]').first
            if error_element.is_visible(timeout=2000):
                error_text = error_element.inner_text().strip()
                if error_text:
                    error_message = error_text
                    self.log(f"登录错误提示: {error_text}")
                    # Only a hint that mentions the password / user name /
                    # account is classified as a credential problem.
                    if "密码" in error_text or "password" in error_text.lower() or "用户名" in error_text or "账号" in error_text:
                        error_type = "password_error"
                    else:
                        error_type = "login_error"
        except Exception:
            # Probing for the hint is best-effort; fall through to the
            # network-error classification below.
            pass

        # Without an explicit hint this is most likely a network problem, so
        # it must not be reported as a wrong password.
        if error_type == "unknown":
            error_message = "登录失败,可能是网络问题或页面加载超时"
            error_type = "network_error"

        self.log(error_message)
        return {"success": False, "error_type": error_type, "message": error_message}

    except Exception as e:
        error_msg = str(e)
        self.log(f"登录过程中出错: {error_msg}")
        return {"success": False, "error_type": "exception", "message": error_msg}
|
||
|
||
def is_context_error(self, error_msg: str) -> bool:
    """Return True if *error_msg* looks like a context/navigation failure.

    The check is a case-insensitive substring match against a fixed keyword
    list. An empty or missing message is never a context error.
    """
    if not error_msg:
        return False

    # Kept lower-case so the message only needs to be lowered once.
    keywords = (
        "frame was detached",
        "execution context was destroyed",
        "navigation",
        "detached",
        "target closed",
        "session closed",
        "connection closed",
    )
    lowered = str(error_msg).lower()
    return any(keyword in lowered for keyword in keywords)
|
||
|
||
def safe_execute(self, action, description="操作", max_retries=3, recover_browse_type=None):
    """Run *action*, retrying automatically when the page context is lost.

    Args:
        action: Zero-argument callable to execute.
        description: Human-readable name of the operation (used in logs).
        max_retries: Maximum number of attempts.
        recover_browse_type: Browse type to re-select while recovering the
            iframe, if any.

    Returns:
        ``(True, result)`` on success, ``(False, None)`` otherwise. Errors that
        are not context-related fail immediately without a retry.
    """
    for attempt in range(max_retries):
        try:
            return True, action()
        except Exception as e:
            last_error = str(e)

            if not self.is_context_error(last_error):
                # Not a context problem: retrying will not help.
                self.log(f"✗ {description}失败: {last_error}")
                return False, None

            if attempt >= max_retries - 1:
                self.log(f"✗ {description}失败,已重试{max_retries}次: {last_error}")
                break

            self.log(f"⚠ {description}时上下文失效,尝试恢复... ({attempt+1}/{max_retries})")
            time.sleep(1 + attempt * 0.5)  # back off a little more each round
            if not self.recover_iframe(recover_browse_type):
                # Recovery failed, but the next attempt is still worth a try.
                self.log(f" iframe恢复失败,继续重试...")

    return False, None
|
||
|
||
def get_iframe_safe(self, retry=True, max_retries=5):
    """Fetch the ``mainframe`` iframe from the main page, with retries.

    Args:
        retry: Whether to retry; when False exactly one attempt is made.
        max_retries: Maximum number of attempts when *retry* is True.

    Returns:
        The frame object, or None if there is no main page or the frame could
        not be obtained within the allowed attempts.
    """
    attempts = max_retries if retry else 1

    for attempt in range(attempts):
        try:
            # Without a main page there is nothing to look the frame up on.
            if not self.main_page:
                self.log("⚠ main_page无效")
                return None

            iframe = self.main_page.frame('mainframe')
            if iframe:
                return iframe
        except Exception as e:
            error_msg = str(e)
            if self.is_context_error(error_msg):
                self.log(f"⚠ 获取iframe时上下文失效,等待恢复... ({attempt+1}/{attempts})")
            else:
                self.log(f"⚠ 获取iframe出错: {error_msg}")

        # Only wait when another attempt will actually follow.
        if attempt < attempts - 1:
            time.sleep(0.5 + attempt * 0.3)  # progressively longer wait

    return None
|
||
|
||
def recover_iframe(self, browse_type: str = None) -> bool:
    """Try to re-establish the connection to the ``mainframe`` iframe.

    Called after "Frame was detached" / "Execution context was destroyed"
    style errors. Recovery escalates through four strategies:

    1. Fetch the iframe directly (fastest; covers brief context switches).
    2. Wait for the page to settle, then fetch again.
    3. Wait for the iframe element and verify it from JavaScript, then fetch.
    4. Reload the page and, when *browse_type* is given, re-select it.

    ``self.page`` is overwritten with the outcome of each fetch, so it is None
    after a failed strategy.

    Returns:
        True once the iframe is available again, False when every strategy
        failed (or re-selecting *browse_type* after the reload failed).
    """
    self.log("🔄 尝试恢复iframe连接...")

    # Strategy 1: fetch the iframe directly.
    self.page = self.get_iframe_safe(retry=True, max_retries=3)
    if self.page:
        self.log("✓ iframe恢复成功(直接获取)")
        return True

    # Strategy 2: let the page settle first (it may still be loading).
    self.log(" 等待页面稳定...")
    time.sleep(1.5)
    for load_state, timeout_ms in (('domcontentloaded', 5000), ('networkidle', 10000)):
        try:
            self.main_page.wait_for_load_state(load_state, timeout=timeout_ms)
        except Exception:
            # Best-effort wait; a timeout here is not a failure by itself.
            pass

    self.page = self.get_iframe_safe(retry=True, max_retries=3)
    if self.page:
        self.log("✓ iframe恢复成功(等待后获取)")
        return True

    # Strategy 3: wait for the iframe element and verify it via JavaScript.
    self.log(" 尝试JavaScript方式获取iframe...")
    try:
        self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=5000)
        has_iframe = self.main_page.evaluate("""() => {
            const iframe = document.querySelector('iframe[name="mainframe"]');
            return iframe && iframe.contentWindow && iframe.contentDocument;
        }""")
        if has_iframe:
            time.sleep(0.5)
            self.page = self.get_iframe_safe(retry=True, max_retries=3)
            if self.page:
                self.log("✓ iframe恢复成功(JavaScript验证后获取)")
                return True
    except Exception as e:
        self.log(f" JavaScript方式失败: {str(e)[:50]}")

    # Strategy 4 (last resort): reload the page and switch again.
    self.log(" 刷新页面重试...")
    try:
        self.main_page.reload(wait_until='domcontentloaded')
        time.sleep(2)

        self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=15000)
        time.sleep(1)

        self.page = self.get_iframe_safe(retry=True, max_retries=5)
        if self.page:
            if browse_type:
                # The reload reset the list, so re-select the browse type.
                self.log(f" 重新点击'{browse_type}'...")
                selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]"
                try:
                    self.page.locator(selector).click(timeout=5000)
                    time.sleep(1.5)
                    try:
                        self.page.locator("//table[@class='ltable']").wait_for(timeout=10000)
                    except Exception:
                        # The table may simply be slow; continue regardless.
                        pass
                    self.log(f"✓ iframe恢复成功(刷新后重新点击'{browse_type}')")
                except Exception:
                    # The <a> control was not clickable: fall back to the label.
                    try:
                        label_selector = f"//label[contains(text(), '{browse_type}')]"
                        self.page.locator(label_selector).click(timeout=5000)
                        time.sleep(1.5)
                        self.log(f"✓ iframe恢复成功(刷新后点击label)")
                    except Exception as label_e:
                        self.log(f" 点击label也失败: {str(label_e)[:30]}")
                        return False
            else:
                self.log("✓ iframe恢复成功(刷新后获取)")
            return True
    except Exception as e:
        self.log(f"✗ 刷新恢复失败: {str(e)[:50]}")

    self.log("✗ iframe恢复失败,所有方法都已尝试")
    return False
|
||
|
||
def switch_to_iframe(self) -> bool:
    """Make the ``mainframe`` iframe the current working target (``self.page``).

    Waits for the iframe element on the main page and then resolves the frame,
    trying up to three times.

    Returns:
        True when ``self.page`` now points at the iframe, False otherwise.
        Errors are logged, never raised.
    """
    try:
        max_retries = 3
        for i in range(max_retries):
            try:
                # Wait for the iframe element to appear before resolving it.
                self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=2000)

                iframe = self.get_iframe_safe()
                if iframe:
                    self.page = iframe
                    self.log(f"✓ 成功切换到iframe (尝试 {i+1}/{max_retries})")
                    return True
                # The element exists but the frame did not resolve: handle it
                # exactly like a missing iframe instead of retrying silently.
                raise RuntimeError("mainframe iframe not available")
            except Exception:
                if i < max_retries - 1:
                    self.log(f"未找到iframe,重试中... ({i+1}/{max_retries})")
                    time.sleep(1)
                else:
                    self.log(f"所有重试都失败,未找到iframe")

        return False

    except Exception as e:
        self.log(f"切换到iframe时出错: {str(e)}")
        return False
|
||
|
||
def safe_click(self, locator, timeout=5000, description="元素"):
    """Click *locator*, tolerating errors caused by the click's own navigation.

    Args:
        locator: Object exposing ``click(timeout=...)``.
        timeout: Click timeout in milliseconds.
        description: Human-readable element name (used in logs).

    Returns:
        True if the click succeeded or the error indicates that the page
        navigated (the click is then assumed to have taken effect); False for
        any other failure.
    """
    try:
        locator.click(timeout=timeout)
        return True
    except Exception as e:
        error_msg = str(e)
        navigated = "Execution context was destroyed" in error_msg or "navigation" in error_msg.lower()
        if not navigated:
            self.log(f"点击{description}失败: {error_msg}")
            return False

        self.log(f"⚠ 点击{description}时检测到页面导航,等待页面稳定...")
        time.sleep(1)
        # An exception was raised, but the navigation itself went through.
        return True
|
||
|
||
def switch_browse_type(self, browse_type: str, max_retries: int = 2) -> bool:
    """
    Switch the list to the given browse type, with retries.

    Each attempt switches into the iframe and then tries two click targets in
    order: the ``<a>`` control inside the ``rule-multi-radio`` block, then the
    ``<label>`` with the same text. After a successful click the method waits
    for the content table (a timeout there is tolerated).

    Args:
        browse_type: Text of the browse type to select.
        max_retries: Number of retries after the first attempt (default 2).

    Returns:
        True if one of the click targets was clicked, False otherwise.
    """
    def click_and_wait(xpath: str, success_message: str) -> None:
        # Click the control, then give the list time to refresh and load.
        self.page.locator(xpath).click(timeout=5000)
        self.log(success_message)
        time.sleep(1.5)
        try:
            # Wait up to 30 seconds for the content table.
            self.page.locator("//table[@class='ltable']").wait_for(timeout=30000)
            self.log("内容表格已加载")
        except Exception:
            self.log("等待表格加载超时,继续...")

    # (xpath, message on success, message when the control cannot be clicked)
    click_targets = (
        (
            f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]",
            f"点击'{browse_type}'按钮成功",
            f"未找到<a>标签,尝试点击<label>...",
        ),
        (
            f"//label[contains(text(), '{browse_type}')]",
            f"点击'{browse_type}'标签成功",
            f"未找到<label>标签",
        ),
    )

    for attempt in range(max_retries + 1):
        try:
            if attempt > 0:
                self.log(f"⚠ 第 {attempt + 1} 次尝试切换浏览类型...")
            else:
                self.log(f"切换到'{browse_type}'类型...")

            if not self.switch_to_iframe():
                if attempt < max_retries:
                    self.log(f"iframe切换失败,等待1秒后重试...")
                    time.sleep(1)
                    continue
                return False

            restart_attempt = False
            for xpath, success_message, failure_message in click_targets:
                try:
                    click_and_wait(xpath, success_message)
                    return True
                except Exception as e:
                    if "Execution context was destroyed" in str(e):
                        self.log(f"⚠ 检测到执行上下文被销毁")
                        if attempt < max_retries:
                            # The page navigated underneath us: start over.
                            self.log(f"等待2秒后重试...")
                            time.sleep(2)
                            restart_attempt = True
                            break
                    self.log(failure_message)

            if restart_attempt:
                continue

            # Both click targets failed; retry if attempts remain.
            if attempt < max_retries:
                self.log(f"切换失败,等待2秒后重试...")
                time.sleep(2)
                continue

            return False

        except Exception as e:
            error_msg = str(e)
            self.log(f"切换浏览类型时出错: {error_msg}")

            if "Execution context was destroyed" in error_msg or "navigation" in error_msg.lower():
                if attempt < max_retries:
                    self.log(f"⚠ 检测到执行上下文被销毁或导航错误,等待2秒后重试...")
                    time.sleep(2)
                    continue

            return False

    # Only reached when no attempt was made at all (negative max_retries).
    self.log(f"❌ 切换浏览类型失败,已重试 {max_retries} 次")
    return False
|
||
|
||
def browse_content(self, browse_type: str,
                   auto_next_page: bool = True,
                   auto_view_attachments: bool = True,
                   interval: float = 1.0,
                   should_stop_callback: Optional[Callable] = None,
                   navigate_only: bool = False) -> BrowseResult:
    """
    Browse the content list of the given type, optionally opening attachments.

    The method selects *browse_type*, then walks the list page by page. For
    every row it reads the title and, if requested, opens the first attachment
    (closing the new window, or going back in history when no window opened).
    When the last page is reached it reloads and starts over, and stops once
    an empty page is seen after a full round or the number of browsed items
    reaches the total parsed from the pagination text.

    Args:
        browse_type: Text of the browse type to select.
        auto_next_page: Whether to move on to following pages automatically.
            When False only the current page is processed.
        auto_view_attachments: Whether to open the first attachment of each row.
        interval: Seconds to keep an attachment window open before closing it.
        should_stop_callback: Optional zero-argument callable; browsing stops
            as soon as it returns a truthy value.
        navigate_only: If True, only switch to *browse_type* (e.g. to prepare
            a screenshot) and return immediately.

    Returns:
        A BrowseResult with the item/attachment counters. Exceptions are not
        raised; they are logged and stored in ``error_message`` with
        ``success`` left False.
    """
    import re

    # Data rows of the list table: skip the header row and non-data rows.
    rows_xpath = "//table[@class='ltable']/tbody/tr[position()>1 and count(td)>=5]"
    next_xpath = "//div[@id='PageContent']/a[contains(text(), '下一页') or contains(text(), '»')]"

    result = BrowseResult(success=False)

    try:
        self.log(f"导航到 '{browse_type}' 页面...")
        try:
            # Give the page time to finish loading.
            time.sleep(2)
            self.log(f"当前URL: {self.main_page.url}")
        except Exception as e:
            self.log(f"获取URL失败: {str(e)}")

        if not self.switch_browse_type(browse_type):
            result.error_message = "切换浏览类型失败"
            return result

        # Navigation-only mode (used for screenshots): done after the switch.
        if navigate_only:
            time.sleep(1)  # let the page settle
            result.success = True
            return result

        current_page = 1
        total_items = 0
        total_attachments = 0
        completed_first_round = False
        empty_page_counter = 0
        expected_total = None  # total item count parsed from the pagination text

        while True:
            if should_stop_callback and should_stop_callback():
                self.log("收到停止信号,终止浏览")
                break

            self.log(f"处理第 {current_page} 页...")

            # Always re-resolve the iframe before touching the list.
            time.sleep(0.3)
            self.page = self.get_iframe_safe()
            if not self.page:
                self.log("错误:无法获取iframe")
                break

            # Wait up to 10 seconds for the table.
            try:
                self.page.locator("//table[@class='ltable']").wait_for(timeout=10000)
            except Exception:
                self.log("等待表格超时,继续尝试...")

            # Extra wait for AJAX content. The very first load gets more time
            # and more retries because it is the slowest under concurrency.
            first_load = current_page == 1 and total_items == 0
            if first_load:
                time.sleep(3.0)
            else:
                time.sleep(1.0)

            # Count the rows with retries so slow AJAX is not mistaken for an
            # empty page (first load: 8 x 3 s, afterwards: 3 x 1.5 s).
            max_retries = 8 if first_load else 3
            retry_wait = 3.0 if first_load else 1.5
            rows_count = 0
            for retry in range(max_retries):
                rows_locator = self.page.locator(rows_xpath)
                rows_count = rows_locator.count()
                if rows_count > 0:
                    break
                if retry < max_retries - 1:
                    self.log(f"未检测到内容,等待后重试... ({retry+1}/{max_retries})")
                    time.sleep(retry_wait)

            if rows_count == 0:
                self.log("当前页面没有内容")
                # Diagnostics to help tell "really empty" from "not loaded".
                try:
                    page_html = self.page.content()
                    if 'ltable' in page_html:
                        self.log(f"[调试] 表格存在,但没有数据行")
                        if '暂无' in page_html or '没有' in page_html:
                            self.log(f"[调试] 页面显示暂无记录")
                    else:
                        self.log(f"[调试] 页面中没有找到ltable表格")
                    self.log(f"[调试] iframe URL: {self.page.url}")
                except Exception as debug_e:
                    self.log(f"[调试] 获取页面信息失败: {str(debug_e)[:50]}")
                empty_page_counter += 1
                self.log(f"连续空页面数: {empty_page_counter}")

                # Stop condition 1: browsed count reached the expected total.
                if expected_total is not None and total_items >= expected_total:
                    self.log(f"已浏览 {total_items}/{expected_total} 条,基于计数判断完成")
                    break

                # Stop condition 2: an empty page after a completed round.
                if completed_first_round and empty_page_counter >= 1:
                    self.log(f"检测到空页面,已浏览 {total_items} 条,内容已浏览完毕")
                    break

                if not auto_next_page:
                    break

                # Move to the next page, or go back to the first one.
                try:
                    next_button = self.page.locator(next_xpath)
                    if next_button.count() > 0:
                        self.log("点击下一页...")
                        next_button.click()
                        time.sleep(1.5)
                        current_page += 1
                        continue

                    # No next page: start another round from page one.
                    if not completed_first_round:
                        completed_first_round = True
                        self.log("完成第一轮浏览,准备返回第一页继续浏览...")
                    else:
                        self.log("完成一轮浏览,返回第一页继续...")

                    self.log("刷新页面并重新点击浏览类型...")
                    self.main_page.reload()
                    time.sleep(1.5)

                    time.sleep(0.5)
                    self.page = self.get_iframe_safe()
                    if not self.page:
                        self.log("错误:刷新后无法获取iframe,停止浏览")
                        break

                    # Re-select the browse type after the reload.
                    selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]"
                    try:
                        self.page.locator(selector).click(timeout=5000)
                        self.log(f"重新点击'{browse_type}'按钮成功")
                        time.sleep(1.5)

                        try:
                            self.page.locator("//table[@class='ltable']").wait_for(timeout=15000)
                            self.log("内容表格已加载")
                            time.sleep(1.0)  # extra wait for AJAX data
                        except Exception:
                            self.log("等待表格加载超时,继续...")
                    except Exception:
                        # Fall back to clicking the label.
                        label_selector = f"//label[contains(text(), '{browse_type}')]"
                        self.page.locator(label_selector).click(timeout=5000)
                        self.log(f"点击'{browse_type}'标签成功")
                        time.sleep(2.0)

                    current_page = 1
                    continue
                except Exception as e:
                    self.log(f"翻页时出错: {str(e)}")
                    break

            # Content found: reset the empty-page counter.
            empty_page_counter = 0
            self.log(f"找到 {rows_count} 条内容")

            # Read the pagination text and parse the total item count once.
            try:
                page_content = self.page.locator("//div[@id='PageContent']")
                if page_content.count() > 0:
                    page_text = page_content.inner_text(timeout=3000).strip()
                    if page_text:
                        self.log(f"[分页信息] {page_text}")
                        if expected_total is None:
                            match = re.search(r'共(\d+)记录', page_text)
                            if match:
                                expected_total = int(match.group(1))
                                self.log(f"[总数] 预期浏览 {expected_total} 条内容")
            except Exception:
                # Pagination info is optional; the empty-page rule still applies.
                pass

            # Process every row, re-resolving locators each time.
            for i in range(rows_count):
                if should_stop_callback and should_stop_callback():
                    break

                # Before each further row make sure we are still inside the
                # iframe (important after history.back()).
                if i > 0:
                    time.sleep(0.2)
                    self.page = self.get_iframe_safe()
                    if not self.page:
                        self.log("警告:无法获取iframe,尝试恢复...")
                        if not self.recover_iframe(browse_type):
                            self.log("错误:iframe恢复失败,停止处理当前页")
                            break
                        # Recovery succeeded: resolve the frame again.
                        self.page = self.get_iframe_safe()
                        if not self.page:
                            break

                # Resolve the row freshly so the element is never stale.
                row = None
                row_retry_count = 0
                max_row_retries = 3

                while row is None and row_retry_count < max_row_retries:
                    try:
                        current_rows_locator = self.page.locator(rows_xpath)
                        candidate = current_rows_locator.nth(i)
                        # Touch the locator to verify it is usable.
                        _ = candidate.count()
                        row = candidate
                        break
                    except Exception as e:
                        error_msg = str(e)
                        row_retry_count += 1
                        # `row` stays None here, so a locator that failed
                        # validation is never used further down.
                        if self.is_context_error(error_msg):
                            self.log(f"⚠ 获取行时上下文失效,尝试恢复... ({row_retry_count}/{max_row_retries})")
                            if self.recover_iframe(browse_type):
                                continue
                            self.log("错误:iframe恢复失败")
                            break
                        self.log(f"获取行时出错: {error_msg[:50]}")
                        break

                if row is None:
                    self.log("错误:无法获取行数据,停止处理当前页")
                    break

                # Read the title, with error handling and retries.
                title = None
                title_retry_count = 0
                max_title_retries = 3

                while title is None and title_retry_count < max_title_retries:
                    try:
                        title_cell = row.locator("xpath=.//td[4]")
                        title = title_cell.inner_text(timeout=10000).strip()
                        break
                    except Exception as e:
                        error_msg = str(e)
                        title_retry_count += 1
                        if self.is_context_error(error_msg) or "Timeout" in error_msg:
                            self.log(f"⚠ 获取标题时失败({title_retry_count}/{max_title_retries}),尝试恢复...")
                            if self.recover_iframe(browse_type):
                                time.sleep(0.3)
                                try:
                                    current_rows_locator = self.page.locator(rows_xpath)
                                    row = current_rows_locator.nth(i)
                                    _ = row.count()
                                    self.log(f" ✓ 恢复成功,重新获取行数据")
                                except Exception as row_e:
                                    self.log(f" ✗ 重新获取行数据失败: {str(row_e)[:50]}")
                                    break
                            else:
                                self.log(f" ✗ 恢复失败")
                                break
                        else:
                            self.log(f"获取标题时出错: {error_msg[:50]}")
                            break

                if title is None:
                    title = "(无法获取标题)"
                    self.log(f" [{i+1}] {title} - 跳过此行")
                    continue

                self.log(f" [{i+1}] {title[:50]}")
                total_items += 1

                if not auto_view_attachments:
                    continue

                # Count the attachment links, with error handling and retries.
                att_count = 0
                att_retry_count = 0
                max_att_retries = 2

                while att_retry_count < max_att_retries:
                    try:
                        att_links_locator = row.locator("xpath=.//td[5]//a[contains(@class, 'link-btn')]")
                        att_count = att_links_locator.count()
                        break
                    except Exception as e:
                        error_msg = str(e)
                        att_retry_count += 1
                        if self.is_context_error(error_msg):
                            self.log(f" ⚠ 获取附件时上下文失效({att_retry_count}/{max_att_retries})...")
                            if self.recover_iframe(browse_type):
                                try:
                                    current_rows_locator = self.page.locator(rows_xpath)
                                    row = current_rows_locator.nth(i)
                                except Exception:
                                    break
                            else:
                                break
                        else:
                            break

                if att_count <= 0:
                    continue

                # Only the first attachment is processed.
                try:
                    att_link = att_links_locator.first
                    att_text = att_link.inner_text().strip() or "附件"
                except Exception as e:
                    if self.is_context_error(str(e)):
                        self.log(f" ⚠ 获取附件信息时上下文失效,跳过")
                        self.recover_iframe(browse_type)
                        continue
                    att_text = "附件"

                self.log(f" - 处理{att_text}...")

                try:
                    # Remember how many pages exist before the click.
                    pages_before = len(self.context.pages)

                    try:
                        att_link.click()
                    except Exception as click_e:
                        if self.is_context_error(str(click_e)):
                            self.log(f" ⚠ 点击附件时上下文失效,尝试恢复...")
                            if self.recover_iframe(browse_type):
                                continue  # recovered: move on to the next row
                        raise

                    # Short pause, then check whether a new window opened.
                    time.sleep(0.5)

                    try:
                        pages_after = self.context.pages
                    except Exception as e:
                        if self.is_context_error(str(e)):
                            self.log(f" ⚠ 检查页面时上下文失效,尝试恢复...")
                            self.recover_iframe(browse_type)
                            continue
                        raise

                    if len(pages_after) > pages_before:
                        # A new window opened: keep it open briefly, then close.
                        new_page = pages_after[-1]
                        self.log(f" - 新窗口已打开,等待加载...")
                        time.sleep(interval)

                        try:
                            new_page.close()
                        except Exception:
                            # The window may already be gone.
                            pass
                        self.log(f" - 新窗口已关闭")
                    else:
                        # No new window: the click navigated in place, go back.
                        try:
                            self.main_page.evaluate("() => window.history.back()")
                        except Exception as e:
                            if self.is_context_error(str(e)):
                                self.log(f" ⚠ 返回时上下文失效,尝试恢复...")
                                self.recover_iframe(browse_type)
                                continue
                        time.sleep(0.5)

                        # Make sure we are back inside the iframe.
                        self.page = self.get_iframe_safe()
                        if not self.page:
                            self.log(f" - 警告:返回后无法获取iframe,尝试恢复")
                            if not self.recover_iframe(browse_type):
                                continue

                    # Make sure we are back inside the iframe.
                    time.sleep(0.2)
                    self.page = self.get_iframe_safe()
                    if not self.page:
                        self.log(f" - 无法恢复iframe,尝试完整恢复...")
                        if not self.recover_iframe(browse_type):
                            continue

                    total_attachments += 1
                    self.log(f" - {att_text}处理完成")

                except Exception as e:
                    error_msg = str(e)
                    self.log(f" - 处理{att_text}时出错: {error_msg[:60]}")

                    if self.is_context_error(error_msg):
                        self.log(f" - 检测到上下文失效,尝试恢复...")
                        if self.recover_iframe(browse_type):
                            self.log(f" - 已恢复,继续处理下一条")
                        else:
                            self.log(f" - 恢复失败,将在下一行重试")
                    else:
                        # Other errors: attempt a simple recovery.
                        try:
                            self.page = self.get_iframe_safe()
                            if not self.page:
                                self.recover_iframe(browse_type)
                        except Exception:
                            pass

            # The current page is done. Without auto paging there is nothing
            # left to do; looping again would re-process the same rows.
            if not auto_next_page:
                break

            page_retry_count = 0
            max_page_retries = 3
            # Set when the paging step decides that browsing as a whole must
            # end (not merely this retry loop).
            stop_browsing = False

            while page_retry_count < max_page_retries:
                try:
                    # Make sure we are inside the iframe.
                    time.sleep(0.2)
                    self.page = self.get_iframe_safe()
                    if not self.page:
                        self.log("警告:翻页前无法获取iframe,尝试恢复...")
                        if not self.recover_iframe(browse_type):
                            self.log("错误:iframe恢复失败,停止浏览")
                            stop_browsing = True
                            break

                    next_button = self.page.locator(next_xpath)
                    if next_button.count() > 0:
                        self.log("点击下一页...")
                        try:
                            next_button.click()
                        except Exception as click_e:
                            if self.is_context_error(str(click_e)):
                                page_retry_count += 1
                                self.log(f"⚠ 点击下一页时上下文失效,重试... ({page_retry_count}/{max_page_retries})")
                                self.recover_iframe(browse_type)
                                continue
                            raise
                        time.sleep(1.5)
                        current_page += 1
                        break  # paged successfully

                    # No next page: check whether we can finish early.
                    if not completed_first_round:
                        completed_first_round = True
                        self.log("完成第一轮浏览,准备返回第一页继续浏览...")
                    else:
                        self.log("完成一轮浏览,返回第一页继续...")

                    if expected_total is not None and total_items >= expected_total:
                        self.log(f"已浏览 {total_items}/{expected_total} 条,无需继续刷新")
                        stop_browsing = True
                        break

                    # Reload and re-select the browse type.
                    self.log("刷新页面并重新点击浏览类型...")
                    try:
                        self.main_page.reload(wait_until='domcontentloaded')
                    except Exception as reload_e:
                        if self.is_context_error(str(reload_e)):
                            self.log("⚠ 刷新页面时上下文失效,等待后重试...")
                            time.sleep(2)
                    time.sleep(1.5)

                    time.sleep(0.5)
                    self.page = self.get_iframe_safe()
                    if not self.page:
                        self.log("警告:刷新后无法获取iframe,尝试恢复...")
                        if not self.recover_iframe(browse_type):
                            self.log("错误:刷新后无法恢复iframe,停止翻页")
                            stop_browsing = True
                            break

                    selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]"
                    try:
                        self.page.locator(selector).click(timeout=5000)
                        self.log(f"重新点击'{browse_type}'按钮成功")
                        time.sleep(1.5)

                        try:
                            self.page.locator("//table[@class='ltable']").wait_for(timeout=30000)
                            self.log("内容表格已加载")
                        except Exception:
                            self.log("等待表格加载超时,继续...")
                    except Exception as e:
                        if self.is_context_error(str(e)):
                            page_retry_count += 1
                            self.log(f"⚠ 点击浏览类型时上下文失效,重试... ({page_retry_count}/{max_page_retries})")
                            continue
                        # Fall back to clicking the label.
                        try:
                            label_selector = f"//label[contains(text(), '{browse_type}')]"
                            self.page.locator(label_selector).click(timeout=5000)
                            self.log(f"点击'{browse_type}'标签成功")
                            time.sleep(1.5)
                        except Exception as label_e:
                            if self.is_context_error(str(label_e)):
                                page_retry_count += 1
                                self.log(f"⚠ 点击label时上下文失效,重试... ({page_retry_count}/{max_page_retries})")
                                continue
                            self.log(f"点击浏览类型失败: {str(e)[:50]}")

                    current_page = 1
                    break  # back on the first page

                except Exception as e:
                    error_msg = str(e)
                    if self.is_context_error(error_msg):
                        page_retry_count += 1
                        self.log(f"⚠ 翻页时上下文失效,尝试恢复... ({page_retry_count}/{max_page_retries})")
                        time.sleep(1)
                        self.recover_iframe(browse_type)
                        continue
                    self.log(f"翻页时出错: {error_msg[:60]}")
                    stop_browsing = True
                    break

            # Paging retries exhausted: leave the main loop.
            if page_retry_count >= max_page_retries:
                self.log(f"✗ 翻页重试{max_page_retries}次后仍失败,停止浏览")
                break

            if stop_browsing:
                break

        result.success = True
        result.total_items = total_items
        result.total_attachments = total_attachments
        self.log(f"浏览完成!共 {total_items} 条内容,{total_attachments} 个附件")

    except Exception as e:
        error_msg = str(e)
        result.error_message = error_msg
        self.log(f"浏览内容时出错: {error_msg[:80]}")

        # Give a clearer hint for context/navigation failures.
        if self.is_context_error(error_msg):
            self.log("⚠ 检测到上下文/导航相关错误,可能是页面发生了意外导航")

    return result
|
||
|
||
def take_screenshot(self, filepath: str) -> bool:
    """
    Capture the main page as a full-page JPEG and write it to disk.

    Args:
        filepath: Destination path of the screenshot file.

    Returns:
        True when the screenshot was taken and the success message was
        logged; False when anything in that sequence raised. The failure
        is logged instead of being propagated to the caller.
    """
    # JPEG is requested explicitly because the quality option only applies
    # to that format; 100 is the top of its 0-100 range.
    capture_options = {
        "path": filepath,
        "type": 'jpeg',
        "full_page": True,
        "quality": 100,
    }

    try:
        self.main_page.screenshot(**capture_options)
        self.log(f"截图已保存: {filepath}")
    except Exception as exc:
        self.log(f"截图失败: {str(exc)}")
        return False
    return True
|
||
|
||
def close(self):
    """Shut down the context, browser and Playwright driver, then drop all references.

    Each resource is released independently: a failure while releasing one
    is logged and counted, and the remaining resources are still processed.
    Nothing is returned; a summary line is logged only when at least one
    release step failed.
    """
    import gc

    # (attribute holding the resource, method that releases it, log prefix).
    # Order matters: context first, then the browser, then the driver.
    shutdown_steps = (
        ("context", "close", "关闭上下文时出错"),
        ("browser", "close", "关闭浏览器时出错"),
        ("playwright", "stop", "停止Playwright时出错"),
    )

    failures = []
    for attr_name, release_method, failure_prefix in shutdown_steps:
        if not getattr(self, attr_name):
            continue
        try:
            getattr(getattr(self, attr_name), release_method)()
        except Exception as exc:
            failure = f"{failure_prefix}: {str(exc)}"
            self.log(failure)
            failures.append(failure)

    # Clear every reference so the released objects can be collected.
    for stale_attr in ("context", "page", "main_page", "browser", "playwright"):
        setattr(self, stale_attr, None)

    # Short fixed pause kept from the original to let the process exit.
    time.sleep(0.5)

    # Explicit collection pass to free memory right away.
    gc.collect()

    if failures:
        self.log(f"资源清理完成,但有{len(failures)}个警告")
|
||
|
||
|
||
# 简单的测试函数
|
||
if __name__ == "__main__":
    # Manual smoke test: log in with one account and browse its content.
    print("Playwright自动化核心 - 测试")
    print("="*60)

    # Credentials are read from the environment so that no real account
    # secret is committed to the repository.
    test_username = os.environ.get("KMS_TEST_USERNAME")
    test_password = os.environ.get("KMS_TEST_PASSWORD")
    if not test_username or not test_password:
        raise SystemExit("请先设置环境变量 KMS_TEST_USERNAME 和 KMS_TEST_PASSWORD")

    # Create the browser manager.
    manager = PlaywrightBrowserManager(headless=True)
    automation = None

    try:
        # Initialize the browser.
        manager.initialize()

        # Create the automation instance.
        automation = PlaywrightAutomation(manager, "test_account")

        # Log in, then browse only when the login call reports success.
        if automation.login(test_username, test_password):
            result = automation.browse_content(
                browse_type="应读",
                auto_next_page=True,
                auto_view_attachments=True,
                interval=2.0,  # increased interval
            )

            print(f"\n浏览结果: {result}")

    finally:
        # Release the automation instance even when login/browsing raised,
        # and always close the manager afterwards.
        try:
            if automation is not None:
                automation.close()
        finally:
            manager.close()

    print("="*60)
    print("测试完成")
|