diff --git a/.env.example b/.env.example index d25c8ca..b291a32 100644 --- a/.env.example +++ b/.env.example @@ -25,6 +25,8 @@ DB_WAL_AUTOCHECKPOINT_PAGES=1000 DB_MMAP_SIZE_MB=256 DB_LOCK_RETRY_COUNT=3 DB_LOCK_RETRY_BASE_MS=50 +DB_SLOW_QUERY_MS=120 +DB_SLOW_QUERY_SQL_MAX_LEN=240 DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS=21600 DB_ANALYZE_INTERVAL_SECONDS=86400 DB_WAL_CHECKPOINT_INTERVAL_SECONDS=43200 diff --git a/app_config.py b/app_config.py index 49b1eb8..4a0a1f6 100755 --- a/app_config.py +++ b/app_config.py @@ -133,6 +133,8 @@ class Config: DB_MMAP_SIZE_MB = int(os.environ.get("DB_MMAP_SIZE_MB", "256")) DB_LOCK_RETRY_COUNT = int(os.environ.get("DB_LOCK_RETRY_COUNT", "3")) DB_LOCK_RETRY_BASE_MS = int(os.environ.get("DB_LOCK_RETRY_BASE_MS", "50")) + DB_SLOW_QUERY_MS = int(os.environ.get("DB_SLOW_QUERY_MS", "120")) + DB_SLOW_QUERY_SQL_MAX_LEN = int(os.environ.get("DB_SLOW_QUERY_SQL_MAX_LEN", "240")) DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS = int(os.environ.get("DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS", "21600")) DB_ANALYZE_INTERVAL_SECONDS = int(os.environ.get("DB_ANALYZE_INTERVAL_SECONDS", "86400")) DB_WAL_CHECKPOINT_INTERVAL_SECONDS = int(os.environ.get("DB_WAL_CHECKPOINT_INTERVAL_SECONDS", "43200")) @@ -274,6 +276,10 @@ class Config: errors.append("DB_LOCK_RETRY_COUNT不能为负数") if cls.DB_LOCK_RETRY_BASE_MS < 10: errors.append("DB_LOCK_RETRY_BASE_MS建议至少10毫秒") + if cls.DB_SLOW_QUERY_MS < 0: + errors.append("DB_SLOW_QUERY_MS不能为负数") + if cls.DB_SLOW_QUERY_SQL_MAX_LEN < 80: + errors.append("DB_SLOW_QUERY_SQL_MAX_LEN建议至少80") # 验证日志配置 if cls.LOG_LEVEL not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]: diff --git a/database.py b/database.py index 56121e1..fadca2c 100644 --- a/database.py +++ b/database.py @@ -120,7 +120,7 @@ config = get_config() DB_FILE = config.DB_FILE # 数据库版本 (用于迁移管理) -DB_VERSION = 18 +DB_VERSION = 19 # ==================== 系统配置缓存(P1 / O-03) ==================== diff --git a/db/migrations.py b/db/migrations.py index c1e88db..5d56175 100644 --- a/db/migrations.py +++ b/db/migrations.py @@ -73,6 +73,7 @@ def _get_migration_steps(): (16, _migrate_to_v16), (17, _migrate_to_v17), (18, _migrate_to_v18), + (19, _migrate_to_v19), ] @@ -860,3 +861,22 @@ def _migrate_to_v18(conn): ) conn.commit() + + + +def _migrate_to_v19(conn): + """迁移到版本19 - 报表与调度查询复合索引优化""" + cursor = conn.cursor() + + index_statements = [ + "CREATE INDEX IF NOT EXISTS idx_users_status_created_at ON users(status, created_at)", + "CREATE INDEX IF NOT EXISTS idx_task_logs_status_created_at ON task_logs(status, created_at)", + "CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled_next_run ON user_schedules(enabled, next_run_at)", + "CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_status_created_at ON bug_feedbacks(status, created_at)", + "CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_user_created_at ON bug_feedbacks(user_id, created_at)", + ] + + for statement in index_statements: + cursor.execute(statement) + + conn.commit() diff --git a/db/schema.py b/db/schema.py index 14ec136..1c53100 100644 --- a/db/schema.py +++ b/db/schema.py @@ -364,6 +364,7 @@ def ensure_schema(conn) -> None: cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_vip_expire ON users(vip_expire_time)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_status_created_at ON users(status, created_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_login_fingerprints_user ON login_fingerprints(user_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_login_ips_user ON login_ips(user_id)") @@ -392,6 +393,7 @@ def ensure_schema(conn) -> None: cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_logs_user_id ON task_logs(user_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_logs_status ON task_logs(status)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_logs_status_created_at ON task_logs(status, created_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_logs_created_at ON task_logs(created_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_logs_source ON task_logs(source)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_task_logs_source_created_at ON task_logs(source, created_at)") @@ -400,6 +402,8 @@ def ensure_schema(conn) -> None: cursor.execute("CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_user_id ON bug_feedbacks(user_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_status ON bug_feedbacks(status)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_created_at ON bug_feedbacks(created_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_status_created_at ON bug_feedbacks(status, created_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_user_created_at ON bug_feedbacks(user_id, created_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_announcements_active ON announcements(is_active)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_announcements_created_at ON announcements(created_at)") @@ -408,6 +412,7 @@ def ensure_schema(conn) -> None: cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_user_id ON user_schedules(user_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled ON user_schedules(enabled)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_next_run ON user_schedules(next_run_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled_next_run ON user_schedules(enabled, next_run_at)") # 复合索引优化 cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_user_enabled ON user_schedules(user_id, enabled)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_schedule_execution_logs_schedule_id ON schedule_execution_logs(schedule_id)") diff --git a/db/tasks.py b/db/tasks.py index f28e280..c638c2b 100644 --- a/db/tasks.py +++ b/db/tasks.py @@ -83,8 +83,8 @@ def _build_task_logs_where_sql( if source_filter: source_filter = str(source_filter or "").strip() if source_filter == "user_scheduled": - where_clauses.append("tl.source LIKE ? ESCAPE '\\\\'") - params.append("user_scheduled:%") + where_clauses.append("tl.source >= ? AND tl.source < ?") + params.extend(["user_scheduled:", "user_scheduled;"]) elif source_filter.endswith("*"): prefix = source_filter[:-1] safe_prefix = sanitize_sql_like_pattern(prefix) diff --git a/db_pool.py b/db_pool.py index bdb0ac5..b0ed734 100755 --- a/db_pool.py +++ b/db_pool.py @@ -24,6 +24,8 @@ DB_WAL_AUTOCHECKPOINT_PAGES = max(100, int(getattr(config, "DB_WAL_AUTOCHECKPOIN DB_MMAP_SIZE_MB = max(0, int(getattr(config, "DB_MMAP_SIZE_MB", 256))) DB_LOCK_RETRY_COUNT = max(0, int(getattr(config, "DB_LOCK_RETRY_COUNT", 3))) DB_LOCK_RETRY_BASE_MS = max(10, int(getattr(config, "DB_LOCK_RETRY_BASE_MS", 50))) +DB_SLOW_QUERY_MS = max(0, int(getattr(config, "DB_SLOW_QUERY_MS", 120))) +DB_SLOW_QUERY_SQL_MAX_LEN = max(80, int(getattr(config, "DB_SLOW_QUERY_SQL_MAX_LEN", 240))) def _is_lock_conflict_error(error: sqlite3.OperationalError) -> bool: @@ -31,6 +33,85 @@ def _is_lock_conflict_error(error: sqlite3.OperationalError) -> bool: return ("locked" in message) or ("busy" in message) +def _compact_sql(sql: str) -> str: + statement = " ".join(str(sql or "").split()) + if len(statement) <= DB_SLOW_QUERY_SQL_MAX_LEN: + return statement + return statement[: DB_SLOW_QUERY_SQL_MAX_LEN - 3] + "..." + + +def _describe_params(parameters) -> str: + if parameters is None: + return "none" + if isinstance(parameters, dict): + return f"dict[{len(parameters)}]" + if isinstance(parameters, (list, tuple)): + return f"{type(parameters).__name__}[{len(parameters)}]" + return type(parameters).__name__ + + +class TracedCursor: + """带慢查询检测的游标包装器""" + + def __init__(self, cursor, on_query_executed): + self._cursor = cursor + self._on_query_executed = on_query_executed + + def _trace(self, sql, parameters, execute_fn): + start = time.perf_counter() + try: + execute_fn() + finally: + elapsed_ms = (time.perf_counter() - start) * 1000.0 + try: + self._on_query_executed(sql, parameters, elapsed_ms) + except Exception: + pass + + def execute(self, sql, parameters=None): + if parameters is None: + self._trace(sql, None, lambda: self._cursor.execute(sql)) + else: + self._trace(sql, parameters, lambda: self._cursor.execute(sql, parameters)) + return self + + def executemany(self, sql, seq_of_parameters): + self._trace(sql, seq_of_parameters, lambda: self._cursor.executemany(sql, seq_of_parameters)) + return self + + def executescript(self, sql_script): + self._trace(sql_script, None, lambda: self._cursor.executescript(sql_script)) + return self + + def fetchone(self): + return self._cursor.fetchone() + + def fetchall(self): + return self._cursor.fetchall() + + def fetchmany(self, size=None): + if size is None: + return self._cursor.fetchmany() + return self._cursor.fetchmany(size) + + def close(self): + return self._cursor.close() + + @property + def rowcount(self): + return self._cursor.rowcount + + @property + def lastrowid(self): + return self._cursor.lastrowid + + def __iter__(self): + return iter(self._cursor) + + def __getattr__(self, item): + return getattr(self._cursor, item) + + class ConnectionPool: """SQLite连接池""" @@ -203,26 +284,33 @@ class PooledConnection: """with语句结束时自动归还连接 [已修复Bug#3]""" try: if exc_type is not None: - # 发生异常,回滚事务 self._conn.rollback() logger.warning(f"数据库事务已回滚: {exc_type.__name__}") - # 注意: 不自动commit,要求用户显式调用conn.commit() - if self._cursor: + if self._cursor is not None: self._cursor.close() self._cursor = None except Exception as e: logger.warning(f"关闭游标失败: {e}") finally: - # 归还连接 self._pool.return_connection(self._conn) - return False # 不抑制异常 + return False + + def _on_query_executed(self, sql: str, parameters, elapsed_ms: float) -> None: + if DB_SLOW_QUERY_MS <= 0: + return + if elapsed_ms < DB_SLOW_QUERY_MS: + return + logger.warning( + f"[慢SQL] {elapsed_ms:.1f}ms sql=\"{_compact_sql(sql)}\" params={_describe_params(parameters)}" + ) def cursor(self): """获取游标""" if self._cursor is None: - self._cursor = self._conn.cursor() + raw_cursor = self._conn.cursor() + self._cursor = TracedCursor(raw_cursor, self._on_query_executed) return self._cursor def commit(self): @@ -249,9 +337,9 @@ class PooledConnection: def execute(self, sql, parameters=None): """执行SQL""" cursor = self.cursor() - if parameters: - return cursor.execute(sql, parameters) - return cursor.execute(sql) + if parameters is None: + return cursor.execute(sql) + return cursor.execute(sql, parameters) def fetchone(self): """获取一行"""