147 lines
4.6 KiB
Python
147 lines
4.6 KiB
Python
from __future__ import annotations
|
||
|
||
import threading
|
||
import time
|
||
|
||
from services.tasks import TaskScheduler
|
||
|
||
|
||
def test_task_scheduler_vip_priority(monkeypatch):
|
||
calls: list[str] = []
|
||
blocker_started = threading.Event()
|
||
blocker_release = threading.Event()
|
||
|
||
def fake_run_task(*, user_id, account_id, **kwargs):
|
||
calls.append(account_id)
|
||
if account_id == "block":
|
||
blocker_started.set()
|
||
blocker_release.wait(timeout=5)
|
||
|
||
import services.tasks as tasks_mod
|
||
|
||
monkeypatch.setattr(tasks_mod, "run_task", fake_run_task)
|
||
|
||
scheduler = TaskScheduler(max_global=1, max_per_user=1, max_queue_size=10)
|
||
try:
|
||
ok, _ = scheduler.submit_task(user_id=1, account_id="block", browse_type="应读", is_vip=False)
|
||
assert ok
|
||
assert blocker_started.wait(timeout=2)
|
||
|
||
ok2, _ = scheduler.submit_task(user_id=1, account_id="normal", browse_type="应读", is_vip=False)
|
||
ok3, _ = scheduler.submit_task(user_id=2, account_id="vip", browse_type="应读", is_vip=True)
|
||
assert ok2 and ok3
|
||
|
||
blocker_release.set()
|
||
|
||
deadline = time.time() + 3
|
||
while time.time() < deadline:
|
||
if calls[:3] == ["block", "vip", "normal"]:
|
||
break
|
||
time.sleep(0.05)
|
||
|
||
assert calls[:3] == ["block", "vip", "normal"]
|
||
finally:
|
||
scheduler.shutdown(timeout=2)
|
||
|
||
|
||
def test_task_scheduler_per_user_concurrency(monkeypatch):
|
||
started: list[str] = []
|
||
a1_started = threading.Event()
|
||
a1_release = threading.Event()
|
||
a2_started = threading.Event()
|
||
|
||
def fake_run_task(*, user_id, account_id, **kwargs):
|
||
started.append(account_id)
|
||
if account_id == "a1":
|
||
a1_started.set()
|
||
a1_release.wait(timeout=5)
|
||
if account_id == "a2":
|
||
a2_started.set()
|
||
|
||
import services.tasks as tasks_mod
|
||
|
||
monkeypatch.setattr(tasks_mod, "run_task", fake_run_task)
|
||
|
||
scheduler = TaskScheduler(max_global=2, max_per_user=1, max_queue_size=10)
|
||
try:
|
||
ok, _ = scheduler.submit_task(user_id=1, account_id="a1", browse_type="应读", is_vip=False)
|
||
assert ok
|
||
assert a1_started.wait(timeout=2)
|
||
|
||
ok2, _ = scheduler.submit_task(user_id=1, account_id="a2", browse_type="应读", is_vip=False)
|
||
assert ok2
|
||
|
||
# 同一用户并发=1:a2 不应在 a1 未结束时启动
|
||
assert not a2_started.wait(timeout=0.3)
|
||
|
||
a1_release.set()
|
||
assert a2_started.wait(timeout=2)
|
||
assert started[0] == "a1"
|
||
assert "a2" in started
|
||
finally:
|
||
scheduler.shutdown(timeout=2)
|
||
|
||
|
||
def test_task_scheduler_cancel_pending(monkeypatch):
|
||
calls: list[str] = []
|
||
blocker_started = threading.Event()
|
||
blocker_release = threading.Event()
|
||
|
||
def fake_run_task(*, user_id, account_id, **kwargs):
|
||
calls.append(account_id)
|
||
if account_id == "block":
|
||
blocker_started.set()
|
||
blocker_release.wait(timeout=5)
|
||
|
||
import services.tasks as tasks_mod
|
||
|
||
monkeypatch.setattr(tasks_mod, "run_task", fake_run_task)
|
||
|
||
scheduler = TaskScheduler(max_global=1, max_per_user=1, max_queue_size=10)
|
||
try:
|
||
ok, _ = scheduler.submit_task(user_id=1, account_id="block", browse_type="应读", is_vip=False)
|
||
assert ok
|
||
assert blocker_started.wait(timeout=2)
|
||
|
||
ok2, _ = scheduler.submit_task(user_id=1, account_id="to_cancel", browse_type="应读", is_vip=False)
|
||
assert ok2
|
||
|
||
assert scheduler.cancel_pending_task(user_id=1, account_id="to_cancel") is True
|
||
|
||
blocker_release.set()
|
||
time.sleep(0.3)
|
||
assert "to_cancel" not in calls
|
||
finally:
|
||
scheduler.shutdown(timeout=2)
|
||
|
||
|
||
def test_task_scheduler_queue_full(monkeypatch):
|
||
blocker_started = threading.Event()
|
||
blocker_release = threading.Event()
|
||
|
||
def fake_run_task(*, user_id, account_id, **kwargs):
|
||
if account_id == "block":
|
||
blocker_started.set()
|
||
blocker_release.wait(timeout=5)
|
||
|
||
import services.tasks as tasks_mod
|
||
|
||
monkeypatch.setattr(tasks_mod, "run_task", fake_run_task)
|
||
|
||
scheduler = TaskScheduler(max_global=1, max_per_user=1, max_queue_size=1)
|
||
try:
|
||
ok, _ = scheduler.submit_task(user_id=1, account_id="block", browse_type="应读", is_vip=False)
|
||
assert ok
|
||
assert blocker_started.wait(timeout=2)
|
||
|
||
ok2, _ = scheduler.submit_task(user_id=1, account_id="p1", browse_type="应读", is_vip=False)
|
||
assert ok2
|
||
|
||
ok3, msg3 = scheduler.submit_task(user_id=1, account_id="p2", browse_type="应读", is_vip=False)
|
||
assert ok3 is False
|
||
assert "队列已满" in (msg3 or "")
|
||
finally:
|
||
blocker_release.set()
|
||
scheduler.shutdown(timeout=2)
|
||
|