Files
xb/app/wechat_bot_bridge.py

1126 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable
# Service endpoints; each can be overridden through the environment.
DEFAULT_WCPP_BASE_URL = os.environ.get("WCPP_BASE_URL", "http://127.0.0.1:18238")
DEFAULT_XIBAO_BASE_URL = os.environ.get("XIBAO_BASE_URL", "http://127.0.0.1:8787")
# Session file of the WeChatPadPro web UI; its "authKey" field is used when no
# explicit auth key is configured.
DEFAULT_SESSION_FILE = Path(
    os.environ.get("WCPP_SESSION_FILE", "/root/WeChatPadPro_test_20260227/webui/.session.json")
)
BASE_DIR = Path(__file__).resolve().parent
# Persisted bridge state lives next to this script under data/.
DEFAULT_STATE_FILE = BASE_DIR / "data" / "wechat_bridge_state.json"
DEFAULT_META_FILE = BASE_DIR / "data" / "wechat_bridge_meta.json"
# Local wall-clock time (HH:MM) of the daily cleanup.
DEFAULT_DAILY_CLEANUP_TIME = os.environ.get("XIBAO_DAILY_CLEANUP_TIME", "00:10")
# A numbered relay line: optional leading whitespace, a number, then one
# separator. Accepted separators: ideographic comma U+3001, "," and full-width
# U+FF0C, "." and full-width U+FF0E, ":" and full-width U+FF1A, ")" and
# full-width U+FF09, and "]". Full-width forms are written as escapes so the
# source contains no ambiguous Unicode. The previous pattern escaped the only
# closing bracket ("\]"), which left the class unterminated and made
# re.compile fail at import time.
RELAY_SERIAL_LINE_RE = re.compile(
    r"^\s*\d+\s*[\u3001,\uff0c.\uff0e:\uff1a)\uff09\]]\s*"
)
def log(msg: str) -> None:
    """Print *msg* to stdout prefixed with the local time, flushing immediately.

    Output format: ``[YYYY-MM-DD HH:MM:SS] <msg>``.
    """
    timestamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{timestamp}] {msg}", flush=True)
def read_json_file(path: Path, fallback: Any) -> Any:
    """Parse the UTF-8 JSON document at *path*, or return *fallback*.

    A missing file, a path that cannot be read, or invalid JSON all produce
    *fallback*; any ``Exception`` raised while reading or parsing is swallowed.
    """
    try:
        document = path.read_text(encoding="utf-8")
        return json.loads(document)
    except Exception:
        # Deliberately best-effort: callers treat unreadable state as "empty".
        return fallback
