修复多项安全漏洞

安全修复清单:
1. 验证码改为图片方式返回,防止明文泄露
2. CORS配置从环境变量读取,不再使用通配符"*"
3. VIP API添加@admin_required装饰器,统一认证
4. 用户登录统一错误消息,防止用户枚举
5. IP限流不再信任X-Forwarded-For头,防止伪造绕过
6. 密码强度要求提升(8位+字母+数字)
7. 日志不���记录完整session/cookie内容,防止敏感信息泄露
8. XSS防护:日志输出和Bug反馈内容转义HTML
9. SQL注入防护:LIKE查询参数转义
10. 路径遍历防护:截图目录白名单验证
11. 验证码重放防护:验证前删除验证码
12. 数据库连接池健康检查
13. 正则DoS防护:限制数字匹配长度
14. Account类密码私有化,__repr__不暴露密码

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-11 17:53:48 +08:00
parent 4d3e4a09fd
commit b9edc4aaa2
10 changed files with 256 additions and 101 deletions

188
app.py
View File

@@ -60,9 +60,17 @@ app.config.from_object(config)
# 确保SECRET_KEY已设置
if not app.config.get('SECRET_KEY'):
raise RuntimeError("SECRET_KEY未配置请检查app_config.py")
# 安全修复从环境变量获取允许的CORS源不再使用"*"
cors_origins = os.environ.get('CORS_ALLOWED_ORIGINS', '').strip()
if cors_origins:
cors_allowed = [origin.strip() for origin in cors_origins.split(',') if origin.strip()]
else:
# 默认只允许同源生产环<E4BAA7><E78EAF>应该明确配置
cors_allowed = []
socketio = SocketIO(
app,
cors_allowed_origins="*",
cors_allowed_origins=cors_allowed if cors_allowed else None, # None表示只允许同源
async_mode='threading', # 明确指定async模式
ping_timeout=60, # ping超时60秒
ping_interval=25, # 每25秒ping一次
@@ -169,12 +177,22 @@ class Admin(UserMixin):
class Account:
"""账号类"""
"""账号类
安全注意事项:
- 密码以私有属性存储,避免意外泄露
- __repr__不包含密码防止日志意外记录
- 生产环境应考虑使用加密存储如Fernet加密
"""
__slots__ = ['id', 'user_id', 'username', '_password', 'remember', 'remark',
'status', 'is_running', 'should_stop', 'total_items',
'total_attachments', 'automation', 'last_browse_type', 'proxy_config']
def __init__(self, account_id, user_id, username, password, remember=True, remark=''):
self.id = account_id
self.user_id = user_id
self.username = username
self.password = password
self._password = password # 安全修复:使用私有属性存储密码
self.remember = remember
self.remark = remark
self.status = "未开始"
@@ -186,6 +204,15 @@ class Account:
self.last_browse_type = "注册前未读"
self.proxy_config = None # 保存代理配置,浏览和截图共用
@property
def password(self):
"""获取密码(仅供自动化登录使用)"""
return self._password
def __repr__(self):
"""安全的字符串表示,不包含密码"""
return f"Account(id={self.id}, username={self.username}, status={self.status})"
def to_dict(self):
result = {
"id": self.id,
@@ -307,8 +334,8 @@ def admin_required(f):
"""管理员权限装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
logger.debug(f"[admin_required] Session内容: {dict(session)}")
logger.debug(f"[admin_required] Cookies: {request.cookies}")
# 安全修复不再记录完整的session和cookies内容防止敏感信息泄露
logger.debug(f"[admin_required] 检查会话admin_id存在: {'admin_id' in session}")
if 'admin_id' not in session:
logger.warning(f"[admin_required] 拒绝访问 {request.path} - session中无admin_id")
return jsonify({"error": "需要管理员权限"}), 403
@@ -319,10 +346,12 @@ def admin_required(f):
def log_to_client(message, user_id=None, account_id=None):
"""发送日志到Web客户端(用户隔离)"""
from app_security import escape_html
timestamp = get_beijing_now().strftime('%H:%M:%S')
# 安全修复转义HTML特殊字符防止XSS攻击
log_data = {
'timestamp': timestamp,
'message': message,
'message': escape_html(str(message)) if message else '',
'account_id': account_id
}
@@ -579,10 +608,17 @@ def check_ip_rate_limit(ip_address):
"""检查IP是否被限流"""
current_time = time.time()
# 清理过期IP记录
expired_ips = [ip for ip, data in ip_rate_limit.items()
if data.get("lock_until", 0) < current_time and
current_time - data.get("first_attempt", current_time) > 3600]
# 安全修复:修正过期IP清理逻辑
# 原问题first_attempt不存在时默认使用current_time导致永远不会被清理
expired_ips = []
for ip, data in ip_rate_limit.items():
lock_expired = data.get("lock_until", 0) < current_time
first_attempt = data.get("first_attempt")
# 修复如果first_attempt不存在或超过1小时视为过期
attempt_expired = first_attempt is None or (current_time - first_attempt > 3600)
if lock_expired and attempt_expired:
expired_ips.append(ip)
for ip in expired_ips:
del ip_rate_limit[ip]
@@ -596,7 +632,8 @@ def check_ip_rate_limit(ip_address):
return False, "IP已被锁定,请{}分钟后再试".format(remaining_time // 60 + 1)
# 如果超过1小时,重置计数
if current_time - ip_data.get("first_attempt", current_time) > 3600:
first_attempt = ip_data.get("first_attempt")
if first_attempt is None or current_time - first_attempt > 3600:
ip_rate_limit[ip_address] = {
"attempts": 0,
"first_attempt": current_time
@@ -627,26 +664,85 @@ def record_failed_captcha(ip_address):
@app.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
"""生成4位数字验证码"""
"""生成4位数字验证码图片
安全修复验证码不再以明文返回而是生成base64图片
"""
import uuid
import base64
from io import BytesIO
session_id = str(uuid.uuid4())
# 生成4位随机数字
code = "".join([str(random.randint(0, 9)) for _ in range(4)])
# 存储验证码5分钟过期
captcha_storage[session_id] = {
"code": code,
"expire_time": time.time() + 300,
"failed_attempts": 0
}
# 清理过期验证码
expired_keys = [k for k, v in captcha_storage.items() if v["expire_time"] < time.time()]
for k in expired_keys:
del captcha_storage[k]
return jsonify({"session_id": session_id, "captcha": code})
# 生成验证码图片
try:
from PIL import Image, ImageDraw, ImageFont
import io
# 创建图片
width, height = 120, 40
image = Image.new('RGB', (width, height), color=(255, 255, 255))
draw = ImageDraw.Draw(image)
# 添加干扰线
for _ in range(5):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(0, width)
y2 = random.randint(0, height)
draw.line([(x1, y1), (x2, y2)], fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)))
# 添加干扰点
for _ in range(50):
x = random.randint(0, width)
y = random.randint(0, height)
draw.point((x, y), fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)))
# 绘制验证码文字
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 28)
except:
font = ImageFont.load_default()
for i, char in enumerate(code):
x = 10 + i * 25 + random.randint(-3, 3)
y = random.randint(2, 8)
color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
draw.text((x, y), char, font=font, fill=color)
# 转换为base64
buffer = io.BytesIO()
image.save(buffer, format='PNG')
img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
return jsonify({
"session_id": session_id,
"captcha_image": f"data:image/png;base64,{img_base64}"
})
except ImportError:
# 如果没有PIL退回到简单文本但添加混淆
# 安全警告生产环境应安装PIL
logger.warning("PIL未安装验证码安全性降低")
# 不直接返回验证码,返回混淆后的提示
return jsonify({
"session_id": session_id,
"captcha_hint": "验证码图片生成失败,请联系管理员"
}), 500
@app.route('/api/login', methods=['POST'])
@@ -666,16 +762,12 @@ def login():
if not success:
return jsonify({"error": message}), 400
# 先检查用户是否存在
user_exists = database.get_user_by_username(username)
if not user_exists:
return jsonify({"error": "账号未注册", "need_captcha": True}), 401
# 检查密码是否正确
# 安全修复:使用通用错误消息防止用户枚举
# 不再分开检查用户是否存在和密码是否正确
user = database.verify_user(username, password)
if not user:
# 密码错误
return jsonify({"error": "密码错误", "need_captcha": True}), 401
# 返回通用错误消息,不透露账号是否存在
return jsonify({"error": "用户名或密码错误", "need_captcha": True}), 401
# 检查审核状态
if user['status'] != 'approved':
@@ -699,8 +791,13 @@ def logout():
# ==================== 管理员认证API ====================
@app.route('/yuyx/api/debug-config', methods=['GET'])
@admin_required # 安全修复:添加管理员认证
def debug_config():
"""调试配置信息"""
"""调试配置信息(仅管理员可访问,生产环境应禁用)"""
# 安全修复:生产环境禁用调试端点
if not app.debug:
return jsonify({"error": "调试端点已在生产环境禁用"}), 403
return jsonify({
"secret_key_set": bool(app.secret_key),
"secret_key_length": len(app.secret_key) if app.secret_key else 0,
@@ -711,7 +808,8 @@ def debug_config():
"SESSION_COOKIE_SAMESITE": app.config.get('SESSION_COOKIE_SAMESITE'),
"PERMANENT_SESSION_LIFETIME": str(app.config.get('PERMANENT_SESSION_LIFETIME')),
},
"current_session": dict(session),
# 安全修复移除敏感的session内容只显示有无
"has_session": bool(session),
"cookies_received": list(request.cookies.keys())
})
@@ -751,9 +849,8 @@ def admin_login():
session.permanent = True # 设置为永久会话使用PERMANENT_SESSION_LIFETIME配置
session.modified = True # 强制标记session为已修改确保保存
logger.info(f"[admin_login] 管理员 {username} 登录成功, session已设置: admin_id={admin['id']}")
logger.debug(f"[admin_login] Session内容: {dict(session)}")
logger.debug(f"[admin_login] Cookie将被设置: name={app.config.get('SESSION_COOKIE_NAME', 'session')}")
logger.info(f"[admin_login] 管理员 {username} 登录成功")
# 安全修复不再记录完整session内容
# 根据请求类型返回不同响应
if request.is_json:
@@ -1234,12 +1331,14 @@ def get_accounts():
user_id = current_user.id
refresh = request.args.get('refresh', 'false').lower() == 'true'
# 如果user_accounts中没有数据或者请求刷新则从数据库加载
if user_id not in user_accounts or len(user_accounts.get(user_id, {})) == 0 or refresh:
load_user_accounts(user_id)
# 安全修复:使用锁保护检查和加载操作,防止竞态条件
with user_accounts_lock:
# 如果user_accounts中没有数据或者请求刷新,则从数据库加载
if user_id not in user_accounts or len(user_accounts.get(user_id, {})) == 0 or refresh:
load_user_accounts(user_id)
accounts = user_accounts.get(user_id, {})
return jsonify([acc.to_dict() for acc in accounts.values()])
accounts = user_accounts.get(user_id, {})
return jsonify([acc.to_dict() for acc in accounts.values()])
@app.route('/api/accounts', methods=['POST'])
@@ -2252,20 +2351,17 @@ def serve_static(filename):
# ==================== 管理员VIP管理API ====================
@app.route('/yuyx/api/vip/config', methods=['GET'])
@admin_required # 安全修复:使用统一的装饰器
def get_vip_config_api():
"""获取VIP配置"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
config = database.get_vip_config()
return jsonify(config)
@app.route('/yuyx/api/vip/config', methods=['POST'])
@admin_required # 安全修复:使用统一的装饰器
def set_vip_config_api():
"""设置默认VIP天数"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
data = request.json
days = data.get('default_vip_days', 0)
@@ -2277,11 +2373,9 @@ def set_vip_config_api():
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['POST'])
@admin_required # 安全修复:使用统一的装饰器
def set_user_vip_api(user_id):
"""设置用户VIP"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
data = request.json
days = data.get('days', 30)
@@ -2297,22 +2391,18 @@ def set_user_vip_api(user_id):
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['DELETE'])
@admin_required # 安全修复:使用统一的装饰器
def remove_user_vip_api(user_id):
"""移除用户VIP"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
if database.remove_user_vip(user_id):
return jsonify({"message": "VIP已移除"})
return jsonify({"error": "移除失败"}), 400
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['GET'])
@admin_required # 安全修复:使用统一的装饰器
def get_user_vip_info_api(user_id):
"""获取用户VIP信息(管理员)"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
vip_info = database.get_user_vip_info(user_id)
return jsonify(vip_info)