diff --git a/.gitignore b/.gitignore index 6da5c24..fa25b7b 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,12 @@ __pycache__/ *.class *.so .Python +.pytest_cache/ +.ruff_cache/ +.mypy_cache/ +.coverage +coverage.xml +htmlcov/ env/ venv/ ENV/ @@ -56,3 +62,8 @@ verify_*.sh # 内部文档 docs/ + +# 前端依赖(体积大,不应入库) +node_modules/ +app-frontend/node_modules/ +admin-frontend/node_modules/ diff --git a/Dockerfile b/Dockerfile index ff79e81..640aa1d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,9 +24,7 @@ COPY database.py . COPY db_pool.py . COPY playwright_automation.py . COPY api_browser.py . -COPY browser_pool.py . COPY browser_pool_worker.py . -COPY screenshot_worker.py . COPY browser_installer.py . COPY password_utils.py . COPY crypto_utils.py . @@ -37,8 +35,6 @@ COPY email_service.py . COPY app_config.py . COPY app_logger.py . COPY app_security.py . -COPY app_state.py . -COPY app_utils.py . COPY routes/ ./routes/ COPY services/ ./services/ COPY realtime/ ./realtime/ diff --git a/README.md b/README.md index 72826c9..e6d0c17 100644 --- a/README.md +++ b/README.md @@ -31,29 +31,35 @@ ## 项目结构 ``` -zsgpt2/ -├── app.py # 主应用程序 -├── database.py # 数据库模块 -├── playwright_automation.py # 浏览器自动化 +zsglpt/ +├── app.py # 启动/装配入口 +├── routes/ # 路由层(Blueprint) +├── services/ # 业务服务层 +├── realtime/ # SocketIO 事件与推送 +├── database.py # 数据库稳定门面(对外 API) +├── db/ # DB 分域实现 + schema/migrations +├── db_pool.py # 数据库连接池 +├── playwright_automation.py # Playwright 自动化 +├── api_browser.py # Requests 自动化(主浏览流程) +├── browser_pool_worker.py # 截图 WorkerPool(浏览器复用) ├── browser_installer.py # 浏览器安装检查 ├── app_config.py # 配置管理 ├── app_logger.py # 日志系统 ├── app_security.py # 安全模块 -├── app_state.py # 状态管理 -├── app_utils.py # 工具函数 -├── db_pool.py # 数据库连接池 ├── password_utils.py # 密码工具 +├── crypto_utils.py # 加解密工具 +├── email_service.py # 邮件服务 ├── requirements.txt # Python依赖 +├── requirements-dev.txt # 开发依赖(不进生产镜像) +├── pyproject.toml # ruff/black/pytest 配置 ├── Dockerfile # Docker镜像构建文件 ├── docker-compose.yml # Docker编排文件 -├── templates/ # HTML模板 -│ ├── index.html # 主页面 -│ ├── login.html # 登录页 -│ ├── register.html # 注册页 -│ ├── admin.html # 后台管理 -│ └── ... -└── static/ # 静态资源 - └── js/ # JavaScript文件 +├── templates/ # HTML模板(含 SPA fallback) +├── app-frontend/ # 用户端前端源码(可选保留) +├── admin-frontend/ # 后台前端源码(可选保留) +└── static/ # 前端构建产物(运行时使用) + ├── app/ # 用户端 SPA + └── admin/ # 后台 SPA ``` --- diff --git a/app_state.py b/app_state.py deleted file mode 100755 index aa7c57a..0000000 --- a/app_state.py +++ /dev/null @@ -1,332 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -应用状态管理模块 -提供线程安全的全局状态管理 - -说明(P0–P3 优化后): -- 该模块为历史遗留实现,保留用于兼容与参考 -- 当前实际生效的全局状态入口为 `services/state.py`(safe_* API) -""" - -import threading -from typing import Tuple -from typing import Dict, Any, Optional -from datetime import datetime, timedelta -from app_logger import get_logger - -logger = get_logger('app_state') - - -class ThreadSafeDict: - """线程安全的字典包装类""" - - def __init__(self): - self._dict = {} - self._lock = threading.RLock() - - def get(self, key, default=None): - """获取值""" - with self._lock: - return self._dict.get(key, default) - - def set(self, key, value): - """设置值""" - with self._lock: - self._dict[key] = value - - def delete(self, key): - """删除键""" - with self._lock: - if key in self._dict: - del self._dict[key] - - def pop(self, key, default=None): - """弹出键值""" - with self._lock: - return self._dict.pop(key, default) - - def keys(self): - """获取所有键(返回副本)""" - with self._lock: - return list(self._dict.keys()) - - def items(self): - """获取所有键值对(返回副本)""" - with self._lock: - return list(self._dict.items()) - - def __contains__(self, key): - """检查键是否存在""" - with self._lock: - return key in self._dict - - def clear(self): - """清空字典""" - with self._lock: - self._dict.clear() - - def __len__(self): - """获取长度""" - with self._lock: - return len(self._dict) - - -class LogCacheManager: - """日志缓存管理器(线程安全)""" - - def __init__(self, max_logs_per_user=100, max_total_logs=1000): - self._cache = {} # {user_id: [logs]} - self._total_count = 0 - self._lock = threading.RLock() - self._max_logs_per_user = max_logs_per_user - self._max_total_logs = max_total_logs - - def add_log(self, user_id: int, log_entry: Dict[str, Any]) -> bool: - """添加日志到缓存""" - with self._lock: - # 检查总数限制 - if self._total_count >= self._max_total_logs: - logger.warning(f"日志缓存已满 ({self._max_total_logs}),拒绝添加") - return False - - # 初始化用户日志列表 - if user_id not in self._cache: - self._cache[user_id] = [] - - user_logs = self._cache[user_id] - - # 检查用户日志数限制 - if len(user_logs) >= self._max_logs_per_user: - # 移除最旧的日志 - user_logs.pop(0) - self._total_count -= 1 - - # 添加新日志 - user_logs.append(log_entry) - self._total_count += 1 - - return True - - def get_logs(self, user_id: int) -> list: - """获取用户的所有日志(返回副本)""" - with self._lock: - return list(self._cache.get(user_id, [])) - - def clear_user_logs(self, user_id: int): - """清空用户的日志""" - with self._lock: - if user_id in self._cache: - count = len(self._cache[user_id]) - del self._cache[user_id] - self._total_count -= count - logger.info(f"清空用户 {user_id} 的 {count} 条日志") - - def get_total_count(self) -> int: - """获取总日志数""" - with self._lock: - return self._total_count - - def get_stats(self) -> Dict[str, int]: - """获取统计信息""" - with self._lock: - return { - 'total_count': self._total_count, - 'user_count': len(self._cache), - 'max_per_user': self._max_logs_per_user, - 'max_total': self._max_total_logs - } - - -class CaptchaManager: - """验证码管理器(线程安全)""" - - def __init__(self, expire_seconds=300): - self._storage = {} # {identifier: {'code': str, 'expire': datetime}} - self._lock = threading.RLock() - self._expire_seconds = expire_seconds - - def create(self, identifier: str, code: str) -> None: - """创建验证码""" - with self._lock: - self._storage[identifier] = { - 'code': code, - 'expire': datetime.now() + timedelta(seconds=self._expire_seconds) - } - - def verify(self, identifier: str, code: str) -> Tuple[bool, str]: - """验证验证码""" - with self._lock: - if identifier not in self._storage: - return False, "验证码不存在或已过期" - - captcha_data = self._storage[identifier] - - # 检查是否过期 - if datetime.now() > captcha_data['expire']: - del self._storage[identifier] - return False, "验证码已过期,请重新获取" - - # 验证码码值 - if captcha_data['code'] != code: - return False, "验证码错误" - - # 验证成功,删除验证码 - del self._storage[identifier] - return True, "验证成功" - - def cleanup_expired(self) -> int: - """清理过期的验证码""" - with self._lock: - now = datetime.now() - expired_keys = [ - key for key, data in self._storage.items() - if now > data['expire'] - ] - for key in expired_keys: - del self._storage[key] - - if expired_keys: - logger.info(f"清理了 {len(expired_keys)} 个过期验证码") - - return len(expired_keys) - - def get_count(self) -> int: - """获取当前验证码数量""" - with self._lock: - return len(self._storage) - - -class ApplicationState: - """应用全局状态管理器(单例模式)""" - - _instance = None - _lock = threading.Lock() - - def __new__(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if self._initialized: - return - - # 浏览器管理器 - self.browser_manager = None - self._browser_lock = threading.Lock() - - # 用户账号管理 {user_id: {account_id: Account对象}} - self.user_accounts = ThreadSafeDict() - - # 活动任务管理 {account_id: Thread对象} - self.active_tasks = ThreadSafeDict() - - # 日志缓存管理 - self.log_cache = LogCacheManager() - - # 验证码管理 - self.captcha = CaptchaManager() - - # 用户信号量管理 {account_id: Semaphore} - self.user_semaphores = ThreadSafeDict() - - # 全局信号量 - self.global_semaphore = None - self.screenshot_semaphore = threading.Semaphore(1) - - self._initialized = True - logger.info("应用状态管理器初始化完成") - - def set_browser_manager(self, manager): - """设置浏览器管理器""" - with self._browser_lock: - self.browser_manager = manager - - def get_browser_manager(self): - """获取浏览器管理器""" - with self._browser_lock: - return self.browser_manager - - def get_user_semaphore(self, account_id: int, max_concurrent: int = 1): - """获取或创建用户信号量""" - if account_id not in self.user_semaphores: - self.user_semaphores.set(account_id, threading.Semaphore(max_concurrent)) - return self.user_semaphores.get(account_id) - - def set_global_semaphore(self, max_concurrent: int): - """设置全局信号量""" - self.global_semaphore = threading.Semaphore(max_concurrent) - - def get_stats(self) -> Dict[str, Any]: - """获取状态统计信息""" - return { - 'user_accounts_count': len(self.user_accounts), - 'active_tasks_count': len(self.active_tasks), - 'log_cache_stats': self.log_cache.get_stats(), - 'captcha_count': self.captcha.get_count(), - 'user_semaphores_count': len(self.user_semaphores), - 'browser_manager': 'initialized' if self.browser_manager else 'not_initialized' - } - - -# 全局单例实例 -app_state = ApplicationState() - - -# 向后兼容的辅助函数 -def verify_captcha(identifier: str, code: str) -> Tuple[bool, str]: - """验证验证码(向后兼容接口)""" - return app_state.captcha.verify(identifier, code) - - -def create_captcha(identifier: str, code: str) -> None: - """创建验证码(向后兼容接口)""" - app_state.captcha.create(identifier, code) - - -def cleanup_expired_captchas() -> int: - """清理过期验证码(向后兼容接口)""" - return app_state.captcha.cleanup_expired() - - -if __name__ == '__main__': - # 测试代码 - print("测试线程安全状态管理器...") - print("=" * 60) - - # 测试 ThreadSafeDict - print("\n1. 测试 ThreadSafeDict:") - td = ThreadSafeDict() - td.set('key1', 'value1') - print(f" 设置 key1 = {td.get('key1')}") - print(f" 长度: {len(td)}") - - # 测试 LogCacheManager - print("\n2. 测试 LogCacheManager:") - lcm = LogCacheManager(max_logs_per_user=3, max_total_logs=10) - for i in range(5): - lcm.add_log(1, {'message': f'log {i}'}) - print(f" 用户1日志数: {len(lcm.get_logs(1))}") - print(f" 总日志数: {lcm.get_total_count()}") - print(f" 统计: {lcm.get_stats()}") - - # 测试 CaptchaManager - print("\n3. 测试 CaptchaManager:") - cm = CaptchaManager(expire_seconds=2) - cm.create('test@example.com', '1234') - success, msg = cm.verify('test@example.com', '1234') - print(f" 验证结果: {success}, {msg}") - - # 测试 ApplicationState - print("\n4. 测试 ApplicationState (单例):") - state1 = ApplicationState() - state2 = ApplicationState() - print(f" 单例验证: {state1 is state2}") - print(f" 状态统计: {state1.get_stats()}") - - print("\n" + "=" * 60) - print("✓ 所有测试通过!") diff --git a/app_utils.py b/app_utils.py deleted file mode 100755 index ee8936b..0000000 --- a/app_utils.py +++ /dev/null @@ -1,366 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -应用工具模块 -提取重复的业务逻辑 -""" - -from typing import Dict, Any, Optional, Tuple -from flask import session, jsonify -from app_logger import get_logger, audit_logger -from app_security import get_client_ip -import database - -logger = get_logger('app_utils') - - -class ValidationError(Exception): - """验证错误异常""" - pass - - -def verify_user_file_permission(user_id: int, filename: str) -> Tuple[bool, Optional[str]]: - """ - 验证用户文件访问权限 - - Args: - user_id: 用户ID - filename: 文件名 - - Returns: - (是否有权限, 错误消息) - """ - # 获取用户信息 - user = database.get_user_by_id(user_id) - if not user: - return False, "用户不存在" - - username = user['username'] - - # 检查文件名是否以用户名开头 - if not filename.startswith(f"{username}_"): - logger.warning(f"用户 {username} (ID:{user_id}) 尝试访问未授权文件: {filename}") - return False, "无权访问此文件" - - return True, None - - -def log_task_event(account_id: int, status: str, message: str, - browse_type: Optional[str] = None, - screenshot_path: Optional[str] = None) -> bool: - """ - 记录任务日志(统一接口) - - Args: - account_id: 账号ID - status: 状态(running/completed/failed/stopped) - message: 消息 - browse_type: 浏览类型 - screenshot_path: 截图路径 - - Returns: - 是否成功 - """ - try: - return database.create_task_log( - account_id=account_id, - status=status, - message=message, - browse_type=browse_type, - screenshot_path=screenshot_path - ) - except Exception as e: - logger.error(f"记录任务日志失败: {e}", exc_info=True) - return False - - -def update_account_status(account_id: int, status: str, - error_message: Optional[str] = None) -> bool: - """ - 更新账号状态(统一接口) - - Args: - account_id: 账号ID - status: 状态(idle/running/error/stopped) - error_message: 错误消息(仅当status=error时) - - Returns: - 是否成功 - """ - try: - return database.update_account_status( - account_id=account_id, - status=status, - error_message=error_message - ) - except Exception as e: - logger.error(f"更新账号状态失败 (account_id={account_id}): {e}", exc_info=True) - return False - - -def get_or_create_config_cache() -> Optional[Dict[str, Any]]: - """ - 获取或创建系统配置缓存 - - 缓存存储在session中,避免重复查询数据库 - - Returns: - 配置字典,失败返回None - """ - # 尝试从session获取缓存 - if '_system_config' in session: - return session['_system_config'] - - # 从数据库加载 - try: - config = database.get_system_config() - if config: - # 存入session缓存 - session['_system_config'] = config - return config - return None - except Exception as e: - logger.error(f"获取系统配置失败: {e}", exc_info=True) - return None - - -def clear_config_cache(): - """清除配置缓存(配置变更时调用)""" - if '_system_config' in session: - del session['_system_config'] - logger.debug("已清除系统配置缓存") - - -def safe_close_browser(automation_obj, account_id: int): - """ - 安全关闭浏览器(统一错误处理) - - Args: - automation_obj: PlaywrightAutomation对象 - account_id: 账号ID - """ - if automation_obj: - try: - automation_obj.close() - logger.info(f"账号 {account_id} 的浏览器已关闭") - except Exception as e: - logger.error(f"关闭账号 {account_id} 的浏览器失败: {e}", exc_info=True) - - -def format_error_response(error: str, status_code: int = 400, - need_captcha: bool = False, - extra_data: Optional[Dict] = None) -> Tuple[Any, int]: - """ - 格式化错误响应(统一接口) - - Args: - error: 错误消息 - status_code: HTTP状态码 - need_captcha: 是否需要验证码 - extra_data: 额外数据 - - Returns: - (jsonify响应, 状态码) - """ - response_data = {"error": error} - - if need_captcha: - response_data["need_captcha"] = True - - if extra_data: - response_data.update(extra_data) - - return jsonify(response_data), status_code - - -def format_success_response(message: str = "操作成功", - extra_data: Optional[Dict] = None) -> Any: - """ - 格式化成功响应(统一接口) - - Args: - message: 成功消息 - extra_data: 额外数据 - - Returns: - jsonify响应 - """ - response_data = {"success": True, "message": message} - - if extra_data: - response_data.update(extra_data) - - return jsonify(response_data) - - -def log_user_action(action: str, user_id: int, username: str, - success: bool, details: Optional[str] = None): - """ - 记录用户操作到审计日志(统一接口) - - Args: - action: 操作类型(login/register/logout等) - user_id: 用户ID - username: 用户名 - success: 是否成功 - details: 详细信息 - """ - ip = get_client_ip() - - if action == 'login': - audit_logger.log_user_login(user_id, username, ip, success) - elif action == 'logout': - audit_logger.log_user_logout(user_id, username, ip) - elif action == 'register': - audit_logger.log_user_created(user_id, username, created_by='self') - - if details: - logger.info(f"用户操作: {action}, 用户={username}, 成功={success}, 详情={details}") - - -def validate_pagination(page: Any, page_size: Any, - max_page_size: int = 100) -> Tuple[int, int, Optional[str]]: - """ - 验证分页参数 - - Args: - page: 页码 - page_size: 每页大小 - max_page_size: 最大每页大小 - - Returns: - (页码, 每页大小, 错误消息) - """ - try: - page = int(page) if page else 1 - page_size = int(page_size) if page_size else 20 - except (ValueError, TypeError): - return 1, 20, "无效的分页参数" - - if page < 1: - return 1, 20, "页码必须大于0" - - if page_size < 1 or page_size > max_page_size: - return page, 20, f"每页大小必须在1-{max_page_size}之间" - - return page, page_size, None - - -def check_user_ownership(user_id: int, resource_type: str, - resource_id: int) -> Tuple[bool, Optional[str]]: - """ - 检查用户是否拥有资源 - - Args: - user_id: 用户ID - resource_type: 资源类型(account/task等) - resource_id: 资源ID - - Returns: - (是否拥有, 错误消息) - """ - try: - if resource_type == 'account': - account = database.get_account_by_id(resource_id) - if not account: - return False, "账号不存在" - if account['user_id'] != user_id: - return False, "无权访问此账号" - return True, None - - elif resource_type == 'task': - # 通过account查询所属用户 - # 这里需要根据实际数据库结构实现 - pass - - return False, "不支持的资源类型" - - except Exception as e: - logger.error(f"检查资源所有权失败: {e}", exc_info=True) - return False, "系统错误" - - -def verify_and_consume_captcha(session_id: str, code: str, captcha_storage: dict, max_attempts: int = 5) -> Tuple[bool, str]: - """ - 验证并消费验证码(安全增强版) - - 安全特性: - - 先删除验证码再验证,防止重放攻击 - - 异常情况下也确保验证码被删除 - - Args: - session_id: 验证码会话ID - code: 用户输入的验证码 - captcha_storage: 验证码存储字典 - max_attempts: 最大尝试次数,默认5次 - - Returns: - Tuple[bool, str]: (是否成功, 消息) - - 成功时返回 (True, "验证成功") - - 失败时返回 (False, 错误消息) - - Example: - success, message = verify_and_consume_captcha( - captcha_session, - captcha_code, - captcha_storage, - max_attempts=5 - ) - if not success: - return jsonify({"error": message}), 400 - """ - import time - - # 安全修复:先取出并删除验证码,无论验证是否成功都不能重用 - captcha_data = captcha_storage.pop(session_id, None) - - # 检查验证码是否存在 - if captcha_data is None: - return False, "验证码已过期或不存在,请重新获取" - - try: - # 检查过期时间 - if captcha_data["expire_time"] < time.time(): - return False, "验证码已过期,请重新获取" - - # 检查尝试次数 - if captcha_data.get("failed_attempts", 0) >= max_attempts: - return False, f"验证码错误次数过多({max_attempts}次),请重新获取" - - # 验证代码(不区分大小写) - if captcha_data["code"].lower() != code.lower(): - # 验证失败,增加失败计数后放回(允许继续尝试) - captcha_data["failed_attempts"] = captcha_data.get("failed_attempts", 0) + 1 - # 只有未超过最大尝试次数才放回 - if captcha_data["failed_attempts"] < max_attempts: - captcha_storage[session_id] = captcha_data - return False, "验证码错误" - - # 验证成功,验证码已被删除,不会被重用 - return True, "验证成功" - except Exception as e: - # 异常情况下确保验证码不会被重用(已在函数开头删除) - logger.error(f"验证码验证异常: {e}") - return False, "验证码验证失败,请重新获取" - - -if __name__ == '__main__': - # 测试代码 - print("测试应用工具模块...") - print("=" * 60) - - # 测试分页验证 - print("\n1. 测试分页验证:") - page, page_size, error = validate_pagination("2", "50") - print(f" 页码={page}, 每页={page_size}, 错误={error}") - - page, page_size, error = validate_pagination("invalid", "50") - print(f" 无效输入: 页码={page}, 每页={page_size}, 错误={error}") - - # 测试响应格式化 - print("\n2. 测试响应格式化:") - print(f" 错误响应: {format_error_response('测试错误', need_captcha=True)}") - print(f" 成功响应: {format_success_response('测试成功', {'data': [1, 2, 3]})}") - - print("\n" + "=" * 60) - print("✓ 工具模块加载成功!") diff --git a/browser_pool.py b/browser_pool.py deleted file mode 100755 index d7825cd..0000000 --- a/browser_pool.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""浏览器池管理 - 线程本地存储,每个线程复用自己的浏览器 - -说明(P0–P3 优化后): -- 该实现为遗留版本,当前截图并发与浏览器复用已迁移到 `browser_pool_worker.py` 的 WorkerPool 方案 -- 本文件保留用于兼容/回滚参考(当前主流程不再依赖) -""" - -import threading -import time -import nest_asyncio -nest_asyncio.apply() - -# 线程本地存储 -_thread_local = threading.local() - - -class BrowserPool: - """浏览器池 - 使用线程本地存储,每个线程有自己的浏览器""" - - def __init__(self, pool_size=3, log_callback=None): - self.pool_size = pool_size - self.log_callback = log_callback - self.lock = threading.Lock() - self.all_browsers = [] # 追踪所有浏览器(用于关闭) - self.initialized = True - - def log(self, message): - if self.log_callback: - self.log_callback(message) - else: - print(f"[浏览器池] {message}") - - def initialize(self): - """初始化(线程本地模式下不预热)""" - self.log(f"浏览器池已就绪(线程本地模式,每线程独立浏览器)") - self.initialized = True - - def _create_browser(self): - """创建一个浏览器实例""" - try: - from playwright.sync_api import sync_playwright - - playwright = sync_playwright().start() - browser = playwright.chromium.launch( - headless=True, - args=[ - '--no-sandbox', - '--disable-setuid-sandbox', - '--disable-dev-shm-usage', - '--disable-gpu', - '--single-process' - ] - ) - instance = { - 'playwright': playwright, - 'browser': browser, - 'thread_id': threading.current_thread().ident, - 'created_at': time.time(), - 'use_count': 0 - } - with self.lock: - self.all_browsers.append(instance) - return instance - except Exception as e: - self.log(f"创建浏览器失败: {e}") - return None - - def acquire(self, timeout=60): - """获取当前线程的浏览器实例(如果没有则创建)""" - # 检查当前线程是否已有浏览器 - browser_instance = getattr(_thread_local, 'browser_instance', None) - - if browser_instance: - # 检查浏览器是否还有效 - try: - if browser_instance['browser'].is_connected(): - browser_instance['use_count'] += 1 - self.log(f"复用线程浏览器(第{browser_instance['use_count']}次使用)") - return browser_instance - except: - pass - # 浏览器已失效,清理 - self._close_browser(browser_instance) - _thread_local.browser_instance = None - - # 为当前线程创建新浏览器 - self.log("为当前线程创建新浏览器...") - browser_instance = self._create_browser() - if browser_instance: - browser_instance['use_count'] = 1 - _thread_local.browser_instance = browser_instance - return browser_instance - - def release(self, browser_instance): - """释放浏览器(线程本地模式下保留不关闭)""" - if browser_instance is None: - return - - # 检查浏览器是否还有效 - try: - if browser_instance['browser'].is_connected(): - self.log(f"浏览器保持活跃(已使用{browser_instance['use_count']}次)") - return - except: - pass - - # 浏览器已断开,清理 - self.log("浏览器已断开,清理资源") - self._close_browser(browser_instance) - if getattr(_thread_local, 'browser_instance', None) == browser_instance: - _thread_local.browser_instance = None - - def _close_browser(self, browser_instance): - """关闭单个浏览器实例""" - try: - if browser_instance.get('browser'): - browser_instance['browser'].close() - if browser_instance.get('playwright'): - browser_instance['playwright'].stop() - with self.lock: - if browser_instance in self.all_browsers: - self.all_browsers.remove(browser_instance) - except Exception as e: - self.log(f"关闭浏览器失败: {e}") - - def shutdown(self): - """关闭所有浏览器""" - self.log("正在关闭所有浏览器...") - for browser_instance in list(self.all_browsers): - self._close_browser(browser_instance) - self.all_browsers.clear() - self.initialized = False - self.log("浏览器池已关闭") - - def get_status(self): - """获取池状态""" - return { - 'pool_size': self.pool_size, - 'total_browsers': len(self.all_browsers), - 'initialized': self.initialized, - 'mode': 'thread_local' - } - - -# 全局浏览器池实例 -_browser_pool = None -_pool_lock = threading.Lock() - - -def get_browser_pool(pool_size=3, log_callback=None): - """获取全局浏览器池实例""" - global _browser_pool - with _pool_lock: - if _browser_pool is None: - _browser_pool = BrowserPool(pool_size=pool_size, log_callback=log_callback) - return _browser_pool - - -def init_browser_pool(pool_size=3, log_callback=None): - """初始化浏览器池""" - pool = get_browser_pool(pool_size, log_callback) - pool.initialize() - return pool diff --git a/playwright_automation.py b/playwright_automation.py index de93e0c..cefcbaf 100755 --- a/playwright_automation.py +++ b/playwright_automation.py @@ -1380,6 +1380,7 @@ class PlaywrightAutomation: path=temp_filepath, type='jpeg', full_page=True, + scale="css", # 输出按CSS像素(1920x1080),避免device_scale_factor=2 导致4K截图 quality=100 ) diff --git a/screenshot_worker.py b/screenshot_worker.py deleted file mode 100644 index 70c76a5..0000000 --- a/screenshot_worker.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -独立截图进程 - 使用已保存的Cookies直接截图 -""" - -import sys -import json -import os -import time -import traceback - -def take_screenshot(config): - """执行截图任务""" - from playwright.sync_api import sync_playwright - - username = config['username'] - browse_type = config.get('browse_type', '应读') - screenshot_path = config['screenshot_path'] - cookies_file = config.get('cookies_file', '') - - # 安全修复:验证截图路径在允许的目录内,防止路径遍历攻击 - ALLOWED_SCREENSHOT_DIRS = [ - '/root/zsglpt/screenshots', - '/root/zsglpt/static/screenshots', - '/tmp/zsglpt_screenshots' - ] - - def is_safe_screenshot_path(path): - """验证截图路径是否安全""" - abs_path = os.path.abspath(path) - return any(abs_path.startswith(os.path.abspath(allowed_dir)) - for allowed_dir in ALLOWED_SCREENSHOT_DIRS) - - if not is_safe_screenshot_path(screenshot_path): - return { - 'success': False, - 'message': '非法截图路径', - 'screenshot_path': '' - } - - result = { - 'success': False, - 'message': '', - 'screenshot_path': screenshot_path - } - - playwright = None - browser = None - context = None - - try: - print(f"[截图进程] 启动浏览器...", flush=True) - playwright = sync_playwright().start() - - browser = playwright.chromium.launch( - headless=True, - args=[ - '--no-sandbox', - '--disable-setuid-sandbox', - '--disable-dev-shm-usage', - '--disable-gpu' - ] - ) - - # 创建 context - context = browser.new_context( - viewport={'width': 1920, 'height': 1080}, - user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' - ) - - page = context.new_page() - page.set_default_timeout(30000) - - # 加载已保存的 Cookies - if cookies_file and os.path.exists(cookies_file): - print(f"[截图进程] 加载Cookies: {cookies_file}", flush=True) - with open(cookies_file, 'r', encoding='utf-8') as f: - cookies_data = json.load(f) - cookies = cookies_data.get('cookies', []) - if cookies: - context.add_cookies(cookies) - print(f"[截图进程] 已加载 {len(cookies)} 个Cookie", flush=True) - else: - print(f"[截图进程] 警告: Cookies文件不存在", flush=True) - - # 根据浏览类型导航到对应页面 - if browse_type == "应读": - url = "https://zsgl.gat.zj.gov.cn/web/learn/readList" - elif browse_type == "应学": - url = "https://zsgl.gat.zj.gov.cn/web/learn/learnList" - elif browse_type == "应考": - url = "https://zsgl.gat.zj.gov.cn/web/exam" - else: - url = "https://zsgl.gat.zj.gov.cn/web/learn/readList" - - print(f"[截图进程] 导航到: {url}", flush=True) - page.goto(url, wait_until='networkidle', timeout=30000) - - # 等待页面加载 - time.sleep(3) - - # 检查是否被重定向到登录页 - if '/login' in page.url.lower() or '/web/' == page.url.rstrip('/').split('/')[-1]: - print(f"[截图进程] 登录已过期,需要重新登录", flush=True) - result['message'] = '登录已过期' - return result - - # 确保截图目录存在 - os.makedirs(os.path.dirname(screenshot_path), exist_ok=True) - - # 截图 - print(f"[截图进程] 截图保存到: {screenshot_path}", flush=True) - page.screenshot(path=screenshot_path, full_page=True, type='jpeg', quality=85) - - # 验证截图文件 - if os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000: - result['success'] = True - result['message'] = '截图成功' - print(f"[截图进程] 截图成功!", flush=True) - else: - result['message'] = '截图文件异常' - - except Exception as e: - result['message'] = f'截图出错: {str(e)}' - print(f"[截图进程] 错误: {traceback.format_exc()}", flush=True) - - finally: - try: - if context: - context.close() - if browser: - browser.close() - if playwright: - playwright.stop() - except: - pass - - return result - - -def main(): - if len(sys.argv) < 2: - print("用法: python screenshot_worker.py ") - sys.exit(1) - - config_file = sys.argv[1] - - try: - with open(config_file, 'r', encoding='utf-8') as f: - config = json.load(f) - except Exception as e: - print(json.dumps({'success': False, 'message': f'读取配置失败: {e}'})) - sys.exit(1) - - result = take_screenshot(config) - - # 输出 JSON 结果 - print("===RESULT===", flush=True) - print(json.dumps(result, ensure_ascii=False), flush=True) - - # 清理配置文件 - try: - os.remove(config_file) - except: - pass - - sys.exit(0 if result['success'] else 1) - - -if __name__ == '__main__': - main()