#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Email health monitor driven by a JSON ``/health`` endpoint.

Each invocation performs a single check:

1. Fetch the health URL and parse the JSON object it returns.
2. Evaluate four rules (``service_down``, ``health_fail``,
   ``db_pool_exhausted``, ``queue_backlog_high``) against the response.
3. Track per-rule consecutive-failure streaks and alert/recover state in a
   JSON state file, so repeated invocations can debounce and rate-limit
   notifications.
4. Send alert / recovery emails by running ``email_service.send_email``
   inside a Docker container via ``docker exec``.

The script keeps no in-process state between runs; everything it remembers
lives in the state file.
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import time
from datetime import datetime
from typing import Any, Dict, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

DEFAULT_STATE_FILE = "/tmp/zsglpt_health_monitor_state.json"


def _now_text() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _safe_int(value: Any, default: int = 0) -> int:
    """Convert *value* to ``int``, returning ``int(default)`` if it cannot be."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return int(default)


def _safe_float(value: Any, default: float = 0.0) -> float:
    """Convert *value* to ``float``, returning ``float(default)`` if it cannot be."""
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return float(default)


def _dict_at(mapping: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Return ``mapping[key]`` when it is a dict, otherwise an empty dict."""
    value = mapping.get(key)
    return value if isinstance(value, dict) else {}


def _parse_json_object(body: str) -> Dict[str, Any]:
    """Parse *body* as JSON and return it only if it is a JSON object.

    Empty input, invalid JSON and non-object JSON all yield ``{}``.
    """
    if not body:
        return {}
    try:
        loaded = json.loads(body)
    except ValueError:
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _empty_state() -> Dict[str, Any]:
    """Return a fresh, empty monitor state."""
    return {"version": 1, "rules": {}, "counters": {}}


def _normalize_state(raw: Any) -> Dict[str, Any]:
    """Coerce decoded state-file content into a usable state dict.

    A non-dict payload is replaced by an empty state. A missing ``version``
    is filled in, and ``rules`` / ``counters`` are reset to ``{}`` whenever
    they are missing or not dicts, so later code can index them safely.
    """
    if not isinstance(raw, dict):
        return _empty_state()
    raw.setdefault("version", 1)
    for section in ("rules", "counters"):
        if not isinstance(raw.get(section), dict):
            raw[section] = {}
    return raw


