Files
zsglpt/routes/api_accounts.py

405 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import threading
import database
import db_pool
from app_logger import get_logger
from crypto_utils import encrypt_password as encrypt_account_password
from flask import Blueprint, jsonify, request
from flask_login import current_user, login_required
from services.accounts_service import load_user_accounts
from services.browse_types import BROWSE_TYPE_SHOULD_READ, normalize_browse_type, validate_browse_type
from services.client_log import log_to_client
from services.models import Account
from services.runtime import get_socketio
from services.screenshots import take_screenshot_for_account
from services.state import (
safe_get_account,
safe_get_user_accounts_snapshot,
safe_remove_account,
safe_remove_task,
safe_remove_task_status,
safe_set_account,
safe_set_user_accounts,
)
from services.tasks import get_task_scheduler, submit_account_task
logger = get_logger("app")
api_accounts_bp = Blueprint("api_accounts", __name__)
def _emit(event: str, data: object, *, room: str | None = None) -> None:
try:
socketio = get_socketio()
socketio.emit(event, data, room=room)
except Exception:
pass
@api_accounts_bp.route("/api/accounts", methods=["GET"])
@login_required
def get_accounts():
"""获取当前用户的所有账号"""
user_id = current_user.id
refresh = request.args.get("refresh", "false").lower() == "true"
accounts = safe_get_user_accounts_snapshot(user_id)
if refresh or not accounts:
load_user_accounts(user_id)
accounts = safe_get_user_accounts_snapshot(user_id)
return jsonify([acc.to_dict() for acc in accounts.values()])
@api_accounts_bp.route("/api/accounts", methods=["POST"])
@login_required
def add_account():
"""添加账号"""
user_id = current_user.id
current_count = len(database.get_user_accounts(user_id))
is_vip = database.is_user_vip(user_id)
if not is_vip and current_count >= 3:
return jsonify({"error": "普通用户最多添加3个账号升级VIP可无限添加"}), 403
data = request.json
username = data.get("username", "").strip()
password = data.get("password", "").strip()
remark = data.get("remark", "").strip()[:200]
if not username or not password:
return jsonify({"error": "用户名和密码不能为空"}), 400
accounts = safe_get_user_accounts_snapshot(user_id)
if not accounts:
load_user_accounts(user_id)
accounts = safe_get_user_accounts_snapshot(user_id)
for acc in accounts.values():
if acc.username == username:
return jsonify({"error": f"账号 '{username}' 已存在"}), 400
import uuid
account_id = str(uuid.uuid4())[:8]
remember = data.get("remember", True)
database.create_account(user_id, account_id, username, password, remember, remark)
account = Account(account_id, user_id, username, password, remember, remark)
safe_set_account(user_id, account_id, account)
log_to_client(f"添加账号: {username}", user_id)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
return jsonify(account.to_dict())
@api_accounts_bp.route("/api/accounts/<account_id>", methods=["PUT"])
@login_required
def update_account(account_id):
"""更新账号信息(密码等)"""
user_id = current_user.id
account = safe_get_account(user_id, account_id)
if not account:
return jsonify({"error": "账号不存在"}), 404
if account.is_running:
return jsonify({"error": "账号正在运行中,请先停止"}), 400
data = request.json
new_password = data.get("password", "").strip()
new_remember = data.get("remember", account.remember)
if not new_password:
return jsonify({"error": "密码不能为空"}), 400
encrypted_password = encrypt_account_password(new_password)
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE accounts
SET password = ?, remember = ?
WHERE id = ?
""",
(encrypted_password, new_remember, account_id),
)
conn.commit()
database.reset_account_login_status(account_id)
logger.info(f"[账号更新] 用户 {user_id} 修改了账号 {account.username} 的密码,已重置登录状态")
account._password = new_password
account.remember = new_remember
log_to_client(f"账号 {account.username} 信息已更新,登录状态已重置", user_id)
return jsonify({"message": "账号更新成功", "account": account.to_dict()})
@api_accounts_bp.route("/api/accounts/<account_id>", methods=["DELETE"])
@login_required
def delete_account(account_id):
"""删除账号"""
user_id = current_user.id
account = safe_get_account(user_id, account_id)
if not account:
return jsonify({"error": "账号不存在"}), 404
if account.is_running:
account.should_stop = True
if account.automation:
account.automation.close()
username = account.username
database.delete_account(account_id)
safe_remove_account(user_id, account_id)
log_to_client(f"删除账号: {username}", user_id)
return jsonify({"success": True})
@api_accounts_bp.route("/api/accounts/clear", methods=["POST"])
@login_required
def clear_accounts():
"""清空当前用户的所有账号"""
user_id = current_user.id
accounts = safe_get_user_accounts_snapshot(user_id)
if any(acc.is_running for acc in accounts.values()):
return jsonify({"error": "有任务正在运行,请先停止后再清空"}), 400
account_ids = list(accounts.keys())
deleted = database.delete_user_accounts(user_id)
safe_set_user_accounts(user_id, {})
for account_id in account_ids:
safe_remove_task_status(account_id)
safe_remove_task(account_id)
log_to_client(f"清空账号: {deleted}", user_id)
return jsonify({"success": True, "deleted": deleted})
@api_accounts_bp.route("/api/accounts/<account_id>/remark", methods=["PUT"])
@login_required
def update_remark(account_id):
"""更新备注"""
user_id = current_user.id
account = safe_get_account(user_id, account_id)
if not account:
return jsonify({"error": "账号不存在"}), 404
data = request.json
remark = data.get("remark", "").strip()[:200]
database.update_account_remark(account_id, remark)
account.remark = remark
log_to_client(f"更新备注: {account.username} -> {remark}", user_id)
return jsonify({"success": True})
@api_accounts_bp.route("/api/accounts/<account_id>/start", methods=["POST"])
@login_required
def start_account(account_id):
"""启动账号任务"""
user_id = current_user.id
account = safe_get_account(user_id, account_id)
if not account:
return jsonify({"error": "账号不存在"}), 404
if account.is_running:
return jsonify({"error": "任务已在运行中"}), 400
data = request.json or {}
browse_type = validate_browse_type(data.get("browse_type"), default=BROWSE_TYPE_SHOULD_READ)
if not browse_type:
return jsonify({"error": "浏览类型无效"}), 400
enable_screenshot = data.get("enable_screenshot", True)
ok, message = submit_account_task(
user_id=user_id,
account_id=account_id,
browse_type=browse_type,
enable_screenshot=enable_screenshot,
source="manual",
)
if not ok:
return jsonify({"error": message}), 400
log_to_client(f"启动任务: {account.username} - {browse_type}", user_id)
return jsonify({"success": True})
@api_accounts_bp.route("/api/accounts/<account_id>/stop", methods=["POST"])
@login_required
def stop_account(account_id):
"""停止账号任务"""
user_id = current_user.id
account = safe_get_account(user_id, account_id)
if not account:
return jsonify({"error": "账号不存在"}), 404
if not account.is_running:
return jsonify({"error": "任务未在运行"}), 400
account.should_stop = True
account.status = "正在停止"
try:
scheduler = get_task_scheduler()
if scheduler.cancel_pending_task(user_id=user_id, account_id=account_id):
account.status = "已停止"
account.is_running = False
safe_remove_task_status(account_id)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
log_to_client(f"任务已取消: {account.username}", user_id)
return jsonify({"success": True, "canceled": True})
except Exception:
pass
log_to_client(f"停止任务: {account.username}", user_id)
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
return jsonify({"success": True})
@api_accounts_bp.route("/api/accounts/<account_id>/screenshot", methods=["POST"])
@login_required
def manual_screenshot(account_id):
"""手动为指定账号截图"""
user_id = current_user.id
account = safe_get_account(user_id, account_id)
if not account:
load_user_accounts(user_id)
account = safe_get_account(user_id, account_id)
if not account:
return jsonify({"error": "账号不存在"}), 404
if account.is_running:
return jsonify({"error": "任务运行中,无法截图"}), 400
data = request.json or {}
requested_browse_type = data.get("browse_type", None)
if requested_browse_type is None:
browse_type = normalize_browse_type(account.last_browse_type)
else:
browse_type = validate_browse_type(requested_browse_type, default=BROWSE_TYPE_SHOULD_READ)
if not browse_type:
return jsonify({"error": "浏览类型无效"}), 400
account.last_browse_type = browse_type
threading.Thread(
target=take_screenshot_for_account,
args=(user_id, account_id, browse_type, "manual_screenshot"),
daemon=True,
).start()
log_to_client(f"手动截图: {account.username} - {browse_type}", user_id)
return jsonify({"success": True})
@api_accounts_bp.route("/api/accounts/batch/start", methods=["POST"])
@login_required
def batch_start_accounts():
"""批量启动账号"""
user_id = current_user.id
data = request.json or {}
account_ids = data.get("account_ids", [])
browse_type = validate_browse_type(data.get("browse_type", BROWSE_TYPE_SHOULD_READ), default=BROWSE_TYPE_SHOULD_READ)
if not browse_type:
return jsonify({"error": "浏览类型无效"}), 400
enable_screenshot = data.get("enable_screenshot", True)
if not account_ids:
return jsonify({"error": "请选择要启动的账号"}), 400
started = []
failed = []
if not safe_get_user_accounts_snapshot(user_id):
load_user_accounts(user_id)
for account_id in account_ids:
account = safe_get_account(user_id, account_id)
if not account:
failed.append({"id": account_id, "reason": "账号不存在"})
continue
if account.is_running:
failed.append({"id": account_id, "reason": "已在运行中"})
continue
ok, msg = submit_account_task(
user_id=user_id,
account_id=account_id,
browse_type=browse_type,
enable_screenshot=enable_screenshot,
source="batch",
)
if ok:
started.append(account_id)
else:
failed.append({"id": account_id, "reason": msg})
return jsonify(
{"success": True, "started_count": len(started), "failed_count": len(failed), "started": started, "failed": failed}
)
@api_accounts_bp.route("/api/accounts/batch/stop", methods=["POST"])
@login_required
def batch_stop_accounts():
"""批量停止账号"""
user_id = current_user.id
data = request.json
account_ids = data.get("account_ids", [])
if not account_ids:
return jsonify({"error": "请选择要停止的账号"}), 400
stopped = []
if not safe_get_user_accounts_snapshot(user_id):
load_user_accounts(user_id)
for account_id in account_ids:
account = safe_get_account(user_id, account_id)
if not account:
continue
if not account.is_running:
continue
account.should_stop = True
account.status = "正在停止"
stopped.append(account_id)
try:
scheduler = get_task_scheduler()
if scheduler.cancel_pending_task(user_id=user_id, account_id=account_id):
account.status = "已停止"
account.is_running = False
safe_remove_task_status(account_id)
except Exception:
pass
_emit("account_update", account.to_dict(), room=f"user_{user_id}")
return jsonify({"success": True, "stopped_count": len(stopped), "stopped": stopped})