Files
zsglpt/db/schedules.py
2025-12-15 13:30:40 +08:00

507 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from datetime import datetime
import db_pool
from services.schedule_utils import compute_next_run_at, format_cst
from services.time_utils import get_beijing_now
def get_user_schedules(user_id):
    """Return all schedules belonging to ``user_id`` as a list of dicts.

    Rows are ordered by ``created_at`` descending.
    """
    query = """
        SELECT * FROM user_schedules
        WHERE user_id = ?
        ORDER BY created_at DESC
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id,))
        records = cur.fetchall()
        return [dict(record) for record in records]
def get_schedule_by_id(schedule_id):
    """Look up a single schedule by its ``id``.

    Returns the row as a dict, or ``None`` when no row matches.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM user_schedules WHERE id = ?", (schedule_id,))
        record = cur.fetchone()
        if not record:
            return None
        return dict(record)
def create_user_schedule(
    user_id,
    name="我的定时任务",
    schedule_time="08:00",
    weekdays="1,2,3,4,5",
    browse_type="应读",
    enable_screenshot=1,
    random_delay=0,
    account_ids=None,
):
    """Insert a new schedule row for ``user_id`` and return its row id.

    The row is always created with ``enabled = 0``. ``created_at`` and
    ``updated_at`` receive the same ``format_cst(get_beijing_now())`` string.
    ``account_ids`` is stored as JSON text ("[]" when it is empty or None) and
    ``random_delay`` is coerced to an int (falsy values become 0).
    """
    import json

    # Normalise the two values that are not stored verbatim.
    serialized_ids = "[]"
    if account_ids:
        serialized_ids = json.dumps(account_ids)
    delay = int(random_delay or 0)

    insert_sql = """
        INSERT INTO user_schedules (
        user_id, name, enabled, schedule_time, weekdays,
        browse_type, enable_screenshot, random_delay, account_ids, created_at, updated_at
        ) VALUES (?, ?, 0, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        timestamp = format_cst(get_beijing_now())
        values = (
            user_id,
            name,
            schedule_time,
            weekdays,
            browse_type,
            enable_screenshot,
            delay,
            serialized_ids,
            timestamp,
            timestamp,
        )
        cur.execute(insert_sql, values)
        conn.commit()
        return cur.lastrowid
def update_user_schedule(schedule_id, **kwargs):
    """Update selected fields of a schedule and keep ``next_run_at`` in sync.

    Only these keys of ``kwargs`` are written: ``name``, ``enabled``,
    ``schedule_time``, ``weekdays``, ``browse_type``, ``enable_screenshot``,
    ``random_delay`` and ``account_ids`` (a list is stored as JSON text).
    ``updated_at`` is always refreshed.

    ``next_run_at`` handling:

    * ``enabled`` supplied and falsy: ``next_run_at`` is cleared (NULL), the
      same outcome ``toggle_user_schedule`` produces when disabling.
    * otherwise, if ``schedule_time``, ``weekdays`` or ``random_delay`` is
      supplied: it is recomputed with ``last_run_at`` ignored, so a schedule
      that already ran today may fire again today under the new settings.
    * otherwise, if ``enabled`` is supplied and truthy: it is recomputed using
      the stored ``last_run_at``.

    Returns:
        True if a row was updated; False when the schedule does not exist or
        no whitelisted field was supplied.
    """
    import json

    allowed_fields = (
        "name",
        "enabled",
        "schedule_time",
        "weekdays",
        "browse_type",
        "enable_screenshot",
        "random_delay",
        "account_ids",
    )

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        now_dt = get_beijing_now()
        now_str = format_cst(now_dt)

        # Read the stored values; they seed the inputs used to recompute
        # next_run_at for any field the caller did not override.
        cursor.execute(
            """
            SELECT enabled, schedule_time, weekdays, random_delay, last_run_at
            FROM user_schedules
            WHERE id = ?
            """,
            (schedule_id,),
        )
        current = cursor.fetchone()
        if not current:
            return False

        will_enabled = int(current[0] or 0)
        next_time = current[1]
        next_weekdays = current[2]
        next_random_delay = int(current[3] or 0)
        current_last_run_at = current[4]

        updates = []
        params = []
        for field in allowed_fields:
            if field not in kwargs:
                continue
            value = kwargs[field]
            if field == "account_ids" and isinstance(value, list):
                value = json.dumps(value)
            elif field == "enabled":
                will_enabled = 1 if value else 0
            elif field == "schedule_time":
                next_time = value
            elif field == "weekdays":
                next_weekdays = value
            elif field == "random_delay":
                next_random_delay = int(value or 0)
            updates.append(f"{field} = ?")
            params.append(value)

        if not updates:
            return False

        updates.append("updated_at = ?")
        params.append(now_str)

        config_changed = any(
            key in kwargs for key in ("schedule_time", "weekdays", "random_delay")
        )
        enabled_toggled = "enabled" in kwargs

        if enabled_toggled and will_enabled == 0:
            # Explicit disable wins: clear next_run_at. Handling this first
            # keeps "next_run_at = ?" from being emitted twice in one UPDATE
            # and skips a recomputation whose result would be discarded.
            updates.append("next_run_at = ?")
            params.append(None)
        elif config_changed or enabled_toggled:
            # A change to the time, weekdays or random delay must be allowed to
            # fire again today even if the schedule already ran today, so
            # last_run_at is deliberately ignored in that case.
            if config_changed or not current_last_run_at:
                last_run_at = None
            else:
                last_run_at = str(current_last_run_at)
            next_dt = compute_next_run_at(
                now=now_dt,
                schedule_time=str(next_time or "08:00"),
                weekdays=str(next_weekdays or "1,2,3,4,5"),
                random_delay=int(next_random_delay or 0),
                last_run_at=last_run_at,
            )
            updates.append("next_run_at = ?")
            params.append(format_cst(next_dt))

        params.append(schedule_id)
        # Column names come only from the whitelist above, so interpolating
        # them is safe; every value is bound as a parameter.
        sql = f"UPDATE user_schedules SET {', '.join(updates)} WHERE id = ?"
        cursor.execute(sql, params)
        conn.commit()
        return cursor.rowcount > 0
def delete_user_schedule(schedule_id):
    """Delete the schedule with the given ``id``.

    Returns True when at least one row was removed, False otherwise.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM user_schedules WHERE id = ?", (schedule_id,))
        conn.commit()
        removed = cur.rowcount
        return removed > 0
