#!/usr/bin/env python3
"""WeChatPadPro login/message web UI server."""

from __future__ import annotations

import json
import os
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import deque
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, Iterable

ROOT_DIR = Path(__file__).resolve().parent
INDEX_FILE = ROOT_DIR / "index.html"
SESSION_FILE = ROOT_DIR / ".session.json"
DEFAULT_BASE_URL = os.environ.get("WCPP_API_BASE", "http://127.0.0.1:18238")
HOST = os.environ.get("WEBUI_HOST", "0.0.0.0")
PORT = int(os.environ.get("WEBUI_PORT", "18239"))

# Guards SESSION_STATE and the session file written from it.
SESSION_LOCK = threading.Lock()
# Guards the two MESSAGE_CACHE_RECENT_* dictionaries below.
MESSAGE_CACHE_LOCK = threading.Lock()

DEFAULT_QR_ENDPOINT = "GetLoginQrCodeNewX"
SUPPORTED_QR_ENDPOINTS = {
    "GetLoginQrCodeNewX",
    "GetLoginQrCodePadX",
    "GetLoginQrCodeWin",
    "GetLoginQrCodeMac",
    "GetLoginQrCodeAndroidPad",
    "GetLoginQrCodeCar",
    "GetLoginQrCodeA16",
    "GetLoginQrCodeNew",
}

# Per auth key: at most this many dedupe keys are remembered.
MESSAGE_CACHE_MAX_KEYS = 6000
# Insertion-ordered keys (for eviction) and the matching set (for lookup).
MESSAGE_CACHE_RECENT_KEYS: dict[str, deque[str]] = {}
MESSAGE_CACHE_RECENT_SET: dict[str, set[str]] = {}


class _BadRequest(ValueError):
    """Raised by POST route helpers to signal an HTTP 400 with a message."""


def load_admin_key() -> str:
    """Return the admin key from the environment or a nearby ``.env`` file.

    ``WCPP_ADMIN_KEY`` wins when set to a non-blank value.  Otherwise
    ``../deploy/.env`` and ``../.env`` (relative to this file) are scanned in
    that order for a non-empty ``ADMIN_KEY=...`` line.  Returns ``""`` when
    nothing is found.
    """
    env_key = os.environ.get("WCPP_ADMIN_KEY", "").strip()
    if env_key:
        return env_key

    candidates = [
        ROOT_DIR.parent / "deploy" / ".env",
        ROOT_DIR.parent / ".env",
    ]
    for env_file in candidates:
        try:
            text = env_file.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Missing or unreadable file: try the next candidate instead of
            # failing module import.
            continue
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            if key.strip() == "ADMIN_KEY" and value.strip():
                return value.strip()
    return ""


DEFAULT_ADMIN_KEY = load_admin_key()

DEFAULT_SESSION_STATE: dict[str, Any] = {
    "baseUrl": DEFAULT_BASE_URL,
    "authKey": "",
    "ticket": "",
    "qrData": {},
    "qrEndpoint": DEFAULT_QR_ENDPOINT,
    "updatedAt": 0,
}


def _normalize_session(data: Any) -> dict[str, Any]:
    """Return a session dict built from the defaults plus valid fields of *data*.

    Fields of *data* that are missing or have an unexpected type are ignored,
    so the corresponding default value is kept.
    """
    state = dict(DEFAULT_SESSION_STATE)
    if not isinstance(data, dict):
        return state

    base_url = data.get("baseUrl")
    if isinstance(base_url, str) and base_url.strip():
        state["baseUrl"] = base_url.strip()

    auth_key = data.get("authKey")
    if isinstance(auth_key, str):
        state["authKey"] = auth_key.strip()

    ticket = data.get("ticket")
    if isinstance(ticket, str):
        state["ticket"] = ticket.strip()

    qr_data = data.get("qrData")
    if isinstance(qr_data, dict):
        state["qrData"] = qr_data

    qr_endpoint = data.get("qrEndpoint")
    if isinstance(qr_endpoint, str) and qr_endpoint.strip() in SUPPORTED_QR_ENDPOINTS:
        state["qrEndpoint"] = qr_endpoint.strip()

    updated_at = data.get("updatedAt")
    if isinstance(updated_at, (int, float)):
        state["updatedAt"] = int(updated_at)

    return state


