#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime
|
||
|
||
import db_pool
|
||
from services.schedule_utils import compute_next_run_at, format_cst
|
||
from services.time_utils import get_beijing_now
|
||
|
||
|
||
def get_user_schedules(user_id):
    """Return all schedules belonging to ``user_id`` as a list of dicts.

    Rows are ordered by ``created_at`` descending. The list is empty when no
    row matches.
    """
    query = """
        SELECT * FROM user_schedules
        WHERE user_id = ?
        ORDER BY created_at DESC
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id,))
        records = cur.fetchall()
        return list(map(dict, records))
|
||
|
||
|
||
def get_schedule_by_id(schedule_id):
    """Fetch one schedule by its ``id``.

    Returns the row converted to a dict, or ``None`` when nothing matches.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM user_schedules WHERE id = ?", (schedule_id,))
        record = cur.fetchone()
        if not record:
            return None
        return dict(record)
|
||
|
||
|
||
def create_user_schedule(
    user_id,
    name="我的定时任务",
    schedule_time="08:00",
    weekdays="1,2,3,4,5",
    browse_type="应读",
    enable_screenshot=1,
    random_delay=0,
    account_ids=None,
):
    """Insert a new schedule row for ``user_id`` and return its row id.

    The row is written with ``enabled = 0``. ``account_ids`` is stored as a
    JSON string (``"[]"`` when it is empty or ``None``) and ``random_delay``
    is coerced to ``int``. ``created_at`` and ``updated_at`` both receive the
    same ``format_cst(get_beijing_now())`` timestamp.
    """
    import json

    insert_sql = """
        INSERT INTO user_schedules (
            user_id, name, enabled, schedule_time, weekdays,
            browse_type, enable_screenshot, random_delay, account_ids, created_at, updated_at
        ) VALUES (?, ?, 0, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    with db_pool.get_db() as conn:
        cur = conn.cursor()
        timestamp = format_cst(get_beijing_now())

        # Falsy account lists (None, []) are all persisted as an empty JSON array.
        if account_ids:
            serialized_accounts = json.dumps(account_ids)
        else:
            serialized_accounts = "[]"

        values = (
            user_id,
            name,
            schedule_time,
            weekdays,
            browse_type,
            enable_screenshot,
            int(random_delay or 0),
            serialized_accounts,
            timestamp,
            timestamp,
        )
        cur.execute(insert_sql, values)

        conn.commit()
        return cur.lastrowid
|
||
|
||
|
||
def update_user_schedule(schedule_id, **kwargs):
    """Update selected fields of a schedule and keep ``next_run_at`` in sync.

    Only the keys listed in ``allowed_fields`` are applied; any other keyword
    argument is ignored. A ``list`` passed as ``account_ids`` is stored as a
    JSON string. ``updated_at`` is always refreshed.

    ``next_run_at`` is written at most once per call:

    * ``enabled`` supplied and falsy -> ``next_run_at`` is cleared (NULL);
    * otherwise, if ``schedule_time`` / ``weekdays`` / ``random_delay`` is
      supplied, or ``enabled`` is supplied and truthy -> it is recomputed
      with ``compute_next_run_at``.

    Returns:
        ``True`` when a row was updated; ``False`` when the schedule does not
        exist or no allowed field was supplied.
    """
    import json

    allowed_fields = (
        "name",
        "enabled",
        "schedule_time",
        "weekdays",
        "browse_type",
        "enable_screenshot",
        "random_delay",
        "account_ids",
    )

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        now_dt = get_beijing_now()
        now_str = format_cst(now_dt)

        # Read the stored values: fields that are not part of this update
        # still feed the next_run_at recomputation below.
        cursor.execute(
            """
            SELECT enabled, schedule_time, weekdays, random_delay, last_run_at
            FROM user_schedules
            WHERE id = ?
            """,
            (schedule_id,),
        )
        current = cursor.fetchone()
        if not current:
            return False

        will_enabled = int(current[0] or 0)
        next_time = current[1]
        next_weekdays = current[2]
        next_random_delay = int(current[3] or 0)
        current_last_run_at = current[4]

        updates = []
        params = []
        for field in allowed_fields:
            if field not in kwargs:
                continue
            value = kwargs[field]
            if field == "account_ids" and isinstance(value, list):
                value = json.dumps(value)
            elif field == "enabled":
                will_enabled = 1 if value else 0
            elif field == "schedule_time":
                next_time = value
            elif field == "weekdays":
                next_weekdays = value
            elif field == "random_delay":
                next_random_delay = int(value or 0)
            # Field names come from the fixed whitelist above, never from the
            # caller, so interpolating them into the SQL text is safe.
            updates.append(f"{field} = ?")
            params.append(value)

        if not updates:
            return False

        updates.append("updated_at = ?")
        params.append(now_str)

        # Requirement: after the user edits the run time, the weekdays or the
        # random delay, the task may fire again today under the new settings
        # even if it already ran today. Hence last_run_at is withheld from
        # compute_next_run_at whenever one of those fields is part of the update.
        config_changed = any(
            key in kwargs for key in ("schedule_time", "weekdays", "random_delay")
        )
        enabled_toggled = "enabled" in kwargs

        if enabled_toggled and will_enabled == 0:
            # Explicit disable wins over any config change (same outcome as
            # toggle_user_schedule). Handling it first keeps a single
            # next_run_at assignment in the SET list and skips a computation
            # whose result would be discarded.
            updates.append("next_run_at = ?")
            params.append(None)
        elif config_changed or enabled_toggled:
            if config_changed or not current_last_run_at:
                last_run_at = None
            else:
                last_run_at = str(current_last_run_at)
            next_dt = compute_next_run_at(
                now=now_dt,
                schedule_time=str(next_time or "08:00"),
                weekdays=str(next_weekdays or "1,2,3,4,5"),
                random_delay=int(next_random_delay or 0),
                last_run_at=last_run_at,
            )
            updates.append("next_run_at = ?")
            params.append(format_cst(next_dt))

        params.append(schedule_id)

        sql = f"UPDATE user_schedules SET {', '.join(updates)} WHERE id = ?"
        cursor.execute(sql, params)
        conn.commit()
        return cursor.rowcount > 0
|
||
|
||
|
||
def delete_user_schedule(schedule_id):
    """Delete the schedule with the given ``id``.

    Returns ``True`` when at least one row was removed.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM user_schedules WHERE id = ?", (schedule_id,))
        conn.commit()
        removed = cur.rowcount > 0
        return removed
