diff --git a/app.py b/app.py index 87e90ee..677ad06 100644 --- a/app.py +++ b/app.py @@ -221,6 +221,9 @@ def enforce_csrf_protection(): "/api/auth/social/login-url", "/api/auth/social/poll", "/api/auth/social/callback", + "/yuyx/api/admin-auth/social/login-url", + "/yuyx/api/admin-auth/social/poll", + "/yuyx/api/admin-auth/social/callback", } if request.path in csrf_exempt_paths: return diff --git a/requirements.txt b/requirements.txt index a71ca4e..e42832b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,5 +11,6 @@ python-dotenv==1.0.0 beautifulsoup4==4.12.2 cryptography>=41.0.0 Pillow>=10.0.0 +qrcode[pil]==7.4.2 playwright==1.42.0 eventlet==0.36.1 diff --git a/routes/api_auth.py b/routes/api_auth.py index 4631fe4..54d3865 100644 --- a/routes/api_auth.py +++ b/routes/api_auth.py @@ -259,8 +259,7 @@ def register(): social_uid = str(pending.get("social_uid") or "").strip() enabled_providers = parse_providers((database.get_system_config() or {}).get("social_login_providers")) existing_identity = database.find_social_login_binding(provider, social_uid) - existing_admin_identity = database.find_admin_social_login_binding_by_identity(provider, social_uid) - if provider in enabled_providers and social_uid and not existing_identity and not existing_admin_identity: + if provider in enabled_providers and social_uid and not existing_identity: binding = database.upsert_social_login_binding( user_id=user_id, provider=provider, diff --git a/routes/api_social.py b/routes/api_social.py index c96b688..8fa7720 100644 --- a/routes/api_social.py +++ b/routes/api_social.py @@ -3,10 +3,13 @@ from __future__ import annotations from datetime import timedelta +from io import BytesIO +import time import database +from app_config import get_config from app_logger import get_logger from db.utils import get_cst_now, get_cst_now_str -from flask import Blueprint, jsonify, request, session +from flask import Blueprint, jsonify, request, send_file, session from flask_login import current_user, login_required, login_user from services.accounts_service import load_user_accounts from services.models import User @@ -70,6 +73,20 @@ def _login_user_id(user_id: int) -> None: load_user_accounts(user_id) +def _login_admin_id(admin_id: int) -> dict | None: + admin = database.get_admin_by_id(int(admin_id)) + if not admin: + return None + session.pop("admin_id", None) + session.pop("admin_username", None) + session["admin_id"] = admin["id"] + session["admin_username"] = admin["username"] + session["admin_reauth_until"] = time.time() + int(get_config().ADMIN_REAUTH_WINDOW_SECONDS) + session.permanent = True + session.modified = True + return admin + + def _binding_row(provider: str, binding: dict | None) -> dict: return { "provider": provider, @@ -147,9 +164,6 @@ def social_callback(): return _social_error(error) binding = database.find_social_login_binding(profile.provider, profile.social_uid) - admin_binding = database.find_admin_social_login_binding_by_identity(profile.provider, profile.social_uid) - if admin_binding: - return jsonify({"error": "该第三方账号已绑定管理员账号"}), 409 if binding: if mode == "bind": @@ -220,9 +234,6 @@ def bind_social_account(): existing_identity = database.find_social_login_binding(provider, social_uid) if existing_identity and int(existing_identity.get("user_id") or 0) != int(current_user.id): return jsonify({"error": "该第三方账号已绑定其他用户"}), 409 - existing_admin_identity = database.find_admin_social_login_binding_by_identity(provider, social_uid) - if existing_admin_identity: - return jsonify({"error": "该第三方账号已绑定管理员账号"}), 409 existing_provider = database.find_user_social_login_binding(int(current_user.id), provider) if existing_provider and str(existing_provider.get("social_uid") or "") != social_uid: @@ -261,6 +272,98 @@ def admin_social_config(): return protected() +@api_social_bp.route("/yuyx/api/admin-auth/social/login-url", methods=["POST"]) +def admin_auth_social_login_url(): + data = _get_json_payload() + provider = str(data.get("provider") or "").strip().lower() + redirect_uri = str(data.get("redirect_uri") or "").strip() + try: + result = fetch_social_login_url( + database.get_system_config(), + provider=provider, + mode="login", + redirect_uri=redirect_uri, + allowed_hosts=_allowed_redirect_hosts(), + ) + except SocialLoginError as error: + logger.warning(f"[admin-auth/social/login-url] provider={provider or '-'} failed: {error.message}") + return _social_error(error) + return jsonify(result) + + +@api_social_bp.route("/yuyx/api/admin-auth/social/poll", methods=["POST"]) +def admin_auth_social_poll(): + data = _get_json_payload() + provider = str(data.get("provider") or "").strip().lower() + state = str(data.get("state") or "").strip() + try: + result = poll_social_scan(database.get_system_config(), provider=provider, state=state) + except SocialLoginError as error: + logger.warning(f"[admin-auth/social/poll] provider={provider or '-'} failed: {error.message}") + return _social_error(error) + return jsonify(result) + + +@api_social_bp.route("/yuyx/api/admin-auth/social/callback", methods=["POST"]) +def admin_auth_social_callback(): + data = _get_json_payload() + provider = str(data.get("provider") or data.get("type") or "").strip().lower() + code = str(data.get("code") or "").strip() + + try: + profile = fetch_space_profile(database.get_system_config(), provider=provider, code=code) + except SocialLoginError as error: + logger.warning(f"[admin-auth/social/callback] provider={provider or '-'} failed: {error.message}") + return _social_error(error) + + binding = database.find_admin_social_login_binding_by_identity(profile.provider, profile.social_uid) + if not binding: + return jsonify({"error": "该第三方账号未绑定管理员,请先使用账号密码登录后在设置中绑定"}), 404 + + admin = _login_admin_id(int(binding.get("admin_id") or 0)) + if not admin: + return jsonify({"error": "绑定管理员账号不存在"}), 401 + + database.update_admin_social_login_binding_profile( + int(binding["id"]), + nickname=profile.nickname, + avatar_url=profile.avatar_url, + ) + logger.info(f"[admin-auth/social/login] admin_id={admin['id']} provider={profile.provider}") + return jsonify( + { + "success": True, + "redirect": "/yuyx/admin", + "provider": profile.provider, + "provider_label": provider_label(profile.provider), + "username": admin.get("username") or "", + } + ) + + +@api_social_bp.route("/yuyx/api/admin-auth/social/qr", methods=["GET"]) +def admin_auth_social_qr(): + value = str(request.args.get("data") or "").strip() + if not value: + return jsonify({"error": "缺少二维码内容"}), 400 + if len(value) > 2048: + return jsonify({"error": "二维码内容过长"}), 400 + + try: + import qrcode + except ImportError: + logger.error("[admin-auth/social/qr] qrcode package is not installed") + return jsonify({"error": "二维码组件未安装"}), 500 + + image = qrcode.make(value) + buffer = BytesIO() + image.save(buffer, format="PNG") + buffer.seek(0) + response = send_file(buffer, mimetype="image/png", max_age=0) + response.headers["Cache-Control"] = "no-store" + return response + + @api_social_bp.route("/yuyx/api/admin/social-bindings", methods=["GET"]) def list_admin_social_bindings(): from routes.decorators import admin_required @@ -347,10 +450,6 @@ def bind_admin_social_callback(provider): logger.warning(f"[admin/social/callback] provider={provider_value or '-'} failed: {error.message}") return _social_error(error) - user_identity = database.find_social_login_binding(profile.provider, profile.social_uid) - if user_identity: - return jsonify({"error": "该第三方账号已绑定普通用户"}), 409 - existing_identity = database.find_admin_social_login_binding_by_identity(profile.provider, profile.social_uid) if existing_identity and int(existing_identity.get("admin_id") or 0) != admin_id: return jsonify({"error": "该第三方账号已绑定其他管理员"}), 409 diff --git a/templates/admin_login.html b/templates/admin_login.html index 8b0154e..c3ec77c 100644 --- a/templates/admin_login.html +++ b/templates/admin_login.html @@ -167,6 +167,144 @@ font-size: 13px; } + .divider { + display: flex; + align-items: center; + gap: 12px; + color: #6b7280; + font-size: 12px; + font-weight: 700; + margin: 20px 0 14px; + } + + .divider::before, + .divider::after { + content: ''; + height: 1px; + flex: 1; + background: rgba(17,24,39,0.12); + } + + .social-login-area { + display: none; + } + + .social-buttons { + display: flex; + flex-direction: column; + gap: 8px; + } + + .social-btn { + width: 100%; + height: 40px; + border-radius: 10px; + border: 1px solid rgba(17,24,39,0.14); + background: #fff; + color: #111827; + font-size: 13px; + font-weight: 800; + cursor: pointer; + display: inline-flex; + align-items: center; + justify-content: center; + gap: 8px; + transition: background 0.15s, border-color 0.15s; + } + + .social-btn:hover:not(:disabled) { + background: #f8fafc; + border-color: rgba(37,99,235,0.32); + } + + .social-btn:disabled { + cursor: not-allowed; + opacity: 0.7; + } + + .social-icon { + width: 22px; + height: 22px; + border-radius: 50%; + display: inline-flex; + align-items: center; + justify-content: center; + color: #fff; + font-size: 12px; + line-height: 1; + } + + .provider-wx .social-icon { background: #16a34a; } + .provider-qq .social-icon { background: #2563eb; } + .provider-alipay .social-icon { background: #1677ff; } + + .qr-mask { + position: fixed; + inset: 0; + z-index: 20; + display: none; + align-items: center; + justify-content: center; + padding: 18px; + background: rgba(17,24,39,0.45); + } + + .qr-dialog { + width: min(340px, 92vw); + border-radius: 14px; + background: #fff; + box-shadow: 0 24px 70px rgba(17,24,39,0.24); + padding: 20px; + } + + .qr-head { + display: flex; + align-items: center; + justify-content: space-between; + gap: 12px; + margin-bottom: 16px; + } + + .qr-title { + font-size: 16px; + font-weight: 800; + color: #111827; + } + + .qr-close { + width: 32px; + height: 32px; + border-radius: 50%; + border: 1px solid rgba(17,24,39,0.12); + background: #fff; + color: #4b5563; + cursor: pointer; + font-size: 18px; + line-height: 1; + } + + .qr-body { + display: flex; + flex-direction: column; + align-items: center; + gap: 12px; + } + + .qr-image { + width: 220px; + height: 220px; + border: 1px solid rgba(17,24,39,0.10); + border-radius: 10px; + padding: 8px; + background: #fff; + } + + .qr-prompt { + font-size: 13px; + color: #374151; + text-align: center; + } + @media (max-width: 480px) { body { padding: 12px; align-items: flex-start; padding-top: 20px; } .login-container { width: 100%; max-width: 100%; padding: 28px 20px; border-radius: 14px; } @@ -215,14 +353,48 @@ +
+
快捷登录
+
+
+ +
+ +
+ diff --git a/tests/test_social_login.py b/tests/test_social_login.py index dedf6cb..dbcb132 100644 --- a/tests/test_social_login.py +++ b/tests/test_social_login.py @@ -1,12 +1,15 @@ import sys from pathlib import Path +from flask import Flask + PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) +import routes.api_social as api_social from services import social_login -from services.social_login import normalize_social_endpoint, parse_space_scan_page, poll_social_scan +from services.social_login import SpaceProfile, normalize_social_endpoint, parse_space_scan_page, poll_social_scan def test_normalize_social_endpoint_accepts_space_root(): @@ -44,3 +47,87 @@ def test_poll_social_scan_treats_code_zero_as_authorized(monkeypatch): ) assert result == {"status": "authorized", "url": "https://zsglpt.workyai.cn/login?type=wx&code=ok"} + + +def test_user_social_callback_ignores_admin_binding_namespace(monkeypatch): + app = Flask(__name__) + app.secret_key = "test-secret" + app.register_blueprint(api_social.api_social_bp) + + logged_user_ids = [] + + class FakeDatabase: + def get_system_config(self): + return {} + + def find_social_login_binding(self, provider, social_uid): + assert (provider, social_uid) == ("wx", "same-openid") + return {"id": 3, "user_id": 11} + + def find_admin_social_login_binding_by_identity(self, *_args, **_kwargs): + raise AssertionError("user callback must not check admin social bindings") + + def get_user_by_id(self, user_id): + return {"id": user_id, "username": "normal-user", "status": "approved"} + + def update_social_login_binding_profile(self, *_args, **_kwargs): + return True + + monkeypatch.setattr(api_social, "database", FakeDatabase()) + monkeypatch.setattr( + api_social, + "fetch_space_profile", + lambda *_args, **_kwargs: SpaceProfile(provider="wx", social_uid="same-openid", nickname="nick"), + ) + monkeypatch.setattr(api_social, "_login_user_id", lambda user_id: logged_user_ids.append(user_id)) + + response = app.test_client().post( + "/api/auth/social/callback", + json={"provider": "wx", "code": "ok", "mode": "login"}, + ) + + assert response.status_code == 200 + assert response.get_json()["username"] == "normal-user" + assert logged_user_ids == [11] + + +def test_admin_social_callback_uses_admin_binding_namespace(monkeypatch): + app = Flask(__name__) + app.secret_key = "test-secret" + app.register_blueprint(api_social.api_social_bp) + + class FakeDatabase: + def get_system_config(self): + return {} + + def find_social_login_binding(self, *_args, **_kwargs): + raise AssertionError("admin callback must not check user social bindings") + + def find_admin_social_login_binding_by_identity(self, provider, social_uid): + assert (provider, social_uid) == ("wx", "same-openid") + return {"id": 9, "admin_id": 7} + + def get_admin_by_id(self, admin_id): + return {"id": admin_id, "username": "admin-user"} + + def update_admin_social_login_binding_profile(self, *_args, **_kwargs): + return True + + monkeypatch.setattr(api_social, "database", FakeDatabase()) + monkeypatch.setattr( + api_social, + "fetch_space_profile", + lambda *_args, **_kwargs: SpaceProfile(provider="wx", social_uid="same-openid", nickname="nick"), + ) + + client = app.test_client() + response = client.post( + "/yuyx/api/admin-auth/social/callback", + json={"provider": "wx", "code": "ok"}, + ) + + assert response.status_code == 200 + assert response.get_json()["redirect"] == "/yuyx/admin" + with client.session_transaction() as sess: + assert sess["admin_id"] == 7 + assert sess["admin_username"] == "admin-user"