Files
zsglpt/scripts/health_email_monitor.py

349 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import json
import os
import subprocess
import time
from datetime import datetime
from typing import Any, Dict, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# Default path of the JSON state file; used as the fallback for --state-file
# when the MONITOR_STATE_FILE environment variable is not set.
DEFAULT_STATE_FILE = "/tmp/zsglpt_health_monitor_state.json"
def _now_text() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def _safe_int(value: Any, default: int = 0) -> int:
    """Convert *value* to ``int``, falling back to ``int(default)`` on any failure.

    Args:
        value: Anything ``int()`` might accept (number, numeric string, ...).
        default: Value used when the conversion raises.

    Returns:
        The converted integer, or ``int(default)`` if ``int(value)`` raised.
    """
    try:
        converted = int(value)
    except Exception:
        # Best-effort conversion: any failure maps to the default.
        converted = int(default)
    return converted
def _safe_float(value: Any, default: float = 0.0) -> float:
    """Convert *value* to ``float``, falling back to ``float(default)`` on any failure.

    Args:
        value: Anything ``float()`` might accept.
        default: Value used when the conversion raises.

    Returns:
        The converted float, or ``float(default)`` if ``float(value)`` raised.
    """
    try:
        converted = float(value)
    except Exception:
        # Best-effort conversion: any failure maps to the default.
        converted = float(default)
    return converted
