import threading import time from services import state def test_task_status_returns_copy(): account_id = "acc_test_copy" state.safe_set_task_status(account_id, {"status": "运行中", "progress": {"items": 1}}) snapshot = state.safe_get_task_status(account_id) snapshot["status"] = "已修改" snapshot2 = state.safe_get_task_status(account_id) assert snapshot2["status"] == "运行中" def test_captcha_roundtrip(): session_id = "captcha_test" state.safe_set_captcha(session_id, {"code": "1234", "expire_time": time.time() + 60, "failed_attempts": 0}) ok, msg = state.safe_verify_and_consume_captcha(session_id, "1234", max_attempts=5) assert ok, msg ok2, _ = state.safe_verify_and_consume_captcha(session_id, "1234", max_attempts=5) assert not ok2 def test_ip_rate_limit_locking(): ip = "203.0.113.9" ok, msg = state.check_ip_rate_limit(ip, max_attempts_per_hour=2, lock_duration_seconds=10) assert ok and msg is None locked = state.record_failed_captcha(ip, max_attempts_per_hour=2, lock_duration_seconds=10) assert locked is False locked2 = state.record_failed_captcha(ip, max_attempts_per_hour=2, lock_duration_seconds=10) assert locked2 is True ok3, msg3 = state.check_ip_rate_limit(ip, max_attempts_per_hour=2, lock_duration_seconds=10) assert ok3 is False assert "锁定" in (msg3 or "") def test_batch_finalize_after_dispatch(): batch_id = "batch_test" now_ts = time.time() state.safe_create_batch( batch_id, {"screenshots": [], "total_accounts": 0, "completed": 0, "created_at": now_ts, "updated_at": now_ts}, ) state.safe_batch_append_result(batch_id, {"path": "a.png"}) state.safe_batch_append_result(batch_id, {"path": "b.png"}) batch_info = state.safe_finalize_batch_after_dispatch(batch_id, total_accounts=2, now_ts=time.time()) assert batch_info is not None assert batch_info["completed"] == 2 def test_state_thread_safety_smoke(): errors = [] def worker(i: int): try: aid = f"acc_{i % 10}" state.safe_set_task_status(aid, {"status": "运行中", "i": i}) _ = state.safe_get_task_status(aid) except Exception as exc: # pragma: no cover errors.append(exc) threads = [threading.Thread(target=worker, args=(i,)) for i in range(200)] for t in threads: t.start() for t in threads: t.join() assert not errors