diff --git a/.env.example b/.env.example index 0fb7671..d25c8ca 100644 --- a/.env.example +++ b/.env.example @@ -18,6 +18,17 @@ SESSION_COOKIE_SECURE=false # 使用HTTPS时设为true # ==================== 数据库配置 ==================== DB_FILE=data/app_data.db DB_POOL_SIZE=5 +DB_CONNECT_TIMEOUT_SECONDS=10 +DB_BUSY_TIMEOUT_MS=10000 +DB_CACHE_SIZE_KB=8192 +DB_WAL_AUTOCHECKPOINT_PAGES=1000 +DB_MMAP_SIZE_MB=256 +DB_LOCK_RETRY_COUNT=3 +DB_LOCK_RETRY_BASE_MS=50 +DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS=21600 +DB_ANALYZE_INTERVAL_SECONDS=86400 +DB_WAL_CHECKPOINT_INTERVAL_SECONDS=43200 +DB_WAL_CHECKPOINT_MODE=PASSIVE # ==================== 并发控制配置 ==================== MAX_CONCURRENT_GLOBAL=2 diff --git a/app.py b/app.py index 9683b35..7507c32 100644 --- a/app.py +++ b/app.py @@ -35,7 +35,7 @@ from realtime.status_push import status_push_worker from routes import register_blueprints from security import init_security_middleware from services.checkpoints import init_checkpoint_manager -from services.maintenance import start_cleanup_scheduler, start_kdocs_monitor +from services.maintenance import start_cleanup_scheduler, start_database_maintenance_scheduler, start_kdocs_monitor from services.request_metrics import record_request_metric from services.models import User from services.runtime import init_runtime @@ -407,6 +407,7 @@ if __name__ == "__main__": _init_optional_email_service() start_cleanup_scheduler() + start_database_maintenance_scheduler() start_kdocs_monitor() _load_and_apply_scheduler_limits() diff --git a/app_config.py b/app_config.py index 1a7e071..49b1eb8 100755 --- a/app_config.py +++ b/app_config.py @@ -126,6 +126,17 @@ class Config: # ==================== 数据库配置 ==================== DB_FILE = os.environ.get("DB_FILE", "data/app_data.db") DB_POOL_SIZE = int(os.environ.get("DB_POOL_SIZE", "5")) + DB_CONNECT_TIMEOUT_SECONDS = int(os.environ.get("DB_CONNECT_TIMEOUT_SECONDS", "10")) + DB_BUSY_TIMEOUT_MS = int(os.environ.get("DB_BUSY_TIMEOUT_MS", "10000")) + DB_CACHE_SIZE_KB = int(os.environ.get("DB_CACHE_SIZE_KB", "8192")) + DB_WAL_AUTOCHECKPOINT_PAGES = int(os.environ.get("DB_WAL_AUTOCHECKPOINT_PAGES", "1000")) + DB_MMAP_SIZE_MB = int(os.environ.get("DB_MMAP_SIZE_MB", "256")) + DB_LOCK_RETRY_COUNT = int(os.environ.get("DB_LOCK_RETRY_COUNT", "3")) + DB_LOCK_RETRY_BASE_MS = int(os.environ.get("DB_LOCK_RETRY_BASE_MS", "50")) + DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS = int(os.environ.get("DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS", "21600")) + DB_ANALYZE_INTERVAL_SECONDS = int(os.environ.get("DB_ANALYZE_INTERVAL_SECONDS", "86400")) + DB_WAL_CHECKPOINT_INTERVAL_SECONDS = int(os.environ.get("DB_WAL_CHECKPOINT_INTERVAL_SECONDS", "43200")) + DB_WAL_CHECKPOINT_MODE = os.environ.get("DB_WAL_CHECKPOINT_MODE", "PASSIVE") # ==================== 浏览器配置 ==================== SCREENSHOTS_DIR = os.environ.get("SCREENSHOTS_DIR", "截图") @@ -249,6 +260,20 @@ class Config: if cls.DB_POOL_SIZE < 1: errors.append("DB_POOL_SIZE必须大于0") + if cls.DB_CONNECT_TIMEOUT_SECONDS < 1: + errors.append("DB_CONNECT_TIMEOUT_SECONDS必须大于0") + if cls.DB_BUSY_TIMEOUT_MS < 100: + errors.append("DB_BUSY_TIMEOUT_MS必须至少100毫秒") + if cls.DB_CACHE_SIZE_KB < 1024: + errors.append("DB_CACHE_SIZE_KB建议至少1024") + if cls.DB_WAL_AUTOCHECKPOINT_PAGES < 100: + errors.append("DB_WAL_AUTOCHECKPOINT_PAGES建议至少100") + if cls.DB_MMAP_SIZE_MB < 0: + errors.append("DB_MMAP_SIZE_MB不能为负数") + if cls.DB_LOCK_RETRY_COUNT < 0: + errors.append("DB_LOCK_RETRY_COUNT不能为负数") + if cls.DB_LOCK_RETRY_BASE_MS < 10: + errors.append("DB_LOCK_RETRY_BASE_MS建议至少10毫秒") # 验证日志配置 if cls.LOG_LEVEL not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]: diff --git a/db_pool.py b/db_pool.py index b5f4fe9..bdb0ac5 100755 --- a/db_pool.py +++ b/db_pool.py @@ -7,12 +7,28 @@ import sqlite3 import threading +import time from queue import Empty, Full, Queue +from app_config import get_config from app_logger import get_logger logger = get_logger("database") +config = get_config() + +DB_CONNECT_TIMEOUT_SECONDS = max(1, int(getattr(config, "DB_CONNECT_TIMEOUT_SECONDS", 10))) +DB_BUSY_TIMEOUT_MS = max(1000, int(getattr(config, "DB_BUSY_TIMEOUT_MS", 10000))) +DB_CACHE_SIZE_KB = max(1024, int(getattr(config, "DB_CACHE_SIZE_KB", 8192))) +DB_WAL_AUTOCHECKPOINT_PAGES = max(100, int(getattr(config, "DB_WAL_AUTOCHECKPOINT_PAGES", 1000))) +DB_MMAP_SIZE_MB = max(0, int(getattr(config, "DB_MMAP_SIZE_MB", 256))) +DB_LOCK_RETRY_COUNT = max(0, int(getattr(config, "DB_LOCK_RETRY_COUNT", 3))) +DB_LOCK_RETRY_BASE_MS = max(10, int(getattr(config, "DB_LOCK_RETRY_BASE_MS", 50))) + + +def _is_lock_conflict_error(error: sqlite3.OperationalError) -> bool: + message = str(error or "").lower() + return ("locked" in message) or ("busy" in message) class ConnectionPool: @@ -46,16 +62,29 @@ class ConnectionPool: def _create_connection(self): """创建新的数据库连接""" - conn = sqlite3.connect(self.database, check_same_thread=False) + conn = sqlite3.connect( + self.database, + check_same_thread=False, + timeout=DB_CONNECT_TIMEOUT_SECONDS, + ) conn.row_factory = sqlite3.Row - # 启用外键约束,确保 ON DELETE CASCADE 等约束生效 - conn.execute("PRAGMA foreign_keys=ON") - # 设置WAL模式提高并发性能 - conn.execute("PRAGMA journal_mode=WAL") - # 在WAL模式下使用NORMAL同步,兼顾性能与可靠性 - conn.execute("PRAGMA synchronous=NORMAL") - # 设置合理的超时时间 - conn.execute("PRAGMA busy_timeout=5000") + pragma_statements = [ + "PRAGMA foreign_keys=ON", + "PRAGMA journal_mode=WAL", + "PRAGMA synchronous=NORMAL", + f"PRAGMA busy_timeout={DB_BUSY_TIMEOUT_MS}", + "PRAGMA temp_store=MEMORY", + f"PRAGMA cache_size={-DB_CACHE_SIZE_KB}", + f"PRAGMA wal_autocheckpoint={DB_WAL_AUTOCHECKPOINT_PAGES}", + ] + if DB_MMAP_SIZE_MB > 0: + pragma_statements.append(f"PRAGMA mmap_size={DB_MMAP_SIZE_MB * 1024 * 1024}") + + for statement in pragma_statements: + try: + conn.execute(statement) + except sqlite3.DatabaseError as e: + logger.warning(f"设置数据库参数失败 ({statement}): {e}") return conn def _close_connection(self, conn) -> None: @@ -198,7 +227,20 @@ class PooledConnection: def commit(self): """提交事务""" - self._conn.commit() + for attempt in range(DB_LOCK_RETRY_COUNT + 1): + try: + self._conn.commit() + return + except sqlite3.OperationalError as e: + if (not _is_lock_conflict_error(e)) or attempt >= DB_LOCK_RETRY_COUNT: + raise + + sleep_seconds = (DB_LOCK_RETRY_BASE_MS * (2**attempt)) / 1000.0 + logger.warning( + f"数据库提交遇到锁冲突,{sleep_seconds:.3f}s 后重试 " + f"({attempt + 1}/{DB_LOCK_RETRY_COUNT})" + ) + time.sleep(sleep_seconds) def rollback(self): """回滚事务""" diff --git a/services/maintenance.py b/services/maintenance.py index fda72f1..9241cd1 100644 --- a/services/maintenance.py +++ b/services/maintenance.py @@ -29,6 +29,12 @@ config = get_config() USER_ACCOUNTS_EXPIRE_SECONDS = int(getattr(config, "USER_ACCOUNTS_EXPIRE_SECONDS", 3600)) BATCH_TASK_EXPIRE_SECONDS = int(getattr(config, "BATCH_TASK_EXPIRE_SECONDS", 21600)) PENDING_RANDOM_EXPIRE_SECONDS = int(getattr(config, "PENDING_RANDOM_EXPIRE_SECONDS", 7200)) +DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS = max(300, int(getattr(config, "DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS", 21600))) +DB_ANALYZE_INTERVAL_SECONDS = max(0, int(getattr(config, "DB_ANALYZE_INTERVAL_SECONDS", 86400))) +DB_WAL_CHECKPOINT_INTERVAL_SECONDS = max(0, int(getattr(config, "DB_WAL_CHECKPOINT_INTERVAL_SECONDS", 43200))) +DB_WAL_CHECKPOINT_MODE = str(getattr(config, "DB_WAL_CHECKPOINT_MODE", "PASSIVE") or "PASSIVE").upper().strip() +if DB_WAL_CHECKPOINT_MODE not in {"PASSIVE", "FULL", "RESTART", "TRUNCATE"}: + DB_WAL_CHECKPOINT_MODE = "PASSIVE" # 金山文档离线通知状态:每次掉线只通知一次,恢复在线后重置 _kdocs_offline_notified: bool = False @@ -275,6 +281,80 @@ def start_cleanup_scheduler() -> None: logger.info("内存清理调度器已启动") +def _execute_db_statement(statement: str, *, commit: bool = False, fetchone: bool = False): + import db_pool + + with db_pool.get_db() as conn: + conn.execute(statement) + row = conn.fetchone() if fetchone else None + if commit: + conn.commit() + return row + + +def optimize_database_runtime() -> None: + """执行 SQLite 运行期优化,提升查询计划和页缓存命中率。""" + row = _execute_db_statement("PRAGMA optimize", fetchone=True) + if row: + logger.debug(f"[DB维护] PRAGMA optimize 已执行: {tuple(row)}") + else: + logger.debug("[DB维护] PRAGMA optimize 已执行") + + +def analyze_database_stats() -> None: + """执行 ANALYZE,刷新统计信息,提升复杂查询稳定性。""" + _execute_db_statement("ANALYZE", commit=True) + logger.info("[DB维护] ANALYZE 已完成") + + +def checkpoint_database_wal() -> None: + """定期执行 WAL checkpoint,控制 WAL 文件体积。""" + row = _execute_db_statement( + f"PRAGMA wal_checkpoint({DB_WAL_CHECKPOINT_MODE})", + fetchone=True, + ) + if row: + logger.debug(f"[DB维护] WAL checkpoint({DB_WAL_CHECKPOINT_MODE}) 结果: {tuple(row)}") + else: + logger.debug(f"[DB维护] WAL checkpoint({DB_WAL_CHECKPOINT_MODE}) 已执行") + + +def start_database_maintenance_scheduler() -> None: + """启动数据库维护调度器。""" + _start_daemon_loop( + "db-optimize", + startup_delay=180, + interval_seconds=DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS, + job=optimize_database_runtime, + error_tag="[DB维护] PRAGMA optimize 执行失败", + ) + + if DB_ANALYZE_INTERVAL_SECONDS > 0: + _start_daemon_loop( + "db-analyze", + startup_delay=300, + interval_seconds=DB_ANALYZE_INTERVAL_SECONDS, + job=analyze_database_stats, + error_tag="[DB维护] ANALYZE 执行失败", + ) + + if DB_WAL_CHECKPOINT_INTERVAL_SECONDS > 0: + _start_daemon_loop( + "db-wal-checkpoint", + startup_delay=420, + interval_seconds=DB_WAL_CHECKPOINT_INTERVAL_SECONDS, + job=checkpoint_database_wal, + error_tag="[DB维护] WAL checkpoint 执行失败", + ) + + logger.info( + "[DB维护] 调度器已启动: " + f"optimize={DB_PRAGMA_OPTIMIZE_INTERVAL_SECONDS}s, " + f"analyze={DB_ANALYZE_INTERVAL_SECONDS}s, " + f"checkpoint={DB_WAL_CHECKPOINT_INTERVAL_SECONDS}s({DB_WAL_CHECKPOINT_MODE})" + ) + + def start_kdocs_monitor() -> None: """启动金山文档状态监控""" _start_daemon_loop(