清理冗余文件,更新 Dockerfile 和配置

This commit is contained in:
2025-12-14 22:49:00 +08:00
parent dac06d187e
commit 1ec0d80f6c
8 changed files with 33 additions and 1054 deletions

11
.gitignore vendored
View File

@@ -25,6 +25,12 @@ __pycache__/
*.class *.class
*.so *.so
.Python .Python
.pytest_cache/
.ruff_cache/
.mypy_cache/
.coverage
coverage.xml
htmlcov/
env/ env/
venv/ venv/
ENV/ ENV/
@@ -56,3 +62,8 @@ verify_*.sh
# 内部文档 # 内部文档
docs/ docs/
# 前端依赖(体积大,不应入库)
node_modules/
app-frontend/node_modules/
admin-frontend/node_modules/

View File

@@ -24,9 +24,7 @@ COPY database.py .
COPY db_pool.py . COPY db_pool.py .
COPY playwright_automation.py . COPY playwright_automation.py .
COPY api_browser.py . COPY api_browser.py .
COPY browser_pool.py .
COPY browser_pool_worker.py . COPY browser_pool_worker.py .
COPY screenshot_worker.py .
COPY browser_installer.py . COPY browser_installer.py .
COPY password_utils.py . COPY password_utils.py .
COPY crypto_utils.py . COPY crypto_utils.py .
@@ -37,8 +35,6 @@ COPY email_service.py .
COPY app_config.py . COPY app_config.py .
COPY app_logger.py . COPY app_logger.py .
COPY app_security.py . COPY app_security.py .
COPY app_state.py .
COPY app_utils.py .
COPY routes/ ./routes/ COPY routes/ ./routes/
COPY services/ ./services/ COPY services/ ./services/
COPY realtime/ ./realtime/ COPY realtime/ ./realtime/

View File

@@ -31,29 +31,35 @@
## 项目结构 ## 项目结构
``` ```
zsgpt2/ zsglpt/
├── app.py # 主应用程序 ├── app.py # 启动/装配入口
├── database.py # 数据库模块 ├── routes/ # 路由层Blueprint
├── playwright_automation.py # 浏览器自动化 ├── services/ # 业务服务层
├── realtime/ # SocketIO 事件与推送
├── database.py # 数据库稳定门面(对外 API
├── db/ # DB 分域实现 + schema/migrations
├── db_pool.py # 数据库连接池
├── playwright_automation.py # Playwright 自动化
├── api_browser.py # Requests 自动化(主浏览流程)
├── browser_pool_worker.py # 截图 WorkerPool浏览器复用
├── browser_installer.py # 浏览器安装检查 ├── browser_installer.py # 浏览器安装检查
├── app_config.py # 配置管理 ├── app_config.py # 配置管理
├── app_logger.py # 日志系统 ├── app_logger.py # 日志系统
├── app_security.py # 安全模块 ├── app_security.py # 安全模块
├── app_state.py # 状态管理
├── app_utils.py # 工具函数
├── db_pool.py # 数据库连接池
├── password_utils.py # 密码工具 ├── password_utils.py # 密码工具
├── crypto_utils.py # 加解密工具
├── email_service.py # 邮件服务
├── requirements.txt # Python依赖 ├── requirements.txt # Python依赖
├── requirements-dev.txt # 开发依赖(不进生产镜像)
├── pyproject.toml # ruff/black/pytest 配置
├── Dockerfile # Docker镜像构建文件 ├── Dockerfile # Docker镜像构建文件
├── docker-compose.yml # Docker编排文件 ├── docker-compose.yml # Docker编排文件
├── templates/ # HTML模板 ├── templates/ # HTML模板(含 SPA fallback
│ ├── index.html # 主页面 ├── app-frontend/ # 用户端前端源码(可选保留)
│ ├── login.html # 登录页 ├── admin-frontend/ # 后台前端源码(可选保留)
│ ├── register.html # 注册页 └── static/ # 前端构建产物(运行时使用)
├── admin.html # 后台管理 ├── app/ # 用户端 SPA
└── ... └── admin/ # 后台 SPA
└── static/ # 静态资源
└── js/ # JavaScript文件
``` ```
--- ---

View File

