Files
zsglpt/db/schedules.py

555 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
from datetime import datetime, timedelta
import db_pool
from services.schedule_utils import compute_next_run_at, format_cst
from services.time_utils import get_beijing_now
# Fallbacks applied by _compute_schedule_next_run_str() when a schedule's
# time / weekday value is empty.
_SCHEDULE_DEFAULT_TIME = "08:00"
_SCHEDULE_DEFAULT_WEEKDAYS = "1,2,3,4,5"

# Columns of user_schedules that update_user_schedule() is allowed to write.
# Keyword arguments outside this whitelist are ignored, which also keeps the
# dynamically built SET clause free of caller-controlled column names.
_ALLOWED_SCHEDULE_UPDATE_FIELDS = (
    "name",
    "enabled",
    "schedule_time",
    "weekdays",
    "browse_type",
    "enable_screenshot",
    "random_delay",
    "account_ids",
)

# Columns of schedule_execution_logs that update_schedule_execution_log()
# is allowed to write.
_ALLOWED_EXEC_LOG_UPDATE_FIELDS = (
    "total_accounts",
    "success_accounts",
    "failed_accounts",
    "total_items",
    "total_attachments",
    "total_screenshots",
    "duration_seconds",
    "status",
    "error_message",
)
def _normalize_limit(limit, default: int, *, minimum: int = 1) -> int:
    """Coerce ``limit`` to an int that is never below ``minimum``.

    Args:
        limit: Requested limit; anything ``int()`` accepts.
        default: Value used when ``limit`` cannot be converted.
        minimum: Lower bound for the result (also applied to ``default``).

    Returns:
        The converted limit, or ``minimum`` when the value is smaller.
    """
    try:
        candidate = int(limit)
    except Exception:
        candidate = default
    return minimum if candidate < minimum else candidate
def _to_int(value, default: int = 0) -> int:
    """Return ``int(value)``, or ``default`` when the conversion raises."""
    try:
        converted = int(value)
    except Exception:
        return default
    return converted
def _format_optional_datetime(dt: datetime | None) -> str | None:
    """Format ``dt`` with ``format_cst``; ``None`` is passed through as-is."""
    return None if dt is None else format_cst(dt)
def _serialize_account_ids(account_ids) -> str:
    """JSON-encode ``account_ids``; any falsy value becomes ``"[]"``."""
    if not account_ids:
        return "[]"
    return json.dumps(account_ids)
def _compute_schedule_next_run_str(
    *,
    now_dt,
    schedule_time,
    weekdays,
    random_delay,
    last_run_at,
) -> str:
    """Compute a schedule's next run time and return it formatted.

    Inputs are normalised before being handed to ``compute_next_run_at``:
    an empty ``schedule_time`` / ``weekdays`` falls back to the module
    defaults, ``random_delay`` goes through ``_to_int`` and a falsy
    ``last_run_at`` is forwarded as ``None``.  The result is rendered with
    ``format_cst``.
    """
    effective_time = str(schedule_time or _SCHEDULE_DEFAULT_TIME)
    effective_weekdays = str(weekdays or _SCHEDULE_DEFAULT_WEEKDAYS)
    delay = _to_int(random_delay, 0)
    previous_run = str(last_run_at) if last_run_at else None

    upcoming = compute_next_run_at(
        now=now_dt,
        schedule_time=effective_time,
        weekdays=effective_weekdays,
        random_delay=delay,
        last_run_at=previous_run,
    )
    return format_cst(upcoming)
def _map_schedule_log_row(row) -> dict:
    """Copy a log row into a dict and add alias keys.

    Aliases added (source column -> alias):
        execute_time -> created_at, success_accounts -> success_count,
        failed_accounts -> failed_count, duration_seconds -> duration.
    Missing counters default to 0; a missing ``execute_time`` gives ``None``.
    """
    entry = dict(row)
    entry.update(
        created_at=entry.get("execute_time"),
        success_count=entry.get("success_accounts", 0),
        failed_count=entry.get("failed_accounts", 0),
        duration=entry.get("duration_seconds", 0),
    )
    return entry