|
||
|
||
|
||
def toggle_user_schedule(schedule_id, enabled):
    """Enable or disable a schedule and set ``next_run_at`` to match.

    Disabling writes ``next_run_at = NULL``. Enabling keeps a stored
    ``next_run_at`` that still compares greater than the formatted current
    time; otherwise a new value is computed from the stored configuration
    and ``last_run_at``.

    Returns ``True`` when a row was updated.
    """
    select_sql = """
        SELECT schedule_time, weekdays, random_delay, last_run_at, next_run_at
        FROM user_schedules
        WHERE id = ?
    """
    update_sql = """
        UPDATE user_schedules
        SET enabled = ?, next_run_at = ?, updated_at = ?
        WHERE id = ?
    """

    with db_pool.get_db() as conn:
        cur = conn.cursor()
        current_dt = get_beijing_now()
        current_str = format_cst(current_dt)

        upcoming = None
        record = None
        if enabled:
            cur.execute(select_sql, (schedule_id,))
            record = cur.fetchone()

        if record:
            stored_time = record[0]
            stored_weekdays = record[1]
            stored_delay = record[2]
            stored_last_run = record[3]
            stored_next = str(record[4] or "").strip() or None

            # A next_run_at that a configuration edit already computed and
            # that is still in the future is reused, so the once-per-day rule
            # tied to last_run_at cannot block a task the user re-scheduled
            # for later today.
            if stored_next and stored_next > current_str:
                upcoming = stored_next
            else:
                computed = compute_next_run_at(
                    now=current_dt,
                    schedule_time=str(stored_time or "08:00"),
                    weekdays=str(stored_weekdays or "1,2,3,4,5"),
                    random_delay=int(stored_delay or 0),
                    last_run_at=str(stored_last_run) if stored_last_run else None,
                )
                upcoming = format_cst(computed)

        flag = 1 if enabled else 0
        cur.execute(update_sql, (flag, upcoming, current_str, schedule_id))
        conn.commit()
        return cur.rowcount > 0
