Files
zsglpt/services/schedule_utils.py
2025-12-15 10:48:58 +08:00

123 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import random
from datetime import datetime, timedelta
from typing import Iterable, Optional, Set, Tuple
from services.time_utils import BEIJING_TZ
def _parse_hhmm(value: str, default: Tuple[int, int] = (8, 0)) -> Tuple[int, int]:
text = str(value or "").strip()
if ":" not in text:
return default
try:
h, m = text.split(":", 1)
hour = int(h)
minute = int(m)
if 0 <= hour <= 23 and 0 <= minute <= 59:
return hour, minute
except Exception:
pass
return default
def _parse_weekdays(value: str, default: Iterable[int] = (1, 2, 3, 4, 5)) -> Set[int]:
text = str(value or "").strip()
days = []
for part in text.split(","):
part = part.strip()
if not part:
continue
try:
day = int(part)
except Exception:
continue
if 1 <= day <= 7:
days.append(day)
return set(days) if days else set(default)
def _parse_cst_datetime(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
text = str(value).strip()
if not text:
return None
try:
naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
return BEIJING_TZ.localize(naive)
except Exception:
return None
def compute_next_run_at(
    *,
    now: datetime,
    schedule_time: str,
    weekdays: str,
    random_delay: int = 0,
    last_run_at: Optional[str] = None,
) -> datetime:
    """Compute the next actual run time as an aware Beijing-time datetime.

    Rules:
    - Weekday filter: only ISO weekdays listed in ``weekdays`` qualify
      (1=Monday .. 7=Sunday).
    - ``random_delay=1``: the run time is drawn, in whole-minute steps, from
      ``[schedule_time - 15min, schedule_time + 15min]``.
    - At most one run per day: the calendar day of ``last_run_at`` is skipped,
      so the result falls on the next eligible day.

    Args:
        now: Current time. A naive value is interpreted as Beijing time; an
            aware value in any other zone is converted to Beijing time.
        schedule_time: ``"HH:MM"`` text; falls back to 08:00 when invalid.
        weekdays: Comma-separated weekday numbers; falls back to Monday-Friday
            when no valid number is present.
        random_delay: ``1`` enables the +/-15 minute random window; any other
            value disables it.
        last_run_at: ``"YYYY-MM-DD HH:MM:SS"`` Beijing-time text of the last
            run, or ``None``.

    Returns:
        An aware datetime strictly later than ``now``.
    """
    if now.tzinfo is None:
        now = BEIJING_TZ.localize(now)
    else:
        # An aware ``now`` may be expressed in another zone (e.g. UTC). The
        # schedule time, the weekday filter and the last-run date are all
        # Beijing-time concepts, so normalise before doing wall-clock work.
        now = now.astimezone(BEIJING_TZ)

    hour, minute = _parse_hhmm(schedule_time, default=(8, 0))
    allowed_weekdays = _parse_weekdays(weekdays, default=(1, 2, 3, 4, 5))
    use_random_window = int(random_delay or 0) == 1

    last_run_dt = _parse_cst_datetime(last_run_at)
    last_run_date = last_run_dt.date() if last_run_dt else None

    window_minutes = 15
    half_window = timedelta(minutes=window_minutes)
    max_offset = 2 * window_minutes  # window length in whole minutes

    base_today = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    for day_offset in range(14):
        candidate_base = base_today + timedelta(days=day_offset)
        if last_run_date is not None and candidate_base.date() == last_run_date:
            continue
        if candidate_base.isoweekday() not in allowed_weekdays:
            continue

        if use_random_window:
            # NOTE(review): when schedule_time is within 15 minutes of
            # midnight the window spills into the adjacent calendar day; the
            # weekday and once-per-day checks above only look at the
            # scheduled day itself.
            window_start = candidate_base - half_window
            # Sample only from the part of the window that is still in the
            # future; drawing a past minute would make the whole day be
            # skipped by the ``candidate <= now`` check below.
            elapsed_seconds = (now - window_start).total_seconds()
            if elapsed_seconds < 0:
                min_offset = 0
            else:
                min_offset = int(elapsed_seconds // 60) + 1
            if min_offset > max_offset:
                continue
            candidate = window_start + timedelta(
                minutes=random.randint(min_offset, max_offset)
            )
        else:
            candidate = candidate_base

        if candidate <= now:
            continue
        return candidate

    # Fallback: nothing found in the search horizon, so postpone by one day.
    return now + timedelta(days=1)
def format_cst(dt: datetime) -> str:
    """Format ``dt`` as the CST string used for DB storage.

    A naive ``dt`` is localized with ``BEIJING_TZ``; an aware one is converted
    to ``BEIJING_TZ``. The result has the form ``"YYYY-MM-DD HH:MM:SS"``.
    """
    aware = BEIJING_TZ.localize(dt) if dt.tzinfo is None else dt
    return aware.astimezone(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")