def get_user_schedules(user_id):
    """Return every schedule owned by ``user_id`` as dicts, newest first."""
    query = """
        SELECT * FROM user_schedules
        WHERE user_id = ?
        ORDER BY created_at DESC
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id,))
        records = cur.fetchall()
        return [dict(record) for record in records]
def get_schedule_by_id(schedule_id):
    """Fetch one schedule by primary key; ``None`` when no row matches."""
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM user_schedules WHERE id = ?", (schedule_id,))
        record = cur.fetchone()
        if not record:
            return None
        return dict(record)
def create_user_schedule(
    user_id,
    name="我的定时任务",
    schedule_time="08:00",
    weekdays="1,2,3,4,5",
    browse_type="应读",
    enable_screenshot=1,
    random_delay=0,
    account_ids=None,
):
    """Insert a new schedule for ``user_id`` and return its row id.

    The row is written with ``enabled = 0``.  ``random_delay`` is coerced
    with ``_to_int`` and ``account_ids`` is stored as a JSON string.
    ``created_at`` and ``updated_at`` both receive the current time as
    formatted by ``format_cst``.
    """
    insert_sql = """
        INSERT INTO user_schedules (
            user_id, name, enabled, schedule_time, weekdays,
            browse_type, enable_screenshot, random_delay, account_ids, created_at, updated_at
        ) VALUES (?, ?, 0, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        stamp = format_cst(get_beijing_now())
        values = (
            user_id,
            name,
            schedule_time,
            weekdays,
            browse_type,
            enable_screenshot,
            _to_int(random_delay, 0),
            _serialize_account_ids(account_ids),
            stamp,  # created_at
            stamp,  # updated_at
        )
        cur.execute(insert_sql, values)
        conn.commit()
        return cur.lastrowid
def update_user_schedule(schedule_id, **kwargs):
    """Update whitelisted columns of a schedule and keep ``next_run_at`` in sync.

    Only keys listed in ``_ALLOWED_SCHEDULE_UPDATE_FIELDS`` are written; any
    other keyword argument is ignored.  A list passed as ``account_ids`` is
    stored as a JSON string; other values are written as given.

    ``next_run_at`` handling (exactly one of these applies):
      * ``enabled`` passed falsy: ``next_run_at`` is cleared.
      * ``schedule_time`` / ``weekdays`` / ``random_delay`` passed, or
        ``enabled`` passed truthy: ``next_run_at`` is recomputed from the
        resulting configuration.  A configuration change ignores the stored
        ``last_run_at``; a plain enable takes it into account.
      * otherwise ``next_run_at`` is left unchanged.

    Returns:
        ``True`` when a row was updated, ``False`` when the schedule does not
        exist or no whitelisted field was supplied.

    Raises:
        ValueError, TypeError: if ``random_delay`` is given and ``int()``
            cannot convert it.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        now_dt = get_beijing_now()
        now_str = format_cst(now_dt)

        cursor.execute(
            """
            SELECT enabled, schedule_time, weekdays, random_delay, last_run_at
            FROM user_schedules
            WHERE id = ?
            """,
            (schedule_id,),
        )
        current = cursor.fetchone()
        if not current:
            return False

        # Start from the stored configuration; overridden below by kwargs.
        will_enabled = _to_int(current[0], 0)
        next_time = current[1]
        next_weekdays = current[2]
        next_random_delay = _to_int(current[3], 0)
        current_last_run_at = current[4]

        updates = []
        params = []
        for field in _ALLOWED_SCHEDULE_UPDATE_FIELDS:
            if field not in kwargs:
                continue
            value = kwargs[field]
            if field == "account_ids" and isinstance(value, list):
                value = json.dumps(value)
            elif field == "enabled":
                will_enabled = 1 if value else 0
            elif field == "schedule_time":
                next_time = value
            elif field == "weekdays":
                next_weekdays = value
            elif field == "random_delay":
                next_random_delay = int(value or 0)
            # Field names come from the whitelist constant, never from the
            # caller, so interpolating them into the SQL text is safe.
            updates.append(f"{field} = ?")
            params.append(value)

        if not updates:
            return False

        updates.append("updated_at = ?")
        params.append(now_str)

        config_changed = any(
            key in kwargs for key in ("schedule_time", "weekdays", "random_delay")
        )
        enabled_toggled = "enabled" in kwargs

        # Disabling is handled first and exclusively, so the SET list never
        # contains next_run_at twice and no next-run computation is done for
        # a schedule that is being switched off.
        if enabled_toggled and will_enabled == 0:
            updates.append("next_run_at = ?")
            params.append(None)
        elif config_changed or (enabled_toggled and will_enabled == 1):
            next_run_at = _compute_schedule_next_run_str(
                now_dt=now_dt,
                schedule_time=next_time,
                weekdays=next_weekdays,
                random_delay=next_random_delay,
                # A changed configuration starts a fresh cycle.
                last_run_at=None if config_changed else current_last_run_at,
            )
            updates.append("next_run_at = ?")
            params.append(next_run_at)

        params.append(schedule_id)
        sql = f"UPDATE user_schedules SET {', '.join(updates)} WHERE id = ?"
        cursor.execute(sql, params)
        conn.commit()
        return cursor.rowcount > 0
def delete_user_schedule(schedule_id):
    """Delete a schedule by id; return ``True`` if a row was removed."""
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM user_schedules WHERE id = ?", (schedule_id,))
        conn.commit()
        removed = cur.rowcount
        return removed > 0
def toggle_user_schedule(schedule_id, enabled):
    """Enable or disable a schedule and set ``next_run_at`` accordingly.

    Disabling stores ``next_run_at = NULL``.  Enabling keeps a stored
    ``next_run_at`` that is still later than now and otherwise recomputes it
    from the stored configuration.

    Returns:
        ``True`` when a row was updated, ``False`` otherwise (for example
        when ``schedule_id`` does not exist).
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        now_dt = get_beijing_now()
        now_str = format_cst(now_dt)

        next_run_at = None
        stored = None
        if enabled:
            cur.execute(
                """
                SELECT schedule_time, weekdays, random_delay, last_run_at, next_run_at
                FROM user_schedules
                WHERE id = ?
                """,
                (schedule_id,),
            )
            stored = cur.fetchone()

        if stored:
            schedule_time, weekdays, random_delay, last_run_at, pending = stored
            pending = str(pending or "").strip() or None
            # Plain string comparison of timestamps.
            # NOTE(review): assumes format_cst output sorts chronologically - confirm.
            if pending and pending > now_str:
                next_run_at = pending
            else:
                next_run_at = _compute_schedule_next_run_str(
                    now_dt=now_dt,
                    schedule_time=schedule_time,
                    weekdays=weekdays,
                    random_delay=random_delay,
                    last_run_at=last_run_at,
                )

        enabled_flag = 1 if enabled else 0
        cur.execute(
            """
            UPDATE user_schedules
            SET enabled = ?, next_run_at = ?, updated_at = ?
            WHERE id = ?
            """,
            (enabled_flag, next_run_at, now_str, schedule_id),
        )
        conn.commit()
        return cur.rowcount > 0
def get_enabled_user_schedules():
    """Return all enabled schedules joined with the owner's username.

    Each dict holds the ``user_schedules`` columns plus ``user_username``;
    rows are ordered by ``schedule_time``.
    """
    query = """
        SELECT us.*, u.username as user_username
        FROM user_schedules us
        JOIN users u ON us.user_id = u.id
        WHERE us.enabled = 1
        ORDER BY us.schedule_time
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query)
        records = cur.fetchall()
        return [dict(record) for record in records]
def update_schedule_last_run(schedule_id):
    """Record that a schedule just ran and advance its ``next_run_at``.

    ``last_run_at`` and ``updated_at`` are set to the current time and
    ``next_run_at`` is recomputed from the stored configuration using that
    same time as the last run.  (The original docstring tags this behaviour
    as "O-08".)

    Returns:
        ``True`` when a row was updated, ``False`` when the schedule does
        not exist.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        now_dt = get_beijing_now()
        now_str = format_cst(now_dt)

        cur.execute(
            """
            SELECT schedule_time, weekdays, random_delay
            FROM user_schedules
            WHERE id = ?
            """,
            (schedule_id,),
        )
        config = cur.fetchone()
        if not config:
            return False

        schedule_time, weekdays, random_delay = config
        upcoming = _compute_schedule_next_run_str(
            now_dt=now_dt,
            schedule_time=schedule_time,
            weekdays=weekdays,
            random_delay=random_delay,
            last_run_at=now_str,
        )

        cur.execute(
            """
            UPDATE user_schedules
            SET last_run_at = ?, next_run_at = ?, updated_at = ?
            WHERE id = ?
            """,
            (now_str, upcoming, now_str, schedule_id),
        )
        conn.commit()
        return cur.rowcount > 0
def update_schedule_next_run(schedule_id: int, next_run_at: str) -> bool:
    """Set only ``next_run_at`` (plus ``updated_at``) for a schedule.

    ``last_run_at`` is left untouched, which makes this suitable for
    advancing a schedule whose execution was skipped.  A blank or falsy
    ``next_run_at`` is stored as NULL.

    Returns:
        ``True`` when a row was updated.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        normalized_next = str(next_run_at or "").strip() or None
        touched_at = format_cst(get_beijing_now())
        cur.execute(
            """
            UPDATE user_schedules
            SET next_run_at = ?, updated_at = ?
            WHERE id = ?
            """,
            (normalized_next, touched_at, int(schedule_id)),
        )
        conn.commit()
        return cur.rowcount > 0
def recompute_schedule_next_run(schedule_id: int, *, now_dt=None) -> bool:
    """Recompute ``next_run_at`` from the stored configuration.

    ``last_run_at`` is read but never modified.

    Args:
        schedule_id: Primary key of the schedule; coerced with ``int()``.
        now_dt: Reference time for the computation; defaults to
            ``get_beijing_now()``.

    Returns:
        ``True`` when the schedule exists and its row was updated,
        ``False`` otherwise.
    """
    now_dt = now_dt or get_beijing_now()
    schedule_id = int(schedule_id)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT schedule_time, weekdays, random_delay, last_run_at
            FROM user_schedules
            WHERE id = ?
            """,
            (schedule_id,),
        )
        row = cursor.fetchone()
        if not row:
            return False
        schedule_time, weekdays, random_delay, last_run_at = row

    # The read connection is released before delegating:
    # update_schedule_next_run() acquires its own connection, and holding
    # two at once is unnecessary.
    next_run_at = _compute_schedule_next_run_str(
        now_dt=now_dt,
        schedule_time=schedule_time,
        weekdays=weekdays,
        random_delay=random_delay,
        last_run_at=last_run_at,
    )
    return update_schedule_next_run(schedule_id, next_run_at)
