#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SQLite storage module - local database for config and accounts
"""

import sqlite3
import json
import os
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional
from contextlib import contextmanager

if TYPE_CHECKING:
    from config import AppConfig, AccountConfig


# Database paths whose tables have already been ensured by this process.
# Lets _ensure_db() run the idempotent CREATE statements once per path
# instead of trusting that an existing file already contains the schema.
_initialized_db_paths = set()


def _get_db_path() -> Path:
    """Get database file path"""
    # Imported lazily: the TYPE_CHECKING guard above shows `config` is
    # deliberately kept out of this module's import-time dependencies.
    from config import DATA_DIR
    return DATA_DIR / "zsglpt.db"


@contextmanager
def get_connection():
    """Get database connection with context manager.

    Yields a ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects
    (addressable by column name). The transaction is committed when the
    ``with`` body finishes normally and rolled back if it raises; the
    connection is closed in both cases. The parent directory of the database
    file is created if it is missing.
    """
    db_path = _get_db_path()
    db_path.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_database():
    """Initialize database tables.

    Every statement uses ``IF NOT EXISTS``, so calling this on an already
    initialized database is harmless.
    """
    with get_connection() as conn:
        cursor = conn.cursor()

        # Accounts table
        # NOTE(review): the password column stores whatever string the
        # callers pass in; nothing in this module hashes or encrypts it.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS accounts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT NOT NULL UNIQUE,
                password TEXT NOT NULL,
                remark TEXT DEFAULT '',
                enabled INTEGER DEFAULT 1,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Settings table (key-value store)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Create index (kept so the schema matches existing databases)
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_accounts_username ON accounts(username)
        ''')


def _ensure_db():
    """Ensure database is initialized.

    The schema is (re)applied the first time a given database path is seen by
    this process and whenever the file has disappeared. Checking only for the
    file's existence is not enough: an empty or older database file would
    otherwise never receive its tables.
    """
    db_path = _get_db_path()
    if db_path in _initialized_db_paths and db_path.exists():
        return
    init_database()
    _initialized_db_paths.add(db_path)


# ==================== Account Operations ====================

def get_all_accounts() -> List[dict]:
    """Get all accounts from database, ordered by id, as plain dicts"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM accounts ORDER BY id')
        rows = cursor.fetchall()
        return [dict(row) for row in rows]


def get_account_by_username(username: str) -> Optional[dict]:
    """Get account by username; returns None when no row matches"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM accounts WHERE username = ?', (username,))
        row = cursor.fetchone()
        return dict(row) if row else None


def add_account(username: str, password: str, remark: str = '', enabled: bool = True) -> int:
    """Add new account, returns account id.

    Raises:
        sqlite3.IntegrityError: if the username already exists (the column
            is declared UNIQUE).
    """
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO accounts (username, password, remark, enabled)
            VALUES (?, ?, ?, ?)
        ''', (username, password, remark, 1 if enabled else 0))
        return cursor.lastrowid


def update_account(account_id: int, username: str, password: str, remark: str, enabled: bool):
    """Update existing account identified by id and refresh updated_at"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            UPDATE accounts
            SET username = ?, password = ?, remark = ?, enabled = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        ''', (username, password, remark, 1 if enabled else 0, account_id))


def delete_account(account_id: int):
    """Delete account by id"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM accounts WHERE id = ?', (account_id,))


def delete_account_by_username(username: str):
    """Delete account by username"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM accounts WHERE username = ?', (username,))


# ==================== Settings Operations ====================

def get_setting(key: str, default: str = '') -> str:
    """Get setting value by key; returns ``default`` when the key is absent"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT value FROM settings WHERE key = ?', (key,))
        row = cursor.fetchone()
        return row['value'] if row else default


def set_setting(key: str, value: str):
    """Set setting value, inserting the key or replacing its current row"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT OR REPLACE INTO settings (key, value, updated_at)
            VALUES (?, ?, CURRENT_TIMESTAMP)
        ''', (key, value))


def get_all_settings() -> dict:
    """Get all settings as dictionary mapping key to stored string value"""
    _ensure_db()
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT key, value FROM settings')
        rows = cursor.fetchall()
        return {row['key']: row['value'] for row in rows}


# ==================== Config Bridge (compatibility with existing code) ====================

def _int_setting(settings: dict, key: str, default: int) -> int:
    """Read an integer setting, falling back to ``default`` when the key is
    missing or its stored text is not a valid integer."""
    raw = settings.get(key)
    if raw is None:
        return default
    try:
        return int(raw)
    except (TypeError, ValueError):
        # One malformed value should not prevent the whole config from loading.
        print(f"[Warn] Invalid integer for setting '{key}': {raw!r}, using default {default}")
        return default


