Files
zsglpt/db/migrations.py

453 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import sqlite3
from db.utils import get_cst_now_str
def get_current_version(conn) -> int:
cursor = conn.cursor()
cursor.execute("SELECT version FROM db_version WHERE id = 1")
row = cursor.fetchone()
if not row:
return 0
try:
return int(row["version"])
except Exception:
try:
return int(row[0])
except Exception:
return 0
def set_current_version(conn, version: int) -> None:
cursor = conn.cursor()
cursor.execute("UPDATE db_version SET version = ?, updated_at = ? WHERE id = 1", (int(version), get_cst_now_str()))
conn.commit()
def migrate_database(conn, target_version: int) -> None:
"""数据库迁移:按版本增量升级(向前兼容)。"""
cursor = conn.cursor()
cursor.execute("INSERT OR IGNORE INTO db_version (id, version, updated_at) VALUES (1, 0, ?)", (get_cst_now_str(),))
conn.commit()
current_version = get_current_version(conn)
if current_version < 1:
_migrate_to_v1(conn)
current_version = 1
if current_version < 2:
_migrate_to_v2(conn)
current_version = 2
if current_version < 3:
_migrate_to_v3(conn)
current_version = 3
if current_version < 4:
_migrate_to_v4(conn)
current_version = 4
if current_version < 5:
_migrate_to_v5(conn)
current_version = 5
if current_version < 6:
_migrate_to_v6(conn)
current_version = 6
if current_version < 7:
_migrate_to_v7(conn)
current_version = 7
if current_version < 8:
_migrate_to_v8(conn)
current_version = 8
if current_version < 9:
_migrate_to_v9(conn)
current_version = 9
if current_version < 10:
_migrate_to_v10(conn)
current_version = 10
if current_version != int(target_version):
set_current_version(conn, int(target_version))
def _migrate_to_v1(conn):
"""迁移到版本1 - 添加缺失字段"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(system_config)")
columns = [col[1] for col in cursor.fetchall()]
if "schedule_weekdays" not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN schedule_weekdays TEXT DEFAULT "1,2,3,4,5,6,7"')
print(" ✓ 添加 schedule_weekdays 字段")
if "max_screenshot_concurrent" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN max_screenshot_concurrent INTEGER DEFAULT 3")
print(" ✓ 添加 max_screenshot_concurrent 字段")
if "max_concurrent_per_account" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN max_concurrent_per_account INTEGER DEFAULT 1")
print(" ✓ 添加 max_concurrent_per_account 字段")
if "auto_approve_enabled" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN auto_approve_enabled INTEGER DEFAULT 0")
print(" ✓ 添加 auto_approve_enabled 字段")
if "auto_approve_hourly_limit" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN auto_approve_hourly_limit INTEGER DEFAULT 10")
print(" ✓ 添加 auto_approve_hourly_limit 字段")
if "auto_approve_vip_days" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN auto_approve_vip_days INTEGER DEFAULT 7")
print(" ✓ 添加 auto_approve_vip_days 字段")
cursor.execute("PRAGMA table_info(task_logs)")
columns = [col[1] for col in cursor.fetchall()]
if "duration" not in columns:
cursor.execute("ALTER TABLE task_logs ADD COLUMN duration INTEGER")
print(" ✓ 添加 duration 字段到 task_logs")
conn.commit()
def _migrate_to_v2(conn):
"""迁移到版本2 - 添加代理配置字段"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(system_config)")
columns = [col[1] for col in cursor.fetchall()]
if "proxy_enabled" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN proxy_enabled INTEGER DEFAULT 0")
print(" ✓ 添加 proxy_enabled 字段")
if "proxy_api_url" not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN proxy_api_url TEXT DEFAULT ""')
print(" ✓ 添加 proxy_api_url 字段")
if "proxy_expire_minutes" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN proxy_expire_minutes INTEGER DEFAULT 3")
print(" ✓ 添加 proxy_expire_minutes 字段")
if "enable_screenshot" not in columns:
cursor.execute("ALTER TABLE system_config ADD COLUMN enable_screenshot INTEGER DEFAULT 1")
print(" ✓ 添加 enable_screenshot 字段")
conn.commit()
def _migrate_to_v3(conn):
"""迁移到版本3 - 添加账号状态和登录失败计数字段"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(accounts)")
columns = [col[1] for col in cursor.fetchall()]
if "status" not in columns:
cursor.execute('ALTER TABLE accounts ADD COLUMN status TEXT DEFAULT "active"')
print(" ✓ 添加 accounts.status 字段 (账号状态)")
if "login_fail_count" not in columns:
cursor.execute("ALTER TABLE accounts ADD COLUMN login_fail_count INTEGER DEFAULT 0")
print(" ✓ 添加 accounts.login_fail_count 字段 (登录失败计数)")
if "last_login_error" not in columns:
cursor.execute("ALTER TABLE accounts ADD COLUMN last_login_error TEXT")
print(" ✓ 添加 accounts.last_login_error 字段 (最后登录错误)")
conn.commit()
def _migrate_to_v4(conn):
"""迁移到版本4 - 添加任务来源字段"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(task_logs)")
columns = [col[1] for col in cursor.fetchall()]
if "source" not in columns:
cursor.execute('ALTER TABLE task_logs ADD COLUMN source TEXT DEFAULT "manual"')
print(" ✓ 添加 task_logs.source 字段 (任务来源: manual/scheduled/immediate)")
conn.commit()
def _migrate_to_v5(conn):
"""迁移到版本5 - 添加用户定时任务表"""
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_schedules'")
if not cursor.fetchone():
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS user_schedules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
name TEXT DEFAULT '我的定时任务',
enabled INTEGER DEFAULT 0,
schedule_time TEXT NOT NULL DEFAULT '08:00',
weekdays TEXT NOT NULL DEFAULT '1,2,3,4,5',
browse_type TEXT NOT NULL DEFAULT '应读',
enable_screenshot INTEGER DEFAULT 1,
account_ids TEXT,
last_run_at TIMESTAMP,
next_run_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
"""
)
print(" ✓ 创建 user_schedules 表 (用户定时任务)")
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS schedule_execution_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
schedule_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
schedule_name TEXT,
execute_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
total_accounts INTEGER DEFAULT 0,
success_accounts INTEGER DEFAULT 0,
failed_accounts INTEGER DEFAULT 0,
total_items INTEGER DEFAULT 0,
total_attachments INTEGER DEFAULT 0,
total_screenshots INTEGER DEFAULT 0,
duration_seconds INTEGER DEFAULT 0,
status TEXT DEFAULT 'running',
error_message TEXT,
FOREIGN KEY (schedule_id) REFERENCES user_schedules (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
"""
)
print(" ✓ 创建 schedule_execution_logs 表 (定时任务执行日志)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_user_id ON user_schedules(user_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled ON user_schedules(enabled)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_next_run ON user_schedules(next_run_at)")
print(" ✓ 创建 user_schedules 表索引")
conn.commit()
def _migrate_to_v6(conn):
"""迁移到版本6 - 添加公告功能相关表"""
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='announcements'")
if not cursor.fetchone():
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS announcements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
is_active INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
print(" ✓ 创建 announcements 表 (公告)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_announcements_active ON announcements(is_active)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_announcements_created_at ON announcements(created_at)")
print(" ✓ 创建 announcements 表索引")
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='announcement_dismissals'")
if not cursor.fetchone():
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS announcement_dismissals (
user_id INTEGER NOT NULL,
announcement_id INTEGER NOT NULL,
dismissed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, announcement_id),
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE,
FOREIGN KEY (announcement_id) REFERENCES announcements (id) ON DELETE CASCADE
)
"""
)
print(" ✓ 创建 announcement_dismissals 表 (公告永久关闭记录)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_announcement_dismissals_user ON announcement_dismissals(user_id)")
print(" ✓ 创建 announcement_dismissals 表索引")
conn.commit()
def _migrate_to_v7(conn):
"""迁移到版本7 - 统一存储北京时间将历史UTC时间字段整体+8小时"""
cursor = conn.cursor()
def table_exists(table_name: str) -> bool:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
return cursor.fetchone() is not None
def column_exists(table_name: str, column_name: str) -> bool:
cursor.execute(f"PRAGMA table_info({table_name})")
return any(row[1] == column_name for row in cursor.fetchall())
def shift_utc_to_cst(table_name: str, column_name: str) -> None:
if not table_exists(table_name):
return
if not column_exists(table_name, column_name):
return
cursor.execute(
f"""
UPDATE {table_name}
SET {column_name} = datetime({column_name}, '+8 hours')
WHERE {column_name} IS NOT NULL AND {column_name} != ''
"""
)
for table, col in [
("users", "created_at"),
("users", "approved_at"),
("admins", "created_at"),
("accounts", "created_at"),
("password_reset_requests", "created_at"),
("password_reset_requests", "processed_at"),
]:
shift_utc_to_cst(table, col)
for table, col in [
("smtp_configs", "created_at"),
("smtp_configs", "updated_at"),
("smtp_configs", "last_success_at"),
("email_settings", "updated_at"),
("email_tokens", "created_at"),
("email_logs", "created_at"),
("email_stats", "last_updated"),
]:
shift_utc_to_cst(table, col)
for table, col in [
("task_checkpoints", "created_at"),
("task_checkpoints", "updated_at"),
("task_checkpoints", "completed_at"),
]:
shift_utc_to_cst(table, col)
conn.commit()
print(" ✓ 时区迁移历史UTC时间已转换为北京时间")
def _migrate_to_v8(conn):
"""迁移到版本8 - 用户定时 next_run_at 随机延迟落库O-08"""
cursor = conn.cursor()
# 1) 增量字段random_delay旧库可能不存在
cursor.execute("PRAGMA table_info(user_schedules)")
columns = [col[1] for col in cursor.fetchall()]
if "random_delay" not in columns:
cursor.execute("ALTER TABLE user_schedules ADD COLUMN random_delay INTEGER DEFAULT 0")
print(" ✓ 添加 user_schedules.random_delay 字段")
if "next_run_at" not in columns:
cursor.execute("ALTER TABLE user_schedules ADD COLUMN next_run_at TIMESTAMP")
print(" ✓ 添加 user_schedules.next_run_at 字段")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_schedules_next_run ON user_schedules(next_run_at)")
conn.commit()
# 2) 为历史 enabled schedule 补算 next_run_at保证索引驱动可用
try:
from services.schedule_utils import compute_next_run_at, format_cst
from services.time_utils import get_beijing_now
now_dt = get_beijing_now()
now_str = format_cst(now_dt)
cursor.execute(
"""
SELECT id, schedule_time, weekdays, random_delay, last_run_at, next_run_at
FROM user_schedules
WHERE enabled = 1
"""
)
rows = cursor.fetchall() or []
fixed = 0
for row in rows:
try:
schedule_id = row["id"] if isinstance(row, sqlite3.Row) else row[0]
schedule_time = row["schedule_time"] if isinstance(row, sqlite3.Row) else row[1]
weekdays = row["weekdays"] if isinstance(row, sqlite3.Row) else row[2]
random_delay = row["random_delay"] if isinstance(row, sqlite3.Row) else row[3]
last_run_at = row["last_run_at"] if isinstance(row, sqlite3.Row) else row[4]
next_run_at = row["next_run_at"] if isinstance(row, sqlite3.Row) else row[5]
except Exception:
continue
next_run_text = str(next_run_at or "").strip()
# 若 next_run_at 为空/非法/已过期,则重算
if (not next_run_text) or (next_run_text <= now_str):
next_dt = compute_next_run_at(
now=now_dt,
schedule_time=str(schedule_time or "08:00"),
weekdays=str(weekdays or "1,2,3,4,5"),
random_delay=int(random_delay or 0),
last_run_at=str(last_run_at or "") if last_run_at else None,
)
next_run_text = format_cst(next_dt)
cursor.execute(
"UPDATE user_schedules SET next_run_at = ?, updated_at = ? WHERE id = ?",
(next_run_text, get_cst_now_str(), int(schedule_id)),
)
fixed += 1
conn.commit()
if fixed:
print(f" ✓ 已为 {fixed} 条启用定时任务补算 next_run_at")
except Exception as e:
# 迁移过程中不阻断主流程;上线后由 worker 兜底补算
print(f" ⚠ v8 迁移补算 next_run_at 失败: {e}")
def _migrate_to_v9(conn):
"""迁移到版本9 - 邮件设置字段迁移(清理 email_service scattered ALTER TABLE"""
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='email_settings'")
if not cursor.fetchone():
# 邮件表由 email_service.init_email_tables 创建;此处仅做增量字段迁移
return
cursor.execute("PRAGMA table_info(email_settings)")
columns = [col[1] for col in cursor.fetchall()]
changed = False
if "register_verify_enabled" not in columns:
cursor.execute("ALTER TABLE email_settings ADD COLUMN register_verify_enabled INTEGER DEFAULT 0")
print(" ✓ 添加 email_settings.register_verify_enabled 字段")
changed = True
if "base_url" not in columns:
cursor.execute("ALTER TABLE email_settings ADD COLUMN base_url TEXT DEFAULT ''")
print(" ✓ 添加 email_settings.base_url 字段")
changed = True
if "task_notify_enabled" not in columns:
cursor.execute("ALTER TABLE email_settings ADD COLUMN task_notify_enabled INTEGER DEFAULT 0")
print(" ✓ 添加 email_settings.task_notify_enabled 字段")
changed = True
if changed:
conn.commit()
def _migrate_to_v10(conn):
"""迁移到版本10 - users 邮箱字段迁移(避免运行时 ALTER TABLE"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(users)")
columns = [col[1] for col in cursor.fetchall()]
changed = False
if "email_verified" not in columns:
cursor.execute("ALTER TABLE users ADD COLUMN email_verified INTEGER DEFAULT 0")
print(" ✓ 添加 users.email_verified 字段")
changed = True
if "email_notify_enabled" not in columns:
cursor.execute("ALTER TABLE users ADD COLUMN email_notify_enabled INTEGER DEFAULT 1")
print(" ✓ 添加 users.email_notify_enabled 字段")
changed = True
if changed:
conn.commit()