更新说明:\n1. 新增用户端与管理员端 Passkey 登录/注册/设备管理(最多3台,支持设备备注、删除设备)。\n2. 修复 Passkey 注册与登录流程中的浏览器/证书/CSRF相关问题,增强错误提示。\n3. 前台登录页改为独立入口,首屏仅加载必要资源,其他页面按需加载。\n4. 系统配置页改为静默获取金山文档状态,避免首屏阻塞,并优化状态展示为“检测中/已登录/未登录/异常”。\n5. 补充后端接口与页面渲染适配,修复多入口下样式依赖注入问题。\n6. 同步更新前后台构建产物与相关静态资源。
174 lines
5.1 KiB
Python
174 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
|
|
import db_pool
|
|
from db.utils import get_cst_now_str
|
|
|
|
_OWNER_TYPES = {"user", "admin"}
|
|
|
|
|
|
def _normalize_owner_type(owner_type: str) -> str:
|
|
normalized = str(owner_type or "").strip().lower()
|
|
if normalized not in _OWNER_TYPES:
|
|
raise ValueError(f"invalid owner_type: {owner_type}")
|
|
return normalized
|
|
|
|
|
|
def list_passkeys(owner_type: str, owner_id: int) -> list[dict]:
|
|
owner = _normalize_owner_type(owner_type)
|
|
with db_pool.get_db() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT id, owner_type, owner_id, device_name, credential_id, transports,
|
|
sign_count, aaguid, created_at, last_used_at
|
|
FROM passkeys
|
|
WHERE owner_type = ? AND owner_id = ?
|
|
ORDER BY datetime(created_at) DESC, id DESC
|
|
""",
|
|
(owner, int(owner_id)),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
|
|
|
|
def count_passkeys(owner_type: str, owner_id: int) -> int:
|
|
owner = _normalize_owner_type(owner_type)
|
|
with db_pool.get_db() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"SELECT COUNT(*) AS count FROM passkeys WHERE owner_type = ? AND owner_id = ?",
|
|
(owner, int(owner_id)),
|
|
)
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return 0
|
|
try:
|
|
return int(row["count"] or 0)
|
|
except Exception:
|
|
try:
|
|
return int(row[0] or 0)
|
|
except Exception:
|
|
return 0
|
|
|
|
|
|
def get_passkey_by_credential_id(credential_id: str) -> dict | None:
|
|
credential = str(credential_id or "").strip()
|
|
if not credential:
|
|
return None
|
|
|
|
with db_pool.get_db() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT id, owner_type, owner_id, device_name, credential_id, public_key,
|
|
sign_count, transports, aaguid, created_at, last_used_at
|
|
FROM passkeys
|
|
WHERE credential_id = ?
|
|
""",
|
|
(credential,),
|
|
)
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def get_passkey_by_id(owner_type: str, owner_id: int, passkey_id: int) -> dict | None:
|
|
owner = _normalize_owner_type(owner_type)
|
|
|
|
with db_pool.get_db() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT id, owner_type, owner_id, device_name, credential_id, public_key,
|
|
sign_count, transports, aaguid, created_at, last_used_at
|
|
FROM passkeys
|
|
WHERE id = ? AND owner_type = ? AND owner_id = ?
|
|
""",
|
|
(int(passkey_id), owner, int(owner_id)),
|
|
)
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def create_passkey(
|
|
owner_type: str,
|
|
owner_id: int,
|
|
*,
|
|
credential_id: str,
|
|
public_key: str,
|
|
sign_count: int,
|
|
device_name: str,
|
|
transports: str = "",
|
|
aaguid: str = "",
|
|
) -> int | None:
|
|
owner = _normalize_owner_type(owner_type)
|
|
now = get_cst_now_str()
|
|
|
|
with db_pool.get_db() as conn:
|
|
cursor = conn.cursor()
|
|
try:
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO passkeys (
|
|
owner_type,
|
|
owner_id,
|
|
device_name,
|
|
credential_id,
|
|
public_key,
|
|
sign_count,
|
|
transports,
|
|
aaguid,
|
|
created_at,
|
|
last_used_at
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
owner,
|
|
int(owner_id),
|
|
str(device_name or "").strip(),
|
|
str(credential_id or "").strip(),
|
|
str(public_key or "").strip(),
|
|
int(sign_count or 0),
|
|
str(transports or "").strip(),
|
|
str(aaguid or "").strip(),
|
|
now,
|
|
now,
|
|
),
|
|
)
|
|
conn.commit()
|
|
return int(cursor.lastrowid)
|
|
except sqlite3.IntegrityError:
|
|
return None
|
|
|
|
|
|
def update_passkey_usage(passkey_id: int, new_sign_count: int) -> bool:
|
|
now = get_cst_now_str()
|
|
with db_pool.get_db() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
UPDATE passkeys
|
|
SET sign_count = ?,
|
|
last_used_at = ?
|
|
WHERE id = ?
|
|
""",
|
|
(int(new_sign_count or 0), now, int(passkey_id)),
|
|
)
|
|
conn.commit()
|
|
return cursor.rowcount > 0
|
|
|
|
|
|
def delete_passkey(owner_type: str, owner_id: int, passkey_id: int) -> bool:
|
|
owner = _normalize_owner_type(owner_type)
|
|
with db_pool.get_db() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"DELETE FROM passkeys WHERE id = ? AND owner_type = ? AND owner_id = ?",
|
|
(int(passkey_id), owner, int(owner_id)),
|
|
)
|
|
conn.commit()
|
|
return cursor.rowcount > 0
|