Files
zsglpt/db/tasks.py
yuyx 49897081b6 更新日志页面和统计页面
- 更新 LogsPage 和 StatsPage 组件
- 添加 taskSource 工具模块
- 更新 db/tasks.py
- 重新构建前端静态资源
2025-12-15 16:25:37 +08:00

246 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from datetime import datetime
import pytz
import db_pool
from db.utils import sanitize_sql_like_pattern
def create_task_log(
user_id,
account_id,
username,
browse_type,
status,
total_items=0,
total_attachments=0,
error_message="",
duration=None,
source="manual",
):
"""创建任务日志记录"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute(
"""
INSERT INTO task_logs (
user_id, account_id, username, browse_type, status,
total_items, total_attachments, error_message, duration, created_at, source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
user_id,
account_id,
username,
browse_type,
status,
total_items,
total_attachments,
error_message,
duration,
cst_time,
source,
),
)
conn.commit()
return cursor.lastrowid
def get_task_logs(
limit=100,
offset=0,
date_filter=None,
status_filter=None,
source_filter=None,
user_id_filter=None,
account_filter=None,
):
"""获取任务日志列表(支持分页和多种筛选)"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
where_clauses = ["1=1"]
params = []
if date_filter:
where_clauses.append("date(tl.created_at) = ?")
params.append(date_filter)
if status_filter:
where_clauses.append("tl.status = ?")
params.append(status_filter)
if source_filter:
source_filter = str(source_filter or "").strip()
# 兼容“虚拟来源”:用于筛选 user_scheduled:batch_xxx 这类动态值
if source_filter == "user_scheduled":
where_clauses.append("tl.source LIKE ? ESCAPE '\\\\'")
params.append("user_scheduled:%")
elif source_filter.endswith("*"):
prefix = source_filter[:-1]
safe_prefix = sanitize_sql_like_pattern(prefix)
where_clauses.append("tl.source LIKE ? ESCAPE '\\\\'")
params.append(f"{safe_prefix}%")
else:
where_clauses.append("tl.source = ?")
params.append(source_filter)
if user_id_filter:
where_clauses.append("tl.user_id = ?")
params.append(user_id_filter)
if account_filter:
safe_filter = sanitize_sql_like_pattern(account_filter)
where_clauses.append("tl.username LIKE ? ESCAPE '\\\\'")
params.append(f"%{safe_filter}%")
where_sql = " AND ".join(where_clauses)
count_sql = f"""
SELECT COUNT(*) as total
FROM task_logs tl
LEFT JOIN users u ON tl.user_id = u.id
WHERE {where_sql}
"""
cursor.execute(count_sql, params)
total = cursor.fetchone()["total"]
data_sql = f"""
SELECT
tl.*,
u.username as user_username
FROM task_logs tl
LEFT JOIN users u ON tl.user_id = u.id
WHERE {where_sql}
ORDER BY tl.created_at DESC
LIMIT ? OFFSET ?
"""
params.extend([limit, offset])
cursor.execute(data_sql, params)
logs = [dict(row) for row in cursor.fetchall()]
return {"logs": logs, "total": total}
def get_task_stats(date_filter=None):
"""获取任务统计信息"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
if date_filter is None:
date_filter = datetime.now(cst_tz).strftime("%Y-%m-%d")
cursor.execute(
"""
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
WHERE date(created_at) = ?
""",
(date_filter,),
)
today_stats = cursor.fetchone()
cursor.execute(
"""
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
"""
)
total_stats = cursor.fetchone()
return {
"today": {
"total_tasks": today_stats["total_tasks"] or 0,
"success_tasks": today_stats["success_tasks"] or 0,
"failed_tasks": today_stats["failed_tasks"] or 0,
"total_items": today_stats["total_items"] or 0,
"total_attachments": today_stats["total_attachments"] or 0,
},
"total": {
"total_tasks": total_stats["total_tasks"] or 0,
"success_tasks": total_stats["success_tasks"] or 0,
"failed_tasks": total_stats["failed_tasks"] or 0,
"total_items": total_stats["total_items"] or 0,
"total_attachments": total_stats["total_attachments"] or 0,
},
}
def delete_old_task_logs(days=30, batch_size=1000):
"""删除N天前的任务日志分批删除避免长时间锁表"""
total_deleted = 0
while True:
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute(
"""
DELETE FROM task_logs
WHERE rowid IN (
SELECT rowid FROM task_logs
WHERE created_at < datetime('now', 'localtime', '-' || ? || ' days')
LIMIT ?
)
""",
(days, batch_size),
)
deleted = cursor.rowcount
conn.commit()
if deleted == 0:
break
total_deleted += deleted
return total_deleted
def get_user_run_stats(user_id, date_filter=None):
"""获取用户的运行统计信息"""
with db_pool.get_db() as conn:
cst_tz = pytz.timezone("Asia/Shanghai")
cursor = conn.cursor()
if date_filter is None:
date_filter = datetime.now(cst_tz).strftime("%Y-%m-%d")
cursor.execute(
"""
SELECT
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
WHERE user_id = ? AND date(created_at) = ?
""",
(user_id, date_filter),
)
stats = cursor.fetchone()
return {
"completed": stats["completed"] or 0,
"failed": stats["failed"] or 0,
"total_items": stats["total_items"] or 0,
"total_attachments": stats["total_attachments"] or 0,
}