perf(db): add slow-query tracing and composite indexes
This commit is contained in:
106
db_pool.py
106
db_pool.py
@@ -24,6 +24,8 @@ DB_WAL_AUTOCHECKPOINT_PAGES = max(100, int(getattr(config, "DB_WAL_AUTOCHECKPOIN
|
||||
DB_MMAP_SIZE_MB = max(0, int(getattr(config, "DB_MMAP_SIZE_MB", 256)))
|
||||
DB_LOCK_RETRY_COUNT = max(0, int(getattr(config, "DB_LOCK_RETRY_COUNT", 3)))
|
||||
DB_LOCK_RETRY_BASE_MS = max(10, int(getattr(config, "DB_LOCK_RETRY_BASE_MS", 50)))
|
||||
DB_SLOW_QUERY_MS = max(0, int(getattr(config, "DB_SLOW_QUERY_MS", 120)))
|
||||
DB_SLOW_QUERY_SQL_MAX_LEN = max(80, int(getattr(config, "DB_SLOW_QUERY_SQL_MAX_LEN", 240)))
|
||||
|
||||
|
||||
def _is_lock_conflict_error(error: sqlite3.OperationalError) -> bool:
|
||||
@@ -31,6 +33,85 @@ def _is_lock_conflict_error(error: sqlite3.OperationalError) -> bool:
|
||||
return ("locked" in message) or ("busy" in message)
|
||||
|
||||
|
||||
def _compact_sql(sql: str) -> str:
|
||||
statement = " ".join(str(sql or "").split())
|
||||
if len(statement) <= DB_SLOW_QUERY_SQL_MAX_LEN:
|
||||
return statement
|
||||
return statement[: DB_SLOW_QUERY_SQL_MAX_LEN - 3] + "..."
|
||||
|
||||
|
||||
def _describe_params(parameters) -> str:
|
||||
if parameters is None:
|
||||
return "none"
|
||||
if isinstance(parameters, dict):
|
||||
return f"dict[{len(parameters)}]"
|
||||
if isinstance(parameters, (list, tuple)):
|
||||
return f"{type(parameters).__name__}[{len(parameters)}]"
|
||||
return type(parameters).__name__
|
||||
|
||||
|
||||
class TracedCursor:
|
||||
"""带慢查询检测的游标包装器"""
|
||||
|
||||
def __init__(self, cursor, on_query_executed):
|
||||
self._cursor = cursor
|
||||
self._on_query_executed = on_query_executed
|
||||
|
||||
def _trace(self, sql, parameters, execute_fn):
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
execute_fn()
|
||||
finally:
|
||||
elapsed_ms = (time.perf_counter() - start) * 1000.0
|
||||
try:
|
||||
self._on_query_executed(sql, parameters, elapsed_ms)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def execute(self, sql, parameters=None):
|
||||
if parameters is None:
|
||||
self._trace(sql, None, lambda: self._cursor.execute(sql))
|
||||
else:
|
||||
self._trace(sql, parameters, lambda: self._cursor.execute(sql, parameters))
|
||||
return self
|
||||
|
||||
def executemany(self, sql, seq_of_parameters):
|
||||
self._trace(sql, seq_of_parameters, lambda: self._cursor.executemany(sql, seq_of_parameters))
|
||||
return self
|
||||
|
||||
def executescript(self, sql_script):
|
||||
self._trace(sql_script, None, lambda: self._cursor.executescript(sql_script))
|
||||
return self
|
||||
|
||||
def fetchone(self):
|
||||
return self._cursor.fetchone()
|
||||
|
||||
def fetchall(self):
|
||||
return self._cursor.fetchall()
|
||||
|
||||
def fetchmany(self, size=None):
|
||||
if size is None:
|
||||
return self._cursor.fetchmany()
|
||||
return self._cursor.fetchmany(size)
|
||||
|
||||
def close(self):
|
||||
return self._cursor.close()
|
||||
|
||||
@property
|
||||
def rowcount(self):
|
||||
return self._cursor.rowcount
|
||||
|
||||
@property
|
||||
def lastrowid(self):
|
||||
return self._cursor.lastrowid
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._cursor)
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self._cursor, item)
|
||||
|
||||
|
||||
class ConnectionPool:
|
||||
"""SQLite连接池"""
|
||||
|
||||
@@ -203,26 +284,33 @@ class PooledConnection:
|
||||
"""with语句结束时自动归还连接 [已修复Bug#3]"""
|
||||
try:
|
||||
if exc_type is not None:
|
||||
# 发生异常,回滚事务
|
||||
self._conn.rollback()
|
||||
logger.warning(f"数据库事务已回滚: {exc_type.__name__}")
|
||||
# 注意: 不自动commit,要求用户显式调用conn.commit()
|
||||
|
||||
if self._cursor:
|
||||
if self._cursor is not None:
|
||||
self._cursor.close()
|
||||
self._cursor = None
|
||||
except Exception as e:
|
||||
logger.warning(f"关闭游标失败: {e}")
|
||||
finally:
|
||||
# 归还连接
|
||||
self._pool.return_connection(self._conn)
|
||||
|
||||
return False # 不抑制异常
|
||||
return False
|
||||
|
||||
def _on_query_executed(self, sql: str, parameters, elapsed_ms: float) -> None:
|
||||
if DB_SLOW_QUERY_MS <= 0:
|
||||
return
|
||||
if elapsed_ms < DB_SLOW_QUERY_MS:
|
||||
return
|
||||
logger.warning(
|
||||
f"[慢SQL] {elapsed_ms:.1f}ms sql=\"{_compact_sql(sql)}\" params={_describe_params(parameters)}"
|
||||
)
|
||||
|
||||
def cursor(self):
|
||||
"""获取游标"""
|
||||
if self._cursor is None:
|
||||
self._cursor = self._conn.cursor()
|
||||
raw_cursor = self._conn.cursor()
|
||||
self._cursor = TracedCursor(raw_cursor, self._on_query_executed)
|
||||
return self._cursor
|
||||
|
||||
def commit(self):
|
||||
@@ -249,9 +337,9 @@ class PooledConnection:
|
||||
def execute(self, sql, parameters=None):
|
||||
"""执行SQL"""
|
||||
cursor = self.cursor()
|
||||
if parameters:
|
||||
return cursor.execute(sql, parameters)
|
||||
return cursor.execute(sql)
|
||||
if parameters is None:
|
||||
return cursor.execute(sql)
|
||||
return cursor.execute(sql, parameters)
|
||||
|
||||
def fetchone(self):
|
||||
"""获取一行"""
|
||||
|
||||
Reference in New Issue
Block a user