from __future__ import annotations import threading import time from services.tasks import TaskScheduler def test_task_scheduler_vip_priority(monkeypatch): calls: list[str] = [] blocker_started = threading.Event() blocker_release = threading.Event() def fake_run_task(*, user_id, account_id, **kwargs): calls.append(account_id) if account_id == "block": blocker_started.set() blocker_release.wait(timeout=5) import services.tasks as tasks_mod monkeypatch.setattr(tasks_mod, "run_task", fake_run_task) scheduler = TaskScheduler(max_global=1, max_per_user=1, max_queue_size=10) try: ok, _ = scheduler.submit_task(user_id=1, account_id="block", browse_type="应读", is_vip=False) assert ok assert blocker_started.wait(timeout=2) ok2, _ = scheduler.submit_task(user_id=1, account_id="normal", browse_type="应读", is_vip=False) ok3, _ = scheduler.submit_task(user_id=2, account_id="vip", browse_type="应读", is_vip=True) assert ok2 and ok3 blocker_release.set() deadline = time.time() + 3 while time.time() < deadline: if calls[:3] == ["block", "vip", "normal"]: break time.sleep(0.05) assert calls[:3] == ["block", "vip", "normal"] finally: scheduler.shutdown(timeout=2) def test_task_scheduler_per_user_concurrency(monkeypatch): started: list[str] = [] a1_started = threading.Event() a1_release = threading.Event() a2_started = threading.Event() def fake_run_task(*, user_id, account_id, **kwargs): started.append(account_id) if account_id == "a1": a1_started.set() a1_release.wait(timeout=5) if account_id == "a2": a2_started.set() import services.tasks as tasks_mod monkeypatch.setattr(tasks_mod, "run_task", fake_run_task) scheduler = TaskScheduler(max_global=2, max_per_user=1, max_queue_size=10) try: ok, _ = scheduler.submit_task(user_id=1, account_id="a1", browse_type="应读", is_vip=False) assert ok assert a1_started.wait(timeout=2) ok2, _ = scheduler.submit_task(user_id=1, account_id="a2", browse_type="应读", is_vip=False) assert ok2 # 同一用户并发=1:a2 不应在 a1 未结束时启动 assert not a2_started.wait(timeout=0.3) a1_release.set() assert a2_started.wait(timeout=2) assert started[0] == "a1" assert "a2" in started finally: scheduler.shutdown(timeout=2) def test_task_scheduler_cancel_pending(monkeypatch): calls: list[str] = [] blocker_started = threading.Event() blocker_release = threading.Event() def fake_run_task(*, user_id, account_id, **kwargs): calls.append(account_id) if account_id == "block": blocker_started.set() blocker_release.wait(timeout=5) import services.tasks as tasks_mod monkeypatch.setattr(tasks_mod, "run_task", fake_run_task) scheduler = TaskScheduler(max_global=1, max_per_user=1, max_queue_size=10) try: ok, _ = scheduler.submit_task(user_id=1, account_id="block", browse_type="应读", is_vip=False) assert ok assert blocker_started.wait(timeout=2) ok2, _ = scheduler.submit_task(user_id=1, account_id="to_cancel", browse_type="应读", is_vip=False) assert ok2 assert scheduler.cancel_pending_task(user_id=1, account_id="to_cancel") is True blocker_release.set() time.sleep(0.3) assert "to_cancel" not in calls finally: scheduler.shutdown(timeout=2) def test_task_scheduler_queue_full(monkeypatch): blocker_started = threading.Event() blocker_release = threading.Event() def fake_run_task(*, user_id, account_id, **kwargs): if account_id == "block": blocker_started.set() blocker_release.wait(timeout=5) import services.tasks as tasks_mod monkeypatch.setattr(tasks_mod, "run_task", fake_run_task) scheduler = TaskScheduler(max_global=1, max_per_user=1, max_queue_size=1) try: ok, _ = scheduler.submit_task(user_id=1, account_id="block", browse_type="应读", is_vip=False) assert ok assert blocker_started.wait(timeout=2) ok2, _ = scheduler.submit_task(user_id=1, account_id="p1", browse_type="应读", is_vip=False) assert ok2 ok3, msg3 = scheduler.submit_task(user_id=1, account_id="p2", browse_type="应读", is_vip=False) assert ok3 is False assert "队列已满" in (msg3 or "") finally: blocker_release.set() scheduler.shutdown(timeout=2)