diff --git a/app.py b/app.py index 3d42426..0d90c87 100755 --- a/app.py +++ b/app.py @@ -23,6 +23,7 @@ import threading import time import json import os +import secrets # 安全修复: 使用加密安全的随机数生成 from datetime import datetime, timedelta, timezone from functools import wraps @@ -131,9 +132,14 @@ log_cache_total_count = 0 # 全局日志总数,防止无限增长 MAX_LOGS_PER_USER = config.MAX_LOGS_PER_USER # 每个用户最多100条 MAX_TOTAL_LOGS = config.MAX_TOTAL_LOGS # 全局最多1000条,防止内存泄漏 +# 安全修复: 内存清理配置 +USER_ACCOUNTS_EXPIRE_SECONDS = 3600 # 用户账号缓存1小时过期 +user_accounts_last_access = {} # {user_id: last_access_timestamp} + # 并发控制:每个用户同时最多运行1个账号(避免内存不足) # 验证码存储:{session_id: {"code": "1234", "expire_time": timestamp, "failed_attempts": 0}} captcha_storage = {} +captcha_storage_lock = threading.Lock() # 安全修复: 保护captcha_storage的线程安全 # IP限流存储:{ip: {"attempts": count, "lock_until": timestamp, "first_attempt": timestamp}} ip_rate_limit = {} @@ -171,6 +177,90 @@ def get_screenshot_semaphore(): return screenshot_semaphore, max_concurrent +# ==================== 内存清理函数 ==================== +def cleanup_expired_data(): + """定期清理过期数据,防止内存泄漏 + + 清理内容: + - 过期的验证码 + - 过期的IP限流记录 + - 长时间未访问的用户账号缓存 + - 完成的任务状态 + """ + global log_cache_total_count + current_time = time.time() + + # 1. 清理过期验证码 + with captcha_storage_lock: + expired_captchas = [k for k, v in captcha_storage.items() if v.get("expire_time", 0) < current_time] + for k in expired_captchas: + del captcha_storage[k] + if expired_captchas: + logger.debug(f"已清理 {len(expired_captchas)} 个过期验证码") + + # 2. 清理过期IP限流记录 + with ip_rate_limit_lock: + expired_ips = [] + for ip, data in ip_rate_limit.items(): + # 如果锁定已过期且首次尝试超过1小时,则清理 + lock_until = data.get("lock_until", 0) + first_attempt = data.get("first_attempt", 0) + if lock_until < current_time and (current_time - first_attempt) > 3600: + expired_ips.append(ip) + for ip in expired_ips: + del ip_rate_limit[ip] + if expired_ips: + logger.debug(f"已清理 {len(expired_ips)} 个过期IP限流记录") + + # 3. 清理长时间未访问的用户账号缓存 + with user_accounts_lock: + expired_users = [] + for user_id, last_access in list(user_accounts_last_access.items()): + if (current_time - last_access) > USER_ACCOUNTS_EXPIRE_SECONDS: + # 检查该用户是否有活跃任务 + has_active_task = False + with task_status_lock: + for task_data in task_status.values(): + if task_data.get("user_id") == user_id: + has_active_task = True + break + if not has_active_task and user_id in user_accounts: + expired_users.append(user_id) + for user_id in expired_users: + del user_accounts[user_id] + del user_accounts_last_access[user_id] + if expired_users: + logger.debug(f"已清理 {len(expired_users)} 个过期用户账号缓存") + + # 4. 清理已完成任务的状态(保留最近10分钟的) + with task_status_lock: + completed_tasks = [] + for account_id, status_data in list(task_status.items()): + if status_data.get("status") in ["已完成", "失败", "已停止"]: + start_time = status_data.get("start_time", 0) + if (current_time - start_time) > 600: # 10分钟 + completed_tasks.append(account_id) + for account_id in completed_tasks: + del task_status[account_id] + if completed_tasks: + logger.debug(f"已清理 {len(completed_tasks)} 个已完成任务状态") + + +def start_cleanup_scheduler(): + """启动定期清理调度器""" + def cleanup_loop(): + while True: + try: + time.sleep(300) # 每5分钟执行一次清理 + cleanup_expired_data() + except Exception as e: + logger.error(f"清理任务执行失败: {e}") + + cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="cleanup-scheduler") + cleanup_thread.start() + logger.info("内存清理调度器已启动") + + class User(UserMixin): """Flask-Login 用户类""" def __init__(self, user_id): @@ -717,8 +807,8 @@ def generate_captcha(): session_id = str(uuid.uuid4()) - # 生成4位随机数字 - code = "".join([str(random.randint(0, 9)) for _ in range(4)]) + # 安全修复: 使用加密安全的随机数生成验证码 + code = "".join([str(secrets.randbelow(10)) for _ in range(4)]) # 存储验证码,5分钟过期 captcha_storage[session_id] = { @@ -777,15 +867,17 @@ def generate_captcha(): "session_id": session_id, "captcha_image": f"data:image/png;base64,{img_base64}" }) - except ImportError: - # 如果没有PIL,退回到简单文本(但添加混淆) - # 安全警告:生产环境应安装PIL - logger.warning("PIL未安装,验证码安全性降低") - # 不直接返回验证码,返回混淆后的提示 + except ImportError as e: + # 如果没有PIL,不再降级服务,直接返回错误 + # 安全修复:不返回任何验证码相关信息 + logger.error(f"PIL库未安装,验证码功能不可用: {e}") + # 清理刚创建的验证码记录 + with captcha_storage_lock: + if session_id in captcha_storage: + del captcha_storage[session_id] return jsonify({ - "session_id": session_id, - "captcha_hint": "验证码图片生成失败,请联系管理员" - }), 500 + "error": "验证码服务暂不可用,请联系管理员安装PIL库" + }), 503 # Service Unavailable @app.route('/api/login', methods=['POST']) @@ -3756,6 +3848,9 @@ if __name__ == '__main__': checkpoint_mgr = get_checkpoint_manager() print("✓ 任务断点管理器已初始化") + # 启动内存清理调度器 + start_cleanup_scheduler() + # 加载系统配置(并发设置) try: system_config = database.get_system_config() diff --git a/app_config.py b/app_config.py index 5e099e5..6c640d8 100755 --- a/app_config.py +++ b/app_config.py @@ -55,11 +55,14 @@ class Config: SECRET_KEY = get_secret_key() # ==================== 会话安全配置 ==================== - # Bug fix: 生产环境安全警告 - SESSION_COOKIE_SECURE = os.environ.get('SESSION_COOKIE_SECURE', 'False').lower() == 'true' + # 安全修复: 根据环境自动选择安全配置 + # 生产环境(FLASK_ENV=production)时自动启用更严格的安全设置 + _is_production = os.environ.get('FLASK_ENV', 'production') == 'production' + _force_secure = os.environ.get('SESSION_COOKIE_SECURE', '').lower() == 'true' + SESSION_COOKIE_SECURE = _force_secure or (_is_production and os.environ.get('HTTPS_ENABLED', 'false').lower() == 'true') SESSION_COOKIE_HTTPONLY = True # 防止XSS攻击 - # SameSite配置:HTTP环境使用Lax,HTTPS环境使用None - SESSION_COOKIE_SAMESITE = 'None' if os.environ.get('SESSION_COOKIE_SECURE', 'False').lower() == 'true' else 'Lax' + # SameSite配置:HTTPS环境使用None,HTTP环境使用Lax + SESSION_COOKIE_SAMESITE = 'None' if SESSION_COOKIE_SECURE else 'Lax' # 自定义cookie名称,避免与其他应用冲突 SESSION_COOKIE_NAME = os.environ.get('SESSION_COOKIE_NAME', 'zsglpt_session') # Cookie路径,确保整个应用都能访问 @@ -124,7 +127,8 @@ class Config: SOCKETIO_CORS_ALLOWED_ORIGINS = os.environ.get('SOCKETIO_CORS_ALLOWED_ORIGINS', '*') # ==================== 日志配置 ==================== - LOG_LEVEL = os.environ.get('LOG_LEVEL', 'DEBUG') + # 安全修复: 生产环境默认使用INFO级别,避免泄露敏感调试信息 + LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') LOG_FILE = os.environ.get('LOG_FILE', 'logs/app.log') LOG_MAX_BYTES = int(os.environ.get('LOG_MAX_BYTES', '10485760')) # 10MB LOG_BACKUP_COUNT = int(os.environ.get('LOG_BACKUP_COUNT', '5')) diff --git a/browser_pool_worker.py b/browser_pool_worker.py index 88cc95b..bd7317e 100755 --- a/browser_pool_worker.py +++ b/browser_pool_worker.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- """浏览器池管理 - 工作线程池模式(真正的浏览器复用)""" +import os import threading import queue import time @@ -9,6 +10,10 @@ from typing import Callable, Optional, Dict, Any import nest_asyncio nest_asyncio.apply() +# 安全修复: 将魔法数字提取为可配置常量 +BROWSER_IDLE_TIMEOUT = int(os.environ.get('BROWSER_IDLE_TIMEOUT', '300')) # 空闲超时(秒),默认5分钟 +TASK_QUEUE_TIMEOUT = int(os.environ.get('TASK_QUEUE_TIMEOUT', '10')) # 队列获取超时(秒) + class BrowserWorker(threading.Thread): """浏览器工作线程 - 每个worker维护自己的浏览器""" @@ -101,19 +106,18 @@ class BrowserWorker(threading.Thread): """工作线程主循环 - 按需启动浏览器模式""" self.log("Worker启动(按需模式,等待任务时不占用浏览器资源)") last_task_time = 0 - IDLE_TIMEOUT = 300 # 空闲5分钟后关闭浏览器 while self.running: try: # 从队列获取任务(带超时,以便能响应停止信号和空闲检查) self.idle = True try: - task = self.task_queue.get(timeout=10) + task = self.task_queue.get(timeout=TASK_QUEUE_TIMEOUT) except queue.Empty: # 检查是否需要关闭空闲的浏览器 if self.browser_instance and last_task_time > 0: idle_time = time.time() - last_task_time - if idle_time > IDLE_TIMEOUT: + if idle_time > BROWSER_IDLE_TIMEOUT: self.log(f"空闲{int(idle_time)}秒,关闭浏览器释放资源") self._close_browser() continue diff --git a/crypto_utils.py b/crypto_utils.py index 8ebd6f8..6bf153d 100644 --- a/crypto_utils.py +++ b/crypto_utils.py @@ -14,14 +14,14 @@ from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -# 加密密钥文件路径 -ENCRYPTION_KEY_FILE = 'data/encryption_key.bin' -ENCRYPTION_SALT_FILE = 'data/encryption_salt.bin' +# 安全修复: 支持通过环境变量配置密钥文件路径 +ENCRYPTION_KEY_FILE = os.environ.get('ENCRYPTION_KEY_FILE', 'data/encryption_key.bin') +ENCRYPTION_SALT_FILE = os.environ.get('ENCRYPTION_SALT_FILE', 'data/encryption_salt.bin') def _get_or_create_salt(): """获取或创建盐值""" - salt_path = Path(ENCRYPTION_KEY_FILE).parent / 'encryption_salt.bin' + salt_path = Path(ENCRYPTION_SALT_FILE) if salt_path.exists(): with open(salt_path, 'rb') as f: return f.read() diff --git a/db_pool.py b/db_pool.py index b48a5b2..c1eca05 100755 --- a/db_pool.py +++ b/db_pool.py @@ -65,7 +65,7 @@ class ConnectionPool: def return_connection(self, conn): """ - 归还连接到连接池 [已修复Bug#7, Bug#11] + 归还连接到连接池 [安全修复: 改进竞态条件处理] Args: conn: 要归还的连接 @@ -73,42 +73,53 @@ class ConnectionPool: import sqlite3 from queue import Full + if conn is None: + return + + connection_healthy = False try: # 回滚任何未提交的事务 conn.rollback() # 安全修复:验证连接是否健康,防止损坏的连接污染连接池 conn.execute("SELECT 1") - self._pool.put(conn, block=False) + connection_healthy = True except sqlite3.Error as e: # 数据库相关错误,连接可能损坏 - print(f"归还连接失败(数据库错误): {e}") - try: - conn.close() - except Exception as close_error: - print(f"关闭损坏的连接失败: {close_error}") - # 创建新连接补充 - with self._lock: - try: - new_conn = self._create_connection() - self._pool.put(new_conn, block=False) - except Exception as create_error: - print(f"重建连接失败: {create_error}") - except Full: - # 队列已满(不应该发生) - print(f"警告: 连接池已满,关闭多余连接") - try: - conn.close() - except Exception as close_error: - print(f"关闭多余连接失败: {close_error}") + print(f"连接健康检查失败(数据库错误): {e}") except Exception as e: - # Bug fix: 记录详细的异常堆栈,便于调试 - import traceback - print(f"归还连接失败(未知错误): {e}") - print(f"异常堆栈:\n{traceback.format_exc()}") + print(f"连接健康检查失败(未知错误): {e}") + + if connection_healthy: try: - conn.close() - except Exception as close_error: - print(f"关闭异常连接失败: {close_error}") + self._pool.put(conn, block=False) + return # 成功归还 + except Full: + # 队列已满(不应该发生,但处理它) + print(f"警告: 连接池已满,关闭多余连接") + connection_healthy = False # 标记为需要关闭 + + # 连接不健康或队列已满,关闭它 + try: + conn.close() + except Exception as close_error: + print(f"关闭连接失败: {close_error}") + + # 如果连接不健康,尝试创建新连接补充池 + if not connection_healthy: + with self._lock: + # 双重检查:确保池确实需要补充 + if self._pool.qsize() < self.pool_size: + try: + new_conn = self._create_connection() + self._pool.put(new_conn, block=False) + except Full: + # 在获取锁期间池被填满了,关闭新建的连接 + try: + new_conn.close() + except Exception: + pass + except Exception as create_error: + print(f"重建连接失败: {create_error}") def close_all(self): """关闭所有连接""" diff --git a/playwright_automation.py b/playwright_automation.py index b28420a..38cd347 100755 --- a/playwright_automation.py +++ b/playwright_automation.py @@ -227,8 +227,10 @@ class PlaywrightAutomation: if 'index.aspx' in current_url: return True return False - except Exception: - # Bug fix: 明确捕获Exception而非所有异常 + except (TimeoutError, Exception) as e: + # 安全修复: 记录异常信息便于调试,但不重新抛出SystemExit/KeyboardInterrupt + if isinstance(e, (SystemExit, KeyboardInterrupt)): + raise return False def quick_login(self, username: str, password: str, remember: bool = True): @@ -748,11 +750,6 @@ class PlaywrightAutomation: result.success = True return result - # 原有逻辑继续... - if False: # 占位,保持原有代码结构 - result.error_message = "切换浏览类型失败" - return result - current_page = 1 total_items = 0 total_attachments = 0 @@ -1331,7 +1328,10 @@ class PlaywrightAutomation: return False def close(self): - """完全关闭浏览器进程(每个账号独立)并确保资源释放""" + """完全关闭浏览器进程(每个账号独立)并确保资源释放 + + 安全修复: 使用try-finally确保资源一定被释放 + """ # Bug #13 fix: 使用锁保护close操作 with self._lock: # 防止重复关闭 @@ -1340,43 +1340,50 @@ class PlaywrightAutomation: self._closed = True errors = [] + context_ref = self.context + browser_ref = self.browser + playwright_ref = self.playwright + # 先清空引用,防止其他线程访问 + self.context = None + self.page = None + self.main_page = None + self.browser = None + self.playwright = None + + # 在锁外执行实际关闭操作,避免死锁 + try: # 第一步:关闭上下文 - if self.context: + if context_ref: try: - self.context.close() - # self.log("上下文已关闭") # 精简日志 + context_ref.close() except Exception as e: error_msg = f"关闭上下文时出错: {str(e)}" self.log(error_msg) errors.append(error_msg) # 第二步:关闭浏览器进程 - if self.browser: + if browser_ref: try: - self.browser.close() - # self.log("浏览器进程已关闭") # 精简日志 + browser_ref.close() except Exception as e: error_msg = f"关闭浏览器时出错: {str(e)}" self.log(error_msg) errors.append(error_msg) # 第三步:停止Playwright - if self.playwright: + if playwright_ref: try: - self.playwright.stop() - # self.log("Playwright已停止") # 精简日志 + playwright_ref.stop() except Exception as e: error_msg = f"停止Playwright时出错: {str(e)}" self.log(error_msg) errors.append(error_msg) - - # 第四步:清空引用,确保垃圾回收 - self.context = None - self.page = None - self.main_page = None - self.browser = None - self.playwright = None + finally: + # 确保引用被清空(即使上面出错) + context_ref = None + browser_ref = None + playwright_ref = None # 第五步:强制等待,确保进程完全退出 time.sleep(0.5) diff --git a/task_checkpoint.py b/task_checkpoint.py index eb5b1c8..309c92d 100644 --- a/task_checkpoint.py +++ b/task_checkpoint.py @@ -32,6 +32,23 @@ class TaskCheckpoint: """初始化(使用全局连接池)""" self._init_table() + def _safe_json_loads(self, data): + """安全的JSON解析,处理损坏或无效的数据 + + Args: + data: JSON字符串或None + + Returns: + 解析后的对象或None + """ + if not data: + return None + try: + return json.loads(data) + except (json.JSONDecodeError, TypeError, ValueError) as e: + print(f"[警告] JSON解析失败: {e}, 数据: {data[:100] if isinstance(data, str) else data}") + return None + def _init_table(self): """初始化任务进度表""" with db_pool.get_db() as conn: @@ -260,7 +277,7 @@ class TaskCheckpoint: 'max_retries': row[13], 'last_error': row[14], 'error_count': row[15], - 'checkpoint_data': json.loads(row[16]) if row[16] else None, + 'checkpoint_data': self._safe_json_loads(row[16]), 'created_at': row[17], 'updated_at': row[18], 'completed_at': row[19]