def _write_session_file(state: dict[str, Any]) -> None:
    """Write *state* as JSON to a temp file, then rename it over SESSION_FILE."""
    tmp = SESSION_FILE.with_suffix(".json.tmp")
    tmp.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
    tmp.replace(SESSION_FILE)


def load_session_state() -> dict[str, Any]:
    """Load the persisted session, falling back to defaults on any problem."""
    if not SESSION_FILE.exists():
        return dict(DEFAULT_SESSION_STATE)
    try:
        raw = json.loads(SESSION_FILE.read_text(encoding="utf-8"))
    except Exception:
        # An unreadable or corrupt session file is treated as "no session".
        return dict(DEFAULT_SESSION_STATE)
    return _normalize_session(raw)


SESSION_STATE: dict[str, Any] = load_session_state()


def get_session_state() -> dict[str, Any]:
    """Return a shallow copy of the current session state."""
    with SESSION_LOCK:
        return dict(SESSION_STATE)


def update_session_state(patch: dict[str, Any]) -> dict[str, Any]:
    """Merge known keys of *patch* into the session, persist it, return a copy.

    Only ``baseUrl``, ``authKey``, ``ticket``, ``qrData`` and ``qrEndpoint``
    are taken from *patch*; ``updatedAt`` is always set to the current time.
    The merged result goes through ``_normalize_session`` before it is stored.
    """
    with SESSION_LOCK:
        merged = dict(SESSION_STATE)
        for key in ("baseUrl", "authKey", "ticket", "qrData", "qrEndpoint"):
            if key in patch:
                merged[key] = patch[key]
        merged = _normalize_session(merged)
        merged["updatedAt"] = int(time.time())
        SESSION_STATE.clear()
        SESSION_STATE.update(merged)
        _write_session_file(SESSION_STATE)
        return dict(SESSION_STATE)


def clear_session_state() -> dict[str, Any]:
    """Reset the session to defaults, persist it, and return a copy."""
    with SESSION_LOCK:
        SESSION_STATE.clear()
        SESSION_STATE.update(dict(DEFAULT_SESSION_STATE))
        SESSION_STATE["updatedAt"] = int(time.time())
        _write_session_file(SESSION_STATE)
        return dict(SESSION_STATE)


def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
    """Send *payload* as a UTF-8 JSON response with no-store and CORS headers."""
    raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json; charset=utf-8")
    handler.send_header("Content-Length", str(len(raw)))
    handler.send_header("Cache-Control", "no-store")
    handler.send_header("Access-Control-Allow-Origin", "*")
    handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
    handler.send_header("Access-Control-Allow-Headers", "Content-Type")
    handler.end_headers()
    handler.wfile.write(raw)


def text_response(
    handler: BaseHTTPRequestHandler,
    status: int,
    content_type: str,
    payload: bytes,
) -> None:
    """Send raw *payload* bytes with the given content type and no-store header."""
    handler.send_response(status)
    handler.send_header("Content-Type", content_type)
    handler.send_header("Content-Length", str(len(payload)))
    handler.send_header("Cache-Control", "no-store")
    handler.send_header("Access-Control-Allow-Origin", "*")
    handler.end_headers()
    handler.wfile.write(payload)


