refactor: remove passkey login

This commit is contained in:
237899745
2026-05-27 22:32:42 +08:00
parent 89cb98233f
commit 0443c976fc
105 changed files with 410 additions and 2505 deletions

View File

@@ -2,9 +2,6 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import time
import database
import email_service
from app_logger import get_logger
@@ -12,24 +9,12 @@ from app_security import get_rate_limit_ip, require_ip_not_locked, validate_emai
from flask import Blueprint, jsonify, request, session
from flask_login import current_user, login_required
from routes.pages import render_app_spa_or_legacy
from services.passkeys import (
MAX_PASSKEYS_PER_OWNER,
encode_credential_id,
get_credential_transports,
get_expected_origins,
get_rp_id,
is_challenge_valid,
make_registration_options,
normalize_device_name,
verify_registration,
)
from services.state import check_email_rate_limit, check_ip_request_rate, safe_iter_task_status_items
from services.tasks import get_task_scheduler
logger = get_logger("app")
api_user_bp = Blueprint("api_user", __name__)
_USER_PASSKEY_REGISTER_SESSION_KEY = "user_passkey_register_state"
def _get_current_user_record():
@@ -61,26 +46,6 @@ def _coerce_binary_flag(value, *, field_label: str):
return value, None
def _parse_credential_payload(data: dict) -> dict | None:
credential = data.get("credential")
if isinstance(credential, dict):
return credential
if isinstance(credential, str):
try:
parsed = json.loads(credential)
return parsed if isinstance(parsed, dict) else None
except Exception:
return None
return None
def _truncate_text(value, max_len: int = 300) -> str:
text = str(value or "").strip()
if len(text) > max_len:
return f"{text[:max_len]}..."
return text
def _check_bind_email_rate_limits(email: str):
client_ip = get_rate_limit_ip()
allowed, error_msg = check_ip_request_rate(client_ip, "email")
@@ -409,176 +374,6 @@ def update_user_email_notify():
return jsonify({"error": "更新失败"}), 500
@api_user_bp.route("/api/user/passkeys", methods=["GET"])
@login_required
def list_user_passkeys():
"""获取当前用户绑定的 Passkey 设备列表。"""
rows = database.list_passkeys("user", int(current_user.id))
items = []
for row in rows:
credential_id = str(row.get("credential_id") or "")
preview = ""
if credential_id:
preview = f"{credential_id[:8]}...{credential_id[-6:]}" if len(credential_id) > 16 else credential_id
items.append(
{
"id": int(row.get("id")),
"device_name": str(row.get("device_name") or ""),
"credential_id_preview": preview,
"created_at": row.get("created_at"),
"last_used_at": row.get("last_used_at"),
"transports": str(row.get("transports") or ""),
}
)
return jsonify({"items": items, "limit": MAX_PASSKEYS_PER_OWNER})
@api_user_bp.route("/api/user/passkeys/register/options", methods=["POST"])
@login_required
def user_passkey_register_options():
"""当前登录用户创建 Passkey下发 registration challenge。"""
user, error_response = _get_current_user_or_404()
if error_response:
return error_response
count = database.count_passkeys("user", int(current_user.id))
if count >= MAX_PASSKEYS_PER_OWNER:
return jsonify({"error": f"最多可绑定{MAX_PASSKEYS_PER_OWNER}台设备"}), 400
data = request.get_json(silent=True) or {}
device_name = normalize_device_name(data.get("device_name"))
existing = database.list_passkeys("user", int(current_user.id))
exclude_credential_ids = [str(item.get("credential_id") or "").strip() for item in existing if item.get("credential_id")]
try:
rp_id = get_rp_id(request)
expected_origins = get_expected_origins(request)
except Exception as e:
logger.warning(f"[passkey] 用户注册 options 失败(user_id={current_user.id}): {e}")
return jsonify({"error": "Passkey配置异常请联系管理员"}), 500
try:
options = make_registration_options(
rp_id=rp_id,
rp_name="知识管理平台",
user_name=str(user.get("username") or f"user-{current_user.id}"),
user_display_name=str(user.get("username") or f"user-{current_user.id}"),
user_id_bytes=f"user:{int(current_user.id)}".encode("utf-8"),
exclude_credential_ids=exclude_credential_ids,
)
except Exception as e:
logger.warning(f"[passkey] 用户注册 options 构建失败(user_id={current_user.id}): {e}")
return jsonify({"error": "生成Passkey挑战失败"}), 500
challenge = str(options.get("challenge") or "").strip()
if not challenge:
return jsonify({"error": "生成Passkey挑战失败"}), 500
session[_USER_PASSKEY_REGISTER_SESSION_KEY] = {
"user_id": int(current_user.id),
"challenge": challenge,
"rp_id": rp_id,
"expected_origins": expected_origins,
"device_name": device_name,
"created_at": time.time(),
}
session.modified = True
return jsonify({"publicKey": options, "limit": MAX_PASSKEYS_PER_OWNER})
@api_user_bp.route("/api/user/passkeys/register/verify", methods=["POST"])
@login_required
def user_passkey_register_verify():
"""当前登录用户创建 Passkey校验 attestation 并落库。"""
state = session.get(_USER_PASSKEY_REGISTER_SESSION_KEY) or {}
if not state:
return jsonify({"error": "Passkey挑战不存在或已过期请重试"}), 400
if int(state.get("user_id") or 0) != int(current_user.id):
return jsonify({"error": "Passkey挑战与当前用户不匹配"}), 400
if not is_challenge_valid(state.get("created_at")):
session.pop(_USER_PASSKEY_REGISTER_SESSION_KEY, None)
return jsonify({"error": "Passkey挑战已过期请重试"}), 400
data = request.get_json(silent=True) or {}
credential = _parse_credential_payload(data)
if not credential:
return jsonify({"error": "Passkey参数缺失"}), 400
count = database.count_passkeys("user", int(current_user.id))
if count >= MAX_PASSKEYS_PER_OWNER:
session.pop(_USER_PASSKEY_REGISTER_SESSION_KEY, None)
return jsonify({"error": f"最多可绑定{MAX_PASSKEYS_PER_OWNER}台设备"}), 400
try:
verified = verify_registration(
credential=credential,
expected_challenge=str(state.get("challenge") or ""),
expected_rp_id=str(state.get("rp_id") or ""),
expected_origins=list(state.get("expected_origins") or []),
)
except Exception as e:
logger.warning(f"[passkey] 用户注册验签失败(user_id={current_user.id}): {e}")
return jsonify({"error": "Passkey验证失败请重试"}), 400
credential_id = encode_credential_id(verified.credential_id)
public_key = encode_credential_id(verified.credential_public_key)
transports = get_credential_transports(credential)
device_name = normalize_device_name(data.get("device_name") if "device_name" in data else state.get("device_name"))
aaguid = str(verified.aaguid or "")
created_id = database.create_passkey(
"user",
int(current_user.id),
credential_id=credential_id,
public_key=public_key,
sign_count=int(verified.sign_count or 0),
device_name=device_name,
transports=transports,
aaguid=aaguid,
)
if not created_id:
return jsonify({"error": "该Passkey已绑定或保存失败"}), 400
session.pop(_USER_PASSKEY_REGISTER_SESSION_KEY, None)
return jsonify({"success": True, "id": int(created_id), "device_name": device_name})
@api_user_bp.route("/api/user/passkeys/<int:passkey_id>", methods=["DELETE"])
@login_required
def delete_user_passkey(passkey_id):
"""删除当前用户绑定的 Passkey 设备。"""
ok = database.delete_passkey("user", int(current_user.id), int(passkey_id))
if ok:
return jsonify({"success": True})
return jsonify({"error": "设备不存在或已删除"}), 404
@api_user_bp.route("/api/user/passkeys/client-error", methods=["POST"])
@login_required
def report_user_passkey_client_error():
"""上报浏览器端 Passkey 失败详情,便于排查兼容性问题。"""
data = request.get_json(silent=True) or {}
error_name = _truncate_text(data.get("name"), 120)
error_message = _truncate_text(data.get("message"), 400)
error_code = _truncate_text(data.get("code"), 120)
ua = _truncate_text(data.get("user_agent") or request.headers.get("User-Agent", ""), 300)
stage = _truncate_text(data.get("stage"), 80)
source = _truncate_text(data.get("source"), 80)
logger.warning(
"[passkey][client-error][user] user_id=%s stage=%s source=%s name=%s code=%s message=%s ua=%s",
current_user.id,
stage or "-",
source or "-",
error_name or "-",
error_code or "-",
error_message or "-",
ua or "-",
)
return jsonify({"success": True})
@api_user_bp.route("/api/run_stats", methods=["GET"])
@login_required
def get_run_stats():