1126 lines
42 KiB
Python
1126 lines
42 KiB
Python
#!/usr/bin/env python3
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import base64
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
from collections import deque
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Any, Iterable
|
||
|
||
|
||
# Service endpoints; each default can be overridden through the environment.
DEFAULT_WCPP_BASE_URL = os.getenv("WCPP_BASE_URL", "http://127.0.0.1:18238")
DEFAULT_XIBAO_BASE_URL = os.getenv("XIBAO_BASE_URL", "http://127.0.0.1:8787")

# Default location of the session JSON file (overridable via WCPP_SESSION_FILE).
DEFAULT_SESSION_FILE = Path(os.getenv("WCPP_SESSION_FILE", "/root/WeChatPadPro_test_20260227/webui/.session.json"))

# State and meta files live in a "data" directory next to this script.
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_STATE_FILE = BASE_DIR / "data" / "wechat_bridge_state.json"
DEFAULT_META_FILE = BASE_DIR / "data" / "wechat_bridge_meta.json"

# Default "HH:MM" time for the daily cleanup (overridable via environment).
DEFAULT_DAILY_CLEANUP_TIME = os.getenv("XIBAO_DAILY_CLEANUP_TIME", "00:10")

# Matches the leading "<number><separator>" prefix of a numbered line.
RELAY_SERIAL_LINE_RE = re.compile(r"^\s*\d+\s*[、,,..::)\)]\s*")
|
||
|
||
|
||
def log(msg: str) -> None:
    """Print ``msg`` to stdout, prefixed with the local time in brackets."""
    now_text = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{now_text}] {msg}", flush=True)
|
||
|
||
|
||
def read_json_file(path: Path, fallback: Any) -> Any:
    """Return the JSON document stored at ``path``.

    ``fallback`` is returned when the file does not exist or when reading or
    decoding it fails for any reason.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8")) if path.exists() else fallback
    except Exception:
        # Best-effort read: any failure degrades to the caller's fallback.
        return fallback
|
||
|
||
|
||
def write_json_file(path: Path, data: Any) -> None:
    """Write ``data`` to ``path`` as indented, non-ASCII-escaped UTF-8 JSON.

    Missing parent directories are created. The text is first written to a
    sibling file whose name has ``.tmp`` appended and then renamed over
    ``path``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    staging = path.with_suffix(f"{path.suffix}.tmp")
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    staging.write_text(serialized, encoding="utf-8")
    staging.replace(path)
|
||
|
||
|
||
def parse_hhmm(text: str) -> tuple[int, int]:
    """Parse an ``H:M`` clock string into ``(hour, minute)``.

    Input that is not "one or two digits, colon, one or two digits", or that
    lies outside 00:00-23:59, yields the default ``(0, 10)``.
    """
    default = (0, 10)
    matched = re.match(r"^(\d{1,2}):(\d{1,2})$", str(text or "").strip())
    if matched is None:
        return default
    hour, minute = (int(part) for part in matched.groups())
    if 0 <= hour <= 23 and 0 <= minute <= 59:
        return (hour, minute)
    return default
|
||
|
||
|
||
def flatten_wrapped_value(value: Any) -> Any:
    """Unwrap a dict that carries its payload under a well-known key.

    For a dict, the keys ``str``, ``string``, ``String``, ``value`` and
    ``Value`` are tried in that order and the first entry that is neither
    ``None`` nor ``""`` is returned. Anything else is returned unchanged.
    """
    if not isinstance(value, dict):
        return value
    for wrapper_key in ("str", "string", "String", "value", "Value"):
        inner = value.get(wrapper_key)
        if inner not in (None, ""):
            return inner
    return value
|
||
|
||
|
||
def find_first_value(data: Any, keys: Iterable[str]) -> Any:
    """Breadth-first search of nested dicts/lists for the first usable value.

    Args:
        data: Arbitrary nested structure of dicts and lists.
        keys: Candidate key names, compared case-insensitively.

    Returns:
        The first value (after ``flatten_wrapped_value``) stored under a
        matching key that is not ``None``, ``""``, ``[]`` or ``{}``; ``None``
        when nothing matches.
    """
    targets = {k.lower() for k in keys}
    # A deque gives O(1) pops from the front; list.pop(0) is O(n) per pop.
    pending: deque[Any] = deque([data])
    while pending:
        current = pending.popleft()
        if isinstance(current, dict):
            for key, val in current.items():
                # Non-string keys can never match a name, but their values
                # are still searched.
                if isinstance(key, str) and key.lower() in targets:
                    found = flatten_wrapped_value(val)
                    if found not in (None, "", [], {}):
                        return found
                pending.append(val)
        elif isinstance(current, list):
            pending.extend(current)
    return None
|
||
|
||
|
||
def pick_primary_message(item: dict[str, Any]) -> dict[str, Any]:
    """Return the first dict found in ``item["AddMsgs"]``, else ``item``."""
    candidates = item.get("AddMsgs")
    if not isinstance(candidates, list):
        return item
    return next((entry for entry in candidates if isinstance(entry, dict)), item)
|
||
|
||
|
||
def summarize_message(item: Any) -> dict[str, Any]:
    """Reduce a raw polled item to a flat summary dict.

    The result has the string fields ``id``, ``type``, ``from``, ``to``,
    ``time`` and ``content`` plus ``raw`` (the untouched input). Fields that
    cannot be found are ``"-"``. A non-dict input is stringified into
    ``content`` with every other field set to ``"-"``.
    """
    if not isinstance(item, dict):
        summary: dict[str, Any] = dict.fromkeys(("id", "type", "from", "to", "time"), "-")
        summary["content"] = str(item)
        summary["raw"] = item
        return summary

    def _to_text(value: Any) -> str:
        # Containers are rendered as JSON, missing values as "-".
        value = flatten_wrapped_value(value)
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        return "-" if value is None else str(value)

    # Candidate key names per summary field.
    lookups = {
        "id": ["msg_id", "new_msg_id", "MsgId", "NewMsgId", "ClientMsgId", "id"],
        "type": ["msg_type", "MsgType", "ContentType", "Type"],
        "from": ["from_user_name", "FromUserName", "FromWxid", "Sender", "Talker", "wxid", "userName"],
        "to": ["to_user_name", "ToUserName", "ToWxid", "Receiver"],
        "time": ["create_time", "CreateTime", "Timestamp", "Time", "MsgTime"],
        "content": ["text_content", "TextContent", "content", "Content", "Message", "Text"],
    }
    primary = pick_primary_message(item)
    found = {field: find_first_value(primary, names) for field, names in lookups.items()}
    if found["content"] in (None, "", {}, []):
        # No textual content: fall back to the message-source field.
        found["content"] = find_first_value(primary, ["msg_source", "MsgSource"])

    summary = {field: _to_text(value) for field, value in found.items()}
    summary["raw"] = item
    return summary
