- 新增 ENCRYPTION_KEY_RAW 环境变量支持,可直接使用 Fernet 密钥 - 添加密钥丢失保护机制,防止在有加密数据时意外生成新密钥 - 新增 verify_encryption_key() 函数用于启动时验证密钥 - docker-compose.yml 改为从 .env 文件读取敏感配置 - 新增 crypto_utils.py 文件挂载,支持热更新 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
278 lines
8.1 KiB
Python
278 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
加密工具模块
|
||
用于加密存储敏感信息(如第三方账号密码)
|
||
使用Fernet对称加密
|
||
|
||
安全增强版本 - 2026-01-21
|
||
- 支持 ENCRYPTION_KEY_RAW 直接使用 Fernet 密钥
|
||
- 增加密钥丢失保护机制
|
||
- 增加启动时密钥验证
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import base64
|
||
from pathlib import Path
|
||
from cryptography.fernet import Fernet
|
||
from cryptography.hazmat.primitives import hashes
|
||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||
from app_logger import get_logger
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
# 安全修复: 支持通过环境变量配置密钥文件路径
|
||
ENCRYPTION_KEY_FILE = os.environ.get('ENCRYPTION_KEY_FILE', 'data/encryption_key.bin')
|
||
ENCRYPTION_SALT_FILE = os.environ.get('ENCRYPTION_SALT_FILE', 'data/encryption_salt.bin')
|
||
|
||
|
||
def _get_or_create_salt():
|
||
"""获取或创建盐值"""
|
||
salt_path = Path(ENCRYPTION_SALT_FILE)
|
||
if salt_path.exists():
|
||
with open(salt_path, 'rb') as f:
|
||
return f.read()
|
||
|
||
# 生成新的盐值
|
||
salt = os.urandom(16)
|
||
os.makedirs(salt_path.parent, exist_ok=True)
|
||
with open(salt_path, 'wb') as f:
|
||
f.write(salt)
|
||
return salt
|
||
|
||
|
||
def _derive_key(password: bytes, salt: bytes) -> bytes:
|
||
"""从密码派生加密密钥"""
|
||
kdf = PBKDF2HMAC(
|
||
algorithm=hashes.SHA256(),
|
||
length=32,
|
||
salt=salt,
|
||
iterations=480000, # OWASP推荐的迭代次数
|
||
)
|
||
return base64.urlsafe_b64encode(kdf.derive(password))
|
||
|
||
|
||
def _check_existing_encrypted_data() -> bool:
|
||
"""
|
||
检查是否存在已加密的数据
|
||
用于防止在有加密数据的情况下意外生成新密钥
|
||
"""
|
||
try:
|
||
import sqlite3
|
||
db_path = os.environ.get('DB_FILE', 'data/app_data.db')
|
||
if not Path(db_path).exists():
|
||
return False
|
||
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) FROM accounts WHERE password LIKE 'gAAAAA%'")
|
||
count = cursor.fetchone()[0]
|
||
conn.close()
|
||
return count > 0
|
||
except Exception as e:
|
||
logger.warning(f"检查加密数据时出错: {e}")
|
||
return False
|
||
|
||
|
||
def get_encryption_key():
|
||
"""
|
||
获取加密密钥
|
||
|
||
优先级:
|
||
1. ENCRYPTION_KEY_RAW - 直接使用 Fernet 密钥(推荐用于 Docker 部署)
|
||
2. ENCRYPTION_KEY - 通过 PBKDF2 派生密钥
|
||
3. 从文件读取
|
||
4. 生成新密钥(仅在无现有加密数据时)
|
||
"""
|
||
# 优先级 1: 直接使用 Fernet 密钥(推荐)
|
||
raw_key = os.environ.get('ENCRYPTION_KEY_RAW')
|
||
if raw_key:
|
||
logger.info("使用环境变量 ENCRYPTION_KEY_RAW 作为加密密钥")
|
||
return raw_key.encode() if isinstance(raw_key, str) else raw_key
|
||
|
||
# 优先级 2: 从环境变量派生密钥
|
||
env_key = os.environ.get('ENCRYPTION_KEY')
|
||
if env_key:
|
||
logger.info("使用环境变量 ENCRYPTION_KEY 派生加密密钥")
|
||
salt = _get_or_create_salt()
|
||
return _derive_key(env_key.encode(), salt)
|
||
|
||
# 优先级 3: 从文件读取
|
||
key_path = Path(ENCRYPTION_KEY_FILE)
|
||
if key_path.exists():
|
||
logger.info(f"从文件 {ENCRYPTION_KEY_FILE} 读取加密密钥")
|
||
with open(key_path, 'rb') as f:
|
||
return f.read()
|
||
|
||
# 优先级 4: 生成新密钥(带保护检查)
|
||
# 安全检查:如果已有加密数据,禁止生成新密钥
|
||
if _check_existing_encrypted_data():
|
||
error_msg = (
|
||
"\n" + "=" * 60 + "\n"
|
||
"[严重错误] 检测到数据库中存在已加密的密码数据,但加密密钥文件丢失!\n"
|
||
"\n"
|
||
"这将导致所有已加密的密码无法解密!\n"
|
||
"\n"
|
||
"解决方案:\n"
|
||
"1. 恢复 data/encryption_key.bin 文件(如有备份)\n"
|
||
"2. 或在 docker-compose.yml 中设置 ENCRYPTION_KEY_RAW 环境变量\n"
|
||
"3. 如果密钥确实丢失,需要重新录入所有账号密码\n"
|
||
"\n"
|
||
"设置 ALLOW_NEW_KEY=true 环境变量可强制生成新密钥(不推荐)\n"
|
||
+ "=" * 60
|
||
)
|
||
logger.error(error_msg)
|
||
|
||
# 检查是否强制允许生成新密钥
|
||
if os.environ.get('ALLOW_NEW_KEY', '').lower() != 'true':
|
||
print(error_msg, file=sys.stderr)
|
||
raise RuntimeError("加密密钥丢失且存在已加密数据,请检查配置")
|
||
|
||
# 生成新的密钥
|
||
key = Fernet.generate_key()
|
||
os.makedirs(key_path.parent, exist_ok=True)
|
||
with open(key_path, 'wb') as f:
|
||
f.write(key)
|
||
logger.info(f"已生成新的加密密钥并保存到 {ENCRYPTION_KEY_FILE}")
|
||
logger.warning("请立即备份此密钥文件,并建议设置 ENCRYPTION_KEY_RAW 环境变量!")
|
||
return key
|
||
|
||
|
||
# 全局Fernet实例
|
||
_fernet = None
|
||
|
||
|
||
def _get_fernet():
|
||
"""获取Fernet加密器(懒加载)"""
|
||
global _fernet
|
||
if _fernet is None:
|
||
key = get_encryption_key()
|
||
_fernet = Fernet(key)
|
||
return _fernet
|
||
|
||
|
||
def encrypt_password(plain_password: str) -> str:
|
||
"""
|
||
加密密码
|
||
|
||
Args:
|
||
plain_password: 明文密码
|
||
|
||
Returns:
|
||
str: 加密后的密码(base64编码)
|
||
"""
|
||
if not plain_password:
|
||
return ''
|
||
|
||
fernet = _get_fernet()
|
||
encrypted = fernet.encrypt(plain_password.encode('utf-8'))
|
||
return encrypted.decode('utf-8')
|
||
|
||
|
||
def decrypt_password(encrypted_password: str) -> str:
|
||
"""
|
||
解密密码
|
||
|
||
Args:
|
||
encrypted_password: 加密的密码
|
||
|
||
Returns:
|
||
str: 明文密码
|
||
"""
|
||
if not encrypted_password:
|
||
return ''
|
||
|
||
try:
|
||
fernet = _get_fernet()
|
||
decrypted = fernet.decrypt(encrypted_password.encode('utf-8'))
|
||
return decrypted.decode('utf-8')
|
||
except Exception as e:
|
||
# 解密失败,可能是旧的明文密码或密钥不匹配
|
||
if is_encrypted(encrypted_password):
|
||
logger.error(f"密码解密失败(密钥可能不匹配): {e}")
|
||
else:
|
||
logger.warning(f"密码解密失败,可能是未加密的旧数据: {e}")
|
||
return encrypted_password
|
||
|
||
|
||
def is_encrypted(password: str) -> bool:
|
||
"""
|
||
检查密码是否已加密
|
||
Fernet加密的数据以'gAAAAA'开头
|
||
|
||
Args:
|
||
password: 要检查的密码
|
||
|
||
Returns:
|
||
bool: 是否已加密
|
||
"""
|
||
if not password:
|
||
return False
|
||
return password.startswith('gAAAAA')
|
||
|
||
|
||
def migrate_password(password: str) -> str:
|
||
"""
|
||
迁移密码:如果是明文则加密,如果已加密则保持不变
|
||
|
||
Args:
|
||
password: 密码(可能是明文或已加密)
|
||
|
||
Returns:
|
||
str: 加密后的密码
|
||
"""
|
||
if is_encrypted(password):
|
||
return password
|
||
return encrypt_password(password)
|
||
|
||
|
||
def verify_encryption_key() -> bool:
|
||
"""
|
||
验证当前密钥是否能解密现有数据
|
||
用于启动时检查
|
||
|
||
Returns:
|
||
bool: 密钥是否有效
|
||
"""
|
||
try:
|
||
import sqlite3
|
||
db_path = os.environ.get('DB_FILE', 'data/app_data.db')
|
||
if not Path(db_path).exists():
|
||
return True
|
||
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT password FROM accounts WHERE password LIKE 'gAAAAA%' LIMIT 1")
|
||
row = cursor.fetchone()
|
||
conn.close()
|
||
|
||
if not row:
|
||
return True
|
||
|
||
# 尝试解密
|
||
fernet = _get_fernet()
|
||
fernet.decrypt(row[0].encode('utf-8'))
|
||
logger.info("加密密钥验证成功")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"加密密钥验证失败: {e}")
|
||
return False
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 测试加密解密
|
||
test_password = "test_password_123"
|
||
|
||
print(f"原始密码: {test_password}")
|
||
encrypted = encrypt_password(test_password)
|
||
print(f"加密后: {encrypted}")
|
||
decrypted = decrypt_password(encrypted)
|
||
print(f"解密后: {decrypted}")
|
||
print(f"加密解密成功: {test_password == decrypted}")
|
||
print(f"是否已加密: {is_encrypted(encrypted)}")
|
||
print(f"明文是否加密: {is_encrypted(test_password)}")
|
||
|
||
# 验证密钥
|
||
print(f"\n密钥验证: {verify_encryption_key()}")
|