172 lines
5.7 KiB
Python
172 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
请求级运行指标(轻量内存版)
|
|
- 记录请求总量、状态分布、耗时
|
|
- 记录慢请求样本(环形队列)
|
|
- 输出健康检查可读快照
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from typing import Dict
|
|
|
|
def _env_float(name: str, default: float, floor: float) -> float:
    """Read a float setting from the environment, clamped to at least ``floor``.

    An unset, empty, or unparsable value falls back to ``default`` so that a
    malformed environment variable cannot make this module fail at import time.
    """
    try:
        value = float(os.environ.get(name) or default)
    except ValueError:
        value = default
    return max(floor, value)


def _env_int(name: str, default: int, floor: int) -> int:
    """Read an integer setting from the environment, clamped to at least ``floor``.

    An unset, empty, or unparsable value (including decimals such as ``"1.5"``)
    falls back to ``default`` instead of raising at import time.
    """
    try:
        value = int(os.environ.get(name) or default)
    except ValueError:
        value = default
    return max(floor, value)


# Duration threshold in milliseconds for the slow-request counters (never negative).
_SLOW_REQUEST_MS = _env_float("REQUEST_METRICS_SLOW_MS", 1200.0, 0.0)
# Size limit applied to the per-route statistics table (never below 20).
_PATH_STATS_LIMIT = _env_int("REQUEST_METRICS_PATH_LIMIT", 120, 20)
# Capacity of the slow-request ring buffer (never below 10).
_RECENT_SLOW_LIMIT = _env_int("REQUEST_METRICS_RECENT_SLOW_LIMIT", 20, 10)

# Lock held by this module's functions while they read or modify ``_state``.
_lock = threading.Lock()

# Process-wide counters; values start at zero/empty when the module is imported.
_state = {
    "start_ts": time.time(),  # import time, used as the start of the uptime window
    "last_request_ts": 0.0,
    "total_requests": 0,
    "api_requests": 0,
    "error_requests": 0,
    "slow_requests": 0,
    "duration_total_ms": 0.0,
    "max_duration_ms": 0.0,
    "status_counts": {},  # status bucket -> request count
    "path_stats": {},  # route key -> per-route counters
    "recent_slow": deque(maxlen=_RECENT_SLOW_LIMIT),  # oldest sample drops off when full
}
|
|
|
|
|
|
def _status_bucket(status_code: int) -> str:
    """Map an HTTP status code to the label used as a key in the status tally.

    Returns ``"unknown"`` for a missing, zero, or negative code, ``"1xx"``
    through ``"5xx"`` for codes 100-599, and the code itself as text for
    anything else (1-99 or 600 and above).
    """
    numeric = int(status_code or 0)
    if numeric < 1:
        return "unknown"
    hundreds = numeric // 100
    return f"{hundreds}xx" if 1 <= hundreds <= 5 else str(numeric)
|
|
|
|
|
|
def _normalize_path(path: str) -> str:
    """Return ``path`` as text, capped at 160 characters.

    A falsy path becomes ``"/"``. Text longer than 160 characters is cut to
    its first 157 characters followed by ``"..."``.
    """
    raw = str(path) if path else "/"
    if len(raw) <= 160:
        return raw
    return raw[:157] + "..."
|
|
|
|
|
|
def _prune_path_stats(path_stats: Dict[str, dict]) -> None:
    """Evict one low-activity route from ``path_stats`` once it is at capacity.

    Does nothing while the mapping holds fewer than ``_PATH_STATS_LIMIT``
    entries. Otherwise it removes, in place, the entry with the smallest
    ``(count, max_ms)`` pair; on a tie the entry that comes first in the
    mapping is removed.
    """
    if len(path_stats) < _PATH_STATS_LIMIT:
        return

    def activity(route: str) -> tuple:
        entry = path_stats[route]
        hits = int(entry.get("count", 0) or 0)
        slowest = float(entry.get("max_ms", 0.0) or 0.0)
        return (hits, slowest)

    # Drop the least active route so the table cannot grow without bound.
    victim = min(path_stats, key=activity, default=None)
    # Truthiness check: an empty-string key is never evicted.
    if victim:
        path_stats.pop(victim, None)
