#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 慢 SQL 指标(轻量内存版) - 记录超过阈值的 SQL 执行样本 - 维护近窗口期(默认24小时)聚合统计 - 输出 TOP SQL 与最近慢 SQL 列表 """ from __future__ import annotations import os import threading import time from collections import deque _SLOW_SQL_THRESHOLD_MS = max(0.0, float(os.environ.get("DB_SLOW_QUERY_MS", "120") or 120)) _WINDOW_SECONDS = max(600, int(os.environ.get("DB_SLOW_SQL_WINDOW_SECONDS", "86400") or 86400)) _TOP_LIMIT = max(5, int(os.environ.get("DB_SLOW_SQL_TOP_LIMIT", "12") or 12)) _RECENT_LIMIT = max(10, int(os.environ.get("DB_SLOW_SQL_RECENT_LIMIT", "50") or 50)) _MAX_EVENTS = max(_RECENT_LIMIT, int(os.environ.get("DB_SLOW_SQL_MAX_EVENTS", "20000") or 20000)) _SQL_MAX_LEN = max(80, int(os.environ.get("DB_SLOW_QUERY_SQL_MAX_LEN", "240") or 240)) _lock = threading.Lock() _state = { "start_ts": time.time(), "last_slow_ts": 0.0, "events": deque(), "recent": deque(maxlen=_RECENT_LIMIT), } def _compact_text(value: str, max_len: int) -> str: text = " ".join(str(value or "").split()) if len(text) <= max_len: return text return f"{text[: max_len - 3]}..." def _compact_sql(sql: str) -> str: return _compact_text(str(sql or ""), _SQL_MAX_LEN) def _compact_params(params_info: str) -> str: return _compact_text(str(params_info or "none"), 64) def _prune_events_locked(now_ts: float) -> None: cutoff_ts = now_ts - float(_WINDOW_SECONDS) events = _state["events"] while events and float(events[0].get("time", 0.0) or 0.0) < cutoff_ts: events.popleft() overflow = len(events) - int(_MAX_EVENTS) while overflow > 0 and events: events.popleft() overflow -= 1 def record_slow_sql(*, sql: str, duration_ms: float, params_info: str = "none") -> None: duration = max(0.0, float(duration_ms or 0.0)) now = time.time() sql_text = _compact_sql(sql) params_text = _compact_params(params_info) event = { "time": now, "sql": sql_text, "duration_ms": round(duration, 2), "params": params_text, } with _lock: _prune_events_locked(now) _state["events"].append(event) _state["recent"].append(event) _state["last_slow_ts"] = now def get_slow_sql_metrics_snapshot() -> dict: now = time.time() with _lock: _prune_events_locked(now) events = list(_state["events"]) recent_rows = list(_state["recent"]) last_slow_ts = float(_state.get("last_slow_ts") or 0.0) grouped: dict[str, dict] = {} total_duration_ms = 0.0 max_duration_ms = 0.0 for item in events: sql_text = str(item.get("sql") or "-") duration = float(item.get("duration_ms") or 0.0) ts = float(item.get("time") or 0.0) params_text = str(item.get("params") or "none") total_duration_ms += duration if duration > max_duration_ms: max_duration_ms = duration bucket = grouped.get(sql_text) if bucket is None: bucket = { "sql": sql_text, "count": 0, "total_ms": 0.0, "max_ms": 0.0, "last_ts": 0.0, "params": params_text, } grouped[sql_text] = bucket bucket["count"] = int(bucket["count"] or 0) + 1 bucket["total_ms"] = float(bucket["total_ms"] or 0.0) + duration if duration > float(bucket["max_ms"] or 0.0): bucket["max_ms"] = duration bucket["params"] = params_text if ts >= float(bucket["last_ts"] or 0.0): bucket["last_ts"] = ts top_sql_rows = sorted( grouped.values(), key=lambda row: ( int(row.get("count", 0) or 0), float(row.get("max_ms", 0.0) or 0.0), float(row.get("total_ms", 0.0) or 0.0), ), reverse=True, )[:_TOP_LIMIT] top_sql = [] for idx, row in enumerate(top_sql_rows, start=1): count = int(row.get("count", 0) or 0) total_ms = float(row.get("total_ms", 0.0) or 0.0) avg_ms = (total_ms / count) if count > 0 else 0.0 top_sql.append( { "rank": idx, "sql": row.get("sql") or "-", "count": count, "avg_ms": round(avg_ms, 2), "max_ms": round(float(row.get("max_ms", 0.0) or 0.0), 2), "last_ts": int(float(row.get("last_ts", 0.0) or 0.0)), "sample_params": row.get("params") or "none", } ) cutoff_ts = now - float(_WINDOW_SECONDS) recent = [ { "time": int(float(item.get("time") or 0.0)), "sql": str(item.get("sql") or "-"), "duration_ms": round(float(item.get("duration_ms") or 0.0), 2), "params": str(item.get("params") or "none"), } for item in recent_rows if float(item.get("time") or 0.0) >= cutoff_ts ] total_events = len(events) avg_duration_ms = round((total_duration_ms / total_events), 2) if total_events > 0 else 0.0 return { "since_ts": int(float(events[0].get("time") or 0.0)) if events else 0, "window_seconds": _WINDOW_SECONDS, "top_limit": _TOP_LIMIT, "recent_limit": _RECENT_LIMIT, "slow_threshold_ms": _SLOW_SQL_THRESHOLD_MS, "total_slow_queries": total_events, "unique_sql": len(grouped), "avg_duration_ms": avg_duration_ms, "max_duration_ms": round(max_duration_ms, 2), "last_slow_ts": int(last_slow_ts) if last_slow_ts > 0 else 0, "top_sql": top_sql, "recent_slow_sql": recent, }