#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 加密工具模块 用于加密存储敏感信息(如第三方账号密码) 使用Fernet对称加密 安全增强版本 - 2026-01-21 - 支持 ENCRYPTION_KEY_RAW 直接使用 Fernet 密钥 - 增加密钥丢失保护机制 - 增加启动时密钥验证 """ import os import sys import base64 from pathlib import Path from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from app_logger import get_logger logger = get_logger(__name__) # 安全修复: 支持通过环境变量配置密钥文件路径 ENCRYPTION_KEY_FILE = os.environ.get('ENCRYPTION_KEY_FILE', 'data/encryption_key.bin') ENCRYPTION_SALT_FILE = os.environ.get('ENCRYPTION_SALT_FILE', 'data/encryption_salt.bin') def _get_or_create_salt(): """获取或创建盐值""" salt_path = Path(ENCRYPTION_SALT_FILE) if salt_path.exists(): with open(salt_path, 'rb') as f: return f.read() # 生成新的盐值 salt = os.urandom(16) os.makedirs(salt_path.parent, exist_ok=True) with open(salt_path, 'wb') as f: f.write(salt) return salt def _derive_key(password: bytes, salt: bytes) -> bytes: """从密码派生加密密钥""" kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=480000, # OWASP推荐的迭代次数 ) return base64.urlsafe_b64encode(kdf.derive(password)) def _check_existing_encrypted_data() -> bool: """ 检查是否存在已加密的数据 用于防止在有加密数据的情况下意外生成新密钥 """ try: import sqlite3 db_path = os.environ.get('DB_FILE', 'data/app_data.db') if not Path(db_path).exists(): return False conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM accounts WHERE password LIKE 'gAAAAA%'") count = cursor.fetchone()[0] conn.close() return count > 0 except Exception as e: logger.warning(f"检查加密数据时出错: {e}") return False def get_encryption_key(): """ 获取加密密钥 优先级: 1. ENCRYPTION_KEY_RAW - 直接使用 Fernet 密钥(推荐用于 Docker 部署) 2. ENCRYPTION_KEY - 通过 PBKDF2 派生密钥 3. 从文件读取 4. 生成新密钥(仅在无现有加密数据时) """ # 优先级 1: 直接使用 Fernet 密钥(推荐) raw_key = os.environ.get('ENCRYPTION_KEY_RAW') if raw_key: logger.info("使用环境变量 ENCRYPTION_KEY_RAW 作为加密密钥") return raw_key.encode() if isinstance(raw_key, str) else raw_key # 优先级 2: 从环境变量派生密钥 env_key = os.environ.get('ENCRYPTION_KEY') if env_key: logger.info("使用环境变量 ENCRYPTION_KEY 派生加密密钥") salt = _get_or_create_salt() return _derive_key(env_key.encode(), salt) # 优先级 3: 从文件读取 key_path = Path(ENCRYPTION_KEY_FILE) if key_path.exists(): logger.info(f"从文件 {ENCRYPTION_KEY_FILE} 读取加密密钥") with open(key_path, 'rb') as f: return f.read() # 优先级 4: 生成新密钥(带保护检查) # 安全检查:如果已有加密数据,禁止生成新密钥 if _check_existing_encrypted_data(): error_msg = ( "\n" + "=" * 60 + "\n" "[严重错误] 检测到数据库中存在已加密的密码数据,但加密密钥文件丢失!\n" "\n" "这将导致所有已加密的密码无法解密!\n" "\n" "解决方案:\n" "1. 恢复 data/encryption_key.bin 文件(如有备份)\n" "2. 或在 docker-compose.yml 中设置 ENCRYPTION_KEY_RAW 环境变量\n" "3. 如果密钥确实丢失,需要重新录入所有账号密码\n" "\n" "设置 ALLOW_NEW_KEY=true 环境变量可强制生成新密钥(不推荐)\n" + "=" * 60 ) logger.error(error_msg) # 检查是否强制允许生成新密钥 if os.environ.get('ALLOW_NEW_KEY', '').lower() != 'true': print(error_msg, file=sys.stderr) raise RuntimeError("加密密钥丢失且存在已加密数据,请检查配置") # 生成新的密钥 key = Fernet.generate_key() os.makedirs(key_path.parent, exist_ok=True) with open(key_path, 'wb') as f: f.write(key) logger.info(f"已生成新的加密密钥并保存到 {ENCRYPTION_KEY_FILE}") logger.warning("请立即备份此密钥文件,并建议设置 ENCRYPTION_KEY_RAW 环境变量!") return key # 全局Fernet实例 _fernet = None def _get_fernet(): """获取Fernet加密器(懒加载)""" global _fernet if _fernet is None: key = get_encryption_key() _fernet = Fernet(key) return _fernet def encrypt_password(plain_password: str) -> str: """ 加密密码 Args: plain_password: 明文密码 Returns: str: 加密后的密码(base64编码) """ if not plain_password: return '' fernet = _get_fernet() encrypted = fernet.encrypt(plain_password.encode('utf-8')) return encrypted.decode('utf-8') def decrypt_password(encrypted_password: str) -> str: """ 解密密码 Args: encrypted_password: 加密的密码 Returns: str: 明文密码 """ if not encrypted_password: return '' try: fernet = _get_fernet() decrypted = fernet.decrypt(encrypted_password.encode('utf-8')) return decrypted.decode('utf-8') except Exception as e: # 解密失败,可能是旧的明文密码或密钥不匹配 if is_encrypted(encrypted_password): logger.error(f"密码解密失败(密钥可能不匹配): {e}") else: logger.warning(f"密码解密失败,可能是未加密的旧数据: {e}") return encrypted_password def is_encrypted(password: str) -> bool: """ 检查密码是否已加密 Fernet加密的数据以'gAAAAA'开头 Args: password: 要检查的密码 Returns: bool: 是否已加密 """ if not password: return False return password.startswith('gAAAAA') def migrate_password(password: str) -> str: """ 迁移密码:如果是明文则加密,如果已加密则保持不变 Args: password: 密码(可能是明文或已加密) Returns: str: 加密后的密码 """ if is_encrypted(password): return password return encrypt_password(password) def verify_encryption_key() -> bool: """ 验证当前密钥是否能解密现有数据 用于启动时检查 Returns: bool: 密钥是否有效 """ try: import sqlite3 db_path = os.environ.get('DB_FILE', 'data/app_data.db') if not Path(db_path).exists(): return True conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute("SELECT password FROM accounts WHERE password LIKE 'gAAAAA%' LIMIT 1") row = cursor.fetchone() conn.close() if not row: return True # 尝试解密 fernet = _get_fernet() fernet.decrypt(row[0].encode('utf-8')) logger.info("加密密钥验证成功") return True except Exception as e: logger.error(f"加密密钥验证失败: {e}") return False if __name__ == '__main__': # 测试加密解密 test_password = "test_password_123" print(f"原始密码: {test_password}") encrypted = encrypt_password(test_password) print(f"加密后: {encrypted}") decrypted = decrypt_password(encrypted) print(f"解密后: {decrypted}") print(f"加密解密成功: {test_password == decrypted}") print(f"是否已加密: {is_encrypted(encrypted)}") print(f"明文是否加密: {is_encrypted(test_password)}") # 验证密钥 print(f"\n密钥验证: {verify_encryption_key()}")