def toggle_user_schedule(schedule_id, enabled):
    """Enable or disable a schedule and set ``next_run_at`` accordingly.

    Disabling stores ``next_run_at = NULL``. Enabling keeps a stored
    ``next_run_at`` that is still later than now; otherwise it is recomputed
    from the stored configuration and ``last_run_at``.

    Returns True when a row was updated.
    """
    select_sql = """
        SELECT schedule_time, weekdays, random_delay, last_run_at, next_run_at
        FROM user_schedules
        WHERE id = ?
    """
    update_sql = """
        UPDATE user_schedules
        SET enabled = ?, next_run_at = ?, updated_at = ?
        WHERE id = ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        now_dt = get_beijing_now()
        now_str = format_cst(now_dt)

        new_next_run = None
        row = None
        if enabled:
            cur.execute(select_sql, (schedule_id,))
            row = cur.fetchone()

        if row:
            stored_next = str(row[4] or "").strip() or None
            # A next_run_at precomputed by a configuration edit and still in
            # the future is reused, so the "once per day" rule tied to
            # last_run_at cannot block a schedule that was moved to later today.
            if stored_next and stored_next > now_str:
                new_next_run = stored_next
            else:
                last_run_at = row[3]
                next_dt = compute_next_run_at(
                    now=now_dt,
                    schedule_time=str(row[0] or "08:00"),
                    weekdays=str(row[1] or "1,2,3,4,5"),
                    random_delay=int(row[2] or 0),
                    last_run_at=str(last_run_at) if last_run_at else None,
                )
                new_next_run = format_cst(next_dt)

        enabled_flag = 1 if enabled else 0
        cur.execute(update_sql, (enabled_flag, new_next_run, now_str, schedule_id))
        conn.commit()
        return cur.rowcount > 0
def get_enabled_user_schedules():
    """Return every enabled schedule as a list of dicts.

    Each row is joined with its owner and carries the extra key
    ``user_username``. Rows are ordered by ``schedule_time``.
    """
    query = """
        SELECT us.*, u.username as user_username
        FROM user_schedules us
        JOIN users u ON us.user_id = u.id
        WHERE us.enabled = 1
        ORDER BY us.schedule_time
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query)
        records = cur.fetchall()
        return [dict(record) for record in records]
