#!/usr/bin/env python3
"""Bridge between a WeChatPadPro HTTP API and the Xibao (喜报) service.

The bridge polls WeChatPadPro for new messages, ignores group chats and the
bot's own messages, recognises bot commands (generate / parse / feedback /
daily skip / help), forwards relay text to the Xibao service and sends the
results (text and images) back to the sender. Message dedupe state and
per-user metadata are persisted as JSON files.
"""
from __future__ import annotations

import argparse
import base64
import hashlib
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable

DEFAULT_WCPP_BASE_URL = os.environ.get("WCPP_BASE_URL", "http://127.0.0.1:18238")
DEFAULT_XIBAO_BASE_URL = os.environ.get("XIBAO_BASE_URL", "http://127.0.0.1:8787")
DEFAULT_SESSION_FILE = Path(
    os.environ.get("WCPP_SESSION_FILE", "/root/WeChatPadPro_test_20260227/webui/.session.json")
)
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_STATE_FILE = BASE_DIR / "data" / "wechat_bridge_state.json"
DEFAULT_META_FILE = BASE_DIR / "data" / "wechat_bridge_meta.json"
DEFAULT_DAILY_CLEANUP_TIME = os.environ.get("XIBAO_DAILY_CLEANUP_TIME", "00:10")
# Matches a numbered relay line such as "1、..." / "2. ..." (used by the daily skip).
RELAY_SERIAL_LINE_RE = re.compile(r"^\s*\d+\s*[、,,..::)\)]\s*")
# "反馈<serial> <note>" — serial is captured in group 1, optional note in group 2.
FEEDBACK_COMMAND_RE = re.compile(r"^/?反馈\s*[+::\-]?\s*(\d{1,4})(?:[\s,,;;::\-]+(.*))?$")
# "跳过<n>" or "跳过(<n>)" — count is captured in group 1.
SKIP_COMMAND_RE = re.compile(r"^/?跳过\s*[((]?\s*(\d{1,4})\s*[))]?\s*$")
# Minimum delay between two daily-cleanup attempts after a failed one, so a
# persistently failing endpoint is not hit on every poll cycle.
DAILY_CLEANUP_RETRY_SECONDS = 300.0


def log(msg: str) -> None:
    """Print *msg* to stdout prefixed with a local-time timestamp."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)


def safe_int(value: Any, default: int = 0) -> int:
    """Convert *value* to ``int``; return *default* for falsy or unconvertible values."""
    try:
        return int(value or default)
    except (TypeError, ValueError, OverflowError):
        return default


def read_json_file(path: Path, fallback: Any) -> Any:
    """Return the parsed JSON content of *path*, or *fallback* if missing/unreadable."""
    try:
        if not path.exists():
            return fallback
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        # Corrupt or unreadable state files must not stop the bridge.
        return fallback


def write_json_file(path: Path, data: Any) -> None:
    """Write *data* as pretty-printed UTF-8 JSON to *path* via a temp file + rename.

    The parent directory is created if needed. On failure the temporary
    ``<name>.tmp`` file is removed and the original exception is re-raised.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(data, ensure_ascii=False, indent=2)
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        tmp.write_text(text, encoding="utf-8")
        tmp.replace(path)
    except BaseException:
        try:
            tmp.unlink()
        except OSError:
            pass
        raise


def parse_hhmm(text: str) -> tuple[int, int]:
    """Parse ``"HH:MM"`` into ``(hour, minute)``; fall back to ``(0, 10)`` if invalid."""
    val = str(text or "").strip()
    m = re.match(r"^(\d{1,2}):(\d{1,2})$", val)
    if not m:
        return (0, 10)
    hour = int(m.group(1))
    minute = int(m.group(2))
    if hour < 0 or hour > 23 or minute < 0 or minute > 59:
        return (0, 10)
    return (hour, minute)


def flatten_wrapped_value(value: Any) -> Any:
    """Unwrap ``{"str": x}``-style wrapper dicts; return other values unchanged."""
    if isinstance(value, dict):
        for name in ("str", "string", "String", "value", "Value"):
            if name in value and value[name] not in (None, ""):
                return value[name]
    return value


def find_first_value(data: Any, keys: Iterable[str]) -> Any:
    """Breadth-first search *data* for the first non-empty value under any of *keys*.

    Key comparison is case-insensitive and wrapper dicts are unwrapped with
    :func:`flatten_wrapped_value`. Returns ``None`` when nothing is found.
    """
    targets = {k.lower() for k in keys}
    queue: deque[Any] = deque([data])
    while queue:
        current = queue.popleft()
        if isinstance(current, dict):
            for key, val in current.items():
                if key.lower() in targets:
                    found = flatten_wrapped_value(val)
                    if found not in (None, "", [], {}):
                        return found
                queue.append(val)
        elif isinstance(current, list):
            queue.extend(current)
    return None


def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]:
    """Return the first dict in ``item["AddMsgs"]`` if present, else *item* itself."""
    add_msgs = item.get("AddMsgs")
    if isinstance(add_msgs, list):
        for msg in add_msgs:
            if isinstance(msg, dict):
                return msg
    return item


def _to_display_text(value: Any) -> str:
    """Render a message field as text: JSON for containers, ``"-"`` for ``None``."""
    value = flatten_wrapped_value(value)
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    if value is None:
        return "-"
    return str(value)