|
||
|
||
|
||
def build_message_dedupe_key(msg: dict[str, Any]) -> str:
    """Build a de-duplication key for a summarized message.

    When the message has an id the key is ``id:<id>|type:<type>``. Otherwise
    it is ``hash:<digest>|type:<type>``, where the digest is the first 20 hex
    characters of the SHA-1 of sender, receiver, time and the first 160
    characters of the content joined with ``|``.
    """

    def _field(name: str) -> str:
        return str(msg.get(name) or "-")

    msg_type = _field("type")
    msg_id = _field("id")
    if msg_id not in {"", "-"}:
        return f"id:{msg_id}|type:{msg_type}"

    content_head = str(msg.get("content") or "")[:160]
    seed = "|".join([_field("from"), _field("to"), _field("time"), content_head])
    digest = hashlib.sha1(seed.encode("utf-8")).hexdigest()[:20]
    return f"hash:{digest}|type:{msg_type}"
|
||
|
||
|
||
class SeenStore:
    """Bounded, insertion-ordered set of keys persisted to a JSON file.

    Keys are kept both in a deque (for age order) and in a set (for
    membership tests). Once more than ``max_size`` keys are held, the oldest
    ones are evicted.
    """

    def __init__(self, path: Path, max_size: int = 8000) -> None:
        """Create the store and load any keys already saved at ``path``.

        ``max_size`` is raised to at least 100.
        """
        self.path = path
        self.max_size = max(100, int(max_size))
        self._queue: deque[str] = deque()
        self._set: set[str] = set()
        self._dirty = False
        self.load()

    def _remember(self, key: str) -> bool:
        """Record ``key``; return False if it is empty or already known."""
        if not key or key in self._set:
            return False
        self._queue.append(key)
        self._set.add(key)
        return True

    def _evict_overflow(self) -> None:
        """Drop the oldest keys until at most ``max_size`` remain."""
        while len(self._queue) > self.max_size:
            self._set.discard(self._queue.popleft())

    def load(self) -> None:
        """Merge keys from the ``seen`` list of the JSON file into memory."""
        stored = read_json_file(self.path, {})
        entries = stored.get("seen") if isinstance(stored, dict) else []
        if isinstance(entries, list):
            for entry in entries:
                self._remember(str(entry).strip())
        self._evict_overflow()
        # Loading never counts as an unsaved change.
        self._dirty = False

    def contains(self, key: str) -> bool:
        """Return whether ``key`` is currently held."""
        return key in self._set

    def add(self, key: str) -> None:
        """Record ``key`` and mark the store dirty; no-op for known/empty keys."""
        if self._remember(key):
            self._evict_overflow()
            self._dirty = True

    def flush(self, force: bool = False) -> None:
        """Write the keys to disk when dirty, or unconditionally if ``force``."""
        if not (force or self._dirty):
            return
        snapshot = {
            "updated_at": datetime.now().isoformat(timespec="seconds"),
            "seen": list(self._queue),
        }
        write_json_file(self.path, snapshot)
        self._dirty = False
|
||
|
||
|
||
class WechatXibaoBridge:
|
||
def __init__(self, args: argparse.Namespace) -> None:
    """Configure the bridge from parsed command-line arguments.

    Numeric options are clamped to sane minimums, the seen-message store and
    the meta file are loaded, and the flush timestamps are initialised.
    """
    # Service endpoints, stored without a trailing slash.
    self.wechat_base_url = str(args.wechat_base_url).rstrip("/")
    self.xibao_base_url = str(args.xibao_base_url).rstrip("/")

    # Authentication: an explicitly supplied key, else the session file.
    self.auth_key = str(args.wechat_auth_key or "").strip()
    self.session_file = Path(args.wechat_session_file).resolve()

    # Polling and sending limits.
    self.sync_count = max(1, int(args.sync_count))
    self.poll_interval = max(0.5, float(args.poll_interval))
    self.max_images = max(0, int(args.max_images))
    self.once = bool(args.once)
    self.dry_run = bool(args.dry_run)

    # Comma-separated sender allow-list; an empty set means no restriction.
    senders = (part.strip() for part in str(args.allow_from or "").split(","))
    self.allow_from = {sender for sender in senders if sender}

    # Persistent state.
    self.seen = SeenStore(Path(args.state_file).resolve())
    self.meta_file = Path(args.meta_file).resolve()
    cleanup_time = parse_hhmm(str(args.daily_cleanup_time))
    self.cleanup_hour, self.cleanup_minute = cleanup_time
    self.explicit_auth = bool(self.auth_key)
    self.self_wxid = ""
    self.meta = self._load_meta()
    self.meta_dirty = False
    self._last_state_flush = time.time()
    self._last_meta_flush = time.time()
|
||
|
||
def _load_meta(self) -> dict[str, Any]:
    """Read the meta file and return it in a normalized shape.

    The result always has the keys ``updated_at``, ``users``,
    ``daily_cleanup`` (with ``last_date``) and ``daily_skip``. Entries of the
    wrong type are dropped, identifiers are coerced to ``str`` and skip
    counts are limited to 1..500 (non-positive counts are discarded).
    """
    stored = read_json_file(self.meta_file, {})
    if not isinstance(stored, dict):
        stored = {}

    # Per-user feedback context.
    users: dict[str, Any] = {}
    stored_users = stored.get("users")
    if isinstance(stored_users, dict):
        for wxid, entry in stored_users.items():
            if not isinstance(entry, dict):
                continue
            line_map = entry.get("line_map")
            line_order = entry.get("line_order")
            users[str(wxid)] = {
                "updated_at": str(entry.get("updated_at", "")),
                "line_map": {
                    str(serial): context
                    for serial, context in (line_map.items() if isinstance(line_map, dict) else ())
                    if isinstance(context, dict)
                },
                "line_order": [
                    str(serial)
                    for serial in (line_order if isinstance(line_order, list) else ())
                    if str(serial).strip()
                ],
            }

    # Per-user daily skip settings.
    skips: dict[str, Any] = {}
    stored_skips = stored.get("daily_skip")
    if isinstance(stored_skips, dict):
        for wxid, entry in stored_skips.items():
            if not isinstance(entry, dict):
                continue
            try:
                count = int(entry.get("count") or 0)
            except Exception:
                count = 0
            if count <= 0:
                continue
            skips[str(wxid)] = {
                "date": str(entry.get("date", "")).strip(),
                "count": max(1, min(count, 500)),
                "updated_at": str(entry.get("updated_at", "")),
            }

    cleanup = stored.get("daily_cleanup")
    last_date = cleanup.get("last_date", "") if isinstance(cleanup, dict) else ""

    return {
        "updated_at": str(stored.get("updated_at", "")),
        "users": users,
        "daily_cleanup": {
            "last_date": str(last_date).strip(),
        },
        "daily_skip": skips,
    }