def write_json_file(path: Path, data: Any) -> None:
    """Atomically replace *path* with *data* as pretty-printed UTF-8 JSON.

    The document is first written to a sibling ``<name>.tmp`` file. That file
    is flushed and fsync'ed, then renamed over *path* with ``os.replace``, so
    readers never see a half-written file. Parent directories are created as
    needed.

    Raises:
        TypeError/ValueError: if *data* is not JSON-serialisable. In that case
            nothing is written.
        OSError: on filesystem failures. The temp file is removed before the
            error propagates.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    # Serialise first so an unserialisable object never touches the disk.
    text = json.dumps(data, ensure_ascii=False, indent=2)
    try:
        with open(tmp, "w", encoding="utf-8") as fh:
            fh.write(text)
            fh.flush()
            # Persist the bytes before the rename publishes them; without this
            # a crash can leave the renamed file empty or truncated.
            os.fsync(fh.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Do not leave a stale, partially written temp file behind.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
def parse_hhmm(text: str) -> tuple[int, int]:
    """Parse an ``H:MM`` / ``HH:MM`` clock time into ``(hour, minute)``.

    Surrounding whitespace is ignored. Anything malformed or out of range
    (hour outside 0..23, minute outside 0..59) yields ``(0, 10)``.
    """
    fallback = (0, 10)
    candidate = str(text or "").strip()
    match = re.fullmatch(r"(\d{1,2}):(\d{1,2})", candidate)
    if match is None:
        return fallback
    hour, minute = (int(part) for part in match.groups())
    if 0 <= hour <= 23 and 0 <= minute <= 59:
        return (hour, minute)
    return fallback
def flatten_wrapped_value(value: Any) -> Any:
    """Unwrap a wrapper dict such as ``{"str": "..."}`` to its inner value.

    For a dict, the first of the keys ``str``, ``string``, ``String``,
    ``value``, ``Value`` whose value is neither ``None`` nor ``""`` is
    returned. Dicts without such a key, and non-dict values, are returned
    unchanged.
    """
    if not isinstance(value, dict):
        return value
    return next(
        (
            value[name]
            for name in ("str", "string", "String", "value", "Value")
            if name in value and value[name] not in (None, "")
        ),
        value,
    )


def find_first_value(data: Any, keys: Iterable[str]) -> Any:
    """Breadth-first search *data* for the first non-empty value stored under *keys*.

    Key matching is case-insensitive. Non-string dict keys are compared via
    ``str()`` instead of crashing. Matched values go through
    ``flatten_wrapped_value``. A match that unwraps to ``None``, ``""``,
    ``[]`` or ``{}`` is skipped, and the search also descends into it.
    Returns ``None`` when nothing matches.
    """
    targets = {str(k).lower() for k in keys}
    # deque gives O(1) pops from the left; list.pop(0) made the BFS quadratic.
    pending: deque[Any] = deque([data])
    while pending:
        current = pending.popleft()
        if isinstance(current, dict):
            for key, val in current.items():
                if str(key).lower() in targets:
                    found = flatten_wrapped_value(val)
                    if found not in (None, "", [], {}):
                        return found
                pending.append(val)
        elif isinstance(current, list):
            pending.extend(current)
    return None
def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]:
    """Return the first dict inside ``item["AddMsgs"]``, else *item* itself.

    *item* is returned unchanged when ``AddMsgs`` is missing, is not a list,
    or contains no dict entries.
    """
    candidates = item.get("AddMsgs")
    if not isinstance(candidates, list):
        return item
    return next((entry for entry in candidates if isinstance(entry, dict)), item)
def summarize_message(item: Any) -> dict[str, Any]:
    """Reduce a raw sync entry to a flat summary dict.

    The result has string fields ``id``, ``type``, ``from``, ``to``,
    ``time`` and ``content``, with ``"-"`` for anything missing. ``raw``
    holds *item* unchanged. A non-dict *item* is stringified into
    ``content``. For dicts, fields are looked up (case-insensitively) in the
    primary message, and ``content`` falls back to ``msg_source``.
    """
    if not isinstance(item, dict):
        return {
            "id": "-",
            "type": "-",
            "from": "-",
            "to": "-",
            "time": "-",
            "content": str(item),
            "raw": item,
        }

    def as_text(value: Any) -> str:
        # Unwrap {"str": ...}-style containers, JSON-encode structures, "-" for None.
        unwrapped = flatten_wrapped_value(value)
        if unwrapped is None:
            return "-"
        if isinstance(unwrapped, (dict, list)):
            return json.dumps(unwrapped, ensure_ascii=False)
        return str(unwrapped)

    message = pick_primary_message(item)
    field_keys = {
        "id": ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"],
        "type": ["msg_type", "MsgType", "ContentType", "Type"],
        "from": ["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"],
        "to": ["to_user_name", "ToUserName", "ToWxid", "Receiver"],
        "time": ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"],
        "content": ["text_content", "TextContent", "content", "Content", "Message", "Text"],
    }
    found = {name: find_first_value(message, keys) for name, keys in field_keys.items()}
    if found["content"] in (None, "", {}, []):
        found["content"] = find_first_value(message, ["msg_source", "MsgSource"])

    summary: dict[str, Any] = {name: as_text(value) for name, value in found.items()}
    summary["raw"] = item
    return summary
def build_message_dedupe_key(msg: dict[str, Any]) -> str:
    """Return a stable dedupe key for a summarised message.

    Messages with a real id map to ``id:<id>|type:<type>``. Otherwise the
    key is ``hash:<sha1-prefix>|type:<type>``, computed over sender,
    receiver, time and the first 160 characters of content.
    """
    kind = str(msg.get("type") or "-")
    ident = str(msg.get("id") or "-")
    if ident in ("", "-"):
        parts = (
            str(msg.get("from") or "-"),
            str(msg.get("to") or "-"),
            str(msg.get("time") or "-"),
            str(msg.get("content") or "")[:160],
        )
        fingerprint = hashlib.sha1("|".join(parts).encode("utf-8")).hexdigest()[:20]
        return f"hash:{fingerprint}|type:{kind}"
    return f"id:{ident}|type:{kind}"
class SeenStore:
    """Bounded FIFO set of message dedupe keys, persisted as JSON.

    Keys are remembered in insertion order. Once more than ``max_size`` keys
    are held, the oldest are evicted. On disk the store is
    ``{"updated_at": <iso time>, "seen": [keys...]}`` at ``path``.
    """

    def __init__(self, path: Path, max_size: int = 8000) -> None:
        self.path = path
        # Capacity is never allowed below 100 keys.
        self.max_size = max(100, int(max_size))
        self._queue: deque[str] = deque()  # insertion order, oldest on the left
        self._set: set[str] = set()  # O(1) membership mirror of _queue
        self._dirty = False  # True when memory differs from what was last written
        self.load()

    def _append_new(self, key: str) -> None:
        """Remember *key* (non-empty, not yet present), then evict overflow."""
        self._queue.append(key)
        self._set.add(key)
        while len(self._queue) > self.max_size:
            self._set.discard(self._queue.popleft())

    def load(self) -> None:
        """Merge keys from ``path`` into memory; unreadable or malformed data adds nothing."""
        payload = read_json_file(self.path, {})
        entries = payload.get("seen") if isinstance(payload, dict) else []
        if not isinstance(entries, list):
            entries = []
        for entry in entries:
            key = str(entry).strip()
            if key and key not in self._set:
                self._append_new(key)
        # Freshly loaded state matches the file, so there is nothing to flush.
        self._dirty = False

    def contains(self, key: str) -> bool:
        """Return True if *key* is currently remembered."""
        return key in self._set

    def add(self, key: str) -> None:
        """Remember *key*; empty or already-known keys are ignored."""
        if key and key not in self._set:
            self._append_new(key)
            self._dirty = True

    def flush(self, force: bool = False) -> None:
        """Write the keys to ``path`` if changed since the last write (or when *force*)."""
        if self._dirty or force:
            write_json_file(
                self.path,
                {
                    "updated_at": datetime.now().isoformat(timespec="seconds"),
                    "seen": list(self._queue),
                },
            )
            self._dirty = False
class WechatXibaoBridge:
    """Bridge private WeChat text commands (via WeChatPadPro) to the Xibao HTTP service.

    Each poll pulls new messages from WeChatPadPro ``/message/HttpSyncMsg``,
    then filters and dedupes them. It parses commands (generate / parse /
    feedback / skip / help), calls the Xibao API, and replies with text and
    images.

    Persistent state:
      * ``self.seen``: dedupe keys of already-handled messages.
      * ``self.meta``: per-user feedback context (``users``), per-user daily
        skip settings (``daily_skip``) and the last daily-cleanup date
        (``daily_cleanup``). It is saved to ``meta_file``.
    """

    # Feedback command: keyword, optional "+" or ":" (ASCII or full-width),
    # a 1-4 digit serial, then optionally a separator run (whitespace,
    # comma, semicolon, colon, dash, ideographic comma; ASCII or
    # full-width) followed by a free-text note. DOTALL lets the note span
    # several lines.
    _FEEDBACK_RE = re.compile(
        r"^/?反馈\s*[+\uff0b:\uff1a\-]?\s*(\d{1,4})(?:[\s,\uff0c;\uff1b:\uff1a\u3001\-]+(.*))?$",
        re.DOTALL,
    )
    # Skip command: keyword, then a 1-4 digit count, optionally wrapped in
    # ASCII or full-width parentheses.
    _SKIP_RE = re.compile(r"^/?跳过\s*[(\uff08]?\s*(\d{1,4})\s*[)\uff09]?\s*$")

    def __init__(self, args: argparse.Namespace) -> None:
        """Configure the bridge from parsed CLI *args* and load persisted state."""
        self.wechat_base_url = str(args.wechat_base_url).rstrip("/")
        self.xibao_base_url = str(args.xibao_base_url).rstrip("/")
        self.auth_key = str(args.wechat_auth_key or "").strip()
        self.session_file = Path(args.wechat_session_file).resolve()
        self.sync_count = max(1, int(args.sync_count))
        self.poll_interval = max(0.5, float(args.poll_interval))
        self.max_images = max(0, int(args.max_images))
        self.once = bool(args.once)
        self.dry_run = bool(args.dry_run)
        # Comma-separated sender wxids; an empty set accepts every sender.
        self.allow_from = {x.strip() for x in str(args.allow_from or "").split(",") if x.strip()}
        self.seen = SeenStore(Path(args.state_file).resolve())
        self.meta_file = Path(args.meta_file).resolve()
        self.cleanup_hour, self.cleanup_minute = parse_hhmm(str(args.daily_cleanup_time))
        # An explicitly configured key is only replaced from the session file
        # when refresh_auth_key(force=True) is called (see poll_once).
        self.explicit_auth = bool(self.auth_key)
        self.self_wxid = ""
        self.meta = self._load_meta()
        self.meta_dirty = False
        self._last_state_flush = time.time()
        self._last_meta_flush = time.time()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _safe_int(value: Any) -> int:
        """Convert *value* with ``int()``; falsy or unconvertible values give 0."""
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _resp_code(resp: Any) -> int:
        """Return a WeChatPadPro response's ``Code`` as int (0 if absent or malformed)."""
        if not isinstance(resp, dict):
            return 0
        return WechatXibaoBridge._safe_int(resp.get("Code"))

    @staticmethod
    def _mask_secret(value: str) -> str:
        """Hide all but the first and last 4 characters of *value* (all of it if <= 8 chars)."""
        text = str(value or "")
        if len(text) <= 8:
            return "*" * len(text)
        return f"{text[:4]}...{text[-4:]}"

    # --------------------------------------------------------------- persistence

    def _load_meta(self) -> dict[str, Any]:
        """Load ``meta_file`` and normalise it, discarding malformed entries.

        Returns a dict with these keys:
          * ``updated_at``
          * ``users``: wxid -> ``{"updated_at", "line_map", "line_order"}``
          * ``daily_cleanup``: ``{"last_date": ...}``
          * ``daily_skip``: wxid -> ``{"date", "count", "updated_at"}``,
            with count clamped to 1..500; non-positive counts are dropped.
        """
        raw = read_json_file(self.meta_file, {})
        if not isinstance(raw, dict):
            raw = {}
        users = raw.get("users")
        if not isinstance(users, dict):
            users = {}
        else:
            normalized_users: dict[str, Any] = {}
            for wxid, item in users.items():
                if not isinstance(item, dict):
                    continue
                line_map = item.get("line_map")
                if not isinstance(line_map, dict):
                    line_map = {}
                line_order = item.get("line_order")
                if not isinstance(line_order, list):
                    line_order = []
                normalized_users[str(wxid)] = {
                    "updated_at": str(item.get("updated_at", "")),
                    "line_map": {str(k): v for k, v in line_map.items() if isinstance(v, dict)},
                    "line_order": [str(x) for x in line_order if str(x).strip()],
                }
            users = normalized_users
        daily_cleanup = raw.get("daily_cleanup")
        if not isinstance(daily_cleanup, dict):
            daily_cleanup = {}
        daily_skip = raw.get("daily_skip")
        if not isinstance(daily_skip, dict):
            daily_skip = {}
        else:
            normalized_daily_skip: dict[str, Any] = {}
            for wxid, item in daily_skip.items():
                if not isinstance(item, dict):
                    continue
                count_val = self._safe_int(item.get("count"))
                if count_val <= 0:
                    continue
                normalized_daily_skip[str(wxid)] = {
                    "date": str(item.get("date", "")).strip(),
                    "count": max(1, min(count_val, 500)),
                    "updated_at": str(item.get("updated_at", "")),
                }
            daily_skip = normalized_daily_skip
        return {
            "updated_at": str(raw.get("updated_at", "")),
            "users": users,
            "daily_cleanup": {
                "last_date": str(daily_cleanup.get("last_date", "")).strip(),
            },
            "daily_skip": daily_skip,
        }

    def _mark_meta_dirty(self) -> None:
        """Flag ``meta`` for saving and stamp its ``updated_at``."""
        self.meta_dirty = True
        self.meta["updated_at"] = datetime.now().isoformat(timespec="seconds")

    def _flush_meta(self, force: bool = False) -> None:
        """Write ``meta`` to ``meta_file`` when dirty (or when *force*)."""
        if not force and not self.meta_dirty:
            return
        write_json_file(self.meta_file, self.meta)
        self.meta_dirty = False

    def _flush_state(self, force: bool = False) -> None:
        """Flush both the dedupe store and the meta file."""
        self.seen.flush(force=force)
        self._flush_meta(force=force)

    def _flush_periodically(self, now: float, interval: float = 6.0) -> None:
        """Persist dirty dedupe and meta state at most once per *interval* seconds.

        Write errors are logged rather than raised, so a full disk or
        permission problem does not stop the polling loop.
        """
        if now - self._last_state_flush >= interval:
            self._last_state_flush = now
            try:
                self.seen.flush(force=False)
            except Exception as exc:
                log(f"保存去重状态失败: {exc}")
        if now - self._last_meta_flush >= interval:
            self._last_meta_flush = now
            try:
                self._flush_meta(force=False)
            except Exception as exc:
                log(f"保存元数据失败: {exc}")

    # ---------------------------------------------------------------------- HTTP

    def _request_json(
        self,
        *,
        base_url: str,
        path: str,
        method: str,
        payload: dict[str, Any] | None = None,
        key: str | None = None,
        timeout: int = 30,
        accept_error: bool = False,
    ) -> tuple[int, dict[str, Any]]:
        """Send an HTTP request and decode a JSON object response.

        Args:
            base_url: Service root; a trailing ``/`` is ignored.
            path: Path appended verbatim to *base_url*.
            method: HTTP verb (case-insensitive).
            payload: Optional JSON body, sent with ``Content-Type: application/json``.
            key: Optional auth key, appended as the ``key`` query parameter.
            timeout: Socket timeout in seconds.
            accept_error: When true, HTTP error statuses are returned as
                ``(status, body)`` instead of raising.

        Returns:
            ``(status, body)``. A JSON body that is not an object is wrapped
            as ``{"raw": ...}``. An HTTP error body that is not JSON becomes
            ``{"ok": False, "error": <text>}``.

        Raises:
            RuntimeError: on transport failures, on HTTP errors when
                *accept_error* is false, and when a successful response is not
                valid UTF-8 JSON.
        """
        url = f"{base_url.rstrip('/')}{path}"
        if key:
            joiner = "&" if "?" in url else "?"
            url = f"{url}{joiner}key={urllib.parse.quote(key)}"
        body = None
        headers: dict[str, str] = {}
        if payload is not None:
            body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
            headers["Content-Type"] = "application/json"
        req = urllib.request.Request(url, data=body, method=method.upper(), headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                status = int(resp.status)
                raw_bytes = resp.read()
        except urllib.error.HTTPError as exc:
            raw = exc.read().decode("utf-8", errors="replace")
            try:
                data = json.loads(raw) if raw else {}
            except Exception:
                data = {"ok": False, "error": raw or f"HTTP {exc.code}"}
            if not isinstance(data, dict):
                data = {"raw": data}
            if accept_error:
                return int(exc.code), data
            raise RuntimeError(f"HTTP {exc.code}: {raw[:400]}") from exc
        except (urllib.error.URLError, OSError) as exc:
            # URLError covers connection/DNS failures. A bare OSError (e.g. a
            # socket timeout while reading the body) is reported the same way.
            raise RuntimeError(f"请求失败: {exc}") from exc
        try:
            raw = raw_bytes.decode("utf-8")
            data = json.loads(raw) if raw else {}
        except ValueError as exc:  # UnicodeDecodeError and JSONDecodeError
            snippet = raw_bytes[:400].decode("utf-8", errors="replace")
            raise RuntimeError(f"响应不是有效的 JSON (HTTP {status}): {snippet}") from exc
        if not isinstance(data, dict):
            data = {"raw": data}
        return status, data

    def _request_bytes(self, url: str, timeout: int = 45) -> tuple[bytes, str]:
        """GET *url* and return ``(body, content_type)``; defaults to ``application/octet-stream``."""
        req = urllib.request.Request(url, method="GET")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = resp.read()
            content_type = resp.headers.get("Content-Type", "application/octet-stream")
        content_type = content_type.split(";", 1)[0].strip() or "application/octet-stream"
        return data, content_type

    # ---------------------------------------------------------------- auth/login

    def _load_auth_from_session(self) -> str:
        """Return the ``authKey`` stored in the session file, or ``""``."""
        raw = read_json_file(self.session_file, {})
        if isinstance(raw, dict):
            key = str(raw.get("authKey", "")).strip()
            if key:
                return key
        return ""

    def is_key_online(self, auth_key: str) -> bool:
        """Return True when ``/login/GetLoginStatus`` answers ``Code == 200`` for *auth_key*.

        An empty key or any request failure counts as offline.
        """
        key = str(auth_key or "").strip()
        if not key:
            return False
        try:
            _, resp = self._request_json(
                base_url=self.wechat_base_url,
                path="/login/GetLoginStatus",
                method="GET",
                key=key,
                timeout=12,
                accept_error=True,
            )
        except Exception:
            return False
        return self._resp_code(resp) == 200

    def refresh_auth_key(self, force: bool = False) -> None:
        """Adopt the session file's authKey when appropriate.

        Does nothing for an explicitly configured key unless *force* is set.
        With no current key, the session key is taken as-is. Otherwise it
        replaces the current key only if it is online, or if the current key
        is not. Switching keys resets ``self_wxid`` so it is re-resolved. Only
        a masked form of the key is logged.
        """
        if self.explicit_auth and not force:
            return
        latest = self._load_auth_from_session()
        if not latest or latest == self.auth_key:
            return
        if not self.auth_key:
            self.auth_key = latest
            self.self_wxid = ""
            log(f"已更新 authKey: {self._mask_secret(self.auth_key)}")
            return
        latest_online = self.is_key_online(latest)
        current_online = self.is_key_online(self.auth_key)
        if latest_online or not current_online:
            self.auth_key = latest
            self.self_wxid = ""
            log(f"已更新 authKey: {self._mask_secret(self.auth_key)}")
        else:
            log("检测到新的 authKey 但未在线,继续使用当前在线会话。")

    def ensure_self_wxid(self) -> None:
        """Resolve and cache this account's wxid via ``/user/GetProfile`` (no-op once known)."""
        if not self.auth_key or self.self_wxid:
            return
        try:
            _, resp = self._request_json(
                base_url=self.wechat_base_url,
                path="/user/GetProfile",
                method="GET",
                key=self.auth_key,
            )
        except Exception as exc:
            log(f"获取自身 wxid 失败: {exc}")
            return
        if self._resp_code(resp) != 200:
            return
        data = resp.get("Data")
        self_id = find_first_value(data, ["wxid", "Wxid", "userName", "UserName", "to_user_name"])
        if self_id:
            self.self_wxid = str(self_id).strip()
        if self.self_wxid:
            log(f"已识别当前账号 wxid: {self.self_wxid}")

    # ----------------------------------------------------------- feedback context

    def _extract_feedback_records(self, data: dict[str, Any]) -> list[dict[str, Any]]:
        """Return the dict records from ``data["result"]["records"]``.

        Falls back to ``data["result"]["new_records"]`` when ``records`` is
        missing or empty.
        """
        if not isinstance(data, dict):
            return []
        result = data.get("result")
        if not isinstance(result, dict):
            return []
        records = result.get("records")
        if isinstance(records, list) and records:
            return [x for x in records if isinstance(x, dict)]
        records = result.get("new_records")
        if isinstance(records, list) and records:
            return [x for x in records if isinstance(x, dict)]
        return []

    def save_feedback_context(self, from_user: str, data: dict[str, Any]) -> None:
        """Remember which record each line serial in a Xibao response refers to.

        Stored under ``meta["users"][from_user]`` so later feedback commands
        can resolve a serial. Records without a source line are skipped. A
        missing serial gets a running fallback number, and the first record
        wins for duplicate serials. Once more than 400 users are stored, only
        the 300 most recently updated are kept.
        """
        from_user = str(from_user or "").strip()
        if not from_user:
            return
        records = self._extract_feedback_records(data)
        if not records:
            return
        line_map: dict[str, dict[str, Any]] = {}
        line_order: list[str] = []
        fallback_index = 0
        for item in records:
            source_line = str(item.get("source_line") or item.get("raw_text") or "").strip()
            if not source_line:
                continue
            line_serial = str(item.get("line_serial") or "").strip()
            if not line_serial:
                fallback_index += 1
                line_serial = str(fallback_index)
            if line_serial in line_map:
                continue
            line_order.append(line_serial)
            line_map[line_serial] = {
                "source_line": source_line,
                "record": {
                    "source_line": source_line,
                    "raw_text": str(item.get("raw_text") or ""),
                    "branch": str(item.get("branch") or ""),
                    "amount": str(item.get("amount") or ""),
                    "type": str(item.get("type") or ""),
                    "page": str(item.get("page") or ""),
                    "status": str(item.get("status") or ""),
                    "line_serial": line_serial,
                    "line_product_index": str(item.get("line_product_index") or ""),
                    "dedup_key": str(item.get("dedup_key") or ""),
                    "signature_key": str(item.get("signature_key") or ""),
                },
            }
        if not line_map:
            return
        users = self.meta.setdefault("users", {})
        users[from_user] = {
            "updated_at": datetime.now().isoformat(timespec="seconds"),
            "line_map": line_map,
            "line_order": line_order,
        }
        if len(users) > 400:
            keys = sorted(users.keys(), key=lambda k: str(users[k].get("updated_at", "")))
            for key in keys[:-300]:
                users.pop(key, None)
        self._mark_meta_dirty()

    def resolve_feedback_item(self, from_user: str, serial: str) -> dict[str, Any] | None:
        """Find the stored context entry for *serial* of *from_user*.

        Looks first by exact line serial. For numeric input it then tries the
        1-based position in ``line_order``. Returns None if neither matches.
        """
        users = self.meta.get("users")
        if not isinstance(users, dict):
            return None
        context = users.get(str(from_user or "").strip())
        if not isinstance(context, dict):
            return None
        line_map = context.get("line_map")
        if not isinstance(line_map, dict):
            return None
        serial_text = str(serial or "").strip()
        if not serial_text:
            return None
        item = line_map.get(serial_text)
        if isinstance(item, dict):
            return item
        line_order = context.get("line_order")
        if isinstance(line_order, list) and serial_text.isdigit():
            idx = int(serial_text) - 1
            if 0 <= idx < len(line_order):
                mapped_serial = str(line_order[idx]).strip()
                item = line_map.get(mapped_serial)
                if isinstance(item, dict):
                    return item
        return None

    # ------------------------------------------------------------ command parsing

    def parse_feedback_command(self, text: str) -> tuple[str, str] | None:
        """Parse a feedback command into ``(serial, note)``, or None if *text* is not one.

        ASCII and full-width separators are accepted, and the note may span
        several lines.
        """
        content = str(text or "").strip()
        if not content:
            return None
        m = self._FEEDBACK_RE.match(content)
        if not m:
            return None
        serial = str(m.group(1)).strip()
        note = str(m.group(2) or "").strip()
        return serial, note

    def parse_skip_command(self, text: str) -> int | None:
        """Parse a skip command into a count clamped to 0..500, or None if *text* is not one."""
        content = str(text or "").strip()
        if not content:
            return None
        m = self._SKIP_RE.match(content)
        if not m:
            return None
        return max(0, min(int(m.group(1)), 500))

    # ----------------------------------------------------------------- daily skip

    def _today_text(self) -> str:
        """Return today's local date as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def set_daily_skip(self, from_user: str, count: int) -> int:
        """Set (count > 0) or clear (count <= 0) today's skip count for *from_user*.

        The count is clamped to 0..500. Once more than 1000 entries exist,
        only the 800 most recently updated are kept. Returns the effective
        count (0 when cleared or when *from_user* is empty).
        """
        user = str(from_user or "").strip()
        if not user:
            return 0
        daily_skip = self.meta.setdefault("daily_skip", {})
        if not isinstance(daily_skip, dict):
            daily_skip = {}
            self.meta["daily_skip"] = daily_skip
        normalized_count = max(0, min(int(count or 0), 500))
        if normalized_count <= 0:
            if user in daily_skip:
                daily_skip.pop(user, None)
                self._mark_meta_dirty()
            return 0
        daily_skip[user] = {
            "date": self._today_text(),
            "count": normalized_count,
            "updated_at": datetime.now().isoformat(timespec="seconds"),
        }
        if len(daily_skip) > 1000:
            keys = sorted(daily_skip.keys(), key=lambda k: str(daily_skip[k].get("updated_at", "")))
            for key in keys[:-800]:
                daily_skip.pop(key, None)
        self._mark_meta_dirty()
        return normalized_count

    def get_daily_skip(self, from_user: str) -> int:
        """Return today's skip count for *from_user* (1..500), or 0.

        Entries from another day, or with a non-positive count, are removed
        as a side effect.
        """
        user = str(from_user or "").strip()
        if not user:
            return 0
        daily_skip = self.meta.get("daily_skip")
        if not isinstance(daily_skip, dict):
            return 0
        item = daily_skip.get(user)
        if not isinstance(item, dict):
            return 0
        if str(item.get("date") or "").strip() != self._today_text():
            daily_skip.pop(user, None)
            self._mark_meta_dirty()
            return 0
        count = self._safe_int(item.get("count"))
        if count <= 0:
            daily_skip.pop(user, None)
            self._mark_meta_dirty()
            return 0
        return max(1, min(count, 500))

    def apply_daily_skip_to_raw_text(self, from_user: str, raw_text: str) -> tuple[str, int, int]:
        """Drop the sender's first N numbered relay lines from *raw_text*.

        Returns ``(text, removed, requested)``. Blank lines are also removed.
        If nothing is left, the text becomes ``"#接龙"``. When no skip is set,
        *raw_text* is returned unchanged as ``(raw_text, 0, 0)``.
        """
        requested = self.get_daily_skip(from_user)
        if requested <= 0:
            return raw_text, 0, 0
        lines = str(raw_text or "").splitlines()
        if not lines:
            return raw_text, 0, requested
        removed = 0
        keep_lines: list[str] = []
        for line in lines:
            source = str(line)
            if removed < requested and RELAY_SERIAL_LINE_RE.match(source.strip()):
                removed += 1
                continue
            keep_lines.append(source)
        normalized = "\n".join(x for x in keep_lines if str(x).strip())
        if not normalized.strip():
            normalized = "#接龙"
        return normalized, removed, requested

    # ------------------------------------------------------------------ feedback

    def handle_feedback(self, from_user: str, serial: str, note: str) -> None:
        """Report feedback on a previously generated line via Xibao ``/api/log/mark``.

        A note containing "识别" (recognition) is marked as
        ``recognition_error``; anything else as ``generation_error``. The
        outcome is replied to the user.
        """
        item = self.resolve_feedback_item(from_user=from_user, serial=serial)
        if not item:
            self.send_text(from_user, "未找到可反馈的序号,请先发送 #接龙 生成后再反馈。")
            return
        source_line = str(item.get("source_line") or "").strip()
        record = item.get("record") if isinstance(item.get("record"), dict) else {}
        note_text = str(note or "").strip() or f"用户反馈序号{serial}"
        mark_type = "recognition_error" if "识别" in note_text else "generation_error"
        if self.dry_run:
            log(
                f"[dry-run] feedback from={from_user} serial={serial} mark_type={mark_type} "
                f"source_line={source_line} note={note_text}"
            )
            self.send_text(from_user, f"反馈已接收(演练模式):序号{serial}")
            return
        status, resp = self._request_json(
            base_url=self.xibao_base_url,
            path="/api/log/mark",
            method="POST",
            payload={
                "mark_type": mark_type,
                "source_line": source_line,
                "note": f"{note_text} | from={from_user}",
                "record": record,
            },
            accept_error=True,
            timeout=40,
        )
        if not resp.get("ok"):
            err = str(resp.get("error") or f"HTTP {status}").strip()
            self.send_text(from_user, f"反馈记录失败: {err}")
            return
        issue = resp.get("issue") if isinstance(resp.get("issue"), dict) else {}
        issue_id = str(issue.get("id") or "").strip()
        suffix = f",编号 {issue_id}" if issue_id else ""
        self.send_text(from_user, f"反馈已记录:序号{serial}{suffix}")

    # -------------------------------------------------------------- daily cleanup

    def _clear_history_keep_feedback(self) -> bool:
        """Clear Xibao history; return True on success.

        First tries to overwrite, with ``[]``, the history file that
        ``/api/config`` reports as ``resolved_history_file`` (when the file or
        its directory exists). If that is unavailable or fails, it falls back
        to ``POST /api/history/clear``. NOTE(review): the name implies
        feedback records live elsewhere and survive; this is not verifiable
        from this file.
        """
        history_path = ""
        try:
            _, cfg = self._request_json(
                base_url=self.xibao_base_url,
                path="/api/config",
                method="GET",
                accept_error=False,
                timeout=20,
            )
            history_path = str(cfg.get("resolved_history_file") or "").strip()
        except Exception as exc:
            log(f"读取配置失败history路径: {exc}")
        if history_path:
            p = Path(history_path)
            if p.exists() or p.parent.exists():
                try:
                    write_json_file(p, [])
                    log(f"已清空历史文件: {p}")
                    return True
                except Exception as exc:
                    log(f"清空历史文件失败: {exc}")
        try:
            _, resp = self._request_json(
                base_url=self.xibao_base_url,
                path="/api/history/clear",
                method="POST",
                payload={},
                accept_error=True,
                timeout=40,
            )
            if resp.get("ok"):
                if resp.get("skipped_suppressed_cleared"):
                    log("已清空历史(附带清空了屏蔽列表)。")
                else:
                    log("已清空历史。")
                return True
            log(f"清空历史失败: {resp}")
            return False
        except Exception as exc:
            log(f"清空历史请求异常: {exc}")
            return False

    def maybe_run_daily_cleanup(self) -> None:
        """Once per day, at or after the configured time, clear output images and history.

        The day is recorded in ``meta["daily_cleanup"]["last_date"]`` only
        when both steps succeed, so a failed cleanup is retried on later polls.
        """
        now = datetime.now()
        today = now.strftime("%Y-%m-%d")
        daily = self.meta.setdefault("daily_cleanup", {})
        last_date = str(daily.get("last_date") or "").strip()
        if last_date == today:
            return
        if (now.hour, now.minute) < (self.cleanup_hour, self.cleanup_minute):
            return
        log("开始执行每日清理:图片输出 + 历史记录(保留反馈记录)")
        output_ok = False
        try:
            _, resp = self._request_json(
                base_url=self.xibao_base_url,
                path="/api/output/clear",
                method="POST",
                payload={},
                accept_error=True,
                timeout=120,
            )
            output_ok = bool(resp.get("ok"))
            if output_ok:
                log("已清空输出图片。")
            else:
                log(f"清空输出图片失败: {resp}")
        except Exception as exc:
            log(f"清空输出图片异常: {exc}")
        history_ok = self._clear_history_keep_feedback()
        if output_ok and history_ok:
            daily["last_date"] = today
            self._mark_meta_dirty()
            log(f"每日清理完成: {today}")

    # ------------------------------------------------------------------- sending

    def send_text(self, to_user: str, text: str) -> None:
        """Send a text message via WeChatPadPro; in dry-run mode only log it.

        Raises:
            RuntimeError: if the API does not answer ``Code == 200``, or on
                transport errors.
        """
        if self.dry_run:
            log(f"[dry-run] send text to={to_user}: {text}")
            return
        body = {
            "MsgItem": [
                {
                    "MsgType": 1,
                    "ToUserName": to_user,
                    "TextContent": text,
                }
            ]
        }
        _, resp = self._request_json(
            base_url=self.wechat_base_url,
            path="/message/SendTextMessage",
            method="POST",
            payload=body,
            key=self.auth_key,
            timeout=30,
        )
        if self._resp_code(resp) != 200:
            raise RuntimeError(f"SendTextMessage失败: {json.dumps(resp, ensure_ascii=False)}")

    def send_image_data_uri(self, to_user: str, data_uri: str) -> None:
        """Send an image given as a ``data:`` URI; in dry-run mode only log it.

        Raises:
            RuntimeError: if the API does not answer ``Code == 200``, or on
                transport errors.
        """
        if self.dry_run:
            log(f"[dry-run] send image to={to_user}, bytes={len(data_uri)}")
            return
        body = {
            "MsgItem": [
                {
                    "MsgType": 2,
                    "ToUserName": to_user,
                    "ImageContent": data_uri,
                }
            ]
        }
        _, resp = self._request_json(
            base_url=self.wechat_base_url,
            path="/message/SendImageNewMessage",
            method="POST",
            payload=body,
            key=self.auth_key,
            timeout=60,
        )
        if self._resp_code(resp) != 200:
            raise RuntimeError(f"SendImageNewMessage失败: {json.dumps(resp, ensure_ascii=False)}")

    # ----------------------------------------------------------- command routing

    def parse_command(self, text: str) -> dict[str, str] | None:
        """Classify a message as a bot command.

        Returns a dict whose ``action`` is one of ``feedback`` (with
        ``serial``), ``set_skip`` (with ``count``), ``help``, ``parse`` or
        ``generate``, plus ``payload``. Returns None for ordinary messages.
        Any text containing ``#接龙`` is treated as a generate request.
        """
        content = str(text or "").strip()
        if not content:
            return None
        feedback = self.parse_feedback_command(content)
        if feedback is not None:
            serial, note = feedback
            return {"action": "feedback", "serial": serial, "payload": note}
        skip_count = self.parse_skip_command(content)
        if skip_count is not None:
            return {"action": "set_skip", "count": str(skip_count), "payload": ""}
        lower = content.lower()
        if lower in {"/喜报", "喜报", "#喜报", "/喜报 help", "/喜报 帮助", "喜报帮助"}:
            return {"action": "help", "payload": ""}
        prefixes = ["/喜报", "喜报", "#喜报"]
        for prefix in prefixes:
            if content.startswith(prefix):
                rest = content[len(prefix) :].strip(" :\n\t")
                if not rest:
                    return {"action": "help", "payload": ""}
                rest_lower = rest.lower()
                if rest_lower in {"help", "帮助"}:
                    return {"action": "help", "payload": ""}
                if rest.startswith("解析"):
                    payload = rest[2:].strip(" :\n\t")
                    return {"action": "parse", "payload": payload}
                if rest.startswith("生成"):
                    payload = rest[2:].strip(" :\n\t")
                    return {"action": "generate", "payload": payload}
                return {"action": "generate", "payload": rest}
        if "#接龙" in content:
            return {"action": "generate", "payload": content}
        return None

    def normalize_raw_text(self, payload: str) -> str:
        """Strip *payload* and prepend a ``#接龙`` line if missing; empty input gives ``""``."""
        text = str(payload or "").strip()
        if not text:
            return ""
        if "#接龙" in text:
            return text
        return f"#接龙\n{text}"

    def format_parse_result(self, data: dict[str, Any]) -> str:
        """Render the summary counters of a ``/api/parse`` response as reply text.

        Non-numeric counters are shown as 0.
        """
        result = data.get("result") if isinstance(data, dict) else {}
        if not isinstance(result, dict):
            return "解析完成。"
        summary = result.get("summary") if isinstance(result.get("summary"), dict) else {}
        parsed = self._safe_int(summary.get("parsed"))
        new_count = self._safe_int(summary.get("new"))
        duplicate = self._safe_int(summary.get("duplicate"))
        pending = self._safe_int(summary.get("insurance_pending"))
        return (
            "解析完成\n"
            f"- 识别条数: {parsed}\n"
            f"- 可生成: {new_count}\n"
            f"- 重复: {duplicate}\n"
            f"- 待选保险年限: {pending}"
        )

    def format_generate_result(self, data: dict[str, Any]) -> str:
        """Render a ``/api/generate`` response as reply text.

        A non-dict *data* is treated as empty, and non-numeric counters as 0.
        """
        if not isinstance(data, dict):
            data = {}
        result = data.get("result")
        summary = result.get("summary") if isinstance(result, dict) else None
        if not isinstance(summary, dict):
            summary = {}
        generated = self._safe_int(data.get("generated_count"))
        duplicate = self._safe_int(summary.get("duplicate"))
        pending = self._safe_int(summary.get("insurance_pending"))
        if generated <= 0:
            return (
                "本次没有可生成的新记录\n"
                f"- 重复记录: {duplicate}\n"
                f"- 待选保险年限: {pending}"
            )
        return f"生成完成,本次生成 {generated} 张。若有问题请发送:反馈+序号+说明"

    def compose_help_text(self) -> str:
        """Return the usage text sent in reply to help requests."""
        return (
            "喜报机器人用法:\n"
            "1) /喜报 生成 + 接龙文本\n"
            "2) /喜报 解析 + 接龙文本\n"
            "3) 直接发送包含 #接龙 的文本\n"
            "4) 生成有误时:反馈+序号+说明\n"
            "5) 发送 跳过3 或 跳过(3):当天自动跳过前3条,跳过0 取消\n"
            "示例:\n"
            "/喜报 生成\\n#接龙\\n1、营江路揽收现金10万存一年\n"
            "反馈2 网点写错了\n"
            "跳过2"
        )

    def handle_command(self, msg: dict[str, Any], command: dict[str, str]) -> None:
        """Execute a parsed *command* for the sender of *msg* and reply over WeChat.

        Feedback, skip and help commands are answered directly. Parse and
        generate commands apply the sender's daily skip, send a progress hint
        and call the Xibao API. They then store feedback context and reply
        with a summary; generate also sends back up to ``max_images`` images.
        """
        from_user = str(msg.get("from") or "").strip()
        if not from_user:
            return
        action = command.get("action", "")
        payload = command.get("payload", "")

        if action == "feedback":
            serial = str(command.get("serial") or "").strip()
            if not serial:
                self.send_text(from_user, "反馈格式错误,请发送:反馈+序号+说明")
                return
            self.handle_feedback(from_user=from_user, serial=serial, note=payload)
            return

        if action == "set_skip":
            try:
                count = int(str(command.get("count") or "0").strip() or "0")
            except ValueError:
                self.send_text(from_user, "跳过格式错误,请发送:跳过3 或 跳过(3)")
                return
            effective = self.set_daily_skip(from_user=from_user, count=count)
            if effective <= 0:
                self.send_text(from_user, "已取消今日自动跳过。")
            else:
                self.send_text(from_user, f"已设置今日自动跳过前 {effective} 条(仅当天生效)。")
            return

        if action == "help":
            self.send_text(from_user, self.compose_help_text())
            return

        raw_text = self.normalize_raw_text(payload)
        if not raw_text:
            self.send_text(from_user, "未检测到有效文本,请发送 /喜报 帮助 查看用法。")
            return
        raw_text, skipped_count, requested_skip = self.apply_daily_skip_to_raw_text(
            from_user=from_user, raw_text=raw_text
        )
        endpoint = "/api/parse" if action == "parse" else "/api/generate"
        start_hint = "收到,正在解析..." if action == "parse" else "收到,正在生成喜报,请稍候..."
        if requested_skip > 0:
            start_hint = f"{start_hint}(已跳过 {skipped_count}/{requested_skip} 条)"
        self.send_text(from_user, start_hint)
        status, resp = self._request_json(
            base_url=self.xibao_base_url,
            path=endpoint,
            method="POST",
            payload={"raw_text": raw_text, "save_history": True} if action == "generate" else {"raw_text": raw_text},
            accept_error=True,
            timeout=300,
        )
        if not resp.get("ok"):
            err = str(resp.get("error") or resp.get("message") or f"HTTP {status}").strip()
            if resp.get("error_code") == "insurance_year_required":
                err = "包含保险记录但未指定 3年交/5年交。请在文本里写明年限后重试。"
            self.send_text(from_user, f"处理失败: {err}")
            return
        self.save_feedback_context(from_user=from_user, data=resp)
        if action == "parse":
            self.send_text(from_user, self.format_parse_result(resp))
            return
        self.send_text(from_user, self.format_generate_result(resp))
        self._send_result_images(from_user, resp.get("download_images"))

    def _send_result_images(self, to_user: str, images: Any) -> None:
        """Download up to ``max_images`` generated images from Xibao and send them to *to_user*.

        Entries that are not dicts, or that lack ``download_url``, are
        skipped. A failure on one image is reported to the user and does not
        stop the others. Images held back by the limit are counted in a final
        notice.
        """
        if not isinstance(images, list) or not images:
            return
        send_list = images[: self.max_images] if self.max_images > 0 else []
        for idx, item in enumerate(send_list, start=1):
            if not isinstance(item, dict):
                continue
            download_url = str(item.get("download_url") or "").strip()
            if not download_url:
                continue
            full_url = urllib.parse.urljoin(f"{self.xibao_base_url}/", download_url.lstrip("/"))
            try:
                raw, content_type = self._request_bytes(full_url, timeout=90)
                b64 = base64.b64encode(raw).decode("ascii")
                self.send_image_data_uri(to_user, f"data:{content_type};base64,{b64}")
                log(f"已回发图片 {idx}/{len(send_list)} -> {to_user}")
            except Exception as exc:
                self.send_text(to_user, f"第{idx}张图片回发失败: {exc}")
        remain = len(images) - len(send_list)
        if remain > 0:
            self.send_text(to_user, f"还有 {remain} 张图片未回发(已达到单次上限 {self.max_images})。")

    # ------------------------------------------------------------------- polling

    def should_ignore_message(self, msg: dict[str, Any]) -> bool:
        """Return True for messages the bot must not handle.

        These are: non-text types (anything other than type 1), messages
        missing sender/receiver/content, ``filehelper``, the bot's own
        messages, group chats (``@chatroom``), and senders outside
        ``allow_from`` when that list is set.
        """
        msg_type = str(msg.get("type") or "").strip()
        from_user = str(msg.get("from") or "").strip()
        to_user = str(msg.get("to") or "").strip()
        content = str(msg.get("content") or "").strip()
        if msg_type not in {"1", "01"}:
            return True
        if not from_user or not to_user or not content:
            return True
        if from_user == "filehelper":
            return True
        if self.self_wxid and from_user == self.self_wxid:
            return True
        if from_user.endswith("@chatroom") or to_user.endswith("@chatroom"):
            return True
        if self.allow_from and from_user not in self.allow_from:
            return True
        return False

    def handle_polled_message(self, raw: Any) -> None:
        """Summarise, filter, dedupe and dispatch one raw synced message.

        The message is marked as seen before it is handled, so a failing
        command is not retried on the next poll. Handler errors are logged
        and reported to the sender on a best-effort basis.
        """
        msg = summarize_message(raw)
        if self.should_ignore_message(msg):
            return
        dedupe_key = build_message_dedupe_key(msg)
        if self.seen.contains(dedupe_key):
            return
        self.seen.add(dedupe_key)
        cmd = self.parse_command(str(msg.get("content") or ""))
        if not cmd:
            return
        log(
            f"收到指令 from={msg.get('from')} id={msg.get('id')} type={msg.get('type')} "
            f"content={str(msg.get('content') or '')[:120]}"
        )
        try:
            self.handle_command(msg, cmd)
        except Exception as exc:
            log(f"处理指令失败: {exc}")
            try:
                self.send_text(str(msg.get("from") or ""), f"处理失败: {exc}")
            except Exception:
                # Reporting the failure is best-effort; the error is already logged.
                pass

    def poll_once(self) -> None:
        """Run one sync round: refresh credentials, fetch new messages, handle each one."""
        self.refresh_auth_key(force=False)
        if not self.auth_key:
            log(f"未获取到 authKey (session: {self.session_file}),等待中...")
            return
        self.ensure_self_wxid()
        _, resp = self._request_json(
            base_url=self.wechat_base_url,
            path="/message/HttpSyncMsg",
            method="POST",
            payload={"Count": self.sync_count},
            key=self.auth_key,
            timeout=30,
            accept_error=False,
        )
        code = self._resp_code(resp)
        text = str(resp.get("Text") or "").strip()
        if code != 200:
            # Texts mentioning re-login ("重新登录") or "does not exist" ("不存在")
            # presumably mean the session is gone, so re-read the session file
            # even when an explicit key was configured.
            if "重新登录" in text or "不存在" in text:
                self.refresh_auth_key(force=True)
            log(f"消息轮询返回 code={code} text={text or '-'}")
            return
        data = resp.get("Data")
        if not isinstance(data, list) or not data:
            return
        for item in data:
            self.handle_polled_message(item)

    def run(self) -> int:
        """Main loop: daily cleanup and polling, periodic state saves, then sleep.

        An exception in one iteration is logged and the loop continues.
        Ctrl-C ends the loop, and state is always flushed on exit. With
        ``once`` only a single iteration runs. Returns 0.
        """
        log("微信桥接机器人启动")
        log(f"WeChat API: {self.wechat_base_url}")
        log(f"喜报 API: {self.xibao_base_url}")
        log(f"状态文件: {self.seen.path}")
        log(f"元数据文件: {self.meta_file}")
        log(f"每日清理时间: {self.cleanup_hour:02d}:{self.cleanup_minute:02d}")
        if self.allow_from:
            log(f"仅处理白名单发送者: {', '.join(sorted(self.allow_from))}")
        try:
            while True:
                try:
                    self.maybe_run_daily_cleanup()
                    self.poll_once()
                except Exception as exc:
                    log(f"轮询异常: {exc}")
                self._flush_periodically(time.time())
                if self.once:
                    break
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            log("收到中断信号,准备退出")
        finally:
            self._flush_state(force=True)
        return 0
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the bridge's command-line options from *argv*.

    Defaults come from the module-level ``DEFAULT_*`` constants, which honour
    environment variables. The auth key defaults to ``WCPP_AUTH_KEY``.
    Options are registered in table order, which is also the ``--help`` order.
    """
    parser = argparse.ArgumentParser(description="Xibao <-> WeChatPadPro bridge")
    option_specs: tuple[tuple[str, dict[str, Any]], ...] = (
        ("--wechat-base-url", {"default": DEFAULT_WCPP_BASE_URL, "help": "WeChatPadPro API 地址"}),
        ("--wechat-auth-key", {"default": os.environ.get("WCPP_AUTH_KEY", ""), "help": "WeChat authKey"}),
        ("--wechat-session-file", {"default": str(DEFAULT_SESSION_FILE), "help": "webui 会话文件路径"}),
        ("--xibao-base-url", {"default": DEFAULT_XIBAO_BASE_URL, "help": "喜报服务地址"}),
        ("--poll-interval", {"type": float, "default": 2.0, "help": "消息轮询间隔(秒)"}),
        ("--sync-count", {"type": int, "default": 30, "help": "每次轮询拉取条数"}),
        ("--max-images", {"type": int, "default": 3, "help": "每次最多回发图片数量"}),
        ("--state-file", {"default": str(DEFAULT_STATE_FILE), "help": "消息去重状态文件"}),
        ("--meta-file", {"default": str(DEFAULT_META_FILE), "help": "反馈上下文与清理状态文件"}),
        ("--daily-cleanup-time", {"default": DEFAULT_DAILY_CLEANUP_TIME, "help": "每日清理时间 HH:MM"}),
        ("--allow-from", {"default": "", "help": "只处理这些发送者wxid逗号分隔"}),
        ("--once", {"action": "store_true", "help": "只执行一次轮询"}),
        ("--dry-run", {"action": "store_true", "help": "仅打印不真正回消息"}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse *argv* and run the bridge until it stops.

    Args:
        argv: Arguments without the program name. When None,
            ``sys.argv[1:]`` is used.

    Returns:
        The exit status returned by ``WechatXibaoBridge.run``.
    """
    # Only fall back to sys.argv when nothing was passed. An explicit empty
    # list means "no arguments" and must not pick up the real command line
    # (the old `argv or sys.argv[1:]` did).
    args = parse_args(sys.argv[1:] if argv is None else argv)
    bridge = WechatXibaoBridge(args)
    return bridge.run()
if __name__ == "__main__":
    # Run as a script: use main()'s return value as the process exit status.
    sys.exit(main())