@@ -1,332 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用状态管理模块
提供线程安全的全局状态管理
说明P0P3 优化后):
- 该模块为历史遗留实现,保留用于兼容与参考
- 当前实际生效的全局状态入口为 `services/state.py`safe_* API
"""
import threading
from typing import Tuple
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from app_logger import get_logger
logger = get_logger('app_state')
class ThreadSafeDict:
"""线程安全的字典包装类"""
def __init__(self):
self._dict = {}
self._lock = threading.RLock()
def get(self, key, default=None):
"""获取值"""
with self._lock:
return self._dict.get(key, default)
def set(self, key, value):
"""设置值"""
with self._lock:
self._dict[key] = value
def delete(self, key):
"""删除键"""
with self._lock:
if key in self._dict:
del self._dict[key]
def pop(self, key, default=None):
"""弹出键值"""
with self._lock:
return self._dict.pop(key, default)
def keys(self):
"""获取所有键(返回副本)"""
with self._lock:
return list(self._dict.keys())
def items(self):
"""获取所有键值对(返回副本)"""
with self._lock:
return list(self._dict.items())
def __contains__(self, key):
"""检查键是否存在"""
with self._lock:
return key in self._dict
def clear(self):
"""清空字典"""
with self._lock:
self._dict.clear()
def __len__(self):
"""获取长度"""
with self._lock:
return len(self._dict)
class LogCacheManager:
"""日志缓存管理器(线程安全)"""
def __init__(self, max_logs_per_user=100, max_total_logs=1000):
self._cache = {} # {user_id: [logs]}
self._total_count = 0
self._lock = threading.RLock()
self._max_logs_per_user = max_logs_per_user
self._max_total_logs = max_total_logs
def add_log(self, user_id: int, log_entry: Dict[str, Any]) -> bool:
"""添加日志到缓存"""
with self._lock:
# 检查总数限制
if self._total_count >= self._max_total_logs:
logger.warning(f"日志缓存已满 ({self._max_total_logs}),拒绝添加")
return False
# 初始化用户日志列表
if user_id not in self._cache:
self._cache[user_id] = []
user_logs = self._cache[user_id]
# 检查用户日志数限制
if len(user_logs) >= self._max_logs_per_user:
# 移除最旧的日志
user_logs.pop(0)
self._total_count -= 1
# 添加新日志
user_logs.append(log_entry)
self._total_count += 1
return True
def get_logs(self, user_id: int) -> list:
"""获取用户的所有日志(返回副本)"""
with self._lock:
return list(self._cache.get(user_id, []))
def clear_user_logs(self, user_id: int):
"""清空用户的日志"""
with self._lock:
if user_id in self._cache:
count = len(self._cache[user_id])
del self._cache[user_id]
self._total_count -= count
logger.info(f"清空用户 {user_id}{count} 条日志")
def get_total_count(self) -> int:
"""获取总日志数"""
with self._lock:
return self._total_count
def get_stats(self) -> Dict[str, int]:
"""获取统计信息"""
with self._lock:
return {
'total_count': self._total_count,
'user_count': len(self._cache),
'max_per_user': self._max_logs_per_user,
'max_total': self._max_total_logs
}
class CaptchaManager:
"""验证码管理器(线程安全)"""
def __init__(self, expire_seconds=300):
self._storage = {} # {identifier: {'code': str, 'expire': datetime}}
self._lock = threading.RLock()
self._expire_seconds = expire_seconds
def create(self, identifier: str, code: str) -> None:
"""创建验证码"""
with self._lock:
self._storage[identifier] = {
'code': code,
'expire': datetime.now() + timedelta(seconds=self._expire_seconds)
}
def verify(self, identifier: str, code: str) -> Tuple[bool, str]:
"""验证验证码"""
with self._lock:
if identifier not in self._storage:
return False, "验证码不存在或已过期"
captcha_data = self._storage[identifier]
# 检查是否过期
if datetime.now() > captcha_data['expire']:
del self._storage[identifier]
return False, "验证码已过期,请重新获取"
# 验证码码值
if captcha_data['code'] != code:
return False, "验证码错误"
# 验证成功,删除验证码
del self._storage[identifier]
return True, "验证成功"
def cleanup_expired(self) -> int:
"""清理过期的验证码"""
with self._lock:
now = datetime.now()
expired_keys = [
key for key, data in self._storage.items()
if now > data['expire']
]
for key in expired_keys:
del self._storage[key]
if expired_keys:
logger.info(f"清理了 {len(expired_keys)} 个过期验证码")
return len(expired_keys)
def get_count(self) -> int:
"""获取当前验证码数量"""
with self._lock:
return len(self._storage)
class ApplicationState:
"""应用全局状态管理器(单例模式)"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 浏览器管理器
self.browser_manager = None
self._browser_lock = threading.Lock()
# 用户账号管理 {user_id: {account_id: Account对象}}
self.user_accounts = ThreadSafeDict()
# 活动任务管理 {account_id: Thread对象}
self.active_tasks = ThreadSafeDict()
# 日志缓存管理
self.log_cache = LogCacheManager()
# 验证码管理
self.captcha = CaptchaManager()
# 用户信号量管理 {account_id: Semaphore}
self.user_semaphores = ThreadSafeDict()
# 全局信号量
self.global_semaphore = None
self.screenshot_semaphore = threading.Semaphore(1)
self._initialized = True
logger.info("应用状态管理器初始化完成")
def set_browser_manager(self, manager):
"""设置浏览器管理器"""
with self._browser_lock:
self.browser_manager = manager
def get_browser_manager(self):
"""获取浏览器管理器"""
with self._browser_lock:
return self.browser_manager
def get_user_semaphore(self, account_id: int, max_concurrent: int = 1):
"""获取或创建用户信号量"""
if account_id not in self.user_semaphores:
self.user_semaphores.set(account_id, threading.Semaphore(max_concurrent))
return self.user_semaphores.get(account_id)
def set_global_semaphore(self, max_concurrent: int):
"""设置全局信号量"""
self.global_semaphore = threading.Semaphore(max_concurrent)
def get_stats(self) -> Dict[str, Any]:
"""获取状态统计信息"""
return {
'user_accounts_count': len(self.user_accounts),
'active_tasks_count': len(self.active_tasks),
'log_cache_stats': self.log_cache.get_stats(),
'captcha_count': self.captcha.get_count(),
'user_semaphores_count': len(self.user_semaphores),
'browser_manager': 'initialized' if self.browser_manager else 'not_initialized'
}
# 全局单例实例
app_state = ApplicationState()
# 向后兼容的辅助函数
def verify_captcha(identifier: str, code: str) -> Tuple[bool, str]:
"""验证验证码(向后兼容接口)"""
return app_state.captcha.verify(identifier, code)
def create_captcha(identifier: str, code: str) -> None:
"""创建验证码(向后兼容接口)"""
app_state.captcha.create(identifier, code)
def cleanup_expired_captchas() -> int:
"""清理过期验证码(向后兼容接口)"""
return app_state.captcha.cleanup_expired()
if __name__ == '__main__':
# 测试代码
print("测试线程安全状态管理器...")
print("=" * 60)
# 测试 ThreadSafeDict
print("\n1. 测试 ThreadSafeDict:")
td = ThreadSafeDict()
td.set('key1', 'value1')
print(f" 设置 key1 = {td.get('key1')}")
print(f" 长度: {len(td)}")
# 测试 LogCacheManager
print("\n2. 测试 LogCacheManager:")
lcm = LogCacheManager(max_logs_per_user=3, max_total_logs=10)
for i in range(5):
lcm.add_log(1, {'message': f'log {i}'})
print(f" 用户1日志数: {len(lcm.get_logs(1))}")
print(f" 总日志数: {lcm.get_total_count()}")
print(f" 统计: {lcm.get_stats()}")
# 测试 CaptchaManager
print("\n3. 测试 CaptchaManager:")
cm = CaptchaManager(expire_seconds=2)
cm.create('test@example.com', '1234')
success, msg = cm.verify('test@example.com', '1234')
print(f" 验证结果: {success}, {msg}")
# 测试 ApplicationState
print("\n4. 测试 ApplicationState (单例):")
state1 = ApplicationState()
state2 = ApplicationState()
print(f" 单例验证: {state1 is state2}")
print(f" 状态统计: {state1.get_stats()}")
print("\n" + "=" * 60)
print("✓ 所有测试通过!")