|
||
|
||
|
||
def get_enabled_user_schedules():
    """Return every enabled schedule joined with its owner's username.

    Each dict holds all ``user_schedules`` columns plus ``user_username``;
    rows are ordered by ``schedule_time``.
    """
    query = """
        SELECT us.*, u.username as user_username
        FROM user_schedules us
        JOIN users u ON us.user_id = u.id
        WHERE us.enabled = 1
        ORDER BY us.schedule_time
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query)
        records = cur.fetchall()
        return list(map(dict, records))
|
||
|
||
|
||
def update_schedule_last_run(schedule_id):
    """Record the current time as ``last_run_at`` and advance ``next_run_at`` (O-08).

    The next run is computed from the stored schedule configuration with the
    just-recorded run time passed as ``last_run_at``.

    Returns ``False`` when the schedule does not exist, otherwise whether the
    UPDATE touched a row.
    """
    select_sql = """
        SELECT schedule_time, weekdays, random_delay
        FROM user_schedules
        WHERE id = ?
    """
    update_sql = """
        UPDATE user_schedules
        SET last_run_at = ?, next_run_at = ?, updated_at = ?
        WHERE id = ?
    """

    with db_pool.get_db() as conn:
        cur = conn.cursor()
        ran_at_dt = get_beijing_now()
        ran_at = format_cst(ran_at_dt)

        cur.execute(select_sql, (schedule_id,))
        config = cur.fetchone()
        if not config:
            return False

        following = compute_next_run_at(
            now=ran_at_dt,
            schedule_time=str(config[0] or "08:00"),
            weekdays=str(config[1] or "1,2,3,4,5"),
            random_delay=int(config[2] or 0),
            last_run_at=ran_at,
        )
        following_str = format_cst(following)

        cur.execute(update_sql, (ran_at, following_str, ran_at, schedule_id))
        conn.commit()
        return cur.rowcount > 0
|
||
|
||
|
||
def update_schedule_next_run(schedule_id: int, next_run_at: str) -> bool:
    """Update only ``next_run_at`` (``last_run_at`` is left untouched).

    Used to move a schedule forward when an execution is skipped. A blank or
    whitespace-only ``next_run_at`` is stored as NULL; ``updated_at`` is
    refreshed. Returns ``True`` when a row was updated.
    """
    update_sql = """
        UPDATE user_schedules
        SET next_run_at = ?, updated_at = ?
        WHERE id = ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        normalized_next = str(next_run_at or "").strip() or None
        touched_at = format_cst(get_beijing_now())
        row_id = int(schedule_id)
        cur.execute(update_sql, (normalized_next, touched_at, row_id))
        conn.commit()
        return cur.rowcount > 0
|
||
|
||
|
||
def recompute_schedule_next_run(schedule_id: int, *, now_dt=None) -> bool:
    """Recompute ``next_run_at`` from the stored configuration.

    ``last_run_at`` is read and passed to ``compute_next_run_at`` but is not
    modified. The write is delegated to ``update_schedule_next_run``.

    Args:
        schedule_id: Id of the schedule; coerced with ``int()``.
        now_dt: Reference "now" for the computation; defaults to
            ``get_beijing_now()`` when falsy.

    Returns:
        ``False`` when the schedule does not exist, otherwise the result of
        ``update_schedule_next_run``.
    """
    now_dt = now_dt or get_beijing_now()

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT schedule_time, weekdays, random_delay, last_run_at
            FROM user_schedules
            WHERE id = ?
            """,
            (int(schedule_id),),
        )
        row = cursor.fetchone()
        if not row:
            return False
        # Copy the values out while the connection is still held.
        schedule_time, weekdays, random_delay, last_run_at = row[0], row[1], row[2], row[3]

    # The read connection is released before update_schedule_next_run acquires
    # its own, so this function never holds two connections at once.
    next_dt = compute_next_run_at(
        now=now_dt,
        schedule_time=str(schedule_time or "08:00"),
        weekdays=str(weekdays or "1,2,3,4,5"),
        random_delay=int(random_delay or 0),
        last_run_at=str(last_run_at) if last_run_at else None,
    )
    return update_schedule_next_run(int(schedule_id), format_cst(next_dt))
|
||
|
||
|
||
def get_due_user_schedules(now_cst: str, limit: int = 50):
    """Return enabled schedules whose ``next_run_at`` is at or before ``now_cst``.

    The lookup is driven by ``next_run_at`` (rows with NULL are excluded),
    ordered by ``next_run_at`` ascending and capped at ``limit`` rows. A
    blank ``now_cst`` falls back to ``format_cst(get_beijing_now())``. Each
    dict also carries the owner's name as ``user_username``.
    """
    cutoff = str(now_cst or "").strip() or format_cst(get_beijing_now())

    query = """
        SELECT us.*, u.username as user_username
        FROM user_schedules us
        JOIN users u ON us.user_id = u.id
        WHERE us.enabled = 1
          AND us.next_run_at IS NOT NULL
          AND us.next_run_at <= ?
        ORDER BY us.next_run_at ASC
        LIMIT ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (cutoff, int(limit)))
        records = cur.fetchall()
        return list(map(dict, records))