def load_config() -> "AppConfig":
    """
    Load config from SQLite database
    Returns AppConfig object for compatibility

    Accounts come from the ``accounts`` table and every other field from the
    ``settings`` key-value table. A missing key takes the default written
    below; an integer setting that cannot be parsed also takes its default.
    """
    from config import AppConfig, AccountConfig, KDocsConfig, ScreenshotConfig, ProxyConfig, ZSGLConfig, SCREENSHOTS_DIR

    _ensure_db()

    config = AppConfig()

    # Load accounts
    accounts = get_all_accounts()
    config.accounts = [
        AccountConfig(
            username=a['username'],
            password=a['password'],
            remark=a['remark'] or '',
            enabled=bool(a['enabled'])
        )
        for a in accounts
    ]

    # Load settings
    settings = get_all_settings()

    # KDocs config - default document link
    DEFAULT_KDOCS_URL = 'https://kdocs.cn/l/cpwEOo5ynKX4'
    config.kdocs = KDocsConfig(
        enabled=settings.get('kdocs_enabled', 'false').lower() == 'true',
        # An empty stored string also falls back to the default link
        doc_url=settings.get('kdocs_doc_url', '') or DEFAULT_KDOCS_URL,
        sheet_name=settings.get('kdocs_sheet_name', 'Sheet1'),
        sheet_index=_int_setting(settings, 'kdocs_sheet_index', 0),
        unit_column=settings.get('kdocs_unit_column', 'A'),
        image_column=settings.get('kdocs_image_column', 'D'),
        unit=settings.get('kdocs_unit', ''),
        name_column=settings.get('kdocs_name_column', 'C'),
        row_start=_int_setting(settings, 'kdocs_row_start', 0),
        row_end=_int_setting(settings, 'kdocs_row_end', 0),
    )

    # Screenshot config
    config.screenshot = ScreenshotConfig(
        dir=settings.get('screenshot_dir', str(SCREENSHOTS_DIR)),
        quality=_int_setting(settings, 'screenshot_quality', 95),
        width=_int_setting(settings, 'screenshot_width', 1920),
        height=_int_setting(settings, 'screenshot_height', 1080),
        js_delay_ms=_int_setting(settings, 'screenshot_js_delay_ms', 3000),
        timeout_seconds=_int_setting(settings, 'screenshot_timeout_seconds', 60),
        wkhtmltoimage_path=settings.get('screenshot_wkhtmltoimage_path', ''),
    )

    # Proxy config
    config.proxy = ProxyConfig(
        enabled=settings.get('proxy_enabled', 'false').lower() == 'true',
        server=settings.get('proxy_server', ''),
    )

    # ZSGL config
    config.zsgl = ZSGLConfig(
        base_url=settings.get('zsgl_base_url', 'https://postoa.aidunsoft.com'),
        login_url=settings.get('zsgl_login_url', 'https://postoa.aidunsoft.com/admin/login.aspx'),
        index_url_pattern=settings.get('zsgl_index_url_pattern', 'index.aspx'),
    )

    # Theme
    config.theme = settings.get('theme', 'light')

    return config


