#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Playwright版本 - 知识管理系统自动化核心
使用浏览器上下文(Context)实现高性能并发
"""
import os
from pathlib import Path
from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright
import time
import json
import threading
from typing import Optional, Callable
from dataclasses import dataclass
from app_config import get_config
# Decide where Playwright looks for its browser binaries. An explicit
# PLAYWRIGHT_BROWSERS_PATH (e.g. one set by the Docker image) always wins;
# otherwise fall back to the default Windows per-user install location used
# for local development and export it so Playwright picks it up.
BROWSERS_PATH = os.environ.get("PLAYWRIGHT_BROWSERS_PATH")
if BROWSERS_PATH is None:
    BROWSERS_PATH = str(Path.home() / "AppData" / "Local" / "ms-playwright")
    os.environ["PLAYWRIGHT_BROWSERS_PATH"] = BROWSERS_PATH
# Load the application configuration (evaluated once, when this module is imported)
config = get_config()
@dataclass
class BrowseResult:
    """Result record for a browse run.

    Only ``success`` is required; the counters default to 0 and the error
    message defaults to an empty string.
    """
    # Whether the run succeeded
    success: bool
    # Item count (presumably the number of entries browsed - TODO confirm against producer)
    total_items: int = 0
    # Attachment count (presumably attachments viewed during the run - TODO confirm)
    total_attachments: int = 0
    # Error description; empty by default
    error_message: str = ""
class PlaywrightBrowserManager:
    """Factory for fully isolated Playwright browsers (one browser per account).

    Every ``create_*`` call starts its own Playwright driver and Chromium
    process. The manager keeps no reference to what it creates: the returned
    handles belong to the caller, who must close the browser and stop the
    Playwright driver when done.
    """

    def __init__(self, headless: bool = True, log_callback: Optional[Callable] = None):
        """Initialise the manager.

        Args:
            headless: Whether Chromium is launched in headless mode.
            log_callback: Optional logging hook, called as
                ``log_callback(message, account_id)``.
        """
        self.headless = headless
        self.log_callback = log_callback
        # Not used by any method of this class; presumably reserved for
        # callers that need to serialise access - TODO confirm.
        self._lock = threading.Lock()

    def log(self, message: str, account_id: Optional[str] = None):
        """Forward *message* to the log callback, if one was supplied."""
        if self.log_callback:
            self.log_callback(message, account_id)

    def _release(self, playwright, browser=None):
        """Best-effort teardown of a half-initialised browser/driver pair.

        Each handle is released independently so that a failure to close the
        browser does not prevent the Playwright driver from being stopped.
        """
        if browser is not None:
            try:
                browser.close()
            except Exception as e:
                self.log(f"关闭浏览器失败: {str(e)}")
        if playwright is not None:
            try:
                playwright.stop()
            except Exception as e:
                self.log(f"停止Playwright失败: {str(e)}")

    def create_browser(self, proxy_config=None):
        """Start a new Playwright driver and launch a dedicated Chromium.

        Args:
            proxy_config: Optional dict; when it has a truthy ``'server'``
                entry that value is used as the browser-level proxy server.

        Returns:
            A ``(playwright, browser)`` tuple owned by the caller.

        Raises:
            Exception: Whatever Playwright raised while starting or
                launching. The error is logged and any driver that was
                already started is stopped before re-raising.
        """
        playwright = None
        try:
            playwright = sync_playwright().start()

            launch_options = {
                'headless': self.headless,
                'args': [
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-gpu',
                    '--disable-extensions',
                    '--disable-notifications',
                    '--disable-infobars',
                    '--disable-default-apps',
                    '--disable-background-timer-throttling',
                    '--disable-backgrounding-occluded-windows',
                    '--disable-renderer-backgrounding',
                ]
            }

            # Route the browser through the proxy only when one is configured.
            if proxy_config and proxy_config.get('server'):
                launch_options['proxy'] = {
                    'server': proxy_config['server']
                }
                self.log(f"使用代理: {proxy_config['server']}")

            browser = playwright.chromium.launch(**launch_options)
            return playwright, browser
        except Exception as e:
            self.log(f"启动浏览器失败: {str(e)}")
            # Without this the driver process started above would be leaked
            # whenever the launch itself fails.
            self._release(playwright)
            raise

    def create_browser_and_context(self, proxy_config=None, storage_state=None):
        """Create an isolated browser plus one browser context.

        Args:
            proxy_config: Passed through to :meth:`create_browser`.
            storage_state: Optional previously saved storage state (cookies
                etc.) used to seed the new context.

        Returns:
            A ``(playwright, browser, context)`` tuple owned by the caller.

        Raises:
            Exception: Propagated from browser launch or context creation.
                If the context cannot be created or configured, the freshly
                launched browser and its driver are shut down first.
        """
        playwright, browser = self.create_browser(proxy_config)

        context_options = {
            'viewport': {'width': 1920, 'height': 1080},
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'device_scale_factor': 2,  # 2x device pixel ratio for sharper text
        }
        if storage_state:
            context_options['storage_state'] = storage_state

        try:
            context = browser.new_context(**context_options)
            # Default timeouts come from the application configuration.
            context.set_default_timeout(config.DEFAULT_TIMEOUT)
            context.set_default_navigation_timeout(config.PAGE_LOAD_TIMEOUT)
        except Exception as e:
            self.log(f"创建浏览器上下文失败: {str(e)}")
            self._release(playwright, browser)
            raise

        return playwright, browser, context
class PlaywrightAutomation:
"""Playwright自动化操作类"""
def __init__(self, browser_manager: PlaywrightBrowserManager, account_id: str, proxy_config: Optional[dict] = None):
    """Bind this automation session to a browser manager and an account.

    Args:
        browser_manager: Manager used to create browsers/contexts and to
            route log messages.
        account_id: Account identifier attached to every log message.
        proxy_config: Optional proxy settings handed to the browser manager
            whenever this session asks it for a browser.
    """
    # Playwright handles all start out empty; the login and cookie-loading
    # methods populate them. ``main_page`` keeps the top-level page, while
    # ``page`` is what actions run against (other methods repoint it at
    # the ``mainframe`` iframe).
    self.main_page: Optional[Page] = None
    self.page: Optional[Page] = None
    self.context: Optional[BrowserContext] = None
    self.browser: Optional[Browser] = None
    self.playwright: Optional[Playwright] = None

    # Caller-supplied collaborators and identity.
    self.proxy_config = proxy_config
    self.account_id = account_id
    self.browser_manager = browser_manager
def log(self, message: str):
    """Send *message* to the browser manager's logger, tagged with this account's ID."""
    manager = self.browser_manager
    manager.log(message, self.account_id)
# Directory that holds the per-user saved storage-state (cookie) JSON files
COOKIES_DIR = '/app/data/cookies'
def get_cookies_path(self, username: str) -> str:
    """Return the file path used to persist *username*'s storage state.

    The cookies directory is created on demand. The file name is the MD5 hex
    digest of the encoded username plus ``.json``, so special characters in
    a username never end up in the file name.
    """
    import hashlib

    cookies_dir = self.COOKIES_DIR
    os.makedirs(cookies_dir, exist_ok=True)
    digest = hashlib.md5(username.encode()).hexdigest()
    return os.path.join(cookies_dir, digest + '.json')
def save_cookies(self, username: str) -> bool:
    """Persist the current context's storage state for *username*.

    The state is first written to a temporary file in the destination
    directory and then moved into place with ``os.replace``. A failure while
    serialising therefore never truncates or corrupts a previously saved
    file, and readers never observe a half-written one.

    Args:
        username: Account name used to derive the cookie file path.

    Returns:
        True if the state was written; False if there is no active context
        or if saving failed (the failure is logged, never raised).
    """
    import tempfile

    if not self.context:
        return False

    tmp_path = None
    try:
        storage = self.context.storage_state()
        cookies_path = self.get_cookies_path(username)
        # NOTE: mkstemp creates the file readable/writable by the owner only.
        fd, tmp_path = tempfile.mkstemp(
            dir=os.path.dirname(cookies_path) or '.',
            suffix='.tmp',
        )
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(storage, f)
        os.replace(tmp_path, cookies_path)
        tmp_path = None  # ownership moved to cookies_path; nothing to clean up
        self.log("Cookies已保存")
        return True
    except Exception as e:
        self.log(f"保存cookies失败: {e}")
        return False
    finally:
        # Remove the temporary file if the write did not complete.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # best effort: the temp file may already be gone
def load_cookies(self, username: str) -> bool:
    """Start a browser session seeded with *username*'s saved storage state.

    Returns False when no saved file exists, when the file is older than
    24 hours (it is then deleted), or when anything goes wrong while reading
    it or creating the browser (the error is logged). On success a new
    browser, context and page are created through the browser manager and
    stored on this instance, and True is returned.
    """
    cookies_path = self.get_cookies_path(username)
    if not os.path.exists(cookies_path):
        return False

    max_age_seconds = 24 * 3600
    try:
        # Saved state older than the cut-off is discarded rather than reused.
        age_seconds = time.time() - os.path.getmtime(cookies_path)
        if age_seconds > max_age_seconds:
            self.log("Cookies已过期,需要重新登录")
            os.remove(cookies_path)
            return False

        with open(cookies_path, 'r', encoding='utf-8') as fh:
            saved_state = json.load(fh)

        # A brand-new browser/context pair is created with the saved state.
        manager = self.browser_manager
        handles = manager.create_browser_and_context(
            self.proxy_config,
            storage_state=saved_state
        )
        self.playwright, self.browser, self.context = handles
        fresh_page = self.context.new_page()
        self.page = fresh_page
        self.main_page = fresh_page
        return True
    except Exception as err:
        self.log(f"加载cookies失败: {err}")
        return False
def check_login_state(self) -> bool:
    """Report whether the current session is still logged in.

    Navigates to the admin index page and waits for the network to go idle.
    The session counts as logged in when the final URL still contains
    ``index.aspx`` (i.e. there was no redirect away from the index page).

    Returns:
        True if the index page was reached; False if the page redirected
        elsewhere or any Playwright/navigation error occurred (logged).
    """
    try:
        self.page.goto('https://postoa.aidunsoft.com/admin/index.aspx', timeout=15000)
        self.page.wait_for_load_state('networkidle', timeout=10000)
        return 'index.aspx' in self.page.url
    except Exception as e:
        # Catch Exception only, so KeyboardInterrupt/SystemExit still
        # propagate, and record why the check failed.
        self.log(f"检查登录态失败: {e}")
        return False
def quick_login(self, username: str, password: str, remember: bool = True):
    """Log in as fast as possible, reusing a saved session when allowed.

    * If this instance already holds a connected browser (one obtained from
      a pool), log in directly with it. Saved cookies are not tried in that
      case because ``load_cookies`` would create a new browser and replace
      the pooled one.
    * Otherwise try the saved storage state; if it is still valid no login
      is performed.
    * If the saved state is missing or no longer valid, do a normal login.

    After a successful fresh login the storage state is saved again.

    Args:
        username: Account name.
        password: Account password.
        remember: Forwarded to ``login``.

    Returns:
        The result dict from ``login`` with ``used_cookies`` set to False,
        or ``{"success": True, "message": ..., "used_cookies": True}`` when
        the saved session was reused.
    """
    def _login_and_store():
        # Fresh login; persist the new session only when it succeeded.
        result = self.login(username, password, remember)
        if result.get('success'):
            self.save_cookies(username)
        result['used_cookies'] = False
        return result

    if self.browser and self.browser.is_connected():
        self.log("使用池中浏览器,直接登录")
        return _login_and_store()

    if self.load_cookies(username):
        self.log("尝试使用已保存的登录态...")
        if self.check_login_state():
            self.log("✓ 登录态有效,跳过登录")
            return {"success": True, "message": "使用已保存的登录态", "used_cookies": True}

        self.log("登录态已失效,重新登录")
        # Tear down the session that load_cookies created. Each handle is
        # released on its own so that one failing close cannot leak the
        # remaining browser process or Playwright driver.
        for handle, closer_name in (
            (self.context, 'close'),
            (self.browser, 'close'),
            (self.playwright, 'stop'),
        ):
            if not handle:
                continue
            try:
                getattr(handle, closer_name)()
            except Exception as e:
                self.log(f"关闭旧会话资源时出错: {e}")
        # Drop the stale references so nothing reuses closed handles;
        # login() creates a fresh browser when self.browser is unset.
        self.page = None
        self.main_page = None
        self.context = None
        self.browser = None
        self.playwright = None

    return _login_and_store()
def login(self, username: str, password: str, remember: bool = True) -> dict:
    """Log in to the system through the login page.

    If this instance already holds a connected browser (e.g. one obtained
    from a browser pool) only a new context is created on it; otherwise a
    new browser and context are created through the browser manager. This
    method does not close the browser or context when the login fails.

    Args:
        username: Account name typed into ``#txtUserName``.
        password: Password typed into ``#txtPassword``.
        remember: Whether to tick the ``#chkRemember`` checkbox.

    Returns:
        A dict with the keys:

        * ``success`` (bool)
        * ``error_type``: ``None`` on success, otherwise one of
          ``"password_error"``, ``"login_error"``, ``"network_error"`` or
          ``"exception"``
        * ``message`` (str): human-readable outcome

        Errors are never raised to the caller; they are logged and reported
        with ``error_type == "exception"``.
    """
    try:
        if self.browser and self.browser.is_connected():
            # Reuse the existing (pooled) browser: only a context is needed.
            self.log("使用池中浏览器...")
            context_options = {
                'viewport': {'width': 1920, 'height': 1080},
                'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'device_scale_factor': 2,
            }
            # NOTE(review): contexts created by the browser manager also get
            # the configured default timeouts; this one keeps Playwright's
            # defaults - confirm whether that difference is intended.
            self.context = self.browser.new_context(**context_options)
        else:
            self.playwright, self.browser, self.context = self.browser_manager.create_browser_and_context(self.proxy_config)

        self.page = self.context.new_page()
        self.main_page = self.page

        # Open the login page, retrying once if the first load fails.
        max_retries = 2
        for attempt in range(max_retries):
            try:
                self.page.goto(config.ZSGL_LOGIN_URL, timeout=60000)
                break
            except Exception:
                if attempt >= max_retries - 1:
                    raise
                self.log(f"页面加载超时,重试中... ({attempt + 1}/{max_retries})")
                time.sleep(2)

        self.page.fill('#txtUserName', username)
        self.page.fill('#txtPassword', password)
        if remember:
            self.page.check('#chkRemember')
        self.page.click('#btnSubmit')

        # Wait for the post-submit navigation to settle.
        self.page.wait_for_load_state('networkidle', timeout=30000)

        current_url = self.page.url
        self.log(f"当前URL: {current_url}")
        if config.ZSGL_INDEX_URL_PATTERN in current_url:
            self.log("登录成功!")
            return {"success": True, "error_type": None, "message": "登录成功"}

        # Not on the index page: look for an on-page error message. The
        # default is "unknown", not a password error.
        error_message = "登录失败"
        error_type = "unknown"
        try:
            error_element = self.page.locator('#lblMsg, .error-message, [class*="error"]').first
            # Locator.is_visible() ignores its timeout and returns
            # immediately, so wait_for() is used to really allow the message
            # up to 2 seconds to appear. It raises if nothing shows up.
            error_element.wait_for(state='visible', timeout=2000)
            error_text = error_element.inner_text().strip()
            if error_text:
                error_message = error_text
                self.log(f"登录错误提示: {error_text}")
                # Credential-related wording marks a password error;
                # any other message is a generic login error.
                if "密码" in error_text or "password" in error_text.lower() or "用户名" in error_text or "账号" in error_text:
                    error_type = "password_error"
                else:
                    error_type = "login_error"
        except Exception:
            # No visible error message (or it could not be read): fall
            # through to the network-error classification below.
            pass

        # Without an explicit message this may be a network problem, so it is
        # deliberately not reported as a password error.
        if error_type == "unknown":
            error_message = "登录失败,可能是网络问题或页面加载超时"
            error_type = "network_error"
        self.log(error_message)
        return {"success": False, "error_type": error_type, "message": error_message}
    except Exception as e:
        error_msg = str(e)
        self.log(f"登录过程中出错: {error_msg}")
        return {"success": False, "error_type": "exception", "message": error_msg}
def is_context_error(self, error_msg: str) -> bool:
    """Tell whether *error_msg* looks like a context/navigation failure.

    The check is a case-insensitive substring match against a fixed list of
    phrases for detached frames, destroyed execution contexts, navigation,
    and closed targets/sessions/connections.
    """
    haystack = error_msg.lower()
    for marker in (
        "Frame was detached",
        "Execution context was destroyed",
        "navigation",
        "detached",
        "Target closed",
        "Session closed",
        "Connection closed",
    ):
        if marker.lower() in haystack:
            return True
    return False
def safe_execute(self, action, description="操作", max_retries=3, recover_browse_type=None):
    """Run *action*, retrying when the page context has been invalidated.

    Context/navigation errors (as judged by ``is_context_error``) trigger a
    short, growing pause and an iframe recovery attempt before the next try.
    Any other error ends the call immediately.

    Args:
        action: Zero-argument callable to execute.
        description: Label for the operation, used in log messages.
        max_retries: Maximum number of attempts.
        recover_browse_type: Browse type to re-select during recovery.

    Returns:
        ``(True, result)`` when *action* succeeds, otherwise ``(False, None)``.
    """
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return True, action()
        except Exception as exc:
            reason = str(exc)

            # Errors unrelated to the page context are not retried.
            if not self.is_context_error(reason):
                self.log(f"✗ {description}失败: {reason}")
                return False, None

            # Out of attempts: report and let the loop run out.
            if attempt >= final_attempt:
                self.log(f"✗ {description}失败,已重试{max_retries}次: {reason}")
                continue

            self.log(f"⚠ {description}时上下文失效,尝试恢复... ({attempt+1}/{max_retries})")
            time.sleep(1 + attempt * 0.5)
            # Try to re-acquire the iframe; retry either way.
            if not self.recover_iframe(recover_browse_type):
                self.log(f" iframe恢复失败,继续重试...")
    return False, None
def get_iframe_safe(self, retry=True, max_retries=5):
    """Look up the ``mainframe`` iframe on the main page, with retries.

    Args:
        retry: When False only a single attempt is made.
        max_retries: Number of attempts when *retry* is True.

    Returns:
        The frame object, or None if ``main_page`` is unset or the frame
        could not be obtained within the allowed attempts.
    """
    # Effective attempt count. The pause below is measured against this
    # value, so a single-attempt call never sleeps before returning.
    attempts = max_retries if retry else 1
    for attempt in range(attempts):
        try:
            # Without a main page there is nothing to search.
            if not self.main_page:
                self.log("⚠ main_page无效")
                return None
            iframe = self.main_page.frame('mainframe')
            if iframe:
                return iframe
        except Exception as e:
            error_msg = str(e)
            if self.is_context_error(error_msg):
                self.log(f"⚠ 获取iframe时上下文失效,等待恢复... ({attempt+1}/{attempts})")
            else:
                self.log(f"⚠ 获取iframe出错: {error_msg}")
        # Back off a little longer before each further attempt.
        if attempt < attempts - 1:
            time.sleep(0.5 + attempt * 0.3)
    return None
def recover_iframe(self, browse_type: Optional[str] = None) -> bool:
    """Try to re-acquire the ``mainframe`` iframe after a context failure.

    Intended for "Frame was detached" / "Execution context was destroyed"
    situations. Recovery escalates through four steps and stops at the first
    one that yields a frame:

    1. Fetch the iframe directly.
    2. Wait for the main page to settle, then fetch again.
    3. Wait for the iframe element and check it from JavaScript, then fetch.
    4. Reload the main page, fetch the iframe and, when *browse_type* is
       given, click that browse type again.

    ``self.page`` is overwritten with the result of every fetch, so it is
    None after an unsuccessful recovery.

    Args:
        browse_type: Browse-type label to click again after a reload.

    Returns:
        True if the iframe was recovered, False otherwise.
    """
    self.log("🔄 尝试恢复iframe连接...")

    # Step 1: direct fetch (fastest; covers brief context switches).
    self.page = self.get_iframe_safe(retry=True, max_retries=3)
    if self.page:
        self.log("✓ iframe恢复成功(直接获取)")
        return True

    # Step 2: let the page finish loading, then fetch again.
    self.log(" 等待页面稳定...")
    time.sleep(1.5)
    for state, state_timeout in (('domcontentloaded', 5000), ('networkidle', 10000)):
        try:
            self.main_page.wait_for_load_state(state, timeout=state_timeout)
        except Exception:
            # Best effort: a wait that fails or times out must not stop
            # the recovery.
            pass
    self.page = self.get_iframe_safe(retry=True, max_retries=3)
    if self.page:
        self.log("✓ iframe恢复成功(等待后获取)")
        return True

    # Step 3: wait for the iframe element and verify it via JavaScript.
    self.log(" 尝试JavaScript方式获取iframe...")
    try:
        self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=5000)
        has_iframe = self.main_page.evaluate("""() => {
            const iframe = document.querySelector('iframe[name="mainframe"]');
            return iframe && iframe.contentWindow && iframe.contentDocument;
        }""")
        if has_iframe:
            time.sleep(0.5)
            self.page = self.get_iframe_safe(retry=True, max_retries=3)
            if self.page:
                self.log("✓ iframe恢复成功(JavaScript验证后获取)")
                return True
    except Exception as e:
        self.log(f" JavaScript方式失败: {str(e)[:50]}")

    # Step 4 (last resort): reload the page and re-select the browse type.
    self.log(" 刷新页面重试...")
    try:
        self.main_page.reload(wait_until='domcontentloaded')
        time.sleep(2)
        self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=15000)
        time.sleep(1)
        self.page = self.get_iframe_safe(retry=True, max_retries=5)
        if self.page:
            if browse_type:
                self.log(f" 重新点击'{browse_type}'...")
                selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]"
                try:
                    self.page.locator(selector).click(timeout=5000)
                except Exception:
                    # The link could not be clicked: fall back to the
                    # label carrying the same text.
                    try:
                        label_selector = f"//label[contains(text(), '{browse_type}')]"
                        self.page.locator(label_selector).click(timeout=5000)
                        time.sleep(1.5)
                        self.log(f"✓ iframe恢复成功(刷新后点击label)")
                    except Exception as label_e:
                        self.log(f" 点击label也失败: {str(label_e)[:30]}")
                        return False
                else:
                    time.sleep(1.5)
                    # Give the content table a chance to load; a timeout
                    # here does not count as a recovery failure.
                    try:
                        self.page.locator("//table[@class='ltable']").wait_for(timeout=10000)
                    except Exception:
                        pass
                    self.log(f"✓ iframe恢复成功(刷新后重新点击'{browse_type}')")
            else:
                self.log("✓ iframe恢复成功(刷新后获取)")
            return True
    except Exception as e:
        self.log(f"✗ 刷新恢复失败: {str(e)[:50]}")

    self.log("✗ iframe恢复失败,所有方法都已尝试")
    return False
def switch_to_iframe(self) -> bool:
    """Point ``self.page`` at the ``mainframe`` iframe.

    Makes up to three attempts. Each one waits for the iframe element on the
    main page and then resolves the frame via ``get_iframe_safe``.

    Returns:
        True once ``self.page`` refers to the iframe; False if the iframe
        could not be found or resolved (the reason is logged).
    """
    try:
        max_retries = 3
        for i in range(max_retries):
            try:
                # Wait for the iframe element to appear, then resolve it.
                self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=2000)
                iframe = self.get_iframe_safe()
                if iframe:
                    self.page = iframe
                    self.log(f"✓ 成功切换到iframe (尝试 {i+1}/{max_retries})")
                    return True
            except Exception:
                if i < max_retries - 1:
                    self.log(f"未找到iframe,重试中... ({i+1}/{max_retries})")
                    time.sleep(1)
                else:
                    self.log(f"所有重试都失败,未找到iframe")
                    return False
        # The element was present but the frame never resolved on any
        # attempt: report failure explicitly instead of returning None.
        self.log(f"所有重试都失败,未找到iframe")
        return False
    except Exception as e:
        self.log(f"切换到iframe时出错: {str(e)}")
        return False
def safe_click(self, locator, timeout=5000, description="元素"):
    """Click *locator*, treating a navigation-triggered error as success.

    Args:
        locator: Object whose ``click(timeout=...)`` performs the click.
        timeout: Click timeout passed to the locator.
        description: Label for the element, used in log messages.

    Returns:
        True if the click went through, or if it raised an error that
        indicates the page navigated (after a one-second pause); False for
        any other error, which is logged.
    """
    try:
        locator.click(timeout=timeout)
    except Exception as exc:
        reason = str(exc)
        navigated = (
            "Execution context was destroyed" in reason
            or "navigation" in reason.lower()
        )
        if not navigated:
            self.log(f"点击{description}失败: {reason}")
            return False
        # The click itself caused the navigation, so it counts as done.
        self.log(f"⚠ 点击{description}时检测到页面导航,等待页面稳定...")
        time.sleep(1)
    return True
def switch_browse_type(self, browse_type: str, max_retries: int = 2) -> bool:
"""
切换浏览类型(带重试机制)
Args:
browse_type: 浏览类型(注册前未读/应读/已读)
max_retries: 最大重试次数(默认2次)
Returns:
是否切换成功
"""
for attempt in range(max_retries + 1):
try:
if attempt > 0:
self.log(f"⚠ 第 {attempt + 1} 次尝试切换浏览类型...")
else:
self.log(f"切换到'{browse_type}'类型...")
# 切换到iframe
if not self.switch_to_iframe():
if attempt < max_retries:
self.log(f"iframe切换失败,等待1秒后重试...")
time.sleep(1)
continue
return False
# 方法1: 尝试查找标签(如果JavaScript创建了的话)
selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]"
try:
# 等待并点击
self.page.locator(selector).click(timeout=5000)
self.log(f"点击'{browse_type}'按钮成功")
# 等待页面刷新并加载内容
time.sleep(1.5)
# 等待表格加载(最多等待30秒)
try:
self.page.locator("//table[@class='ltable']").wait_for(timeout=30000)
self.log("内容表格已加载")
except Exception as e:
self.log("等待表格加载超时,继续...")
return True
except Exception as e:
error_msg = str(e)
if "Execution context was destroyed" in error_msg:
self.log(f"⚠ 检测到执行上下文被销毁")
if attempt < max_retries:
self.log(f"等待2秒后重试...")
time.sleep(2)
continue
self.log(f"未找到标签,尝试点击