|
|
|
|
|
|
def record_request_metric(*, path: str, method: str, status_code: int, duration_ms: float, is_api: bool = False) -> None:
    """Fold one finished request into the in-memory metrics.

    Args:
        path: Request path; normalized with ``_normalize_path`` before use.
        method: HTTP method; upper-cased, with a falsy value treated as ``GET``.
        status_code: Response status; a falsy value is recorded as 0.
        duration_ms: Handling time in milliseconds; negative values count as 0.
        is_api: When true, the request is also added to the API request counter.
    """
    elapsed_ms = max(0.0, float(duration_ms or 0.0))
    status = int(status_code or 0)
    verb = str(method or "GET").upper()
    clean_path = _normalize_path(path)
    route = f"{verb} {clean_path}"
    timestamp = time.time()
    server_error = status >= 500

    with _lock:
        # Global counters.
        _state["total_requests"] += 1
        if is_api:
            _state["api_requests"] += 1
        _state["last_request_ts"] = timestamp
        _state["duration_total_ms"] += elapsed_ms
        if elapsed_ms > _state["max_duration_ms"]:
            _state["max_duration_ms"] = elapsed_ms

        # Status distribution.
        tallies = _state["status_counts"]
        label = _status_bucket(status)
        tallies[label] = int(tallies.get(label, 0) or 0) + 1

        # Per-route counters; make room before a new route is added.
        routes = _state["path_stats"]
        if route not in routes:
            _prune_path_stats(routes)
            routes[route] = {"count": 0, "total_ms": 0.0, "max_ms": 0.0, "status_5xx": 0}
        entry = routes[route]
        entry["count"] = int(entry.get("count", 0) or 0) + 1
        entry["total_ms"] = float(entry.get("total_ms", 0.0) or 0.0) + elapsed_ms
        previous_peak = float(entry.get("max_ms", 0.0) or 0.0)
        if elapsed_ms > previous_peak:
            entry["max_ms"] = elapsed_ms

        # Server errors are counted both globally and per route.
        if server_error:
            _state["error_requests"] += 1
            entry["status_5xx"] = int(entry.get("status_5xx", 0) or 0) + 1

        # Slow-request tracking is skipped entirely when the threshold is 0.
        if 0 < _SLOW_REQUEST_MS <= elapsed_ms:
            _state["slow_requests"] += 1
            sample = {
                "path": clean_path,
                "method": verb,
                "status": status,
                "duration_ms": round(elapsed_ms, 2),
                "time": int(timestamp),
            }
            _state["recent_slow"].append(sample)
|
|
|
|
|
|
def get_request_metrics_snapshot() -> dict:
    """Return a point-in-time copy of the request metrics.

    The snapshot is built while holding ``_lock``, and every container in it
    (status counts, path rows, slow-request samples) is a copy, so callers may
    modify the result without touching the stored metrics.

    Returns:
        A dict with the keys ``since_ts``, ``uptime_seconds``,
        ``last_request_ts``, ``total_requests``, ``api_requests``,
        ``error_requests``, ``slow_requests``, ``avg_duration_ms``,
        ``max_duration_ms``, ``status_counts``, ``top_paths`` (at most 8 rows,
        ordered by max, then average duration, then count, descending),
        ``recent_slow`` and ``slow_threshold_ms``.
    """
    with _lock:
        total_requests = int(_state["total_requests"])
        duration_total_ms = float(_state["duration_total_ms"])
        avg_duration_ms = round(duration_total_ms / total_requests, 2) if total_requests > 0 else 0.0

        path_rows = []
        for route, item in _state["path_stats"].items():
            count = int(item.get("count", 0) or 0)
            total_ms = float(item.get("total_ms", 0.0) or 0.0)
            path_rows.append(
                {
                    "path": route,
                    "count": count,
                    "avg_ms": round(total_ms / count, 2) if count > 0 else 0.0,
                    "max_ms": round(float(item.get("max_ms", 0.0) or 0.0), 2),
                    "status_5xx": int(item.get("status_5xx", 0) or 0),
                }
            )

        # Rows were built just above, so their keys and numeric types are known.
        top_paths = sorted(
            path_rows,
            key=lambda row: (row["max_ms"], row["avg_ms"], row["count"]),
            reverse=True,
        )[:8]

        return {
            "since_ts": int(_state["start_ts"]),
            "uptime_seconds": max(0, int(time.time() - float(_state["start_ts"]))),
            "last_request_ts": int(_state["last_request_ts"] or 0),
            "total_requests": total_requests,
            "api_requests": int(_state["api_requests"]),
            "error_requests": int(_state["error_requests"]),
            "slow_requests": int(_state["slow_requests"]),
            "avg_duration_ms": avg_duration_ms,
            "max_duration_ms": round(float(_state["max_duration_ms"]), 2),
            "status_counts": dict(_state["status_counts"]),
            "top_paths": top_paths,
            # Copy each sample: a plain list() would hand out the very dicts
            # stored in the ring buffer.
            "recent_slow": [dict(sample) for sample in _state["recent_slow"]],
            "slow_threshold_ms": _SLOW_REQUEST_MS,
        }
|