Files
zsglpt/playwright_automation.py
Yu Yon b5344cd55e 修复所有bug并添加新功能
- 修复添加账号按钮无反应问题
- 添加账号备注字段(可选)
- 添加账号设置按钮(修改密码/备注)
- 修复用户反馈功能
- 添加定时任务执行日志
- 修复容器重启后账号加载问题
- 修复所有JavaScript语法错误
- 优化账号加载机制(4层保障)

🤖 Generated with Claude Code
2025-12-10 11:19:16 +08:00

1379 lines
61 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Playwright版本 - 知识管理系统自动化核心
使用浏览器上下文(Context)实现高性能并发
"""
import os
from pathlib import Path
from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright
import time
import json
import threading
from typing import Optional, Callable
from dataclasses import dataclass
from app_config import get_config
# Resolve where Playwright's browser binaries live. A PLAYWRIGHT_BROWSERS_PATH
# that is already set (e.g. by the Docker image) wins; otherwise fall back to
# the Windows per-user default used for local development and export it so
# Playwright picks it up.
try:
    BROWSERS_PATH = os.environ["PLAYWRIGHT_BROWSERS_PATH"]
except KeyError:
    BROWSERS_PATH = str(Path.home() / "AppData" / "Local" / "ms-playwright")
    os.environ["PLAYWRIGHT_BROWSERS_PATH"] = BROWSERS_PATH
# Load the application configuration once, at import time.
config = get_config()
@dataclass
class BrowseResult:
    """Outcome of one browsing run."""

    # Whether the run completed (required; no default).
    success: bool
    # Number of content rows visited.
    total_items: int = 0
    # Number of attachments opened.
    total_attachments: int = 0
    # Failure description; empty when nothing went wrong.
    error_message: str = ""
class PlaywrightBrowserManager:
    """Factory for isolated Playwright browsers (one browser process per account).

    The manager keeps no browser itself: each ``create_*`` call starts a new
    Playwright driver plus a Chromium process and returns them to the caller,
    who becomes responsible for closing the browser and stopping the driver.
    """

    def __init__(self, headless: bool = True, log_callback: Optional[Callable] = None):
        """Store the launch settings.

        Args:
            headless: Whether Chromium is launched headless.
            log_callback: Optional logger, called as
                ``log_callback(message, account_id)``.
        """
        self.headless = headless
        self.log_callback = log_callback
        # Not used by any method of this class.
        self._lock = threading.Lock()

    def log(self, message: str, account_id: Optional[str] = None):
        """Forward *message* to the log callback, if one was supplied."""
        if self.log_callback:
            self.log_callback(message, account_id)

    def _release(self, playwright, browser=None):
        """Best-effort teardown of a half-built browser/driver pair."""
        if browser is not None:
            try:
                browser.close()
            except Exception as close_error:
                self.log(f"关闭浏览器时出错: {str(close_error)}")
        if playwright is not None:
            try:
                playwright.stop()
            except Exception as stop_error:
                self.log(f"停止Playwright时出错: {str(stop_error)}")

    def create_browser(self, proxy_config=None):
        """Start a Playwright driver and launch a dedicated Chromium process.

        Args:
            proxy_config: Optional dict; when it has a truthy ``'server'``
                entry, that server is used as the browser proxy.

        Returns:
            ``(playwright, browser)``.

        Raises:
            Exception: Whatever Playwright raised. The failure is logged and
                the driver, if it was already started, is stopped first so a
                failed launch does not leak it.
        """
        playwright = None
        try:
            playwright = sync_playwright().start()
            launch_options = {
                'headless': self.headless,
                'args': [
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-gpu',
                    '--disable-extensions',
                    '--disable-notifications',
                    '--disable-infobars',
                    '--disable-default-apps',
                    '--disable-background-timer-throttling',
                    '--disable-backgrounding-occluded-windows',
                    '--disable-renderer-backgrounding',
                ]
            }
            if proxy_config and proxy_config.get('server'):
                launch_options['proxy'] = {
                    'server': proxy_config['server']
                }
                self.log(f"使用代理: {proxy_config['server']}")
            browser = playwright.chromium.launch(**launch_options)
            return playwright, browser
        except Exception as e:
            self.log(f"启动浏览器失败: {str(e)}")
            self._release(playwright)
            raise

    def create_browser_and_context(self, proxy_config=None, storage_state=None):
        """Create a dedicated browser plus one configured context.

        Args:
            proxy_config: Passed through to :meth:`create_browser`.
            storage_state: Optional saved storage state (cookies etc.) to
                preload into the new context.

        Returns:
            ``(playwright, browser, context)``.

        Raises:
            Exception: Whatever Playwright raised. The freshly launched
                browser and driver are released before re-raising.
        """
        playwright, browser = self.create_browser(proxy_config)
        try:
            context_options = {
                'viewport': {'width': 1920, 'height': 1080},
                'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                # 2x device pixel ratio for sharper text in screenshots.
                'device_scale_factor': 2,
            }
            if storage_state:
                context_options['storage_state'] = storage_state
            context = browser.new_context(**context_options)
            context.set_default_timeout(config.DEFAULT_TIMEOUT)
            context.set_default_navigation_timeout(config.PAGE_LOAD_TIMEOUT)
        except Exception:
            self._release(playwright, browser)
            raise
        return playwright, browser, context
class PlaywrightAutomation:
"""Playwright自动化操作类"""
def __init__(self, browser_manager: PlaywrightBrowserManager, account_id: str, proxy_config: Optional[dict] = None):
    """Bind the automation to a browser manager and an account.

    Args:
        browser_manager: Manager used to create browsers and to emit logs.
        account_id: Account identifier attached to every log line.
        proxy_config: Optional proxy settings handed to the manager when a
            browser is created.
    """
    # Caller-supplied collaborators.
    self.proxy_config = proxy_config
    self.account_id = account_id
    self.browser_manager = browser_manager

    # Playwright handles; all start unset.
    self.playwright: Optional[Playwright] = None
    self.browser: Optional[Browser] = None
    self.context: Optional[BrowserContext] = None
    # `page` is the current working page/frame; `main_page` the top-level page.
    self.page: Optional[Page] = None
    self.main_page: Optional[Page] = None
def log(self, message: str):
    """Send *message* to the browser manager's logger, tagged with this account's ID."""
    manager = self.browser_manager
    manager.log(message, self.account_id)
# Directory in which the per-user cookie/storage-state JSON files are kept.
COOKIES_DIR = '/app/data/cookies'
def get_cookies_path(self, username: str) -> str:
    """Return the cookie file path for *username*, creating the directory if needed.

    The file name is the MD5 hex digest of the username, which keeps special
    characters out of the file system path.
    """
    import hashlib
    import os

    cookies_dir = self.COOKIES_DIR
    os.makedirs(cookies_dir, exist_ok=True)
    digest = hashlib.md5(username.encode()).hexdigest()
    return os.path.join(cookies_dir, digest + '.json')
def save_cookies(self, username: str) -> bool:
    """Persist the current context's storage state for *username*.

    The JSON is written to a temporary sibling file and then moved into
    place with ``os.replace``, so an interrupted save never leaves a
    truncated cookie file behind for ``load_cookies`` to read.

    Args:
        username: Account whose cookie file is written.

    Returns:
        True when the file was written; False when there is no browser
        context or the save failed (failures are logged, never raised).
    """
    import os

    if not self.context:
        return False

    tmp_path = None
    try:
        storage = self.context.storage_state()
        cookies_path = self.get_cookies_path(username)
        tmp_path = f"{cookies_path}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(storage, f)
        os.replace(tmp_path, cookies_path)
        self.log("Cookies已保存")
        return True
    except Exception as e:
        self.log(f"保存cookies失败: {e}")
        # Do not leave a half-written temporary file around.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return False
def load_cookies(self, username: str) -> bool:
    """Open a browser context preloaded with the saved cookies of *username*.

    Returns False when no cookie file exists, when the file is older than
    24 hours (it is deleted in that case), or when anything fails while
    loading (the failure is logged). On success the Playwright handles and
    the working page are stored on ``self`` and True is returned.
    """
    import os
    import time as time_module

    cookies_path = self.get_cookies_path(username)
    if not os.path.exists(cookies_path):
        return False

    max_age_seconds = 24 * 3600
    try:
        age_seconds = time_module.time() - os.path.getmtime(cookies_path)
        if age_seconds > max_age_seconds:
            self.log("Cookies已过期需要重新登录")
            os.remove(cookies_path)
            return False

        with open(cookies_path, 'r', encoding='utf-8') as f:
            storage = json.load(f)

        # A fresh browser + context seeded with the stored state.
        handles = self.browser_manager.create_browser_and_context(
            self.proxy_config,
            storage_state=storage
        )
        self.playwright, self.browser, self.context = handles
        self.page = self.context.new_page()
        self.main_page = self.page
        return True
    except Exception as e:
        self.log(f"加载cookies失败: {e}")
        return False
def check_login_state(self) -> bool:
    """Report whether the current session is still logged in.

    Navigates to the admin index page and waits for the network to go idle.
    The session counts as valid when the final URL still contains
    ``index.aspx``, i.e. the site did not redirect elsewhere.

    Returns:
        True when still on the index page; False on a redirect or on any
        navigation error. Only ``Exception`` subclasses are treated as
        "not logged in", so ``KeyboardInterrupt``/``SystemExit`` propagate.
    """
    try:
        self.page.goto('https://postoa.aidunsoft.com/admin/index.aspx', timeout=15000)
        self.page.wait_for_load_state('networkidle', timeout=10000)
        return 'index.aspx' in self.page.url
    except Exception:
        return False
def quick_login(self, username: str, password: str, remember: bool = True) -> dict:
    """Log in, reusing a saved session when that is possible.

    * If a connected browser is already attached, log in directly with it.
      Cookies are not loaded in that case because ``load_cookies`` would
      create a new browser and replace the attached one.
    * Otherwise try the saved cookies; if they still give a valid session,
      skip the login form entirely.
    * If the saved session is stale, tear it down and do a normal login.

    Args:
        username: Login name.
        password: Login password.
        remember: Forwarded to ``login``.

    Returns:
        The ``login`` result dict with an added ``used_cookies`` key, or
        ``{"success": True, "message": ..., "used_cookies": True}`` when the
        saved session was reused.
    """
    if self.browser and self.browser.is_connected():
        self.log("使用池中浏览器,直接登录")
    elif self.load_cookies(username):
        self.log("尝试使用已保存的登录态...")
        if self.check_login_state():
            self.log("✓ 登录态有效,跳过登录")
            return {"success": True, "message": "使用已保存的登录态", "used_cookies": True}

        self.log("登录态已失效,重新登录")
        # Release each handle independently so one failing close cannot
        # leave the browser process or the Playwright driver running.
        stale_handles = (
            (self.context, 'close'),
            (self.browser, 'close'),
            (self.playwright, 'stop'),
        )
        for handle, closer in stale_handles:
            if not handle:
                continue
            try:
                getattr(handle, closer)()
            except Exception as e:
                self.log(f"清理失效会话时出错: {e}")
        # Drop the dead handles so login() starts from a clean slate.
        self.context = None
        self.browser = None
        self.playwright = None
        self.page = None
        self.main_page = None

    result = self.login(username, password, remember)
    if result.get('success'):
        self.save_cookies(username)
    result['used_cookies'] = False
    return result
def login(self, username: str, password: str, remember: bool = True) -> dict:
    """Open the login page, submit the credentials and classify the outcome.

    When a connected browser is already attached only a new context is
    created on it; otherwise a new browser and context are requested from
    the browser manager.

    Args:
        username: Login name typed into ``#txtUserName``.
        password: Password typed into ``#txtPassword``.
        remember: Whether to tick the ``#chkRemember`` checkbox.

    Returns:
        A dict (never a bare bool) with the keys ``success`` (bool),
        ``error_type`` (``None`` on success, otherwise ``"password_error"``,
        ``"login_error"``, ``"network_error"`` or ``"exception"``) and
        ``message`` (str). Exceptions are caught and reported through the
        dict rather than raised.
    """
    try:
        if self.browser and self.browser.is_connected():
            self.log("使用池中浏览器...")
            # NOTE(review): unlike create_browser_and_context(), this context
            # gets neither the configured default timeouts nor the proxy --
            # confirm that is intended.
            context_options = {
                'viewport': {'width': 1920, 'height': 1080},
                'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'device_scale_factor': 2,
            }
            self.context = self.browser.new_context(**context_options)
        else:
            self.playwright, self.browser, self.context = self.browser_manager.create_browser_and_context(self.proxy_config)

        self.page = self.context.new_page()
        self.main_page = self.page

        # Load the login page, retrying once; the last failure is re-raised
        # and ends up in the outer handler.
        max_retries = 2
        for attempt in range(max_retries):
            try:
                self.page.goto(config.ZSGL_LOGIN_URL, timeout=60000)
                break
            except Exception:
                if attempt >= max_retries - 1:
                    raise
                self.log(f"页面加载超时,重试中... ({attempt + 1}/{max_retries})")
                time.sleep(2)

        self.page.fill('#txtUserName', username)
        self.page.fill('#txtPassword', password)
        if remember:
            self.page.check('#chkRemember')
        self.page.click('#btnSubmit')
        self.page.wait_for_load_state('networkidle', timeout=30000)

        current_url = self.page.url
        self.log(f"当前URL: {current_url}")
        if config.ZSGL_INDEX_URL_PATTERN in current_url:
            self.log("登录成功!")
            return {"success": True, "error_type": None, "message": "登录成功"}

        # Still not on the index page: look for an on-page error message.
        error_message = "登录失败"
        error_type = "unknown"
        try:
            error_element = self.page.locator('#lblMsg, .error-message, [class*="error"]').first
            if error_element.is_visible(timeout=2000):
                error_text = error_element.inner_text().strip()
                if error_text:
                    error_message = error_text
                    self.log(f"登录错误提示: {error_text}")
                    # Only a message that mentions the password/user name
                    # is reported as a credential problem.
                    if "密码" in error_text or "password" in error_text.lower() or "用户名" in error_text or "账号" in error_text:
                        error_type = "password_error"
                    else:
                        error_type = "login_error"
        except Exception:
            # Best effort: without a readable message the failure is
            # classified as a network problem below.
            pass

        if error_type == "unknown":
            error_message = "登录失败,可能是网络问题或页面加载超时"
            error_type = "network_error"
        self.log(error_message)
        return {"success": False, "error_type": error_type, "message": error_message}
    except Exception as e:
        error_msg = str(e)
        self.log(f"登录过程中出错: {error_msg}")
        return {"success": False, "error_type": "exception", "message": error_msg}
def is_context_error(self, error_msg: str) -> bool:
    """Tell whether *error_msg* looks like a context/navigation failure.

    The match is a case-insensitive substring search against a fixed list
    of phrases.
    """
    lowered = error_msg.lower()
    markers = (
        "frame was detached",
        "execution context was destroyed",
        "navigation",
        "detached",
        "target closed",
        "session closed",
        "connection closed",
    )
    for marker in markers:
        if marker in lowered:
            return True
    return False
def safe_execute(self, action, description="操作", max_retries=3, recover_browse_type=None):
    """Run *action*, retrying after context/navigation errors.

    Args:
        action: Zero-argument callable to execute.
        description: Label used in log messages.
        max_retries: Maximum number of attempts.
        recover_browse_type: Browse type passed to ``recover_iframe`` when a
            recovery is attempted between attempts.

    Returns:
        ``(True, result)`` on success, ``(False, None)`` when the action
        failed with a non-context error or every attempt was used up.
    """
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return True, action()
        except Exception as exc:
            reason = str(exc)

        # Anything other than a context error is not retried.
        if not self.is_context_error(reason):
            self.log(f"{description}失败: {reason}")
            return False, None

        if attempt >= final_attempt:
            self.log(f"{description}失败,已重试{max_retries}次: {reason}")
            continue

        self.log(f"{description}时上下文失效,尝试恢复... ({attempt+1}/{max_retries})")
        time.sleep(1 + attempt * 0.5)
        # The next attempt runs whether or not the recovery worked.
        if not self.recover_iframe(recover_browse_type):
            self.log(" iframe恢复失败继续重试...")
    return False, None
def get_iframe_safe(self, retry=True, max_retries=5):
    """Return the ``mainframe`` frame of the main page, or None.

    Args:
        retry: When False, only a single attempt is made.
        max_retries: Number of attempts when *retry* is True.

    Returns:
        The frame object, or None when ``main_page`` is unset or the frame
        could not be obtained within the allowed attempts.
    """
    attempts = max_retries if retry else 1
    for attempt in range(attempts):
        try:
            if not self.main_page:
                self.log("⚠ main_page无效")
                return None
            iframe = self.main_page.frame('mainframe')
            if iframe:
                return iframe
        except Exception as e:
            error_msg = str(e)
            if self.is_context_error(error_msg):
                self.log(f"⚠ 获取iframe时上下文失效等待恢复... ({attempt+1}/{attempts})")
            else:
                self.log(f"⚠ 获取iframe出错: {error_msg}")
        # Back off (increasingly) only when another attempt will follow;
        # with retry=False there is nothing to wait for.
        if attempt < attempts - 1:
            time.sleep(0.5 + attempt * 0.3)
    return None
def recover_iframe(self, browse_type: str = None) -> bool:
    """Try to re-acquire the ``mainframe`` iframe after it became unusable.

    Intended for "Frame was detached" / "Execution context was destroyed"
    situations. Four strategies are tried in order of increasing cost:

    1. fetch the frame again right away;
    2. wait for the main page to settle, then fetch;
    3. wait for the iframe element and probe it from JavaScript, then fetch;
    4. reload the main page, fetch, and (when *browse_type* is given)
       click the browse-type control again.

    Args:
        browse_type: Label of the browse type to re-select after a reload.

    Returns:
        True when a frame was obtained (and, after a reload with a
        *browse_type*, the type could be clicked again); False otherwise.
        ``self.page`` is overwritten with the result of every fetch, so it
        may be None after a failed recovery.
    """
    self.log("🔄 尝试恢复iframe连接...")

    # Strategy 1: a plain re-fetch covers short-lived context switches.
    self.page = self.get_iframe_safe(retry=True, max_retries=3)
    if self.page:
        self.log("✓ iframe恢复成功直接获取")
        return True

    # Strategy 2: the page may still be loading; wait, then re-fetch.
    self.log(" 等待页面稳定...")
    time.sleep(1.5)
    for load_state, timeout_ms in (('domcontentloaded', 5000), ('networkidle', 10000)):
        try:
            self.main_page.wait_for_load_state(load_state, timeout=timeout_ms)
        except Exception:
            # Best effort: a timeout here just means we try anyway.
            pass
    self.page = self.get_iframe_safe(retry=True, max_retries=3)
    if self.page:
        self.log("✓ iframe恢复成功等待后获取")
        return True

    # Strategy 3: wait for the element and confirm from JavaScript that it
    # has a window and a document before fetching.
    self.log(" 尝试JavaScript方式获取iframe...")
    try:
        self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=5000)
        # `!!` makes the probe return a plain boolean instead of a DOM
        # Document, which is not a serializable return value.
        has_iframe = self.main_page.evaluate("""() => {
            const iframe = document.querySelector('iframe[name="mainframe"]');
            return !!(iframe && iframe.contentWindow && iframe.contentDocument);
        }""")
        if has_iframe:
            time.sleep(0.5)
            self.page = self.get_iframe_safe(retry=True, max_retries=3)
            if self.page:
                self.log("✓ iframe恢复成功JavaScript验证后获取")
                return True
    except Exception as e:
        self.log(f" JavaScript方式失败: {str(e)[:50]}")

    # Strategy 4 (last resort): reload and re-select the browse type.
    self.log(" 刷新页面重试...")
    try:
        self.main_page.reload(wait_until='domcontentloaded')
        time.sleep(2)
        self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=15000)
        time.sleep(1)
        self.page = self.get_iframe_safe(retry=True, max_retries=5)
        if self.page:
            if browse_type:
                self.log(f" 重新点击'{browse_type}'...")
                selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]"
                try:
                    self.page.locator(selector).click(timeout=5000)
                    time.sleep(1.5)
                    try:
                        self.page.locator("//table[@class='ltable']").wait_for(timeout=10000)
                    except Exception:
                        # The table may simply be slow; carry on.
                        pass
                    self.log(f"✓ iframe恢复成功刷新后重新点击'{browse_type}'")
                except Exception:
                    # Fall back to clicking the <label> of the radio button.
                    try:
                        label_selector = f"//label[contains(text(), '{browse_type}')]"
                        self.page.locator(label_selector).click(timeout=5000)
                        time.sleep(1.5)
                        self.log("✓ iframe恢复成功刷新后点击label")
                    except Exception as label_e:
                        self.log(f" 点击label也失败: {str(label_e)[:30]}")
                        return False
            else:
                self.log("✓ iframe恢复成功刷新后获取")
            return True
    except Exception as e:
        self.log(f"✗ 刷新恢复失败: {str(e)[:50]}")

    self.log("✗ iframe恢复失败所有方法都已尝试")
    return False
def switch_to_iframe(self) -> bool:
    """Make the ``mainframe`` iframe the current working page.

    Waits for the iframe element on the main page and fetches the frame, up
    to three times. Returns True once ``self.page`` points at the frame,
    False when every attempt failed.
    """
    attempts = 3
    try:
        for index in range(attempts):
            attempt_no = index + 1
            try:
                # Wait for the element before asking for the frame object.
                self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=2000)
                frame = self.get_iframe_safe()
                if frame:
                    self.page = frame
                    self.log(f"✓ 成功切换到iframe (尝试 {attempt_no}/{attempts})")
                    return True
            except Exception:
                if attempt_no < attempts:
                    self.log(f"未找到iframe重试中... ({attempt_no}/{attempts})")
                    time.sleep(1)
                else:
                    self.log("所有重试都失败未找到iframe")
        return False
    except Exception as e:
        self.log(f"切换到iframe时出错: {str(e)}")
        return False
def safe_click(self, locator, timeout=5000, description="元素"):
    """Click *locator*, treating a navigation triggered by the click as success.

    Returns True when the click went through or failed only because the
    page navigated; False (after logging) for any other click error.
    """
    try:
        locator.click(timeout=timeout)
    except Exception as exc:
        detail = str(exc)
        navigated = "Execution context was destroyed" in detail or "navigation" in detail.lower()
        if not navigated:
            self.log(f"点击{description}失败: {detail}")
            return False
        # The click itself worked; it merely tore down the old context.
        self.log(f"⚠ 点击{description}时检测到页面导航,等待页面稳定...")
        time.sleep(1)
    return True
def switch_browse_type(self, browse_type: str, max_retries: int = 2) -> bool:
    """Select a browse type inside the iframe, with retries.

    Each attempt switches to the iframe and then tries two click targets in
    order: the ``<a>`` inside the ``rule-multi-radio`` block, then the
    ``<label>`` carrying the same text. After a successful click it waits up
    to 30 s for the content table (a timeout there is tolerated).

    Args:
        browse_type: Visible text of the browse type to select.
        max_retries: Extra attempts after the first one (default 2).

    Returns:
        True when one of the click targets was clicked, False otherwise.
    """
    destroyed_marker = "Execution context was destroyed"
    for attempt in range(max_retries + 1):
        can_retry = attempt < max_retries
        try:
            if attempt > 0:
                self.log(f"⚠ 第 {attempt + 1} 次尝试切换浏览类型...")
            else:
                self.log(f"切换到'{browse_type}'类型...")

            if not self.switch_to_iframe():
                if can_retry:
                    self.log("iframe切换失败,等待1秒后重试...")
                    time.sleep(1)
                    continue
                return False

            # (xpath, log on success, log when the target is missing)
            click_plans = (
                (
                    f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]",
                    f"点击'{browse_type}'按钮成功",
                    "未找到<a>标签,尝试点击<label>...",
                ),
                (
                    f"//label[contains(text(), '{browse_type}')]",
                    f"点击'{browse_type}'标签成功",
                    "未找到<label>标签",
                ),
            )
            restart_attempt = False
            for xpath, clicked_msg, missing_msg in click_plans:
                try:
                    self.page.locator(xpath).click(timeout=5000)
                    self.log(clicked_msg)
                    # Give the page time to refresh, then wait for the table.
                    time.sleep(1.5)
                    try:
                        self.page.locator("//table[@class='ltable']").wait_for(timeout=30000)
                        self.log("内容表格已加载")
                    except Exception:
                        self.log("等待表格加载超时,继续...")
                    return True
                except Exception as click_err:
                    if destroyed_marker in str(click_err):
                        self.log("⚠ 检测到执行上下文被销毁")
                        if can_retry:
                            self.log("等待2秒后重试...")
                            time.sleep(2)
                            restart_attempt = True
                            break
                    self.log(missing_msg)
            if restart_attempt:
                continue

            # Neither target could be clicked.
            if can_retry:
                self.log("切换失败,等待2秒后重试...")
                time.sleep(2)
                continue
            return False
        except Exception as e:
            error_msg = str(e)
            self.log(f"切换浏览类型时出错: {error_msg}")
            if destroyed_marker in error_msg or "navigation" in error_msg.lower():
                if can_retry:
                    self.log("⚠ 检测到执行上下文被销毁或导航错误,等待2秒后重试...")
                    time.sleep(2)
                    continue
            return False

    # Every attempt asked for a retry and none succeeded.
    self.log(f"❌ 切换浏览类型失败,已重试 {max_retries}")
    return False
def browse_content(self, browse_type: str,
                   auto_next_page: bool = True,
                   auto_view_attachments: bool = True,
                   interval: float = 1.0,
                   should_stop_callback: Optional[Callable] = None,
                   navigate_only: bool = False) -> BrowseResult:
    """Walk through the content list of one browse type.

    After selecting *browse_type* the method repeatedly reads the rows of
    the content table, optionally opens the first attachment of each row,
    and moves to the next page. When there is no next page it reloads and
    starts another round from page one, until the number of visited rows
    reaches the total advertised by the pager or an empty page is seen
    after a completed round.

    Fixes over the previous implementation: a "stop browsing" decision made
    while turning the page (expected total reached, iframe lost, paging
    error) now really ends the run instead of only leaving the retry loop,
    and ``auto_next_page=False`` processes exactly one page instead of
    looping over it forever.

    Args:
        browse_type: Visible text of the browse type to select.
        auto_next_page: Whether to page through the list automatically.
        auto_view_attachments: Whether to open each row's first attachment.
        interval: Seconds to keep an attachment window open.
        should_stop_callback: Optional zero-argument callable; a truthy
            return value ends the run.
        navigate_only: Only select the browse type (e.g. before taking a
            screenshot) and return.

    Returns:
        A ``BrowseResult``; errors are reported through it, never raised.
    """
    result = BrowseResult(success=False)
    try:
        self.log(f"导航到 '{browse_type}' 页面...")
        try:
            time.sleep(2)  # let the page finish loading
            self.log(f"当前URL: {self.main_page.url}")
        except Exception as e:
            self.log(f"获取URL失败: {str(e)}")

        if not self.switch_browse_type(browse_type):
            result.error_message = "切换浏览类型失败"
            return result

        if navigate_only:
            time.sleep(1)  # let the page settle
            result.success = True
            return result

        current_page = 1
        total_items = 0
        total_attachments = 0
        completed_first_round = False
        empty_page_counter = 0
        expected_total = None  # total advertised by the pager, once known

        while True:
            if should_stop_callback and should_stop_callback():
                self.log("收到停止信号,终止浏览")
                break

            self.log(f"处理第 {current_page} 页...")
            time.sleep(0.3)
            self.page = self.get_iframe_safe()
            if not self.page:
                self.log("错误无法获取iframe")
                break

            # The very first load gets longer waits and more retries.
            first_load = current_page == 1 and total_items == 0
            rows_count = self._count_content_rows(first_load)

            if rows_count == 0:
                self.log("当前页面没有内容")
                self._log_empty_page_debug()
                empty_page_counter += 1
                self.log(f"连续空页面数: {empty_page_counter}")
                if expected_total is not None and total_items >= expected_total:
                    self.log(f"已浏览 {total_items}/{expected_total} 条,基于计数判断完成")
                    break
                if completed_first_round and empty_page_counter >= 1:
                    self.log(f"检测到空页面,已浏览 {total_items} 条,内容已浏览完毕")
                    break
            else:
                empty_page_counter = 0
                self.log(f"找到 {rows_count} 条内容")
                expected_total = self._read_expected_total(expected_total)

                for i in range(rows_count):
                    if should_stop_callback and should_stop_callback():
                        break
                    keep_going, item_done, attachment_done = self._browse_row(
                        i, browse_type, auto_view_attachments, interval
                    )
                    if item_done:
                        total_items += 1
                    if attachment_done:
                        total_attachments += 1
                    if not keep_going:
                        break

            if not auto_next_page:
                break

            keep_browsing, current_page, completed_first_round = self._turn_page(
                browse_type, current_page, completed_first_round,
                total_items, expected_total
            )
            if not keep_browsing:
                break

        result.success = True
        result.total_items = total_items
        result.total_attachments = total_attachments
        self.log(f"浏览完成!共 {total_items} 条内容,{total_attachments} 个附件")
    except Exception as e:
        error_msg = str(e)
        result.error_message = error_msg
        self.log(f"浏览内容时出错: {error_msg[:80]}")
        if self.is_context_error(error_msg):
            self.log("⚠ 检测到上下文/导航相关错误,可能是页面发生了意外导航")
    return result

def _count_content_rows(self, first_load: bool) -> int:
    """Wait for the content table and return the number of data rows (0 if none appear)."""
    rows_xpath = "//table[@class='ltable']/tbody/tr[position()>1 and count(td)>=5]"
    try:
        self.page.locator("//table[@class='ltable']").wait_for(timeout=10000)
    except Exception:
        self.log("等待表格超时,继续尝试...")

    # Extra wait so AJAX-loaded rows have a chance to arrive.
    time.sleep(3.0 if first_load else 1.0)

    # Re-count a few times so slow loading is not mistaken for an empty
    # page: 8 x 3 s on the first load, 3 x 1.5 s afterwards.
    max_retries = 8 if first_load else 3
    retry_wait = 3.0 if first_load else 1.5
    rows_count = 0
    for retry in range(max_retries):
        rows_count = self.page.locator(rows_xpath).count()
        if rows_count > 0:
            break
        if retry < max_retries - 1:
            self.log(f"未检测到内容,等待后重试... ({retry+1}/{max_retries})")
            time.sleep(retry_wait)
    return rows_count

def _log_empty_page_debug(self):
    """Log diagnostic details about a page on which no rows were found."""
    try:
        page_html = self.page.content()
        if 'ltable' in page_html:
            self.log("[调试] 表格存在,但没有数据行")
            if '暂无' in page_html or '没有' in page_html:
                self.log("[调试] 页面显示暂无记录")
        else:
            self.log("[调试] 页面中没有找到ltable表格")
        self.log(f"[调试] iframe URL: {self.page.url}")
    except Exception as debug_e:
        self.log(f"[调试] 获取页面信息失败: {str(debug_e)[:50]}")

def _read_expected_total(self, known_total):
    """Log the pager text and return the advertised record total.

    Returns *known_total* unchanged when it is already set or when the
    pager cannot be read or does not contain a "共N记录" figure.
    """
    import re

    try:
        pager = self.page.locator("//div[@id='PageContent']")
        if pager.count() > 0:
            page_text = pager.inner_text(timeout=3000).strip()
            if page_text:
                self.log(f"[分页信息] {page_text}")
                if known_total is None:
                    match = re.search(r'共(\d+)记录', page_text)
                    if match:
                        known_total = int(match.group(1))
                        self.log(f"[总数] 预期浏览 {known_total} 条内容")
    except Exception:
        # The pager is optional information; ignore read failures.
        pass
    return known_total

def _browse_row(self, index: int, browse_type: str, auto_view_attachments: bool, interval: float):
    """Visit row *index* of the current page.

    Returns:
        ``(keep_going, item_counted, attachment_counted)``. ``keep_going``
        is False when processing of the current page should stop.
    """
    rows_xpath = "//table[@class='ltable']/tbody/tr[position()>1 and count(td)>=5]"

    # Re-enter the iframe before every row after the first; a previous
    # history.back() may have replaced it.
    if index > 0:
        time.sleep(0.2)
        self.page = self.get_iframe_safe()
        if not self.page:
            self.log("警告无法获取iframe尝试恢复...")
            if not self.recover_iframe(browse_type):
                self.log("错误iframe恢复失败停止处理当前页")
                return False, False, False
            self.page = self.get_iframe_safe()
            if not self.page:
                return False, False, False

    # Always look the row up afresh so the locator is current.
    row = None
    row_retry_count = 0
    max_row_retries = 3
    while row is None and row_retry_count < max_row_retries:
        try:
            row = self.page.locator(rows_xpath).nth(index)
            row.count()  # touch the row to verify the context is alive
            break
        except Exception as e:
            error_msg = str(e)
            row_retry_count += 1
            if self.is_context_error(error_msg):
                self.log(f"⚠ 获取行时上下文失效,尝试恢复... ({row_retry_count}/{max_row_retries})")
                if self.recover_iframe(browse_type):
                    row = None
                    continue
                self.log("错误iframe恢复失败")
                break
            self.log(f"获取行时出错: {error_msg[:50]}")
            break
    if row is None:
        self.log("错误:无法获取行数据,停止处理当前页")
        return False, False, False

    # Read the title (4th cell), recovering the iframe on failures.
    title = None
    title_retry_count = 0
    max_title_retries = 3
    while title is None and title_retry_count < max_title_retries:
        try:
            title = row.locator("xpath=.//td[4]").inner_text(timeout=10000).strip()
            break
        except Exception as e:
            error_msg = str(e)
            title_retry_count += 1
            if not (self.is_context_error(error_msg) or "Timeout" in error_msg):
                self.log(f"获取标题时出错: {error_msg[:50]}")
                break
            self.log(f"⚠ 获取标题时失败({title_retry_count}/{max_title_retries}),尝试恢复...")
            if not self.recover_iframe(browse_type):
                self.log(" ✗ 恢复失败")
                break
            time.sleep(0.3)
            try:
                row = self.page.locator(rows_xpath).nth(index)
                row.count()
                self.log(" ✓ 恢复成功,重新获取行数据")
            except Exception as row_e:
                self.log(f" ✗ 重新获取行数据失败: {str(row_e)[:50]}")
                break
    if title is None:
        title = "(无法获取标题)"
        self.log(f" [{index+1}] {title} - 跳过此行")
        return True, False, False

    self.log(f" [{index+1}] {title[:50]}")
    attachment_done = False
    if auto_view_attachments:
        attachment_done = self._view_first_attachment(row, index, browse_type, interval)
    return True, True, attachment_done

def _view_first_attachment(self, row, index: int, browse_type: str, interval: float) -> bool:
    """Open the first attachment link of *row* and come back to the list.

    Returns True when the attachment was opened and the iframe is usable
    again afterwards; False when there is no attachment or something failed.
    """
    rows_xpath = "//table[@class='ltable']/tbody/tr[position()>1 and count(td)>=5]"

    att_links_locator = None
    att_count = 0
    max_att_retries = 2
    for att_attempt in range(1, max_att_retries + 1):
        try:
            att_links_locator = row.locator("xpath=.//td[5]//a[contains(@class, 'link-btn')]")
            att_count = att_links_locator.count()
            break
        except Exception as e:
            if not self.is_context_error(str(e)):
                break
            self.log(f" ⚠ 获取附件时上下文失效({att_attempt}/{max_att_retries})...")
            if not self.recover_iframe(browse_type):
                break
            try:
                row = self.page.locator(rows_xpath).nth(index)
            except Exception:
                break
    if att_count <= 0:
        return False

    # Only the first attachment is handled.
    att_link = att_links_locator.first
    try:
        att_text = att_link.inner_text().strip() or "附件"
    except Exception as e:
        if self.is_context_error(str(e)):
            self.log(" ⚠ 获取附件信息时上下文失效,跳过")
            self.recover_iframe(browse_type)
            return False
        att_text = "附件"

    self.log(f" - 处理{att_text}...")
    try:
        # Page count before the click, to detect a newly opened window.
        pages_before = len(self.context.pages)

        try:
            att_link.click()
        except Exception as click_e:
            if self.is_context_error(str(click_e)):
                self.log(" ⚠ 点击附件时上下文失效,尝试恢复...")
                if self.recover_iframe(browse_type):
                    return False  # recovered; move on to the next row
            raise

        time.sleep(0.5)
        try:
            pages_after = self.context.pages
        except Exception as e:
            if self.is_context_error(str(e)):
                self.log(" ⚠ 检查页面时上下文失效,尝试恢复...")
                self.recover_iframe(browse_type)
                return False
            raise

        if len(pages_after) > pages_before:
            # The attachment opened in a new window: keep it open for
            # `interval` seconds, then close it.
            new_page = pages_after[-1]
            self.log(" - 新窗口已打开,等待加载...")
            time.sleep(interval)
            try:
                new_page.close()
            except Exception:
                pass  # the window may already be gone
            self.log(" - 新窗口已关闭")
        else:
            # Same-window navigation: go back in history.
            try:
                self.main_page.evaluate("() => window.history.back()")
            except Exception as e:
                if self.is_context_error(str(e)):
                    self.log(" ⚠ 返回时上下文失效,尝试恢复...")
                    self.recover_iframe(browse_type)
                    return False
            time.sleep(0.5)
            self.page = self.get_iframe_safe()
            if not self.page:
                self.log(" - 警告返回后无法获取iframe尝试恢复")
                if not self.recover_iframe(browse_type):
                    return False

        # Make sure we are back inside the iframe.
        time.sleep(0.2)
        self.page = self.get_iframe_safe()
        if not self.page:
            self.log(" - 无法恢复iframe尝试完整恢复...")
            if not self.recover_iframe(browse_type):
                return False

        self.log(f" - {att_text}处理完成")
        return True
    except Exception as e:
        error_msg = str(e)
        self.log(f" - 处理{att_text}时出错: {error_msg[:60]}")
        if self.is_context_error(error_msg):
            self.log(" - 检测到上下文失效,尝试恢复...")
            if self.recover_iframe(browse_type):
                self.log(" - 已恢复,继续处理下一条")
            else:
                self.log(" - 恢复失败,将在下一行重试")
        else:
            # Other errors: a light-weight attempt to get the iframe back.
            try:
                self.page = self.get_iframe_safe()
                if not self.page:
                    self.recover_iframe(browse_type)
            except Exception:
                pass
        return False

def _turn_page(self, browse_type: str, current_page: int, completed_first_round: bool,
               total_items: int, expected_total):
    """Go to the next page, or reload and restart from page one.

    Context errors are retried up to three times.

    Returns:
        ``(keep_browsing, current_page, completed_first_round)``.
        ``keep_browsing`` is False when browsing must end: the expected
        total was reached, the iframe could not be recovered, a
        non-context error occurred, or the retries were used up.
    """
    next_xpath = "//div[@id='PageContent']/a[contains(text(), '下一页') or contains(text(), '»')]"
    max_page_retries = 3
    page_retry_count = 0

    while page_retry_count < max_page_retries:
        try:
            time.sleep(0.2)
            self.page = self.get_iframe_safe()
            if not self.page:
                self.log("警告翻页前无法获取iframe尝试恢复...")
                if not self.recover_iframe(browse_type):
                    self.log("错误iframe恢复失败停止浏览")
                    return False, current_page, completed_first_round

            next_button = self.page.locator(next_xpath)
            if next_button.count() > 0:
                self.log("点击下一页...")
                try:
                    next_button.click()
                except Exception as click_e:
                    if self.is_context_error(str(click_e)):
                        page_retry_count += 1
                        self.log(f"⚠ 点击下一页时上下文失效,重试... ({page_retry_count}/{max_page_retries})")
                        self.recover_iframe(browse_type)
                        continue
                    raise
                time.sleep(1.5)
                return True, current_page + 1, completed_first_round

            # No next page: a round is complete.
            if not completed_first_round:
                completed_first_round = True
                self.log("完成第一轮浏览,准备返回第一页继续浏览...")
            else:
                self.log("完成一轮浏览,返回第一页继续...")

            if expected_total is not None and total_items >= expected_total:
                self.log(f"已浏览 {total_items}/{expected_total} 条,无需继续刷新")
                return False, current_page, completed_first_round

            # Reload and select the browse type again to restart at page 1.
            self.log("刷新页面并重新点击浏览类型...")
            try:
                self.main_page.reload(wait_until='domcontentloaded')
            except Exception as reload_e:
                if self.is_context_error(str(reload_e)):
                    self.log("⚠ 刷新页面时上下文失效,等待后重试...")
                    time.sleep(2)
                else:
                    self.log(f"刷新页面失败: {str(reload_e)[:60]}")
            time.sleep(1.5)
            time.sleep(0.5)
            self.page = self.get_iframe_safe()
            if not self.page:
                self.log("警告刷新后无法获取iframe尝试恢复...")
                if not self.recover_iframe(browse_type):
                    self.log("错误刷新后无法恢复iframe停止翻页")
                    return False, current_page, completed_first_round

            selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]"
            try:
                self.page.locator(selector).click(timeout=5000)
                self.log(f"重新点击'{browse_type}'按钮成功")
                time.sleep(1.5)
                try:
                    self.page.locator("//table[@class='ltable']").wait_for(timeout=30000)
                    self.log("内容表格已加载")
                except Exception:
                    self.log("等待表格加载超时,继续...")
            except Exception as e:
                if self.is_context_error(str(e)):
                    page_retry_count += 1
                    self.log(f"⚠ 点击浏览类型时上下文失效,重试... ({page_retry_count}/{max_page_retries})")
                    continue
                # Fall back to the <label> of the radio button.
                try:
                    label_selector = f"//label[contains(text(), '{browse_type}')]"
                    self.page.locator(label_selector).click(timeout=5000)
                    self.log(f"点击'{browse_type}'标签成功")
                    time.sleep(1.5)
                except Exception as label_e:
                    if self.is_context_error(str(label_e)):
                        page_retry_count += 1
                        self.log(f"⚠ 点击label时上下文失效重试... ({page_retry_count}/{max_page_retries})")
                        continue
                    self.log(f"点击浏览类型失败: {str(e)[:50]}")
            return True, 1, completed_first_round
        except Exception as e:
            error_msg = str(e)
            if self.is_context_error(error_msg):
                page_retry_count += 1
                self.log(f"⚠ 翻页时上下文失效,尝试恢复... ({page_retry_count}/{max_page_retries})")
                time.sleep(1)
                self.recover_iframe(browse_type)
                continue
            self.log(f"翻页时出错: {error_msg[:60]}")
            return False, current_page, completed_first_round

    self.log(f"✗ 翻页重试{max_page_retries}次后仍失败,停止浏览")
    return False, current_page, completed_first_round
def take_screenshot(self, filepath: str) -> bool:
    """Save a full-page JPEG screenshot of the main page to *filepath*.

    Returns True when the file was written, False (after logging) on any
    error.
    """
    # JPEG is used because it accepts a `quality` setting; 100 is the
    # maximum. `full_page` captures the whole scrollable page.
    shot_options = {
        'path': filepath,
        'type': 'jpeg',
        'full_page': True,
        'quality': 100,
    }
    try:
        self.main_page.screenshot(**shot_options)
        self.log(f"截图已保存: {filepath}")
        return True
    except Exception as exc:
        self.log(f"截图失败: {str(exc)}")
        return False
def close(self):
    """Shut down the context, the browser and Playwright, then drop all handles.

    Each step is attempted independently; failures are logged and counted
    rather than raised.
    """
    failures = []
    # (handle, method to call, log label) in teardown order.
    teardown_plan = (
        (self.context, 'close', "关闭上下文时出错"),
        (self.browser, 'close', "关闭浏览器时出错"),
        (self.playwright, 'stop', "停止Playwright时出错"),
    )
    for handle, method_name, label in teardown_plan:
        if not handle:
            continue
        try:
            getattr(handle, method_name)()
        except Exception as exc:
            note = f"{label}: {str(exc)}"
            self.log(note)
            failures.append(note)

    # Clear every reference so the objects can be garbage collected.
    self.playwright = None
    self.browser = None
    self.main_page = None
    self.page = None
    self.context = None

    # Short pause to let the browser process exit, then force a GC pass.
    time.sleep(0.5)
    import gc
    gc.collect()

    if failures:
        self.log(f"资源清理完成,但有{len(failures)}个警告")
# Manual smoke test: log in with credentials taken from the environment and
# browse the "应读" list once.
if __name__ == "__main__":
    print("Playwright自动化核心 - 测试")
    print("=" * 60)

    # Credentials come from the environment instead of being hard-coded in
    # the source file.
    test_username = os.environ.get("ZSGL_TEST_USERNAME")
    test_password = os.environ.get("ZSGL_TEST_PASSWORD")

    if not test_username or not test_password:
        print("请先设置环境变量 ZSGL_TEST_USERNAME 和 ZSGL_TEST_PASSWORD")
    else:
        # The manager owns no resources of its own (it has neither an
        # initialize() nor a close() method); the automation instance owns
        # the browser and is closed in `finally`.
        manager = PlaywrightBrowserManager(headless=True)
        automation = PlaywrightAutomation(manager, "test_account")
        try:
            # login() returns a dict, so check its 'success' flag rather
            # than the truthiness of the dict itself.
            login_result = automation.login(test_username, test_password)
            if login_result.get("success"):
                result = automation.browse_content(
                    browse_type="应读",
                    auto_next_page=True,
                    auto_view_attachments=True,
                    interval=2.0  # keep attachment windows open a bit longer
                )
                print(f"\n浏览结果: {result}")
            else:
                print(f"登录失败: {login_result.get('message')}")
        finally:
            automation.close()

    print("=" * 60)
    print("测试完成")