792 lines
30 KiB
Python
792 lines
30 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import json
|
||
import random
|
||
import secrets
|
||
import threading
|
||
import time
|
||
import uuid
|
||
from io import BytesIO
|
||
|
||
import database
|
||
import email_service
|
||
from app_config import get_config
|
||
from app_logger import get_logger
|
||
from app_security import get_rate_limit_ip, require_ip_not_locked, validate_email, validate_password, validate_username
|
||
from flask import Blueprint, jsonify, request, session
|
||
from flask_login import login_required, login_user, logout_user
|
||
from routes.pages import render_app_spa_or_legacy
|
||
from services.accounts_service import load_user_accounts
|
||
from services.models import User
|
||
from services.passkeys import (
|
||
encode_credential_id,
|
||
get_expected_origins,
|
||
get_rp_id,
|
||
is_challenge_valid,
|
||
make_authentication_options,
|
||
normalize_device_name,
|
||
verify_authentication,
|
||
)
|
||
from services.state import (
|
||
check_ip_request_rate,
|
||
check_email_rate_limit,
|
||
check_login_ip_user_locked,
|
||
check_login_rate_limits,
|
||
check_login_captcha_required,
|
||
clear_login_failures,
|
||
get_login_failure_delay_seconds,
|
||
record_failed_captcha,
|
||
record_login_failure,
|
||
record_login_username_attempt,
|
||
safe_cleanup_expired_captcha,
|
||
safe_delete_captcha,
|
||
safe_set_captcha,
|
||
safe_verify_and_consume_captcha,
|
||
should_send_login_alert,
|
||
)
|
||
|
||
# Shared application logger and runtime configuration object.
logger = get_logger("app")
config = get_config()

# Blueprint that collects the authentication routes defined in this module.
api_auth_bp = Blueprint("api_auth", __name__)

