#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations from datetime import datetime import db_pool from services.schedule_utils import compute_next_run_at, format_cst from services.time_utils import get_beijing_now def get_user_schedules(user_id): """获取用户的所有定时任务""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT * FROM user_schedules WHERE user_id = ? ORDER BY created_at DESC """, (user_id,), ) return [dict(row) for row in cursor.fetchall()] def get_schedule_by_id(schedule_id): """根据ID获取定时任务""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM user_schedules WHERE id = ?", (schedule_id,)) row = cursor.fetchone() return dict(row) if row else None def create_user_schedule( user_id, name="我的定时任务", schedule_time="08:00", weekdays="1,2,3,4,5", browse_type="应读", enable_screenshot=1, random_delay=0, account_ids=None, ): """创建用户定时任务""" import json with db_pool.get_db() as conn: cursor = conn.cursor() cst_time = format_cst(get_beijing_now()) account_ids_str = json.dumps(account_ids) if account_ids else "[]" cursor.execute( """ INSERT INTO user_schedules ( user_id, name, enabled, schedule_time, weekdays, browse_type, enable_screenshot, random_delay, account_ids, created_at, updated_at ) VALUES (?, ?, 0, ?, ?, ?, ?, ?, ?, ?, ?) """, ( user_id, name, schedule_time, weekdays, browse_type, enable_screenshot, int(random_delay or 0), account_ids_str, cst_time, cst_time, ), ) conn.commit() return cursor.lastrowid def update_user_schedule(schedule_id, **kwargs): """更新用户定时任务""" import json with db_pool.get_db() as conn: cursor = conn.cursor() now_dt = get_beijing_now() now_str = format_cst(now_dt) updates = [] params = [] allowed_fields = [ "name", "enabled", "schedule_time", "weekdays", "browse_type", "enable_screenshot", "random_delay", "account_ids", ] # 读取旧值,用于决定是否需要重算 next_run_at cursor.execute( """ SELECT enabled, schedule_time, weekdays, random_delay, last_run_at FROM user_schedules WHERE id = ? """, (schedule_id,), ) current = cursor.fetchone() if not current: return False current_enabled = int(current[0] or 0) current_time = current[1] current_weekdays = current[2] current_random_delay = int(current[3] or 0) current_last_run_at = current[4] will_enabled = current_enabled next_time = current_time next_weekdays = current_weekdays next_random_delay = current_random_delay for field in allowed_fields: if field in kwargs: value = kwargs[field] if field == "account_ids" and isinstance(value, list): value = json.dumps(value) if field == "enabled": will_enabled = 1 if value else 0 if field == "schedule_time": next_time = value if field == "weekdays": next_weekdays = value if field == "random_delay": next_random_delay = int(value or 0) updates.append(f"{field} = ?") params.append(value) if not updates: return False updates.append("updated_at = ?") params.append(now_str) # 关键字段变更后重算 next_run_at,确保索引驱动不会跑偏 # # 需求:当用户修改“执行时间/执行日期/随机±15分钟”后,即使今天已经执行过,也允许按新配置在今天再次触发。 # 做法:这些关键字段发生变更时,重算 next_run_at 时忽略 last_run_at 的“同日仅一次”限制。 config_changed = any(key in kwargs for key in ["schedule_time", "weekdays", "random_delay"]) enabled_toggled = "enabled" in kwargs should_recompute_next = config_changed or (enabled_toggled and will_enabled == 1) if should_recompute_next: next_dt = compute_next_run_at( now=now_dt, schedule_time=str(next_time or "08:00"), weekdays=str(next_weekdays or "1,2,3,4,5"), random_delay=int(next_random_delay or 0), last_run_at=None if config_changed else (str(current_last_run_at or "") if current_last_run_at else None), ) updates.append("next_run_at = ?") params.append(format_cst(next_dt)) # 若本次显式禁用任务,则 next_run_at 清空(与 toggle 行为保持一致) if enabled_toggled and will_enabled == 0: updates.append("next_run_at = ?") params.append(None) params.append(schedule_id) sql = f"UPDATE user_schedules SET {', '.join(updates)} WHERE id = ?" cursor.execute(sql, params) conn.commit() return cursor.rowcount > 0 def delete_user_schedule(schedule_id): """删除用户定时任务""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM user_schedules WHERE id = ?", (schedule_id,)) conn.commit() return cursor.rowcount > 0 def toggle_user_schedule(schedule_id, enabled): """启用/禁用用户定时任务""" with db_pool.get_db() as conn: cursor = conn.cursor() now_dt = get_beijing_now() now_str = format_cst(now_dt) next_run_at = None if enabled: cursor.execute( """ SELECT schedule_time, weekdays, random_delay, last_run_at, next_run_at FROM user_schedules WHERE id = ? """, (schedule_id,), ) row = cursor.fetchone() if row: schedule_time, weekdays, random_delay, last_run_at, existing_next_run_at = ( row[0], row[1], row[2], row[3], row[4], ) existing_next_run_at = str(existing_next_run_at or "").strip() or None # 若 next_run_at 已经被“修改配置”逻辑预先计算好且仍在未来,则优先沿用, # 避免 last_run_at 的“同日仅一次”限制阻塞用户把任务调整到今天再次触发。 if existing_next_run_at and existing_next_run_at > now_str: next_run_at = existing_next_run_at else: next_dt = compute_next_run_at( now=now_dt, schedule_time=str(schedule_time or "08:00"), weekdays=str(weekdays or "1,2,3,4,5"), random_delay=int(random_delay or 0), last_run_at=str(last_run_at or "") if last_run_at else None, ) next_run_at = format_cst(next_dt) cursor.execute( """ UPDATE user_schedules SET enabled = ?, next_run_at = ?, updated_at = ? WHERE id = ? """, (1 if enabled else 0, next_run_at, now_str, schedule_id), ) conn.commit() return cursor.rowcount > 0 def get_enabled_user_schedules(): """获取所有启用的用户定时任务""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT us.*, u.username as user_username FROM user_schedules us JOIN users u ON us.user_id = u.id WHERE us.enabled = 1 ORDER BY us.schedule_time """ ) return [dict(row) for row in cursor.fetchall()] def update_schedule_last_run(schedule_id): """更新定时任务最后运行时间,并推进 next_run_at(O-08)""" with db_pool.get_db() as conn: cursor = conn.cursor() now_dt = get_beijing_now() now_str = format_cst(now_dt) cursor.execute( """ SELECT schedule_time, weekdays, random_delay FROM user_schedules WHERE id = ? """, (schedule_id,), ) row = cursor.fetchone() if not row: return False schedule_time, weekdays, random_delay = row[0], row[1], row[2] next_dt = compute_next_run_at( now=now_dt, schedule_time=str(schedule_time or "08:00"), weekdays=str(weekdays or "1,2,3,4,5"), random_delay=int(random_delay or 0), last_run_at=now_str, ) next_run_at = format_cst(next_dt) cursor.execute( """ UPDATE user_schedules SET last_run_at = ?, next_run_at = ?, updated_at = ? WHERE id = ? """, (now_str, next_run_at, now_str, schedule_id), ) conn.commit() return cursor.rowcount > 0 def update_schedule_next_run(schedule_id: int, next_run_at: str) -> bool: """仅更新 next_run_at(不改变 last_run_at),用于跳过执行时推进。""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ UPDATE user_schedules SET next_run_at = ?, updated_at = ? WHERE id = ? """, (str(next_run_at or "").strip() or None, format_cst(get_beijing_now()), int(schedule_id)), ) conn.commit() return cursor.rowcount > 0 def recompute_schedule_next_run(schedule_id: int, *, now_dt=None) -> bool: """按当前配置重算 next_run_at(不改变 last_run_at)。""" now_dt = now_dt or get_beijing_now() with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT schedule_time, weekdays, random_delay, last_run_at FROM user_schedules WHERE id = ? """, (int(schedule_id),), ) row = cursor.fetchone() if not row: return False schedule_time, weekdays, random_delay, last_run_at = row[0], row[1], row[2], row[3] next_dt = compute_next_run_at( now=now_dt, schedule_time=str(schedule_time or "08:00"), weekdays=str(weekdays or "1,2,3,4,5"), random_delay=int(random_delay or 0), last_run_at=str(last_run_at or "") if last_run_at else None, ) return update_schedule_next_run(int(schedule_id), format_cst(next_dt)) def get_due_user_schedules(now_cst: str, limit: int = 50): """获取到期需要执行的用户定时任务(索引驱动)。""" now_cst = str(now_cst or "").strip() if not now_cst: now_cst = format_cst(get_beijing_now()) with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT us.*, u.username as user_username FROM user_schedules us JOIN users u ON us.user_id = u.id WHERE us.enabled = 1 AND us.next_run_at IS NOT NULL AND us.next_run_at <= ? ORDER BY us.next_run_at ASC LIMIT ? """, (now_cst, int(limit)), ) return [dict(row) for row in cursor.fetchall()] # ==================== 定时任务执行日志 ==================== def create_schedule_execution_log(schedule_id, user_id, schedule_name): """创建定时任务执行日志""" with db_pool.get_db() as conn: cursor = conn.cursor() execute_time = format_cst(get_beijing_now()) cursor.execute( """ INSERT INTO schedule_execution_logs ( schedule_id, user_id, schedule_name, execute_time, status ) VALUES (?, ?, ?, ?, 'running') """, (schedule_id, user_id, schedule_name, execute_time), ) conn.commit() return cursor.lastrowid def update_schedule_execution_log(log_id, **kwargs): """更新定时任务执行日志""" with db_pool.get_db() as conn: cursor = conn.cursor() updates = [] params = [] allowed_fields = [ "total_accounts", "success_accounts", "failed_accounts", "total_items", "total_attachments", "total_screenshots", "duration_seconds", "status", "error_message", ] for field in allowed_fields: if field in kwargs: updates.append(f"{field} = ?") params.append(kwargs[field]) if not updates: return False params.append(log_id) sql = f"UPDATE schedule_execution_logs SET {', '.join(updates)} WHERE id = ?" cursor.execute(sql, params) conn.commit() return cursor.rowcount > 0 def get_schedule_execution_logs(schedule_id, limit=10): """获取定时任务执行日志""" try: with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT * FROM schedule_execution_logs WHERE schedule_id = ? ORDER BY execute_time DESC LIMIT ? """, (schedule_id, limit), ) logs = [] rows = cursor.fetchall() for row in rows: try: log = dict(row) log["created_at"] = log.get("execute_time") log["success_count"] = log.get("success_accounts", 0) log["failed_count"] = log.get("failed_accounts", 0) log["duration"] = log.get("duration_seconds", 0) logs.append(log) except Exception as e: print(f"[数据库] 处理日志行时出错: {e}") continue return logs except Exception as e: print(f"[数据库] 查询定时任务日志时出错: {e}") import traceback traceback.print_exc() return [] def get_user_all_schedule_logs(user_id, limit=50): """获取用户所有定时任务的执行日志""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT * FROM schedule_execution_logs WHERE user_id = ? ORDER BY execute_time DESC LIMIT ? """, (user_id, limit), ) return [dict(row) for row in cursor.fetchall()] def delete_schedule_logs(schedule_id, user_id): """删除指定定时任务的所有执行日志(需验证用户权限)""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ DELETE FROM schedule_execution_logs WHERE schedule_id = ? AND user_id = ? """, (schedule_id, user_id), ) conn.commit() return cursor.rowcount def clean_old_schedule_logs(days=30): """清理指定天数前的定时任务执行日志""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ DELETE FROM schedule_execution_logs WHERE execute_time < datetime('now', 'localtime', '-' || ? || ' days') """, (days,), ) conn.commit() return cursor.rowcount