def _state_section(state: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Return ``state[key]`` as a dict, creating or replacing it if needed."""
    section = state.get(key)
    if not isinstance(section, dict):
        section = {}
        state[key] = section
    return section


def _load_state(path: str) -> Dict[str, Any]:
    """Load the monitor state from *path*.

    Returns an empty state when *path* is empty, the file does not exist,
    cannot be read, or does not contain valid JSON. Malformed sections of
    an otherwise valid file are reset by ``_normalize_state``.
    """
    if not path or not os.path.exists(path):
        return _empty_state()
    try:
        with open(path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError):
        # Unreadable or corrupt state must not stop the monitor; start over.
        return _empty_state()
    return _normalize_state(raw)


def _save_state(path: str, state: Dict[str, Any]) -> None:
    """Write *state* to *path* as JSON; do nothing when *path* is empty.

    The data is written to ``<path>.tmp`` first and then moved into place
    with ``os.replace`` so a reader never sees a half-written file. The
    parent directory is created if necessary.

    Raises:
        OSError: if the directory or file cannot be written.
    """
    if not path:
        return
    state_dir = os.path.dirname(path)
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)


def _persist_state(path: str, state: Dict[str, Any]) -> bool:
    """Save *state*, reporting instead of raising on I/O failure.

    Returns ``True`` when the state was written (or *path* is empty) and
    ``False`` when writing failed. A state-file problem must not prevent
    notifications from being sent, so the error is only printed.
    """
    try:
        _save_state(path, state)
    except OSError as exc:
        print(f"[monitor] 状态文件保存失败: {path} | {exc}", flush=True)
        return False
    return True


def _fetch_health(url: str, timeout: int) -> Tuple[int | None, Dict[str, Any], str | None]:
    """GET *url* and return ``(status, data, error)``.

    * ``status`` is the HTTP status code, or ``None`` when no HTTP response
      was obtained (connection failure, malformed URL, timeout, ...).
    * ``data`` is the response body parsed as a JSON object, or ``{}``.
    * ``error`` is ``None`` on a successful response, otherwise a short
      description prefixed with ``HTTPError:``, ``URLError:`` or
      ``RequestError:``.

    The timeout is clamped to at least one second. This function does not
    raise; every failure is reported through the return value.
    """
    try:
        # Request() validates the URL and raises ValueError for a malformed
        # one, so it has to be inside the try block as well.
        req = Request(
            url,
            headers={
                "User-Agent": "zsglpt-health-email-monitor/1.0",
                "Accept": "application/json",
            },
            method="GET",
        )
        with urlopen(req, timeout=max(1, int(timeout))) as resp:
            status = int(resp.getcode())
            body = resp.read().decode("utf-8", errors="ignore")
    except HTTPError as e:
        # An HTTP error response may still carry a JSON body worth reporting.
        status = int(getattr(e, "code", 0) or 0)
        try:
            body = e.read().decode("utf-8", errors="ignore")
        except Exception:
            body = ""
        return status, _parse_json_object(body), f"HTTPError: {e}"
    except URLError as e:
        return None, {}, f"URLError: {e}"
    except Exception as e:
        # Boundary catch-all: any other failure counts as "no response".
        return None, {}, f"RequestError: {e}"

    return status, _parse_json_object(body), None


def _inc_streak(state: Dict[str, Any], key: str, bad: bool) -> int:
    """Update and return the consecutive-failure counter for *key*.

    The counter is incremented when *bad* is true and reset to 0 otherwise.
    """
    counters = _state_section(state, "counters")
    current = _safe_int(counters.get(key), 0)
    current = (current + 1) if bad else 0
    counters[key] = current
    return current


def _rule_transition(
    state: Dict[str, Any],
    *,
    rule_name: str,
    bad: bool,
    streak: int,
    threshold: int,
    remind_seconds: int,
    now_ts: float,
) -> str | None:
    """Advance the alert state machine of one rule.

    Returns ``"alert"`` when a notification is due (first time the streak
    reaches *threshold*, or a reminder once *remind_seconds* have elapsed
    since the last notification), ``"recover"`` when an active rule is no
    longer failing, and ``None`` when nothing should be sent.

    *threshold* is clamped to at least 1 and *remind_seconds* to at least
    60. The rule's ``active`` / ``last_sent`` entry in ``state["rules"]`` is
    updated in place; a missing or malformed entry is re-initialised.
    """
    rules = _state_section(state, "rules")
    rule_state = rules.get(rule_name)
    if not isinstance(rule_state, dict):
        rule_state = {"active": False, "last_sent": 0}
        rules[rule_name] = rule_state

    is_active = bool(rule_state.get("active", False))
    last_sent = _safe_float(rule_state.get("last_sent", 0), 0.0)
    threshold = max(1, int(threshold))
    remind_seconds = max(60, int(remind_seconds))

    if bad and streak >= threshold:
        if not is_active:
            rule_state["active"] = True
            rule_state["last_sent"] = now_ts
            return "alert"
        if (now_ts - last_sent) >= remind_seconds:
            rule_state["last_sent"] = now_ts
            return "alert"
        return None

    if is_active and (not bad):
        rule_state["active"] = False
        rule_state["last_sent"] = now_ts
        return "recover"

    return None


def _snapshot_rule(state: Dict[str, Any], rule_name: str) -> Dict[str, Any] | None:
    """Return a copy of a rule's stored state, or ``None`` if it has none."""
    rule_state = _state_section(state, "rules").get(rule_name)
    return dict(rule_state) if isinstance(rule_state, dict) else None


def _restore_rule(state: Dict[str, Any], rule_name: str, snapshot: Dict[str, Any] | None) -> None:
    """Put a rule back to a state captured by ``_snapshot_rule``."""
    rules = _state_section(state, "rules")
    if snapshot is None:
        rules.pop(rule_name, None)
    else:
        rules[rule_name] = dict(snapshot)


def _send_email_via_container(
    *,
    container_name: str,
    to_email: str,
    subject: str,
    body: str,
    timeout_seconds: int = 45,
) -> Tuple[bool, str]:
    """Send one email by calling ``email_service.send_email`` in a container.

    Runs ``docker exec <container_name> python -c <snippet>`` and passes the
    recipient, subject and body as separate argv entries (no shell is
    involved). The snippet exits with 0 when ``send_email`` reports success
    and 2 otherwise.

    Returns ``(ok, output)`` where ``ok`` is true when the command exited
    with 0 and ``output`` is its combined, stripped stdout/stderr, or the
    exception text when the command could not be run or timed out. The
    timeout is clamped to at least five seconds.
    """
    code = (
        "import sys,email_service;"
        "res=email_service.send_email(to_email=sys.argv[1],subject=sys.argv[2],body=sys.argv[3],email_type='health_monitor');"
        "ok=bool(res.get('success'));"
        "print('ok' if ok else ('error:'+str(res.get('error'))));"
        "raise SystemExit(0 if ok else 2)"
    )
    cmd = [
        "docker",
        "exec",
        container_name,
        "python",
        "-c",
        code,
        to_email,
        subject,
        body,
    ]
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=max(5, int(timeout_seconds)),
            check=False,
        )
    except Exception as e:
        # Boundary catch-all: missing docker binary, timeout, bad argument...
        # The caller only needs to know that delivery failed and why.
        return False, str(e)

    output = (proc.stdout or "") + (proc.stderr or "")
    return proc.returncode == 0, output.strip()