|
||
|
||
|
||
# ==================== Schedule execution logs ====================
|
||
|
||
|
||
def create_schedule_execution_log(schedule_id, user_id, schedule_name):
    """Insert an execution-log row with status ``'running'`` and return its row id.

    ``execute_time`` is set to ``format_cst(get_beijing_now())``.
    """
    insert_sql = """
        INSERT INTO schedule_execution_logs (
            schedule_id, user_id, schedule_name, execute_time, status
        ) VALUES (?, ?, ?, ?, 'running')
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        started_at = format_cst(get_beijing_now())
        values = (schedule_id, user_id, schedule_name, started_at)
        cur.execute(insert_sql, values)
        conn.commit()
        return cur.lastrowid
|
||
|
||
|
||
def update_schedule_execution_log(log_id, **kwargs):
    """Update whitelisted columns of one execution-log row.

    Keyword arguments whose names are not in ``allowed_fields`` are ignored.
    Returns ``False`` when no allowed field was supplied, otherwise whether
    the UPDATE touched a row.
    """
    allowed_fields = (
        "total_accounts",
        "success_accounts",
        "failed_accounts",
        "total_items",
        "total_attachments",
        "total_screenshots",
        "duration_seconds",
        "status",
        "error_message",
    )

    with db_pool.get_db() as conn:
        cur = conn.cursor()

        # Keep whitelist order so the SET clause and parameters stay aligned.
        supplied = [column for column in allowed_fields if column in kwargs]
        if not supplied:
            return False

        assignments = ", ".join(f"{column} = ?" for column in supplied)
        params = [kwargs[column] for column in supplied]
        params.append(log_id)

        sql = f"UPDATE schedule_execution_logs SET {assignments} WHERE id = ?"
        cur.execute(sql, params)
        conn.commit()
        return cur.rowcount > 0
|
||
|
||
|
||
def get_schedule_execution_logs(schedule_id, limit=10):
    """Return the most recent execution logs of a schedule, newest first.

    Each row is returned as a dict with four alias keys added:
    ``created_at`` (copy of ``execute_time``), ``success_count``,
    ``failed_count`` and ``duration``. The three numeric aliases are ``0``
    when the underlying column is missing or NULL.

    This is a best-effort reader: any error is printed and an empty list
    (or the rows processed so far, for per-row errors) is returned instead
    of raising.
    """
    try:
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT * FROM schedule_execution_logs
                WHERE schedule_id = ?
                ORDER BY execute_time DESC
                LIMIT ?
                """,
                (schedule_id, limit),
            )

            logs = []
            for row in cursor.fetchall():
                try:
                    log = dict(row)
                    log["created_at"] = log.get("execute_time")
                    # SELECT * always yields these keys, so a .get() default
                    # alone never applies; `or 0` also maps NULL to 0.
                    log["success_count"] = log.get("success_accounts") or 0
                    log["failed_count"] = log.get("failed_accounts") or 0
                    log["duration"] = log.get("duration_seconds") or 0
                    logs.append(log)
                except Exception as e:
                    # Skip a malformed row rather than failing the whole listing.
                    print(f"[数据库] 处理日志行时出错: {e}")

            return logs
    except Exception as e:
        print(f"[数据库] 查询定时任务日志时出错: {e}")
        import traceback

        traceback.print_exc()
        return []
|
||
|
||
|
||
def get_user_all_schedule_logs(user_id, limit=50):
    """Return up to ``limit`` execution logs across all schedules of ``user_id``.

    Rows are returned as dicts ordered by ``execute_time`` descending.
    """
    query = """
        SELECT * FROM schedule_execution_logs
        WHERE user_id = ?
        ORDER BY execute_time DESC
        LIMIT ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id, limit))
        records = cur.fetchall()
        return list(map(dict, records))
|
||
|
||
|
||
def delete_schedule_logs(schedule_id, user_id):
    """Delete all execution logs of one schedule and return the deleted-row count.

    The DELETE matches on both ``schedule_id`` and ``user_id``, so only rows
    recorded for that user are removed.
    """
    delete_sql = """
        DELETE FROM schedule_execution_logs
        WHERE schedule_id = ? AND user_id = ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(delete_sql, (schedule_id, user_id))
        conn.commit()
        deleted = cur.rowcount
        return deleted
|
||
|
||
|
||
def clean_old_schedule_logs(days=30):
    """Delete execution logs whose ``execute_time`` is older than ``days`` days.

    The cutoff is built with ``get_beijing_now()`` and ``format_cst()``, the
    same helpers that produce ``execute_time`` when a log row is created, so
    the comparison does not depend on the database host's local timezone.

    Args:
        days: Retention window in days (int, float or numeric string).

    Returns:
        Number of deleted rows. ``0`` when ``days`` is negative or cannot be
        turned into a time span; nothing is deleted in that case.
    """
    from datetime import timedelta

    try:
        retention_days = float(days)
        if retention_days < 0:
            # A negative window would put the cutoff in the future and wipe
            # every log; treat it as invalid input instead.
            return 0
        cutoff_dt = get_beijing_now() - timedelta(days=retention_days)
    except (TypeError, ValueError, OverflowError):
        return 0

    cutoff = format_cst(cutoff_dt)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            DELETE FROM schedule_execution_logs
            WHERE execute_time < ?
            """,
            (cutoff,),
        )
        conn.commit()
        return cursor.rowcount
|