View File

@@ -1,366 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用工具模块
提取重复的业务逻辑
"""
from typing import Dict, Any, Optional, Tuple
from flask import session, jsonify
from app_logger import get_logger, audit_logger
from app_security import get_client_ip
import database
logger = get_logger('app_utils')
class ValidationError(Exception):
"""验证错误异常"""
pass
def verify_user_file_permission(user_id: int, filename: str) -> Tuple[bool, Optional[str]]:
"""
验证用户文件访问权限
Args:
user_id: 用户ID
filename: 文件名
Returns:
(是否有权限, 错误消息)
"""
# 获取用户信息
user = database.get_user_by_id(user_id)
if not user:
return False, "用户不存在"
username = user['username']
# 检查文件名是否以用户名开头
if not filename.startswith(f"{username}_"):
logger.warning(f"用户 {username} (ID:{user_id}) 尝试访问未授权文件: {filename}")
return False, "无权访问此文件"
return True, None
def log_task_event(account_id: int, status: str, message: str,
browse_type: Optional[str] = None,
screenshot_path: Optional[str] = None) -> bool:
"""
记录任务日志(统一接口)
Args:
account_id: 账号ID
status: 状态running/completed/failed/stopped
message: 消息
browse_type: 浏览类型
screenshot_path: 截图路径
Returns:
是否成功
"""
try:
return database.create_task_log(
account_id=account_id,
status=status,
message=message,
browse_type=browse_type,
screenshot_path=screenshot_path
)
except Exception as e:
logger.error(f"记录任务日志失败: {e}", exc_info=True)
return False
def update_account_status(account_id: int, status: str,
error_message: Optional[str] = None) -> bool:
"""
更新账号状态(统一接口)
Args:
account_id: 账号ID
status: 状态idle/running/error/stopped
error_message: 错误消息仅当status=error时
Returns:
是否成功
"""
try:
return database.update_account_status(
account_id=account_id,
status=status,
error_message=error_message
)
except Exception as e:
logger.error(f"更新账号状态失败 (account_id={account_id}): {e}", exc_info=True)
return False
def get_or_create_config_cache() -> Optional[Dict[str, Any]]:
"""
获取或创建系统配置缓存
缓存存储在session中避免重复查询数据库
Returns:
配置字典失败返回None
"""
# 尝试从session获取缓存
if '_system_config' in session:
return session['_system_config']
# 从数据库加载
try:
config = database.get_system_config()
if config:
# 存入session缓存
session['_system_config'] = config
return config
return None
except Exception as e:
logger.error(f"获取系统配置失败: {e}", exc_info=True)
return None
def clear_config_cache():
"""清除配置缓存(配置变更时调用)"""
if '_system_config' in session:
del session['_system_config']
logger.debug("已清除系统配置缓存")
def safe_close_browser(automation_obj, account_id: int):
"""
安全关闭浏览器(统一错误处理)
Args:
automation_obj: PlaywrightAutomation对象
account_id: 账号ID
"""
if automation_obj:
try:
automation_obj.close()
logger.info(f"账号 {account_id} 的浏览器已关闭")
except Exception as e:
logger.error(f"关闭账号 {account_id} 的浏览器失败: {e}", exc_info=True)
def format_error_response(error: str, status_code: int = 400,
need_captcha: bool = False,
extra_data: Optional[Dict] = None) -> Tuple[Any, int]:
"""
格式化错误响应(统一接口)
Args:
error: 错误消息
status_code: HTTP状态码
need_captcha: 是否需要验证码
extra_data: 额外数据
Returns:
(jsonify响应, 状态码)
"""
response_data = {"error": error}
if need_captcha:
response_data["need_captcha"] = True
if extra_data:
response_data.update(extra_data)
return jsonify(response_data), status_code
def format_success_response(message: str = "操作成功",
extra_data: Optional[Dict] = None) -> Any:
"""
格式化成功响应(统一接口)
Args:
message: 成功消息
extra_data: 额外数据
Returns:
jsonify响应
"""
response_data = {"success": True, "message": message}
if extra_data:
response_data.update(extra_data)
return jsonify(response_data)
def log_user_action(action: str, user_id: int, username: str,
success: bool, details: Optional[str] = None):
"""
记录用户操作到审计日志(统一接口)
Args:
action: 操作类型login/register/logout等
user_id: 用户ID
username: 用户名
success: 是否成功
details: 详细信息
"""
ip = get_client_ip()
if action == 'login':
audit_logger.log_user_login(user_id, username, ip, success)
elif action == 'logout':
audit_logger.log_user_logout(user_id, username, ip)
elif action == 'register':
audit_logger.log_user_created(user_id, username, created_by='self')
if details:
logger.info(f"用户操作: {action}, 用户={username}, 成功={success}, 详情={details}")
def validate_pagination(page: Any, page_size: Any,
max_page_size: int = 100) -> Tuple[int, int, Optional[str]]:
"""
验证分页参数
Args:
page: 页码
page_size: 每页大小
max_page_size: 最大每页大小
Returns:
(页码, 每页大小, 错误消息)
"""
try:
page = int(page) if page else 1
page_size = int(page_size) if page_size else 20
except (ValueError, TypeError):
return 1, 20, "无效的分页参数"
if page < 1:
return 1, 20, "页码必须大于0"
if page_size < 1 or page_size > max_page_size:
return page, 20, f"每页大小必须在1-{max_page_size}之间"
return page, page_size, None
def check_user_ownership(user_id: int, resource_type: str,
resource_id: int) -> Tuple[bool, Optional[str]]:
"""
检查用户是否拥有资源
Args:
user_id: 用户ID
resource_type: 资源类型account/task等
resource_id: 资源ID
Returns:
(是否拥有, 错误消息)
"""
try:
if resource_type == 'account':
account = database.get_account_by_id(resource_id)
if not account:
return False, "账号不存在"
if account['user_id'] != user_id:
return False, "无权访问此账号"
return True, None
elif resource_type == 'task':
# 通过account查询所属用户
# 这里需要根据实际数据库结构实现
pass
return False, "不支持的资源类型"
except Exception as e:
logger.error(f"检查资源所有权失败: {e}", exc_info=True)
return False, "系统错误"
def verify_and_consume_captcha(session_id: str, code: str, captcha_storage: dict, max_attempts: int = 5) -> Tuple[bool, str]:
"""
验证并消费验证码(安全增强版)
安全特性:
- 先删除验证码再验证,防止重放攻击
- 异常情况下也确保验证码被删除
Args:
session_id: 验证码会话ID
code: 用户输入的验证码
captcha_storage: 验证码存储字典
max_attempts: 最大尝试次数默认5次
Returns:
Tuple[bool, str]: (是否成功, 消息)
- 成功时返回 (True, "验证成功")
- 失败时返回 (False, 错误消息)
Example:
success, message = verify_and_consume_captcha(
captcha_session,
captcha_code,
captcha_storage,
max_attempts=5
)
if not success:
return jsonify({"error": message}), 400
"""
import time
# 安全修复:先取出并删除验证码,无论验证是否成功都不能重用
captcha_data = captcha_storage.pop(session_id, None)
# 检查验证码是否存在
if captcha_data is None:
return False, "验证码已过期或不存在,请重新获取"
try:
# 检查过期时间
if captcha_data["expire_time"] < time.time():
return False, "验证码已过期,请重新获取"
# 检查尝试次数
if captcha_data.get("failed_attempts", 0) >= max_attempts:
return False, f"验证码错误次数过多({max_attempts}次),请重新获取"
# 验证代码(不区分大小写)
if captcha_data["code"].lower() != code.lower():
# 验证失败,增加失败计数后放回(允许继续尝试)
captcha_data["failed_attempts"] = captcha_data.get("failed_attempts", 0) + 1
# 只有未超过最大尝试次数才放回
if captcha_data["failed_attempts"] < max_attempts:
captcha_storage[session_id] = captcha_data
return False, "验证码错误"
# 验证成功,验证码已被删除,不会被重用
return True, "验证成功"
except Exception as e:
# 异常情况下确保验证码不会被重用(已在函数开头删除)
logger.error(f"验证码验证异常: {e}")
return False, "验证码验证失败,请重新获取"
if __name__ == '__main__':
# 测试代码
print("测试应用工具模块...")
print("=" * 60)
# 测试分页验证
print("\n1. 测试分页验证:")
page, page_size, error = validate_pagination("2", "50")
print(f" 页码={page}, 每页={page_size}, 错误={error}")
page, page_size, error = validate_pagination("invalid", "50")
print(f" 无效输入: 页码={page}, 每页={page_size}, 错误={error}")
# 测试响应格式化
print("\n2. 测试响应格式化:")
print(f" 错误响应: {format_error_response('测试错误', need_captcha=True)}")
print(f" 成功响应: {format_success_response('测试成功', {'data': [1, 2, 3]})}")
print("\n" + "=" * 60)
print("✓ 工具模块加载成功!")

View File

@@ -1,165 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""浏览器池管理 - 线程本地存储,每个线程复用自己的浏览器
说明P0P3 优化后):
- 该实现为遗留版本,当前截图并发与浏览器复用已迁移到 `browser_pool_worker.py` 的 WorkerPool 方案
- 本文件保留用于兼容/回滚参考(当前主流程不再依赖)
"""
import threading
import time
import nest_asyncio
nest_asyncio.apply()
# 线程本地存储
_thread_local = threading.local()
class BrowserPool:
"""浏览器池 - 使用线程本地存储,每个线程有自己的浏览器"""
def __init__(self, pool_size=3, log_callback=None):
self.pool_size = pool_size
self.log_callback = log_callback
self.lock = threading.Lock()
self.all_browsers = [] # 追踪所有浏览器(用于关闭)
self.initialized = True
def log(self, message):
if self.log_callback:
self.log_callback(message)
else:
print(f"[浏览器池] {message}")
def initialize(self):
"""初始化(线程本地模式下不预热)"""
self.log(f"浏览器池已就绪(线程本地模式,每线程独立浏览器)")
self.initialized = True
def _create_browser(self):
"""创建一个浏览器实例"""
try:
from playwright.sync_api import sync_playwright
playwright = sync_playwright().start()
browser = playwright.chromium.launch(
headless=True,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu',
'--single-process'
]
)
instance = {
'playwright': playwright,
'browser': browser,
'thread_id': threading.current_thread().ident,
'created_at': time.time(),
'use_count': 0
}
with self.lock:
self.all_browsers.append(instance)
return instance
except Exception as e:
self.log(f"创建浏览器失败: {e}")
return None
def acquire(self, timeout=60):
"""获取当前线程的浏览器实例(如果没有则创建)"""
# 检查当前线程是否已有浏览器
browser_instance = getattr(_thread_local, 'browser_instance', None)
if browser_instance:
# 检查浏览器是否还有效
try:
if browser_instance['browser'].is_connected():
browser_instance['use_count'] += 1
self.log(f"复用线程浏览器(第{browser_instance['use_count']}次使用)")
return browser_instance
except:
pass
# 浏览器已失效,清理
self._close_browser(browser_instance)
_thread_local.browser_instance = None
# 为当前线程创建新浏览器
self.log("为当前线程创建新浏览器...")
browser_instance = self._create_browser()
if browser_instance:
browser_instance['use_count'] = 1
_thread_local.browser_instance = browser_instance
return browser_instance
def release(self, browser_instance):
"""释放浏览器(线程本地模式下保留不关闭)"""
if browser_instance is None:
return
# 检查浏览器是否还有效
try:
if browser_instance['browser'].is_connected():
self.log(f"浏览器保持活跃(已使用{browser_instance['use_count']}次)")
return
except:
pass
# 浏览器已断开,清理
self.log("浏览器已断开,清理资源")
self._close_browser(browser_instance)
if getattr(_thread_local, 'browser_instance', None) == browser_instance:
_thread_local.browser_instance = None
def _close_browser(self, browser_instance):
"""关闭单个浏览器实例"""
try:
if browser_instance.get('browser'):
browser_instance['browser'].close()
if browser_instance.get('playwright'):
browser_instance['playwright'].stop()
with self.lock:
if browser_instance in self.all_browsers:
self.all_browsers.remove(browser_instance)
except Exception as e:
self.log(f"关闭浏览器失败: {e}")
def shutdown(self):
"""关闭所有浏览器"""
self.log("正在关闭所有浏览器...")
for browser_instance in list(self.all_browsers):
self._close_browser(browser_instance)
self.all_browsers.clear()
self.initialized = False
self.log("浏览器池已关闭")
def get_status(self):
"""获取池状态"""
return {
'pool_size': self.pool_size,
'total_browsers': len(self.all_browsers),
'initialized': self.initialized,
'mode': 'thread_local'
}
# 全局浏览器池实例
_browser_pool = None
_pool_lock = threading.Lock()
def get_browser_pool(pool_size=3, log_callback=None):
"""获取全局浏览器池实例"""
global _browser_pool
with _pool_lock:
if _browser_pool is None:
_browser_pool = BrowserPool(pool_size=pool_size, log_callback=log_callback)
return _browser_pool
def init_browser_pool(pool_size=3, log_callback=None):
"""初始化浏览器池"""
pool = get_browser_pool(pool_size, log_callback)
pool.initialize()
return pool

