134 lines
4.7 KiB
Python
134 lines
4.7 KiB
Python
import sys
|
|
from pathlib import Path
|
|
|
|
from flask import Flask
|
|
|
|
# Directory one level above the directory that contains this file.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Put that directory at the front of sys.path (only once) before the project
# imports that follow; presumably this is what lets the `routes` and
# `services` packages resolve when the tests are run from elsewhere -- confirm.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
import routes.api_social as api_social
|
|
from services import social_login
|
|
from services.social_login import SpaceProfile, normalize_social_endpoint, parse_space_scan_page, poll_social_scan
|
|
|
|
|
|
def test_normalize_social_endpoint_accepts_space_root():
    """Empty and bare-root endpoint values normalize to the connect.php URL.

    Three inputs are checked: the empty string, the site root with a
    trailing slash, and the site root without one.  All are expected to
    come back as ``https://www.spacezs.cn/connect.php``.
    """
    assert normalize_social_endpoint("") == "https://www.spacezs.cn/connect.php"
    assert normalize_social_endpoint("https://www.spacezs.cn/") == "https://www.spacezs.cn/connect.php"
    assert normalize_social_endpoint("https://www.spacezs.cn") == "https://www.spacezs.cn/connect.php"
|
|
|
|
|
|
def test_parse_space_scan_page_extracts_qr_and_state():
    """The scan page parser returns ``(qrcode_url, state)`` from inline JS.

    The sample page holds the two values as ``var qrcode_url = "..."`` and
    ``var state = "..."`` assignments; the parser is expected to return
    them as a 2-tuple in that order.
    """
    html = 'var qrcode_url = "http://weixin.qq.com/q/test"; var state = "state-123";'
    assert parse_space_scan_page(html) == ("http://weixin.qq.com/q/test", "state-123")
|
|
|
|
|
|
def test_poll_social_scan_treats_code_zero_as_authorized(monkeypatch):
    """A poll response with ``code == 0`` is reported as ``authorized``.

    ``social_login.requests.get`` is replaced with a stub whose response
    decodes to ``{"code": 0, "url": ...}``, and ``social_login.social_appkey``
    is replaced with a stub returning a fixed key.  The result is expected
    to carry the status ``"authorized"`` together with the URL taken from
    the stubbed payload.
    """

    class FakeResponse:
        """Minimal stand-in exposing only the ``json()`` method."""

        def json(self):
            return {"code": 0, "url": "https://zsglpt.workyai.cn/login?type=wx&code=ok"}

    def fake_get(*_args, **_kwargs):
        # Accept any call signature and always hand back the canned response.
        return FakeResponse()

    monkeypatch.setattr(social_login.requests, "get", fake_get)
    monkeypatch.setattr(social_login, "social_appkey", lambda _cfg: "key")

    result = poll_social_scan(
        {
            "social_login_enabled": 1,
            "social_login_endpoint": "https://www.spacezs.cn/connect.php",
            "social_login_appid": "appid",
            "social_login_appkey": "encrypted",
            "social_login_providers": "wx",
        },
        provider="wx",
        state="state-123",
    )

    assert result == {"status": "authorized", "url": "https://zsglpt.workyai.cn/login?type=wx&code=ok"}
