#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Data-access helpers for the ``passkeys`` table.

Every function borrows a connection from ``db_pool.get_db()`` and uses
parameterized SQL. Rows are returned as plain ``dict`` objects.
"""
from __future__ import annotations

import sqlite3

import db_pool
from db.utils import get_cst_now_str

# Closed set of owner kinds accepted in the ``owner_type`` column.
_OWNER_TYPES = {"user", "admin"}

# Errors that row lookup / integer conversion can legitimately raise when
# reading the COUNT(*) result (mapping rows, tuple rows, odd values).
_COUNT_READ_ERRORS = (KeyError, IndexError, TypeError, ValueError)


def _normalize_owner_type(owner_type: str) -> str:
    """Return *owner_type* stripped and lower-cased.

    Raises:
        ValueError: if the normalized value is not in ``_OWNER_TYPES``.
    """
    normalized = str(owner_type or "").strip().lower()
    if normalized not in _OWNER_TYPES:
        raise ValueError(f"invalid owner_type: {owner_type}")
    return normalized


def list_passkeys(owner_type: str, owner_id: int) -> list[dict]:
    """Return all passkeys of one owner, newest first.

    The ``public_key`` column is not selected here. Rows are ordered by
    ``created_at`` descending, with ``id`` descending as the tie-breaker.

    Raises:
        ValueError: if *owner_type* is not a known owner type.
    """
    owner = _normalize_owner_type(owner_type)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT id, owner_type, owner_id, device_name, credential_id,
                   transports, sign_count, aaguid, created_at, last_used_at
            FROM passkeys
            WHERE owner_type = ? AND owner_id = ?
            ORDER BY datetime(created_at) DESC, id DESC
            """,
            (owner, int(owner_id)),
        )
        # dict(row) assumes the pool's row factory yields mapping-convertible
        # rows (e.g. sqlite3.Row) -- TODO confirm against db_pool.
        return [dict(row) for row in cursor.fetchall()]


def count_passkeys(owner_type: str, owner_id: int) -> int:
    """Return how many passkeys the given owner has.

    Returns 0 when no row comes back or the count cannot be read as an
    integer.

    Raises:
        ValueError: if *owner_type* is not a known owner type.
    """
    owner = _normalize_owner_type(owner_type)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT COUNT(*) AS count FROM passkeys WHERE owner_type = ? AND owner_id = ?",
            (owner, int(owner_id)),
        )
        row = cursor.fetchone()
        if not row:
            return 0
        # Try the column alias first (mapping-style rows), then fall back to
        # positional access (tuple-style rows). Only lookup/conversion errors
        # are tolerated; anything else is a real bug and should propagate.
        for accessor in ("count", 0):
            try:
                return int(row[accessor] or 0)
            except _COUNT_READ_ERRORS:
                continue
        return 0


def get_passkey_by_credential_id(credential_id: str) -> dict | None:
    """Look up a passkey (including ``public_key``) by its credential id.

    The lookup is not scoped to an owner. A blank *credential_id* returns
    ``None`` without touching the database.
    """
    credential = str(credential_id or "").strip()
    if not credential:
        return None
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT id, owner_type, owner_id, device_name, credential_id,
                   public_key, sign_count, transports, aaguid, created_at,
                   last_used_at
            FROM passkeys
            WHERE credential_id = ?
            """,
            (credential,),
        )
        row = cursor.fetchone()
        return dict(row) if row else None


def get_passkey_by_id(owner_type: str, owner_id: int, passkey_id: int) -> dict | None:
    """Return one passkey (including ``public_key``) owned by the given owner.

    The row must match *passkey_id*, *owner_type* and *owner_id* together,
    so another owner's passkey id yields ``None``.

    Raises:
        ValueError: if *owner_type* is not a known owner type.
    """
    owner = _normalize_owner_type(owner_type)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT id, owner_type, owner_id, device_name, credential_id,
                   public_key, sign_count, transports, aaguid, created_at,
                   last_used_at
            FROM passkeys
            WHERE id = ? AND owner_type = ? AND owner_id = ?
            """,
            (int(passkey_id), owner, int(owner_id)),
        )
        row = cursor.fetchone()
        return dict(row) if row else None


def create_passkey(
    owner_type: str,
    owner_id: int,
    *,
    credential_id: str,
    public_key: str,
    sign_count: int,
    device_name: str,
    transports: str = "",
    aaguid: str = "",
) -> int | None:
    """Insert a new passkey row and return its id.

    Text fields are stripped before insertion; ``created_at`` and
    ``last_used_at`` are both set to the same current timestamp string.

    Returns:
        The new row id, or ``None`` when the insert violates a database
        integrity constraint.

    Raises:
        ValueError: if *owner_type* is not a known owner type.
    """
    owner = _normalize_owner_type(owner_type)
    now = get_cst_now_str()
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(
                """
                INSERT INTO passkeys (
                    owner_type, owner_id, device_name, credential_id,
                    public_key, sign_count, transports, aaguid,
                    created_at, last_used_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    owner,
                    int(owner_id),
                    str(device_name or "").strip(),
                    str(credential_id or "").strip(),
                    str(public_key or "").strip(),
                    int(sign_count or 0),
                    str(transports or "").strip(),
                    str(aaguid or "").strip(),
                    now,
                    now,
                ),
            )
            conn.commit()
            return int(cursor.lastrowid)
        except sqlite3.IntegrityError:
            # NOTE(review): no explicit rollback happens here; confirm that
            # db_pool.get_db() resets the connection before it is reused.
            return None


def update_passkey_usage(passkey_id: int, new_sign_count: int) -> bool:
    """Store a new sign counter and refresh ``last_used_at`` for a passkey.

    The update is keyed on *passkey_id* only (no owner check).

    Returns:
        ``True`` if a row was updated, ``False`` otherwise.
    """
    now = get_cst_now_str()
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE passkeys
            SET sign_count = ?, last_used_at = ?
            WHERE id = ?
            """,
            (int(new_sign_count or 0), now, int(passkey_id)),
        )
        conn.commit()
        return cursor.rowcount > 0


def delete_passkey(owner_type: str, owner_id: int, passkey_id: int) -> bool:
    """Delete one passkey belonging to the given owner.

    Returns:
        ``True`` if a row was deleted, ``False`` when no row matched the
        id/owner combination.

    Raises:
        ValueError: if *owner_type* is not a known owner type.
    """
    owner = _normalize_owner_type(owner_type)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "DELETE FROM passkeys WHERE id = ? AND owner_type = ? AND owner_id = ?",
            (int(passkey_id), owner, int(owner_id)),
        )
        conn.commit()
        return cursor.rowcount > 0