Files
zsglpt/db/tasks.py

311 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from datetime import datetime, timedelta
import db_pool
from db.utils import get_cst_now, get_cst_now_str, sanitize_sql_like_pattern
# Aggregate SELECT over task_logs: total row count, per-status counts for
# 'success' and 'failed', and the summed item / attachment totals.
# It deliberately has no WHERE clause; _fetch_task_stats_row appends one
# when a filter is supplied.
_TASK_STATS_SELECT_SQL = """
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
"""
# Aggregate SELECT over task_logs used for per-user run statistics:
# 'success' rows are reported as "completed", 'failed' rows as "failed",
# alongside the summed item / attachment totals.
# It has no WHERE clause; _fetch_user_run_stats_row always appends one.
_USER_RUN_STATS_SELECT_SQL = """
SELECT
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
"""
def _build_day_bounds(date_filter: str) -> tuple[str | None, str | None]:
    """Convert a ``YYYY-MM-DD`` date into a half-open ``[day_start, day_end)`` range.

    Args:
        date_filter: Calendar date to expand. It is passed through ``str()``
            before parsing, so non-string values are tolerated.

    Returns:
        ``(day_start, day_end)`` as ``YYYY-MM-DD HH:MM:SS`` strings, where
        ``day_start`` is midnight of the given date and ``day_end`` is
        midnight of the following day. ``(None, None)`` is returned when the
        value cannot be parsed, or when the following day cannot be
        represented, so that callers can fall back to another filter.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    try:
        day_start = datetime.strptime(str(date_filter), "%Y-%m-%d")
        # Kept inside the try: adding a day to 9999-12-31 raises
        # OverflowError, which must not escape as an unhandled error.
        day_end = day_start + timedelta(days=1)
    except (ValueError, OverflowError):
        return None, None
    return day_start.strftime(timestamp_format), day_end.strftime(timestamp_format)
def _normalize_int(value, default: int, *, minimum: int | None = None) -> int:
    """Coerce *value* to an int, with a fallback and an optional lower bound.

    Args:
        value: Anything accepted by ``int()``; typically a number or a
            numeric string.
        default: Result used when *value* cannot be converted.
        minimum: When given, results below this bound are replaced by it.
            The bound is applied to *default* as well.

    Returns:
        The converted (or default) value, raised to *minimum* if necessary.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None or an unsupported type; ValueError: non-numeric
        # text or NaN; OverflowError: an infinite float.
        parsed = default
    if minimum is not None and parsed < minimum:
        return minimum
    return parsed
def _stat_value(row, key: str) -> int:
    """Read ``row[key]`` as an int, treating anything missing as zero.

    Args:
        row: Result row supporting ``row[key]`` lookup; may be ``None`` or
            empty.
        key: Column name to read.

    Returns:
        ``int(row[key])``, or ``0`` when the row is falsy, the lookup fails,
        or the stored value is falsy (for example ``None``).
    """
    raw = 0
    try:
        if row:
            raw = row[key]
    except Exception:
        # Best-effort read: any lookup failure counts as "no value".
        raw = 0
    return int(raw or 0)
def _build_task_logs_where_sql(
    *,
    date_filter=None,
    status_filter=None,
    source_filter=None,
    user_id_filter=None,
    account_filter=None,
) -> tuple[str, list]:
    """Build the WHERE clause and bound parameters for ``task_logs`` queries.

    Every condition refers to the table through the ``tl`` alias and the
    conditions are joined with ``AND``. The clause always starts with
    ``1=1`` so it is valid even when no filter is supplied.

    Args:
        date_filter: ``YYYY-MM-DD`` day. It becomes a ``created_at`` range
            when it parses, otherwise a ``date(created_at) = ?`` comparison.
        status_filter: Exact match on ``status``.
        source_filter: ``"user_scheduled"`` matches every source starting
            with ``user_scheduled:``; a value ending in ``*`` is a prefix
            match; anything else is an exact match on ``source``.
        user_id_filter: Exact match on ``user_id``.
        account_filter: Substring match on ``username``.

    Returns:
        ``(where_sql, params)`` where *params* lines up with the ``?``
        placeholders in *where_sql*.
    """
    # SQLite rejects an ESCAPE expression that is not exactly one character,
    # and SQL string literals give the backslash no special meaning, so the
    # generated SQL must contain a single backslash between the quotes.
    # Assumes sanitize_sql_like_pattern escapes with a backslash -- TODO confirm.
    escape_sql = "ESCAPE '\\'"

    where_clauses = ["1=1"]
    params = []

    if date_filter:
        day_start, day_end = _build_day_bounds(date_filter)
        if day_start and day_end:
            where_clauses.append("tl.created_at >= ? AND tl.created_at < ?")
            params.extend([day_start, day_end])
        else:
            where_clauses.append("date(tl.created_at) = ?")
            params.append(date_filter)

    if status_filter:
        where_clauses.append("tl.status = ?")
        params.append(status_filter)

    if source_filter:
        source_filter = str(source_filter).strip()
        if source_filter == "user_scheduled":
            where_clauses.append(f"tl.source LIKE ? {escape_sql}")
            # "_" is a single-character wildcard in LIKE, so it is escaped to
            # match only the literal "user_scheduled:" prefix.
            params.append("user\\_scheduled:%")
        elif source_filter.endswith("*"):
            safe_prefix = sanitize_sql_like_pattern(source_filter[:-1])
            where_clauses.append(f"tl.source LIKE ? {escape_sql}")
            params.append(f"{safe_prefix}%")
        else:
            where_clauses.append("tl.source = ?")
            params.append(source_filter)

    if user_id_filter:
        where_clauses.append("tl.user_id = ?")
        params.append(user_id_filter)

    if account_filter:
        safe_filter = sanitize_sql_like_pattern(account_filter)
        where_clauses.append(f"tl.username LIKE ? {escape_sql}")
        params.append(f"%{safe_filter}%")

    return " AND ".join(where_clauses), params