|
|
|
|
|
|
def test_user_social_callback_ignores_admin_binding_namespace(monkeypatch):
    """The user callback logs in through the user binding only.

    A throwaway Flask app is built with the ``api_social`` blueprint, and
    ``api_social.database``, ``api_social.fetch_space_profile`` and
    ``api_social._login_user_id`` are replaced with fakes.  Posting to
    ``/api/auth/social/callback`` is expected to answer 200 with the bound
    user's username, log in user id 11, and never call the admin binding
    lookup.
    """
    app = Flask(__name__)
    app.secret_key = "test-secret"
    app.register_blueprint(api_social.api_social_bp)

    logged_user_ids = []
    # Forbidden lookups are recorded as well as raised: an exception raised
    # inside a view can be turned into an error response (or caught by the
    # route) before it reaches this test, so the raise alone is a weak guard.
    admin_binding_lookups = []

    class FakeDatabase:
        """In-memory stand-in for the database object used by the route."""

        def get_system_config(self):
            return {}

        def find_social_login_binding(self, provider, social_uid):
            assert (provider, social_uid) == ("wx", "same-openid")
            return {"id": 3, "user_id": 11}

        def find_admin_social_login_binding_by_identity(self, *args, **kwargs):
            admin_binding_lookups.append((args, kwargs))
            raise AssertionError("user callback must not check admin social bindings")

        def get_user_by_id(self, user_id):
            return {"id": user_id, "username": "normal-user", "status": "approved"}

        def update_social_login_binding_profile(self, *_args, **_kwargs):
            return True

    monkeypatch.setattr(api_social, "database", FakeDatabase())
    monkeypatch.setattr(
        api_social,
        "fetch_space_profile",
        lambda *_args, **_kwargs: SpaceProfile(provider="wx", social_uid="same-openid", nickname="nick"),
    )
    monkeypatch.setattr(api_social, "_login_user_id", lambda user_id: logged_user_ids.append(user_id))

    response = app.test_client().post(
        "/api/auth/social/callback",
        json={"provider": "wx", "code": "ok", "mode": "login"},
    )

    # Checked first so a namespace leak is reported as such rather than as a
    # bare status-code mismatch.
    assert admin_binding_lookups == [], "user callback must not check admin social bindings"
    assert response.status_code == 200
    assert response.get_json()["username"] == "normal-user"
    assert logged_user_ids == [11]
|
|
|
|
|
|
def test_admin_social_callback_uses_admin_binding_namespace(monkeypatch):
    """The admin callback logs in through the admin binding only.

    A throwaway Flask app is built with the ``api_social`` blueprint, and
    ``api_social.database`` and ``api_social.fetch_space_profile`` are
    replaced with fakes.  Posting to ``/yuyx/api/admin-auth/social/callback``
    is expected to answer 200 with a redirect to ``/yuyx/admin``, store the
    bound admin's id and username in the session, and never call the user
    binding lookup.
    """
    app = Flask(__name__)
    app.secret_key = "test-secret"
    app.register_blueprint(api_social.api_social_bp)

    # Forbidden lookups are recorded as well as raised: an exception raised
    # inside a view can be turned into an error response (or caught by the
    # route) before it reaches this test, so the raise alone is a weak guard.
    user_binding_lookups = []

    class FakeDatabase:
        """In-memory stand-in for the database object used by the route."""

        def get_system_config(self):
            return {}

        def find_social_login_binding(self, *args, **kwargs):
            user_binding_lookups.append((args, kwargs))
            raise AssertionError("admin callback must not check user social bindings")

        def find_admin_social_login_binding_by_identity(self, provider, social_uid):
            assert (provider, social_uid) == ("wx", "same-openid")
            return {"id": 9, "admin_id": 7}

        def get_admin_by_id(self, admin_id):
            return {"id": admin_id, "username": "admin-user"}

        def update_admin_social_login_binding_profile(self, *_args, **_kwargs):
            return True

    monkeypatch.setattr(api_social, "database", FakeDatabase())
    monkeypatch.setattr(
        api_social,
        "fetch_space_profile",
        lambda *_args, **_kwargs: SpaceProfile(provider="wx", social_uid="same-openid", nickname="nick"),
    )

    # The client is kept in a variable because its session is inspected
    # after the request.
    client = app.test_client()
    response = client.post(
        "/yuyx/api/admin-auth/social/callback",
        json={"provider": "wx", "code": "ok"},
    )

    # Checked first so a namespace leak is reported as such rather than as a
    # bare status-code mismatch.
    assert user_binding_lookups == [], "admin callback must not check user social bindings"
    assert response.status_code == 200
    assert response.get_json()["redirect"] == "/yuyx/admin"
    with client.session_transaction() as sess:
        assert sess["admin_id"] == 7
        assert sess["admin_username"] == "admin-user"
|