#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import db_pool from crypto_utils import decrypt_password, encrypt_password from db.utils import get_cst_now_str def create_account(user_id, account_id, username, password, remember=True, remark=""): """创建账号(密码加密存储)""" with db_pool.get_db() as conn: cursor = conn.cursor() cst_time = get_cst_now_str() encrypted_password = encrypt_password(password) cursor.execute( """ INSERT INTO accounts (id, user_id, username, password, remember, remark, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (account_id, user_id, username, encrypted_password, 1 if remember else 0, remark, cst_time), ) conn.commit() return cursor.lastrowid def get_user_accounts(user_id): """获取用户的所有账号(自动解密密码)""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM accounts WHERE user_id = ? ORDER BY created_at DESC", (user_id,)) accounts = [] for row in cursor.fetchall(): account = dict(row) account["password"] = decrypt_password(account.get("password", "")) accounts.append(account) return accounts def get_account(account_id): """获取单个账号(自动解密密码)""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM accounts WHERE id = ?", (account_id,)) row = cursor.fetchone() if row: account = dict(row) account["password"] = decrypt_password(account.get("password", "")) return account return None def update_account_remark(account_id, remark): """更新账号备注""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("UPDATE accounts SET remark = ? WHERE id = ?", (remark, account_id)) conn.commit() return cursor.rowcount > 0 def delete_account(account_id): """删除账号""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM accounts WHERE id = ?", (account_id,)) conn.commit() return cursor.rowcount > 0 def increment_account_login_fail(account_id, error_message): """增加账号登录失败次数,如果达到3次则暂停账号""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT login_fail_count FROM accounts WHERE id = ?", (account_id,)) row = cursor.fetchone() if not row: return False fail_count = (row["login_fail_count"] or 0) + 1 if fail_count >= 3: cursor.execute( """ UPDATE accounts SET login_fail_count = ?, last_login_error = ?, status = 'suspended' WHERE id = ? """, (fail_count, error_message, account_id), ) conn.commit() return True cursor.execute( """ UPDATE accounts SET login_fail_count = ?, last_login_error = ? WHERE id = ? """, (fail_count, error_message, account_id), ) conn.commit() return False def reset_account_login_status(account_id): """重置账号登录状态(修改密码后调用)""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ UPDATE accounts SET login_fail_count = 0, last_login_error = NULL, status = 'active' WHERE id = ? """, (account_id,), ) conn.commit() return cursor.rowcount > 0 def get_account_status(account_id): """获取账号状态信息""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT status, login_fail_count, last_login_error FROM accounts WHERE id = ? """, (account_id,), ) return cursor.fetchone() def delete_user_accounts(user_id): """删除用户的所有账号""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM accounts WHERE user_id = ?", (user_id,)) conn.commit() return cursor.rowcount