def _load_state(path: str) -> Dict[str, Any]:
    """Load the persisted monitor state from *path*.

    The state is a JSON object with at least the keys ``version``, ``rules``
    and ``counters``. Loading is best-effort: a missing, unreadable or
    malformed file yields a fresh empty state instead of raising.

    Args:
        path: Location of the JSON state file. An empty path means "no state".

    Returns:
        The loaded state dict. ``rules`` and ``counters`` are guaranteed to be
        dicts, so callers can safely call ``.get`` / ``.setdefault`` on them.
    """
    fresh: Dict[str, Any] = {"version": 1, "rules": {}, "counters": {}}
    if not path:
        return fresh
    try:
        with open(path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except (OSError, ValueError, RecursionError):
        # Missing/unreadable file, invalid JSON or invalid encoding: start over.
        return fresh
    if not isinstance(raw, dict):
        return fresh
    raw.setdefault("version", 1)
    # setdefault alone would keep a present-but-wrong-typed section (e.g.
    # ``"rules": null``), which makes later ``.setdefault`` calls on it crash.
    for section in ("rules", "counters"):
        if not isinstance(raw.get(section), dict):
            raw[section] = {}
    return raw
def _save_state(path: str, state: Dict[str, Any]) -> None:
    """Persist *state* as JSON to *path*, replacing the previous file atomically.

    The data is written to ``<path>.tmp`` first and then moved over the
    target with ``os.replace`` so readers never see a half-written file.

    Args:
        path: Destination file. An empty path makes this a no-op.
        state: JSON-serialisable state dict.

    Raises:
        OSError: If the directory or file cannot be created/written.
        TypeError / ValueError: If *state* is not JSON-serialisable.
    """
    if not path:
        return
    state_dir = os.path.dirname(path)
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a partially written temp file behind; the original
        # error is what the caller needs to see, so cleanup is best-effort.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def _parse_json_object(body: str) -> Dict[str, Any]:
    """Decode *body* as JSON and return it only if it is an object, else ``{}``."""
    if not body:
        return {}
    try:
        loaded = json.loads(body)
    except (ValueError, RecursionError):
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _fetch_health(url: str, timeout: int) -> Tuple[int | None, Dict[str, Any], str | None]:
    """GET the health endpoint and return ``(status, data, error)``.

    This function never raises; every failure is reported through the tuple.

    Args:
        url: Health endpoint URL.
        timeout: Request timeout in seconds (values below 1 are raised to 1).

    Returns:
        A tuple ``(status, data, error)``:
        - ``status``: HTTP status code, or ``None`` if no HTTP response was
          obtained (connection error, malformed URL, bad timeout, ...).
        - ``data``: The response body parsed as a JSON object, or ``{}`` when
          the body is empty, not valid JSON, or not a JSON object.
        - ``error``: ``None`` on a successful response, otherwise a short
          description prefixed with ``HTTPError``, ``URLError`` or
          ``RequestError``.
    """
    try:
        # Request() itself raises ValueError for a URL without a scheme, so it
        # has to be built inside the try to honour the "never raises" contract.
        req = Request(
            url,
            headers={
                "User-Agent": "zsglpt-health-email-monitor/1.0",
                "Accept": "application/json",
            },
            method="GET",
        )
        with urlopen(req, timeout=max(1, int(timeout))) as resp:
            status = int(resp.getcode())
            body = resp.read().decode("utf-8", errors="ignore")
    except HTTPError as e:
        # An HTTP error response may still carry a JSON body worth reporting.
        status = int(getattr(e, "code", 0) or 0)
        try:
            body = e.read().decode("utf-8", errors="ignore")
        except Exception:
            body = ""
        return status, _parse_json_object(body), f"HTTPError: {e}"
    except URLError as e:
        return None, {}, f"URLError: {e}"
    except Exception as e:
        return None, {}, f"RequestError: {e}"
    return status, _parse_json_object(body), None
def _inc_streak(state: Dict[str, Any], key: str, bad: bool) -> int:
    """Update and return the consecutive-failure counter stored under *key*.

    The counter lives in ``state["counters"]`` (created on demand). It grows
    by one for each bad sample and is reset to zero by a good one.

    Args:
        state: Mutable monitor state; updated in place.
        key: Counter name.
        bad: Whether the current sample is a failure.

    Returns:
        The new counter value.
    """
    streaks = state.setdefault("counters", {})
    previous = _safe_int(streaks.get(key), 0)
    if bad:
        updated = previous + 1
    else:
        updated = 0
    streaks[key] = updated
    return updated
def _rule_transition(
    state: Dict[str, Any],
    *,
    rule_name: str,
    bad: bool,
    streak: int,
    threshold: int,
    remind_seconds: int,
    now_ts: float,
) -> str | None:
    """Advance the alert state of one rule and tell the caller what to send.

    Per-rule state is kept in ``state["rules"][rule_name]`` as
    ``{"active": bool, "last_sent": timestamp}`` and is updated in place.

    Args:
        state: Mutable monitor state.
        rule_name: Key of the rule inside ``state["rules"]``.
        bad: Whether the rule's condition currently fails.
        streak: Current consecutive-failure count for the rule.
        threshold: Minimum streak needed to fire (clamped to at least 1).
        remind_seconds: Minimum gap between repeated alerts (clamped to
            at least 60).
        now_ts: Current time as a Unix timestamp.

    Returns:
        ``"alert"`` when a first alert or a due reminder should be sent,
        ``"recover"`` when an active rule is no longer failing, or ``None``
        when nothing needs to be sent.
    """
    rule_state = state.setdefault("rules", {}).setdefault(
        rule_name, {"active": False, "last_sent": 0}
    )
    was_active = bool(rule_state.get("active", False))
    previous_sent = _safe_float(rule_state.get("last_sent", 0), 0.0)
    min_streak = max(1, int(threshold))
    remind_interval = max(60, int(remind_seconds))

    firing = bad and streak >= min_streak
    if not firing:
        if was_active and not bad:
            # The condition cleared while an alert was outstanding.
            rule_state["active"] = False
            rule_state["last_sent"] = now_ts
            return "recover"
        return None

    # Already alerted and the reminder interval has not elapsed yet.
    if was_active and not ((now_ts - previous_sent) >= remind_interval):
        return None

    if not was_active:
        rule_state["active"] = True
    rule_state["last_sent"] = now_ts
    return "alert"
def _send_email_via_container(
    *,
    container_name: str,
    to_email: str,
    subject: str,
    body: str,
    timeout_seconds: int = 45,
) -> Tuple[bool, str]:
    """Send an email by running a Python one-liner inside a Docker container.

    The one-liner imports ``email_service`` inside the container and calls
    its ``send_email``; recipient, subject and body are passed as argv
    entries, so no shell quoting is involved.

    Args:
        container_name: Container passed to ``docker exec``.
        to_email: Recipient address.
        subject: Mail subject.
        body: Mail body.
        timeout_seconds: Limit for the ``docker exec`` call (at least 5).

    Returns:
        ``(ok, output)``: ``ok`` is ``True`` only if the command exited with
        code 0; ``output`` is the stripped stdout+stderr, or the exception
        text if the command could not be run.
    """
    snippet = ";".join(
        (
            "import sys,email_service",
            "res=email_service.send_email(to_email=sys.argv[1],subject=sys.argv[2],body=sys.argv[3],email_type='health_monitor')",
            "ok=bool(res.get('success'))",
            "print('ok' if ok else ('error:'+str(res.get('error'))))",
            "raise SystemExit(0 if ok else 2)",
        )
    )
    argv = ["docker", "exec", container_name, "python", "-c", snippet, to_email, subject, body]
    try:
        completed = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=max(5, int(timeout_seconds)),
            check=False,
        )
    except Exception as exc:
        # Covers a missing docker binary, timeouts and invalid arguments.
        return False, str(exc)
    combined = "".join(part or "" for part in (completed.stdout, completed.stderr))
    return completed.returncode == 0, combined.strip()
