Files
xb/wechat-webui/server.py
2026-02-27 15:24:20 +08:00

744 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""WeChatPadPro login/message web UI server."""
from __future__ import annotations
import json
import os
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import deque
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, Iterable
ROOT_DIR = Path(__file__).resolve().parent
INDEX_FILE = ROOT_DIR / "index.html"
SESSION_FILE = ROOT_DIR / ".session.json"
DEFAULT_BASE_URL = os.environ.get("WCPP_API_BASE", "http://127.0.0.1:18238")
HOST = os.environ.get("WEBUI_HOST", "0.0.0.0")
PORT = int(os.environ.get("WEBUI_PORT", "18239"))
SESSION_LOCK = threading.Lock()
MESSAGE_CACHE_LOCK = threading.Lock()
SUPPORTED_QR_ENDPOINTS = {
"GetLoginQrCodeNewX",
"GetLoginQrCodePadX",
"GetLoginQrCodeWin",
"GetLoginQrCodeMac",
"GetLoginQrCodeAndroidPad",
"GetLoginQrCodeCar",
"GetLoginQrCodeA16",
"GetLoginQrCodeNew",
}
MESSAGE_CACHE_MAX_KEYS = 6000
MESSAGE_CACHE_RECENT_KEYS: dict[str, deque[str]] = {}
MESSAGE_CACHE_RECENT_SET: dict[str, set[str]] = {}
def load_admin_key() -> str:
env_key = os.environ.get("WCPP_ADMIN_KEY", "").strip()
if env_key:
return env_key
candidates = [
ROOT_DIR.parent / "deploy" / ".env",
ROOT_DIR.parent / ".env",
]
for env_file in candidates:
if not env_file.exists():
continue
text = env_file.read_text(encoding="utf-8", errors="ignore")
for raw_line in text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
if key.strip() == "ADMIN_KEY" and value.strip():
return value.strip()
return ""
DEFAULT_ADMIN_KEY = load_admin_key()
DEFAULT_SESSION_STATE: dict[str, Any] = {
"baseUrl": DEFAULT_BASE_URL,
"authKey": "",
"ticket": "",
"qrData": {},
"qrEndpoint": "GetLoginQrCodeNewX",
"updatedAt": 0,
}
def _normalize_session(data: Any) -> dict[str, Any]:
state = dict(DEFAULT_SESSION_STATE)
if isinstance(data, dict):
base_url = data.get("baseUrl")
auth_key = data.get("authKey")
ticket = data.get("ticket")
qr_data = data.get("qrData")
qr_endpoint = data.get("qrEndpoint")
updated_at = data.get("updatedAt")
if isinstance(base_url, str) and base_url.strip():
state["baseUrl"] = base_url.strip()
if isinstance(auth_key, str):
state["authKey"] = auth_key.strip()
if isinstance(ticket, str):
state["ticket"] = ticket.strip()
if isinstance(qr_data, dict):
state["qrData"] = qr_data
if isinstance(qr_endpoint, str) and qr_endpoint.strip() in SUPPORTED_QR_ENDPOINTS:
state["qrEndpoint"] = qr_endpoint.strip()
if isinstance(updated_at, (int, float)):
state["updatedAt"] = int(updated_at)
return state
def _write_session_file(state: dict[str, Any]) -> None:
tmp = SESSION_FILE.with_suffix(".json.tmp")
tmp.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
tmp.replace(SESSION_FILE)
def load_session_state() -> dict[str, Any]:
if not SESSION_FILE.exists():
return dict(DEFAULT_SESSION_STATE)
try:
raw = json.loads(SESSION_FILE.read_text(encoding="utf-8"))
except Exception:
return dict(DEFAULT_SESSION_STATE)
return _normalize_session(raw)
SESSION_STATE: dict[str, Any] = load_session_state()
def get_session_state() -> dict[str, Any]:
with SESSION_LOCK:
return dict(SESSION_STATE)
def update_session_state(patch: dict[str, Any]) -> dict[str, Any]:
with SESSION_LOCK:
merged = dict(SESSION_STATE)
for key in ("baseUrl", "authKey", "ticket", "qrData", "qrEndpoint"):
if key in patch:
merged[key] = patch[key]
merged = _normalize_session(merged)
merged["updatedAt"] = int(time.time())
SESSION_STATE.clear()
SESSION_STATE.update(merged)
_write_session_file(SESSION_STATE)
return dict(SESSION_STATE)
def clear_session_state() -> dict[str, Any]:
with SESSION_LOCK:
SESSION_STATE.clear()
SESSION_STATE.update(dict(DEFAULT_SESSION_STATE))
SESSION_STATE["updatedAt"] = int(time.time())
_write_session_file(SESSION_STATE)
return dict(SESSION_STATE)
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
handler.send_response(status)
handler.send_header("Content-Type", "application/json; charset=utf-8")
handler.send_header("Content-Length", str(len(raw)))
handler.send_header("Cache-Control", "no-store")
handler.send_header("Access-Control-Allow-Origin", "*")
handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
handler.send_header("Access-Control-Allow-Headers", "Content-Type")
handler.end_headers()
handler.wfile.write(raw)
def text_response(handler: BaseHTTPRequestHandler, status: int, content_type: str, payload: bytes) -> None:
handler.send_response(status)
handler.send_header("Content-Type", content_type)
handler.send_header("Content-Length", str(len(payload)))
handler.send_header("Cache-Control", "no-store")
handler.send_header("Access-Control-Allow-Origin", "*")
handler.end_headers()
handler.wfile.write(payload)
def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
length = int(handler.headers.get("Content-Length", "0") or "0")
if length == 0:
return {}
raw = handler.rfile.read(length)
try:
data = json.loads(raw.decode("utf-8"))
except json.JSONDecodeError as exc:
raise ValueError(f"请求体 JSON 无效: {exc}") from exc
if not isinstance(data, dict):
raise ValueError("请求体必须是 JSON 对象")
return data
def api_request(
base_url: str,
method: str,
path: str,
key: str | None = None,
body: dict[str, Any] | None = None,
timeout: int = 25,
) -> dict[str, Any]:
base = base_url.rstrip("/")
url = f"{base}{path}"
if key:
joiner = "&" if "?" in url else "?"
url = f"{url}{joiner}key={urllib.parse.quote(key)}"
payload = None
headers: dict[str, str] = {}
if body is not None:
payload = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=payload, method=method.upper(), headers=headers)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"请求失败: {exc}") from exc
def flatten_wrapped_value(value: Any) -> Any:
if isinstance(value, dict):
for name in ("str", "string", "String", "value", "Value"):
if name in value and value[name] not in (None, ""):
return value[name]
return value
def find_first_value(data: Any, keys: Iterable[str]) -> Any:
targets = {k.lower() for k in keys}
queue: list[Any] = [data]
while queue:
current = queue.pop(0)
if isinstance(current, dict):
for k, v in current.items():
if k.lower() in targets:
found = flatten_wrapped_value(v)
if found not in (None, "", [], {}):
return found
queue.append(v)
elif isinstance(current, list):
queue.extend(current)
return None
def to_display_value(value: Any) -> Any:
value = flatten_wrapped_value(value)
if isinstance(value, (dict, list)):
try:
return json.dumps(value, ensure_ascii=False)
except Exception:
return str(value)
return value
def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]:
add_msgs = item.get("AddMsgs")
if isinstance(add_msgs, list):
for msg in add_msgs:
if isinstance(msg, dict):
return msg
return item
def summarize_message(item: Any) -> dict[str, Any]:
if not isinstance(item, dict):
return {"content": str(item)}
primary = pick_primary_message(item)
outer_type = find_first_value(item, ["Type"])
inner_type = find_first_value(primary, ["msg_type", "MsgType", "ContentType", "Type"])
msg_id = find_first_value(primary, ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"])
from_user = find_first_value(
primary,
["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"],
)
to_user = find_first_value(primary, ["to_user_name", "ToUserName", "ToWxid", "Receiver"])
create_time = find_first_value(primary, ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"])
content = find_first_value(primary, ["text_content", "TextContent", "content", "Content", "Message", "Text"])
if content in (None, "", {}, []):
# 部分系统消息正文在 msg_source 里
content = find_first_value(primary, ["msg_source", "MsgSource"])
return {
"id": to_display_value(msg_id),
"type": to_display_value(inner_type if inner_type not in (None, "") else outer_type),
"from": to_display_value(from_user),
"to": to_display_value(to_user),
"time": to_display_value(create_time),
"content": to_display_value(content),
}
def build_message_dedupe_key(summary: dict[str, Any], raw_item: Any) -> str:
msg_id = summary.get("id")
msg_type = summary.get("type")
if msg_id not in (None, "", "-"):
return f"id:{msg_id}|type:{msg_type}"
if isinstance(raw_item, dict):
fallback = find_first_value(raw_item, ["new_msg_id", "msg_id", "NewMsgId", "MsgId"])
if fallback not in (None, "", "-"):
return f"id:{fallback}|type:{msg_type}"
return "|".join(
[
f"type:{msg_type}",
f"from:{summary.get('from')}",
f"to:{summary.get('to')}",
f"time:{summary.get('time')}",
f"content:{str(summary.get('content') or '')[:120]}",
]
)
def seen_message_before(auth_key: str, dedupe_key: str) -> bool:
with MESSAGE_CACHE_LOCK:
msg_set = MESSAGE_CACHE_RECENT_SET.setdefault(auth_key, set())
if dedupe_key in msg_set:
return True
msg_queue = MESSAGE_CACHE_RECENT_KEYS.setdefault(auth_key, deque())
msg_queue.append(dedupe_key)
msg_set.add(dedupe_key)
while len(msg_queue) > MESSAGE_CACHE_MAX_KEYS:
old = msg_queue.popleft()
msg_set.discard(old)
return False
def clear_message_cache(auth_key: str | None = None) -> None:
with MESSAGE_CACHE_LOCK:
if auth_key:
MESSAGE_CACHE_RECENT_KEYS.pop(auth_key, None)
MESSAGE_CACHE_RECENT_SET.pop(auth_key, None)
return
MESSAGE_CACHE_RECENT_KEYS.clear()
MESSAGE_CACHE_RECENT_SET.clear()
def create_auth_key(base_url: str, admin_key: str, days: int, remark: str) -> str:
resp = api_request(
base_url=base_url,
method="POST",
path="/admin/GenAuthKey1",
key=admin_key,
body={"Count": 1, "Days": days, "Remark": remark},
)
if resp.get("Code") != 200:
raise RuntimeError(f"生成授权码失败: {json.dumps(resp, ensure_ascii=False)}")
data = resp.get("Data")
if isinstance(data, dict):
keys = data.get("authKeys") or data.get("AuthKeys")
if isinstance(keys, list) and keys:
return str(keys[0])
raise RuntimeError("授权码响应结构异常")
def get_login_qr(base_url: str, auth_key: str, qr_endpoint: str = "GetLoginQrCodeNewX") -> dict[str, Any]:
endpoint = qr_endpoint.strip() if qr_endpoint else "GetLoginQrCodeNewX"
if endpoint not in SUPPORTED_QR_ENDPOINTS:
raise RuntimeError(f"不支持的二维码接口: {endpoint}")
resp = api_request(
base_url=base_url,
method="POST",
path=f"/login/{endpoint}",
key=auth_key,
body={"Check": False, "Proxy": ""},
)
if resp.get("Code") != 200:
raise RuntimeError(f"获取二维码失败: {json.dumps(resp, ensure_ascii=False)}")
data = resp.get("Data")
if not isinstance(data, dict):
raise RuntimeError("二维码响应结构异常")
return data
def get_status_bundle(base_url: str, auth_key: str) -> dict[str, Any]:
check = api_request(base_url=base_url, method="GET", path="/login/CheckLoginStatus", key=auth_key)
online = api_request(base_url=base_url, method="GET", path="/login/GetLoginStatus", key=auth_key)
return {
"check": check,
"online": online,
"isOnline": online.get("Code") == 200,
}
def submit_verify(base_url: str, auth_key: str, code: str, ticket: str, data62: str, uuid: str) -> dict[str, Any]:
payload: dict[str, Any] = {"Code": code}
if ticket:
payload["Ticket"] = ticket
if data62:
payload["Data62"] = data62
if uuid:
payload["Uuid"] = uuid
return api_request(
base_url=base_url,
method="POST",
path="/login/YPayVerificationcode",
key=auth_key,
body=payload,
)
def poll_messages(base_url: str, auth_key: str, count: int) -> dict[str, Any]:
raw = api_request(
base_url=base_url,
method="POST",
path="/message/HttpSyncMsg",
key=auth_key,
body={"Count": count},
)
data = raw.get("Data")
items: list[dict[str, Any]] = []
batch_seen: set[str] = set()
if isinstance(data, list):
for item in data:
summary = summarize_message(item)
dedupe_key = build_message_dedupe_key(summary, item)
if dedupe_key in batch_seen:
continue
batch_seen.add(dedupe_key)
if seen_message_before(auth_key, dedupe_key):
continue
summary["raw"] = item
items.append(summary)
return {
"raw": raw,
"items": items,
"received": len(items),
}
def ext_device_confirm_get(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
payload: dict[str, Any] = {}
if url:
payload["Url"] = url
return api_request(
base_url=base_url,
method="POST",
path="/login/ExtDeviceLoginConfirmGet",
key=auth_key,
body=payload,
)
def ext_device_confirm_ok(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
payload: dict[str, Any] = {}
if url:
payload["Url"] = url
return api_request(
base_url=base_url,
method="POST",
path="/login/ExtDeviceLoginConfirmOk",
key=auth_key,
body=payload,
)
def try_logout(base_url: str, auth_key: str) -> dict[str, Any]:
attempts: list[dict[str, Any]] = []
candidates: list[tuple[str, str, dict[str, Any] | None]] = [
("POST", "/login/LogOut", {}),
("GET", "/login/LogOut", None),
("POST", "/login/Logout", {}),
("GET", "/login/Logout", None),
]
for method, path, body in candidates:
try:
resp = api_request(base_url=base_url, method=method, path=path, key=auth_key, body=body)
attempts.append({"method": method, "path": path, "ok": True, "raw": resp})
code = resp.get("Code") if isinstance(resp, dict) else None
if code in (None, 200, 300, -2, -3):
return {
"attempted": True,
"ok": True,
"method": method,
"path": path,
"raw": resp,
"attempts": attempts,
}
except Exception as exc:
attempts.append({"method": method, "path": path, "ok": False, "error": str(exc)})
last_error = next((item["error"] for item in reversed(attempts) if not item.get("ok")), "logout not supported")
return {"attempted": True, "ok": False, "error": last_error, "attempts": attempts}
class Handler(BaseHTTPRequestHandler):
server_version = "WCPPWebUI/1.0"
def log_message(self, fmt: str, *args: Any) -> None:
print(f"[webui] {self.address_string()} - {fmt % args}")
def do_OPTIONS(self) -> None:
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def do_GET(self) -> None:
parsed = urllib.parse.urlparse(self.path)
if parsed.path in {"/", "/index.html"}:
if not INDEX_FILE.exists():
json_response(self, 500, {"error": "index.html 不存在"})
return
payload = INDEX_FILE.read_bytes()
text_response(self, 200, "text/html; charset=utf-8", payload)
return
if parsed.path == "/health":
json_response(self, 200, {"ok": True})
return
if parsed.path == "/api/config":
json_response(
self,
200,
{
"baseUrl": DEFAULT_BASE_URL,
"hasDefaultAdminKey": bool(DEFAULT_ADMIN_KEY),
"hasSavedSession": bool(get_session_state().get("authKey")),
"qrEndpoints": sorted(SUPPORTED_QR_ENDPOINTS),
},
)
return
if parsed.path == "/api/session":
json_response(self, 200, {"session": get_session_state()})
return
if parsed.path == "/api/login/status":
query = urllib.parse.parse_qs(parsed.query)
auth_key = (query.get("authKey") or [""])[0].strip()
base_url = (query.get("baseUrl") or [DEFAULT_BASE_URL])[0].strip() or DEFAULT_BASE_URL
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
result = get_status_bundle(base_url=base_url, auth_key=auth_key)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, result)
return
json_response(self, 404, {"error": "Not Found"})
def do_POST(self) -> None:
parsed = urllib.parse.urlparse(self.path)
try:
data = parse_json_body(self)
except ValueError as exc:
json_response(self, 400, {"error": str(exc)})
return
base_url = str(data.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL
if parsed.path == "/api/login/start":
admin_key = str(data.get("adminKey") or DEFAULT_ADMIN_KEY).strip()
if not admin_key:
json_response(self, 400, {"error": "adminKey 不能为空(也未找到默认 ADMIN_KEY"})
return
days = int(data.get("days") or 30)
remark = str(data.get("remark") or "webui-login").strip()
qr_endpoint = str(data.get("qrEndpoint") or "GetLoginQrCodeNewX").strip()
try:
auth_key = create_auth_key(base_url=base_url, admin_key=admin_key, days=days, remark=remark)
qr_data = get_login_qr(base_url=base_url, auth_key=auth_key, qr_endpoint=qr_endpoint)
clear_message_cache(auth_key)
update_session_state(
{
"baseUrl": base_url,
"authKey": auth_key,
"qrData": qr_data,
"ticket": "",
"qrEndpoint": qr_endpoint,
}
)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(
self,
200,
{
"authKey": auth_key,
"qrEndpoint": qr_endpoint,
"qrData": qr_data,
},
)
return
if parsed.path == "/api/login/wakeup":
auth_key = str(data.get("authKey") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
resp = api_request(
base_url=base_url,
method="POST",
path="/login/WakeUpLogin",
key=auth_key,
body={"Check": False, "Proxy": ""},
)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/verify":
auth_key = str(data.get("authKey") or "").strip()
code = str(data.get("code") or "").strip()
ticket = str(data.get("ticket") or "").strip()
data62 = str(data.get("data62") or "").strip()
uuid = str(data.get("uuid") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
if not code:
json_response(self, 400, {"error": "code 不能为空"})
return
try:
resp = submit_verify(
base_url=base_url,
auth_key=auth_key,
code=code,
ticket=ticket,
data62=data62,
uuid=uuid,
)
update_session_state({"baseUrl": base_url, "authKey": auth_key, "ticket": ticket})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/ext-confirm/get":
auth_key = str(data.get("authKey") or "").strip()
url = str(data.get("url") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
resp = ext_device_confirm_get(base_url=base_url, auth_key=auth_key, url=url)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/ext-confirm/ok":
auth_key = str(data.get("authKey") or "").strip()
url = str(data.get("url") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
resp = ext_device_confirm_ok(base_url=base_url, auth_key=auth_key, url=url)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/clear":
session = get_session_state()
auth_key = str(data.get("authKey") or session.get("authKey") or "").strip()
current_base_url = str(data.get("baseUrl") or session.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL
logout_result: dict[str, Any] = {"attempted": False, "ok": False}
if auth_key:
logout_result = try_logout(base_url=current_base_url, auth_key=auth_key)
try:
next_state = clear_session_state()
clear_message_cache(auth_key or None)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"session": next_state, "logout": logout_result})
return
if parsed.path == "/api/session":
try:
next_state = update_session_state(data)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"session": next_state})
return
if parsed.path == "/api/session/clear":
old_auth_key = str(get_session_state().get("authKey") or "").strip()
try:
next_state = clear_session_state()
clear_message_cache(old_auth_key or None)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"session": next_state})
return
if parsed.path == "/api/message/poll":
auth_key = str(data.get("authKey") or "").strip()
count = int(data.get("count") or 30)
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
result = poll_messages(base_url=base_url, auth_key=auth_key, count=count)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, result)
return
json_response(self, 404, {"error": "Not Found"})
def main() -> None:
server = ThreadingHTTPServer((HOST, PORT), Handler)
print(f"[webui] listening on http://{HOST}:{PORT}")
print(f"[webui] default api base: {DEFAULT_BASE_URL}")
print(f"[webui] default admin key loaded: {bool(DEFAULT_ADMIN_KEY)}")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
if __name__ == "__main__":
main()