Files
zsglpt/routes/admin_api/tasks_api.py

139 lines
4.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import database
from app_logger import get_logger
from flask import jsonify, request
from routes.admin_api import admin_api_bp
from routes.decorators import admin_required
from services.state import safe_iter_task_status_items
from services.tasks import get_task_scheduler
logger = get_logger("app")
def _parse_page_int(name: str, default: int, *, minimum: int, maximum: int) -> int:
try:
value = int(request.args.get(name, default))
return max(minimum, min(value, maximum))
except (ValueError, TypeError):
return default
@admin_api_bp.route("/task/stats", methods=["GET"])
@admin_required
def get_task_stats_api():
"""获取任务统计数据"""
date_filter = request.args.get("date")
stats = database.get_task_stats(date_filter)
return jsonify(stats)
@admin_api_bp.route("/task/running", methods=["GET"])
@admin_required
def get_running_tasks_api():
"""获取当前运行中和排队中的任务"""
import time as time_mod
current_time = time_mod.time()
running = []
queuing = []
user_cache = {}
for account_id, info in safe_iter_task_status_items():
elapsed = int(current_time - info.get("start_time", current_time))
info_user_id = info.get("user_id")
if info_user_id not in user_cache:
user_cache[info_user_id] = database.get_user_by_id(info_user_id)
user = user_cache.get(info_user_id)
user_username = user["username"] if user else "N/A"
progress = info.get("progress", {"items": 0, "attachments": 0})
task_info = {
"account_id": account_id,
"user_id": info.get("user_id"),
"user_username": user_username,
"username": info.get("username"),
"browse_type": info.get("browse_type"),
"source": info.get("source", "manual"),
"detail_status": info.get("detail_status", "未知"),
"progress_items": progress.get("items", 0),
"progress_attachments": progress.get("attachments", 0),
"elapsed_seconds": elapsed,
"elapsed_display": f"{elapsed // 60}{elapsed % 60}" if elapsed >= 60 else f"{elapsed}",
}
if info.get("status") == "运行中":
running.append(task_info)
else:
queuing.append(task_info)
running.sort(key=lambda x: x["elapsed_seconds"], reverse=True)
queuing.sort(key=lambda x: x["elapsed_seconds"], reverse=True)
try:
max_concurrent = int(get_task_scheduler().max_global)
except Exception:
max_concurrent = int((database.get_system_config() or {}).get("max_concurrent_global", 2))
return jsonify(
{
"running": running,
"queuing": queuing,
"running_count": len(running),
"queuing_count": len(queuing),
"max_concurrent": max_concurrent,
}
)
@admin_api_bp.route("/task/logs", methods=["GET"])
@admin_required
def get_task_logs_api():
"""获取任务日志列表(支持分页和多种筛选)"""
limit = _parse_page_int("limit", 20, minimum=1, maximum=200)
offset = _parse_page_int("offset", 0, minimum=0, maximum=10**9)
date_filter = request.args.get("date")
status_filter = request.args.get("status")
source_filter = request.args.get("source")
user_id_filter = request.args.get("user_id")
account_filter = (request.args.get("account") or "").strip()
if user_id_filter:
try:
user_id_filter = int(user_id_filter)
except (ValueError, TypeError):
user_id_filter = None
try:
result = database.get_task_logs(
limit=limit,
offset=offset,
date_filter=date_filter,
status_filter=status_filter,
source_filter=source_filter,
user_id_filter=user_id_filter,
account_filter=account_filter if account_filter else None,
)
return jsonify(result)
except Exception as e:
logger.error(f"获取任务日志失败: {e}")
return jsonify({"logs": [], "total": 0, "error": "查询失败"})
@admin_api_bp.route("/task/logs/clear", methods=["POST"])
@admin_required
def clear_old_task_logs_api():
"""清理旧的任务日志"""
data = request.json or {}
days = data.get("days", 30)
if not isinstance(days, int) or days < 1:
return jsonify({"error": "天数必须是大于0的整数"}), 400
deleted_count = database.delete_old_task_logs(days)
return jsonify({"message": f"已删除{days}天前的{deleted_count}条日志"})