diff --git a/.gitignore b/.gitignore index 9dec4c9..dea335a 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,8 @@ __pycache__/ # OS files .DS_Store Thumbs.db + +# WeChat webui runtime/session artifacts +/wechat-webui/.session.json +/wechat-webui/webui.log +/wechat-webui/__pycache__/ diff --git a/wechat-webui/index.html b/wechat-webui/index.html new file mode 100644 index 0000000..2eeded6 --- /dev/null +++ b/wechat-webui/index.html @@ -0,0 +1,878 @@ + + + + + + WeChatPadPro 登录控制台 + + + +
+
+
+

WeChatPadPro 登录控制台

+

只保留扫码登录,状态与消息自动轮询

+
+
+ + 未连接 +
+
+ +
+
+

1. 登录配置

+
+ + +
+
+ + +
+
+ +
+

2. 扫码窗口

+
+ 二维码 +
点击“生成授权二维码”后在这里显示二维码
+
+
+
+ +
+

状态日志

+ +
+
+ + + + diff --git a/wechat-webui/server.py b/wechat-webui/server.py new file mode 100644 index 0000000..b277d45 --- /dev/null +++ b/wechat-webui/server.py @@ -0,0 +1,743 @@ +#!/usr/bin/env python3 +"""WeChatPadPro login/message web UI server.""" + +from __future__ import annotations + +import json +import os +import threading +import time +import urllib.error +import urllib.parse +import urllib.request +from collections import deque +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any, Iterable + +ROOT_DIR = Path(__file__).resolve().parent +INDEX_FILE = ROOT_DIR / "index.html" +SESSION_FILE = ROOT_DIR / ".session.json" +DEFAULT_BASE_URL = os.environ.get("WCPP_API_BASE", "http://127.0.0.1:18238") +HOST = os.environ.get("WEBUI_HOST", "0.0.0.0") +PORT = int(os.environ.get("WEBUI_PORT", "18239")) +SESSION_LOCK = threading.Lock() +MESSAGE_CACHE_LOCK = threading.Lock() +SUPPORTED_QR_ENDPOINTS = { + "GetLoginQrCodeNewX", + "GetLoginQrCodePadX", + "GetLoginQrCodeWin", + "GetLoginQrCodeMac", + "GetLoginQrCodeAndroidPad", + "GetLoginQrCodeCar", + "GetLoginQrCodeA16", + "GetLoginQrCodeNew", +} +MESSAGE_CACHE_MAX_KEYS = 6000 +MESSAGE_CACHE_RECENT_KEYS: dict[str, deque[str]] = {} +MESSAGE_CACHE_RECENT_SET: dict[str, set[str]] = {} + + +def load_admin_key() -> str: + env_key = os.environ.get("WCPP_ADMIN_KEY", "").strip() + if env_key: + return env_key + + candidates = [ + ROOT_DIR.parent / "deploy" / ".env", + ROOT_DIR.parent / ".env", + ] + for env_file in candidates: + if not env_file.exists(): + continue + text = env_file.read_text(encoding="utf-8", errors="ignore") + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + if key.strip() == "ADMIN_KEY" and value.strip(): + return value.strip() + return "" + + +DEFAULT_ADMIN_KEY = load_admin_key() + +DEFAULT_SESSION_STATE: dict[str, Any] = { + "baseUrl": DEFAULT_BASE_URL, + "authKey": "", + "ticket": "", + "qrData": {}, + "qrEndpoint": "GetLoginQrCodeNewX", + "updatedAt": 0, +} + + +def _normalize_session(data: Any) -> dict[str, Any]: + state = dict(DEFAULT_SESSION_STATE) + if isinstance(data, dict): + base_url = data.get("baseUrl") + auth_key = data.get("authKey") + ticket = data.get("ticket") + qr_data = data.get("qrData") + qr_endpoint = data.get("qrEndpoint") + updated_at = data.get("updatedAt") + + if isinstance(base_url, str) and base_url.strip(): + state["baseUrl"] = base_url.strip() + if isinstance(auth_key, str): + state["authKey"] = auth_key.strip() + if isinstance(ticket, str): + state["ticket"] = ticket.strip() + if isinstance(qr_data, dict): + state["qrData"] = qr_data + if isinstance(qr_endpoint, str) and qr_endpoint.strip() in SUPPORTED_QR_ENDPOINTS: + state["qrEndpoint"] = qr_endpoint.strip() + if isinstance(updated_at, (int, float)): + state["updatedAt"] = int(updated_at) + return state + + +def _write_session_file(state: dict[str, Any]) -> None: + tmp = SESSION_FILE.with_suffix(".json.tmp") + tmp.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8") + tmp.replace(SESSION_FILE) + + +def load_session_state() -> dict[str, Any]: + if not SESSION_FILE.exists(): + return dict(DEFAULT_SESSION_STATE) + try: + raw = json.loads(SESSION_FILE.read_text(encoding="utf-8")) + except Exception: + return dict(DEFAULT_SESSION_STATE) + return _normalize_session(raw) + + +SESSION_STATE: dict[str, Any] = load_session_state() + + +def get_session_state() -> dict[str, Any]: + with SESSION_LOCK: + return dict(SESSION_STATE) + + +def update_session_state(patch: dict[str, Any]) -> dict[str, Any]: + with SESSION_LOCK: + merged = dict(SESSION_STATE) + for key in ("baseUrl", "authKey", "ticket", "qrData", "qrEndpoint"): + if key in patch: + merged[key] = patch[key] + merged = _normalize_session(merged) + merged["updatedAt"] = int(time.time()) + SESSION_STATE.clear() + SESSION_STATE.update(merged) + _write_session_file(SESSION_STATE) + return dict(SESSION_STATE) + + +def clear_session_state() -> dict[str, Any]: + with SESSION_LOCK: + SESSION_STATE.clear() + SESSION_STATE.update(dict(DEFAULT_SESSION_STATE)) + SESSION_STATE["updatedAt"] = int(time.time()) + _write_session_file(SESSION_STATE) + return dict(SESSION_STATE) + + +def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None: + raw = json.dumps(payload, ensure_ascii=False).encode("utf-8") + handler.send_response(status) + handler.send_header("Content-Type", "application/json; charset=utf-8") + handler.send_header("Content-Length", str(len(raw))) + handler.send_header("Cache-Control", "no-store") + handler.send_header("Access-Control-Allow-Origin", "*") + handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + handler.send_header("Access-Control-Allow-Headers", "Content-Type") + handler.end_headers() + handler.wfile.write(raw) + + +def text_response(handler: BaseHTTPRequestHandler, status: int, content_type: str, payload: bytes) -> None: + handler.send_response(status) + handler.send_header("Content-Type", content_type) + handler.send_header("Content-Length", str(len(payload))) + handler.send_header("Cache-Control", "no-store") + handler.send_header("Access-Control-Allow-Origin", "*") + handler.end_headers() + handler.wfile.write(payload) + + +def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]: + length = int(handler.headers.get("Content-Length", "0") or "0") + if length == 0: + return {} + raw = handler.rfile.read(length) + try: + data = json.loads(raw.decode("utf-8")) + except json.JSONDecodeError as exc: + raise ValueError(f"请求体 JSON 无效: {exc}") from exc + if not isinstance(data, dict): + raise ValueError("请求体必须是 JSON 对象") + return data + + +def api_request( + base_url: str, + method: str, + path: str, + key: str | None = None, + body: dict[str, Any] | None = None, + timeout: int = 25, +) -> dict[str, Any]: + base = base_url.rstrip("/") + url = f"{base}{path}" + if key: + joiner = "&" if "?" in url else "?" + url = f"{url}{joiner}key={urllib.parse.quote(key)}" + + payload = None + headers: dict[str, str] = {} + if body is not None: + payload = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + + req = urllib.request.Request(url, data=payload, method=method.upper(), headers=headers) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc + except urllib.error.URLError as exc: + raise RuntimeError(f"请求失败: {exc}") from exc + + +def flatten_wrapped_value(value: Any) -> Any: + if isinstance(value, dict): + for name in ("str", "string", "String", "value", "Value"): + if name in value and value[name] not in (None, ""): + return value[name] + return value + + +def find_first_value(data: Any, keys: Iterable[str]) -> Any: + targets = {k.lower() for k in keys} + queue: list[Any] = [data] + while queue: + current = queue.pop(0) + if isinstance(current, dict): + for k, v in current.items(): + if k.lower() in targets: + found = flatten_wrapped_value(v) + if found not in (None, "", [], {}): + return found + queue.append(v) + elif isinstance(current, list): + queue.extend(current) + return None + + +def to_display_value(value: Any) -> Any: + value = flatten_wrapped_value(value) + if isinstance(value, (dict, list)): + try: + return json.dumps(value, ensure_ascii=False) + except Exception: + return str(value) + return value + + +def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]: + add_msgs = item.get("AddMsgs") + if isinstance(add_msgs, list): + for msg in add_msgs: + if isinstance(msg, dict): + return msg + return item + + +def summarize_message(item: Any) -> dict[str, Any]: + if not isinstance(item, dict): + return {"content": str(item)} + + primary = pick_primary_message(item) + + outer_type = find_first_value(item, ["Type"]) + inner_type = find_first_value(primary, ["msg_type", "MsgType", "ContentType", "Type"]) + + msg_id = find_first_value(primary, ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"]) + from_user = find_first_value( + primary, + ["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"], + ) + to_user = find_first_value(primary, ["to_user_name", "ToUserName", "ToWxid", "Receiver"]) + create_time = find_first_value(primary, ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"]) + content = find_first_value(primary, ["text_content", "TextContent", "content", "Content", "Message", "Text"]) + + if content in (None, "", {}, []): + # 部分系统消息正文在 msg_source 里 + content = find_first_value(primary, ["msg_source", "MsgSource"]) + + return { + "id": to_display_value(msg_id), + "type": to_display_value(inner_type if inner_type not in (None, "") else outer_type), + "from": to_display_value(from_user), + "to": to_display_value(to_user), + "time": to_display_value(create_time), + "content": to_display_value(content), + } + + +def build_message_dedupe_key(summary: dict[str, Any], raw_item: Any) -> str: + msg_id = summary.get("id") + msg_type = summary.get("type") + if msg_id not in (None, "", "-"): + return f"id:{msg_id}|type:{msg_type}" + if isinstance(raw_item, dict): + fallback = find_first_value(raw_item, ["new_msg_id", "msg_id", "NewMsgId", "MsgId"]) + if fallback not in (None, "", "-"): + return f"id:{fallback}|type:{msg_type}" + return "|".join( + [ + f"type:{msg_type}", + f"from:{summary.get('from')}", + f"to:{summary.get('to')}", + f"time:{summary.get('time')}", + f"content:{str(summary.get('content') or '')[:120]}", + ] + ) + + +def seen_message_before(auth_key: str, dedupe_key: str) -> bool: + with MESSAGE_CACHE_LOCK: + msg_set = MESSAGE_CACHE_RECENT_SET.setdefault(auth_key, set()) + if dedupe_key in msg_set: + return True + + msg_queue = MESSAGE_CACHE_RECENT_KEYS.setdefault(auth_key, deque()) + msg_queue.append(dedupe_key) + msg_set.add(dedupe_key) + + while len(msg_queue) > MESSAGE_CACHE_MAX_KEYS: + old = msg_queue.popleft() + msg_set.discard(old) + + return False + + +def clear_message_cache(auth_key: str | None = None) -> None: + with MESSAGE_CACHE_LOCK: + if auth_key: + MESSAGE_CACHE_RECENT_KEYS.pop(auth_key, None) + MESSAGE_CACHE_RECENT_SET.pop(auth_key, None) + return + MESSAGE_CACHE_RECENT_KEYS.clear() + MESSAGE_CACHE_RECENT_SET.clear() + + +def create_auth_key(base_url: str, admin_key: str, days: int, remark: str) -> str: + resp = api_request( + base_url=base_url, + method="POST", + path="/admin/GenAuthKey1", + key=admin_key, + body={"Count": 1, "Days": days, "Remark": remark}, + ) + if resp.get("Code") != 200: + raise RuntimeError(f"生成授权码失败: {json.dumps(resp, ensure_ascii=False)}") + data = resp.get("Data") + if isinstance(data, dict): + keys = data.get("authKeys") or data.get("AuthKeys") + if isinstance(keys, list) and keys: + return str(keys[0]) + raise RuntimeError("授权码响应结构异常") + + +def get_login_qr(base_url: str, auth_key: str, qr_endpoint: str = "GetLoginQrCodeNewX") -> dict[str, Any]: + endpoint = qr_endpoint.strip() if qr_endpoint else "GetLoginQrCodeNewX" + if endpoint not in SUPPORTED_QR_ENDPOINTS: + raise RuntimeError(f"不支持的二维码接口: {endpoint}") + resp = api_request( + base_url=base_url, + method="POST", + path=f"/login/{endpoint}", + key=auth_key, + body={"Check": False, "Proxy": ""}, + ) + if resp.get("Code") != 200: + raise RuntimeError(f"获取二维码失败: {json.dumps(resp, ensure_ascii=False)}") + data = resp.get("Data") + if not isinstance(data, dict): + raise RuntimeError("二维码响应结构异常") + return data + + +def get_status_bundle(base_url: str, auth_key: str) -> dict[str, Any]: + check = api_request(base_url=base_url, method="GET", path="/login/CheckLoginStatus", key=auth_key) + online = api_request(base_url=base_url, method="GET", path="/login/GetLoginStatus", key=auth_key) + return { + "check": check, + "online": online, + "isOnline": online.get("Code") == 200, + } + + +def submit_verify(base_url: str, auth_key: str, code: str, ticket: str, data62: str, uuid: str) -> dict[str, Any]: + payload: dict[str, Any] = {"Code": code} + if ticket: + payload["Ticket"] = ticket + if data62: + payload["Data62"] = data62 + if uuid: + payload["Uuid"] = uuid + return api_request( + base_url=base_url, + method="POST", + path="/login/YPayVerificationcode", + key=auth_key, + body=payload, + ) + + +def poll_messages(base_url: str, auth_key: str, count: int) -> dict[str, Any]: + raw = api_request( + base_url=base_url, + method="POST", + path="/message/HttpSyncMsg", + key=auth_key, + body={"Count": count}, + ) + data = raw.get("Data") + items: list[dict[str, Any]] = [] + batch_seen: set[str] = set() + if isinstance(data, list): + for item in data: + summary = summarize_message(item) + dedupe_key = build_message_dedupe_key(summary, item) + if dedupe_key in batch_seen: + continue + batch_seen.add(dedupe_key) + if seen_message_before(auth_key, dedupe_key): + continue + summary["raw"] = item + items.append(summary) + return { + "raw": raw, + "items": items, + "received": len(items), + } + + +def ext_device_confirm_get(base_url: str, auth_key: str, url: str) -> dict[str, Any]: + payload: dict[str, Any] = {} + if url: + payload["Url"] = url + return api_request( + base_url=base_url, + method="POST", + path="/login/ExtDeviceLoginConfirmGet", + key=auth_key, + body=payload, + ) + + +def ext_device_confirm_ok(base_url: str, auth_key: str, url: str) -> dict[str, Any]: + payload: dict[str, Any] = {} + if url: + payload["Url"] = url + return api_request( + base_url=base_url, + method="POST", + path="/login/ExtDeviceLoginConfirmOk", + key=auth_key, + body=payload, + ) + + +def try_logout(base_url: str, auth_key: str) -> dict[str, Any]: + attempts: list[dict[str, Any]] = [] + candidates: list[tuple[str, str, dict[str, Any] | None]] = [ + ("POST", "/login/LogOut", {}), + ("GET", "/login/LogOut", None), + ("POST", "/login/Logout", {}), + ("GET", "/login/Logout", None), + ] + for method, path, body in candidates: + try: + resp = api_request(base_url=base_url, method=method, path=path, key=auth_key, body=body) + attempts.append({"method": method, "path": path, "ok": True, "raw": resp}) + code = resp.get("Code") if isinstance(resp, dict) else None + if code in (None, 200, 300, -2, -3): + return { + "attempted": True, + "ok": True, + "method": method, + "path": path, + "raw": resp, + "attempts": attempts, + } + except Exception as exc: + attempts.append({"method": method, "path": path, "ok": False, "error": str(exc)}) + + last_error = next((item["error"] for item in reversed(attempts) if not item.get("ok")), "logout not supported") + return {"attempted": True, "ok": False, "error": last_error, "attempts": attempts} + + +class Handler(BaseHTTPRequestHandler): + server_version = "WCPPWebUI/1.0" + + def log_message(self, fmt: str, *args: Any) -> None: + print(f"[webui] {self.address_string()} - {fmt % args}") + + def do_OPTIONS(self) -> None: + self.send_response(204) + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + self.send_header("Access-Control-Allow-Headers", "Content-Type") + self.end_headers() + + def do_GET(self) -> None: + parsed = urllib.parse.urlparse(self.path) + if parsed.path in {"/", "/index.html"}: + if not INDEX_FILE.exists(): + json_response(self, 500, {"error": "index.html 不存在"}) + return + payload = INDEX_FILE.read_bytes() + text_response(self, 200, "text/html; charset=utf-8", payload) + return + + if parsed.path == "/health": + json_response(self, 200, {"ok": True}) + return + + if parsed.path == "/api/config": + json_response( + self, + 200, + { + "baseUrl": DEFAULT_BASE_URL, + "hasDefaultAdminKey": bool(DEFAULT_ADMIN_KEY), + "hasSavedSession": bool(get_session_state().get("authKey")), + "qrEndpoints": sorted(SUPPORTED_QR_ENDPOINTS), + }, + ) + return + + if parsed.path == "/api/session": + json_response(self, 200, {"session": get_session_state()}) + return + + if parsed.path == "/api/login/status": + query = urllib.parse.parse_qs(parsed.query) + auth_key = (query.get("authKey") or [""])[0].strip() + base_url = (query.get("baseUrl") or [DEFAULT_BASE_URL])[0].strip() or DEFAULT_BASE_URL + if not auth_key: + json_response(self, 400, {"error": "authKey 不能为空"}) + return + try: + result = get_status_bundle(base_url=base_url, auth_key=auth_key) + update_session_state({"baseUrl": base_url, "authKey": auth_key}) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, result) + return + + json_response(self, 404, {"error": "Not Found"}) + + def do_POST(self) -> None: + parsed = urllib.parse.urlparse(self.path) + try: + data = parse_json_body(self) + except ValueError as exc: + json_response(self, 400, {"error": str(exc)}) + return + + base_url = str(data.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL + + if parsed.path == "/api/login/start": + admin_key = str(data.get("adminKey") or DEFAULT_ADMIN_KEY).strip() + if not admin_key: + json_response(self, 400, {"error": "adminKey 不能为空(也未找到默认 ADMIN_KEY)"}) + return + days = int(data.get("days") or 30) + remark = str(data.get("remark") or "webui-login").strip() + qr_endpoint = str(data.get("qrEndpoint") or "GetLoginQrCodeNewX").strip() + try: + auth_key = create_auth_key(base_url=base_url, admin_key=admin_key, days=days, remark=remark) + qr_data = get_login_qr(base_url=base_url, auth_key=auth_key, qr_endpoint=qr_endpoint) + clear_message_cache(auth_key) + update_session_state( + { + "baseUrl": base_url, + "authKey": auth_key, + "qrData": qr_data, + "ticket": "", + "qrEndpoint": qr_endpoint, + } + ) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response( + self, + 200, + { + "authKey": auth_key, + "qrEndpoint": qr_endpoint, + "qrData": qr_data, + }, + ) + return + + if parsed.path == "/api/login/wakeup": + auth_key = str(data.get("authKey") or "").strip() + if not auth_key: + json_response(self, 400, {"error": "authKey 不能为空"}) + return + try: + resp = api_request( + base_url=base_url, + method="POST", + path="/login/WakeUpLogin", + key=auth_key, + body={"Check": False, "Proxy": ""}, + ) + update_session_state({"baseUrl": base_url, "authKey": auth_key}) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, {"raw": resp}) + return + + if parsed.path == "/api/login/verify": + auth_key = str(data.get("authKey") or "").strip() + code = str(data.get("code") or "").strip() + ticket = str(data.get("ticket") or "").strip() + data62 = str(data.get("data62") or "").strip() + uuid = str(data.get("uuid") or "").strip() + if not auth_key: + json_response(self, 400, {"error": "authKey 不能为空"}) + return + if not code: + json_response(self, 400, {"error": "code 不能为空"}) + return + try: + resp = submit_verify( + base_url=base_url, + auth_key=auth_key, + code=code, + ticket=ticket, + data62=data62, + uuid=uuid, + ) + update_session_state({"baseUrl": base_url, "authKey": auth_key, "ticket": ticket}) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, {"raw": resp}) + return + + if parsed.path == "/api/login/ext-confirm/get": + auth_key = str(data.get("authKey") or "").strip() + url = str(data.get("url") or "").strip() + if not auth_key: + json_response(self, 400, {"error": "authKey 不能为空"}) + return + try: + resp = ext_device_confirm_get(base_url=base_url, auth_key=auth_key, url=url) + update_session_state({"baseUrl": base_url, "authKey": auth_key}) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, {"raw": resp}) + return + + if parsed.path == "/api/login/ext-confirm/ok": + auth_key = str(data.get("authKey") or "").strip() + url = str(data.get("url") or "").strip() + if not auth_key: + json_response(self, 400, {"error": "authKey 不能为空"}) + return + try: + resp = ext_device_confirm_ok(base_url=base_url, auth_key=auth_key, url=url) + update_session_state({"baseUrl": base_url, "authKey": auth_key}) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, {"raw": resp}) + return + + if parsed.path == "/api/login/clear": + session = get_session_state() + auth_key = str(data.get("authKey") or session.get("authKey") or "").strip() + current_base_url = str(data.get("baseUrl") or session.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL + logout_result: dict[str, Any] = {"attempted": False, "ok": False} + + if auth_key: + logout_result = try_logout(base_url=current_base_url, auth_key=auth_key) + + try: + next_state = clear_session_state() + clear_message_cache(auth_key or None) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + + json_response(self, 200, {"session": next_state, "logout": logout_result}) + return + + if parsed.path == "/api/session": + try: + next_state = update_session_state(data) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, {"session": next_state}) + return + + if parsed.path == "/api/session/clear": + old_auth_key = str(get_session_state().get("authKey") or "").strip() + try: + next_state = clear_session_state() + clear_message_cache(old_auth_key or None) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, {"session": next_state}) + return + + if parsed.path == "/api/message/poll": + auth_key = str(data.get("authKey") or "").strip() + count = int(data.get("count") or 30) + if not auth_key: + json_response(self, 400, {"error": "authKey 不能为空"}) + return + try: + result = poll_messages(base_url=base_url, auth_key=auth_key, count=count) + update_session_state({"baseUrl": base_url, "authKey": auth_key}) + except Exception as exc: # pragma: no cover + json_response(self, 500, {"error": str(exc)}) + return + json_response(self, 200, result) + return + + json_response(self, 404, {"error": "Not Found"}) + + +def main() -> None: + server = ThreadingHTTPServer((HOST, PORT), Handler) + print(f"[webui] listening on http://{HOST}:{PORT}") + print(f"[webui] default api base: {DEFAULT_BASE_URL}") + print(f"[webui] default admin key loaded: {bool(DEFAULT_ADMIN_KEY)}") + try: + server.serve_forever() + except KeyboardInterrupt: + pass + finally: + server.server_close() + + +if __name__ == "__main__": + main()