feat: 安全增强 + 删除密码重置申请功能 + 登录提醒开关

安全增强:
- 新增 SSRF、XXE、模板注入、敏感路径探测检测规则
- security/constants.py: 添加新的威胁类型和检测模式
- security/threat_detector.py: 实现新检测逻辑

删除密码重置申请功能:
- 移除 /api/password_resets 相关API
- 删除 password_reset_requests 数据库表
- 前端移除密码重置申请页面和菜单
- 用户只能通过邮��找回密码,未绑定邮箱需联系管理员

登录提醒全局开关:
- email_service.py: 添加 login_alert_enabled 字段
- routes/api_auth.py: 检查开关状态再发送登录提醒
- EmailPage.vue: 添加新设备登录提醒开关

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-27 12:08:36 +08:00
parent 4ba933b001
commit 89f3fd9759
65 changed files with 555 additions and 784 deletions

View File

@@ -910,32 +910,6 @@ def admin_reset_password_route(user_id):
return jsonify({"error": "重置失败,用户不存在"}), 400
@admin_api_bp.route("/password_resets", methods=["GET"])
@admin_required
def get_password_resets_route():
"""获取所有待审核的密码重置申请"""
resets = database.get_pending_password_resets()
return jsonify(resets)
@admin_api_bp.route("/password_resets/<int:request_id>/approve", methods=["POST"])
@admin_required
def approve_password_reset_route(request_id):
"""批准密码重置申请"""
if database.approve_password_reset(request_id):
return jsonify({"message": "密码重置申请已批准"})
return jsonify({"error": "批准失败"}), 400
@admin_api_bp.route("/password_resets/<int:request_id>/reject", methods=["POST"])
@admin_required
def reject_password_reset_route(request_id):
"""拒绝密码重置申请"""
if database.reject_password_reset(request_id):
return jsonify({"message": "密码重置申请已拒绝"})
return jsonify({"error": "拒绝失败"}), 400
@admin_api_bp.route("/feedbacks", methods=["GET"])
@admin_required
def get_all_feedbacks():
@@ -1067,6 +1041,7 @@ def update_email_settings_api():
enabled = data.get("enabled", False)
failover_enabled = data.get("failover_enabled", True)
register_verify_enabled = data.get("register_verify_enabled")
login_alert_enabled = data.get("login_alert_enabled")
base_url = data.get("base_url")
task_notify_enabled = data.get("task_notify_enabled")
@@ -1074,6 +1049,7 @@ def update_email_settings_api():
enabled=enabled,
failover_enabled=failover_enabled,
register_verify_enabled=register_verify_enabled,
login_alert_enabled=login_alert_enabled,
base_url=base_url,
task_notify_enabled=task_notify_enabled,
)

View File