def get_due_user_schedules(now_cst: str, limit: int = 50):
    """Return enabled schedules whose ``next_run_at`` is at or before ``now_cst``.

    Args:
        now_cst: Cut-off timestamp string; when blank the current time from
            ``format_cst(get_beijing_now())`` is used.
        limit: Maximum number of rows; normalised to at least 1 (default 50).

    Returns:
        A list of dicts with the ``user_schedules`` columns plus
        ``user_username``, ordered by ``next_run_at`` ascending.
    """
    cutoff = str(now_cst or "").strip() or format_cst(get_beijing_now())
    row_cap = _normalize_limit(limit, 50, minimum=1)
    query = """
        SELECT us.*, u.username as user_username
        FROM user_schedules us
        JOIN users u ON us.user_id = u.id
        WHERE us.enabled = 1
        AND us.next_run_at IS NOT NULL
        AND us.next_run_at <= ?
        ORDER BY us.next_run_at ASC
        LIMIT ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (cutoff, row_cap))
        due = cur.fetchall()
        return [dict(record) for record in due]
# ==================== Schedule execution logs ====================
def create_schedule_execution_log(schedule_id, user_id, schedule_name):
    """Insert an execution-log row with status ``'running'``; return its id.

    ``execute_time`` is set to the current time as formatted by
    ``format_cst``.
    """
    insert_sql = """
        INSERT INTO schedule_execution_logs (
            schedule_id, user_id, schedule_name, execute_time, status
        ) VALUES (?, ?, ?, ?, 'running')
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        started_at = format_cst(get_beijing_now())
        cur.execute(insert_sql, (schedule_id, user_id, schedule_name, started_at))
        conn.commit()
        return cur.lastrowid
