114 lines
3.3 KiB
Python
114 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
|
||
import random
|
||
from datetime import datetime, timedelta
|
||
from typing import Iterable, Optional, Set, Tuple
|
||
|
||
from services.time_utils import BEIJING_TZ
|
||
|
||
|
||
def _parse_hhmm(value: str, default: Tuple[int, int] = (8, 0)) -> Tuple[int, int]:
|
||
text = str(value or "").strip()
|
||
if ":" not in text:
|
||
return default
|
||
try:
|
||
h, m = text.split(":", 1)
|
||
hour = int(h)
|
||
minute = int(m)
|
||
if 0 <= hour <= 23 and 0 <= minute <= 59:
|
||
return hour, minute
|
||
except Exception:
|
||
pass
|
||
return default
|
||
|
||
|
||
def _parse_weekdays(value: str, default: Iterable[int] = (1, 2, 3, 4, 5)) -> Set[int]:
|
||
text = str(value or "").strip()
|
||
days = []
|
||
for part in text.split(","):
|
||
part = part.strip()
|
||
if not part:
|
||
continue
|
||
try:
|
||
day = int(part)
|
||
except Exception:
|
||
continue
|
||
if 1 <= day <= 7:
|
||
days.append(day)
|
||
return set(days) if days else set(default)
|
||
|
||
|
||
def _parse_cst_datetime(value: Optional[str]) -> Optional[datetime]:
|
||
if not value:
|
||
return None
|
||
text = str(value).strip()
|
||
if not text:
|
||
return None
|
||
try:
|
||
naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
|
||
return BEIJING_TZ.localize(naive)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def compute_next_run_at(
    *,
    now: datetime,
    schedule_time: str,
    weekdays: str,
    random_delay: int = 0,
    last_run_at: Optional[str] = None,
) -> datetime:
    """Compute the next actual execution time (Beijing time, aware datetime).

    Rules:
    - Weekday filter: only days whose ISO weekday (1=Monday .. 7=Sunday) is
      listed in ``weekdays`` are eligible.
    - When ``random_delay`` is 1, the run time is drawn at whole-minute steps
      from ``[schedule_time - 15min, schedule_time + 15min]``.
    - At most one run per day: the calendar day of ``last_run_at`` is skipped,
      so the result falls on a later eligible day.

    Args:
        now: Reference time. A naive value is localized with ``BEIJING_TZ``;
            an aware value is converted to ``BEIJING_TZ``.
        schedule_time: ``"HH:MM"`` text; falls back to 08:00 when unparsable.
        weekdays: Comma-separated weekday numbers; falls back to Monday
            through Friday when nothing valid is found.
        random_delay: ``1`` enables the +/-15 minute random window; any other
            value disables it.
        last_run_at: ``"YYYY-MM-DD HH:MM:SS"`` text of the previous run, or
            ``None``.

    Returns:
        The first candidate strictly later than ``now`` within the next 14
        days, or ``now + 1 day`` if no candidate qualifies.
    """
    if now.tzinfo is None:
        now = BEIJING_TZ.localize(now)
    else:
        # An aware datetime in another zone must be moved to Beijing time
        # first; otherwise the hour/minute replacement and the comparison
        # with the last-run date would use the wrong wall clock.
        now = now.astimezone(BEIJING_TZ)

    hour, minute = _parse_hhmm(schedule_time, default=(8, 0))
    allowed_weekdays = _parse_weekdays(weekdays, default=(1, 2, 3, 4, 5))
    use_random_delay = int(random_delay or 0) == 1

    last_run_dt = _parse_cst_datetime(last_run_at)
    last_run_date = last_run_dt.date() if last_run_dt is not None else None

    base_today = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    for day_offset in range(14):
        candidate_base = base_today + timedelta(days=day_offset)

        # A day that already had a run is never scheduled again.
        if last_run_date is not None and candidate_base.date() == last_run_date:
            continue
        if candidate_base.isoweekday() not in allowed_weekdays:
            continue

        if use_random_delay:
            # NOTE(review): when `now` already lies inside today's window, a
            # draw at or before `now` moves the run to the next eligible day
            # even though part of the window remains - confirm this is
            # intended.
            window_start = candidate_base - timedelta(minutes=15)
            candidate = window_start + timedelta(minutes=random.randint(0, 30))
        else:
            candidate = candidate_base

        if candidate > now:
            return candidate

    # Fallback: nothing qualified, postpone by one day.
    return now + timedelta(days=1)
|
||
|
||
|
||
def format_cst(dt: datetime) -> str:
    """Format *dt* as the CST string used for DB storage.

    A naive datetime is localized with ``BEIJING_TZ``; the value is then
    converted to ``BEIJING_TZ`` and rendered as ``YYYY-MM-DD HH:MM:SS``.
    """
    aware = BEIJING_TZ.localize(dt) if dt.tzinfo is None else dt
    return aware.astimezone(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")
|
||
|