def _build_common_lines(status: int | None, data: Dict[str, Any], fetch_error: str | None) -> list[str]:
    """Build the report lines shared by every notification email.

    The lines contain the current time, the monitored URL (read from
    ``data["_monitor_url"]``), the HTTP status, the ``ok`` / ``db_ok`` flags,
    the fetch error if any, and the queue, connection-pool, process and
    uptime figures found under ``data["metrics"]``. Missing metric values
    are rendered as ``N/A``.
    """
    metrics = _dict_at(data, "metrics")
    db_pool = _dict_at(metrics, "db_pool")
    process = _dict_at(metrics, "process")
    task_queue = _dict_at(metrics, "task_queue")

    lines = [
        f"时间: {_now_text()}",
        f"健康地址: {data.get('_monitor_url', '')}",
        f"HTTP状态: {status if status is not None else '请求失败'}",
        f"ok/db_ok: {data.get('ok')} / {data.get('db_ok')}",
    ]
    if fetch_error:
        lines.append(f"请求错误: {fetch_error}")
    lines.extend(
        [
            f"队列: pending={task_queue.get('pending_total', 'N/A')}, running={task_queue.get('running_total', 'N/A')}",
            f"连接池: size={db_pool.get('pool_size', 'N/A')}, available={db_pool.get('available', 'N/A')}, in_use={db_pool.get('in_use', 'N/A')}",
            f"进程: rss_mb={process.get('rss_mb', 'N/A')}, cpu%={process.get('cpu_percent', 'N/A')}, threads={process.get('threads', 'N/A')}",
            f"运行时长: {metrics.get('uptime_seconds', 'N/A')} 秒",
        ]
    )
    return lines


def _evaluate_rules(
    status: int | None,
    data: Dict[str, Any],
    *,
    queue_threshold: int,
    db_pool_streak: int,
    queue_streak: int,
) -> list[tuple[str, bool | None, int]]:
    """Evaluate every monitoring rule against one health response.

    Returns ``(rule_name, bad, streak_threshold)`` tuples. ``bad`` is
    ``True`` / ``False`` when the rule could be evaluated and ``None`` when
    the data it depends on is unavailable:

    * ``service_down`` is always known; it is bad when *status* is ``None``.
    * ``health_fail`` needs an HTTP response; it is bad when the status is
      >= 500 or either ``ok`` or ``db_ok`` is falsy or missing.
    * ``db_pool_exhausted`` needs ``metrics.db_pool``; it is bad when the
      pool has a positive size, nothing available and everything in use.
    * ``queue_backlog_high`` needs ``metrics.task_queue``; it is bad when
      ``pending_total`` is at least *queue_threshold* (minimum 1).

    Reporting "unknown" rather than "not bad" matters: otherwise a failed
    fetch would look like every other problem had just been resolved and
    would trigger recovery mails while the service is actually down.
    """
    metrics = _dict_at(data, "metrics")
    db_pool = metrics.get("db_pool")
    task_queue = metrics.get("task_queue")

    service_down = status is None

    health_fail: bool | None = None
    if status is not None:
        health_fail = bool(
            status >= 500
            or (not data.get("ok", False))
            or (not data.get("db_ok", False))
        )

    db_pool_exhausted: bool | None = None
    if isinstance(db_pool, dict):
        pool_size = _safe_int(db_pool.get("pool_size"), 0)
        db_pool_exhausted = (
            pool_size > 0
            and _safe_int(db_pool.get("available"), 0) <= 0
            and _safe_int(db_pool.get("in_use"), 0) >= pool_size
        )

    queue_backlog_high: bool | None = None
    if isinstance(task_queue, dict):
        pending_total = _safe_int(task_queue.get("pending_total"), 0)
        queue_backlog_high = pending_total >= max(1, int(queue_threshold))

    return [
        ("service_down", service_down, 1),
        ("health_fail", health_fail, 1),
        ("db_pool_exhausted", db_pool_exhausted, max(1, int(db_pool_streak))),
        ("queue_backlog_high", queue_backlog_high, max(1, int(queue_streak))),
    ]