def _fetch_task_stats_row(cursor, *, where_clause: str = "", params: tuple | list = ()) -> dict:
    """Run the task statistics aggregate and return its columns as ints.

    Args:
        cursor: Database cursor used to execute the query.
        where_clause: Optional condition appended after ``WHERE``; when
            empty the aggregate covers every row of ``task_logs``.
        params: Values bound to the placeholders in *where_clause*.

    Returns:
        A dict with ``total_tasks``, ``success_tasks``, ``failed_tasks``,
        ``total_items`` and ``total_attachments``; missing or NULL values
        are reported as ``0``.
    """
    query = _TASK_STATS_SELECT_SQL
    if where_clause:
        query = f"{query}\nWHERE {where_clause}"
    cursor.execute(query, params)
    stats_row = cursor.fetchone()

    columns = (
        "total_tasks",
        "success_tasks",
        "failed_tasks",
        "total_items",
        "total_attachments",
    )
    return {column: _stat_value(stats_row, column) for column in columns}
def _fetch_user_run_stats_row(cursor, *, where_clause: str, params: tuple | list) -> dict:
    """Run the per-user run statistics aggregate and return its columns as ints.

    Args:
        cursor: Database cursor used to execute the query.
        where_clause: Condition appended after ``WHERE`` (always required).
        params: Values bound to the placeholders in *where_clause*.

    Returns:
        A dict with ``completed``, ``failed``, ``total_items`` and
        ``total_attachments``; missing or NULL values are reported as ``0``.
    """
    cursor.execute(f"{_USER_RUN_STATS_SELECT_SQL}\nWHERE {where_clause}", params)
    stats_row = cursor.fetchone()

    columns = ("completed", "failed", "total_items", "total_attachments")
    return {column: _stat_value(stats_row, column) for column in columns}