def summarize_message(item: Any) -> dict[str, Any]:
    """Normalize a polled message into ``id/type/from/to/time/content/raw`` fields.

    Field values are looked up under several alternative key spellings. All
    fields except ``raw`` are strings; ``raw`` keeps the original *item*.
    """
    if not isinstance(item, dict):
        text = str(item)
        return {
            "id": "-",
            "type": "-",
            "from": "-",
            "to": "-",
            "time": "-",
            "content": text,
            "raw": item,
        }
    primary = pick_primary_message(item)
    msg_id = find_first_value(primary, ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"])
    msg_type = find_first_value(primary, ["msg_type", "MsgType", "ContentType", "Type"])
    from_user = find_first_value(
        primary,
        ["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"],
    )
    to_user = find_first_value(primary, ["to_user_name", "ToUserName", "ToWxid", "Receiver"])
    create_time = find_first_value(primary, ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"])
    content = find_first_value(primary, ["text_content", "TextContent", "content", "Content", "Message", "Text"])
    if content in (None, "", {}, []):
        content = find_first_value(primary, ["msg_source", "MsgSource"])
    return {
        "id": _to_display_text(msg_id),
        "type": _to_display_text(msg_type),
        "from": _to_display_text(from_user),
        "to": _to_display_text(to_user),
        "time": _to_display_text(create_time),
        "content": _to_display_text(content),
        "raw": item,
    }


def build_message_dedupe_key(msg: dict[str, Any]) -> str:
    """Build a dedupe key from the message id, or a SHA-1 of sender/time/content."""
    msg_id = str(msg.get("id") or "-")
    msg_type = str(msg.get("type") or "-")
    if msg_id not in {"", "-"}:
        return f"id:{msg_id}|type:{msg_type}"
    seed = "|".join(
        [
            str(msg.get("from") or "-"),
            str(msg.get("to") or "-"),
            str(msg.get("time") or "-"),
            str(msg.get("content") or "")[:160],
        ]
    )
    digest = hashlib.sha1(seed.encode("utf-8")).hexdigest()[:20]
    return f"hash:{digest}|type:{msg_type}"


class SeenStore:
    """Bounded FIFO set of processed message keys, persisted to a JSON file."""

    def __init__(self, path: Path, max_size: int = 8000) -> None:
        self.path = path
        self.max_size = max(100, int(max_size))
        self._queue: deque[str] = deque()
        self._set: set[str] = set()
        self._dirty = False
        self.load()

    def _trim(self) -> None:
        """Evict the oldest keys until at most ``max_size`` remain."""
        while len(self._queue) > self.max_size:
            old = self._queue.popleft()
            self._set.discard(old)

    def load(self) -> None:
        """Load keys from ``self.path``'s ``"seen"`` list, skipping blanks and duplicates."""
        raw = read_json_file(self.path, {})
        seen = raw.get("seen") if isinstance(raw, dict) else []
        if not isinstance(seen, list):
            seen = []
        for item in seen:
            key = str(item).strip()
            if not key or key in self._set:
                continue
            self._queue.append(key)
            self._set.add(key)
        self._trim()
        self._dirty = False

    def contains(self, key: str) -> bool:
        """Return whether *key* has been recorded."""
        return key in self._set

    def add(self, key: str) -> None:
        """Record *key* (no-op for empty or known keys) and mark the store dirty."""
        if not key or key in self._set:
            return
        self._queue.append(key)
        self._set.add(key)
        self._trim()
        self._dirty = True

    def flush(self, force: bool = False) -> None:
        """Persist the keys when dirty (or always when *force* is true)."""
        if not force and not self._dirty:
            return
        data = {
            "updated_at": datetime.now().isoformat(timespec="seconds"),
            "seen": list(self._queue),
        }
        write_json_file(self.path, data)
        self._dirty = False


class WechatXibaoBridge:
    """Polling bot that relays WeChat commands to the Xibao service and replies."""

    def __init__(self, args: argparse.Namespace) -> None:
        self.wechat_base_url = str(args.wechat_base_url).rstrip("/")
        self.xibao_base_url = str(args.xibao_base_url).rstrip("/")
        self.auth_key = str(args.wechat_auth_key or "").strip()
        self.session_file = Path(args.wechat_session_file).resolve()
        self.sync_count = max(1, int(args.sync_count))
        self.poll_interval = max(0.5, float(args.poll_interval))
        self.max_images = max(0, int(args.max_images))
        self.once = bool(args.once)
        self.dry_run = bool(args.dry_run)
        self.allow_from = {x.strip() for x in str(args.allow_from or "").split(",") if x.strip()}
        self.seen = SeenStore(Path(args.state_file).resolve())
        self.meta_file = Path(args.meta_file).resolve()
        self.cleanup_hour, self.cleanup_minute = parse_hhmm(str(args.daily_cleanup_time))
        # An auth key given explicitly is not replaced by the session file
        # unless a refresh is forced.
        self.explicit_auth = bool(self.auth_key)
        self.self_wxid = ""
        self.meta = self._load_meta()
        self.meta_dirty = False
        self._last_state_flush = time.time()
        self._last_meta_flush = time.time()
        # time.monotonic() of the last daily-cleanup attempt (None = never).
        self._last_cleanup_attempt: float | None = None

    # ------------------------------------------------------------------ meta
    @staticmethod
    def _normalize_users(raw_users: Any) -> dict[str, Any]:
        """Sanitize the persisted per-user feedback context mapping."""
        if not isinstance(raw_users, dict):
            return {}
        normalized: dict[str, Any] = {}
        for wxid, item in raw_users.items():
            if not isinstance(item, dict):
                continue
            line_map = item.get("line_map")
            if not isinstance(line_map, dict):
                line_map = {}
            line_order = item.get("line_order")
            if not isinstance(line_order, list):
                line_order = []
            normalized[str(wxid)] = {
                "updated_at": str(item.get("updated_at", "")),
                "line_map": {str(k): v for k, v in line_map.items() if isinstance(v, dict)},
                "line_order": [str(x) for x in line_order if str(x).strip()],
            }
        return normalized

    @staticmethod
    def _normalize_daily_skip(raw_skip: Any) -> dict[str, Any]:
        """Sanitize the persisted daily-skip mapping, dropping non-positive counts."""
        if not isinstance(raw_skip, dict):
            return {}
        normalized: dict[str, Any] = {}
        for wxid, item in raw_skip.items():
            if not isinstance(item, dict):
                continue
            count_val = safe_int(item.get("count"))
            if count_val <= 0:
                continue
            normalized[str(wxid)] = {
                "date": str(item.get("date", "")).strip(),
                "count": max(1, min(count_val, 500)),
                "updated_at": str(item.get("updated_at", "")),
            }
        return normalized

    def _load_meta(self) -> dict[str, Any]:
        """Load and sanitize the metadata file into the in-memory ``meta`` dict."""
        raw = read_json_file(self.meta_file, {})
        if not isinstance(raw, dict):
            raw = {}
        daily_cleanup = raw.get("daily_cleanup")
        if not isinstance(daily_cleanup, dict):
            daily_cleanup = {}
        return {
            "updated_at": str(raw.get("updated_at", "")),
            "users": self._normalize_users(raw.get("users")),
            "daily_cleanup": {
                "last_date": str(daily_cleanup.get("last_date", "")).strip(),
            },
            "daily_skip": self._normalize_daily_skip(raw.get("daily_skip")),
        }

    def _mark_meta_dirty(self) -> None:
        """Flag metadata for the next flush and bump its ``updated_at`` stamp."""
        self.meta_dirty = True
        self.meta["updated_at"] = datetime.now().isoformat(timespec="seconds")

    def _flush_meta(self, force: bool = False) -> None:
        """Write metadata to disk if dirty (or always when *force* is true)."""
        if not force and not self.meta_dirty:
            return
        write_json_file(self.meta_file, self.meta)
        self.meta_dirty = False

    def _flush_state(self, force: bool = False) -> None:
        """Flush both the dedupe store and the metadata."""
        self.seen.flush(force=force)
        self._flush_meta(force=force)

    # ------------------------------------------------------------------ HTTP
    def _request_json(
        self,
        *,
        base_url: str,
        path: str,
        method: str,
        payload: dict[str, Any] | None = None,
        key: str | None = None,
        timeout: int = 30,
        accept_error: bool = False,
    ) -> tuple[int, dict[str, Any]]:
        """Send a JSON request and return ``(status, body_dict)``.

        *key* is appended as the ``key`` query parameter. Non-dict JSON bodies
        are wrapped as ``{"raw": ...}``.

        Raises:
            RuntimeError: on connection failure, on an HTTP error status when
                *accept_error* is false, or when a success response is not JSON.
        """
        url = f"{base_url.rstrip('/')}{path}"
        if key:
            joiner = "&" if "?" in url else "?"
            url = f"{url}{joiner}key={urllib.parse.quote(key)}"
        body = None
        headers: dict[str, str] = {}
        if payload is not None:
            body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
            headers["Content-Type"] = "application/json"
        req = urllib.request.Request(url, data=body, method=method.upper(), headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                status = int(resp.status)
                raw = resp.read().decode("utf-8", errors="replace")
        except urllib.error.HTTPError as exc:
            # HTTPError must be handled before its base class URLError.
            raw = exc.read().decode("utf-8", errors="replace")
            try:
                data = json.loads(raw) if raw else {}
            except Exception:
                data = {"ok": False, "error": raw or f"HTTP {exc.code}"}
            if not isinstance(data, dict):
                data = {"raw": data}
            if accept_error:
                return int(exc.code), data
            raise RuntimeError(f"HTTP {exc.code}: {raw[:400]}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"请求失败: {exc}") from exc
        try:
            data = json.loads(raw) if raw else {}
        except ValueError as exc:
            raise RuntimeError(f"响应不是有效 JSON(HTTP {status}): {raw[:400]}") from exc
        if not isinstance(data, dict):
            data = {"raw": data}
        return status, data

    def _request_bytes(self, url: str, timeout: int = 45) -> tuple[bytes, str]:
        """GET *url* and return ``(body_bytes, mime_type)`` without parameters."""
        req = urllib.request.Request(url, method="GET")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = resp.read()
            content_type = resp.headers.get("Content-Type", "application/octet-stream")
        content_type = content_type.split(";", 1)[0].strip() or "application/octet-stream"
        return data, content_type

    # ------------------------------------------------------------------ auth
    def _load_auth_from_session(self) -> str:
        """Return the ``authKey`` stored in the session file, or ``""``."""
        raw = read_json_file(self.session_file, {})
        if isinstance(raw, dict):
            key = str(raw.get("authKey", "")).strip()
            if key:
                return key
        return ""

    def is_key_online(self, auth_key: str) -> bool:
        """Return whether ``/login/GetLoginStatus`` reports ``Code == 200`` for *auth_key*."""
        key = str(auth_key or "").strip()
        if not key:
            return False
        try:
            _, resp = self._request_json(
                base_url=self.wechat_base_url,
                path="/login/GetLoginStatus",
                method="GET",
                key=key,
                timeout=12,
                accept_error=True,
            )
            return safe_int(resp.get("Code")) == 200
        except Exception:
            return False

    def refresh_auth_key(self, force: bool = False) -> None:
        """Adopt the session file's auth key when it differs from the current one.

        Explicit keys are only reconsidered when *force* is true. A different
        session key replaces the current one if it is online or the current
        one is offline.
        """
        if self.explicit_auth and not force:
            return
        latest = self._load_auth_from_session()
        if not latest:
            return
        if latest != self.auth_key:
            if not self.auth_key:
                self.auth_key = latest
                self.self_wxid = ""
                log(f"已更新 authKey: {self.auth_key}")
                return
            latest_online = self.is_key_online(latest)
            current_online = self.is_key_online(self.auth_key)
            if latest_online or not current_online:
                self.auth_key = latest
                self.self_wxid = ""
                log(f"已更新 authKey: {self.auth_key}")
            else:
                log("检测到新的 authKey 但未在线,继续使用当前在线会话。")

    def ensure_self_wxid(self) -> None:
        """Look up and cache the logged-in account's wxid via ``/user/GetProfile``."""
        if not self.auth_key or self.self_wxid:
            return
        try:
            _, resp = self._request_json(
                base_url=self.wechat_base_url,
                path="/user/GetProfile",
                method="GET",
                key=self.auth_key,
            )
        except Exception as exc:
            log(f"获取自身 wxid 失败: {exc}")
            return
        if resp.get("Code") != 200:
            return
        data = resp.get("Data")
        self_id = find_first_value(data, ["wxid", "Wxid", "userName", "UserName", "to_user_name"])
        if self_id:
            self.self_wxid = str(self_id).strip()
            if self.self_wxid:
                log(f"已识别当前账号 wxid: {self.self_wxid}")

    # -------------------------------------------------------------- feedback
    def _extract_feedback_records(self, data: dict[str, Any]) -> list[dict[str, Any]]:
        """Return dict entries of ``result.records`` (or ``result.new_records``)."""
        if not isinstance(data, dict):
            return []
        result = data.get("result")
        if not isinstance(result, dict):
            return []
        records = result.get("records")
        if isinstance(records, list) and records:
            return [x for x in records if isinstance(x, dict)]
        records = result.get("new_records")
        if isinstance(records, list) and records:
            return [x for x in records if isinstance(x, dict)]
        return []

    def save_feedback_context(self, from_user: str, data: dict[str, Any]) -> None:
        """Remember the records of a parse/generate response for later feedback.

        Records are keyed by ``line_serial`` (or a running index when missing).
        At most ~300 users are kept once the map exceeds 400 entries.
        """
        from_user = str(from_user or "").strip()
        if not from_user:
            return
        records = self._extract_feedback_records(data)
        if not records:
            return
        line_map: dict[str, dict[str, Any]] = {}
        line_order: list[str] = []
        fallback_index = 0
        for item in records:
            source_line = str(item.get("source_line") or item.get("raw_text") or "").strip()
            if not source_line:
                continue
            line_serial = str(item.get("line_serial") or "").strip()
            if not line_serial:
                fallback_index += 1
                line_serial = str(fallback_index)
            if line_serial in line_map:
                continue
            line_order.append(line_serial)
            line_map[line_serial] = {
                "source_line": source_line,
                "record": {
                    "source_line": source_line,
                    "raw_text": str(item.get("raw_text") or ""),
                    "branch": str(item.get("branch") or ""),
                    "amount": str(item.get("amount") or ""),
                    "type": str(item.get("type") or ""),
                    "page": str(item.get("page") or ""),
                    "status": str(item.get("status") or ""),
                    "line_serial": line_serial,
                    "line_product_index": str(item.get("line_product_index") or ""),
                    "dedup_key": str(item.get("dedup_key") or ""),
                    "signature_key": str(item.get("signature_key") or ""),
                },
            }
        if not line_map:
            return
        users = self.meta.setdefault("users", {})
        users[from_user] = {
            "updated_at": datetime.now().isoformat(timespec="seconds"),
            "line_map": line_map,
            "line_order": line_order,
        }
        if len(users) > 400:
            keys = sorted(users.keys(), key=lambda k: str(users[k].get("updated_at", "")))
            for key in keys[:-300]:
                users.pop(key, None)
        self._mark_meta_dirty()

    def resolve_feedback_item(self, from_user: str, serial: str) -> dict[str, Any] | None:
        """Find the stored line for *serial*: by exact serial, else by 1-based position."""
        users = self.meta.get("users")
        if not isinstance(users, dict):
            return None
        context = users.get(str(from_user or "").strip())
        if not isinstance(context, dict):
            return None
        line_map = context.get("line_map")
        if not isinstance(line_map, dict):
            return None
        serial_text = str(serial or "").strip()
        if not serial_text:
            return None
        item = line_map.get(serial_text)
        if isinstance(item, dict):
            return item
        line_order = context.get("line_order")
        if isinstance(line_order, list) and serial_text.isdigit():
            idx = int(serial_text) - 1
            if 0 <= idx < len(line_order):
                mapped_serial = str(line_order[idx]).strip()
                item = line_map.get(mapped_serial)
                if isinstance(item, dict):
                    return item
        return None

    def parse_feedback_command(self, text: str) -> tuple[str, str] | None:
        """Parse a ``反馈<serial> <note>`` command into ``(serial, note)``."""
        content = str(text or "").strip()
        if not content:
            return None
        m = FEEDBACK_COMMAND_RE.match(content)
        if not m:
            return None
        serial = str(m.group(1)).strip()
        note = str(m.group(2) or "").strip()
        return serial, note

    def parse_skip_command(self, text: str) -> int | None:
        """Parse a ``跳过<n>`` command; the count is clamped to ``0..500``."""
        content = str(text or "").strip()
        if not content:
            return None
        m = SKIP_COMMAND_RE.match(content)
        if not m:
            return None
        return max(0, min(int(m.group(1)), 500))

    def _today_text(self) -> str:
        """Return today's local date as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    # ------------------------------------------------------------ daily skip
    def set_daily_skip(self, from_user: str, count: int) -> int:
        """Set (or clear with 0) today's skip count for *from_user*; return the stored count."""
        user = str(from_user or "").strip()
        if not user:
            return 0
        daily_skip = self.meta.setdefault("daily_skip", {})
        if not isinstance(daily_skip, dict):
            daily_skip = {}
            self.meta["daily_skip"] = daily_skip
        normalized_count = max(0, min(int(count or 0), 500))
        if normalized_count <= 0:
            if user in daily_skip:
                daily_skip.pop(user, None)
                self._mark_meta_dirty()
            return 0
        daily_skip[user] = {
            "date": self._today_text(),
            "count": normalized_count,
            "updated_at": datetime.now().isoformat(timespec="seconds"),
        }
        if len(daily_skip) > 1000:
            keys = sorted(daily_skip.keys(), key=lambda k: str(daily_skip[k].get("updated_at", "")))
            for key in keys[:-800]:
                daily_skip.pop(key, None)
        self._mark_meta_dirty()
        return normalized_count

    def get_daily_skip(self, from_user: str) -> int:
        """Return today's skip count for *from_user*, dropping stale or invalid entries."""
        user = str(from_user or "").strip()
        if not user:
            return 0
        daily_skip = self.meta.get("daily_skip")
        if not isinstance(daily_skip, dict):
            return 0
        item = daily_skip.get(user)
        if not isinstance(item, dict):
            return 0
        if str(item.get("date") or "").strip() != self._today_text():
            daily_skip.pop(user, None)
            self._mark_meta_dirty()
            return 0
        count = safe_int(item.get("count"))
        if count <= 0:
            daily_skip.pop(user, None)
            self._mark_meta_dirty()
            return 0
        return max(1, min(count, 500))

    def apply_daily_skip_to_raw_text(self, from_user: str, raw_text: str) -> tuple[str, int, int]:
        """Drop the first N numbered relay lines from *raw_text*.

        Returns ``(text, removed, requested)``. Blank lines are dropped from
        the result, and ``"#接龙"`` is returned if nothing remains.
        """
        requested = self.get_daily_skip(from_user)
        if requested <= 0:
            return raw_text, 0, 0
        lines = str(raw_text or "").splitlines()
        if not lines:
            return raw_text, 0, requested
        removed = 0
        keep_lines: list[str] = []
        for line in lines:
            source = str(line)
            if removed < requested and RELAY_SERIAL_LINE_RE.match(source.strip()):
                removed += 1
                continue
            keep_lines.append(source)
        normalized = "\n".join(x for x in keep_lines if str(x).strip())
        if not normalized.strip():
            normalized = "#接龙"
        return normalized, removed, requested

    def handle_feedback(self, from_user: str, serial: str, note: str) -> None:
        """Report a user's feedback on line *serial* to ``/api/log/mark``."""
        item = self.resolve_feedback_item(from_user=from_user, serial=serial)
        if not item:
            self.send_text(from_user, "未找到可反馈的序号,请先发送 #接龙 生成后再反馈。")
            return
        source_line = str(item.get("source_line") or "").strip()
        record = item.get("record") if isinstance(item.get("record"), dict) else {}
        note_text = str(note or "").strip() or f"用户反馈序号{serial}"
        # Notes mentioning recognition ("识别") are filed as recognition errors.
        mark_type = "recognition_error" if "识别" in note_text else "generation_error"
        if self.dry_run:
            log(
                f"[dry-run] feedback from={from_user} serial={serial} mark_type={mark_type} "
                f"source_line={source_line} note={note_text}"
            )
            self.send_text(from_user, f"反馈已接收(演练模式):序号{serial}")
            return
        status, resp = self._request_json(
            base_url=self.xibao_base_url,
            path="/api/log/mark",
            method="POST",
            payload={
                "mark_type": mark_type,
                "source_line": source_line,
                "note": f"{note_text} | from={from_user}",
                "record": record,
            },
            accept_error=True,
            timeout=40,
        )
        if not resp.get("ok"):
            err = str(resp.get("error") or f"HTTP {status}").strip()
            self.send_text(from_user, f"反馈记录失败: {err}")
            return
        issue = resp.get("issue") if isinstance(resp.get("issue"), dict) else {}
        issue_id = str(issue.get("id") or "").strip()
        suffix = f",编号 {issue_id}" if issue_id else ""
        self.send_text(from_user, f"反馈已记录:序号{serial}{suffix}")

    # --------------------------------------------------------- daily cleanup
    def _clear_history_keep_feedback(self) -> bool:
        """Clear Xibao history; return ``True`` on success.

        Writes ``[]`` directly to the history file reported by ``/api/config``
        when possible, otherwise falls back to ``/api/history/clear``.
        """
        history_path = ""
        try:
            _, cfg = self._request_json(
                base_url=self.xibao_base_url,
                path="/api/config",
                method="GET",
                accept_error=False,
                timeout=20,
            )
            history_path = str(cfg.get("resolved_history_file") or "").strip()
        except Exception as exc:
            log(f"读取配置失败(history路径): {exc}")

        if history_path:
            p = Path(history_path)
            if p.exists() or p.parent.exists():
                try:
                    write_json_file(p, [])
                    log(f"已清空历史文件: {p}")
                    return True
                except Exception as exc:
                    log(f"清空历史文件失败: {exc}")

        try:
            _, resp = self._request_json(
                base_url=self.xibao_base_url,
                path="/api/history/clear",
                method="POST",
                payload={},
                accept_error=True,
                timeout=40,
            )
            if resp.get("ok"):
                if resp.get("skipped_suppressed_cleared"):
                    log("已清空历史(附带清空了屏蔽列表)。")
                else:
                    log("已清空历史。")
                return True
            log(f"清空历史失败: {resp}")
            return False
        except Exception as exc:
            log(f"清空历史请求异常: {exc}")
            return False

    def maybe_run_daily_cleanup(self) -> None:
        """Clear output images and history once per day after the configured time.

        The day is only marked done when both steps succeed. Failed attempts
        are retried at most every ``DAILY_CLEANUP_RETRY_SECONDS``.
        """
        now = datetime.now()
        today = now.strftime("%Y-%m-%d")
        daily = self.meta.setdefault("daily_cleanup", {})
        last_date = str(daily.get("last_date") or "").strip()
        if last_date == today:
            return
        if (now.hour, now.minute) < (self.cleanup_hour, self.cleanup_minute):
            return
        mono = time.monotonic()
        last_attempt = self._last_cleanup_attempt
        if last_attempt is not None and mono - last_attempt < DAILY_CLEANUP_RETRY_SECONDS:
            return
        self._last_cleanup_attempt = mono

        log("开始执行每日清理:图片输出 + 历史记录(保留反馈记录)")
        output_ok = False
        try:
            _, resp = self._request_json(
                base_url=self.xibao_base_url,
                path="/api/output/clear",
                method="POST",
                payload={},
                accept_error=True,
                timeout=120,
            )
            output_ok = bool(resp.get("ok"))
            if output_ok:
                log("已清空输出图片。")
            else:
                log(f"清空输出图片失败: {resp}")
        except Exception as exc:
            log(f"清空输出图片异常: {exc}")

        history_ok = self._clear_history_keep_feedback()
        if output_ok and history_ok:
            daily["last_date"] = today
            self._mark_meta_dirty()
            log(f"每日清理完成: {today}")

    # -------------------------------------------------------------- sending
    def send_text(self, to_user: str, text: str) -> None:
        """Send a text message (logged only in dry-run mode).

        Raises:
            RuntimeError: if the API does not answer ``Code == 200``.
        """
        if self.dry_run:
            log(f"[dry-run] send text to={to_user}: {text}")
            return
        body = {
            "MsgItem": [
                {
                    "MsgType": 1,
                    "ToUserName": to_user,
                    "TextContent": text,
                }
            ]
        }
        _, resp = self._request_json(
            base_url=self.wechat_base_url,
            path="/message/SendTextMessage",
            method="POST",
            payload=body,
            key=self.auth_key,
            timeout=30,
        )
        if resp.get("Code") != 200:
            raise RuntimeError(f"SendTextMessage失败: {json.dumps(resp, ensure_ascii=False)}")

    def send_image_data_uri(self, to_user: str, data_uri: str) -> None:
        """Send an image given as a ``data:`` URI (logged only in dry-run mode).

        Raises:
            RuntimeError: if the API does not answer ``Code == 200``.
        """
        if self.dry_run:
            log(f"[dry-run] send image to={to_user}, bytes={len(data_uri)}")
            return
        body = {
            "MsgItem": [
                {
                    "MsgType": 2,
                    "ToUserName": to_user,
                    "ImageContent": data_uri,
                }
            ]
        }
        _, resp = self._request_json(
            base_url=self.wechat_base_url,
            path="/message/SendImageNewMessage",
            method="POST",
            payload=body,
            key=self.auth_key,
            timeout=60,
        )
        if resp.get("Code") != 200:
            raise RuntimeError(f"SendImageNewMessage失败: {json.dumps(resp, ensure_ascii=False)}")

    # ------------------------------------------------------------- commands
    def parse_command(self, text: str) -> dict[str, str] | None:
        """Map message text to a command dict (``action``/``payload``/...) or ``None``."""
        content = str(text or "").strip()
        if not content:
            return None

        feedback = self.parse_feedback_command(content)
        if feedback is not None:
            serial, note = feedback
            return {"action": "feedback", "serial": serial, "payload": note}

        skip_count = self.parse_skip_command(content)
        if skip_count is not None:
            return {"action": "set_skip", "count": str(skip_count), "payload": ""}

        lower = content.lower()
        if lower in {"/喜报", "喜报", "#喜报", "/喜报 help", "/喜报 帮助", "喜报帮助"}:
            return {"action": "help", "payload": ""}

        prefixes = ["/喜报", "喜报", "#喜报"]
        for prefix in prefixes:
            if content.startswith(prefix):
                rest = content[len(prefix) :].strip(" ::\n\t")
                if not rest:
                    return {"action": "help", "payload": ""}
                rest_lower = rest.lower()
                if rest_lower in {"help", "帮助"}:
                    return {"action": "help", "payload": ""}
                if rest.startswith("解析"):
                    payload = rest[2:].strip(" ::\n\t")
                    return {"action": "parse", "payload": payload}
                if rest.startswith("生成"):
                    payload = rest[2:].strip(" ::\n\t")
                    return {"action": "generate", "payload": payload}
                return {"action": "generate", "payload": rest}

        if "#接龙" in content:
            return {"action": "generate", "payload": content}
        return None

    def normalize_raw_text(self, payload: str) -> str:
        """Ensure relay text carries the ``#接龙`` marker (``""`` for empty input)."""
        text = str(payload or "").strip()
        if not text:
            return ""
        if "#接龙" in text:
            return text
        return f"#接龙\n{text}"

    def format_parse_result(self, data: dict[str, Any]) -> str:
        """Render the summary of a ``/api/parse`` response as a reply message."""
        result = data.get("result") if isinstance(data, dict) else {}
        if not isinstance(result, dict):
            return "解析完成。"
        summary = result.get("summary") if isinstance(result.get("summary"), dict) else {}
        parsed = safe_int(summary.get("parsed"))
        new_count = safe_int(summary.get("new"))
        duplicate = safe_int(summary.get("duplicate"))
        pending = safe_int(summary.get("insurance_pending"))
        return (
            "解析完成\n"
            f"- 识别条数: {parsed}\n"
            f"- 可生成: {new_count}\n"
            f"- 重复: {duplicate}\n"
            f"- 待选保险年限: {pending}"
        )

    def format_generate_result(self, data: dict[str, Any]) -> str:
        """Render the outcome of a ``/api/generate`` response as a reply message."""
        result = data.get("result") if isinstance(data, dict) else {}
        summary = result.get("summary") if isinstance(result, dict) and isinstance(result.get("summary"), dict) else {}
        generated = safe_int(data.get("generated_count"))
        duplicate = safe_int(summary.get("duplicate"))
        pending = safe_int(summary.get("insurance_pending"))
        if generated <= 0:
            return (
                "本次没有可生成的新记录\n"
                f"- 重复记录: {duplicate}\n"
                f"- 待选保险年限: {pending}"
            )
        return f"生成完成,本次生成 {generated} 张。若有问题请发送:反馈+序号+说明"

    def compose_help_text(self) -> str:
        """Return the usage text sent for help commands."""
        return (
            "喜报机器人用法:\n"
            "1) /喜报 生成 + 接龙文本\n"
            "2) /喜报 解析 + 接龙文本\n"
            "3) 直接发送包含 #接龙 的文本\n"
            "4) 生成有误时:反馈+序号+说明\n"
            "5) 发送 跳过3(或 跳过(3)),当天自动跳过前3条;跳过0 取消\n"
            "示例:\n"
            "/喜报 生成\\n#接龙\\n1、营江路揽收现金10万存一年\n"
            "反馈2 网点写错了\n"
            "跳过2"
        )

    def _handle_set_skip(self, from_user: str, command: dict[str, str]) -> None:
        """Apply a ``set_skip`` command and confirm it to the user."""
        try:
            count = int(str(command.get("count") or "0").strip() or "0")
        except Exception:
            self.send_text(from_user, "跳过格式错误,请发送:跳过3 或 跳过(3)")
            return
        effective = self.set_daily_skip(from_user=from_user, count=count)
        if effective <= 0:
            self.send_text(from_user, "已取消今日自动跳过。")
        else:
            self.send_text(from_user, f"已设置今日自动跳过前 {effective} 条(仅当天生效)。")

    def _send_result_images(self, to_user: str, images: Any) -> None:
        """Download up to ``max_images`` generated images and send them to *to_user*."""
        if not isinstance(images, list) or not images:
            return
        send_list = images[: self.max_images] if self.max_images > 0 else []
        for idx, item in enumerate(send_list, start=1):
            if not isinstance(item, dict):
                # A malformed entry must not abort the remaining uploads.
                continue
            download_url = str(item.get("download_url") or "").strip()
            if not download_url:
                continue
            full_url = urllib.parse.urljoin(f"{self.xibao_base_url}/", download_url.lstrip("/"))
            try:
                raw, content_type = self._request_bytes(full_url, timeout=90)
                b64 = base64.b64encode(raw).decode("ascii")
                data_uri = f"data:{content_type};base64,{b64}"
                self.send_image_data_uri(to_user, data_uri)
                log(f"已回发图片 {idx}/{len(send_list)} -> {to_user}")
            except Exception as exc:
                self.send_text(to_user, f"第{idx}张图片回发失败: {exc}")
        remain = len(images) - len(send_list)
        if remain > 0:
            self.send_text(to_user, f"还有 {remain} 张图片未回发(已达到单次上限 {self.max_images})。")

    def _handle_parse_or_generate(self, from_user: str, action: str, payload: str) -> None:
        """Forward relay text to ``/api/parse`` or ``/api/generate`` and reply."""
        raw_text = self.normalize_raw_text(payload)
        if not raw_text:
            self.send_text(from_user, "未检测到有效文本,请发送 /喜报 帮助 查看用法。")
            return
        raw_text, skipped_count, requested_skip = self.apply_daily_skip_to_raw_text(from_user=from_user, raw_text=raw_text)

        is_parse = action == "parse"
        endpoint = "/api/parse" if is_parse else "/api/generate"
        start_hint = "收到,正在解析..." if is_parse else "收到,正在生成喜报,请稍候..."
        if requested_skip > 0:
            start_hint = f"{start_hint}(已跳过 {skipped_count}/{requested_skip} 条)"
        self.send_text(from_user, start_hint)

        status, resp = self._request_json(
            base_url=self.xibao_base_url,
            path=endpoint,
            method="POST",
            payload={"raw_text": raw_text, "save_history": True} if action == "generate" else {"raw_text": raw_text},
            accept_error=True,
            timeout=300,
        )
        if not resp.get("ok"):
            err = str(resp.get("error") or resp.get("message") or f"HTTP {status}").strip()
            if resp.get("error_code") == "insurance_year_required":
                err = "包含保险记录但未指定 3年交/5年交。请在文本里写明年限后重试。"
            self.send_text(from_user, f"处理失败: {err}")
            return

        self.save_feedback_context(from_user=from_user, data=resp)
        if is_parse:
            self.send_text(from_user, self.format_parse_result(resp))
            return
        self.send_text(from_user, self.format_generate_result(resp))
        self._send_result_images(from_user, resp.get("download_images"))

    def handle_command(self, msg: dict[str, Any], command: dict[str, str]) -> None:
        """Execute a parsed *command* on behalf of the sender of *msg*."""
        from_user = str(msg.get("from") or "").strip()
        if not from_user:
            return
        action = command.get("action", "")
        payload = command.get("payload", "")

        if action == "feedback":
            serial = str(command.get("serial") or "").strip()
            if not serial:
                self.send_text(from_user, "反馈格式错误,请发送:反馈+序号+说明")
                return
            self.handle_feedback(from_user=from_user, serial=serial, note=payload)
            return
        if action == "set_skip":
            self._handle_set_skip(from_user, command)
            return
        if action == "help":
            self.send_text(from_user, self.compose_help_text())
            return
        self._handle_parse_or_generate(from_user, action, payload)

    # -------------------------------------------------------------- polling
    def should_ignore_message(self, msg: dict[str, Any]) -> bool:
        """Return ``True`` for non-text, empty, self, filehelper, group or non-allowed messages."""
        msg_type = str(msg.get("type") or "").strip()
        from_user = str(msg.get("from") or "").strip()
        to_user = str(msg.get("to") or "").strip()
        content = str(msg.get("content") or "").strip()
        if msg_type not in {"1", "01"}:
            return True
        if not from_user or not to_user or not content:
            return True
        if from_user == "filehelper":
            return True
        if self.self_wxid and from_user == self.self_wxid:
            return True
        if from_user.endswith("@chatroom") or to_user.endswith("@chatroom"):
            return True
        if self.allow_from and from_user not in self.allow_from:
            return True
        return False

    def handle_polled_message(self, raw: Any) -> None:
        """Dedupe one polled message and run its command, replying on failure.

        The message is marked seen before handling, so a failed command is not
        retried on the next poll.
        """
        msg = summarize_message(raw)
        if self.should_ignore_message(msg):
            return
        dedupe_key = build_message_dedupe_key(msg)
        if self.seen.contains(dedupe_key):
            return
        self.seen.add(dedupe_key)

        cmd = self.parse_command(str(msg.get("content") or ""))
        if not cmd:
            return
        log(
            f"收到指令 from={msg.get('from')} id={msg.get('id')} type={msg.get('type')} "
            f"content={str(msg.get('content') or '')[:120]}"
        )
        try:
            self.handle_command(msg, cmd)
        except Exception as exc:
            log(f"处理指令失败: {exc}")
            try:
                self.send_text(str(msg.get("from") or ""), f"处理失败: {exc}")
            except Exception:
                # Best effort: the failure has already been logged.
                pass

    def poll_once(self) -> None:
        """Fetch one batch via ``/message/HttpSyncMsg`` and handle each message."""
        self.refresh_auth_key(force=False)
        if not self.auth_key:
            log(f"未获取到 authKey(session: {self.session_file}),等待中...")
            return
        self.ensure_self_wxid()
        _, resp = self._request_json(
            base_url=self.wechat_base_url,
            path="/message/HttpSyncMsg",
            method="POST",
            payload={"Count": self.sync_count},
            key=self.auth_key,
            timeout=30,
            accept_error=False,
        )
        code = safe_int(resp.get("Code"))
        text = str(resp.get("Text") or "").strip()
        if code != 200:
            # Session-expired style texts trigger a forced key reload.
            if "重新登录" in text or "不存在" in text:
                self.refresh_auth_key(force=True)
            log(f"消息轮询返回 code={code} text={text or '-'}")
            return
        data = resp.get("Data")
        if not isinstance(data, list) or not data:
            return
        for item in data:
            self.handle_polled_message(item)

    def run(self) -> int:
        """Run the poll loop until interrupted (or once with ``--once``); return 0."""
        log("微信桥接机器人启动")
        log(f"WeChat API: {self.wechat_base_url}")
        log(f"喜报 API: {self.xibao_base_url}")
        log(f"状态文件: {self.seen.path}")
        log(f"元数据文件: {self.meta_file}")
        log(f"每日清理时间: {self.cleanup_hour:02d}:{self.cleanup_minute:02d}")
        if self.allow_from:
            log(f"仅处理白名单发送者: {', '.join(sorted(self.allow_from))}")
        try:
            while True:
                try:
                    self.maybe_run_daily_cleanup()
                    self.poll_once()
                except Exception as exc:
                    log(f"轮询异常: {exc}")
                now = time.time()
                if now - self._last_state_flush >= 6:
                    self.seen.flush(force=False)
                    self._last_state_flush = now
                if now - self._last_meta_flush >= 6:
                    self._flush_meta(force=False)
                    self._last_meta_flush = now
                if self.once:
                    break
                time.sleep(self.poll_interval)
        except KeyboardInterrupt:
            log("收到中断信号,准备退出")
        finally:
            self._flush_state(force=True)
        return 0


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line options for the bridge."""
    parser = argparse.ArgumentParser(description="Xibao <-> WeChatPadPro bridge")
    parser.add_argument("--wechat-base-url", default=DEFAULT_WCPP_BASE_URL, help="WeChatPadPro API 地址")
    parser.add_argument("--wechat-auth-key", default=os.environ.get("WCPP_AUTH_KEY", ""), help="WeChat authKey")
    parser.add_argument("--wechat-session-file", default=str(DEFAULT_SESSION_FILE), help="webui 会话文件路径")
    parser.add_argument("--xibao-base-url", default=DEFAULT_XIBAO_BASE_URL, help="喜报服务地址")
    parser.add_argument("--poll-interval", type=float, default=2.0, help="消息轮询间隔(秒)")
    parser.add_argument("--sync-count", type=int, default=30, help="每次轮询拉取条数")
    parser.add_argument("--max-images", type=int, default=3, help="每次最多回发图片数量")
    parser.add_argument("--state-file", default=str(DEFAULT_STATE_FILE), help="消息去重状态文件")
    parser.add_argument("--meta-file", default=str(DEFAULT_META_FILE), help="反馈上下文与清理状态文件")
    parser.add_argument("--daily-cleanup-time", default=DEFAULT_DAILY_CLEANUP_TIME, help="每日清理时间 HH:MM")
    parser.add_argument("--allow-from", default="", help="只处理这些发送者wxid(逗号分隔)")
    parser.add_argument("--once", action="store_true", help="只执行一次轮询")
    parser.add_argument("--dry-run", action="store_true", help="仅打印不真正回消息")
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    """Entry point; *argv* defaults to ``sys.argv[1:]`` only when ``None``."""
    args = parse_args(sys.argv[1:] if argv is None else argv)
    bridge = WechatXibaoBridge(args)
    return bridge.run()


if __name__ == "__main__":
    raise SystemExit(main())