#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations from datetime import datetime import pytz import db_pool from db.utils import sanitize_sql_like_pattern def create_task_log( user_id, account_id, username, browse_type, status, total_items=0, total_attachments=0, error_message="", duration=None, source="manual", ): """创建任务日志记录""" with db_pool.get_db() as conn: cursor = conn.cursor() cst_tz = pytz.timezone("Asia/Shanghai") cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S") cursor.execute( """ INSERT INTO task_logs ( user_id, account_id, username, browse_type, status, total_items, total_attachments, error_message, duration, created_at, source ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( user_id, account_id, username, browse_type, status, total_items, total_attachments, error_message, duration, cst_time, source, ), ) conn.commit() return cursor.lastrowid def get_task_logs( limit=100, offset=0, date_filter=None, status_filter=None, source_filter=None, user_id_filter=None, account_filter=None, ): """获取任务日志列表(支持分页和多种筛选)""" with db_pool.get_db() as conn: cursor = conn.cursor() where_clauses = ["1=1"] params = [] if date_filter: where_clauses.append("date(tl.created_at) = ?") params.append(date_filter) if status_filter: where_clauses.append("tl.status = ?") params.append(status_filter) if source_filter: source_filter = str(source_filter or "").strip() # 兼容“虚拟来源”:用于筛选 user_scheduled:batch_xxx 这类动态值 if source_filter == "user_scheduled": where_clauses.append("tl.source LIKE ? ESCAPE '\\\\'") params.append("user_scheduled:%") elif source_filter.endswith("*"): prefix = source_filter[:-1] safe_prefix = sanitize_sql_like_pattern(prefix) where_clauses.append("tl.source LIKE ? ESCAPE '\\\\'") params.append(f"{safe_prefix}%") else: where_clauses.append("tl.source = ?") params.append(source_filter) if user_id_filter: where_clauses.append("tl.user_id = ?") params.append(user_id_filter) if account_filter: safe_filter = sanitize_sql_like_pattern(account_filter) where_clauses.append("tl.username LIKE ? ESCAPE '\\\\'") params.append(f"%{safe_filter}%") where_sql = " AND ".join(where_clauses) count_sql = f""" SELECT COUNT(*) as total FROM task_logs tl LEFT JOIN users u ON tl.user_id = u.id WHERE {where_sql} """ cursor.execute(count_sql, params) total = cursor.fetchone()["total"] data_sql = f""" SELECT tl.*, u.username as user_username FROM task_logs tl LEFT JOIN users u ON tl.user_id = u.id WHERE {where_sql} ORDER BY tl.created_at DESC LIMIT ? OFFSET ? """ params.extend([limit, offset]) cursor.execute(data_sql, params) logs = [dict(row) for row in cursor.fetchall()] return {"logs": logs, "total": total} def get_task_stats(date_filter=None): """获取任务统计信息""" with db_pool.get_db() as conn: cursor = conn.cursor() cst_tz = pytz.timezone("Asia/Shanghai") if date_filter is None: date_filter = datetime.now(cst_tz).strftime("%Y-%m-%d") cursor.execute( """ SELECT COUNT(*) as total_tasks, SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks, SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks, SUM(total_items) as total_items, SUM(total_attachments) as total_attachments FROM task_logs WHERE date(created_at) = ? """, (date_filter,), ) today_stats = cursor.fetchone() cursor.execute( """ SELECT COUNT(*) as total_tasks, SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks, SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks, SUM(total_items) as total_items, SUM(total_attachments) as total_attachments FROM task_logs """ ) total_stats = cursor.fetchone() return { "today": { "total_tasks": today_stats["total_tasks"] or 0, "success_tasks": today_stats["success_tasks"] or 0, "failed_tasks": today_stats["failed_tasks"] or 0, "total_items": today_stats["total_items"] or 0, "total_attachments": today_stats["total_attachments"] or 0, }, "total": { "total_tasks": total_stats["total_tasks"] or 0, "success_tasks": total_stats["success_tasks"] or 0, "failed_tasks": total_stats["failed_tasks"] or 0, "total_items": total_stats["total_items"] or 0, "total_attachments": total_stats["total_attachments"] or 0, }, } def delete_old_task_logs(days=30, batch_size=1000): """删除N天前的任务日志(分批删除,避免长时间锁表)""" total_deleted = 0 while True: with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ DELETE FROM task_logs WHERE rowid IN ( SELECT rowid FROM task_logs WHERE created_at < datetime('now', 'localtime', '-' || ? || ' days') LIMIT ? ) """, (days, batch_size), ) deleted = cursor.rowcount conn.commit() if deleted == 0: break total_deleted += deleted return total_deleted def get_user_run_stats(user_id, date_filter=None): """获取用户的运行统计信息""" with db_pool.get_db() as conn: cst_tz = pytz.timezone("Asia/Shanghai") cursor = conn.cursor() if date_filter is None: date_filter = datetime.now(cst_tz).strftime("%Y-%m-%d") cursor.execute( """ SELECT SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as completed, SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, SUM(total_items) as total_items, SUM(total_attachments) as total_attachments FROM task_logs WHERE user_id = ? AND date(created_at) = ? """, (user_id, date_filter), ) stats = cursor.fetchone() return { "completed": stats["completed"] or 0, "failed": stats["failed"] or 0, "total_items": stats["total_items"] or 0, "total_attachments": stats["total_attachments"] or 0, }