def save_config(config: "AppConfig") -> bool:
    """
    Save config to SQLite database

    Synchronises the ``accounts`` table with ``config.accounts`` (update,
    insert, delete by username) and writes every settings field, all inside
    one transaction.

    Returns:
        True on success. On any exception the transaction is rolled back,
        the error is printed and False is returned.
    """
    _ensure_db()

    try:
        with get_connection() as conn:
            cursor = conn.cursor()

            # Save accounts - first get existing accounts
            existing_usernames = {
                row['username']
                for row in cursor.execute('SELECT username FROM accounts')
            }

            # Update or insert accounts
            config_usernames = set()
            for account in config.accounts:
                enabled_flag = 1 if account.enabled else 0
                # A username repeated inside config.accounts was inserted a
                # moment ago; update it instead of tripping the UNIQUE
                # constraint and rolling back the whole save.
                already_stored = (
                    account.username in existing_usernames
                    or account.username in config_usernames
                )
                if already_stored:
                    # Update existing
                    cursor.execute('''
                        UPDATE accounts
                        SET password = ?, remark = ?, enabled = ?, updated_at = CURRENT_TIMESTAMP
                        WHERE username = ?
                    ''', (account.password, account.remark, enabled_flag, account.username))
                else:
                    # Insert new
                    cursor.execute('''
                        INSERT INTO accounts (username, password, remark, enabled)
                        VALUES (?, ?, ?, ?)
                    ''', (account.username, account.password, account.remark, enabled_flag))
                config_usernames.add(account.username)

            # Delete removed accounts
            removed = existing_usernames - config_usernames
            cursor.executemany(
                'DELETE FROM accounts WHERE username = ?',
                [(username,) for username in removed],
            )

            # Save settings (the table stores text, so numbers and booleans
            # are serialised the way load_config() parses them back)
            settings_to_save = {
                # KDocs
                'kdocs_enabled': str(config.kdocs.enabled).lower(),
                'kdocs_doc_url': config.kdocs.doc_url,
                'kdocs_sheet_name': config.kdocs.sheet_name,
                'kdocs_sheet_index': str(config.kdocs.sheet_index),
                'kdocs_unit_column': config.kdocs.unit_column,
                'kdocs_image_column': config.kdocs.image_column,
                'kdocs_unit': config.kdocs.unit,
                'kdocs_name_column': config.kdocs.name_column,
                'kdocs_row_start': str(config.kdocs.row_start),
                'kdocs_row_end': str(config.kdocs.row_end),
                # Screenshot
                'screenshot_dir': config.screenshot.dir,
                'screenshot_quality': str(config.screenshot.quality),
                'screenshot_width': str(config.screenshot.width),
                'screenshot_height': str(config.screenshot.height),
                'screenshot_js_delay_ms': str(config.screenshot.js_delay_ms),
                'screenshot_timeout_seconds': str(config.screenshot.timeout_seconds),
                'screenshot_wkhtmltoimage_path': config.screenshot.wkhtmltoimage_path,
                # Proxy
                'proxy_enabled': str(config.proxy.enabled).lower(),
                'proxy_server': config.proxy.server,
                # ZSGL
                'zsgl_base_url': config.zsgl.base_url,
                'zsgl_login_url': config.zsgl.login_url,
                'zsgl_index_url_pattern': config.zsgl.index_url_pattern,
                # Theme
                'theme': config.theme,
            }

            cursor.executemany('''
                INSERT OR REPLACE INTO settings (key, value, updated_at)
                VALUES (?, ?, CURRENT_TIMESTAMP)
            ''', list(settings_to_save.items()))

        return True
    except Exception as e:
        print(f"[Error] Save config failed: {e}")
        return False


def backup_config() -> bool:
    """Backup database file.

    Copies the database to a sibling file with the ``.db.bak`` suffix.
    Returns False when there is no database file or the copy fails.
    """
    db_path = _get_db_path()
    if not db_path.exists():
        return False

    backup_path = db_path.with_suffix('.db.bak')
    try:
        import shutil
        shutil.copy2(db_path, backup_path)
        return True
    except OSError as e:
        print(f"[Error] Backup failed: {e}")
        return False


def restore_config() -> bool:
    """Restore database from backup.

    Copies the ``.db.bak`` sibling over the database file. Returns False
    when no backup exists or the copy fails.
    """
    db_path = _get_db_path()
    backup_path = db_path.with_suffix('.db.bak')

    if not backup_path.exists():
        return False

    try:
        import shutil
        shutil.copy2(backup_path, db_path)
    except OSError as e:
        print(f"[Error] Restore failed: {e}")
        return False

    # The file content just changed underneath us; have the next access
    # re-apply the (idempotent) schema in case the backup predates it.
    _initialized_db_paths.discard(db_path)
    return True


def migrate_from_json():
    """Migrate data from old JSON config to SQLite.

    Returns:
        True when the JSON config was loaded, saved to SQLite and the old
        file renamed to ``*.json.migrated``. False when there is no JSON
        file, when saving fails (the JSON file is then left untouched so the
        migration can be retried), or on any other error.
    """
    from config import CONFIG_FILE

    if not CONFIG_FILE.exists():
        return False

    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Load using old format
        from config import AppConfig
        old_config = AppConfig.from_dict(data)

        # Save to SQLite; save_config() reports failure through its return
        # value rather than by raising, so it has to be checked explicitly.
        if not save_config(old_config):
            print("[Error] Migration failed: could not save config to SQLite")
            return False

        # Rename old file. replace() also succeeds when a previous
        # ".json.migrated" file is already present, on every platform.
        backup = CONFIG_FILE.with_suffix('.json.migrated')
        CONFIG_FILE.replace(backup)

        print(f"[Info] Migrated from JSON to SQLite, old file renamed to {backup}")
        return True
    except Exception as e:
        print(f"[Error] Migration failed: {e}")
        return False