78 lines
2.4 KiB
Python
78 lines
2.4 KiB
Python
import threading
|
|
import time
|
|
|
|
from services import state
|
|
|
|
|
|
def test_task_status_returns_copy():
    """Check that editing a fetched task-status snapshot does not leak back.

    A status is stored, the first snapshot read back is tampered with, and a
    second read is expected to still report the originally stored status.
    """
    key = "acc_test_copy"
    stored = {"status": "运行中", "progress": {"items": 1}}
    state.safe_set_task_status(key, stored)

    # Tamper with the first snapshot handed back to us ...
    first_view = state.safe_get_task_status(key)
    first_view["status"] = "已修改"

    # ... then make sure a fresh read is unaffected by that edit.
    second_view = state.safe_get_task_status(key)
    assert second_view["status"] == "运行中"
|
|
|
|
|
|
def test_captcha_roundtrip():
    """Check that a stored captcha verifies once and is rejected on reuse."""
    sid = "captcha_test"
    record = {
        "code": "1234",
        "expire_time": time.time() + 60,
        "failed_attempts": 0,
    }
    state.safe_set_captcha(sid, record)

    # First verification with the right code is expected to succeed; the
    # returned message doubles as the assertion failure text.
    first_ok, first_msg = state.safe_verify_and_consume_captcha(sid, "1234", max_attempts=5)
    assert first_ok, first_msg

    # Repeating the very same verification is expected to be refused.
    second_ok, _ignored = state.safe_verify_and_consume_captcha(sid, "1234", max_attempts=5)
    assert not second_ok
|
|
|
|
|
|
def test_ip_rate_limit_locking():
    """Check that two recorded failures (limit of two) lock the IP address.

    The address starts out allowed, the first recorded failure reports no
    lock, the second reports a lock, and the follow-up rate-limit check is
    expected to be refused with a message mentioning the lock.
    """
    addr = "203.0.113.9"
    limits = {"max_attempts_per_hour": 2, "lock_duration_seconds": 10}

    # Fresh address: allowed, and no message is returned.
    allowed, reason = state.check_ip_rate_limit(addr, **limits)
    assert allowed and reason is None

    # First failure stays under the limit; the second one trips the lock.
    first_failure_locked = state.record_failed_captcha(addr, **limits)
    assert first_failure_locked is False
    second_failure_locked = state.record_failed_captcha(addr, **limits)
    assert second_failure_locked is True

    # Once locked, the check must refuse and explain why.
    allowed_after, reason_after = state.check_ip_rate_limit(addr, **limits)
    assert allowed_after is False
    assert "锁定" in (reason_after or "")
|
|
|
|
|
|
def test_batch_finalize_after_dispatch():
    """Check that finalizing a batch after two appended results reports 2 completed."""
    bid = "batch_test"
    created = time.time()
    empty_batch = {
        "screenshots": [],
        "total_accounts": 0,
        "completed": 0,
        "created_at": created,
        "updated_at": created,
    }
    state.safe_create_batch(bid, empty_batch)

    # Append two results before the batch is finalized.
    for shot in ("a.png", "b.png"):
        state.safe_batch_append_result(bid, {"path": shot})

    finalized = state.safe_finalize_batch_after_dispatch(
        bid,
        total_accounts=2,
        now_ts=time.time(),
    )
    assert finalized is not None
    assert finalized["completed"] == 2
|
|
|
|
|
|
def test_state_thread_safety_smoke():
    """Smoke test: 200 threads set and get task statuses without raising.

    The threads share ten account ids (``i % 10``), so several threads write
    and read the same key. Any exception raised in a thread is collected and
    fails the test at the end.
    """
    failures = []

    def hammer(idx: int):
        # Fold the 200 indices onto ten account ids so keys are contended.
        account = f"acc_{idx % 10}"
        try:
            state.safe_set_task_status(account, {"status": "运行中", "i": idx})
            state.safe_get_task_status(account)
        except Exception as err:  # pragma: no cover
            failures.append(err)

    pool = [threading.Thread(target=hammer, args=(n,)) for n in range(200)]
    for th in pool:
        th.start()
    for th in pool:
        th.join()

    assert not failures
|
|
|