feat: add wechat webui login server and page

This commit is contained in:
237899745
2026-02-27 15:24:20 +08:00
parent 0951732c7a
commit 5d5ddf0679
3 changed files with 1626 additions and 0 deletions

5
.gitignore vendored
View File

@@ -33,3 +33,8 @@ __pycache__/
# OS files
.DS_Store
Thumbs.db
# WeChat webui runtime/session artifacts
/wechat-webui/.session.json
/wechat-webui/webui.log
/wechat-webui/__pycache__/

878
wechat-webui/index.html Normal file
View File

@@ -0,0 +1,878 @@
<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>WeChatPadPro 登录控制台</title>
<style>
:root {
--bg: #f3f6ff;
--card: #ffffff;
--line: #d9e1f6;
--text: #1f2a44;
--muted: #5f6b87;
--primary: #2356d8;
--primary-deep: #1b43ad;
--danger: #c9343f;
--ok: #14b86e;
--mono: "SFMono-Regular", Menlo, Consolas, monospace;
}
* {
box-sizing: border-box;
}
body {
margin: 0;
font-family: "PingFang SC", "Microsoft YaHei", "Noto Sans SC", sans-serif;
color: var(--text);
background:
radial-gradient(1200px 500px at 80% -20%, #d9e7ff 0%, transparent 70%),
radial-gradient(800px 300px at 10% -10%, #eef4ff 0%, transparent 72%),
var(--bg);
min-height: 100vh;
padding: 18px;
}
.shell {
max-width: 1180px;
margin: 0 auto;
display: grid;
gap: 14px;
}
.top {
display: flex;
justify-content: space-between;
gap: 12px;
align-items: center;
background: linear-gradient(120deg, #2448a5 0%, #2f66e3 100%);
border-radius: 14px;
padding: 16px 18px;
color: #fff;
box-shadow: 0 10px 24px rgba(35, 86, 216, 0.2);
}
.title {
margin: 0;
font-size: 20px;
font-weight: 700;
}
.subtitle {
margin: 5px 0 0;
font-size: 13px;
opacity: 0.9;
}
.status-pill {
background: rgba(255, 255, 255, 0.16);
border: 1px solid rgba(255, 255, 255, 0.24);
border-radius: 999px;
padding: 7px 12px;
font-size: 13px;
display: inline-flex;
align-items: center;
gap: 8px;
white-space: nowrap;
}
.dot {
width: 9px;
height: 9px;
border-radius: 50%;
background: #ffd25f;
box-shadow: 0 0 0 0 rgba(255, 210, 95, 0.6);
animation: pulse 1.5s infinite;
}
.dot.online {
background: var(--ok);
box-shadow: 0 0 0 0 rgba(20, 184, 110, 0.55);
}
@keyframes pulse {
0% {
box-shadow: 0 0 0 0 rgba(255, 255, 255, 0.35);
}
70% {
box-shadow: 0 0 0 9px rgba(255, 255, 255, 0);
}
100% {
box-shadow: 0 0 0 0 rgba(255, 255, 255, 0);
}
}
.grid {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 14px;
}
.panel {
background: var(--card);
border: 1px solid var(--line);
border-radius: 14px;
padding: 14px;
box-shadow: 0 7px 20px rgba(30, 68, 163, 0.06);
min-height: 100%;
}
.panel h2 {
margin: 0 0 12px;
font-size: 16px;
font-weight: 700;
}
.field {
display: grid;
gap: 6px;
}
label {
font-size: 13px;
color: var(--muted);
}
select,
textarea {
width: 100%;
border-radius: 10px;
border: 1px solid #cbd7f7;
background: #fbfdff;
padding: 10px 11px;
color: var(--text);
font-size: 14px;
outline: none;
transition: border-color 0.15s ease, box-shadow 0.15s ease;
}
select:focus,
textarea:focus {
border-color: var(--primary);
box-shadow: 0 0 0 3px rgba(35, 86, 216, 0.16);
}
.actions {
display: flex;
gap: 9px;
margin-top: 12px;
flex-wrap: wrap;
}
button {
border: none;
border-radius: 10px;
padding: 9px 14px;
font-size: 14px;
cursor: pointer;
transition: transform 0.06s ease, opacity 0.15s ease;
}
button:active {
transform: translateY(1px);
}
button:disabled {
cursor: not-allowed;
opacity: 0.58;
transform: none;
}
.btn-main {
color: #fff;
background: linear-gradient(120deg, var(--primary), var(--primary-deep));
}
.btn-danger {
color: #fff;
background: linear-gradient(120deg, #d73f4a, #b61e2a);
}
.qr-wrap {
border-radius: 12px;
border: 1px dashed #cad4f0;
min-height: 330px;
display: grid;
align-content: center;
justify-items: center;
gap: 8px;
background: #fcfdff;
padding: 12px;
}
.qr-img {
width: min(330px, 100%);
border: 1px solid #dde5f8;
border-radius: 12px;
background: #fff;
display: none;
}
.empty {
font-size: 13px;
color: var(--muted);
text-align: center;
line-height: 1.45;
}
.message-list {
display: grid;
gap: 9px;
max-height: 360px;
overflow: auto;
padding-right: 2px;
}
.message-card {
border-radius: 11px;
border: 1px solid #dbe4fa;
background: #fbfdff;
padding: 10px;
display: grid;
gap: 6px;
}
.message-meta {
font-family: var(--mono);
font-size: 12px;
color: #44506a;
display: flex;
flex-wrap: wrap;
gap: 8px;
}
.message-content {
font-size: 13px;
line-height: 1.42;
color: #0f172a;
white-space: pre-wrap;
word-break: break-word;
}
textarea {
min-height: 360px;
resize: vertical;
font-family: var(--mono);
font-size: 12px;
background: #f8fbff;
}
@media (max-width: 980px) {
.grid {
grid-template-columns: 1fr;
}
.top {
flex-direction: column;
align-items: flex-start;
}
.status-pill {
align-self: flex-start;
}
}
</style>
</head>
<body>
<main class="shell">
<section class="top">
<div>
<h1 class="title">WeChatPadPro 登录控制台</h1>
<p class="subtitle">只保留扫码登录,状态与消息自动轮询</p>
</div>
<div class="status-pill">
<span id="onlineDot" class="dot"></span>
<span id="onlineText">未连接</span>
</div>
</section>
<section class="grid">
<section class="panel">
<h2>1. 登录配置</h2>
<div class="field">
<label for="qrEndpoint">二维码类型</label>
<select id="qrEndpoint">
<option value="GetLoginQrCodeNewX">NewX默认</option>
<option value="GetLoginQrCodePadX">PadX</option>
<option value="GetLoginQrCodeWin">Win</option>
<option value="GetLoginQrCodeMac">Mac</option>
<option value="GetLoginQrCodeAndroidPad">AndroidPad</option>
<option value="GetLoginQrCodeCar">Car</option>
<option value="GetLoginQrCodeA16">A16</option>
<option value="GetLoginQrCodeNew">New</option>
</select>
</div>
<div class="actions">
<button id="startLoginBtn" class="btn-main">生成授权二维码</button>
<button id="clearLoginBtn" class="btn-danger">清除登录状态</button>
</div>
</section>
<section class="panel">
<h2>2. 扫码窗口</h2>
<div class="qr-wrap">
<img id="qrImg" class="qr-img" alt="二维码" />
<div id="qrHint" class="empty">点击“生成授权二维码”后在这里显示二维码</div>
</div>
</section>
</section>
<section class="panel">
<h2>状态日志</h2>
<textarea id="statusView" readonly></textarea>
</section>
</main>
<script>
const state = {
baseUrl: "http://127.0.0.1:18238",
authKey: "",
qrData: null,
isOnline: false,
statusKnown: false,
statusTimer: null,
messageTimer: null,
lastStatusSig: "",
lastStatusAlertSig: "",
lastStatusReqError: "",
seenMessageKeys: new Set(),
persistTimer: null
};
const STATUS_INTERVAL_MS = 2000;
const MESSAGE_INTERVAL_MS = 2000;
const MESSAGE_COUNT = 30;
const QR_HINT_DEFAULT = "点击“生成授权二维码”后在这里显示二维码";
const QR_HINT_ONLINE = "当前账号已登录,二维码已隐藏。掉线或清除登录状态后可重新生成。";
const QR_HINT_CHECKING = "检测到已保存会话,正在校验登录状态,请稍候。";
const QR_ENDPOINT_PRESET = [
{ value: "GetLoginQrCodeNewX", label: "NewX默认" },
{ value: "GetLoginQrCodePadX", label: "PadX" },
{ value: "GetLoginQrCodeWin", label: "Win" },
{ value: "GetLoginQrCodeMac", label: "Mac" },
{ value: "GetLoginQrCodeAndroidPad", label: "AndroidPad" },
{ value: "GetLoginQrCodeCar", label: "Car" },
{ value: "GetLoginQrCodeA16", label: "A16" },
{ value: "GetLoginQrCodeNew", label: "New" }
];
const el = (id) => document.getElementById(id);
function log(line) {
const area = el("statusView");
const stamp = new Date().toLocaleTimeString();
area.value = `[${stamp}] ${line}\n` + area.value;
}
async function api(path, method = "GET", payload = null) {
const options = { method, headers: {} };
if (payload) {
options.headers["Content-Type"] = "application/json";
options.body = JSON.stringify(payload);
}
const resp = await fetch(path, options);
let data;
try {
data = await resp.json();
} catch (err) {
throw new Error(`响应不是 JSON: ${resp.status}`);
}
if (!resp.ok) {
throw new Error(data.error || `请求失败: ${resp.status}`);
}
return data;
}
function setOnline(isOnline) {
state.isOnline = Boolean(isOnline);
const dot = el("onlineDot");
const text = el("onlineText");
if (state.isOnline) {
dot.classList.add("online");
text.textContent = "已在线";
} else {
dot.classList.remove("online");
text.textContent = "未在线";
}
refreshLoginUiByState();
}
function hideQr(hintText = QR_HINT_DEFAULT) {
const img = el("qrImg");
const hint = el("qrHint");
img.src = "";
img.style.display = "none";
hint.textContent = hintText;
}
function refreshLoginUiByState() {
const btn = el("startLoginBtn");
if (!btn) {
return;
}
if (state.authKey && !state.statusKnown) {
btn.disabled = true;
btn.textContent = "状态检测中...";
hideQr(QR_HINT_CHECKING);
return;
}
if (state.isOnline) {
btn.disabled = true;
btn.textContent = "当前已登录";
hideQr(QR_HINT_ONLINE);
return;
}
btn.disabled = false;
btn.textContent = "生成授权二维码";
if (state.qrData?.qrCodeBase64) {
renderQr(state.qrData);
} else {
hideQr(QR_HINT_DEFAULT);
}
}
function renderQr(qrData) {
const img = el("qrImg");
const hint = el("qrHint");
const base64 = qrData?.qrCodeBase64 || "";
if (!base64) {
hideQr(QR_HINT_DEFAULT);
return;
}
img.src = base64.startsWith("data:") ? base64 : `data:image/jpeg;base64,${base64}`;
img.style.display = "block";
hint.textContent = "请用微信扫码并在手机确认登录";
}
function clearMessageList() {
const list = el("messageList");
if (list) {
list.innerHTML = `<div class="empty">暂无消息</div>`;
}
state.seenMessageKeys.clear();
}
function appendMessages(items) {
if (!Array.isArray(items) || items.length === 0) {
return;
}
const list = el("messageList");
if (!list) {
return;
}
const firstEmpty = list.querySelector(".empty");
if (firstEmpty) {
firstEmpty.remove();
}
items.forEach((item) => {
const dedupeKey =
item.id && item.id !== "-"
? `id:${item.id}|type:${item.type ?? "-"}`
: [
item.id ?? "-",
item.type ?? "-",
item.time ?? "-",
item.from ?? "-",
item.to ?? "-",
String(item.content ?? "").slice(0, 120)
].join("|");
if (state.seenMessageKeys.has(dedupeKey)) {
return;
}
state.seenMessageKeys.add(dedupeKey);
if (state.seenMessageKeys.size > 3500) {
const first = state.seenMessageKeys.values().next().value;
state.seenMessageKeys.delete(first);
}
const card = document.createElement("article");
card.className = "message-card";
const meta = document.createElement("div");
meta.className = "message-meta";
meta.innerHTML =
`<span>id: ${item.id ?? "-"}</span>` +
`<span>type: ${item.type ?? "-"}</span>` +
`<span>from: ${item.from ?? "-"}</span>` +
`<span>to: ${item.to ?? "-"}</span>`;
const content = document.createElement("div");
content.className = "message-content";
content.textContent = item.content ?? "-";
card.appendChild(meta);
card.appendChild(content);
list.prepend(card);
});
}
function queuePersistSession() {
if (state.persistTimer) {
clearTimeout(state.persistTimer);
}
state.persistTimer = setTimeout(() => {
persistSession(true);
}, 250);
}
async function persistSession(silent = true) {
try {
await api("/api/session", "POST", {
baseUrl: state.baseUrl,
authKey: state.authKey,
qrData: state.qrData || {},
qrEndpoint: el("qrEndpoint").value
});
} catch (err) {
if (!silent) {
log(`保存会话失败: ${err.message}`);
}
}
}
function ensureStatusPolling() {
if (state.statusTimer) {
return;
}
state.statusTimer = setInterval(() => {
checkStatus(false);
}, STATUS_INTERVAL_MS);
log(`已启动自动状态轮询(${STATUS_INTERVAL_MS / 1000}秒)。`);
}
function ensureMessagePolling() {
if (state.messageTimer) {
return;
}
state.messageTimer = setInterval(() => {
pollMessages(false);
}, MESSAGE_INTERVAL_MS);
log(`已启动自动消息轮询(${MESSAGE_INTERVAL_MS / 1000}秒)。`);
}
function stopMessagePolling(silent = false) {
if (!state.messageTimer) {
return;
}
clearInterval(state.messageTimer);
state.messageTimer = null;
if (!silent) {
log("已停止自动消息轮询。");
}
}
function buildStatusSig(data) {
const checkCode = data?.check?.Code;
const checkState = data?.check?.Data?.state ?? "-";
const onlineCode = data?.online?.Code;
const onlineText = data?.online?.Text || "-";
return `${checkCode}|${checkState}|${onlineCode}|${onlineText}`;
}
function logStatusIfChanged(data) {
const sig = buildStatusSig(data);
if (sig === state.lastStatusSig) {
return;
}
state.lastStatusSig = sig;
const checkCode = data?.check?.Code;
const checkText = data?.check?.Text || "-";
const checkState = data?.check?.Data?.state;
const onlineCode = data?.online?.Code;
const onlineText = data?.online?.Text || "-";
log(
`状态: CheckLoginStatus=${checkCode}, state=${checkState ?? "-"}(${checkText}) | ` +
`GetLoginStatus=${onlineCode}(${onlineText})`
);
}
function reportStatusAlerts(data) {
if (state.isOnline) {
state.lastStatusAlertSig = "";
return;
}
const alerts = [];
const checkCode = Number(data?.check?.Code);
const checkState = data?.check?.Data?.state;
const checkText = String(data?.check?.Text || "").trim();
const onlineCode = Number(data?.online?.Code);
const onlineText = String(data?.online?.Text || "").trim();
if (onlineCode !== 200 && onlineText) {
alerts.push(`账号未在线: ${onlineCode}(${onlineText})`);
}
if (onlineText.includes("版本过低")) {
alerts.push("检测到版本过低请切换二维码类型Win/Mac/AndroidPad或升级后端组件。");
}
if (onlineText.includes("重新登录") || onlineText.includes("MMLoginStateNoLogin")) {
alerts.push("账号状态需要重新登录,请点击“生成授权二维码”重新扫码。");
}
if (checkCode === -3) {
alerts.push("当前流程需要验证码,若持续出现请改用支持验证码的登录流程。");
}
if (checkCode === 300 || checkCode === -2) {
alerts.push("授权或二维码状态异常(可能已失效),请重新生成授权二维码。");
}
if (checkCode < 0 && checkCode !== -2 && checkCode !== -3) {
alerts.push(`扫码状态异常: ${checkCode}(${checkText || "-"})`);
}
if (checkState === 4 && onlineCode !== 200) {
alerts.push("登录卡在新设备验证阶段,请在手机确认或换二维码类型重试。");
}
const sig = alerts.join("||");
if (!sig) {
state.lastStatusAlertSig = "";
return;
}
if (sig === state.lastStatusAlertSig) {
return;
}
state.lastStatusAlertSig = sig;
alerts.forEach((line) => log(`异常: ${line}`));
}
function rebuildQrEndpointOptions(serverEndpoints) {
const select = el("qrEndpoint");
const current = select.value;
const allowed =
Array.isArray(serverEndpoints) && serverEndpoints.length > 0
? new Set(serverEndpoints)
: null;
const options = [];
QR_ENDPOINT_PRESET.forEach((item) => {
if (!allowed || allowed.has(item.value)) {
options.push(item);
}
});
if (allowed) {
Array.from(allowed)
.sort()
.forEach((name) => {
if (!options.some((x) => x.value === name)) {
options.push({ value: name, label: name });
}
});
}
if (options.length === 0) {
options.push(...QR_ENDPOINT_PRESET);
}
select.innerHTML = "";
options.forEach((item) => {
const opt = document.createElement("option");
opt.value = item.value;
opt.textContent = item.label;
select.appendChild(opt);
});
if (current && options.some((x) => x.value === current)) {
select.value = current;
return;
}
if (options.some((x) => x.value === "GetLoginQrCodeNewX")) {
select.value = "GetLoginQrCodeNewX";
return;
}
select.value = options[0].value;
}
async function loadConfig() {
try {
const cfg = await api("/api/config");
if (cfg.baseUrl) {
state.baseUrl = cfg.baseUrl;
}
rebuildQrEndpointOptions(cfg.qrEndpoints);
if (cfg.hasDefaultAdminKey) {
log("已检测到后端默认 ADMIN_KEY可直接生成授权二维码。");
} else {
log("后端没有默认 ADMIN_KEY生成授权二维码会失败。请先在服务端 deploy/.env 配置。");
}
if (cfg.hasSavedSession) {
log("检测到已保存会话,正在恢复。");
}
} catch (err) {
log(`读取配置失败: ${err.message}`);
}
}
async function restoreSession() {
try {
const data = await api("/api/session");
const session = data.session || {};
if (session.baseUrl) {
state.baseUrl = session.baseUrl;
}
if (session.authKey) {
state.authKey = session.authKey;
state.statusKnown = false;
log("已恢复上次会话authKey/baseUrl。");
}
if (session.qrEndpoint) {
el("qrEndpoint").value = session.qrEndpoint;
}
if (session.qrData && typeof session.qrData === "object") {
state.qrData = session.qrData;
}
refreshLoginUiByState();
} catch (err) {
log(`恢复会话失败: ${err.message}`);
}
}
async function startLogin() {
if (state.isOnline) {
log("当前账号已在线,已禁止重复生成授权二维码。");
return;
}
try {
const payload = {
baseUrl: state.baseUrl,
qrEndpoint: el("qrEndpoint").value
};
const data = await api("/api/login/start", "POST", payload);
state.authKey = data.authKey || "";
state.qrData = data.qrData || null;
state.statusKnown = true;
if (data.qrEndpoint) {
el("qrEndpoint").value = data.qrEndpoint;
}
renderQr(state.qrData || {});
clearMessageList();
await persistSession(true);
log(`已生成 authKey: ${state.authKey}`);
ensureStatusPolling();
await checkStatus(true);
} catch (err) {
log(`生成授权二维码失败: ${err.message}`);
}
}
async function checkStatus(logError = true) {
if (!state.authKey) {
state.statusKnown = true;
state.lastStatusReqError = "";
setOnline(false);
stopMessagePolling(true);
return;
}
try {
const url =
`/api/login/status?authKey=${encodeURIComponent(state.authKey)}` +
`&baseUrl=${encodeURIComponent(state.baseUrl)}`;
const data = await api(url);
state.statusKnown = true;
state.lastStatusReqError = "";
setOnline(Boolean(data.isOnline));
logStatusIfChanged(data);
reportStatusAlerts(data);
queuePersistSession();
if (state.isOnline) {
if (state.qrData) {
state.qrData = null;
queuePersistSession();
}
ensureMessagePolling();
pollMessages(false);
} else {
stopMessagePolling(true);
}
} catch (err) {
const message = `查询状态失败: ${err.message}`;
if (logError || state.lastStatusReqError !== message) {
log(message);
}
state.lastStatusReqError = message;
state.statusKnown = false;
setOnline(false);
stopMessagePolling(true);
}
}
async function pollMessages(logError = true) {
if (!state.authKey || !state.isOnline) {
return;
}
try {
const payload = {
baseUrl: state.baseUrl,
authKey: state.authKey,
count: MESSAGE_COUNT
};
const data = await api("/api/message/poll", "POST", payload);
if (data.received > 0) {
appendMessages(data.items);
log(`收到 ${data.received} 条消息。`);
}
} catch (err) {
if (logError) {
log(`自动拉取消息失败: ${err.message}`);
}
}
}
async function clearLoginState() {
try {
const resp = await api("/api/login/clear", "POST", {
baseUrl: state.baseUrl,
authKey: state.authKey
});
const logout = resp.logout || {};
state.authKey = "";
state.qrData = null;
state.statusKnown = true;
state.lastStatusSig = "";
state.lastStatusAlertSig = "";
state.lastStatusReqError = "";
setOnline(false);
stopMessagePolling(true);
clearMessageList();
if (logout.attempted) {
if (logout.ok) {
log(`远端登出成功: ${logout.method} ${logout.path}`);
} else if (logout.error) {
log(`远端登出结果: ${logout.error}`);
}
}
log("已清除登录状态与持久化会话。");
} catch (err) {
log(`清除登录状态失败: ${err.message}`);
}
}
el("startLoginBtn").addEventListener("click", startLogin);
el("clearLoginBtn").addEventListener("click", clearLoginState);
el("qrEndpoint").addEventListener("change", queuePersistSession);
(async () => {
await loadConfig();
await restoreSession();
refreshLoginUiByState();
ensureStatusPolling();
if (state.authKey) {
await checkStatus(true);
} else {
state.statusKnown = true;
refreshLoginUiByState();
}
})();
</script>
</body>
</html>

743
wechat-webui/server.py Normal file
View File

@@ -0,0 +1,743 @@
#!/usr/bin/env python3
"""WeChatPadPro login/message web UI server."""
from __future__ import annotations
import json
import os
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import deque
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, Iterable
ROOT_DIR = Path(__file__).resolve().parent
INDEX_FILE = ROOT_DIR / "index.html"
SESSION_FILE = ROOT_DIR / ".session.json"
DEFAULT_BASE_URL = os.environ.get("WCPP_API_BASE", "http://127.0.0.1:18238")
HOST = os.environ.get("WEBUI_HOST", "0.0.0.0")
PORT = int(os.environ.get("WEBUI_PORT", "18239"))
SESSION_LOCK = threading.Lock()
MESSAGE_CACHE_LOCK = threading.Lock()
SUPPORTED_QR_ENDPOINTS = {
"GetLoginQrCodeNewX",
"GetLoginQrCodePadX",
"GetLoginQrCodeWin",
"GetLoginQrCodeMac",
"GetLoginQrCodeAndroidPad",
"GetLoginQrCodeCar",
"GetLoginQrCodeA16",
"GetLoginQrCodeNew",
}
MESSAGE_CACHE_MAX_KEYS = 6000
MESSAGE_CACHE_RECENT_KEYS: dict[str, deque[str]] = {}
MESSAGE_CACHE_RECENT_SET: dict[str, set[str]] = {}
def load_admin_key() -> str:
env_key = os.environ.get("WCPP_ADMIN_KEY", "").strip()
if env_key:
return env_key
candidates = [
ROOT_DIR.parent / "deploy" / ".env",
ROOT_DIR.parent / ".env",
]
for env_file in candidates:
if not env_file.exists():
continue
text = env_file.read_text(encoding="utf-8", errors="ignore")
for raw_line in text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
if key.strip() == "ADMIN_KEY" and value.strip():
return value.strip()
return ""
DEFAULT_ADMIN_KEY = load_admin_key()
DEFAULT_SESSION_STATE: dict[str, Any] = {
"baseUrl": DEFAULT_BASE_URL,
"authKey": "",
"ticket": "",
"qrData": {},
"qrEndpoint": "GetLoginQrCodeNewX",
"updatedAt": 0,
}
def _normalize_session(data: Any) -> dict[str, Any]:
state = dict(DEFAULT_SESSION_STATE)
if isinstance(data, dict):
base_url = data.get("baseUrl")
auth_key = data.get("authKey")
ticket = data.get("ticket")
qr_data = data.get("qrData")
qr_endpoint = data.get("qrEndpoint")
updated_at = data.get("updatedAt")
if isinstance(base_url, str) and base_url.strip():
state["baseUrl"] = base_url.strip()
if isinstance(auth_key, str):
state["authKey"] = auth_key.strip()
if isinstance(ticket, str):
state["ticket"] = ticket.strip()
if isinstance(qr_data, dict):
state["qrData"] = qr_data
if isinstance(qr_endpoint, str) and qr_endpoint.strip() in SUPPORTED_QR_ENDPOINTS:
state["qrEndpoint"] = qr_endpoint.strip()
if isinstance(updated_at, (int, float)):
state["updatedAt"] = int(updated_at)
return state
def _write_session_file(state: dict[str, Any]) -> None:
tmp = SESSION_FILE.with_suffix(".json.tmp")
tmp.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
tmp.replace(SESSION_FILE)
def load_session_state() -> dict[str, Any]:
if not SESSION_FILE.exists():
return dict(DEFAULT_SESSION_STATE)
try:
raw = json.loads(SESSION_FILE.read_text(encoding="utf-8"))
except Exception:
return dict(DEFAULT_SESSION_STATE)
return _normalize_session(raw)
SESSION_STATE: dict[str, Any] = load_session_state()
def get_session_state() -> dict[str, Any]:
with SESSION_LOCK:
return dict(SESSION_STATE)
def update_session_state(patch: dict[str, Any]) -> dict[str, Any]:
with SESSION_LOCK:
merged = dict(SESSION_STATE)
for key in ("baseUrl", "authKey", "ticket", "qrData", "qrEndpoint"):
if key in patch:
merged[key] = patch[key]
merged = _normalize_session(merged)
merged["updatedAt"] = int(time.time())
SESSION_STATE.clear()
SESSION_STATE.update(merged)
_write_session_file(SESSION_STATE)
return dict(SESSION_STATE)
def clear_session_state() -> dict[str, Any]:
with SESSION_LOCK:
SESSION_STATE.clear()
SESSION_STATE.update(dict(DEFAULT_SESSION_STATE))
SESSION_STATE["updatedAt"] = int(time.time())
_write_session_file(SESSION_STATE)
return dict(SESSION_STATE)
def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any]) -> None:
raw = json.dumps(payload, ensure_ascii=False).encode("utf-8")
handler.send_response(status)
handler.send_header("Content-Type", "application/json; charset=utf-8")
handler.send_header("Content-Length", str(len(raw)))
handler.send_header("Cache-Control", "no-store")
handler.send_header("Access-Control-Allow-Origin", "*")
handler.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
handler.send_header("Access-Control-Allow-Headers", "Content-Type")
handler.end_headers()
handler.wfile.write(raw)
def text_response(handler: BaseHTTPRequestHandler, status: int, content_type: str, payload: bytes) -> None:
handler.send_response(status)
handler.send_header("Content-Type", content_type)
handler.send_header("Content-Length", str(len(payload)))
handler.send_header("Cache-Control", "no-store")
handler.send_header("Access-Control-Allow-Origin", "*")
handler.end_headers()
handler.wfile.write(payload)
def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
length = int(handler.headers.get("Content-Length", "0") or "0")
if length == 0:
return {}
raw = handler.rfile.read(length)
try:
data = json.loads(raw.decode("utf-8"))
except json.JSONDecodeError as exc:
raise ValueError(f"请求体 JSON 无效: {exc}") from exc
if not isinstance(data, dict):
raise ValueError("请求体必须是 JSON 对象")
return data
def api_request(
base_url: str,
method: str,
path: str,
key: str | None = None,
body: dict[str, Any] | None = None,
timeout: int = 25,
) -> dict[str, Any]:
base = base_url.rstrip("/")
url = f"{base}{path}"
if key:
joiner = "&" if "?" in url else "?"
url = f"{url}{joiner}key={urllib.parse.quote(key)}"
payload = None
headers: dict[str, str] = {}
if body is not None:
payload = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=payload, method=method.upper(), headers=headers)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"请求失败: {exc}") from exc
def flatten_wrapped_value(value: Any) -> Any:
if isinstance(value, dict):
for name in ("str", "string", "String", "value", "Value"):
if name in value and value[name] not in (None, ""):
return value[name]
return value
def find_first_value(data: Any, keys: Iterable[str]) -> Any:
targets = {k.lower() for k in keys}
queue: list[Any] = [data]
while queue:
current = queue.pop(0)
if isinstance(current, dict):
for k, v in current.items():
if k.lower() in targets:
found = flatten_wrapped_value(v)
if found not in (None, "", [], {}):
return found
queue.append(v)
elif isinstance(current, list):
queue.extend(current)
return None
def to_display_value(value: Any) -> Any:
value = flatten_wrapped_value(value)
if isinstance(value, (dict, list)):
try:
return json.dumps(value, ensure_ascii=False)
except Exception:
return str(value)
return value
def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]:
add_msgs = item.get("AddMsgs")
if isinstance(add_msgs, list):
for msg in add_msgs:
if isinstance(msg, dict):
return msg
return item
def summarize_message(item: Any) -> dict[str, Any]:
if not isinstance(item, dict):
return {"content": str(item)}
primary = pick_primary_message(item)
outer_type = find_first_value(item, ["Type"])
inner_type = find_first_value(primary, ["msg_type", "MsgType", "ContentType", "Type"])
msg_id = find_first_value(primary, ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"])
from_user = find_first_value(
primary,
["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"],
)
to_user = find_first_value(primary, ["to_user_name", "ToUserName", "ToWxid", "Receiver"])
create_time = find_first_value(primary, ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"])
content = find_first_value(primary, ["text_content", "TextContent", "content", "Content", "Message", "Text"])
if content in (None, "", {}, []):
# 部分系统消息正文在 msg_source 里
content = find_first_value(primary, ["msg_source", "MsgSource"])
return {
"id": to_display_value(msg_id),
"type": to_display_value(inner_type if inner_type not in (None, "") else outer_type),
"from": to_display_value(from_user),
"to": to_display_value(to_user),
"time": to_display_value(create_time),
"content": to_display_value(content),
}
def build_message_dedupe_key(summary: dict[str, Any], raw_item: Any) -> str:
msg_id = summary.get("id")
msg_type = summary.get("type")
if msg_id not in (None, "", "-"):
return f"id:{msg_id}|type:{msg_type}"
if isinstance(raw_item, dict):
fallback = find_first_value(raw_item, ["new_msg_id", "msg_id", "NewMsgId", "MsgId"])
if fallback not in (None, "", "-"):
return f"id:{fallback}|type:{msg_type}"
return "|".join(
[
f"type:{msg_type}",
f"from:{summary.get('from')}",
f"to:{summary.get('to')}",
f"time:{summary.get('time')}",
f"content:{str(summary.get('content') or '')[:120]}",
]
)
def seen_message_before(auth_key: str, dedupe_key: str) -> bool:
with MESSAGE_CACHE_LOCK:
msg_set = MESSAGE_CACHE_RECENT_SET.setdefault(auth_key, set())
if dedupe_key in msg_set:
return True
msg_queue = MESSAGE_CACHE_RECENT_KEYS.setdefault(auth_key, deque())
msg_queue.append(dedupe_key)
msg_set.add(dedupe_key)
while len(msg_queue) > MESSAGE_CACHE_MAX_KEYS:
old = msg_queue.popleft()
msg_set.discard(old)
return False
def clear_message_cache(auth_key: str | None = None) -> None:
with MESSAGE_CACHE_LOCK:
if auth_key:
MESSAGE_CACHE_RECENT_KEYS.pop(auth_key, None)
MESSAGE_CACHE_RECENT_SET.pop(auth_key, None)
return
MESSAGE_CACHE_RECENT_KEYS.clear()
MESSAGE_CACHE_RECENT_SET.clear()
def create_auth_key(base_url: str, admin_key: str, days: int, remark: str) -> str:
resp = api_request(
base_url=base_url,
method="POST",
path="/admin/GenAuthKey1",
key=admin_key,
body={"Count": 1, "Days": days, "Remark": remark},
)
if resp.get("Code") != 200:
raise RuntimeError(f"生成授权码失败: {json.dumps(resp, ensure_ascii=False)}")
data = resp.get("Data")
if isinstance(data, dict):
keys = data.get("authKeys") or data.get("AuthKeys")
if isinstance(keys, list) and keys:
return str(keys[0])
raise RuntimeError("授权码响应结构异常")
def get_login_qr(base_url: str, auth_key: str, qr_endpoint: str = "GetLoginQrCodeNewX") -> dict[str, Any]:
endpoint = qr_endpoint.strip() if qr_endpoint else "GetLoginQrCodeNewX"
if endpoint not in SUPPORTED_QR_ENDPOINTS:
raise RuntimeError(f"不支持的二维码接口: {endpoint}")
resp = api_request(
base_url=base_url,
method="POST",
path=f"/login/{endpoint}",
key=auth_key,
body={"Check": False, "Proxy": ""},
)
if resp.get("Code") != 200:
raise RuntimeError(f"获取二维码失败: {json.dumps(resp, ensure_ascii=False)}")
data = resp.get("Data")
if not isinstance(data, dict):
raise RuntimeError("二维码响应结构异常")
return data
def get_status_bundle(base_url: str, auth_key: str) -> dict[str, Any]:
check = api_request(base_url=base_url, method="GET", path="/login/CheckLoginStatus", key=auth_key)
online = api_request(base_url=base_url, method="GET", path="/login/GetLoginStatus", key=auth_key)
return {
"check": check,
"online": online,
"isOnline": online.get("Code") == 200,
}
def submit_verify(base_url: str, auth_key: str, code: str, ticket: str, data62: str, uuid: str) -> dict[str, Any]:
payload: dict[str, Any] = {"Code": code}
if ticket:
payload["Ticket"] = ticket
if data62:
payload["Data62"] = data62
if uuid:
payload["Uuid"] = uuid
return api_request(
base_url=base_url,
method="POST",
path="/login/YPayVerificationcode",
key=auth_key,
body=payload,
)
def poll_messages(base_url: str, auth_key: str, count: int) -> dict[str, Any]:
raw = api_request(
base_url=base_url,
method="POST",
path="/message/HttpSyncMsg",
key=auth_key,
body={"Count": count},
)
data = raw.get("Data")
items: list[dict[str, Any]] = []
batch_seen: set[str] = set()
if isinstance(data, list):
for item in data:
summary = summarize_message(item)
dedupe_key = build_message_dedupe_key(summary, item)
if dedupe_key in batch_seen:
continue
batch_seen.add(dedupe_key)
if seen_message_before(auth_key, dedupe_key):
continue
summary["raw"] = item
items.append(summary)
return {
"raw": raw,
"items": items,
"received": len(items),
}
def ext_device_confirm_get(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
payload: dict[str, Any] = {}
if url:
payload["Url"] = url
return api_request(
base_url=base_url,
method="POST",
path="/login/ExtDeviceLoginConfirmGet",
key=auth_key,
body=payload,
)
def ext_device_confirm_ok(base_url: str, auth_key: str, url: str) -> dict[str, Any]:
payload: dict[str, Any] = {}
if url:
payload["Url"] = url
return api_request(
base_url=base_url,
method="POST",
path="/login/ExtDeviceLoginConfirmOk",
key=auth_key,
body=payload,
)
def try_logout(base_url: str, auth_key: str) -> dict[str, Any]:
attempts: list[dict[str, Any]] = []
candidates: list[tuple[str, str, dict[str, Any] | None]] = [
("POST", "/login/LogOut", {}),
("GET", "/login/LogOut", None),
("POST", "/login/Logout", {}),
("GET", "/login/Logout", None),
]
for method, path, body in candidates:
try:
resp = api_request(base_url=base_url, method=method, path=path, key=auth_key, body=body)
attempts.append({"method": method, "path": path, "ok": True, "raw": resp})
code = resp.get("Code") if isinstance(resp, dict) else None
if code in (None, 200, 300, -2, -3):
return {
"attempted": True,
"ok": True,
"method": method,
"path": path,
"raw": resp,
"attempts": attempts,
}
except Exception as exc:
attempts.append({"method": method, "path": path, "ok": False, "error": str(exc)})
last_error = next((item["error"] for item in reversed(attempts) if not item.get("ok")), "logout not supported")
return {"attempted": True, "ok": False, "error": last_error, "attempts": attempts}
class Handler(BaseHTTPRequestHandler):
server_version = "WCPPWebUI/1.0"
def log_message(self, fmt: str, *args: Any) -> None:
print(f"[webui] {self.address_string()} - {fmt % args}")
def do_OPTIONS(self) -> None:
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def do_GET(self) -> None:
parsed = urllib.parse.urlparse(self.path)
if parsed.path in {"/", "/index.html"}:
if not INDEX_FILE.exists():
json_response(self, 500, {"error": "index.html 不存在"})
return
payload = INDEX_FILE.read_bytes()
text_response(self, 200, "text/html; charset=utf-8", payload)
return
if parsed.path == "/health":
json_response(self, 200, {"ok": True})
return
if parsed.path == "/api/config":
json_response(
self,
200,
{
"baseUrl": DEFAULT_BASE_URL,
"hasDefaultAdminKey": bool(DEFAULT_ADMIN_KEY),
"hasSavedSession": bool(get_session_state().get("authKey")),
"qrEndpoints": sorted(SUPPORTED_QR_ENDPOINTS),
},
)
return
if parsed.path == "/api/session":
json_response(self, 200, {"session": get_session_state()})
return
if parsed.path == "/api/login/status":
query = urllib.parse.parse_qs(parsed.query)
auth_key = (query.get("authKey") or [""])[0].strip()
base_url = (query.get("baseUrl") or [DEFAULT_BASE_URL])[0].strip() or DEFAULT_BASE_URL
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
result = get_status_bundle(base_url=base_url, auth_key=auth_key)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, result)
return
json_response(self, 404, {"error": "Not Found"})
def do_POST(self) -> None:
parsed = urllib.parse.urlparse(self.path)
try:
data = parse_json_body(self)
except ValueError as exc:
json_response(self, 400, {"error": str(exc)})
return
base_url = str(data.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL
if parsed.path == "/api/login/start":
admin_key = str(data.get("adminKey") or DEFAULT_ADMIN_KEY).strip()
if not admin_key:
json_response(self, 400, {"error": "adminKey 不能为空(也未找到默认 ADMIN_KEY"})
return
days = int(data.get("days") or 30)
remark = str(data.get("remark") or "webui-login").strip()
qr_endpoint = str(data.get("qrEndpoint") or "GetLoginQrCodeNewX").strip()
try:
auth_key = create_auth_key(base_url=base_url, admin_key=admin_key, days=days, remark=remark)
qr_data = get_login_qr(base_url=base_url, auth_key=auth_key, qr_endpoint=qr_endpoint)
clear_message_cache(auth_key)
update_session_state(
{
"baseUrl": base_url,
"authKey": auth_key,
"qrData": qr_data,
"ticket": "",
"qrEndpoint": qr_endpoint,
}
)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(
self,
200,
{
"authKey": auth_key,
"qrEndpoint": qr_endpoint,
"qrData": qr_data,
},
)
return
if parsed.path == "/api/login/wakeup":
auth_key = str(data.get("authKey") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
resp = api_request(
base_url=base_url,
method="POST",
path="/login/WakeUpLogin",
key=auth_key,
body={"Check": False, "Proxy": ""},
)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/verify":
auth_key = str(data.get("authKey") or "").strip()
code = str(data.get("code") or "").strip()
ticket = str(data.get("ticket") or "").strip()
data62 = str(data.get("data62") or "").strip()
uuid = str(data.get("uuid") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
if not code:
json_response(self, 400, {"error": "code 不能为空"})
return
try:
resp = submit_verify(
base_url=base_url,
auth_key=auth_key,
code=code,
ticket=ticket,
data62=data62,
uuid=uuid,
)
update_session_state({"baseUrl": base_url, "authKey": auth_key, "ticket": ticket})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/ext-confirm/get":
auth_key = str(data.get("authKey") or "").strip()
url = str(data.get("url") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
resp = ext_device_confirm_get(base_url=base_url, auth_key=auth_key, url=url)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/ext-confirm/ok":
auth_key = str(data.get("authKey") or "").strip()
url = str(data.get("url") or "").strip()
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
resp = ext_device_confirm_ok(base_url=base_url, auth_key=auth_key, url=url)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"raw": resp})
return
if parsed.path == "/api/login/clear":
session = get_session_state()
auth_key = str(data.get("authKey") or session.get("authKey") or "").strip()
current_base_url = str(data.get("baseUrl") or session.get("baseUrl") or DEFAULT_BASE_URL).strip() or DEFAULT_BASE_URL
logout_result: dict[str, Any] = {"attempted": False, "ok": False}
if auth_key:
logout_result = try_logout(base_url=current_base_url, auth_key=auth_key)
try:
next_state = clear_session_state()
clear_message_cache(auth_key or None)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"session": next_state, "logout": logout_result})
return
if parsed.path == "/api/session":
try:
next_state = update_session_state(data)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"session": next_state})
return
if parsed.path == "/api/session/clear":
old_auth_key = str(get_session_state().get("authKey") or "").strip()
try:
next_state = clear_session_state()
clear_message_cache(old_auth_key or None)
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, {"session": next_state})
return
if parsed.path == "/api/message/poll":
auth_key = str(data.get("authKey") or "").strip()
count = int(data.get("count") or 30)
if not auth_key:
json_response(self, 400, {"error": "authKey 不能为空"})
return
try:
result = poll_messages(base_url=base_url, auth_key=auth_key, count=count)
update_session_state({"baseUrl": base_url, "authKey": auth_key})
except Exception as exc: # pragma: no cover
json_response(self, 500, {"error": str(exc)})
return
json_response(self, 200, result)
return
json_response(self, 404, {"error": "Not Found"})
def main() -> None:
server = ThreadingHTTPServer((HOST, PORT), Handler)
print(f"[webui] listening on http://{HOST}:{PORT}")
print(f"[webui] default api base: {DEFAULT_BASE_URL}")
print(f"[webui] default admin key loaded: {bool(DEFAULT_ADMIN_KEY)}")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
if __name__ == "__main__":
main()