安全修复清单: 1. 验证码改为图片方式返回,防止明文泄露 2. CORS配置从环境变量读取,不再使用通配符"*" 3. VIP API添加@admin_required装饰器,统一认证 4. 用户登录统一错误消息,防止用户枚举 5. IP限流不再信任X-Forwarded-For头,防止伪造绕过 6. 密码强度要求提升(8位+字母+数字) 7. 日志不���记录完整session/cookie内容,防止敏感信息泄露 8. XSS防护:日志输出和Bug反馈内容转义HTML 9. SQL注入防护:LIKE查询参数转义 10. 路径遍历防护:截图目录白名单验证 11. 验证码重放防护:验证前删除验证码 12. 数据库连接池健康检查 13. 正则DoS防护:限制数字匹配长度 14. Account类密码私有化,__repr__不暴露密码 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
367 lines
11 KiB
Python
Executable File
367 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
应用工具模块
|
||
提取重复的业务逻辑
|
||
"""
|
||
|
||
from typing import Dict, Any, Optional, Tuple
|
||
from flask import session, jsonify
|
||
from app_logger import get_logger, audit_logger
|
||
from app_security import get_client_ip
|
||
import database
|
||
|
||
logger = get_logger('app_utils')
|
||
|
||
|
||
class ValidationError(Exception):
|
||
"""验证错误异常"""
|
||
pass
|
||
|
||
|
||
def verify_user_file_permission(user_id: int, filename: str) -> Tuple[bool, Optional[str]]:
|
||
"""
|
||
验证用户文件访问权限
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
filename: 文件名
|
||
|
||
Returns:
|
||
(是否有权限, 错误消息)
|
||
"""
|
||
# 获取用户信息
|
||
user = database.get_user_by_id(user_id)
|
||
if not user:
|
||
return False, "用户不存在"
|
||
|
||
username = user['username']
|
||
|
||
# 检查文件名是否以用户名开头
|
||
if not filename.startswith(f"{username}_"):
|
||
logger.warning(f"用户 {username} (ID:{user_id}) 尝试访问未授权文件: {filename}")
|
||
return False, "无权访问此文件"
|
||
|
||
return True, None
|
||
|
||
|
||
def log_task_event(account_id: int, status: str, message: str,
|
||
browse_type: Optional[str] = None,
|
||
screenshot_path: Optional[str] = None) -> bool:
|
||
"""
|
||
记录任务日志(统一接口)
|
||
|
||
Args:
|
||
account_id: 账号ID
|
||
status: 状态(running/completed/failed/stopped)
|
||
message: 消息
|
||
browse_type: 浏览类型
|
||
screenshot_path: 截图路径
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
try:
|
||
return database.create_task_log(
|
||
account_id=account_id,
|
||
status=status,
|
||
message=message,
|
||
browse_type=browse_type,
|
||
screenshot_path=screenshot_path
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"记录任务日志失败: {e}", exc_info=True)
|
||
return False
|
||
|
||
|
||
def update_account_status(account_id: int, status: str,
|
||
error_message: Optional[str] = None) -> bool:
|
||
"""
|
||
更新账号状态(统一接口)
|
||
|
||
Args:
|
||
account_id: 账号ID
|
||
status: 状态(idle/running/error/stopped)
|
||
error_message: 错误消息(仅当status=error时)
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
try:
|
||
return database.update_account_status(
|
||
account_id=account_id,
|
||
status=status,
|
||
error_message=error_message
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"更新账号状态失败 (account_id={account_id}): {e}", exc_info=True)
|
||
return False
|
||
|
||
|
||
def get_or_create_config_cache() -> Optional[Dict[str, Any]]:
|
||
"""
|
||
获取或创建系统配置缓存
|
||
|
||
缓存存储在session中,避免重复查询数据库
|
||
|
||
Returns:
|
||
配置字典,失败返回None
|
||
"""
|
||
# 尝试从session获取缓存
|
||
if '_system_config' in session:
|
||
return session['_system_config']
|
||
|
||
# 从数据库加载
|
||
try:
|
||
config = database.get_system_config()
|
||
if config:
|
||
# 存入session缓存
|
||
session['_system_config'] = config
|
||
return config
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取系统配置失败: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
def clear_config_cache():
|
||
"""清除配置缓存(配置变更时调用)"""
|
||
if '_system_config' in session:
|
||
del session['_system_config']
|
||
logger.debug("已清除系统配置缓存")
|
||
|
||
|
||
def safe_close_browser(automation_obj, account_id: int):
|
||
"""
|
||
安全关闭浏览器(统一错误处理)
|
||
|
||
Args:
|
||
automation_obj: PlaywrightAutomation对象
|
||
account_id: 账号ID
|
||
"""
|
||
if automation_obj:
|
||
try:
|
||
automation_obj.close()
|
||
logger.info(f"账号 {account_id} 的浏览器已关闭")
|
||
except Exception as e:
|
||
logger.error(f"关闭账号 {account_id} 的浏览器失败: {e}", exc_info=True)
|
||
|
||
|
||
def format_error_response(error: str, status_code: int = 400,
|
||
need_captcha: bool = False,
|
||
extra_data: Optional[Dict] = None) -> Tuple[Any, int]:
|
||
"""
|
||
格式化错误响应(统一接口)
|
||
|
||
Args:
|
||
error: 错误消息
|
||
status_code: HTTP状态码
|
||
need_captcha: 是否需要验证码
|
||
extra_data: 额外数据
|
||
|
||
Returns:
|
||
(jsonify响应, 状态码)
|
||
"""
|
||
response_data = {"error": error}
|
||
|
||
if need_captcha:
|
||
response_data["need_captcha"] = True
|
||
|
||
if extra_data:
|
||
response_data.update(extra_data)
|
||
|
||
return jsonify(response_data), status_code
|
||
|
||
|
||
def format_success_response(message: str = "操作成功",
|
||
extra_data: Optional[Dict] = None) -> Any:
|
||
"""
|
||
格式化成功响应(统一接口)
|
||
|
||
Args:
|
||
message: 成功消息
|
||
extra_data: 额外数据
|
||
|
||
Returns:
|
||
jsonify响应
|
||
"""
|
||
response_data = {"success": True, "message": message}
|
||
|
||
if extra_data:
|
||
response_data.update(extra_data)
|
||
|
||
return jsonify(response_data)
|
||
|
||
|
||
def log_user_action(action: str, user_id: int, username: str,
|
||
success: bool, details: Optional[str] = None):
|
||
"""
|
||
记录用户操作到审计日志(统一接口)
|
||
|
||
Args:
|
||
action: 操作类型(login/register/logout等)
|
||
user_id: 用户ID
|
||
username: 用户名
|
||
success: 是否成功
|
||
details: 详细信息
|
||
"""
|
||
ip = get_client_ip()
|
||
|
||
if action == 'login':
|
||
audit_logger.log_user_login(user_id, username, ip, success)
|
||
elif action == 'logout':
|
||
audit_logger.log_user_logout(user_id, username, ip)
|
||
elif action == 'register':
|
||
audit_logger.log_user_created(user_id, username, created_by='self')
|
||
|
||
if details:
|
||
logger.info(f"用户操作: {action}, 用户={username}, 成功={success}, 详情={details}")
|
||
|
||
|
||
def validate_pagination(page: Any, page_size: Any,
|
||
max_page_size: int = 100) -> Tuple[int, int, Optional[str]]:
|
||
"""
|
||
验证分页参数
|
||
|
||
Args:
|
||
page: 页码
|
||
page_size: 每页大小
|
||
max_page_size: 最大每页大小
|
||
|
||
Returns:
|
||
(页码, 每页大小, 错误消息)
|
||
"""
|
||
try:
|
||
page = int(page) if page else 1
|
||
page_size = int(page_size) if page_size else 20
|
||
except (ValueError, TypeError):
|
||
return 1, 20, "无效的分页参数"
|
||
|
||
if page < 1:
|
||
return 1, 20, "页码必须大于0"
|
||
|
||
if page_size < 1 or page_size > max_page_size:
|
||
return page, 20, f"每页大小必须在1-{max_page_size}之间"
|
||
|
||
return page, page_size, None
|
||
|
||
|
||
def check_user_ownership(user_id: int, resource_type: str,
|
||
resource_id: int) -> Tuple[bool, Optional[str]]:
|
||
"""
|
||
检查用户是否拥有资源
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
resource_type: 资源类型(account/task等)
|
||
resource_id: 资源ID
|
||
|
||
Returns:
|
||
(是否拥有, 错误消息)
|
||
"""
|
||
try:
|
||
if resource_type == 'account':
|
||
account = database.get_account_by_id(resource_id)
|
||
if not account:
|
||
return False, "账号不存在"
|
||
if account['user_id'] != user_id:
|
||
return False, "无权访问此账号"
|
||
return True, None
|
||
|
||
elif resource_type == 'task':
|
||
# 通过account查询所属用户
|
||
# 这里需要根据实际数据库结构实现
|
||
pass
|
||
|
||
return False, "不支持的资源类型"
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查资源所有权失败: {e}", exc_info=True)
|
||
return False, "系统错误"
|
||
|
||
|
||
def verify_and_consume_captcha(session_id: str, code: str, captcha_storage: dict, max_attempts: int = 5) -> Tuple[bool, str]:
|
||
"""
|
||
验证并消费验证码(安全增强版)
|
||
|
||
安全特性:
|
||
- 先删除验证码再验证,防止重放攻击
|
||
- 异常情况下也确保验证码被删除
|
||
|
||
Args:
|
||
session_id: 验证码会话ID
|
||
code: 用户输入的验证码
|
||
captcha_storage: 验证码存储字典
|
||
max_attempts: 最大尝试次数,默认5次
|
||
|
||
Returns:
|
||
Tuple[bool, str]: (是否成功, 消息)
|
||
- 成功时返回 (True, "验证成功")
|
||
- 失败时返回 (False, 错误消息)
|
||
|
||
Example:
|
||
success, message = verify_and_consume_captcha(
|
||
captcha_session,
|
||
captcha_code,
|
||
captcha_storage,
|
||
max_attempts=5
|
||
)
|
||
if not success:
|
||
return jsonify({"error": message}), 400
|
||
"""
|
||
import time
|
||
|
||
# 安全修复:先取出并删除验证码,无论验证是否成功都不能重用
|
||
captcha_data = captcha_storage.pop(session_id, None)
|
||
|
||
# 检查验证码是否存在
|
||
if captcha_data is None:
|
||
return False, "验证码已过期或不存在,请重新获取"
|
||
|
||
try:
|
||
# 检查过期时间
|
||
if captcha_data["expire_time"] < time.time():
|
||
return False, "验证码已过期,请重新获取"
|
||
|
||
# 检查尝试次数
|
||
if captcha_data.get("failed_attempts", 0) >= max_attempts:
|
||
return False, f"验证码错误次数过多({max_attempts}次),请重新获取"
|
||
|
||
# 验证代码(不区分大小写)
|
||
if captcha_data["code"].lower() != code.lower():
|
||
# 验证失败,增加失败计数后放回(允许继续尝试)
|
||
captcha_data["failed_attempts"] = captcha_data.get("failed_attempts", 0) + 1
|
||
# 只有未超过最大尝试次数才放回
|
||
if captcha_data["failed_attempts"] < max_attempts:
|
||
captcha_storage[session_id] = captcha_data
|
||
return False, "验证码错误"
|
||
|
||
# 验证成功,验证码已被删除,不会被重用
|
||
return True, "验证成功"
|
||
except Exception as e:
|
||
# 异常情况下确保验证码不会被重用(已在函数开头删除)
|
||
logger.error(f"验证码验证异常: {e}")
|
||
return False, "验证码验证失败,请重新获取"
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 测试代码
|
||
print("测试应用工具模块...")
|
||
print("=" * 60)
|
||
|
||
# 测试分页验证
|
||
print("\n1. 测试分页验证:")
|
||
page, page_size, error = validate_pagination("2", "50")
|
||
print(f" 页码={page}, 每页={page_size}, 错误={error}")
|
||
|
||
page, page_size, error = validate_pagination("invalid", "50")
|
||
print(f" 无效输入: 页码={page}, 每页={page_size}, 错误={error}")
|
||
|
||
# 测试响应格式化
|
||
print("\n2. 测试响应格式化:")
|
||
print(f" 错误响应: {format_error_response('测试错误', need_captcha=True)}")
|
||
print(f" 成功响应: {format_success_response('测试成功', {'data': [1, 2, 3]})}")
|
||
|
||
print("\n" + "=" * 60)
|
||
print("✓ 工具模块加载成功!")
|