def main(argv: list[str] | None = None) -> int:
    """Run one monitoring pass and return the process exit code.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv``.

    Returns:
        ``2`` when no recipient is configured, otherwise ``0``.

    Every option can also be supplied through an environment variable
    (``HEALTH_URL``, ``MONITOR_EMAIL_TO``, ``MONITOR_DOCKER_CONTAINER``,
    ``MONITOR_STATE_FILE``, ``MONITOR_TIMEOUT``, ``MONITOR_REMIND_SECONDS``,
    ``MONITOR_QUEUE_THRESHOLD``, ``MONITOR_QUEUE_STREAK``,
    ``MONITOR_DB_POOL_STREAK``); the command line takes precedence.
    """
    parser = argparse.ArgumentParser(description="zsglpt 邮件健康监控(基于 /health)")
    parser.add_argument("--url", default=os.environ.get("HEALTH_URL", "http://127.0.0.1:51232/health"))
    parser.add_argument("--to", default=os.environ.get("MONITOR_EMAIL_TO", ""))
    parser.add_argument(
        "--container",
        default=os.environ.get("MONITOR_DOCKER_CONTAINER", "knowledge-automation-multiuser"),
    )
    parser.add_argument("--state-file", default=os.environ.get("MONITOR_STATE_FILE", DEFAULT_STATE_FILE))
    parser.add_argument("--timeout", type=int, default=_safe_int(os.environ.get("MONITOR_TIMEOUT", 8), 8))
    parser.add_argument(
        "--remind-seconds",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_REMIND_SECONDS", 3600), 3600),
    )
    parser.add_argument(
        "--queue-threshold",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_QUEUE_THRESHOLD", 50), 50),
    )
    parser.add_argument(
        "--queue-streak",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_QUEUE_STREAK", 5), 5),
    )
    parser.add_argument(
        "--db-pool-streak",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_DB_POOL_STREAK", 3), 3),
    )
    parser.add_argument("--dry-run", action="store_true", help="仅打印,不实际发邮件")
    args = parser.parse_args(argv)

    if not args.to:
        print("[monitor] 缺少收件人,请设置 --to 或 MONITOR_EMAIL_TO", flush=True)
        return 2

    state = _load_state(args.state_file)
    now_ts = time.time()

    status, data, fetch_error = _fetch_health(args.url, args.timeout)
    if not isinstance(data, dict):
        data = {}
    # Stash the URL in the payload so the report builder can print it.
    data["_monitor_url"] = args.url

    rule_defs = _evaluate_rules(
        status,
        data,
        queue_threshold=args.queue_threshold,
        db_pool_streak=args.db_pool_streak,
        queue_streak=args.queue_streak,
    )

    # Each entry: (rule name, action, rule state before this run's transition).
    pending_notifications: list[tuple[str, str, Dict[str, Any] | None]] = []
    for rule_name, bad, threshold in rule_defs:
        if bad is None:
            # No data for this rule in this run: leave its streak and alert
            # state untouched instead of treating "unknown" as "recovered".
            continue
        previous = _snapshot_rule(state, rule_name)
        streak = _inc_streak(state, rule_name, bad)
        action = _rule_transition(
            state,
            rule_name=rule_name,
            bad=bad,
            streak=streak,
            threshold=threshold,
            remind_seconds=args.remind_seconds,
            now_ts=now_ts,
        )
        if action:
            pending_notifications.append((rule_name, action, previous))

    _persist_state(args.state_file, state)

    if not pending_notifications:
        print(f"[monitor] {_now_text()} 正常,无需发送邮件")
        return 0

    common_lines = _build_common_lines(status, data, fetch_error)
    rolled_back = False

    for rule_name, action, previous in pending_notifications:
        level = "告警" if action == "alert" else "恢复"
        subject = f"[zsglpt健康监控][{level}] {rule_name}"
        body_lines = [
            f"规则: {rule_name}",
            f"状态: {level}",
            "",
            *common_lines,
        ]
        body = "\n".join(body_lines)

        if args.dry_run:
            print(f"[monitor][dry-run] subject={subject}\n{body}\n")
            continue

        ok, msg = _send_email_via_container(
            container_name=args.container,
            to_email=args.to,
            subject=subject,
            body=body,
        )
        if ok:
            print(f"[monitor] 邮件已发送: {subject}")
        else:
            print(f"[monitor] 邮件发送失败: {subject} | {msg}")
            # The notification never went out: undo the transition so the
            # next run retries instead of believing it was delivered.
            _restore_rule(state, rule_name, previous)
            rolled_back = True

    if rolled_back:
        _persist_state(args.state_file, state)

    return 0


if __name__ == "__main__":
    raise SystemExit(main())