|
||
|
||
def _mark_meta_dirty(self) -> None:
|
||
self.meta_dirty = True
|
||
self.meta["updated_at"] = datetime.now().isoformat(timespec="seconds")
|
||
|
||
def _flush_meta(self, force: bool = False) -> None:
    """Write the meta dict to its file when dirty, or always if ``force``."""
    if force or self.meta_dirty:
        write_json_file(self.meta_file, self.meta)
        self.meta_dirty = False
|
||
|
||
def _flush_state(self, force: bool = False) -> None:
|
||
self.seen.flush(force=force)
|
||
self._flush_meta(force=force)
|
||
|
||
def _request_json(
    self,
    *,
    base_url: str,
    path: str,
    method: str,
    payload: dict[str, Any] | None = None,
    key: str | None = None,
    timeout: int = 30,
    accept_error: bool = False,
) -> tuple[int, dict[str, Any]]:
    """Send an HTTP request and return ``(status, decoded JSON object)``.

    Args:
        base_url: Service root; a trailing slash is ignored.
        path: Path appended to ``base_url``.
        method: HTTP verb (upper-cased before sending).
        payload: Optional JSON body; sets ``Content-Type: application/json``.
        key: Optional value appended as the ``key`` query parameter.
        timeout: Timeout in seconds passed to ``urlopen``.
        accept_error: When true, an HTTP error status is returned together
            with its decoded body instead of raising.

    Returns:
        The status code and the response body as a dict. A JSON body that is
        not an object is wrapped as ``{"raw": <value>}``; an empty body
        becomes ``{}``.

    Raises:
        RuntimeError: On an HTTP error status (unless ``accept_error``), on a
            transport failure or timeout, or when a successful response does
            not contain valid JSON.
    """
    url = f"{base_url.rstrip('/')}{path}"
    if key:
        joiner = "&" if "?" in url else "?"
        url = f"{url}{joiner}key={urllib.parse.quote(key)}"

    body = None
    headers: dict[str, str] = {}
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        headers["Content-Type"] = "application/json"

    req = urllib.request.Request(url, data=body, method=method.upper(), headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            status = int(resp.status)
            # Decode leniently, as the error branch below already does.
            raw = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode("utf-8", errors="replace")
        try:
            data = json.loads(raw) if raw else {}
        except Exception:
            data = {"ok": False, "error": raw or f"HTTP {exc.code}"}
        if not isinstance(data, dict):
            data = {"raw": data}
        if accept_error:
            return int(exc.code), data
        raise RuntimeError(f"HTTP {exc.code}: {raw[:400]}") from exc
    except (urllib.error.URLError, OSError) as exc:
        # OSError also covers socket timeouts raised while reading the body,
        # which are not wrapped in URLError.
        raise RuntimeError(f"请求失败: {exc}") from exc

    try:
        data = json.loads(raw) if raw else {}
    except ValueError as exc:
        # Keep the documented RuntimeError contract for non-JSON replies.
        raise RuntimeError(f"响应不是有效的 JSON: {raw[:400]}") from exc
    if not isinstance(data, dict):
        data = {"raw": data}
    return status, data
|
||
|
||
def _request_bytes(self, url: str, timeout: int = 45) -> tuple[bytes, str]:
|
||
req = urllib.request.Request(url, method="GET")
|
||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||
data = resp.read()
|
||
content_type = resp.headers.get("Content-Type", "application/octet-stream")
|
||
content_type = content_type.split(";", 1)[0].strip() or "application/octet-stream"
|
||
return data, content_type
|
||
|
||
def _load_auth_from_session(self) -> str:
    """Return the stripped ``authKey`` from the session file, or ``""``."""
    session = read_json_file(self.session_file, {})
    if not isinstance(session, dict):
        return ""
    return str(session.get("authKey", "")).strip()
|
||
|
||
def is_key_online(self, auth_key: str) -> bool:
|
||
key = str(auth_key or "").strip()
|
||
if not key:
|
||
return False
|
||
try:
|
||
_, resp = self._request_json(
|
||
base_url=self.wechat_base_url,
|
||
path="/login/GetLoginStatus",
|
||
method="GET",
|
||
key=key,
|
||
timeout=12,
|
||
accept_error=True,
|
||
)
|
||
return int(resp.get("Code") or 0) == 200
|
||
except Exception:
|
||
return False
|
||
|
||
def refresh_auth_key(self, force: bool = False) -> None:
    """Pick up a changed authKey from the session file.

    Nothing happens when the key was given explicitly (unless ``force``),
    when the session file holds no key, or when it holds the current key.
    With no current key the new one is adopted directly. Otherwise both keys
    are probed and the new key is adopted unless the current one is online
    while the new one is not. Adopting a key clears the cached own wxid.
    """
    if self.explicit_auth and not force:
        return
    latest = self._load_auth_from_session()
    if not latest or latest == self.auth_key:
        return

    if self.auth_key:
        latest_online = self.is_key_online(latest)
        current_online = self.is_key_online(self.auth_key)
        if current_online and not latest_online:
            log("检测到新的 authKey 但未在线,继续使用当前在线会话。")
            return

    self.auth_key = latest
    self.self_wxid = ""
    log(f"已更新 authKey: {self.auth_key}")
|
||
|
||
def ensure_self_wxid(self) -> None:
    """Look up and cache the logged-in account's wxid via ``/user/GetProfile``.

    Does nothing when no authKey is set or the wxid is already cached.
    Request failures are logged; a non-200 ``Code`` is ignored silently.
    """
    if self.self_wxid or not self.auth_key:
        return
    try:
        _status, profile = self._request_json(
            base_url=self.wechat_base_url,
            path="/user/GetProfile",
            method="GET",
            key=self.auth_key,
        )
    except Exception as exc:
        log(f"获取自身 wxid 失败: {exc}")
        return
    if profile.get("Code") != 200:
        return

    found = find_first_value(
        profile.get("Data"),
        ["wxid", "Wxid", "userName", "UserName", "to_user_name"],
    )
    if not found:
        return
    self.self_wxid = str(found).strip()
    if self.self_wxid:
        log(f"已识别当前账号 wxid: {self.self_wxid}")
|
||
|
||
def _extract_feedback_records(self, data: dict[str, Any]) -> list[dict[str, Any]]:
|
||
if not isinstance(data, dict):
|
||
return []
|
||
result = data.get("result")
|
||
if not isinstance(result, dict):
|
||
return []
|
||
records = result.get("records")
|
||
if isinstance(records, list) and records:
|
||
return [x for x in records if isinstance(x, dict)]
|
||
records = result.get("new_records")
|
||
if isinstance(records, list) and records:
|
||
return [x for x in records if isinstance(x, dict)]
|
||
return []
|
||
|
||
def save_feedback_context(self, from_user: str, data: dict[str, Any]) -> None:
|
||
from_user = str(from_user or "").strip()
|
||
if not from_user:
|
||
return
|
||
records = self._extract_feedback_records(data)
|
||
if not records:
|
||
return
|
||
|
||
line_map: dict[str, dict[str, Any]] = {}
|
||
line_order: list[str] = []
|
||
fallback_index = 0
|
||
for item in records:
|
||
source_line = str(item.get("source_line") or item.get("raw_text") or "").strip()
|
||
if not source_line:
|
||
continue
|
||
line_serial = str(item.get("line_serial") or "").strip()
|
||
if not line_serial:
|
||
fallback_index += 1
|
||
line_serial = str(fallback_index)
|
||
if line_serial in line_map:
|
||
continue
|
||
line_order.append(line_serial)
|
||
line_map[line_serial] = {
|
||
"source_line": source_line,
|
||
"record": {
|
||
"source_line": source_line,
|
||
"raw_text": str(item.get("raw_text") or ""),
|
||
"branch": str(item.get("branch") or ""),
|
||
"amount": str(item.get("amount") or ""),
|
||
"type": str(item.get("type") or ""),
|
||
"page": str(item.get("page") or ""),
|
||
"status": str(item.get("status") or ""),
|
||
"line_serial": line_serial,
|
||
"line_product_index": str(item.get("line_product_index") or ""),
|
||
"dedup_key": str(item.get("dedup_key") or ""),
|
||
"signature_key": str(item.get("signature_key") or ""),
|
||
},
|
||
}
|
||
|
||
if not line_map:
|
||
return
|
||
|
||
users = self.meta.setdefault("users", {})
|
||
users[from_user] = {
|
||
"updated_at": datetime.now().isoformat(timespec="seconds"),
|
||
"line_map": line_map,
|
||
"line_order": line_order,
|
||
}
|
||
if len(users) > 400:
|
||
keys = sorted(users.keys(), key=lambda k: str(users[k].get("updated_at", "")))
|
||
for key in keys[:-300]:
|
||
users.pop(key, None)
|
||
|
||
self._mark_meta_dirty()
|
||
|
||
def resolve_feedback_item(self, from_user: str, serial: str) -> dict[str, Any] | None:
|
||
users = self.meta.get("users")
|
||
if not isinstance(users, dict):
|
||
return None
|
||
context = users.get(str(from_user or "").strip())
|
||
if not isinstance(context, dict):
|
||
return None
|
||
line_map = context.get("line_map")
|
||
if not isinstance(line_map, dict):
|
||
return None
|
||
serial_text = str(serial or "").strip()
|
||
if not serial_text:
|
||
return None
|
||
|
||
item = line_map.get(serial_text)
|
||
if isinstance(item, dict):
|
||
return item
|
||
|
||
line_order = context.get("line_order")
|
||
if isinstance(line_order, list) and serial_text.isdigit():
|
||
idx = int(serial_text) - 1
|
||
if 0 <= idx < len(line_order):
|
||
mapped_serial = str(line_order[idx]).strip()
|
||
item = line_map.get(mapped_serial)
|
||
if isinstance(item, dict):
|
||
return item
|
||
return None
|
||
|
||
def parse_feedback_command(self, text: str) -> tuple[str, str] | None:
    """Parse a feedback message into ``(serial, note)``.

    Returns ``None`` when ``text`` is empty or does not match the feedback
    pattern; ``note`` is ``""`` when no note follows the serial.
    """
    content = str(text or "").strip()
    matched = re.match(r"^/?反馈\s*[+::\-]?\s*(\d{1,4})(?:[\s,,;;::\-]+(.*))?$", content) if content else None
    if matched is None:
        return None
    serial, note = matched.group(1), matched.group(2)
    return str(serial).strip(), str(note or "").strip()
|
||
|
||
def parse_skip_command(self, text: str) -> int | None:
    """Parse a skip message into a count limited to 0..500.

    Returns ``None`` when ``text`` is empty or is not a skip command.
    """
    content = str(text or "").strip()
    matched = re.match(r"^/?跳过\s*[((]?\s*(\d{1,4})\s*[))]?\s*$", content) if content else None
    if matched is None:
        return None
    requested = int(matched.group(1))
    return max(0, min(requested, 500))
|
||
|
||
def _today_text(self) -> str:
|
||
return datetime.now().strftime("%Y-%m-%d")
|
||
|
||
def set_daily_skip(self, from_user: str, count: int) -> int:
|
||
user = str(from_user or "").strip()
|
||
if not user:
|
||
return 0
|
||
|
||
daily_skip = self.meta.setdefault("daily_skip", {})
|
||
if not isinstance(daily_skip, dict):
|
||
daily_skip = {}
|
||
self.meta["daily_skip"] = daily_skip
|
||
|
||
normalized_count = max(0, min(int(count or 0), 500))
|
||
if normalized_count <= 0:
|
||
if user in daily_skip:
|
||
daily_skip.pop(user, None)
|
||
self._mark_meta_dirty()
|
||
return 0
|
||
|
||
daily_skip[user] = {
|
||
"date": self._today_text(),
|
||
"count": normalized_count,
|
||
"updated_at": datetime.now().isoformat(timespec="seconds"),
|
||
}
|
||
if len(daily_skip) > 1000:
|
||
keys = sorted(daily_skip.keys(), key=lambda k: str(daily_skip[k].get("updated_at", "")))
|
||
for key in keys[:-800]:
|
||
daily_skip.pop(key, None)
|
||
|
||
self._mark_meta_dirty()
|
||
return normalized_count
|
||
|
||
def get_daily_skip(self, from_user: str) -> int:
|
||
user = str(from_user or "").strip()
|
||
if not user:
|
||
return 0
|
||
daily_skip = self.meta.get("daily_skip")
|
||
if not isinstance(daily_skip, dict):
|
||
return 0
|
||
item = daily_skip.get(user)
|
||
if not isinstance(item, dict):
|
||
return 0
|
||
|
||
if str(item.get("date") or "").strip() != self._today_text():
|
||
daily_skip.pop(user, None)
|
||
self._mark_meta_dirty()
|
||
return 0
|
||
|
||
try:
|
||
count = int(item.get("count") or 0)
|
||
except Exception:
|
||
count = 0
|
||
if count <= 0:
|
||
daily_skip.pop(user, None)
|
||
self._mark_meta_dirty()
|
||
return 0
|
||
return max(1, min(count, 500))
|
||
|
||
def apply_daily_skip_to_raw_text(self, from_user: str, raw_text: str) -> tuple[str, int, int]:
    """Drop the first numbered lines of ``raw_text`` per the user's skip count.

    Returns ``(text, removed, requested)``. With no active skip the text is
    returned unchanged with ``(0, 0)``. Otherwise up to ``requested`` lines
    matching ``RELAY_SERIAL_LINE_RE`` are removed, blank lines are dropped,
    and an otherwise empty result is replaced by ``"#接龙"``.
    """
    requested = self.get_daily_skip(from_user)
    if requested <= 0:
        return raw_text, 0, 0

    lines = str(raw_text or "").splitlines()
    if not lines:
        return raw_text, 0, requested

    removed = 0
    kept: list[str] = []
    for line in lines:
        if removed < requested and RELAY_SERIAL_LINE_RE.match(line.strip()):
            removed += 1
        elif line.strip():
            kept.append(line)

    normalized = "\n".join(kept)
    if not normalized.strip():
        normalized = "#接龙"
    return normalized, removed, requested
|
||
|
||
def handle_feedback(self, from_user: str, serial: str, note: str) -> None:
    """Record a user's feedback on a stored entry and acknowledge it.

    The entry is resolved from the user's saved context; the feedback is
    posted to ``/api/log/mark`` as ``recognition_error`` when the note
    contains "识别", otherwise as ``generation_error``. In dry-run mode the
    feedback is only logged. The user is told the outcome in every case.
    """
    item = self.resolve_feedback_item(from_user=from_user, serial=serial)
    if not item:
        self.send_text(from_user, "未找到可反馈的序号,请先发送 #接龙 生成后再反馈。")
        return

    record = item.get("record")
    if not isinstance(record, dict):
        record = {}
    source_line = str(item.get("source_line") or "").strip()
    note_text = str(note or "").strip() or f"用户反馈序号{serial}"
    if "识别" in note_text:
        mark_type = "recognition_error"
    else:
        mark_type = "generation_error"

    if self.dry_run:
        log(
            f"[dry-run] feedback from={from_user} serial={serial} mark_type={mark_type} "
            f"source_line={source_line} note={note_text}"
        )
        self.send_text(from_user, f"反馈已接收(演练模式):序号{serial}")
        return

    mark_request = {
        "mark_type": mark_type,
        "source_line": source_line,
        "note": f"{note_text} | from={from_user}",
        "record": record,
    }
    status, resp = self._request_json(
        base_url=self.xibao_base_url,
        path="/api/log/mark",
        method="POST",
        payload=mark_request,
        accept_error=True,
        timeout=40,
    )
    if not resp.get("ok"):
        err = str(resp.get("error") or f"HTTP {status}").strip()
        self.send_text(from_user, f"反馈记录失败: {err}")
        return

    issue = resp.get("issue")
    if not isinstance(issue, dict):
        issue = {}
    issue_id = str(issue.get("id") or "").strip()
    suffix = f",编号 {issue_id}" if issue_id else ""
    self.send_text(from_user, f"反馈已记录:序号{serial}{suffix}")
|
||
|
||
def _clear_history_keep_feedback(self) -> bool:
    """Empty the history, preferring a direct file rewrite over the API.

    The history file path is read from ``/api/config``. If that path (or its
    parent directory) exists, the file is overwritten with an empty JSON
    list. When the path is unknown or the write fails, ``/api/history/clear``
    is called instead. Returns True on success.
    """
    try:
        _status, cfg = self._request_json(
            base_url=self.xibao_base_url,
            path="/api/config",
            method="GET",
            accept_error=False,
            timeout=20,
        )
        history_path = str(cfg.get("resolved_history_file") or "").strip()
    except Exception as exc:
        history_path = ""
        log(f"读取配置失败(history路径): {exc}")

    history_file = Path(history_path) if history_path else None
    if history_file is not None and (history_file.exists() or history_file.parent.exists()):
        try:
            write_json_file(history_file, [])
            log(f"已清空历史文件: {history_file}")
            return True
        except Exception as exc:
            # Fall through to the API-based clear below.
            log(f"清空历史文件失败: {exc}")

    try:
        _status, reply = self._request_json(
            base_url=self.xibao_base_url,
            path="/api/history/clear",
            method="POST",
            payload={},
            accept_error=True,
            timeout=40,
        )
        if not reply.get("ok"):
            log(f"清空历史失败: {reply}")
            return False
        if reply.get("skipped_suppressed_cleared"):
            log("已清空历史(附带清空了屏蔽列表)。")
        else:
            log("已清空历史。")
        return True
    except Exception as exc:
        log(f"清空历史请求异常: {exc}")
        return False
|
||
|
||
def maybe_run_daily_cleanup(self) -> None:
    """Run the daily cleanup once per day after the configured time.

    Skipped when today's cleanup is already recorded or the configured
    hour/minute has not been reached. Clears the output images via
    ``/api/output/clear`` and then the history; today's date is recorded only
    when both steps succeed.
    """
    now = datetime.now()
    today = now.strftime("%Y-%m-%d")
    daily = self.meta.setdefault("daily_cleanup", {})
    already_done = str(daily.get("last_date") or "").strip() == today
    too_early = (now.hour, now.minute) < (self.cleanup_hour, self.cleanup_minute)
    if already_done or too_early:
        return

    log("开始执行每日清理:图片输出 + 历史记录(保留反馈记录)")

    output_ok = False
    try:
        _status, reply = self._request_json(
            base_url=self.xibao_base_url,
            path="/api/output/clear",
            method="POST",
            payload={},
            accept_error=True,
            timeout=120,
        )
        output_ok = bool(reply.get("ok"))
        log("已清空输出图片。" if output_ok else f"清空输出图片失败: {reply}")
    except Exception as exc:
        log(f"清空输出图片异常: {exc}")

    # The history is cleared even when clearing the output failed.
    history_ok = self._clear_history_keep_feedback()

    if not (output_ok and history_ok):
        return
    daily["last_date"] = today
    self._mark_meta_dirty()
    log(f"每日清理完成: {today}")
|
||
|
||
def send_text(self, to_user: str, text: str) -> None:
    """Send a text message to ``to_user`` via ``/message/SendTextMessage``.

    In dry-run mode the message is only logged.

    Raises:
        RuntimeError: When the response ``Code`` is not 200.
    """
    if self.dry_run:
        log(f"[dry-run] send text to={to_user}: {text}")
        return

    message = {
        "MsgType": 1,
        "ToUserName": to_user,
        "TextContent": text,
    }
    _status, reply = self._request_json(
        base_url=self.wechat_base_url,
        path="/message/SendTextMessage",
        method="POST",
        payload={"MsgItem": [message]},
        key=self.auth_key,
        timeout=30,
    )
    if reply.get("Code") == 200:
        return
    raise RuntimeError(f"SendTextMessage失败: {json.dumps(reply, ensure_ascii=False)}")
|
||
|
||
def send_image_data_uri(self, to_user: str, data_uri: str) -> None:
    """Send an image, given as a data URI, via ``/message/SendImageNewMessage``.

    In dry-run mode only the recipient and the URI length are logged.

    Raises:
        RuntimeError: When the response ``Code`` is not 200.
    """
    if self.dry_run:
        log(f"[dry-run] send image to={to_user}, bytes={len(data_uri)}")
        return

    message = {
        "MsgType": 2,
        "ToUserName": to_user,
        "ImageContent": data_uri,
    }
    _status, reply = self._request_json(
        base_url=self.wechat_base_url,
        path="/message/SendImageNewMessage",
        method="POST",
        payload={"MsgItem": [message]},
        key=self.auth_key,
        timeout=60,
    )
    if reply.get("Code") == 200:
        return
    raise RuntimeError(f"SendImageNewMessage失败: {json.dumps(reply, ensure_ascii=False)}")
|
||
|
||
def parse_command(self, text: str) -> dict[str, str] | None:
|
||
content = str(text or "").strip()
|
||
if not content:
|
||
return None
|
||
|
||
feedback = self.parse_feedback_command(content)
|
||
if feedback is not None:
|
||
serial, note = feedback
|
||
return {"action": "feedback", "serial": serial, "payload": note}
|
||
|
||
skip_count = self.parse_skip_command(content)
|
||
if skip_count is not None:
|
||
return {"action": "set_skip", "count": str(skip_count), "payload": ""}
|
||
|
||
lower = content.lower()
|
||
if lower in {"/喜报", "喜报", "#喜报", "/喜报 help", "/喜报 帮助", "喜报帮助"}:
|
||
return {"action": "help", "payload": ""}
|
||
|
||
prefixes = ["/喜报", "喜报", "#喜报"]
|
||
for prefix in prefixes:
|
||
if content.startswith(prefix):
|
||
rest = content[len(prefix) :].strip(" ::\n\t")
|
||
if not rest:
|
||
return {"action": "help", "payload": ""}
|
||
rest_lower = rest.lower()
|
||
if rest_lower in {"help", "帮助"}:
|
||
return {"action": "help", "payload": ""}
|
||
if rest.startswith("解析"):
|
||
payload = rest[2:].strip(" ::\n\t")
|
||
return {"action": "parse", "payload": payload}
|
||
if rest.startswith("生成"):
|
||
payload = rest[2:].strip(" ::\n\t")
|
||
return {"action": "generate", "payload": payload}
|
||
return {"action": "generate", "payload": rest}
|
||
|
||
if "#接龙" in content:
|
||
return {"action": "generate", "payload": content}
|
||
|
||
return None
|
||
|
||
def normalize_raw_text(self, payload: str) -> str:
    """Strip ``payload`` and make sure it carries the ``#接龙`` marker.

    Empty input yields ``""``; text without the marker gets it prepended on
    its own line.
    """
    text = str(payload or "").strip()
    if not text or "#接龙" in text:
        return text
    return f"#接龙\n{text}"
|
||
|
||
def format_parse_result(self, data: dict[str, Any]) -> str:
    """Render the summary counters of a parse response as a reply text.

    Missing counters are shown as 0. When ``data["result"]`` is present but
    not a dict, a short generic message is returned.
    """
    result = data.get("result") if isinstance(data, dict) else {}
    if not isinstance(result, dict):
        return "解析完成。"
    summary = result.get("summary")
    if not isinstance(summary, dict):
        summary = {}

    def _count(name: str) -> int:
        return int(summary.get(name) or 0)

    lines = [
        "解析完成",
        f"- 识别条数: {_count('parsed')}",
        f"- 可生成: {_count('new')}",
        f"- 重复: {_count('duplicate')}",
        f"- 待选保险年限: {_count('insurance_pending')}",
    ]
    return "\n".join(lines)
|
||
|
||
def format_generate_result(self, data: dict[str, Any]) -> str:
    """Render the outcome of a generate response as a reply text.

    Args:
        data: Response dict; ``generated_count`` and ``result.summary`` are
            read. Anything that is not a dict is treated as an empty
            response instead of raising.

    Returns:
        A success line with the number of generated images, or, when nothing
        was generated, a summary of duplicate and pending records.
    """
    # Guard once so both "result" and "generated_count" lookups are safe.
    payload = data if isinstance(data, dict) else {}
    result = payload.get("result")
    summary = result.get("summary") if isinstance(result, dict) else None
    if not isinstance(summary, dict):
        summary = {}

    generated = int(payload.get("generated_count") or 0)
    duplicate = int(summary.get("duplicate") or 0)
    pending = int(summary.get("insurance_pending") or 0)
    if generated <= 0:
        return (
            "本次没有可生成的新记录\n"
            f"- 重复记录: {duplicate}\n"
            f"- 待选保险年限: {pending}"
        )
    return f"生成完成,本次生成 {generated} 张。若有问题请发送:反馈+序号+说明"
|
||
|
||
def compose_help_text(self) -> str:
    """Return the multi-line usage text sent in reply to a help command."""
    usage_lines = (
        "喜报机器人用法:",
        "1) /喜报 生成 + 接龙文本",
        "2) /喜报 解析 + 接龙文本",
        "3) 直接发送包含 #接龙 的文本",
        "4) 生成有误时:反馈+序号+说明",
        "5) 发送 跳过3(或 跳过(3)),当天自动跳过前3条;跳过0 取消",
        "示例:",
        # The example shows literal backslash-n sequences, not line breaks.
        "/喜报 生成\\n#接龙\\n1、营江路揽收现金10万存一年",
        "反馈2 网点写错了",
        "跳过2",
    )
    return "\n".join(usage_lines)
|
||
|
||
def handle_command(self, msg: dict[str, Any], command: dict[str, str]) -> None:
    """Execute a parsed command and reply to the sender.

    Args:
        msg: Summarized message; only ``from`` is read.
        command: Result of ``parse_command`` (``action``, ``payload`` and
            the action-specific ``serial`` / ``count``).

    ``feedback``, ``set_skip`` and ``help`` are answered directly. Any other
    action sends the normalized text (after today's skip is applied) to
    ``/api/parse`` or ``/api/generate``, reports the result, and for a
    generate response sends back up to ``max_images`` of the images.

    Raises:
        RuntimeError: Propagated from ``send_text`` / ``_request_json`` when
            a request outside the per-image handling fails.
    """
    from_user = str(msg.get("from") or "").strip()
    if not from_user:
        return

    action = command.get("action", "")
    payload = command.get("payload", "")

    if action == "feedback":
        serial = str(command.get("serial") or "").strip()
        if not serial:
            self.send_text(from_user, "反馈格式错误,请发送:反馈+序号+说明")
            return
        self.handle_feedback(from_user=from_user, serial=serial, note=payload)
        return

    if action == "set_skip":
        try:
            count = int(str(command.get("count") or "0").strip() or "0")
        except Exception:
            self.send_text(from_user, "跳过格式错误,请发送:跳过3 或 跳过(3)")
            return
        effective = self.set_daily_skip(from_user=from_user, count=count)
        if effective <= 0:
            self.send_text(from_user, "已取消今日自动跳过。")
        else:
            self.send_text(from_user, f"已设置今日自动跳过前 {effective} 条(仅当天生效)。")
        return

    if action == "help":
        self.send_text(from_user, self.compose_help_text())
        return

    raw_text = self.normalize_raw_text(payload)
    if not raw_text:
        self.send_text(from_user, "未检测到有效文本,请发送 /喜报 帮助 查看用法。")
        return
    raw_text, skipped_count, requested_skip = self.apply_daily_skip_to_raw_text(from_user=from_user, raw_text=raw_text)

    endpoint = "/api/parse" if action == "parse" else "/api/generate"
    start_hint = "收到,正在解析..." if action == "parse" else "收到,正在生成喜报,请稍候..."
    if requested_skip > 0:
        start_hint = f"{start_hint}(已跳过 {skipped_count}/{requested_skip} 条)"
    # Acknowledge first: the request below may take several minutes.
    self.send_text(from_user, start_hint)

    status, resp = self._request_json(
        base_url=self.xibao_base_url,
        path=endpoint,
        method="POST",
        payload={"raw_text": raw_text, "save_history": True} if action == "generate" else {"raw_text": raw_text},
        accept_error=True,
        timeout=300,
    )

    if not resp.get("ok"):
        err = str(resp.get("error") or resp.get("message") or f"HTTP {status}").strip()
        if resp.get("error_code") == "insurance_year_required":
            err = "包含保险记录但未指定 3年交/5年交。请在文本里写明年限后重试。"
        self.send_text(from_user, f"处理失败: {err}")
        return

    # Keep the records so a later feedback command can refer to them.
    self.save_feedback_context(from_user=from_user, data=resp)

    if action == "parse":
        self.send_text(from_user, self.format_parse_result(resp))
        return

    self.send_text(from_user, self.format_generate_result(resp))

    images = resp.get("download_images")
    if not isinstance(images, list) or not images:
        return

    send_list = images[: self.max_images] if self.max_images > 0 else []
    for idx, item in enumerate(send_list, start=1):
        # A malformed entry must not abort the remaining images.
        if not isinstance(item, dict):
            continue
        download_url = str(item.get("download_url") or "").strip()
        if not download_url:
            continue
        full_url = urllib.parse.urljoin(f"{self.xibao_base_url}/", download_url.lstrip("/"))
        try:
            raw, content_type = self._request_bytes(full_url, timeout=90)
            b64 = base64.b64encode(raw).decode("ascii")
            data_uri = f"data:{content_type};base64,{b64}"
            self.send_image_data_uri(from_user, data_uri)
            log(f"已回发图片 {idx}/{len(send_list)} -> {from_user}")
        except Exception as exc:
            self.send_text(from_user, f"第{idx}张图片回发失败: {exc}")

    remain = len(images) - len(send_list)
    if remain > 0:
        self.send_text(from_user, f"还有 {remain} 张图片未回发(已达到单次上限 {self.max_images})。")
|
||
|
||
def should_ignore_message(self, msg: dict[str, Any]) -> bool:
    """Return True when a summarized message must not be processed.

    A message is ignored when any of the following holds:

    * its ``type`` is neither ``"1"`` nor ``"01"``;
    * ``from``, ``to`` or ``content`` is empty after stripping;
    * the sender is ``filehelper`` or this bridge's own wxid;
    * either party's id ends with ``@chatroom``;
    * an allow-list is configured and the sender is not in it.

    Args:
        msg: Message dict with ``type``, ``from``, ``to`` and ``content`` keys.

    Returns:
        ``True`` to skip the message, ``False`` to let it through.
    """

    def _field(name: str) -> str:
        # Missing/None/falsy values collapse to "" before stripping.
        return str(msg.get(name) or "").strip()

    kind = _field("type")
    sender = _field("from")
    receiver = _field("to")
    body = _field("content")

    if kind not in {"1", "01"}:
        return True
    if not (sender and receiver and body):
        return True
    if sender == "filehelper" or (self.self_wxid and sender == self.self_wxid):
        return True
    if any(party.endswith("@chatroom") for party in (sender, receiver)):
        return True
    # With an empty allow-list every remaining sender is accepted.
    return bool(self.allow_from and sender not in self.allow_from)
|
||
|
||
def handle_polled_message(self, raw: Any) -> None:
    """Filter, de-duplicate and dispatch one raw message from the poll result.

    The raw item is summarized, checked with ``should_ignore_message`` and
    then checked against the ``seen`` store. A message is recorded as seen
    before its command is parsed, so text that is not a command is also
    remembered. When ``parse_command`` yields a command it is logged and
    passed to ``handle_command``.

    Any exception raised by ``handle_command`` is logged and reported back to
    the sender. If that report cannot be sent either, the secondary failure
    is logged instead of being silently discarded; it is never re-raised, so
    one bad message cannot abort the rest of the batch.

    Args:
        raw: One element of the poll response's ``Data`` list.
    """
    msg = summarize_message(raw)
    if self.should_ignore_message(msg):
        return

    dedupe_key = build_message_dedupe_key(msg)
    if self.seen.contains(dedupe_key):
        return
    self.seen.add(dedupe_key)

    content = str(msg.get("content") or "")
    cmd = self.parse_command(content)
    if not cmd:
        return

    log(
        f"收到指令 from={msg.get('from')} id={msg.get('id')} type={msg.get('type')} "
        f"content={content[:120]}"
    )
    try:
        self.handle_command(msg, cmd)
    except Exception as exc:
        log(f"处理指令失败: {exc}")
        try:
            self.send_text(str(msg.get("from") or ""), f"处理失败: {exc}")
        except Exception as notify_exc:
            # Still best-effort, but leave a trace so a broken reply channel
            # is visible in the logs.
            log(f"失败提示回发失败: {notify_exc}")
|
||
|
||
def poll_once(self) -> None:
    """Run a single sync request and dispatch every returned message.

    Steps:
      1. Refresh the auth key (non-forced); without a key, log and return.
      2. Call ``ensure_self_wxid``.
      3. POST ``/message/HttpSyncMsg`` with ``{"Count": sync_count}``.
      4. On a response ``Code`` other than 200, force-refresh the auth key
         when ``Text`` contains "重新登录" or "不存在", log the response and
         return.
      5. Otherwise hand each element of the ``Data`` list to
         ``handle_polled_message``; a missing, non-list or empty ``Data``
         dispatches nothing.
    """
    self.refresh_auth_key(force=False)
    if not self.auth_key:
        log(f"未获取到 authKey(session: {self.session_file}),等待中...")
        return
    self.ensure_self_wxid()

    sync_request = {
        "base_url": self.wechat_base_url,
        "path": "/message/HttpSyncMsg",
        "method": "POST",
        "payload": {"Count": self.sync_count},
        "key": self.auth_key,
        "timeout": 30,
        "accept_error": False,
    }
    _status, reply = self._request_json(**sync_request)

    code = int(reply.get("Code") or 0)
    text = str(reply.get("Text") or "").strip()
    if code != 200:
        # These substrings in the response text trigger a forced key refresh.
        if any(marker in text for marker in ("重新登录", "不存在")):
            self.refresh_auth_key(force=True)
        log(f"消息轮询返回 code={code} text={text or '-'}")
        return

    items = reply.get("Data")
    if isinstance(items, list):
        for entry in items:
            self.handle_polled_message(entry)
|
||
|
||
def run(self) -> int:
    """Run the bridge's main polling loop.

    Logs the startup configuration, then repeatedly runs the daily-cleanup
    check and one poll. Exceptions from a cycle are logged and do not stop
    the loop. After each cycle, state and meta are flushed (non-forced) when
    at least 6 seconds have passed since their respective last flush. The
    loop ends after one cycle when ``self.once`` is set, or on
    ``KeyboardInterrupt``; otherwise it sleeps ``self.poll_interval`` seconds
    between cycles.

    Returns:
        Always ``0``.
    """
    banner = [
        "微信桥接机器人启动",
        f"WeChat API: {self.wechat_base_url}",
        f"喜报 API: {self.xibao_base_url}",
        f"状态文件: {self.seen.path}",
        f"元数据文件: {self.meta_file}",
        f"每日清理时间: {self.cleanup_hour:02d}:{self.cleanup_minute:02d}",
    ]
    if self.allow_from:
        banner.append(f"仅处理白名单发送者: {', '.join(sorted(self.allow_from))}")
    for line in banner:
        log(line)

    try:
        keep_polling = True
        while keep_polling:
            try:
                self.maybe_run_daily_cleanup()
                self.poll_once()
            except Exception as exc:
                # A failed cycle must not kill the long-running loop.
                log(f"轮询异常: {exc}")

            tick = time.time()
            if tick - self._last_state_flush >= 6:
                self.seen.flush(force=False)
                self._last_state_flush = tick
            if tick - self._last_meta_flush >= 6:
                self._flush_meta(force=False)
                self._last_meta_flush = tick

            if self.once:
                keep_polling = False
            else:
                time.sleep(self.poll_interval)
    except KeyboardInterrupt:
        log("收到中断信号,准备退出")
    finally:
        # NOTE(review): the loop flushes via seen.flush/_flush_meta while
        # shutdown calls _flush_state — confirm it persists both stores.
        self._flush_state(force=True)
    return 0
|
||
|
||
|
||
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the bridge's command-line options.

    Options are declared in a table and registered in order, so ``--help``
    lists them in the same sequence as the table.

    Args:
        argv: Argument strings, without the program name.

    Returns:
        The populated ``argparse.Namespace``.
    """
    option_table = [
        ("--wechat-base-url", {"default": DEFAULT_WCPP_BASE_URL, "help": "WeChatPadPro API 地址"}),
        (
            "--wechat-auth-key",
            # Environment is read at call time, not at import time.
            {"default": os.environ.get("WCPP_AUTH_KEY", ""), "help": "WeChat authKey"},
        ),
        ("--wechat-session-file", {"default": str(DEFAULT_SESSION_FILE), "help": "webui 会话文件路径"}),
        ("--xibao-base-url", {"default": DEFAULT_XIBAO_BASE_URL, "help": "喜报服务地址"}),
        ("--poll-interval", {"type": float, "default": 2.0, "help": "消息轮询间隔(秒)"}),
        ("--sync-count", {"type": int, "default": 30, "help": "每次轮询拉取条数"}),
        ("--max-images", {"type": int, "default": 3, "help": "每次最多回发图片数量"}),
        ("--state-file", {"default": str(DEFAULT_STATE_FILE), "help": "消息去重状态文件"}),
        ("--meta-file", {"default": str(DEFAULT_META_FILE), "help": "反馈上下文与清理状态文件"}),
        ("--daily-cleanup-time", {"default": DEFAULT_DAILY_CLEANUP_TIME, "help": "每日清理时间 HH:MM"}),
        ("--allow-from", {"default": "", "help": "只处理这些发送者wxid(逗号分隔)"}),
        ("--once", {"action": "store_true", "help": "只执行一次轮询"}),
        ("--dry-run", {"action": "store_true", "help": "仅打印不真正回消息"}),
    ]

    parser = argparse.ArgumentParser(description="Xibao <-> WeChatPadPro bridge")
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args(argv)
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """Program entry point: parse arguments, build the bridge and run it.

    Args:
        argv: Argument strings without the program name. ``None`` means
            "use ``sys.argv[1:]``". An explicit empty list is honored as
            "no arguments" rather than falling back to the process
            arguments.

    Returns:
        The value returned by ``WechatXibaoBridge.run``.
    """
    # `argv or sys.argv[1:]` would treat [] like None; only None means default.
    cli_args = sys.argv[1:] if argv is None else argv
    args = parse_args(cli_args)
    bridge = WechatXibaoBridge(args)
    return bridge.run()
|
||
|
||
|
||
# Script entry: run the bridge and use main()'s return value as the exit code.
if __name__ == "__main__":
    sys.exit(main())
|