57 lines
1.6 KiB
Python
57 lines
1.6 KiB
Python
from __future__ import annotations
|
||
|
||
from datetime import datetime
|
||
|
||
from services.schedule_utils import compute_next_run_at, format_cst
|
||
from services.time_utils import BEIJING_TZ
|
||
|
||
|
||
def _dt(text: str) -> datetime:
|
||
naive = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
|
||
return BEIJING_TZ.localize(naive)
|
||
|
||
|
||
def test_compute_next_run_at_weekday_filter():
|
||
now = _dt("2025-01-06 07:00:00") # 周一
|
||
next_dt = compute_next_run_at(
|
||
now=now,
|
||
schedule_time="08:00",
|
||
weekdays="2", # 仅周二
|
||
random_delay=0,
|
||
last_run_at=None,
|
||
)
|
||
assert format_cst(next_dt) == "2025-01-07 08:00:00"
|
||
|
||
|
||
def test_compute_next_run_at_random_delay_within_window(monkeypatch):
|
||
now = _dt("2025-01-06 06:00:00")
|
||
|
||
# 固定随机值:0 => window_start(schedule_time-15min)
|
||
monkeypatch.setattr("services.schedule_utils.random.randint", lambda a, b: 0)
|
||
|
||
next_dt = compute_next_run_at(
|
||
now=now,
|
||
schedule_time="08:00",
|
||
weekdays="1,2,3,4,5,6,7",
|
||
random_delay=1,
|
||
last_run_at=None,
|
||
)
|
||
assert format_cst(next_dt) == "2025-01-06 07:45:00"
|
||
|
||
|
||
def test_compute_next_run_at_skips_same_day_if_last_run_today(monkeypatch):
|
||
now = _dt("2025-01-06 06:00:00")
|
||
|
||
# 让次日的随机值固定,便于断言
|
||
monkeypatch.setattr("services.schedule_utils.random.randint", lambda a, b: 30)
|
||
|
||
next_dt = compute_next_run_at(
|
||
now=now,
|
||
schedule_time="08:00",
|
||
weekdays="1,2,3,4,5,6,7",
|
||
random_delay=1,
|
||
last_run_at="2025-01-06 01:00:00",
|
||
)
|
||
# 次日 window_start=07:45 + 30min => 08:15
|
||
assert format_cst(next_dt) == "2025-01-07 08:15:00"
|