def update_schedule_execution_log(log_id, **kwargs):
    """Update whitelisted columns of one execution-log row.

    Only keys listed in ``_ALLOWED_EXEC_LOG_UPDATE_FIELDS`` are written;
    other keyword arguments are ignored.

    Returns:
        ``True`` when a row was updated, ``False`` when no whitelisted field
        was supplied or no row matched ``log_id``.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()

        chosen = [name for name in _ALLOWED_EXEC_LOG_UPDATE_FIELDS if name in kwargs]
        if not chosen:
            return False

        # Column names come from the whitelist constant, not from the caller.
        assignments = ", ".join(f"{name} = ?" for name in chosen)
        values = [kwargs[name] for name in chosen]
        values.append(log_id)

        sql = f"UPDATE schedule_execution_logs SET {assignments} WHERE id = ?"
        cur.execute(sql, values)
        conn.commit()
        return cur.rowcount > 0
def get_schedule_execution_logs(schedule_id, limit=10):
    """Return the most recent execution logs of one schedule.

    Rows are ordered by ``execute_time`` descending and passed through
    ``_map_schedule_log_row``.  This is a best-effort read: a row that fails
    to map is reported and skipped, and any other error is reported (with a
    traceback) and results in an empty list rather than an exception.
    """
    query = """
        SELECT * FROM schedule_execution_logs
        WHERE schedule_id = ?
        ORDER BY execute_time DESC
        LIMIT ?
    """
    try:
        row_cap = _normalize_limit(limit, 10, minimum=1)
        with db_pool.get_db() as conn:
            cur = conn.cursor()
            cur.execute(query, (schedule_id, row_cap))
            collected = []
            for raw in cur.fetchall():
                try:
                    mapped = _map_schedule_log_row(raw)
                except Exception as e:
                    print(f"[数据库] 处理日志行时出错: {e}")
                    continue
                collected.append(mapped)
            return collected
    except Exception as e:
        print(f"[数据库] 查询定时任务日志时出错: {e}")
        import traceback

        traceback.print_exc()
        return []
def get_user_all_schedule_logs(user_id, limit=50):
    """Return a user's execution logs across all schedules, newest first.

    ``limit`` is normalised to at least 1 (default 50).  Rows are returned
    as plain dicts of the table columns.
    """
    row_cap = _normalize_limit(limit, 50, minimum=1)
    query = """
        SELECT * FROM schedule_execution_logs
        WHERE user_id = ?
        ORDER BY execute_time DESC
        LIMIT ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id, row_cap))
        records = cur.fetchall()
        return [dict(record) for record in records]
def delete_schedule_logs(schedule_id, user_id):
    """Delete all execution logs of a schedule that belong to ``user_id``.

    The ``user_id`` condition in the WHERE clause acts as the ownership
    check.  Returns the number of deleted rows.
    """
    delete_sql = """
        DELETE FROM schedule_execution_logs
        WHERE schedule_id = ? AND user_id = ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(delete_sql, (schedule_id, user_id))
        conn.commit()
        removed = cur.rowcount
        return removed
def clean_old_schedule_logs(days=30):
    """Delete execution logs whose ``execute_time`` is older than ``days`` days.

    ``days`` is coerced with ``_to_int`` (falling back to 30) and negative
    values are treated as 0.  Returns the number of deleted rows.
    """
    retention_days = max(_to_int(days, 30), 0)
    cutoff = format_cst(get_beijing_now() - timedelta(days=retention_days))
    delete_sql = """
        DELETE FROM schedule_execution_logs
        WHERE execute_time < ?
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(delete_sql, (cutoff,))
        conn.commit()
        return cur.rowcount