def _build_common_lines(status: int | None, data: Dict[str, Any], fetch_error: str | None) -> list[str]:
    """Build the report lines shared by every notification email.

    Args:
        status: HTTP status of the health request, or ``None`` if it failed.
        data: Parsed health payload; ``_monitor_url`` and the ``metrics``
            sub-sections are read from it, with missing values shown as
            ``N/A``.
        fetch_error: Error description of the health request, if any.

    Returns:
        The lines of the report, in display order.
    """

    def _section(parent: Dict[str, Any], key: str) -> Dict[str, Any]:
        # Return parent[key] only when it is a dict, otherwise an empty dict.
        candidate = parent.get(key)
        return candidate if isinstance(candidate, dict) else {}

    metrics = _section(data, "metrics")
    pool = _section(metrics, "db_pool")
    proc_info = _section(metrics, "process")
    queue = _section(metrics, "task_queue")

    http_status = status if status is not None else '请求失败'
    report = [
        f"时间: {_now_text()}",
        f"健康地址: {data.get('_monitor_url', '')}",
        f"HTTP状态: {http_status}",
        f"ok/db_ok: {data.get('ok')} / {data.get('db_ok')}",
    ]
    if fetch_error:
        report.append(f"请求错误: {fetch_error}")

    report += [
        f"队列: pending={queue.get('pending_total', 'N/A')}, running={queue.get('running_total', 'N/A')}",
        f"连接池: size={pool.get('pool_size', 'N/A')}, available={pool.get('available', 'N/A')}, in_use={pool.get('in_use', 'N/A')}",
        f"进程: rss_mb={proc_info.get('rss_mb', 'N/A')}, cpu%={proc_info.get('cpu_percent', 'N/A')}, threads={proc_info.get('threads', 'N/A')}",
        f"运行时长: {metrics.get('uptime_seconds', 'N/A')}",
    ]
    return report