def update_schedule_last_run(schedule_id):
    """Record that a schedule just ran and advance its ``next_run_at``.

    ``last_run_at`` and ``updated_at`` are set to the current time string, and
    ``next_run_at`` is recomputed from the stored configuration with that same
    time passed as ``last_run_at``.

    Returns False when the schedule does not exist, otherwise whether a row
    was updated.
    """
    select_sql = """
        SELECT schedule_time, weekdays, random_delay
        FROM user_schedules
        WHERE id = ?
    """
    update_sql = """
        UPDATE user_schedules
        SET last_run_at = ?, next_run_at = ?, updated_at = ?
        WHERE id = ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        now_dt = get_beijing_now()
        ran_at = format_cst(now_dt)

        cur.execute(select_sql, (schedule_id,))
        config = cur.fetchone()
        if not config:
            return False

        upcoming = compute_next_run_at(
            now=now_dt,
            schedule_time=str(config[0] or "08:00"),
            weekdays=str(config[1] or "1,2,3,4,5"),
            random_delay=int(config[2] or 0),
            last_run_at=ran_at,
        )
        cur.execute(update_sql, (ran_at, format_cst(upcoming), ran_at, schedule_id))
        conn.commit()
        return cur.rowcount > 0
def update_schedule_next_run(schedule_id: int, next_run_at: str) -> bool:
    """Set only ``next_run_at`` (plus ``updated_at``), leaving ``last_run_at`` alone.

    Intended for advancing a schedule whose execution was skipped. A blank or
    falsy ``next_run_at`` is stored as NULL.

    Returns True when a row was updated.
    """
    update_sql = """
        UPDATE user_schedules
        SET next_run_at = ?, updated_at = ?
        WHERE id = ?
    """
    cleaned = str(next_run_at or "").strip()
    new_value = cleaned if cleaned else None
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        touched_at = format_cst(get_beijing_now())
        cur.execute(update_sql, (new_value, touched_at, int(schedule_id)))
        conn.commit()
        return cur.rowcount > 0
def recompute_schedule_next_run(schedule_id: int, *, now_dt=None) -> bool:
    """Recompute ``next_run_at`` from the stored configuration.

    ``last_run_at`` is read but never modified. ``now_dt`` defaults to
    ``get_beijing_now()`` when omitted or falsy.

    Returns False when the schedule does not exist, otherwise the result of
    ``update_schedule_next_run``.
    """
    if not now_dt:
        now_dt = get_beijing_now()
    sid = int(schedule_id)

    select_sql = """
        SELECT schedule_time, weekdays, random_delay, last_run_at
        FROM user_schedules
        WHERE id = ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(select_sql, (sid,))
        config = cur.fetchone()
        if not config:
            return False
        schedule_time = config[0]
        weekdays = config[1]
        random_delay = config[2]
        last_run_at = config[3]

    upcoming = compute_next_run_at(
        now=now_dt,
        schedule_time=str(schedule_time or "08:00"),
        weekdays=str(weekdays or "1,2,3,4,5"),
        random_delay=int(random_delay or 0),
        last_run_at=str(last_run_at) if last_run_at else None,
    )
    return update_schedule_next_run(sid, format_cst(upcoming))
def get_due_user_schedules(now_cst: str, limit: int = 50):
    """Return enabled schedules whose ``next_run_at`` is at or before ``now_cst``.

    ``now_cst`` is compared as a string against the stored ``next_run_at``;
    when it is blank, ``format_cst(get_beijing_now())`` is used instead. At
    most ``limit`` rows are returned, earliest ``next_run_at`` first, each as
    a dict with the extra key ``user_username``.
    """
    cutoff = str(now_cst or "").strip() or format_cst(get_beijing_now())
    query = """
        SELECT us.*, u.username as user_username
        FROM user_schedules us
        JOIN users u ON us.user_id = u.id
        WHERE us.enabled = 1
        AND us.next_run_at IS NOT NULL
        AND us.next_run_at <= ?
        ORDER BY us.next_run_at ASC
        LIMIT ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (cutoff, int(limit)))
        records = cur.fetchall()
        return [dict(record) for record in records]
# ==================== 定时任务执行日志 ====================
def create_schedule_execution_log(schedule_id, user_id, schedule_name):
    """Insert an execution-log row with status 'running' and return its row id.

    ``execute_time`` is set to ``format_cst(get_beijing_now())``.
    """
    insert_sql = """
        INSERT INTO schedule_execution_logs (
        schedule_id, user_id, schedule_name, execute_time, status
        ) VALUES (?, ?, ?, ?, 'running')
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        started_at = format_cst(get_beijing_now())
        cur.execute(insert_sql, (schedule_id, user_id, schedule_name, started_at))
        conn.commit()
        return cur.lastrowid
