From dfc93bce2e81b699d648a8c8113cfa800dc1a306 Mon Sep 17 00:00:00 2001 From: Yu Yon Date: Wed, 21 Jan 2026 09:31:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(security):=20=E5=A2=9E=E5=BC=BA=E5=AF=86?= =?UTF-8?q?=E7=A0=81=E5=8A=A0=E5=AF=86=E5=AE=89=E5=85=A8=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ENCRYPTION_KEY_RAW 环境变量支持,可直接使用 Fernet 密钥 - 添加密钥丢失保护机制,防止在有加密数据时意外生成新密钥 - 新增 verify_encryption_key() 函数用于启动时验证密钥 - docker-compose.yml 改为从 .env 文件读取敏感配置 - 新增 crypto_utils.py 文件挂载,支持热更新 Co-Authored-By: Claude Opus 4.5 --- crypto_utils.py | 120 ++++++++++++++++++++++++++++++++++++++++++--- docker-compose.yml | 3 ++ 2 files changed, 116 insertions(+), 7 deletions(-) diff --git a/crypto_utils.py b/crypto_utils.py index 0f3daf4..de7986d 100644 --- a/crypto_utils.py +++ b/crypto_utils.py @@ -4,9 +4,15 @@ 加密工具模块 用于加密存储敏感信息(如第三方账号密码) 使用Fernet对称加密 + +安全增强版本 - 2026-01-21 +- 支持 ENCRYPTION_KEY_RAW 直接使用 Fernet 密钥 +- 增加密钥丢失保护机制 +- 增加启动时密钥验证 """ import os +import sys import base64 from pathlib import Path from cryptography.fernet import Fernet @@ -47,27 +53,89 @@ def _derive_key(password: bytes, salt: bytes) -> bytes: return base64.urlsafe_b64encode(kdf.derive(password)) +def _check_existing_encrypted_data() -> bool: + """ + 检查是否存在已加密的数据 + 用于防止在有加密数据的情况下意外生成新密钥 + """ + try: + import sqlite3 + db_path = os.environ.get('DB_FILE', 'data/app_data.db') + if not Path(db_path).exists(): + return False + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM accounts WHERE password LIKE 'gAAAAA%'") + count = cursor.fetchone()[0] + conn.close() + return count > 0 + except Exception as e: + logger.warning(f"检查加密数据时出错: {e}") + return False + + def get_encryption_key(): - """获取加密密钥(优先环境变量,否则从文件读取或生成)""" - # 优先从环境变量读取 + """ + 获取加密密钥 + + 优先级: + 1. ENCRYPTION_KEY_RAW - 直接使用 Fernet 密钥(推荐用于 Docker 部署) + 2. ENCRYPTION_KEY - 通过 PBKDF2 派生密钥 + 3. 从文件读取 + 4. 生成新密钥(仅在无现有加密数据时) + """ + # 优先级 1: 直接使用 Fernet 密钥(推荐) + raw_key = os.environ.get('ENCRYPTION_KEY_RAW') + if raw_key: + logger.info("使用环境变量 ENCRYPTION_KEY_RAW 作为加密密钥") + return raw_key.encode() if isinstance(raw_key, str) else raw_key + + # 优先级 2: 从环境变量派生密钥 env_key = os.environ.get('ENCRYPTION_KEY') if env_key: - # 使用环境变量中的密钥派生Fernet密钥 + logger.info("使用环境变量 ENCRYPTION_KEY 派生加密密钥") salt = _get_or_create_salt() return _derive_key(env_key.encode(), salt) - # 从文件读取 + # 优先级 3: 从文件读取 key_path = Path(ENCRYPTION_KEY_FILE) if key_path.exists(): + logger.info(f"从文件 {ENCRYPTION_KEY_FILE} 读取加密密钥") with open(key_path, 'rb') as f: return f.read() + # 优先级 4: 生成新密钥(带保护检查) + # 安全检查:如果已有加密数据,禁止生成新密钥 + if _check_existing_encrypted_data(): + error_msg = ( + "\n" + "=" * 60 + "\n" + "[严重错误] 检测到数据库中存在已加密的密码数据,但加密密钥文件丢失!\n" + "\n" + "这将导致所有已加密的密码无法解密!\n" + "\n" + "解决方案:\n" + "1. 恢复 data/encryption_key.bin 文件(如有备份)\n" + "2. 或在 docker-compose.yml 中设置 ENCRYPTION_KEY_RAW 环境变量\n" + "3. 如果密钥确实丢失,需要重新录入所有账号密码\n" + "\n" + "设置 ALLOW_NEW_KEY=true 环境变量可强制生成新密钥(不推荐)\n" + + "=" * 60 + ) + logger.error(error_msg) + + # 检查是否强制允许生成新密钥 + if os.environ.get('ALLOW_NEW_KEY', '').lower() != 'true': + print(error_msg, file=sys.stderr) + raise RuntimeError("加密密钥丢失且存在已加密数据,请检查配置") + # 生成新的密钥 key = Fernet.generate_key() os.makedirs(key_path.parent, exist_ok=True) with open(key_path, 'wb') as f: f.write(key) logger.info(f"已生成新的加密密钥并保存到 {ENCRYPTION_KEY_FILE}") + logger.warning("请立即备份此密钥文件,并建议设置 ENCRYPTION_KEY_RAW 环境变量!") return key @@ -120,8 +188,11 @@ def decrypt_password(encrypted_password: str) -> str: decrypted = fernet.decrypt(encrypted_password.encode('utf-8')) return decrypted.decode('utf-8') except Exception as e: - # 解密失败,可能是旧的明文密码 - logger.warning(f"密码解密失败,可能是未加密的旧数据: {e}") + # 解密失败,可能是旧的明文密码或密钥不匹配 + if is_encrypted(encrypted_password): + logger.error(f"密码解密失败(密钥可能不匹配): {e}") + else: + logger.warning(f"密码解密失败,可能是未加密的旧数据: {e}") return encrypted_password @@ -138,7 +209,6 @@ def is_encrypted(password: str) -> bool: """ if not password: return False - # Fernet加密的数据是base64编码,以'gAAAAA'开头 return password.startswith('gAAAAA') @@ -157,6 +227,39 @@ def migrate_password(password: str) -> str: return encrypt_password(password) +def verify_encryption_key() -> bool: + """ + 验证当前密钥是否能解密现有数据 + 用于启动时检查 + + Returns: + bool: 密钥是否有效 + """ + try: + import sqlite3 + db_path = os.environ.get('DB_FILE', 'data/app_data.db') + if not Path(db_path).exists(): + return True + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute("SELECT password FROM accounts WHERE password LIKE 'gAAAAA%' LIMIT 1") + row = cursor.fetchone() + conn.close() + + if not row: + return True + + # 尝试解密 + fernet = _get_fernet() + fernet.decrypt(row[0].encode('utf-8')) + logger.info("加密密钥验证成功") + return True + except Exception as e: + logger.error(f"加密密钥验证失败: {e}") + return False + + if __name__ == '__main__': # 测试加密解密 test_password = "test_password_123" @@ -169,3 +272,6 @@ if __name__ == '__main__': print(f"加密解密成功: {test_password == decrypted}") print(f"是否已加密: {is_encrypted(encrypted)}") print(f"明文是否加密: {is_encrypted(test_password)}") + + # 验证密钥 + print(f"\n密钥验证: {verify_encryption_key()}") diff --git a/docker-compose.yml b/docker-compose.yml index 0aaf3eb..915bbce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,7 @@ services: - ./templates:/app/templates # 模板文件(实时更新) - ./app.py:/app/app.py # 主程序(实时更新) - ./database.py:/app/database.py # 数据库模块(实时更新) + - ./crypto_utils.py:/app/crypto_utils.py # 加密工具(实时更新) dns: - 223.5.5.5 - 114.114.114.114 @@ -37,6 +38,8 @@ services: - MAX_CONCURRENT_PER_ACCOUNT=1 - MAX_CONCURRENT_CONTEXTS=100 # 安全配置 + # 加密密钥配置(重要!防止容器重建时丢失密钥) + - ENCRYPTION_KEY_RAW=${ENCRYPTION_KEY_RAW} - SESSION_LIFETIME_HOURS=24 - SESSION_COOKIE_SECURE=false - MAX_CAPTCHA_ATTEMPTS=5