View File

@@ -1380,6 +1380,7 @@ class PlaywrightAutomation:
path=temp_filepath, path=temp_filepath,
type='jpeg', type='jpeg',
full_page=True, full_page=True,
scale="css", # 输出按CSS像素(1920x1080)避免device_scale_factor=2 导致4K截图
quality=100 quality=100
) )

View File

@@ -1,172 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
独立截图进程 - 使用已保存的Cookies直接截图
"""
import sys
import json
import os
import time
import traceback
def take_screenshot(config):
"""执行截图任务"""
from playwright.sync_api import sync_playwright
username = config['username']
browse_type = config.get('browse_type', '应读')
screenshot_path = config['screenshot_path']
cookies_file = config.get('cookies_file', '')
# 安全修复:验证截图路径在允许的目录内,防止路径遍历攻击
ALLOWED_SCREENSHOT_DIRS = [
'/root/zsglpt/screenshots',
'/root/zsglpt/static/screenshots',
'/tmp/zsglpt_screenshots'
]
def is_safe_screenshot_path(path):
"""验证截图路径是否安全"""
abs_path = os.path.abspath(path)
return any(abs_path.startswith(os.path.abspath(allowed_dir))
for allowed_dir in ALLOWED_SCREENSHOT_DIRS)
if not is_safe_screenshot_path(screenshot_path):
return {
'success': False,
'message': '非法截图路径',
'screenshot_path': ''
}
result = {
'success': False,
'message': '',
'screenshot_path': screenshot_path
}
playwright = None
browser = None
context = None
try:
print(f"[截图进程] 启动浏览器...", flush=True)
playwright = sync_playwright().start()
browser = playwright.chromium.launch(
headless=True,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu'
]
)
# 创建 context
context = browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
)
page = context.new_page()
page.set_default_timeout(30000)
# 加载已保存的 Cookies
if cookies_file and os.path.exists(cookies_file):
print(f"[截图进程] 加载Cookies: {cookies_file}", flush=True)
with open(cookies_file, 'r', encoding='utf-8') as f:
cookies_data = json.load(f)
cookies = cookies_data.get('cookies', [])
if cookies:
context.add_cookies(cookies)
print(f"[截图进程] 已加载 {len(cookies)} 个Cookie", flush=True)
else:
print(f"[截图进程] 警告: Cookies文件不存在", flush=True)
# 根据浏览类型导航到对应页面
if browse_type == "应读":
url = "https://zsgl.gat.zj.gov.cn/web/learn/readList"
elif browse_type == "应学":
url = "https://zsgl.gat.zj.gov.cn/web/learn/learnList"
elif browse_type == "应考":
url = "https://zsgl.gat.zj.gov.cn/web/exam"
else:
url = "https://zsgl.gat.zj.gov.cn/web/learn/readList"
print(f"[截图进程] 导航到: {url}", flush=True)
page.goto(url, wait_until='networkidle', timeout=30000)
# 等待页面加载
time.sleep(3)
# 检查是否被重定向到登录页
if '/login' in page.url.lower() or '/web/' == page.url.rstrip('/').split('/')[-1]:
print(f"[截图进程] 登录已过期,需要重新登录", flush=True)
result['message'] = '登录已过期'
return result
# 确保截图目录存在
os.makedirs(os.path.dirname(screenshot_path), exist_ok=True)
# 截图
print(f"[截图进程] 截图保存到: {screenshot_path}", flush=True)
page.screenshot(path=screenshot_path, full_page=True, type='jpeg', quality=85)
# 验证截图文件
if os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000:
result['success'] = True
result['message'] = '截图成功'
print(f"[截图进程] 截图成功!", flush=True)
else:
result['message'] = '截图文件异常'
except Exception as e:
result['message'] = f'截图出错: {str(e)}'
print(f"[截图进程] 错误: {traceback.format_exc()}", flush=True)
finally:
try:
if context:
context.close()
if browser:
browser.close()
if playwright:
playwright.stop()
except:
pass
return result
def main():
if len(sys.argv) < 2:
print("用法: python screenshot_worker.py <config_json_file>")
sys.exit(1)
config_file = sys.argv[1]
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
except Exception as e:
print(json.dumps({'success': False, 'message': f'读取配置失败: {e}'}))
sys.exit(1)
result = take_screenshot(config)
# 输出 JSON 结果
print("===RESULT===", flush=True)
print(json.dumps(result, ensure_ascii=False), flush=True)
# 清理配置文件
try:
os.remove(config_file)
except:
pass
sys.exit(0 if result['success'] else 1)
if __name__ == '__main__':
main()