def update_schedule_execution_log(log_id, **kwargs):
    """Update whitelisted columns of one execution-log row.

    Accepted keys: ``total_accounts``, ``success_accounts``,
    ``failed_accounts``, ``total_items``, ``total_attachments``,
    ``total_screenshots``, ``duration_seconds``, ``status`` and
    ``error_message``. Other keys are ignored.

    Returns False when no accepted key was supplied, otherwise whether a row
    was updated.
    """
    allowed_fields = (
        "total_accounts",
        "success_accounts",
        "failed_accounts",
        "total_items",
        "total_attachments",
        "total_screenshots",
        "duration_seconds",
        "status",
        "error_message",
    )
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        chosen = [field for field in allowed_fields if field in kwargs]
        if not chosen:
            return False

        # Column names come only from the whitelist; values are bound.
        assignments = ", ".join(f"{field} = ?" for field in chosen)
        params = [kwargs[field] for field in chosen]
        params.append(log_id)

        sql = f"UPDATE schedule_execution_logs SET {assignments} WHERE id = ?"
        cur.execute(sql, params)
        conn.commit()
        return cur.rowcount > 0
def get_schedule_execution_logs(schedule_id, limit=10):
    """Return up to ``limit`` execution logs of a schedule, most recent first.

    Each row is returned as a dict with four alias keys added:
    ``created_at`` (from ``execute_time``), ``success_count`` (from
    ``success_accounts``), ``failed_count`` (from ``failed_accounts``) and
    ``duration`` (from ``duration_seconds``).

    Best effort: a row that cannot be converted is reported on stdout and
    skipped, and any other failure is printed with a traceback and yields an
    empty list.
    """
    query = """
        SELECT * FROM schedule_execution_logs
        WHERE schedule_id = ?
        ORDER BY execute_time DESC
        LIMIT ?
    """
    try:
        with db_pool.get_db() as conn:
            cur = conn.cursor()
            cur.execute(query, (schedule_id, limit))

            results = []
            for record in cur.fetchall():
                try:
                    entry = dict(record)
                    entry.update(
                        created_at=entry.get("execute_time"),
                        success_count=entry.get("success_accounts", 0),
                        failed_count=entry.get("failed_accounts", 0),
                        duration=entry.get("duration_seconds", 0),
                    )
                except Exception as e:
                    print(f"[数据库] 处理日志行时出错: {e}")
                    continue
                results.append(entry)
            return results
    except Exception as e:
        print(f"[数据库] 查询定时任务日志时出错: {e}")
        import traceback

        traceback.print_exc()
        return []
def get_user_all_schedule_logs(user_id, limit=50):
    """Return up to ``limit`` execution logs across all schedules of a user.

    Rows are returned as dicts ordered by ``execute_time`` descending.
    """
    query = """
        SELECT * FROM schedule_execution_logs
        WHERE user_id = ?
        ORDER BY execute_time DESC
        LIMIT ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id, limit))
        records = cur.fetchall()
        return [dict(record) for record in records]
def delete_schedule_logs(schedule_id, user_id):
    """Delete all execution logs of one schedule that belong to ``user_id``.

    Matching on both ``schedule_id`` and ``user_id`` means a user can only
    remove logs recorded under their own id. Returns the number of rows
    deleted.
    """
    delete_sql = """
        DELETE FROM schedule_execution_logs
        WHERE schedule_id = ? AND user_id = ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(delete_sql, (schedule_id, user_id))
        conn.commit()
        deleted = cur.rowcount
        return deleted
def clean_old_schedule_logs(days=30):
    """Delete execution logs older than ``days`` days; return the rows removed.

    ``execute_time`` is written as ``format_cst(get_beijing_now())``, so the
    cutoff is built from the same clock and formatter. Using SQLite's
    ``datetime('now', 'localtime', ...)`` instead would follow the server's
    time zone and skew the retention window on hosts not set to Beijing time.

    Args:
        days: Retention period in days; anything ``float()`` accepts.

    Returns:
        The number of log rows deleted.
    """
    from datetime import timedelta

    # NOTE(review): assumes get_beijing_now() returns a datetime (it is passed
    # as ``now`` to compute_next_run_at elsewhere in this module) - confirm.
    cutoff = format_cst(get_beijing_now() - timedelta(days=float(days)))

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # String comparison against format_cst() output, the same convention
        # the next_run_at queries in this module rely on.
        cursor.execute(
            """
            DELETE FROM schedule_execution_logs
            WHERE execute_time < ?
            """,
            (cutoff,),
        )
        conn.commit()
        return cursor.rowcount