#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations from typing import Dict import db_pool from db.utils import get_cst_now_str def record_login_context(user_id: int, ip_address: str, user_agent: str) -> Dict[str, bool]: """记录登录环境信息,返回是否新设备/新IP。""" user_id = int(user_id) ip_text = str(ip_address or "").strip()[:64] ua_text = str(user_agent or "").strip()[:512] now_str = get_cst_now_str() new_device = False new_ip = False with db_pool.get_db() as conn: cursor = conn.cursor() if ua_text: cursor.execute( "SELECT id FROM login_fingerprints WHERE user_id = ? AND user_agent = ?", (user_id, ua_text), ) row = cursor.fetchone() if row: cursor.execute( """ UPDATE login_fingerprints SET last_seen = ?, last_ip = ? WHERE id = ? """, (now_str, ip_text, row["id"] if isinstance(row, dict) else row[0]), ) else: cursor.execute( """ INSERT INTO login_fingerprints (user_id, user_agent, first_seen, last_seen, last_ip) VALUES (?, ?, ?, ?, ?) """, (user_id, ua_text, now_str, now_str, ip_text), ) new_device = True if ip_text: cursor.execute( "SELECT id FROM login_ips WHERE user_id = ? AND ip = ?", (user_id, ip_text), ) row = cursor.fetchone() if row: cursor.execute( """ UPDATE login_ips SET last_seen = ? WHERE id = ? """, (now_str, row["id"] if isinstance(row, dict) else row[0]), ) else: cursor.execute( """ INSERT INTO login_ips (user_id, ip, first_seen, last_seen) VALUES (?, ?, ?, ?) """, (user_id, ip_text, now_str, now_str), ) new_ip = True conn.commit() return {"new_device": new_device, "new_ip": new_ip}