# Candidate TrueType fonts for captcha rendering, tried in this order.
_CAPTCHA_FONT_PATHS = [
    "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
    "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
    "/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
]
# Module-level font cache and the lock that guards its initialisation.
_CAPTCHA_FONT = None
_CAPTCHA_FONT_LOCK = threading.Lock()
# Session key under which the pending passkey login challenge is stored.
_USER_PASSKEY_LOGIN_SESSION_KEY = "user_passkey_login_state"
|
||
|
||
|
||
def _get_json_payload() -> dict:
    """Return the request's JSON body as a dict, or ``{}`` when unusable.

    The body is read with ``silent=True``; any result that is not a dict
    (including a missing or unparsable body) is replaced by an empty dict.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return {}
    return payload
|
||
|
||
|
||
def _load_captcha_font(image_font_module):
    """Return the cached captcha font, loading it on first use.

    Args:
        image_font_module: Object exposing ``truetype(path, size)`` and
            ``load_default()`` (the caller passes Pillow's ``ImageFont``).

    Returns:
        The first font from ``_CAPTCHA_FONT_PATHS`` that loads at size 42,
        otherwise the module's default font. The result is stored in the
        module-level ``_CAPTCHA_FONT``.
    """
    global _CAPTCHA_FONT

    # Cheap unlocked check: skip the lock once the cache is populated.
    cached = _CAPTCHA_FONT
    if cached is not None:
        return cached

    with _CAPTCHA_FONT_LOCK:
        # Re-check under the lock; another thread may have filled the cache.
        if _CAPTCHA_FONT is None:
            loaded = None
            for candidate in _CAPTCHA_FONT_PATHS:
                try:
                    loaded = image_font_module.truetype(candidate, 42)
                except Exception:
                    # This candidate is unusable; try the next one.
                    continue
                break

            if loaded is None:
                loaded = image_font_module.load_default()
            _CAPTCHA_FONT = loaded

        return _CAPTCHA_FONT
|
||
|
||
|
||
def _generate_captcha_image_data_uri(code: str) -> str:
    """Render *code* onto a noisy 160x60 PNG and return it as a data URI.

    Pillow is imported inside the function, so an ``ImportError`` reaches the
    caller when the library is not installed.

    Args:
        code: Characters to draw; each one is placed 35 px to the right of
            the previous one with a small random jitter.

    Returns:
        A ``data:image/png;base64,...`` string.
    """
    from PIL import Image, ImageDraw, ImageFont

    canvas_w, canvas_h = 160, 60

    def _rgb(upper):
        # Three channel draws, evaluated in R, G, B order.
        return (random.randint(0, upper), random.randint(0, upper), random.randint(0, upper))

    canvas = Image.new("RGB", (canvas_w, canvas_h), color=(255, 255, 255))
    pen = ImageDraw.Draw(canvas)

    # Six random interference lines.
    for _ in range(6):
        start = (random.randint(0, canvas_w), random.randint(0, canvas_h))
        end = (random.randint(0, canvas_w), random.randint(0, canvas_h))
        pen.line([start, end], fill=_rgb(200), width=1)

    # Eighty random noise pixels.
    for _ in range(80):
        spot = (random.randint(0, canvas_w), random.randint(0, canvas_h))
        pen.point(spot, fill=_rgb(200))

    glyph_font = _load_captcha_font(ImageFont)
    for index, glyph in enumerate(code):
        left = 12 + index * 35 + random.randint(-3, 3)
        top = random.randint(5, 12)
        ink = _rgb(150)
        pen.text((left, top), glyph, font=glyph_font, fill=ink)

    png_bytes = BytesIO()
    canvas.save(png_bytes, format="PNG")
    encoded = base64.b64encode(png_bytes.getvalue()).decode("utf-8")
    return "data:image/png;base64," + encoded
|
||
|
||
|
||
def _with_vip_suffix(message: str, auto_approve_enabled: bool, auto_approve_vip_days: int) -> str:
|
||
if auto_approve_enabled and auto_approve_vip_days > 0:
|
||
return f"{message},赠送{auto_approve_vip_days}天VIP"
|
||
return message
|
||
|
||
|
||
def _verify_common_captcha(client_ip: str, captcha_session: str, captcha_code: str):
    """Verify a captcha answer and record a failure against the client IP.

    Returns:
        ``(True, None)`` when the captcha is accepted. Otherwise
        ``(False, (response, status))``: status 429 when
        ``record_failed_captcha`` reports the IP as locked, else 400 carrying
        the verifier's message.
    """
    accepted, reason = safe_verify_and_consume_captcha(captcha_session, captcha_code)
    if not accepted:
        if record_failed_captcha(client_ip):
            return False, (jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429)
        return False, (jsonify({"error": reason}), 400)
    return True, None
|
||
|
||
|
||
def _verify_login_captcha_if_needed(
    *,
    captcha_required: bool,
    captcha_session: str,
    captcha_code: str,
    client_ip: str,
    username_key: str,
):
    """Verify the login captcha, but only when one is required.

    Returns:
        ``(True, None)`` when no captcha is required or the answer is
        accepted. Otherwise ``(False, (response, 400))`` with
        ``need_captcha`` set; a rejected answer is also recorded as a login
        failure for ``(client_ip, username_key)``.
    """
    if captcha_required:
        if not (captcha_session and captcha_code):
            return False, (jsonify({"error": "请填写验证码", "need_captcha": True}), 400)

        accepted, reason = safe_verify_and_consume_captcha(captcha_session, captcha_code)
        if not accepted:
            record_login_failure(client_ip, username_key)
            return False, (jsonify({"error": reason, "need_captcha": True}), 400)

    return True, None
|
||
|
||
|
||
def _send_password_reset_email_if_possible(email: str, username: str, user_id: int) -> None:
    """Send a password reset email and log a failure reported in the result.

    A result whose ``success`` entry is falsy is only logged; nothing is
    returned to the caller either way.
    """
    outcome = email_service.send_password_reset_email(email=email, username=username, user_id=user_id)
    if outcome["success"]:
        return
    logger.error(f"密码重置邮件发送失败: {outcome['error']}")
|
||
|
||
|
||
def _send_login_security_alert_if_needed(user: dict, username: str, client_ip: str) -> None:
    """Record the login context and email a security alert when warranted.

    The login context is always recorded first. An alert is sent only when the
    context flags a new IP or new device and every gate below passes, in this
    order: the config switch, ``should_send_login_alert``, the email settings
    switch, a verified email on the account, and the user's notify preference.
    Any exception is logged as a warning and swallowed so that a successful
    login is never failed by the alerting path.
    """
    try:
        agent = request.headers.get("User-Agent", "")
        login_ctx = database.record_login_context(user["id"], client_ip, agent)
        # Nothing to report unless the login came from a new IP or device.
        if not login_ctx or not (login_ctx.get("new_ip") or login_ctx.get("new_device")):
            return

        if not config.LOGIN_ALERT_ENABLED:
            return
        if not should_send_login_alert(user["id"], client_ip):
            return
        alert_switch = email_service.get_email_settings().get("login_alert_enabled", True)
        if not alert_switch:
            return

        profile = database.get_user_by_id(user["id"]) or {}
        if not (profile.get("email") and profile.get("email_verified")):
            return
        if not database.get_user_email_notify(user["id"]):
            return

        email_service.send_security_alert_email(
            email=profile.get("email"),
            username=profile.get("username") or username,
            ip_address=client_ip,
            user_agent=agent,
            new_ip=login_ctx.get("new_ip", False),
            new_device=login_ctx.get("new_device", False),
            user_id=user["id"],
        )
    except Exception as exc:
        logger.warning(f"发送登录安全提醒失败: user_id={user.get('id')}, error={exc}")
|
||
|
||
|
||
def _parse_credential_payload(data: dict) -> dict | None:
|
||
credential = data.get("credential")
|
||
if isinstance(credential, dict):
|
||
return credential
|
||
if isinstance(credential, str):
|
||
try:
|
||
parsed = json.loads(credential)
|
||
return parsed if isinstance(parsed, dict) else None
|
||
except Exception:
|
||
return None
|
||
return None
|
||
|
||
|
||
@api_auth_bp.route("/api/register", methods=["POST"])
@require_ip_not_locked
def register():
    """Register a new user account.

    Flow: validate username and password, apply the per-IP "register" rate
    limit, verify the captcha, validate the email (mandatory when register
    email verification is enabled), enforce the hourly registration cap,
    create the user, optionally grant VIP days, and optionally send the
    verification email.

    Returns:
        A JSON response. Errors use 400 (validation, captcha, duplicate
        user) or 429 (rate limits / IP lock); success returns
        ``{"success": True, "message": ...}`` plus ``need_verify`` when a
        verification email was attempted.
    """
    data = _get_json_payload()
    # JSON values may be null, numbers, lists or objects. Coerce to text before
    # calling str methods so a malformed payload is rejected by validation
    # instead of raising AttributeError (HTTP 500). This mirrors the passkey
    # handlers in this module.
    username = str(data.get("username") or "").strip()
    password = str(data.get("password") or "").strip()
    email = str(data.get("email") or "").strip().lower()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()

    if not username or not password:
        return jsonify({"error": "用户名和密码不能为空"}), 400

    is_valid, error_msg = validate_username(username)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    is_valid, error_msg = validate_password(password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "register")
    if not allowed:
        return jsonify({"error": error_msg}), 429

    captcha_ok, captcha_error_response = _verify_common_captcha(client_ip, captcha_session, captcha_code)
    if not captcha_ok:
        return captcha_error_response

    email_settings = email_service.get_email_settings()
    email_verify_enabled = email_settings.get("register_verify_enabled", False) and email_settings.get("enabled", False)

    if email_verify_enabled and not email:
        return jsonify({"error": "启用邮箱验证后,邮箱为必填项"}), 400

    if email:
        is_valid, error_msg = validate_email(email)
        if not is_valid:
            return jsonify({"error": error_msg}), 400

    system_config = database.get_system_config()
    auto_approve_enabled = system_config.get("auto_approve_enabled", 0) == 1
    auto_approve_hourly_limit = system_config.get("auto_approve_hourly_limit", 10)
    auto_approve_vip_days = system_config.get("auto_approve_vip_days", 7)

    # Fall back to 10 when the configured limit is not an int; 0 or less disables the cap.
    hourly_limit = int(auto_approve_hourly_limit) if isinstance(auto_approve_hourly_limit, int) else 10
    if hourly_limit > 0:
        hourly_count = database.get_hourly_registration_count()
        if hourly_count >= hourly_limit:
            return jsonify({"error": f"注册人数过多,请稍后再试(每小时限制{hourly_limit}人)"}), 429

    user_id = database.create_user(username, password, email)
    if not user_id:
        return jsonify({"error": "用户名已存在"}), 400

    if auto_approve_enabled and auto_approve_vip_days > 0:
        database.set_user_vip(user_id, auto_approve_vip_days)

    if email_verify_enabled and email:
        result = email_service.send_register_verification_email(email=email, username=username, user_id=user_id)
        if result["success"]:
            message = _with_vip_suffix(
                "注册成功!验证邮件已发送(可直接登录,建议完成邮箱验证)",
                auto_approve_enabled,
                auto_approve_vip_days,
            )
            return jsonify({"success": True, "message": message, "need_verify": True})

        # The account exists even though the email failed, so still report success.
        logger.error(f"注册验证邮件发送失败: {result['error']}")
        message = _with_vip_suffix(
            f"注册成功,但验证邮件发送失败({result['error']})。你仍可直接登录",
            auto_approve_enabled,
            auto_approve_vip_days,
        )
        return jsonify({"success": True, "message": message, "need_verify": True})

    message = _with_vip_suffix("注册成功!可直接登录", auto_approve_enabled, auto_approve_vip_days)
    return jsonify({"success": True, "message": message})
|
||
|
||
|
||
@api_auth_bp.route("/api/verify-email/<token>")
def verify_email(token):
    """Handle the verification link a user clicks in the registration email.

    The token is checked without being consumed. For a valid token the user is
    approved, VIP days are granted when the configured value is positive, and
    only then is the token consumed. Every outcome renders the verify result
    page (SPA state plus legacy template).
    """

    def _render_failure(message, **buttons):
        # Shared renderer for both failure outcomes; *buttons* carries the
        # label/url entries, appended after the common keys in call order.
        state = {
            "page": "verify_result",
            "success": False,
            "title": "验证失败",
            "error_message": message,
        }
        state.update(buttons)
        return render_app_spa_or_legacy(
            "verify_failed.html",
            legacy_context={"error_message": message},
            spa_initial_state=state,
        )

    token_info = email_service.verify_email_token(token, email_service.EMAIL_TYPE_REGISTER, consume=False)

    if not token_info:
        logger.warning("邮箱验证失败: token无效或已过期")
        return _render_failure(
            "验证链接无效或已过期,请重新注册或申请重发验证邮件",
            primary_label="重新注册",
            primary_url="/register",
            secondary_label="返回登录",
            secondary_url="/login",
        )

    token_id = token_info["token_id"]
    user_id = token_info["user_id"]

    if not database.approve_user(user_id):
        logger.error(f"用户邮箱验证失败: 用户审核更新失败 user_id={user_id}")
        return _render_failure(
            "验证处理失败,请稍后重试",
            primary_label="返回登录",
            primary_url="/login",
        )

    vip_days = database.get_system_config().get("auto_approve_vip_days", 7)
    if vip_days > 0:
        database.set_user_vip(user_id, vip_days)

    # A failed consume is logged only; the user is already approved.
    if not email_service.consume_email_token(token_id):
        logger.warning(f"用户邮箱验证后Token消费失败: user_id={user_id}")

    logger.info(f"用户邮箱验证成功: user_id={user_id}")
    success_state = {
        "page": "verify_result",
        "success": True,
        "title": "验证成功",
        "message": "您的邮箱已验证成功!账号已激活,现在可以登录使用了。",
        "primary_label": "立即登录",
        "primary_url": "/login",
        "redirect_url": "/login",
        "redirect_seconds": 5,
    }
    return render_app_spa_or_legacy("verify_success.html", spa_initial_state=success_state)
|
||
|
||
|
||
@api_auth_bp.route("/api/resend-verify-email", methods=["POST"])
@require_ip_not_locked
def resend_verify_email():
    """Resend the registration verification email.

    Flow: validate the email, apply the per-IP "email" rate limit and the
    per-address "resend_verify" limit, verify the captcha, then resend for an
    account that exists and is not yet approved.

    Returns:
        A JSON response: 400 for validation/captcha errors or an already
        approved account, 404 for an unknown email, 429 for rate limits,
        500 when the email service reports a failure, 200 on success.
    """
    data = _get_json_payload()
    # Coerce possibly non-string JSON values (null, numbers, lists) before
    # calling str methods, so malformed input yields a 400 rather than an
    # AttributeError (HTTP 500).
    email = str(data.get("email") or "").strip().lower()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()

    if not email:
        return jsonify({"error": "请输入邮箱"}), 400

    is_valid, error_msg = validate_email(email)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "email")
    if not allowed:
        return jsonify({"error": error_msg}), 429
    allowed, error_msg = check_email_rate_limit(email, "resend_verify")
    if not allowed:
        return jsonify({"error": error_msg}), 429

    captcha_ok, captcha_error_response = _verify_common_captcha(client_ip, captcha_session, captcha_code)
    if not captcha_ok:
        return captcha_error_response

    user = database.get_user_by_email(email)
    if not user:
        return jsonify({"error": "该邮箱未注册"}), 404

    if user["status"] == "approved":
        return jsonify({"error": "该账号已验证通过,请直接登录"}), 400

    result = email_service.resend_register_verification_email(user_id=user["id"], email=email, username=user["username"])

    if result["success"]:
        return jsonify({"success": True, "message": "验证邮件已重新发送,请查收"})
    return jsonify({"error": result["error"]}), 500
|
||
|
||
|
||
@api_auth_bp.route("/api/email/verify-status")
def get_email_verify_status():
    """Report whether email and register verification are enabled (public API).

    Register verification is reported as enabled only when the email feature
    itself is enabled. Any exception yields both flags as ``False``.
    """
    try:
        settings = email_service.get_email_settings()
        status = {
            "email_enabled": settings.get("enabled", False),
            "register_verify_enabled": settings.get("register_verify_enabled", False) and settings.get("enabled", False),
        }
        return jsonify(status)
    except Exception:
        return jsonify({"email_enabled": False, "register_verify_enabled": False})
|
||
|
||
|
||
@api_auth_bp.route("/api/forgot-password", methods=["POST"])
@require_ip_not_locked
def forgot_password():
    """Send a password reset email, looked up by username or by email.

    Flow: validate whichever identifiers were supplied, apply the per-IP
    "email" rate limit (and the per-address limit when an email was given),
    verify the captcha, and require the email feature to be enabled. When a
    username is supplied it takes precedence: the reset mail goes to the
    approved account's bound email. Otherwise the approved account matching
    the email is used. Both paths end with a generic success message.

    Returns:
        A JSON response: 400 for validation/captcha/configuration errors or
        an account without a bound email (``code: email_not_bound``), 429
        for rate limits, 200 with a generic message otherwise.
    """
    data = _get_json_payload()
    # Coerce possibly non-string JSON values (null, numbers, lists) before
    # calling str methods, so malformed input is handled by validation instead
    # of raising AttributeError (HTTP 500).
    email = str(data.get("email") or "").strip().lower()
    username = str(data.get("username") or "").strip()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()

    if not email and not username:
        return jsonify({"error": "请输入邮箱或用户名"}), 400

    if username:
        is_valid, error_msg = validate_username(username)
        if not is_valid:
            return jsonify({"error": error_msg}), 400

    if email:
        is_valid, error_msg = validate_email(email)
        if not is_valid:
            return jsonify({"error": error_msg}), 400

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "email")
    if not allowed:
        return jsonify({"error": error_msg}), 429
    if email:
        allowed, error_msg = check_email_rate_limit(email, "forgot_password")
        if not allowed:
            return jsonify({"error": error_msg}), 429

    captcha_ok, captcha_error_response = _verify_common_captcha(client_ip, captcha_session, captcha_code)
    if not captcha_ok:
        return captcha_error_response

    email_settings = email_service.get_email_settings()
    if not email_settings.get("enabled", False):
        return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400

    if username:
        user = database.get_user_by_username(username)
        if user and user.get("status") == "approved":
            bound_email = (user.get("email") or "").strip()
            if not bound_email:
                return (
                    jsonify(
                        {
                            "error": "您尚未绑定邮箱,无法通过邮箱找回密码。请联系管理员重置密码。",
                            "code": "email_not_bound",
                        }
                    ),
                    400,
                )

            # Rate-limit on the address that will actually receive the mail.
            allowed, error_msg = check_email_rate_limit(bound_email, "forgot_password")
            if not allowed:
                return jsonify({"error": error_msg}), 429

            _send_password_reset_email_if_possible(
                email=bound_email,
                username=user["username"],
                user_id=user["id"],
            )
        return jsonify({"success": True, "message": "如果该账号已绑定邮箱,您将收到密码重置邮件"})

    user = database.get_user_by_email(email)
    if user and user.get("status") == "approved":
        _send_password_reset_email_if_possible(email=email, username=user["username"], user_id=user["id"])

    return jsonify({"success": True, "message": "如果该邮箱已注册,您将收到密码重置邮件"})
|
||
|
||
|
||
@api_auth_bp.route("/reset-password/<token>")
def reset_password_page(token):
    """Render the password reset page for *token*.

    The page state carries the token, whether it verified successfully, and
    an error message when it did not.
    """
    is_valid = bool(email_service.verify_password_reset_token(token))
    error_message = "重置链接无效或已过期,请重新申请密码重置" if not is_valid else ""

    shared = {"token": token, "valid": is_valid, "error_message": error_message}
    return render_app_spa_or_legacy(
        "reset_password.html",
        legacy_context=dict(shared),
        spa_initial_state={"page": "reset_password", **shared},
    )
|
||
|
||
|
||
@api_auth_bp.route("/api/reset-password-confirm", methods=["POST"])
def reset_password_confirm():
    """Confirm a password reset using an emailed token.

    The new password is validated before the token is confirmed, so an
    invalid password does not reach ``confirm_password_reset``.

    Returns:
        A JSON response: 400 for missing parameters, an invalid password or
        an invalid/expired token; 500 when the password update fails; 200 on
        success.
    """
    data = _get_json_payload()
    # Coerce possibly non-string JSON values (null, numbers, lists) before
    # calling str methods, so malformed input yields a 400 rather than an
    # AttributeError (HTTP 500).
    token = str(data.get("token") or "").strip()
    new_password = str(data.get("new_password") or "").strip()

    if not token or not new_password:
        return jsonify({"error": "参数不完整"}), 400

    is_valid, error_msg = validate_password(new_password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    result = email_service.confirm_password_reset(token)
    if not result:
        return jsonify({"error": "重置链接无效或已过期"}), 400

    user_id = result["user_id"]
    if database.admin_reset_user_password(user_id, new_password):
        logger.info(f"用户密码重置成功: user_id={user_id}")
        return jsonify({"success": True, "message": "密码重置成功"})
    return jsonify({"error": "密码重置失败"}), 500
|
||
|
||
|
||
@api_auth_bp.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
    """Generate a 4-digit numeric captcha image.

    The request counts against the per-IP "login" rate limit. The code is
    stored with a 300-second expiry before the image is rendered; if Pillow
    cannot be imported the stored entry is deleted and 503 is returned.

    Returns:
        JSON with ``session_id`` and ``captcha_image`` (a data URI) on
        success, or an ``error`` with status 429 / 503.
    """
    client_ip = get_rate_limit_ip()
    permitted, reason = check_ip_request_rate(client_ip, "login")
    if not permitted:
        return jsonify({"error": reason}), 429

    session_id = str(uuid.uuid4())
    digits = [str(secrets.randbelow(10)) for _ in range(4)]
    code = "".join(digits)

    entry = {"code": code, "expire_time": time.time() + 300, "failed_attempts": 0}
    safe_set_captcha(session_id, entry)
    safe_cleanup_expired_captcha()

    try:
        image_uri = _generate_captcha_image_data_uri(code)
        return jsonify({"session_id": session_id, "captcha_image": image_uri})
    except ImportError as exc:
        logger.error(f"PIL库未安装,验证码功能不可用: {exc}")
        # Drop the entry: without an image the user can never answer it.
        safe_delete_captcha(session_id)
        return jsonify({"error": "验证码服务暂不可用,请联系管理员安装PIL库"}), 503
|
||
|
||
|
||
@api_auth_bp.route("/api/passkeys/login/options", methods=["POST"])
@require_ip_not_locked
def user_passkey_login_options():
    """Start a user passkey login by issuing an assertion challenge.

    With a username the login runs in "named" mode and the options list that
    user's credential ids; without one it runs in "discoverable" mode with an
    empty allow list. The challenge and its context are stored in the session
    for the verify step.

    Returns:
        ``{"publicKey": options}`` on success, or a JSON error with status
        400 (account/passkey unavailable), 429 (lock or rate limit) or 500
        (configuration or challenge generation failure).
    """
    payload = _get_json_payload()
    username = str(payload.get("username", "") or "").strip()
    client_ip = get_rate_limit_ip()
    if username:
        mode, username_key = "named", f"passkey:{username}"
    else:
        mode, username_key = "discoverable", "passkey:discoverable"

    locked, seconds_left = check_login_ip_user_locked(client_ip, username_key)
    if locked:
        if seconds_left >= 60:
            wait_hint = f"{seconds_left // 60 + 1}分钟"
        else:
            wait_hint = f"{seconds_left}秒"
        return jsonify({"error": f"账号短时锁定,请{wait_hint}后再试"}), 429

    permitted, reason = check_ip_request_rate(client_ip, "login")
    if not permitted:
        return jsonify({"error": reason}), 429

    permitted, reason = check_login_rate_limits(client_ip, username_key)
    if not permitted:
        return jsonify({"error": reason}), 429

    user_id = 0
    allow_credential_ids = []
    if mode == "named":
        account = database.get_user_by_username(username)
        if not account or account.get("status") != "approved":
            record_login_failure(client_ip, username_key)
            return jsonify({"error": "账号或Passkey不可用"}), 400

        user_id = int(account["id"])
        registered = database.list_passkeys("user", user_id)
        if not registered:
            record_login_failure(client_ip, username_key)
            return jsonify({"error": "该账号尚未绑定Passkey"}), 400

        # Keep only entries that actually carry a credential id.
        for entry in registered:
            if entry.get("credential_id"):
                allow_credential_ids.append(str(entry.get("credential_id") or "").strip())

    try:
        rp_id = get_rp_id(request)
        expected_origins = get_expected_origins(request)
    except Exception as exc:
        logger.warning(f"[passkey] 生成登录 challenge 失败(mode={mode}, username={username or '-'}) : {exc}")
        return jsonify({"error": "Passkey配置异常,请联系管理员"}), 500

    options = make_authentication_options(rp_id=rp_id, allow_credential_ids=allow_credential_ids)
    challenge = str(options.get("challenge") or "").strip()
    if not challenge:
        return jsonify({"error": "生成Passkey挑战失败"}), 500

    # Persist everything the verify step needs to validate the assertion.
    pending = {
        "mode": mode,
        "username": username,
        "user_id": int(user_id),
        "challenge": challenge,
        "rp_id": rp_id,
        "expected_origins": expected_origins,
        "username_key": username_key,
        "created_at": time.time(),
    }
    session[_USER_PASSKEY_LOGIN_SESSION_KEY] = pending
    session.modified = True
    return jsonify({"publicKey": options})
|
||
|
||
|
||
@api_auth_bp.route("/api/passkeys/login/verify", methods=["POST"])
@require_ip_not_locked
def user_passkey_login_verify():
    """Finish a user passkey login: verify the assertion and log the user in.

    The challenge state stored in the session by the options step is checked
    for presence, freshness and consistency. The credential must belong to a
    user (and, in "named" mode, to the user the challenge was issued for)
    before its signature is verified. On success the passkey usage is updated,
    login failures are cleared, the session state is dropped and the user is
    logged in.

    Returns:
        ``{"success": True, "credential_id": ..., "username": ...}`` on
        success, or a JSON error with status 400 (bad request or challenge
        state), 401 (credential or account rejected) or 429 (locked).
    """
    payload = _get_json_payload()
    request_username = str(payload.get("username", "") or "").strip()
    credential = _parse_credential_payload(payload)
    if not credential:
        return jsonify({"error": "Passkey参数缺失"}), 400

    def _drop_state_and_fail(message):
        # Discard the stored challenge and answer with a 400.
        session.pop(_USER_PASSKEY_LOGIN_SESSION_KEY, None)
        return jsonify({"error": message}), 400

    state = session.get(_USER_PASSKEY_LOGIN_SESSION_KEY) or {}
    if not state:
        return jsonify({"error": "Passkey挑战不存在或已过期,请重试"}), 400
    if not is_challenge_valid(state.get("created_at")):
        return _drop_state_and_fail("Passkey挑战已过期,请重试")

    mode = str(state.get("mode") or "named")
    if mode not in {"named", "discoverable"}:
        return _drop_state_and_fail("Passkey状态异常,请重试")

    expected_username = str(state.get("username") or "").strip()
    if mode == "named":
        if not expected_username:
            return _drop_state_and_fail("Passkey状态异常,请重试")
        if request_username and request_username != expected_username:
            return jsonify({"error": "用户名与挑战不匹配,请重试"}), 400
        username = expected_username
    else:
        username = request_username

    client_ip = get_rate_limit_ip()
    username_key = str(state.get("username_key") or "").strip()
    if not username_key:
        username_key = f"passkey:{expected_username}" if mode == "named" else "passkey:discoverable"

    locked, seconds_left = check_login_ip_user_locked(client_ip, username_key)
    if locked:
        if seconds_left >= 60:
            wait_hint = f"{seconds_left // 60 + 1}分钟"
        else:
            wait_hint = f"{seconds_left}秒"
        return jsonify({"error": f"账号短时锁定,请{wait_hint}后再试"}), 429

    credential_id = str(credential.get("id") or credential.get("rawId") or "").strip()
    if not credential_id:
        return jsonify({"error": "Passkey参数无效"}), 400

    def _reject(message):
        # Count the attempt as a login failure and answer with a 401.
        record_login_failure(client_ip, username_key)
        return jsonify({"error": message}), 401

    passkey = database.get_passkey_by_credential_id(credential_id)
    if not passkey:
        return _reject("Passkey不存在或已删除")
    if str(passkey.get("owner_type") or "") != "user":
        return _reject("Passkey不属于用户账号")
    if mode == "named" and int(passkey.get("owner_id") or 0) != int(state.get("user_id") or 0):
        return _reject("Passkey与账号不匹配")

    try:
        parsed_credential, verified = verify_authentication(
            credential=credential,
            expected_challenge=str(state.get("challenge") or ""),
            expected_rp_id=str(state.get("rp_id") or ""),
            expected_origins=list(state.get("expected_origins") or []),
            credential_public_key=str(passkey.get("public_key") or ""),
            credential_current_sign_count=int(passkey.get("sign_count") or 0),
        )
        # The verified credential must be the stored one that was looked up.
        if encode_credential_id(verified.credential_id) != str(passkey.get("credential_id") or ""):
            raise ValueError("credential_id mismatch")
    except Exception as exc:
        logger.warning(f"[passkey] 用户登录验签失败(mode={mode}, username={expected_username or request_username or '-'}) : {exc}")
        return _reject("Passkey验证失败")

    user_id = int(passkey.get("owner_id") or 0)
    user = database.get_user_by_id(user_id)
    if not user or user.get("status") != "approved":
        return jsonify({"error": "账号不可用"}), 401

    database.update_passkey_usage(int(passkey["id"]), int(verified.new_sign_count))
    clear_login_failures(client_ip, username_key)
    # Also clear failures recorded under the account's own key when the
    # challenge used a different key (for example discoverable mode).
    user_login_key = f"passkey:{str(user.get('username') or '').strip()}"
    if user_login_key and user_login_key != username_key:
        clear_login_failures(client_ip, user_login_key)
    session.pop(_USER_PASSKEY_LOGIN_SESSION_KEY, None)

    login_user(User(user_id))
    load_user_accounts(user_id)

    resolved_username = str(user.get("username") or "").strip() or username or f"user-{user_id}"
    _send_login_security_alert_if_needed(user=user, username=resolved_username, client_ip=client_ip)
    return jsonify({"success": True, "credential_id": parsed_credential.id, "username": resolved_username})
|
||
|
||
|
||
@api_auth_bp.route("/api/login", methods=["POST"])
@require_ip_not_locked
def login():
    """Log a user in with username and password.

    Flow: record the username attempt, reject locked (IP, username) pairs,
    apply the per-IP and per-login rate limits, verify the captcha when one
    is required, then check the credentials. A failed credential check is
    recorded and may be delayed by the configured failure delay.

    Returns:
        ``{"success": True}`` on success, or a JSON error with status 400
        (captcha), 401 (bad credentials or unapproved account) or 429 (lock
        or rate limit). Error responses carry a ``need_captcha`` hint.
    """
    data = _get_json_payload()
    # Coerce possibly non-string JSON values (null, numbers, lists) before
    # calling str methods, so malformed input fails authentication instead of
    # raising AttributeError (HTTP 500). This mirrors the passkey handlers in
    # this module.
    username = str(data.get("username") or "").strip()
    password = str(data.get("password") or "").strip()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()
    need_captcha = data.get("need_captcha", False)

    client_ip = get_rate_limit_ip()
    username_key = username

    scan_locked = record_login_username_attempt(client_ip, username_key)
    is_locked, remaining = check_login_ip_user_locked(client_ip, username_key)
    if is_locked:
        wait_hint = f"{remaining // 60 + 1}分钟" if remaining >= 60 else f"{remaining}秒"
        return jsonify({"error": f"账号短时锁定,请{wait_hint}后再试", "need_captcha": True}), 429

    allowed, error_msg = check_ip_request_rate(client_ip, "login")
    if not allowed:
        return jsonify({"error": error_msg, "need_captcha": True}), 429

    allowed, error_msg = check_login_rate_limits(client_ip, username_key)
    if not allowed:
        return jsonify({"error": error_msg, "need_captcha": True}), 429

    # A captcha is required when the failure tracking demands one, when the
    # username attempt was flagged, or when the client itself asks for one.
    captcha_required = check_login_captcha_required(client_ip, username_key) or scan_locked or bool(need_captcha)
    captcha_ok, captcha_error_response = _verify_login_captcha_if_needed(
        captcha_required=captcha_required,
        captcha_session=captcha_session,
        captcha_code=captcha_code,
        client_ip=client_ip,
        username_key=username_key,
    )
    if not captcha_ok:
        return captcha_error_response

    user = database.verify_user(username, password)
    if not user:
        record_login_failure(client_ip, username_key)
        delay = get_login_failure_delay_seconds(client_ip, username_key)
        if delay > 0:
            # Deliberate slowdown after repeated failures.
            time.sleep(delay)
        return jsonify({"error": "用户名或密码错误", "need_captcha": check_login_captcha_required(client_ip, username_key)}), 401

    if user["status"] != "approved":
        return jsonify({"error": "账号未审核,请等待管理员审核", "need_captcha": False}), 401

    clear_login_failures(client_ip, username_key)
    user_obj = User(user["id"])
    login_user(user_obj)
    load_user_accounts(user["id"])

    _send_login_security_alert_if_needed(user=user, username=username, client_ip=client_ip)
    return jsonify({"success": True})
|
||
|
||
|
||
@api_auth_bp.route("/api/logout", methods=["POST"])
@login_required
def logout():
    """Log the current user out and drop the admin entries from the session."""
    logout_user()
    for admin_key in ("admin_id", "admin_username", "admin_reauth_until"):
        session.pop(admin_key, None)
    return jsonify({"success": True})
|