def main() -> int:
    """Run one monitoring pass: probe the health endpoint and email on changes.

    Flow: parse CLI/environment options, load the persisted state, fetch the
    health endpoint, evaluate the alert rules, persist the updated state and
    send one email per rule that produced an ``alert`` or ``recover`` action.

    Rules:
        - ``service_down``: no HTTP response could be obtained.
        - ``health_fail``: HTTP status >= 500, or ``ok`` / ``db_ok`` falsy.
        - ``db_pool_exhausted``: pool reports no available connections.
        - ``queue_backlog_high``: pending tasks at or above ``--queue-threshold``.

    Returns:
        0 on success, 1 if the state could not be saved or an email could not
        be delivered, 2 if no recipient is configured.
    """
    parser = argparse.ArgumentParser(description="zsglpt 邮件健康监控(基于 /health)")
    parser.add_argument("--url", default=os.environ.get("HEALTH_URL", "http://127.0.0.1:51232/health"))
    parser.add_argument("--to", default=os.environ.get("MONITOR_EMAIL_TO", ""))
    parser.add_argument(
        "--container",
        default=os.environ.get("MONITOR_DOCKER_CONTAINER", "knowledge-automation-multiuser"),
    )
    parser.add_argument("--state-file", default=os.environ.get("MONITOR_STATE_FILE", DEFAULT_STATE_FILE))
    parser.add_argument("--timeout", type=int, default=_safe_int(os.environ.get("MONITOR_TIMEOUT", 8), 8))
    parser.add_argument(
        "--remind-seconds",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_REMIND_SECONDS", 3600), 3600),
    )
    parser.add_argument(
        "--queue-threshold",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_QUEUE_THRESHOLD", 50), 50),
    )
    parser.add_argument(
        "--queue-streak",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_QUEUE_STREAK", 5), 5),
    )
    parser.add_argument(
        "--db-pool-streak",
        type=int,
        default=_safe_int(os.environ.get("MONITOR_DB_POOL_STREAK", 3), 3),
    )
    parser.add_argument("--dry-run", action="store_true", help="仅打印,不实际发邮件")
    args = parser.parse_args()

    if not args.to:
        print("[monitor] 缺少收件人,请设置 --to 或 MONITOR_EMAIL_TO", flush=True)
        return 2

    state = _load_state(args.state_file)
    now_ts = time.time()
    status, data, fetch_error = _fetch_health(args.url, args.timeout)
    if not isinstance(data, dict):
        data = {}
    # Stash the probed URL in the payload so the report builder can show it.
    data["_monitor_url"] = args.url

    metrics = data.get("metrics") if isinstance(data.get("metrics"), dict) else {}
    db_pool = metrics.get("db_pool") if isinstance(metrics.get("db_pool"), dict) else {}
    task_queue = metrics.get("task_queue") if isinstance(metrics.get("task_queue"), dict) else {}

    service_down = status is None
    health_fail = bool(
        status is not None
        and (status >= 500 or (not data.get("ok", False)) or (not data.get("db_ok", False)))
    )
    db_pool_exhausted = (
        _safe_int(db_pool.get("pool_size"), 0) > 0
        and _safe_int(db_pool.get("available"), 0) <= 0
        and _safe_int(db_pool.get("in_use"), 0) >= _safe_int(db_pool.get("pool_size"), 0)
    )
    queue_backlog_high = _safe_int(task_queue.get("pending_total"), 0) >= max(1, int(args.queue_threshold))

    rule_defs = [("service_down", service_down, 1)]
    if not service_down:
        # The remaining rules need a real response. Without one their inputs
        # are unknown, and evaluating them as "not bad" would emit bogus
        # "recover" mails while the service is actually unreachable.
        rule_defs.extend(
            [
                ("health_fail", health_fail, 1),
                ("db_pool_exhausted", db_pool_exhausted, max(1, int(args.db_pool_streak))),
                ("queue_backlog_high", queue_backlog_high, max(1, int(args.queue_streak))),
            ]
        )

    pending_notifications: list[tuple[str, str]] = []
    for rule_name, bad, threshold in rule_defs:
        streak = _inc_streak(state, rule_name, bad)
        action = _rule_transition(
            state,
            rule_name=rule_name,
            bad=bad,
            streak=streak,
            threshold=threshold,
            remind_seconds=args.remind_seconds,
            now_ts=now_ts,
        )
        if action:
            pending_notifications.append((rule_name, action))

    exit_code = 0
    try:
        _save_state(args.state_file, state)
    except OSError as e:
        # A state file that cannot be written must not stop alerts going out.
        print(f"[monitor] 状态保存失败: {args.state_file} | {e}", flush=True)
        exit_code = 1

    if not pending_notifications:
        print(f"[monitor] {_now_text()} 正常,无需发送邮件")
        return exit_code

    common_lines = _build_common_lines(status, data, fetch_error)
    for rule_name, action in pending_notifications:
        level = "告警" if action == "alert" else "恢复"
        subject = f"[zsglpt健康监控][{level}] {rule_name}"
        body_lines = [
            f"规则: {rule_name}",
            f"状态: {level}",
            "",
            *common_lines,
        ]
        body = "\n".join(body_lines)
        if args.dry_run:
            print(f"[monitor][dry-run] subject={subject}\n{body}\n")
            continue
        ok, msg = _send_email_via_container(
            container_name=args.container,
            to_email=args.to,
            subject=subject,
            body=body,
        )
        if ok:
            print(f"[monitor] 邮件已发送: {subject}")
        else:
            print(f"[monitor] 邮件发送失败: {subject} | {msg}")
            # Surface delivery failures to the scheduler via the exit code.
            exit_code = 1
    return exit_code
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)