#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Playwright edition - automation core for the knowledge-management system.

Uses browser contexts (BrowserContext) to achieve high-throughput concurrency.
"""

import hashlib
import json
import os
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional, Tuple

from playwright.sync_api import sync_playwright, Browser, BrowserContext, Page, Playwright

from app_config import get_config

# Browser install location: an already-set environment variable wins (e.g. in
# the Docker image); otherwise fall back to the Windows per-user default used
# for local development.
if 'PLAYWRIGHT_BROWSERS_PATH' not in os.environ:
    BROWSERS_PATH = str(Path.home() / "AppData" / "Local" / "ms-playwright")
    os.environ["PLAYWRIGHT_BROWSERS_PATH"] = BROWSERS_PATH
else:
    BROWSERS_PATH = os.environ["PLAYWRIGHT_BROWSERS_PATH"]

# Application configuration (timeouts, URLs) shared by everything below.
config = get_config()


@dataclass
class BrowseResult:
    """Outcome of a browse run."""

    success: bool
    total_items: int = 0
    total_attachments: int = 0
    error_message: str = ""


class PlaywrightBrowserManager:
    """Playwright browser manager - one independent browser instance per account."""

    def __init__(self, headless: bool = True, log_callback: Optional[Callable] = None):
        """Initialise the browser manager.

        Args:
            headless: Whether browsers are launched in headless mode.
            log_callback: Optional logging hook, invoked as
                ``log_callback(message, account_id)``.
        """
        self.headless = headless
        self.log_callback = log_callback
        self._lock = threading.Lock()

    def log(self, message: str, account_id: Optional[str] = None):
        """Forward *message* to the log callback; a no-op when none was given."""
        if self.log_callback:
            self.log_callback(message, account_id)

    def create_browser(self, proxy_config=None) -> Tuple[Playwright, Browser]:
        """Start a Playwright driver and launch a new Chromium browser.

        Args:
            proxy_config: Optional mapping; when it has a truthy ``'server'``
                entry, that value is passed to the launch as the proxy server.

        Returns:
            ``(playwright, browser)``. The caller owns both and must close the
            browser and stop the driver.

        Raises:
            Exception: Whatever the driver start or the launch raised, after it
                has been logged.
        """
        playwright = None
        try:
            playwright = sync_playwright().start()

            launch_options = {
                'headless': self.headless,
                'args': [
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-gpu',
                    '--disable-extensions',
                    '--disable-notifications',
                    '--disable-infobars',
                    '--disable-default-apps',
                    '--disable-background-timer-throttling',
                    '--disable-backgrounding-occluded-windows',
                    '--disable-renderer-backgrounding',
                ],
            }

            if proxy_config and proxy_config.get('server'):
                launch_options['proxy'] = {
                    'server': proxy_config['server'],
                }
                self.log(f"使用代理: {proxy_config['server']}")

            browser = playwright.chromium.launch(**launch_options)
            return playwright, browser
        except Exception as e:
            self.log(f"启动浏览器失败: {str(e)}")
            # The driver was started but no browser is handed back, so nobody
            # else can stop it: do it here before propagating the error.
            if playwright is not None:
                try:
                    playwright.stop()
                except Exception as stop_error:
                    self.log(f"停止Playwright失败: {stop_error}")
            raise

    def create_browser_and_context(
        self, proxy_config=None, storage_state=None
    ) -> Tuple[Playwright, Browser, BrowserContext]:
        """Create a dedicated browser plus one context inside it.

        Args:
            proxy_config: Forwarded to :meth:`create_browser`.
            storage_state: Optional saved storage state; when truthy it is
                passed to ``new_context`` so the context starts with it.

        Returns:
            ``(playwright, browser, context)``; the context has the configured
            default and navigation timeouts applied.

        Raises:
            Exception: If the browser or the context cannot be created. The
                browser and driver are released first in the latter case.
        """
        playwright, browser = self.create_browser(proxy_config)

        context_options = {
            'viewport': {'width': 1920, 'height': 1080},
            'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'device_scale_factor': 2,  # 2x device pixel ratio for sharper text
        }
        if storage_state:
            context_options['storage_state'] = storage_state

        try:
            context = browser.new_context(**context_options)
            context.set_default_timeout(config.DEFAULT_TIMEOUT)
            context.set_default_navigation_timeout(config.PAGE_LOAD_TIMEOUT)
        except Exception:
            # Nothing is returned on this path, so release what was started.
            for release in (browser.close, playwright.stop):
                try:
                    release()
                except Exception as cleanup_error:
                    self.log(f"清理浏览器资源失败: {cleanup_error}")
            raise

        return playwright, browser, context


class PlaywrightAutomation:
    """Playwright automation operations for one account."""

    def __init__(self, browser_manager: PlaywrightBrowserManager, account_id: str, proxy_config: Optional[dict] = None):
        """Initialise the automation object; no browser is started here.

        Args:
            browser_manager: Manager used to create browsers and to log.
            account_id: Account identifier attached to every log message.
            proxy_config: Optional proxy settings handed to the browser
                manager whenever this object asks it for a browser.
        """
        self.browser_manager = browser_manager
        self.account_id = account_id
        self.proxy_config = proxy_config
        self.playwright: Optional[Playwright] = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        self.main_page: Optional[Page] = None

    def log(self, message: str):
        """Log *message* through the browser manager, tagged with this account."""
        self.browser_manager.log(message, self.account_id)

    # Directory holding the saved storage-state (cookie) files.
    COOKIES_DIR = '/app/data/cookies'

    def get_cookies_path(self, username: str) -> str:
        """Return the cookie-file path for *username*, creating the directory.

        The file name is the MD5 hex digest of the username, which keeps
        special characters out of the file system path.
        """
        os.makedirs(self.COOKIES_DIR, exist_ok=True)
        filename = hashlib.md5(username.encode()).hexdigest() + '.json'
        return os.path.join(self.COOKIES_DIR, filename)
def save_cookies(self, username: str): """保存当前会话的cookies""" try: if self.context: storage = self.context.storage_state() cookies_path = self.get_cookies_path(username) with open(cookies_path, 'w', encoding='utf-8') as f: json.dump(storage, f) self.log(f"Cookies已保存") return True except Exception as e: self.log(f"保存cookies失败: {e}") return False def load_cookies(self, username: str) -> bool: """加载已保存的cookies""" import os cookies_path = self.get_cookies_path(username) if not os.path.exists(cookies_path): return False try: # 检查cookies文件是否过期(24小时) import time as time_module file_age = time_module.time() - os.path.getmtime(cookies_path) if file_age > 24 * 3600: # 24小时 self.log(f"Cookies已过期,需要重新登录") os.remove(cookies_path) return False with open(cookies_path, 'r', encoding='utf-8') as f: storage = json.load(f) # 创建带cookies的浏览器上下文 self.playwright, self.browser, self.context = self.browser_manager.create_browser_and_context( self.proxy_config, storage_state=storage ) self.page = self.context.new_page() self.main_page = self.page return True except Exception as e: self.log(f"加载cookies失败: {e}") return False def check_login_state(self) -> bool: """检查当前是否处于登录状态""" try: # 访问首页检查是否跳转到登录页 self.page.goto('https://postoa.aidunsoft.com/admin/index.aspx', timeout=15000) self.page.wait_for_load_state('networkidle', timeout=10000) current_url = self.page.url # 如果还在index页面,说明登录态有效 if 'index.aspx' in current_url: return True return False except: return False def quick_login(self, username: str, password: str, remember: bool = True): """快速登录 - 使用池中浏览器时直接登录,否则尝试cookies""" # 如果已有浏览器实例(从池中获取),直接使用该浏览器登录 # 不尝试加载cookies,因为load_cookies会创建新浏览器覆盖池中的 if self.browser and self.browser.is_connected(): self.log("使用池中浏览器,直接登录") result = self.login(username, password, remember) if result.get('success'): self.save_cookies(username) result['used_cookies'] = False return result # 无现有浏览器时,尝试使用cookies if self.load_cookies(username): self.log(f"尝试使用已保存的登录态...") if self.check_login_state(): self.log(f"✓ 
登录态有效,跳过登录") return {"success": True, "message": "使用已保存的登录态", "used_cookies": True} else: self.log(f"登录态已失效,重新登录") # 关闭当前context,重新登录 try: if self.context: self.context.close() if self.browser: self.browser.close() if self.playwright: self.playwright.stop() except: pass # 正常登录 result = self.login(username, password, remember) # 登录成功后保存cookies if result.get('success'): self.save_cookies(username) result['used_cookies'] = False return result def login(self, username: str, password: str, remember: bool = True) -> bool: """ 登录系统 Args: username: 用户名 password: 密码 remember: 是否记住密码 Returns: 是否登录成功 """ try: start_time = time.time() # 如果已有浏览器实例(从浏览器池获取),只创建context if self.browser and self.browser.is_connected(): self.log("使用池中浏览器...") context_options = { 'viewport': {'width': 1920, 'height': 1080}, 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'device_scale_factor': 2, } self.context = self.browser.new_context(**context_options) else: # 创建新的浏览器和上下文 self.playwright, self.browser, self.context = self.browser_manager.create_browser_and_context(self.proxy_config) elapsed = time.time() - start_time # self.log("浏览器就绪") # 精简日志 # self.log("创建页面...") # 精简日志 self.page = self.context.new_page() self.main_page = self.page # self.log("访问登录页面...") # 精简日志 # 使用重试机制处理超时 max_retries = 2 for attempt in range(max_retries): try: self.page.goto(config.ZSGL_LOGIN_URL, timeout=60000) break except Exception as e: if attempt < max_retries - 1: self.log(f"页面加载超时,重试中... 
({attempt + 1}/{max_retries})") time.sleep(2) else: raise # self.log("填写登录信息...") # 精简日志 self.page.fill('#txtUserName', username) self.page.fill('#txtPassword', password) if remember: self.page.check('#chkRemember') # self.log("点击登录按钮...") # 精简日志 self.page.click('#btnSubmit') # 等待跳转 # self.log("等待登录处理...") # 精简日志 self.page.wait_for_load_state('networkidle', timeout=30000) # 增加到30秒 # 检查登录结果 current_url = self.page.url self.log(f"当前URL: {current_url}") if config.ZSGL_INDEX_URL_PATTERN in current_url: self.log("登录成功!") return {"success": True, "error_type": None, "message": "登录成功"} else: # 检查是否显示了错误提示 error_message = "登录失败" error_type = "unknown" # 默认为未知错误,不是密码错误 try: # 尝试获取页面上的错误提示 error_element = self.page.locator('#lblMsg, .error-message, [class*="error"]').first if error_element.is_visible(timeout=2000): error_text = error_element.inner_text().strip() if error_text: error_message = error_text self.log(f"登录错误提示: {error_text}") # 只有明确提示密码错误时才标记为密码错误 if "密码" in error_text or "password" in error_text.lower() or "用户名" in error_text or "账号" in error_text: error_type = "password_error" else: error_type = "login_error" except: pass # 如果没有明确的错误提示,可能是网络问题,不认为是密码错误 if error_type == "unknown": error_message = "登录失败,可能是网络问题或页面加载超时" error_type = "network_error" self.log(error_message) return {"success": False, "error_type": error_type, "message": error_message} except Exception as e: error_msg = str(e) self.log(f"登录过程中出错: {error_msg}") return {"success": False, "error_type": "exception", "message": error_msg} def is_context_error(self, error_msg: str) -> bool: """检查是否是上下文/导航相关错误""" error_keywords = [ "Frame was detached", "Execution context was destroyed", "navigation", "detached", "Target closed", "Session closed", "Connection closed" ] error_lower = error_msg.lower() return any(keyword.lower() in error_lower for keyword in error_keywords) def safe_execute(self, action, description="操作", max_retries=3, recover_browse_type=None): """安全执行操作,自动处理上下文销毁等错误 Args: action: 要执行的函数 
description: 操作描述(用于日志) max_retries: 最大重试次数 recover_browse_type: 恢复时需要重新点击的浏览类型 Returns: (success, result) 元组 """ last_error = None for attempt in range(max_retries): try: result = action() return True, result except Exception as e: last_error = str(e) if self.is_context_error(last_error): if attempt < max_retries - 1: self.log(f"⚠ {description}时上下文失效,尝试恢复... ({attempt+1}/{max_retries})") time.sleep(1 + attempt * 0.5) # 尝试恢复iframe if self.recover_iframe(recover_browse_type): continue else: self.log(f" iframe恢复失败,继续重试...") else: self.log(f"✗ {description}失败,已重试{max_retries}次: {last_error}") else: # 非上下文错误,直接返回失败 self.log(f"✗ {description}失败: {last_error}") return False, None return False, None def get_iframe_safe(self, retry=True, max_retries=5): """安全地获取iframe,带重试机制 Args: retry: 是否启用重试 max_retries: 最大重试次数 """ for attempt in range(max_retries if retry else 1): try: # 先检查main_page是否有效 if not self.main_page: self.log("⚠ main_page无效") return None iframe = self.main_page.frame('mainframe') if iframe: return iframe except Exception as e: error_msg = str(e) if self.is_context_error(error_msg): self.log(f"⚠ 获取iframe时上下文失效,等待恢复... 
({attempt+1}/{max_retries})") else: self.log(f"⚠ 获取iframe出错: {error_msg}") if attempt < max_retries - 1: time.sleep(0.5 + attempt * 0.3) # 递增等待时间 return None def recover_iframe(self, browse_type: str = None) -> bool: """尝试恢复iframe连接 当遇到 Frame was detached / Execution context was destroyed 错误时调用此函数 采用多级恢复策略,逐步升级恢复力度 """ self.log("🔄 尝试恢复iframe连接...") # 方法1: 直接尝试获取iframe(最快,适用于短暂的上下文切换) self.page = self.get_iframe_safe(retry=True, max_retries=3) if self.page: self.log("✓ iframe恢复成功(直接获取)") return True # 方法2: 等待页面稳定后重试(适用于页面正在加载的情况) self.log(" 等待页面稳定...") time.sleep(1.5) try: self.main_page.wait_for_load_state('domcontentloaded', timeout=5000) except: pass try: self.main_page.wait_for_load_state('networkidle', timeout=10000) except: pass self.page = self.get_iframe_safe(retry=True, max_retries=3) if self.page: self.log("✓ iframe恢复成功(等待后获取)") return True # 方法3: 使用JavaScript强制等待并获取iframe self.log(" 尝试JavaScript方式获取iframe...") try: # 等待iframe存在 self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=5000) # 使用evaluate确保iframe可用 has_iframe = self.main_page.evaluate("""() => { const iframe = document.querySelector('iframe[name="mainframe"]'); return iframe && iframe.contentWindow && iframe.contentDocument; }""") if has_iframe: time.sleep(0.5) self.page = self.get_iframe_safe(retry=True, max_retries=3) if self.page: self.log("✓ iframe恢复成功(JavaScript验证后获取)") return True except Exception as e: self.log(f" JavaScript方式失败: {str(e)[:50]}") # 方法4: 刷新页面并重新切换(最后手段) self.log(" 刷新页面重试...") try: self.main_page.reload(wait_until='domcontentloaded') time.sleep(2) # 等待iframe出现 self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=15000) time.sleep(1) self.page = self.get_iframe_safe(retry=True, max_retries=5) if self.page: # 如果有浏览类型,重新点击 if browse_type: self.log(f" 重新点击'{browse_type}'...") selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]" try: self.page.locator(selector).click(timeout=5000) time.sleep(1.5) # 等待表格加载 
try: self.page.locator("//table[@class='ltable']").wait_for(timeout=10000) except: pass self.log(f"✓ iframe恢复成功(刷新后重新点击'{browse_type}')") except: # 尝试点击label try: label_selector = f"//label[contains(text(), '{browse_type}')]" self.page.locator(label_selector).click(timeout=5000) time.sleep(1.5) self.log(f"✓ iframe恢复成功(刷新后点击label)") except Exception as label_e: self.log(f" 点击label也失败: {str(label_e)[:30]}") return False else: self.log("✓ iframe恢复成功(刷新后获取)") return True except Exception as e: self.log(f"✗ 刷新恢复失败: {str(e)[:50]}") self.log("✗ iframe恢复失败,所有方法都已尝试") return False def switch_to_iframe(self) -> bool: """切换到mainframe iframe""" try: # self.log("查找并切换到iframe...") # 精简日志 # 使用Playwright的等待机制 max_retries = 3 for i in range(max_retries): try: # 等待iframe元素出现 self.main_page.wait_for_selector("iframe[name='mainframe']", timeout=2000) # 获取iframe iframe = self.get_iframe_safe() if iframe: self.page = iframe self.log(f"✓ 成功切换到iframe (尝试 {i+1}/{max_retries})") return True except Exception as e: if i < max_retries - 1: self.log(f"未找到iframe,重试中... 
({i+1}/{max_retries})") time.sleep(1) else: self.log(f"所有重试都失败,未找到iframe") return False except Exception as e: self.log(f"切换到iframe时出错: {str(e)}") return False def safe_click(self, locator, timeout=5000, description="元素"): """安全地点击元素,捕获导航异常""" try: locator.click(timeout=timeout) return True except Exception as e: error_msg = str(e) if "Execution context was destroyed" in error_msg or "navigation" in error_msg.lower(): self.log(f"⚠ 点击{description}时检测到页面导航,等待页面稳定...") time.sleep(1) return True # 虽然有异常,但导航成功,返回True else: self.log(f"点击{description}失败: {error_msg}") return False def switch_browse_type(self, browse_type: str, max_retries: int = 2) -> bool: """ 切换浏览类型(带重试机制) Args: browse_type: 浏览类型(注册前未读/应读/已读) max_retries: 最大重试次数(默认2次) Returns: 是否切换成功 """ for attempt in range(max_retries + 1): try: if attempt > 0: self.log(f"⚠ 第 {attempt + 1} 次尝试切换浏览类型...") else: self.log(f"切换到'{browse_type}'类型...") # 切换到iframe if not self.switch_to_iframe(): if attempt < max_retries: self.log(f"iframe切换失败,等待1秒后重试...") time.sleep(1) continue return False # 方法1: 尝试查找标签(如果JavaScript创建了的话) selector = f"//div[contains(@class, 'rule-multi-radio')]//a[contains(text(), '{browse_type}')]" try: # 等待并点击 self.page.locator(selector).click(timeout=5000) self.log(f"点击'{browse_type}'按钮成功") # 等待页面刷新并加载内容 time.sleep(1.5) # 等待表格加载(最多等待30秒) try: self.page.locator("//table[@class='ltable']").wait_for(timeout=30000) self.log("内容表格已加载") except Exception as e: self.log("等待表格加载超时,继续...") return True except Exception as e: error_msg = str(e) if "Execution context was destroyed" in error_msg: self.log(f"⚠ 检测到执行上下文被销毁") if attempt < max_retries: self.log(f"等待2秒后重试...") time.sleep(2) continue self.log(f"未找到标签,尝试点击