#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import random from datetime import datetime, timedelta from typing import Iterable, Optional, Set, Tuple from services.time_utils import BEIJING_TZ def _parse_hhmm(value: str, default: Tuple[int, int] = (8, 0)) -> Tuple[int, int]: text = str(value or "").strip() if ":" not in text: return default try: h, m = text.split(":", 1) hour = int(h) minute = int(m) if 0 <= hour <= 23 and 0 <= minute <= 59: return hour, minute except Exception: pass return default def _parse_weekdays(value: str, default: Iterable[int] = (1, 2, 3, 4, 5)) -> Set[int]: text = str(value or "").strip() days = [] for part in text.split(","): part = part.strip() if not part: continue try: day = int(part) except Exception: continue if 1 <= day <= 7: days.append(day) return set(days) if days else set(default) def _parse_cst_datetime(value: Optional[str]) -> Optional[datetime]: if not value: return None text = str(value).strip() if not text: return None try: naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S") return BEIJING_TZ.localize(naive) except Exception: return None def compute_next_run_at( *, now: datetime, schedule_time: str, weekdays: str, random_delay: int = 0, last_run_at: Optional[str] = None, ) -> datetime: """ 计算下一次实际执行时间(北京时间,aware datetime)。 规则: - weekday 过滤(1=周一..7=周日) - random_delay=1 时:在 [schedule_time-15min, schedule_time+15min] 内随机 - 同一天只执行一次:若 last_run_at 是今天,则 next_run_at 至少是下一可用日 """ if now.tzinfo is None: now = BEIJING_TZ.localize(now) hour, minute = _parse_hhmm(schedule_time, default=(8, 0)) allowed_weekdays = _parse_weekdays(weekdays, default=(1, 2, 3, 4, 5)) random_delay = 1 if int(random_delay or 0) == 1 else 0 last_run_dt = _parse_cst_datetime(last_run_at) last_run_date = last_run_dt.date() if last_run_dt else None base_today = now.replace(hour=hour, minute=minute, second=0, microsecond=0) for day_offset in range(0, 14): day = (base_today + timedelta(days=day_offset)).date() if last_run_date is not None and day == last_run_date: continue candidate_base = base_today.replace(year=day.year, month=day.month, day=day.day) if candidate_base.isoweekday() not in allowed_weekdays: continue if random_delay: window_start = candidate_base - timedelta(minutes=15) window_end = candidate_base + timedelta(minutes=15) # 只从“未来窗口”中抽样,避免抽到过去时间导致整天被跳过 delta_seconds = (now - window_start).total_seconds() if delta_seconds < 0: min_offset = 0 else: min_offset = int(delta_seconds // 60) + 1 max_offset = int((window_end - window_start).total_seconds() // 60) if min_offset > max_offset: continue candidate = window_start + timedelta(minutes=random.randint(min_offset, max_offset)) else: candidate = candidate_base if candidate <= now: continue return candidate # 兜底:找不到则推迟一天 return now + timedelta(days=1) def format_cst(dt: datetime) -> str: """格式化为 DB 存储用的 CST 字符串。""" if dt.tzinfo is None: dt = BEIJING_TZ.localize(dt) dt = dt.astimezone(BEIJING_TZ) return dt.strftime("%Y-%m-%d %H:%M:%S")