修复多项安全漏洞和Bug

1. 安全修复:
   - 修复密码重置接口用户枚举漏洞,统一返回消息防止信息泄露
   - 统一密码强度验证为8位以上且包含字母和数字
   - 添加第三方账号密码加密存储(Fernet对称加密)
   - 修复默认管理员弱密码问题,改用随机生成强密码
   - 修复管理员回复XSS漏洞,添加HTML转义
   - 将MD5哈希替换为SHA256

2. 并发Bug修复:
   - 修复日志缓存竞态条件,添加锁保护
   - 修复截图信号量配置变更后不生效问题

3. 其他改进:
   - 添加API参数类型验证和边界检查
   - 新增crypto_utils.py加密工具模块

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-11 19:14:14 +08:00
parent 70cd95c366
commit a25c9fbba0
6 changed files with 293 additions and 70 deletions

169
crypto_utils.py Normal file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
加密工具模块
用于加密存储敏感信息(如第三方账号密码)
使用Fernet对称加密
"""
import os
import base64
from pathlib import Path
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# 加密密钥文件路径
ENCRYPTION_KEY_FILE = 'data/encryption_key.bin'
ENCRYPTION_SALT_FILE = 'data/encryption_salt.bin'
def _get_or_create_salt():
"""获取或创建盐值"""
salt_path = Path(ENCRYPTION_KEY_FILE).parent / 'encryption_salt.bin'
if salt_path.exists():
with open(salt_path, 'rb') as f:
return f.read()
# 生成新的盐值
salt = os.urandom(16)
os.makedirs(salt_path.parent, exist_ok=True)
with open(salt_path, 'wb') as f:
f.write(salt)
return salt
def _derive_key(password: bytes, salt: bytes) -> bytes:
"""从密码派生加密密钥"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=480000, # OWASP推荐的迭代次数
)
return base64.urlsafe_b64encode(kdf.derive(password))
def get_encryption_key():
"""获取加密密钥(优先环境变量,否则从文件读取或生成)"""
# 优先从环境变量读取
env_key = os.environ.get('ENCRYPTION_KEY')
if env_key:
# 使用环境变量中的密钥派生Fernet密钥
salt = _get_or_create_salt()
return _derive_key(env_key.encode(), salt)
# 从文件读取
key_path = Path(ENCRYPTION_KEY_FILE)
if key_path.exists():
with open(key_path, 'rb') as f:
return f.read()
# 生成新的密钥
key = Fernet.generate_key()
os.makedirs(key_path.parent, exist_ok=True)
with open(key_path, 'wb') as f:
f.write(key)
print(f"[安全] 已生成新的加密密钥并保存到 {ENCRYPTION_KEY_FILE}")
return key
# 全局Fernet实例
_fernet = None
def _get_fernet():
"""获取Fernet加密器懒加载"""
global _fernet
if _fernet is None:
key = get_encryption_key()
_fernet = Fernet(key)
return _fernet
def encrypt_password(plain_password: str) -> str:
"""
加密密码
Args:
plain_password: 明文密码
Returns:
str: 加密后的密码base64编码
"""
if not plain_password:
return ''
fernet = _get_fernet()
encrypted = fernet.encrypt(plain_password.encode('utf-8'))
return encrypted.decode('utf-8')
def decrypt_password(encrypted_password: str) -> str:
"""
解密密码
Args:
encrypted_password: 加密的密码
Returns:
str: 明文密码
"""
if not encrypted_password:
return ''
try:
fernet = _get_fernet()
decrypted = fernet.decrypt(encrypted_password.encode('utf-8'))
return decrypted.decode('utf-8')
except Exception as e:
# 解密失败,可能是旧的明文密码
print(f"[警告] 密码解密失败,可能是未加密的旧数据: {e}")
return encrypted_password
def is_encrypted(password: str) -> bool:
"""
检查密码是否已加密
Fernet加密的数据以'gAAAAA'开头
Args:
password: 要检查的密码
Returns:
bool: 是否已加密
"""
if not password:
return False
# Fernet加密的数据是base64编码以'gAAAAA'开头
return password.startswith('gAAAAA')
def migrate_password(password: str) -> str:
"""
迁移密码:如果是明文则加密,如果已加密则保持不变
Args:
password: 密码(可能是明文或已加密)
Returns:
str: 加密后的密码
"""
if is_encrypted(password):
return password
return encrypt_password(password)
if __name__ == '__main__':
# 测试加密解密
test_password = "test_password_123"
print(f"原始密码: {test_password}")
encrypted = encrypt_password(test_password)
print(f"加密后: {encrypted}")
decrypted = decrypt_password(encrypted)
print(f"解密后: {decrypted}")
print(f"加密解密成功: {test_password == decrypted}")
print(f"是否已加密: {is_encrypted(encrypted)}")
print(f"明文是否加密: {is_encrypted(test_password)}")