def create_task_log(
    user_id,
    account_id,
    username,
    browse_type,
    status,
    total_items=0,
    total_attachments=0,
    error_message="",
    duration=None,
    source="manual",
):
    """Insert one task log record and return the id of the new row.

    The ``created_at`` column is filled with the value returned by
    ``get_cst_now_str()`` at insert time; every other column comes straight
    from the arguments.

    Returns:
        ``cursor.lastrowid`` of the inserted row.
    """
    insert_sql = """
        INSERT INTO task_logs (
            user_id, account_id, username, browse_type, status,
            total_items, total_attachments, error_message, duration, created_at, source
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Order must match the column list in insert_sql.
        values = (
            user_id,
            account_id,
            username,
            browse_type,
            status,
            total_items,
            total_attachments,
            error_message,
            duration,
            get_cst_now_str(),
            source,
        )
        cursor.execute(insert_sql, values)
        conn.commit()
        return cursor.lastrowid
def get_task_logs(
    limit=100,
    offset=0,
    date_filter=None,
    status_filter=None,
    source_filter=None,
    user_id_filter=None,
    account_filter=None,
):
    """Return one page of task logs together with the total match count.

    Args:
        limit: Page size; invalid values fall back to 100 and values below 1
            are raised to 1.
        offset: Number of rows to skip; invalid values fall back to 0 and
            negative values are raised to 0.
        date_filter: Forwarded to ``_build_task_logs_where_sql``.
        status_filter: Forwarded to ``_build_task_logs_where_sql``.
        source_filter: Forwarded to ``_build_task_logs_where_sql``.
        user_id_filter: Forwarded to ``_build_task_logs_where_sql``.
        account_filter: Forwarded to ``_build_task_logs_where_sql``.

    Returns:
        ``{"logs": [...], "total": int}``. Each log is a dict of the
        ``task_logs`` columns plus ``user_username`` from the joined
        ``users`` table; ``total`` counts all rows matching the filters,
        ignoring pagination.
    """
    limit = _normalize_int(limit, 100, minimum=1)
    offset = _normalize_int(offset, 0, minimum=0)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        where_sql, params = _build_task_logs_where_sql(
            date_filter=date_filter,
            status_filter=status_filter,
            source_filter=source_filter,
            user_id_filter=user_id_filter,
            account_filter=account_filter,
        )

        count_sql = f"""
            SELECT COUNT(*) as total
            FROM task_logs tl
            WHERE {where_sql}
        """
        cursor.execute(count_sql, params)
        total = _stat_value(cursor.fetchone(), "total")

        # created_at alone is not unique, and the order of tied rows is
        # otherwise unspecified; rowid is the tie-breaker that keeps
        # LIMIT/OFFSET pages from skipping or repeating rows.
        data_sql = f"""
            SELECT
                tl.*,
                u.username as user_username
            FROM task_logs tl
            LEFT JOIN users u ON tl.user_id = u.id
            WHERE {where_sql}
            ORDER BY tl.created_at DESC, tl.rowid DESC
            LIMIT ? OFFSET ?
        """
        data_params = list(params)
        data_params.extend([limit, offset])
        cursor.execute(data_sql, data_params)
        logs = [dict(row) for row in cursor.fetchall()]

    return {"logs": logs, "total": total}
def get_task_stats(date_filter=None):
    """Return task statistics for one day and for the whole table.

    Args:
        date_filter: ``YYYY-MM-DD`` day to report on. When ``None`` the date
            part of ``get_cst_now()`` is used.

    Returns:
        ``{"today": {...}, "total": {...}}`` where both values have the
        shape produced by ``_fetch_task_stats_row``; ``today`` is limited to
        the requested day and ``total`` is unfiltered.
    """
    if date_filter is None:
        date_filter = get_cst_now().strftime("%Y-%m-%d")

    day_start, day_end = _build_day_bounds(date_filter)
    if day_start and day_end:
        day_where = "created_at >= ? AND created_at < ?"
        day_params = (day_start, day_end)
    else:
        # The date did not parse; compare on the date part instead.
        day_where = "date(created_at) = ?"
        day_params = (date_filter,)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        today_stats = _fetch_task_stats_row(cursor, where_clause=day_where, params=day_params)
        total_stats = _fetch_task_stats_row(cursor)

    return {"today": today_stats, "total": total_stats}
def delete_old_task_logs(days=30, batch_size=1000):
    """Delete task logs older than *days* days, in batches.

    Rows are removed at most *batch_size* at a time, each batch in its own
    connection context with its own commit, to avoid holding a lock for a
    long time.

    Args:
        days: Age threshold in days; invalid values fall back to 30 and
            negative values are raised to 0.
        batch_size: Maximum rows deleted per statement; invalid values fall
            back to 1000 and values below 1 are raised to 1.

    Returns:
        The total number of rows deleted across all batches.
    """
    days = _normalize_int(days, 30, minimum=0)
    batch_size = _normalize_int(batch_size, 1000, minimum=1)
    cutoff = (get_cst_now() - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")

    purge_sql = """
        DELETE FROM task_logs
        WHERE rowid IN (
            SELECT rowid FROM task_logs
            WHERE created_at < ?
            LIMIT ?
        )
    """

    def _purge_one_batch():
        """Delete a single batch and return the reported row count."""
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute(purge_sql, (cutoff, batch_size))
            removed = cursor.rowcount
            conn.commit()
        return removed

    total_deleted = 0
    removed = _purge_one_batch()
    # Stop at the first batch that reports nothing deleted.
    while removed > 0:
        total_deleted += removed
        removed = _purge_one_batch()
    return total_deleted
def get_user_run_stats(user_id, date_filter=None):
    """Return one user's run statistics for a single day.

    Args:
        user_id: Value matched against ``task_logs.user_id``.
        date_filter: ``YYYY-MM-DD`` day to report on. When ``None`` the date
            part of ``get_cst_now()`` is used.

    Returns:
        The dict produced by ``_fetch_user_run_stats_row`` (``completed``,
        ``failed``, ``total_items``, ``total_attachments``).
    """
    if date_filter is None:
        date_filter = get_cst_now().strftime("%Y-%m-%d")

    day_start, day_end = _build_day_bounds(date_filter)
    if day_start and day_end:
        where_clause = "user_id = ? AND created_at >= ? AND created_at < ?"
        params = (user_id, day_start, day_end)
    else:
        # The date did not parse; compare on the date part instead.
        where_clause = "user_id = ? AND date(created_at) = ?"
        params = (user_id, date_filter)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        return _fetch_user_run_stats_row(cursor, where_clause=where_clause, params=params)