From 5d5ddf06793e09076f335216608ae244101ead17 Mon Sep 17 00:00:00 2001
From: 237899745 <237899745@git.workyai.cn>
Date: Fri, 27 Feb 2026 15:24:20 +0800
Subject: [PATCH] feat: add wechat webui login server and page
---
.gitignore | 5 +
wechat-webui/index.html | 878 ++++++++++++++++++++++++++++++++++++++++
wechat-webui/server.py | 743 ++++++++++++++++++++++++++++++++++
3 files changed, 1626 insertions(+)
create mode 100644 wechat-webui/index.html
create mode 100644 wechat-webui/server.py
diff --git a/.gitignore b/.gitignore
index 9dec4c9..dea335a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,8 @@ __pycache__/
# OS files
.DS_Store
Thumbs.db
+
+# WeChat webui runtime/session artifacts
+/wechat-webui/.session.json
+/wechat-webui/webui.log
+/wechat-webui/__pycache__/
diff --git a/wechat-webui/index.html b/wechat-webui/index.html
new file mode 100644
index 0000000..2eeded6
--- /dev/null
+++ b/wechat-webui/index.html
@@ -0,0 +1,878 @@
+
+
+
+
+
+ WeChatPadPro 登录控制台
+
+
+
+
+
+
+
WeChatPadPro 登录控制台
+
只保留扫码登录,状态与消息自动轮询
+
+
+
+ 未连接
+
+
+
+
+
+ 1. 登录配置
+
+
+
+
+
+
+
+
+
+
+
+ 2. 扫码窗口
+
+
![二维码]()
+
点击“生成授权二维码”后在这里显示二维码
+
+
+
+
+
+
+
+
+
+
diff --git a/wechat-webui/server.py b/wechat-webui/server.py
new file mode 100644
index 0000000..b277d45
--- /dev/null
+++ b/wechat-webui/server.py
@@ -0,0 +1,743 @@
+#!/usr/bin/env python3
+"""WeChatPadPro login/message web UI server."""
+
+from __future__ import annotations
+
+import json
+import os
+import threading
+import time
+import urllib.error
+import urllib.parse
+import urllib.request
+from collections import deque
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+from pathlib import Path
+from typing import Any, Iterable
+
+ROOT_DIR = Path(__file__).resolve().parent
+INDEX_FILE = ROOT_DIR / "index.html"
+SESSION_FILE = ROOT_DIR / ".session.json"
+DEFAULT_BASE_URL = os.environ.get("WCPP_API_BASE", "http://127.0.0.1:18238")
+HOST = os.environ.get("WEBUI_HOST", "0.0.0.0")
+PORT = int(os.environ.get("WEBUI_PORT", "18239"))
+SESSION_LOCK = threading.Lock()
+MESSAGE_CACHE_LOCK = threading.Lock()
+SUPPORTED_QR_ENDPOINTS = {
+ "GetLoginQrCodeNewX",
+ "GetLoginQrCodePadX",
+ "GetLoginQrCodeWin",
+ "GetLoginQrCodeMac",
+ "GetLoginQrCodeAndroidPad",
+ "GetLoginQrCodeCar",
+ "GetLoginQrCodeA16",
+ "GetLoginQrCodeNew",
+}
+MESSAGE_CACHE_MAX_KEYS = 6000
+MESSAGE_CACHE_RECENT_KEYS: dict[str, deque[str]] = {}
+MESSAGE_CACHE_RECENT_SET: dict[str, set[str]] = {}
+
+
+def load_admin_key() -> str:
+ env_key = os.environ.get("WCPP_ADMIN_KEY", "").strip()
+ if env_key:
+ return env_key
+
+ candidates = [
+ ROOT_DIR.parent / "deploy" / ".env",
+ ROOT_DIR.parent / ".env",
+ ]
+ for env_file in candidates:
+ if not env_file.exists():
+ continue
+ text = env_file.read_text(encoding="utf-8", errors="ignore")
+ for raw_line in text.splitlines():
+ line = raw_line.strip()
+ if not line or line.startswith("#") or "=" not in line:
+ continue
+ key, value = line.split("=", 1)
+ if key.strip() == "ADMIN_KEY" and value.strip():
+ return value.strip()
+ return ""
+
+
+DEFAULT_ADMIN_KEY = load_admin_key()
+
+DEFAULT_SESSION_STATE: dict[str, Any] = {
+ "baseUrl": DEFAULT_BASE_URL,
+ "authKey": "",
+ "ticket": "",
+ "qrData": {},
+ "qrEndpoint": "GetLoginQrCodeNewX",
+ "updatedAt": 0,
+}
+
+
+def _normalize_session(data: Any) -> dict[str, Any]:
+ state = dict(DEFAULT_SESSION_STATE)
+ if isinstance(data, dict):
+ base_url = data.get("baseUrl")
+ auth_key = data.get("authKey")
+ ticket = data.get("ticket")
+ qr_data = data.get("qrData")
+ qr_endpoint = data.get("qrEndpoint")
+ updated_at = data.get("updatedAt")
+
+ if isinstance(base_url, str) and base_url.strip():
+ state["baseUrl"] = base_url.strip()
+ if isinstance(auth_key, str):
+ state["authKey"] = auth_key.strip()
+ if isinstance(ticket, str):
+ state["ticket"] = ticket.strip()
+ if isinstance(qr_data, dict):
+ state["qrData"] = qr_data
+ if isinstance(qr_endpoint, str) and qr_endpoint.strip() in SUPPORTED_QR_ENDPOINTS:
+ state["qrEndpoint"] = qr_endpoint.strip()
+ if isinstance(updated_at, (int, float)):
+ state["updatedAt"] = int(updated_at)
+ return state
+
+
+def _write_session_file(state: dict[str, Any]) -> None:
+ tmp = SESSION_FILE.with_suffix(".json.tmp")
+ tmp.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
+ tmp.replace(SESSION_FILE)
+
+
+def load_session_state() -> dict[str, Any]:
+ if not SESSION_FILE.exists():
+ return dict(DEFAULT_SESSION_STATE)
+ try:
+ raw = json.loads(SESSION_FILE.read_text(encoding="utf-8"))
+ except Exception:
+ return dict(DEFAULT_SESSION_STATE)
+ return _normalize_session(raw)
+
+
+SESSION_STATE: dict[str, Any] = load_session_state()
+
+
+def get_session_state() -> dict[str, Any]:
+ with SESSION_LOCK:
+ return dict(SESSION_STATE)
+
+
+def update_session_state(patch: dict[str, Any]) -> dict[str, Any]:
+ with SESSION_LOCK:
+ merged = dict(SESSION_STATE)
+ for key in ("baseUrl", "authKey", "ticket", "qrData", "qrEndpoint"):
+ if key in patch:
+ merged[key] = patch[key]
+ merged = _normalize_session(merged)
+ merged["updatedAt"] = int(time.time())
+ SESSION_STATE.clear()
+ SESSION_STATE.update(merged)
+ _write_session_file(SESSION_STATE)
+ return dict(SESSION_STATE)
+
+
+def clear_session_state() -> dict[str, Any]:
+ with SESSION_LOCK:
+ SESSION_STATE.clear()
+ SESSION_STATE.update(dict(DEFAULT_SESSION_STATE))
+ SESSION_STATE["updatedAt"] = int(time.time())
+ _write_session_file(SESSION_STATE)
+ return dict(SESSION_STATE)
+
+
+def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
+ raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
+ handler.send_response(status)
+ handler.send_header("Content-Type", "application/json; charset=utf-8")
+ handler.send_header("Content-Length", str(len(raw)))
+ handler.send_header("Cache-Control", "no-store")
+ handler.send_header("Access-Control-Allow-Origin", "*")
+ handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
+ handler.send_header("Access-Control-Allow-Headers", "Content-Type")
+ handler.end_headers()
+ handler.wfile.write(raw)
+
+
+def text_response(handler: BaseHTTPRequestHandler, status: int, content_type: str, payload: bytes) -> None:
+ handler.send_response(status)
+ handler.send_header("Content-Type", content_type)
+ handler.send_header("Content-Length", str(len(payload)))
+ handler.send_header("Cache-Control", "no-store")
+ handler.send_header("Access-Control-Allow-Origin", "*")
+ handler.end_headers()
+ handler.wfile.write(payload)
+
+
+def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
+ length = int(handler.headers.get("Content-Length", "0") or "0")
+ if length == 0:
+ return {}
+ raw = handler.rfile.read(length)
+ try:
+ data = json.loads(raw.decode("utf-8"))
+ except json.JSONDecodeError as exc:
+ raise ValueError(f"请求体 JSON 无效: {exc}") from exc
+ if not isinstance(data, dict):
+ raise ValueError("请求体必须是 JSON 对象")
+ return data
+
+
+def api_request(
+ base_url: str,
+ method: str,
+ path: str,
+ key: str | None = None,
+ body: dict[str, Any] | None = None,
+ timeout: int = 25,
+) -> dict[str, Any]:
+ base = base_url.rstrip("/")
+ url = f"{base}{path}"
+ if key:
+ joiner = "&" if "?" in url else "?"
+ url = f"{url}{joiner}key={urllib.parse.quote(key)}"
+
+ payload = None
+ headers: dict[str, str] = {}
+ if body is not None:
+ payload = json.dumps(body).encode("utf-8")
+ headers["Content-Type"] = "application/json"
+
+ req = urllib.request.Request(url, data=payload, method=method.upper(), headers=headers)
+ try:
+ with urllib.request.urlopen(req, timeout=timeout) as resp:
+ return json.loads(resp.read().decode("utf-8"))
+ except urllib.error.HTTPError as exc:
+ detail = exc.read().decode("utf-8", errors="replace")
+ raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
+ except urllib.error.URLError as exc:
+ raise RuntimeError(f"请求失败: {exc}") from exc
+
+
+def flatten_wrapped_value(value: Any) -> Any:
+ if isinstance(value, dict):
+ for name in ("str", "string", "String", "value", "Value"):
+ if name in value and value[name] not in (None, ""):
+ return value[name]
+ return value
+
+
+def find_first_value(data: Any, keys: Iterable[str]) -> Any:
+ targets = {k.lower() for k in keys}
+ queue: list[Any] = [data]
+ while queue:
+ current = queue.pop(0)
+ if isinstance(current, dict):
+ for k, v in current.items():
+ if k.lower() in targets:
+ found = flatten_wrapped_value(v)
+ if found not in (None, "", [], {}):
+ return found
+ queue.append(v)
+ elif isinstance(current, list):
+ queue.extend(current)
+ return None
+
+
+def to_display_value(value: Any) -> Any:
+ value = flatten_wrapped_value(value)
+ if isinstance(value, (dict, list)):
+ try:
+ return json.dumps(value, ensure_ascii=False)
+ except Exception:
+ return str(value)
+ return value
+
+
+def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]:
+ add_msgs = item.get("AddMsgs")
+ if isinstance(add_msgs, list):
+ for msg in add_msgs:
+ if isinstance(msg, dict):
+ return msg
+ return item
+
+
+def summarize_message(item: Any) -> dict[str, Any]:
+ if not isinstance(item, dict):
+ return {"content": str(item)}
+
+ primary = pick_primary_message(item)
+
+ outer_type = find_first_value(item, ["Type"])
+ inner_type = find_first_value(primary, ["msg_type", "MsgType", "ContentType", "Type"])
+
+ msg_id = find_first_value(primary, ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"])
+ from_user = find_first_value(
+ primary,
+ ["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"],
+ )
+ to_user = find_first_value(primary, ["to_user_name", "ToUserName", "ToWxid", "Receiver"])
+ create_time = find_first_value(primary, ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"])
+ content = find_first_value(primary, ["text_content", "TextContent", "content", "Content", "Message", "Text"])
+
+ if content in (None, "", {}, []):
+ # 部分系统消息正文在 msg_source 里
+ content = find_first_value(primary, ["msg_source", "MsgSource"])
+
+ return {
+ "id": to_display_value(msg_id),
+ "type": to_display_value(inner_type if inner_type not in (None, "") else outer_type),
+ "from": to_display_value(from_user),
+ "to": to_display_value(to_user),
+ "time": to_display_value(create_time),
+ "content": to_display_value(content),
+ }
+
+
+def build_message_dedupe_key(summary: dict[str, Any], raw_item: Any) -> str:
+ msg_id = summary.get("id")
+ msg_type = summary.get("type")
+ if msg_id not in (None, "", "-"):
+ return f"id:{msg_id}|type:{msg_type}"
+ if isinstance(raw_item, dict):
+ fallback = find_first_value(raw_item, ["new_msg_id", "msg_id", "NewMsgId", "MsgId"])
+ if fallback not in (None, "", "-"):
+ return f"id:{fallback}|type:{msg_type}"
+ return "|".join(
+ [
+ f"type:{msg_type}",
+ f"from:{summary.get('from')}",
+ f"to:{summary.get('to')}",
+ f"time:{summary.get('time')}",
+ f"content:{str(summary.get('content') or '')[:120]}",
+ ]
+ )
+
+
+def seen_message_before(auth_key: str, dedupe_key: str) -> bool:
+ with MESSAGE_CACHE_LOCK:
+ msg_set = MESSAGE_CACHE_RECENT_SET.setdefault(auth_key, set())
+ if dedupe_key in msg_set:
+ return True
+
+ msg_queue = MESSAGE_CACHE_RECENT_KEYS.setdefault(auth_key, deque())
+ msg_queue.append(dedupe_key)
+ msg_set.add(dedupe_key)
+
+ while len(msg_queue) > MESSAGE_CACHE_MAX_KEYS:
+ old = msg_queue.popleft()
+ msg_set.discard(old)
+
+ return False
+
+
+def clear_message_cache(auth_key: str | None = None) -> None:
+ with MESSAGE_CACHE_LOCK:
+ if auth_key:
+ MESSAGE_CACHE_RECENT_KEYS.pop(auth_key, None)
+ MESSAGE_CACHE_RECENT_SET.pop(auth_key, None)
+ return
+ MESSAGE_CACHE_RECENT_KEYS.clear()
+ MESSAGE_CACHE_RECENT_SET.clear()
+
+
+def create_auth_key(base_url: str, admin_key: str, days: int, remark: str) -> str:
+ resp = api_request(
+ base_url=base_url,
+ method="POST",
+ path="/admin/GenAuthKey1",
+ key=admin_key,
+ body={"Count": 1, "Days": days, "Remark": remark},
+ )
+ if resp.get("Code") != 200:
+ raise RuntimeError(f"生成授权码失败: {json.dumps(resp, ensure_ascii=False)}")
+ data = resp.get("Data")
+ if isinstance(data, dict):
+ keys = data.get("authKeys") or data.get("AuthKeys")
+ if isinstance(keys, list) and keys:
+ return str(keys[0])
+ raise RuntimeError("授权码响应结构异常")
+
+
+def get_login_qr(base_url: str, auth_key: str, qr_endpoint: str = "GetLoginQrCodeNewX") -> dict[str, Any]:
+ endpoint = qr_endpoint.strip() if qr_endpoint else "GetLoginQrCodeNewX"
+ if endpoint not in SUPPORTED_QR_ENDPOINTS:
+ raise RuntimeError(f"不支持的二维码接口: {endpoint}")
+ resp = api_request(
+ base_url=base_url,
+ method="POST",
+ path=f"/login/{endpoint}",
+ key=auth_key,
+ body={"Check": False, "Proxy": ""},
+ )
+ if resp.get("Code") != 200:
+ raise RuntimeError(f"获取二维码失败: {json.dumps(resp, ensure_ascii=False)}")
+ data = resp.get("Data")
+ if not isinstance(data, dict):
+ raise RuntimeError("二维码响应结构异常")
+ return data
+
+
+def get_status_bundle(base_url: str, auth_key: str) -> dict[str, Any]:
+ check = api_request(base_url=base_url, method="GET", path="/login/CheckLoginStatus", key=auth_key)
+ online = api_request(base_url=base_url, method="GET", path="/login/GetLoginStatus", key=auth_key)
+ return {
+ "check": check,
+ "online": online,
+ "isOnline": online.get("Code") == 200,
+ }
+
+
+def submit_verify(base_url: str, auth_key: str, code: str, ticket: str, data62: str, uuid: str) -> dict[str, Any]:
+ payload: dict[str, Any] = {"Code": code}
+ if ticket:
+ payload["Ticket"] = ticket
+ if data62:
+ payload["Data62"] = data62
+ if uuid:
+ payload["Uuid"] = uuid
+ return api_request(
+ base_url=base_url,
+ method="POST",
+ path="/login/YPayVerificationcode",
+ key=auth_key,
+ body=payload,
+ )
+
+
+def poll_messages(base_url: str, auth_key: str, count: int) -> dict[str, Any]:
+ raw = api_request(
+ base_url=base_url,
+ method="POST",
+ path="/message/HttpSyncMsg",
+ key=auth_key,
+ body={"Count": count},
+ )
+ data = raw.get("Data")
+ items: list[dict[str, Any]] = []
+ batch_seen: set[str] = set()
+ if isinstance(data, list):
+ for item in data:
+ summary = summarize_message(item)
+ dedupe_key = build_message_dedupe_key(summary, item)
+ if dedupe_key in batch_seen:
+ continue
+ batch_seen.add(dedupe_key)
+ if seen_message_before(auth_key, dedupe_key):
+ continue
+ summary["raw"] = item
+ items.append(summary)
+ return {
+ "raw": raw,
+ "items": items,
+ "received": len(items),
+ }
+
+
+def ext_device_confirm_get(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
+ payload: dict[str, Any] = {}
+ if url:
+ payload["Url"] = url
+ return api_request(
+ base_url=base_url,
+ method="POST",
+ path="/login/ExtDeviceLoginConfirmGet",
+ key=auth_key,
+ body=payload,
+ )
+
+
+def ext_device_confirm_ok(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
+ payload: dict[str, Any] = {}
+ if url:
+ payload["Url"] = url
+ return api_request(
+ base_url=base_url,
+ method="POST",
+ path="/login/ExtDeviceLoginConfirmOk",
+ key=auth_key,
+ body=payload,
+ )
+
+
+def try_logout(base_url: str, auth_key: str) -> dict[str, Any]:
+ attempts: list[dict[str, Any]] = []
+ candidates: list[tuple[str, str, dict[str, Any] | None]] = [
+ ("POST", "/login/LogOut", {}),
+ ("GET", "/login/LogOut", None),
+ ("POST", "/login/Logout", {}),
+ ("GET", "/login/Logout", None),
+ ]
+ for method, path, body in candidates:
+ try:
+ resp = api_request(base_url=base_url, method=method, path=path, key=auth_key, body=body)
+ attempts.append({"method": method, "path": path, "ok": True, "raw": resp})
+ code = resp.get("Code") if isinstance(resp, dict) else None
+ if code in (None, 200, 300, -2, -3):
+ return {
+ "attempted": True,
+ "ok": True,
+ "method": method,
+ "path": path,
+ "raw": resp,
+ "attempts": attempts,
+ }
+ except Exception as exc:
+ attempts.append({"method": method, "path": path, "ok": False, "error": str(exc)})
+
+ last_error = next((item["error"] for item in reversed(attempts) if not item.get("ok")), "logout not supported")
+ return {"attempted": True, "ok": False, "error": last_error, "attempts": attempts}
+
+
+class Handler(BaseHTTPRequestHandler):
+ server_version = "WCPPWebUI/1.0"
+
+ def log_message(self, fmt: str, *args: Any) -> None:
+ print(f"[webui] {self.address_string()} - {fmt % args}")
+
+ def do_OPTIONS(self) -> None:
+ self.send_response(204)
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
+ self.send_header("Access-Control-Allow-Headers", "Content-Type")
+ self.end_headers()
+
+ def do_GET(self) -> None:
+ parsed = urllib.parse.urlparse(self.path)
+ if parsed.path in {"/", "/index.html"}:
+ if not INDEX_FILE.exists():
+ json_response(self, 500, {"error": "index.html 不存在"})
+ return
+ payload = INDEX_FILE.read_bytes()
+ text_response(self, 200, "text/html; charset=utf-8", payload)
+ return
+
+ if parsed.path == "/health":
+ json_response(self, 200, {"ok": True})
+ return
+
+ if parsed.path == "/api/config":
+ json_response(
+ self,
+ 200,
+ {
+ "baseUrl": DEFAULT_BASE_URL,
+ "hasDefaultAdminKey": bool(DEFAULT_ADMIN_KEY),
+ "hasSavedSession": bool(get_session_state().get("authKey")),
+ "qrEndpoints": sorted(SUPPORTED_QR_ENDPOINTS),
+ },
+ )
+ return
+
+ if parsed.path == "/api/session":
+ json_response(self, 200, {"session": get_session_state()})
+ return
+
+ if parsed.path == "/api/login/status":
+ query = urllib.parse.parse_qs(parsed.query)
+ auth_key = (query.get("authKey") or [""])[0].strip()
+ base_url = (query.get("baseUrl") or [DEFAULT_BASE_URL])[0].strip() or DEFAULT_BASE_URL
+ if not auth_key:
+ json_response(self, 400, {"error": "authKey 不能为空"})
+ return
+ try:
+ result = get_status_bundle(base_url=base_url, auth_key=auth_key)
+ update_session_state({"baseUrl": base_url, "authKey": auth_key})
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, result)
+ return
+
+ json_response(self, 404, {"error": "Not Found"})
+
+ def do_POST(self) -> None:
+ parsed = urllib.parse.urlparse(self.path)
+ try:
+ data = parse_json_body(self)
+ except ValueError as exc:
+ json_response(self, 400, {"error": str(exc)})
+ return
+
+ base_url = str(data.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL
+
+ if parsed.path == "/api/login/start":
+ admin_key = str(data.get("adminKey") or DEFAULT_ADMIN_KEY).strip()
+ if not admin_key:
+ json_response(self, 400, {"error": "adminKey 不能为空(也未找到默认 ADMIN_KEY)"})
+ return
+ days = int(data.get("days") or 30)
+ remark = str(data.get("remark") or "webui-login").strip()
+ qr_endpoint = str(data.get("qrEndpoint") or "GetLoginQrCodeNewX").strip()
+ try:
+ auth_key = create_auth_key(base_url=base_url, admin_key=admin_key, days=days, remark=remark)
+ qr_data = get_login_qr(base_url=base_url, auth_key=auth_key, qr_endpoint=qr_endpoint)
+ clear_message_cache(auth_key)
+ update_session_state(
+ {
+ "baseUrl": base_url,
+ "authKey": auth_key,
+ "qrData": qr_data,
+ "ticket": "",
+ "qrEndpoint": qr_endpoint,
+ }
+ )
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(
+ self,
+ 200,
+ {
+ "authKey": auth_key,
+ "qrEndpoint": qr_endpoint,
+ "qrData": qr_data,
+ },
+ )
+ return
+
+ if parsed.path == "/api/login/wakeup":
+ auth_key = str(data.get("authKey") or "").strip()
+ if not auth_key:
+ json_response(self, 400, {"error": "authKey 不能为空"})
+ return
+ try:
+ resp = api_request(
+ base_url=base_url,
+ method="POST",
+ path="/login/WakeUpLogin",
+ key=auth_key,
+ body={"Check": False, "Proxy": ""},
+ )
+ update_session_state({"baseUrl": base_url, "authKey": auth_key})
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, {"raw": resp})
+ return
+
+ if parsed.path == "/api/login/verify":
+ auth_key = str(data.get("authKey") or "").strip()
+ code = str(data.get("code") or "").strip()
+ ticket = str(data.get("ticket") or "").strip()
+ data62 = str(data.get("data62") or "").strip()
+ uuid = str(data.get("uuid") or "").strip()
+ if not auth_key:
+ json_response(self, 400, {"error": "authKey 不能为空"})
+ return
+ if not code:
+ json_response(self, 400, {"error": "code 不能为空"})
+ return
+ try:
+ resp = submit_verify(
+ base_url=base_url,
+ auth_key=auth_key,
+ code=code,
+ ticket=ticket,
+ data62=data62,
+ uuid=uuid,
+ )
+ update_session_state({"baseUrl": base_url, "authKey": auth_key, "ticket": ticket})
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, {"raw": resp})
+ return
+
+ if parsed.path == "/api/login/ext-confirm/get":
+ auth_key = str(data.get("authKey") or "").strip()
+ url = str(data.get("url") or "").strip()
+ if not auth_key:
+ json_response(self, 400, {"error": "authKey 不能为空"})
+ return
+ try:
+ resp = ext_device_confirm_get(base_url=base_url, auth_key=auth_key, url=url)
+ update_session_state({"baseUrl": base_url, "authKey": auth_key})
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, {"raw": resp})
+ return
+
+ if parsed.path == "/api/login/ext-confirm/ok":
+ auth_key = str(data.get("authKey") or "").strip()
+ url = str(data.get("url") or "").strip()
+ if not auth_key:
+ json_response(self, 400, {"error": "authKey 不能为空"})
+ return
+ try:
+ resp = ext_device_confirm_ok(base_url=base_url, auth_key=auth_key, url=url)
+ update_session_state({"baseUrl": base_url, "authKey": auth_key})
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, {"raw": resp})
+ return
+
+ if parsed.path == "/api/login/clear":
+ session = get_session_state()
+ auth_key = str(data.get("authKey") or session.get("authKey") or "").strip()
+ current_base_url = str(data.get("baseUrl") or session.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL
+ logout_result: dict[str, Any] = {"attempted": False, "ok": False}
+
+ if auth_key:
+ logout_result = try_logout(base_url=current_base_url, auth_key=auth_key)
+
+ try:
+ next_state = clear_session_state()
+ clear_message_cache(auth_key or None)
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+
+ json_response(self, 200, {"session": next_state, "logout": logout_result})
+ return
+
+ if parsed.path == "/api/session":
+ try:
+ next_state = update_session_state(data)
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, {"session": next_state})
+ return
+
+ if parsed.path == "/api/session/clear":
+ old_auth_key = str(get_session_state().get("authKey") or "").strip()
+ try:
+ next_state = clear_session_state()
+ clear_message_cache(old_auth_key or None)
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, {"session": next_state})
+ return
+
+ if parsed.path == "/api/message/poll":
+ auth_key = str(data.get("authKey") or "").strip()
+ count = int(data.get("count") or 30)
+ if not auth_key:
+ json_response(self, 400, {"error": "authKey 不能为空"})
+ return
+ try:
+ result = poll_messages(base_url=base_url, auth_key=auth_key, count=count)
+ update_session_state({"baseUrl": base_url, "authKey": auth_key})
+ except Exception as exc: # pragma: no cover
+ json_response(self, 500, {"error": str(exc)})
+ return
+ json_response(self, 200, result)
+ return
+
+ json_response(self, 404, {"error": "Not Found"})
+
+
+def main() -> None:
+ server = ThreadingHTTPServer((HOST, PORT), Handler)
+ print(f"[webui] listening on http://{HOST}:{PORT}")
+ print(f"[webui] default api base: {DEFAULT_BASE_URL}")
+ print(f"[webui] default admin key loaded: {bool(DEFAULT_ADMIN_KEY)}")
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ server.server_close()
+
+
+if __name__ == "__main__":
+ main()