Files
zsglpt/db/tasks.py

236 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from datetime import datetime
import pytz
import db_pool
from db.utils import sanitize_sql_like_pattern
def create_task_log(
user_id,
account_id,
username,
browse_type,
status,
total_items=0,
total_attachments=0,
error_message="",
duration=None,
source="manual",
):
"""创建任务日志记录"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute(
"""
INSERT INTO task_logs (
user_id, account_id, username, browse_type, status,
total_items, total_attachments, error_message, duration, created_at, source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
user_id,
account_id,
username,
browse_type,
status,
total_items,
total_attachments,
error_message,
duration,
cst_time,
source,
),
)
conn.commit()
return cursor.lastrowid
def get_task_logs(
limit=100,
offset=0,
date_filter=None,
status_filter=None,
source_filter=None,
user_id_filter=None,
account_filter=None,
):
"""获取任务日志列表(支持分页和多种筛选)"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
where_clauses = ["1=1"]
params = []
if date_filter:
where_clauses.append("date(tl.created_at) = ?")
params.append(date_filter)
if status_filter:
where_clauses.append("tl.status = ?")
params.append(status_filter)
if source_filter:
where_clauses.append("tl.source = ?")
params.append(source_filter)
if user_id_filter:
where_clauses.append("tl.user_id = ?")
params.append(user_id_filter)
if account_filter:
safe_filter = sanitize_sql_like_pattern(account_filter)
where_clauses.append("tl.username LIKE ? ESCAPE '\\\\'")
params.append(f"%{safe_filter}%")
where_sql = " AND ".join(where_clauses)
count_sql = f"""
SELECT COUNT(*) as total
FROM task_logs tl
LEFT JOIN users u ON tl.user_id = u.id
WHERE {where_sql}
"""
cursor.execute(count_sql, params)
total = cursor.fetchone()["total"]
data_sql = f"""
SELECT
tl.*,
u.username as user_username
FROM task_logs tl
LEFT JOIN users u ON tl.user_id = u.id
WHERE {where_sql}
ORDER BY tl.created_at DESC
LIMIT ? OFFSET ?
"""
params.extend([limit, offset])
cursor.execute(data_sql, params)
logs = [dict(row) for row in cursor.fetchall()]
return {"logs": logs, "total": total}
def get_task_stats(date_filter=None):
"""获取任务统计信息"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
if date_filter is None:
date_filter = datetime.now(cst_tz).strftime("%Y-%m-%d")
cursor.execute(
"""
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
WHERE date(created_at) = ?
""",
(date_filter,),
)
today_stats = cursor.fetchone()
cursor.execute(
"""
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
"""
)
total_stats = cursor.fetchone()
return {
"today": {
"total_tasks": today_stats["total_tasks"] or 0,
"success_tasks": today_stats["success_tasks"] or 0,
"failed_tasks": today_stats["failed_tasks"] or 0,
"total_items": today_stats["total_items"] or 0,
"total_attachments": today_stats["total_attachments"] or 0,
},
"total": {
"total_tasks": total_stats["total_tasks"] or 0,
"success_tasks": total_stats["success_tasks"] or 0,
"failed_tasks": total_stats["failed_tasks"] or 0,
"total_items": total_stats["total_items"] or 0,
"total_attachments": total_stats["total_attachments"] or 0,
},
}
def delete_old_task_logs(days=30, batch_size=1000):
"""删除N天前的任务日志分批删除避免长时间锁表"""
total_deleted = 0
while True:
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute(
"""
DELETE FROM task_logs
WHERE rowid IN (
SELECT rowid FROM task_logs
WHERE created_at < datetime('now', 'localtime', '-' || ? || ' days')
LIMIT ?
)
""",
(days, batch_size),
)
deleted = cursor.rowcount
conn.commit()
if deleted == 0:
break
total_deleted += deleted
return total_deleted
def get_user_run_stats(user_id, date_filter=None):
"""获取用户的运行统计信息"""
with db_pool.get_db() as conn:
cst_tz = pytz.timezone("Asia/Shanghai")
cursor = conn.cursor()
if date_filter is None:
date_filter = datetime.now(cst_tz).strftime("%Y-%m-%d")
cursor.execute(
"""
SELECT
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
WHERE user_id = ? AND date(created_at) = ?
""",
(user_id, date_filter),
)
stats = cursor.fetchone()
return {
"completed": stats["completed"] or 0,
"failed": stats["failed"] or 0,
"total_items": stats["total_items"] or 0,
"total_attachments": stats["total_attachments"] or 0,
}