@@ -237,23 +237,31 @@ def forgot_password():
"""发送密码重置邮件"""
data = request.json or {}
email = data.get("email", "").strip().lower()
username = data.get("username", "").strip()
captcha_session = data.get("captcha_session", "")
captcha_code = data.get("captcha", "").strip()
if not email:
return jsonify({"error": "请输入邮箱"}), 400
if not email and not username:
return jsonify({"error": "请输入邮箱或用户名"}), 400
is_valid, error_msg = validate_email(email)
if not is_valid:
return jsonify({"error": error_msg}), 400
if username:
is_valid, error_msg = validate_username(username)
if not is_valid:
return jsonify({"error": error_msg}), 400
if email:
is_valid, error_msg = validate_email(email)
if not is_valid:
return jsonify({"error": error_msg}), 400
client_ip = get_rate_limit_ip()
allowed, error_msg = check_ip_request_rate(client_ip, "email")
if not allowed:
return jsonify({"error": error_msg}), 429
allowed, error_msg = check_email_rate_limit(email, "forgot_password")
if not allowed:
return jsonify({"error": error_msg}), 429
if email:
allowed, error_msg = check_email_rate_limit(email, "forgot_password")
if not allowed:
return jsonify({"error": error_msg}), 429
success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
if not success:
@@ -266,6 +274,34 @@ def forgot_password():
if not email_settings.get("enabled", False):
return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400
if username:
user = database.get_user_by_username(username)
if user and user.get("status") == "approved":
bound_email = (user.get("email") or "").strip()
if not bound_email:
return (
jsonify(
{
"error": "您尚未绑定邮箱,无法通过邮箱找回密码。请联系管理员重置密码。",
"code": "email_not_bound",
}
),
400,
)
allowed, error_msg = check_email_rate_limit(bound_email, "forgot_password")
if not allowed:
return jsonify({"error": error_msg}), 429
result = email_service.send_password_reset_email(
email=bound_email,
username=user["username"],
user_id=user["id"],
)
if not result["success"]:
logger.error(f"密码重置邮件发送失败: {result['error']}")
return jsonify({"success": True, "message": "如果该账号已绑定邮箱,您将收到密码重置邮件"})
user = database.get_user_by_email(email)
if user and user.get("status") == "approved":
result = email_service.send_password_reset_email(email=email, username=user["username"], user_id=user["id"])
@@ -317,46 +353,6 @@ def reset_password_confirm():
return jsonify({"error": "密码重置失败"}), 500
@api_auth_bp.route("/api/reset_password_request", methods=["POST"])
def request_password_reset():
"""用户申请重置密码(需要审核)"""
data = request.json or {}
username = data.get("username", "").strip()
email = data.get("email", "").strip().lower()
new_password = data.get("new_password", "").strip()
if not username or not new_password:
return jsonify({"error": "用户名和新密码不能为空"}), 400
is_valid, error_msg = validate_password(new_password)
if not is_valid:
return jsonify({"error": error_msg}), 400
if email:
is_valid, error_msg = validate_email(email)
if not is_valid:
return jsonify({"error": error_msg}), 400
client_ip = get_rate_limit_ip()
allowed, error_msg = check_ip_request_rate(client_ip, "email")
if not allowed:
return jsonify({"error": error_msg}), 429
if email:
allowed, error_msg = check_email_rate_limit(email, "reset_request")
if not allowed:
return jsonify({"error": error_msg}), 429
user = database.get_user_by_username(username)
if user:
if email and user.get("email") != email:
pass
else:
database.create_password_reset_request(user["id"], new_password)
return jsonify({"message": "如果账号存在,密码重置申请已提交,请等待管理员审核"})
@api_auth_bp.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
"""生成4位数字验证码图片"""
@@ -481,15 +477,19 @@ def login():
load_user_accounts(user["id"])
try:
user_agent = request.headers.get("User-Agent", "")
context = database.record_login_context(user["id"], client_ip, user_agent)
if context and (context.get("new_ip") or context.get("new_device")):
if config.LOGIN_ALERT_ENABLED and should_send_login_alert(user["id"], client_ip):
user_info = database.get_user_by_id(user["id"]) or {}
if user_info.get("email") and user_info.get("email_verified"):
if database.get_user_email_notify(user["id"]):
email_service.send_security_alert_email(
email=user_info.get("email"),
user_agent = request.headers.get("User-Agent", "")
context = database.record_login_context(user["id"], client_ip, user_agent)
if context and (context.get("new_ip") or context.get("new_device")):
if (
config.LOGIN_ALERT_ENABLED
and should_send_login_alert(user["id"], client_ip)
and email_service.get_email_settings().get("login_alert_enabled", True)
):
user_info = database.get_user_by_id(user["id"]) or {}
if user_info.get("email") and user_info.get("email_verified"):
if database.get_user_email_notify(user["id"]):
email_service.send_security_alert_email(
email=user_info.get("email"),
username=user_info.get("username") or username,
ip_address=client_ip,
user_agent=user_agent,