def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
    """Read the request body and return it as a JSON object.

    A missing, empty or zero ``Content-Length`` yields ``{}``.

    Raises:
        ValueError: if ``Content-Length`` is not a non-negative integer, the
            body is not valid UTF-8 JSON, or the JSON value is not an object.
    """
    header = handler.headers.get("Content-Length", "0") or "0"
    try:
        length = int(header)
    except ValueError as exc:
        raise ValueError(f"Content-Length 无效: {header!r}") from exc
    if length < 0:
        # A negative size would make rfile.read() block until EOF.
        raise ValueError(f"Content-Length 无效: {header!r}")
    if length == 0:
        return {}

    raw = handler.rfile.read(length)
    try:
        data = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise ValueError(f"请求体 JSON 无效: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("请求体必须是 JSON 对象")
    return data


def api_request(
    base_url: str,
    method: str,
    path: str,
    key: str | None = None,
    body: dict[str, Any] | None = None,
    timeout: int = 25,
) -> dict[str, Any]:
    """Call ``{base_url}{path}`` and return the decoded JSON response.

    Args:
        base_url: API root; a trailing ``/`` is stripped.
        method: HTTP method (upper-cased before use).
        path: Path appended verbatim to *base_url*.
        key: When truthy, appended as a URL-quoted ``key`` query parameter.
        body: When not ``None``, sent as a JSON request body.
        timeout: Timeout in seconds passed to ``urlopen``.

    Raises:
        RuntimeError: on an HTTP error status, a URL/connection error, or a
            response body that is not valid UTF-8 JSON.
    """
    url = f"{base_url.rstrip('/')}{path}"
    if key:
        joiner = "&" if "?" in url else "?"
        url = f"{url}{joiner}key={urllib.parse.quote(key)}"

    payload = None
    headers: dict[str, str] = {}
    if body is not None:
        payload = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"

    req = urllib.request.Request(url, data=payload, method=method.upper(), headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
    except urllib.error.URLError as exc:
        raise RuntimeError(f"请求失败: {exc}") from exc

    # Decode outside the network try-block so a malformed body is reported as
    # such rather than leaking a bare JSONDecodeError.
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError as exc:
        raise RuntimeError(f"响应不是有效的 JSON: {exc}") from exc


def flatten_wrapped_value(value: Any) -> Any:
    """Unwrap ``{"str": x}``-style wrappers, returning *value* unchanged otherwise.

    The first of ``str``, ``string``, ``String``, ``value``, ``Value`` that is
    present with a value other than ``None`` or ``""`` is returned.
    """
    if isinstance(value, dict):
        for name in ("str", "string", "String", "value", "Value"):
            if name in value and value[name] not in (None, ""):
                return value[name]
    return value


def find_first_value(data: Any, keys: Iterable[str]) -> Any:
    """Breadth-first search *data* for the first non-empty value under *keys*.

    Key comparison is case-insensitive.  Matching values are passed through
    ``flatten_wrapped_value``; results equal to ``None``, ``""``, ``[]`` or
    ``{}`` are skipped.  Returns ``None`` when nothing matches.
    """
    targets = {k.lower() for k in keys}
    pending: deque[Any] = deque([data])
    while pending:
        current = pending.popleft()
        if isinstance(current, dict):
            for k, v in current.items():
                # Non-string keys can never match but are still traversed.
                if isinstance(k, str) and k.lower() in targets:
                    found = flatten_wrapped_value(v)
                    if found not in (None, "", [], {}):
                        return found
                pending.append(v)
        elif isinstance(current, list):
            pending.extend(current)
    return None


def to_display_value(value: Any) -> Any:
    """Unwrap *value* and render dicts/lists as a JSON string for display."""
    value = flatten_wrapped_value(value)
    if isinstance(value, (dict, list)):
        try:
            return json.dumps(value, ensure_ascii=False)
        except Exception:
            # Not JSON-serialisable: fall back to the plain string form.
            return str(value)
    return value


def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]:
    """Return the first dict in ``item["AddMsgs"]``, or *item* if there is none."""
    add_msgs = item.get("AddMsgs")
    if isinstance(add_msgs, list):
        for msg in add_msgs:
            if isinstance(msg, dict):
                return msg
    return item


def summarize_message(item: Any) -> dict[str, Any]:
    """Extract id/type/from/to/time/content display fields from a raw message.

    Non-dict input yields ``{"content": str(item)}``.
    """
    if not isinstance(item, dict):
        return {"content": str(item)}

    primary = pick_primary_message(item)
    outer_type = find_first_value(item, ["Type"])
    inner_type = find_first_value(primary, ["msg_type", "MsgType", "ContentType", "Type"])
    msg_id = find_first_value(
        primary,
        ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"],
    )
    from_user = find_first_value(
        primary,
        ["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"],
    )
    to_user = find_first_value(primary, ["to_user_name", "ToUserName", "ToWxid", "Receiver"])
    create_time = find_first_value(
        primary,
        ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"],
    )
    content = find_first_value(
        primary,
        ["text_content", "TextContent", "content", "Content", "Message", "Text"],
    )
    if content in (None, "", {}, []):
        # Some system messages carry their body in msg_source instead.
        content = find_first_value(primary, ["msg_source", "MsgSource"])

    return {
        "id": to_display_value(msg_id),
        "type": to_display_value(inner_type if inner_type not in (None, "") else outer_type),
        "from": to_display_value(from_user),
        "to": to_display_value(to_user),
        "time": to_display_value(create_time),
        "content": to_display_value(content),
    }


def build_message_dedupe_key(summary: dict[str, Any], raw_item: Any) -> str:
    """Build a string key identifying a message for de-duplication.

    Prefers the summary id, then an id found in *raw_item*, and finally a
    composite of type/from/to/time and the first 120 characters of content.
    """
    msg_id = summary.get("id")
    msg_type = summary.get("type")
    if msg_id not in (None, "", "-"):
        return f"id:{msg_id}|type:{msg_type}"

    if isinstance(raw_item, dict):
        fallback = find_first_value(raw_item, ["new_msg_id", "msg_id", "NewMsgId", "MsgId"])
        if fallback not in (None, "", "-"):
            return f"id:{fallback}|type:{msg_type}"

    return "|".join(
        [
            f"type:{msg_type}",
            f"from:{summary.get('from')}",
            f"to:{summary.get('to')}",
            f"time:{summary.get('time')}",
            f"content:{str(summary.get('content') or '')[:120]}",
        ]
    )


def seen_message_before(auth_key: str, dedupe_key: str) -> bool:
    """Return True if *dedupe_key* is already cached for *auth_key*.

    An unseen key is recorded before returning False; the oldest keys are
    evicted once more than ``MESSAGE_CACHE_MAX_KEYS`` are held.
    """
    with MESSAGE_CACHE_LOCK:
        msg_set = MESSAGE_CACHE_RECENT_SET.setdefault(auth_key, set())
        if dedupe_key in msg_set:
            return True

        msg_queue = MESSAGE_CACHE_RECENT_KEYS.setdefault(auth_key, deque())
        msg_queue.append(dedupe_key)
        msg_set.add(dedupe_key)
        while len(msg_queue) > MESSAGE_CACHE_MAX_KEYS:
            msg_set.discard(msg_queue.popleft())
        return False


def clear_message_cache(auth_key: str | None = None) -> None:
    """Drop cached dedupe keys for *auth_key*, or for every key when falsy."""
    with MESSAGE_CACHE_LOCK:
        if auth_key:
            MESSAGE_CACHE_RECENT_KEYS.pop(auth_key, None)
            MESSAGE_CACHE_RECENT_SET.pop(auth_key, None)
            return
        MESSAGE_CACHE_RECENT_KEYS.clear()
        MESSAGE_CACHE_RECENT_SET.clear()


def create_auth_key(base_url: str, admin_key: str, days: int, remark: str) -> str:
    """Request one auth key via ``/admin/GenAuthKey1`` and return it.

    Raises:
        RuntimeError: if the response ``Code`` is not 200 or no key is present.
    """
    resp = api_request(
        base_url=base_url,
        method="POST",
        path="/admin/GenAuthKey1",
        key=admin_key,
        body={"Count": 1, "Days": days, "Remark": remark},
    )
    if resp.get("Code") != 200:
        raise RuntimeError(f"生成授权码失败: {json.dumps(resp, ensure_ascii=False)}")

    data = resp.get("Data")
    if isinstance(data, dict):
        keys = data.get("authKeys") or data.get("AuthKeys")
        if isinstance(keys, list) and keys:
            return str(keys[0])
    raise RuntimeError("授权码响应结构异常")


def get_login_qr(
    base_url: str,
    auth_key: str,
    qr_endpoint: str = DEFAULT_QR_ENDPOINT,
) -> dict[str, Any]:
    """Fetch login QR data from ``/login/<qr_endpoint>``.

    Raises:
        RuntimeError: if the endpoint is not in ``SUPPORTED_QR_ENDPOINTS``,
            the response ``Code`` is not 200, or ``Data`` is not a dict.
    """
    endpoint = qr_endpoint.strip() if qr_endpoint else DEFAULT_QR_ENDPOINT
    if endpoint not in SUPPORTED_QR_ENDPOINTS:
        raise RuntimeError(f"不支持的二维码接口: {endpoint}")

    resp = api_request(
        base_url=base_url,
        method="POST",
        path=f"/login/{endpoint}",
        key=auth_key,
        body={"Check": False, "Proxy": ""},
    )
    if resp.get("Code") != 200:
        raise RuntimeError(f"获取二维码失败: {json.dumps(resp, ensure_ascii=False)}")

    data = resp.get("Data")
    if not isinstance(data, dict):
        raise RuntimeError("二维码响应结构异常")
    return data


def get_status_bundle(base_url: str, auth_key: str) -> dict[str, Any]:
    """Query both login-status endpoints and report whether the account is online.

    ``isOnline`` is True when ``/login/GetLoginStatus`` answers with Code 200.
    """
    check = api_request(
        base_url=base_url, method="GET", path="/login/CheckLoginStatus", key=auth_key
    )
    online = api_request(
        base_url=base_url, method="GET", path="/login/GetLoginStatus", key=auth_key
    )
    return {
        "check": check,
        "online": online,
        "isOnline": online.get("Code") == 200,
    }


def submit_verify(
    base_url: str,
    auth_key: str,
    code: str,
    ticket: str,
    data62: str,
    uuid: str,
) -> dict[str, Any]:
    """POST a verification code; optional fields are sent only when non-empty."""
    payload: dict[str, Any] = {"Code": code}
    if ticket:
        payload["Ticket"] = ticket
    if data62:
        payload["Data62"] = data62
    if uuid:
        payload["Uuid"] = uuid
    return api_request(
        base_url=base_url,
        method="POST",
        path="/login/YPayVerificationcode",
        key=auth_key,
        body=payload,
    )


def poll_messages(base_url: str, auth_key: str, count: int) -> dict[str, Any]:
    """Fetch messages via ``/message/HttpSyncMsg`` and return the unseen ones.

    Each returned item is the message summary plus its raw payload under
    ``raw``.  Duplicates within the batch and messages already recorded for
    *auth_key* are dropped.
    """
    raw = api_request(
        base_url=base_url,
        method="POST",
        path="/message/HttpSyncMsg",
        key=auth_key,
        body={"Count": count},
    )
    data = raw.get("Data")
    items: list[dict[str, Any]] = []
    batch_seen: set[str] = set()
    if isinstance(data, list):
        for item in data:
            summary = summarize_message(item)
            dedupe_key = build_message_dedupe_key(summary, item)
            if dedupe_key in batch_seen:
                continue
            batch_seen.add(dedupe_key)
            if seen_message_before(auth_key, dedupe_key):
                continue
            summary["raw"] = item
            items.append(summary)
    return {
        "raw": raw,
        "items": items,
        "received": len(items),
    }


def ext_device_confirm_get(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
    """POST to ``/login/ExtDeviceLoginConfirmGet``; ``Url`` is sent if non-empty."""
    payload: dict[str, Any] = {}
    if url:
        payload["Url"] = url
    return api_request(
        base_url=base_url,
        method="POST",
        path="/login/ExtDeviceLoginConfirmGet",
        key=auth_key,
        body=payload,
    )


def ext_device_confirm_ok(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
    """POST to ``/login/ExtDeviceLoginConfirmOk``; ``Url`` is sent if non-empty."""
    payload: dict[str, Any] = {}
    if url:
        payload["Url"] = url
    return api_request(
        base_url=base_url,
        method="POST",
        path="/login/ExtDeviceLoginConfirmOk",
        key=auth_key,
        body=payload,
    )


def try_logout(base_url: str, auth_key: str) -> dict[str, Any]:
    """Try several logout endpoint/method combinations until one is accepted.

    A response counts as accepted when its ``Code`` is one of ``None``, 200,
    300, -2 or -3 (a non-dict response counts as ``None``).  Never raises:
    each failure is recorded in ``attempts`` and the last error is reported.
    """
    attempts: list[dict[str, Any]] = []
    candidates: list[tuple[str, str, dict[str, Any] | None]] = [
        ("POST", "/login/LogOut", {}),
        ("GET", "/login/LogOut", None),
        ("POST", "/login/Logout", {}),
        ("GET", "/login/Logout", None),
    ]
    for method, path, body in candidates:
        try:
            resp = api_request(
                base_url=base_url, method=method, path=path, key=auth_key, body=body
            )
            attempts.append({"method": method, "path": path, "ok": True, "raw": resp})
            code = resp.get("Code") if isinstance(resp, dict) else None
            if code in (None, 200, 300, -2, -3):
                return {
                    "attempted": True,
                    "ok": True,
                    "method": method,
                    "path": path,
                    "raw": resp,
                    "attempts": attempts,
                }
        except Exception as exc:
            # Best effort: remember the failure and move on to the next variant.
            attempts.append({"method": method, "path": path, "ok": False, "error": str(exc)})

    last_error = next(
        (item["error"] for item in reversed(attempts) if not item.get("ok")),
        "logout not supported",
    )
    return {"attempted": True, "ok": False, "error": last_error, "attempts": attempts}


def _require_auth_key(data: dict[str, Any]) -> str:
    """Return the stripped ``authKey`` from *data* or raise ``_BadRequest``."""
    auth_key = str(data.get("authKey") or "").strip()
    if not auth_key:
        raise _BadRequest("authKey 不能为空")
    return auth_key


def _int_field(data: dict[str, Any], name: str, default: int) -> int:
    """Return ``data[name]`` as an int (*default* when falsy) or raise ``_BadRequest``."""
    value = data.get(name) or default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError) as exc:
        raise _BadRequest(f"{name} 必须是整数") from exc


class Handler(BaseHTTPRequestHandler):
    """HTTP handler serving the web UI page and its JSON API."""

    server_version = "WCPPWebUI/1.0"

    def log_message(self, fmt: str, *args: Any) -> None:
        """Print access-log lines to stdout with a ``[webui]`` prefix."""
        print(f"[webui] {self.address_string()} - {fmt % args}")

    def do_OPTIONS(self) -> None:
        """Answer CORS preflight requests with 204 and permissive headers."""
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def do_GET(self) -> None:
        """Serve the index page, health check, config, session and login status."""
        parsed = urllib.parse.urlparse(self.path)

        if parsed.path in {"/", "/index.html"}:
            if not INDEX_FILE.exists():
                json_response(self, 500, {"error": "index.html 不存在"})
                return
            text_response(self, 200, "text/html; charset=utf-8", INDEX_FILE.read_bytes())
            return

        if parsed.path == "/health":
            json_response(self, 200, {"ok": True})
            return

        if parsed.path == "/api/config":
            json_response(
                self,
                200,
                {
                    "baseUrl": DEFAULT_BASE_URL,
                    "hasDefaultAdminKey": bool(DEFAULT_ADMIN_KEY),
                    "hasSavedSession": bool(get_session_state().get("authKey")),
                    "qrEndpoints": sorted(SUPPORTED_QR_ENDPOINTS),
                },
            )
            return

        if parsed.path == "/api/session":
            json_response(self, 200, {"session": get_session_state()})
            return

        if parsed.path == "/api/login/status":
            query = urllib.parse.parse_qs(parsed.query)
            auth_key = (query.get("authKey") or [""])[0].strip()
            base_url = (
                (query.get("baseUrl") or [DEFAULT_BASE_URL])[0].strip() or DEFAULT_BASE_URL
            )
            if not auth_key:
                json_response(self, 400, {"error": "authKey 不能为空"})
                return
            try:
                result = get_status_bundle(base_url=base_url, auth_key=auth_key)
                update_session_state({"baseUrl": base_url, "authKey": auth_key})
            except Exception as exc:  # pragma: no cover
                json_response(self, 500, {"error": str(exc)})
                return
            json_response(self, 200, result)
            return

        json_response(self, 404, {"error": "Not Found"})

    # ---- POST routes -----------------------------------------------------
    # Each helper receives the parsed JSON body and the resolved base URL,
    # returns the 200 payload, and raises _BadRequest for invalid input.

    def _post_login_start(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Create an auth key, fetch its login QR code and store the session."""
        admin_key = str(data.get("adminKey") or DEFAULT_ADMIN_KEY).strip()
        if not admin_key:
            raise _BadRequest("adminKey 不能为空(也未找到默认 ADMIN_KEY)")
        days = _int_field(data, "days", 30)
        remark = str(data.get("remark") or "webui-login").strip()
        qr_endpoint = (
            str(data.get("qrEndpoint") or DEFAULT_QR_ENDPOINT).strip() or DEFAULT_QR_ENDPOINT
        )
        # Reject unknown endpoints before an auth key is generated upstream.
        if qr_endpoint not in SUPPORTED_QR_ENDPOINTS:
            raise _BadRequest(f"不支持的二维码接口: {qr_endpoint}")

        auth_key = create_auth_key(
            base_url=base_url, admin_key=admin_key, days=days, remark=remark
        )
        qr_data = get_login_qr(base_url=base_url, auth_key=auth_key, qr_endpoint=qr_endpoint)
        clear_message_cache(auth_key)
        update_session_state(
            {
                "baseUrl": base_url,
                "authKey": auth_key,
                "qrData": qr_data,
                "ticket": "",
                "qrEndpoint": qr_endpoint,
            }
        )
        return {"authKey": auth_key, "qrEndpoint": qr_endpoint, "qrData": qr_data}

    def _post_login_wakeup(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Call ``/login/WakeUpLogin`` for the given auth key."""
        auth_key = _require_auth_key(data)
        resp = api_request(
            base_url=base_url,
            method="POST",
            path="/login/WakeUpLogin",
            key=auth_key,
            body={"Check": False, "Proxy": ""},
        )
        update_session_state({"baseUrl": base_url, "authKey": auth_key})
        return {"raw": resp}

    def _post_login_verify(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Submit a verification code and remember the ticket in the session."""
        auth_key = _require_auth_key(data)
        code = str(data.get("code") or "").strip()
        if not code:
            raise _BadRequest("code 不能为空")
        ticket = str(data.get("ticket") or "").strip()
        data62 = str(data.get("data62") or "").strip()
        uuid = str(data.get("uuid") or "").strip()
        resp = submit_verify(
            base_url=base_url,
            auth_key=auth_key,
            code=code,
            ticket=ticket,
            data62=data62,
            uuid=uuid,
        )
        update_session_state({"baseUrl": base_url, "authKey": auth_key, "ticket": ticket})
        return {"raw": resp}

    def _post_ext_confirm_get(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Forward to ``ext_device_confirm_get`` and refresh the session."""
        auth_key = _require_auth_key(data)
        url = str(data.get("url") or "").strip()
        resp = ext_device_confirm_get(base_url=base_url, auth_key=auth_key, url=url)
        update_session_state({"baseUrl": base_url, "authKey": auth_key})
        return {"raw": resp}

    def _post_ext_confirm_ok(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Forward to ``ext_device_confirm_ok`` and refresh the session."""
        auth_key = _require_auth_key(data)
        url = str(data.get("url") or "").strip()
        resp = ext_device_confirm_ok(base_url=base_url, auth_key=auth_key, url=url)
        update_session_state({"baseUrl": base_url, "authKey": auth_key})
        return {"raw": resp}

    def _post_login_clear(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Attempt a logout, then reset the session and the message cache.

        The auth key and base URL fall back to the saved session when the
        request body omits them, so *base_url* is not used here.
        """
        session = get_session_state()
        auth_key = str(data.get("authKey") or session.get("authKey") or "").strip()
        current_base_url = (
            str(data.get("baseUrl") or session.get("baseUrl") or DEFAULT_BASE_URL).strip()
            or DEFAULT_BASE_URL
        )
        logout_result: dict[str, Any] = {"attempted": False, "ok": False}
        if auth_key:
            logout_result = try_logout(base_url=current_base_url, auth_key=auth_key)
        next_state = clear_session_state()
        clear_message_cache(auth_key or None)
        return {"session": next_state, "logout": logout_result}

    def _post_session(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Merge the request body into the saved session."""
        return {"session": update_session_state(data)}

    def _post_session_clear(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Reset the saved session and the message cache of its auth key."""
        old_auth_key = str(get_session_state().get("authKey") or "").strip()
        next_state = clear_session_state()
        clear_message_cache(old_auth_key or None)
        return {"session": next_state}

    def _post_message_poll(self, data: dict[str, Any], base_url: str) -> dict[str, Any]:
        """Poll for new messages and refresh the session."""
        auth_key = _require_auth_key(data)
        count = _int_field(data, "count", 30)
        result = poll_messages(base_url=base_url, auth_key=auth_key, count=count)
        update_session_state({"baseUrl": base_url, "authKey": auth_key})
        return result

    # Maps request path -> plain function, called as route(self, data, base_url).
    _POST_ROUTES: dict[str, Any] = {
        "/api/login/start": _post_login_start,
        "/api/login/wakeup": _post_login_wakeup,
        "/api/login/verify": _post_login_verify,
        "/api/login/ext-confirm/get": _post_ext_confirm_get,
        "/api/login/ext-confirm/ok": _post_ext_confirm_ok,
        "/api/login/clear": _post_login_clear,
        "/api/session": _post_session,
        "/api/session/clear": _post_session_clear,
        "/api/message/poll": _post_message_poll,
    }

    def do_POST(self) -> None:
        """Dispatch JSON POST requests to the matching route helper.

        Responds 400 for an invalid body or invalid fields, 404 for an
        unknown path, 500 with the error text when a route raises, and 200
        with the route's payload otherwise.
        """
        parsed = urllib.parse.urlparse(self.path)
        try:
            data = parse_json_body(self)
        except ValueError as exc:
            json_response(self, 400, {"error": str(exc)})
            return

        route = self._POST_ROUTES.get(parsed.path)
        if route is None:
            json_response(self, 404, {"error": "Not Found"})
            return

        base_url = str(data.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL
        try:
            payload = route(self, data, base_url)
        except _BadRequest as exc:
            json_response(self, 400, {"error": str(exc)})
            return
        except Exception as exc:  # pragma: no cover
            json_response(self, 500, {"error": str(exc)})
            return
        json_response(self, 200, payload)


def main() -> None:
    """Run the threaded HTTP server until interrupted."""
    server = ThreadingHTTPServer((HOST, PORT), Handler)
    print(f"[webui] listening on http://{HOST}:{PORT}")
    print(f"[webui] default api base: {DEFAULT_BASE_URL}")
    print(f"[webui] default admin key loaded: {bool(DEFAULT_ADMIN_KEY)}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()


if __name__ == "__main__":
    main()