Files
zsglpt-pc/utils/storage.py
237899745 83fef6dff2 feat: 知识管理平台精简版 - PyQt6桌面应用
主要功能:
- 账号管理:添加/编辑/删除账号,测试登录
- 浏览任务:批量浏览应读/选读内容并标记已读
- 截图管理:wkhtmltoimage截图,查看历史
- 金山文档:扫码登录/微信快捷登录,自动上传截图

技术栈:
- PyQt6 GUI框架
- Playwright 浏览器自动化
- SQLite 本地数据存储
- wkhtmltoimage 网页截图

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-18 22:16:36 +08:00

399 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SQLite storage module - local database for config and accounts
"""
import sqlite3
import json
import os
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional
from contextlib import contextmanager
if TYPE_CHECKING:
from config import AppConfig, AccountConfig
def _get_db_path() -> Path:
    """Return the location of the SQLite database file.

    The file is named ``zsglpt.db`` and sits directly inside the
    ``DATA_DIR`` directory exported by the ``config`` module.
    """
    # Resolved at call time rather than at module import time.
    from config import DATA_DIR as data_dir

    database_file = data_dir / "zsglpt.db"
    return database_file
@contextmanager
def get_connection():
    """Yield an open SQLite connection and finalize it on exit.

    The parent directory of the database file is created when missing.
    Rows are returned as ``sqlite3.Row`` so columns can be read by name.

    On normal exit of the ``with`` body the transaction is committed.
    If an ``Exception`` escapes (from the body or from the commit) the
    transaction is rolled back and the exception is re-raised. The
    connection is closed in every case.
    """
    database_file = _get_db_path()
    database_file.parent.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(str(database_file))
    connection.row_factory = sqlite3.Row
    try:
        try:
            yield connection
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        connection.close()
def init_database():
    """Create the database schema if it is not already present.

    Issues three idempotent statements inside one connection: the
    ``accounts`` table, the key-value ``settings`` table, and an index on
    ``accounts.username``.
    """
    schema_statements = (
        # Accounts table
        '''
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
remark TEXT DEFAULT '',
enabled INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''',
        # Settings table (key-value store)
        '''
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''',
        # Index on the account username
        '''
CREATE INDEX IF NOT EXISTS idx_accounts_username ON accounts(username)
''',
    )

    with get_connection() as conn:
        cur = conn.cursor()
        for statement in schema_statements:
            cur.execute(statement)
def _ensure_db():
    """Ensure the database file and its schema exist before use.

    Checking only for the file's existence is not enough: a file can exist
    without the expected tables (for example a zero-byte file, or a first
    run that was interrupted before the tables were created), and every
    later query would then fail with "no such table".

    ``init_database`` only issues ``CREATE ... IF NOT EXISTS`` statements,
    so it is safe to run against an existing database. It is run once per
    process for each database path, and again whenever the file has
    disappeared since the last check.
    """
    db_path = _get_db_path()
    path_key = str(db_path)

    # Paths already initialized in this process, stored on the function
    # object so that no extra module-level state is needed.
    ready_paths = _ensure_db.__dict__.setdefault('_ready_paths', set())

    if path_key in ready_paths and db_path.exists():
        return

    init_database()
    ready_paths.add(path_key)
# ==================== Account Operations ====================
def get_all_accounts() -> List[dict]:
    """Return every row of the ``accounts`` table as a dict, ordered by id."""
    _ensure_db()
    with get_connection() as conn:
        records = conn.execute('SELECT * FROM accounts ORDER BY id').fetchall()
        return list(map(dict, records))
def get_account_by_username(username: str) -> Optional[dict]:
    """Look up a single account by its username.

    Returns the matching row as a dict, or ``None`` when no account has
    that username.
    """
    _ensure_db()
    with get_connection() as conn:
        query = 'SELECT * FROM accounts WHERE username = ?'
        match = conn.execute(query, (username,)).fetchone()
        if not match:
            return None
        return dict(match)
def add_account(username: str, password: str, remark: str = '', enabled: bool = True) -> int:
    """Insert a new account row and return its generated id.

    ``enabled`` is stored as the integer 1 or 0.
    """
    _ensure_db()
    enabled_flag = 1 if enabled else 0
    values = (username, password, remark, enabled_flag)
    with get_connection() as conn:
        cur = conn.execute('''
INSERT INTO accounts (username, password, remark, enabled)
VALUES (?, ?, ?, ?)
''', values)
        return cur.lastrowid
def update_account(account_id: int, username: str, password: str, remark: str, enabled: bool):
    """Overwrite all editable fields of the account with the given id.

    ``updated_at`` is refreshed to the current timestamp and ``enabled``
    is stored as the integer 1 or 0.
    """
    _ensure_db()
    enabled_flag = 1 if enabled else 0
    values = (username, password, remark, enabled_flag, account_id)
    with get_connection() as conn:
        conn.execute('''
UPDATE accounts
SET username = ?, password = ?, remark = ?, enabled = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', values)
def delete_account(account_id: int):
    """Remove the account row whose id equals ``account_id``."""
    _ensure_db()
    statement = 'DELETE FROM accounts WHERE id = ?'
    with get_connection() as conn:
        conn.execute(statement, (account_id,))
def delete_account_by_username(username: str):
    """Remove the account row whose username equals ``username``."""
    _ensure_db()
    statement = 'DELETE FROM accounts WHERE username = ?'
    with get_connection() as conn:
        conn.execute(statement, (username,))
# ==================== Settings Operations ====================
def get_setting(key: str, default: str = '') -> str:
    """Return the stored value for ``key``, or ``default`` when it is absent."""
    _ensure_db()
    with get_connection() as conn:
        query = 'SELECT value FROM settings WHERE key = ?'
        found = conn.execute(query, (key,)).fetchone()
        if not found:
            return default
        return found['value']
def set_setting(key: str, value: str):
    """Store ``value`` under ``key``, replacing any existing entry.

    The row's ``updated_at`` column is set to the current timestamp.
    """
    _ensure_db()
    pair = (key, value)
    with get_connection() as conn:
        conn.execute('''
INSERT OR REPLACE INTO settings (key, value, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
''', pair)
def get_all_settings() -> dict:
    """Return the whole ``settings`` table as a ``{key: value}`` dict."""
    _ensure_db()
    with get_connection() as conn:
        records = conn.execute('SELECT key, value FROM settings').fetchall()
        return dict((record['key'], record['value']) for record in records)
# ==================== Config Bridge (compatibility with existing code) ====================
def load_config() -> "AppConfig":
    """
    Load config from SQLite database.

    Builds an ``AppConfig`` from the ``accounts`` table and the key-value
    ``settings`` table. Settings that are missing fall back to the defaults
    written below. Integer settings whose stored text cannot be parsed
    (for example an empty string) also fall back to their default, with a
    warning printed, instead of aborting the whole load with ValueError.

    Returns:
        AppConfig object for compatibility with existing code.
    """
    from config import (
        AppConfig,
        AccountConfig,
        KDocsConfig,
        ScreenshotConfig,
        ProxyConfig,
        ZSGLConfig,
        SCREENSHOTS_DIR,
    )

    # Default document link for KDocs
    DEFAULT_KDOCS_URL = 'https://kdocs.cn/l/cpwEOo5ynKX4'

    _ensure_db()
    config = AppConfig()

    # Load accounts
    config.accounts = [
        AccountConfig(
            username=a['username'],
            password=a['password'],
            remark=a['remark'] or '',
            enabled=bool(a['enabled']),
        )
        for a in get_all_accounts()
    ]

    # Load settings
    settings = get_all_settings()

    def _int(key: str, default: int) -> int:
        """Read an integer setting; use ``default`` if missing or malformed."""
        raw = settings.get(key)
        if raw is None:
            return default
        try:
            return int(raw)
        except (TypeError, ValueError):
            print(f"[Warning] Invalid integer for setting '{key}': {raw!r}, using default {default}")
            return default

    def _flag(key: str) -> bool:
        """Read a boolean setting stored as the text 'true' / 'false'."""
        return settings.get(key, 'false').lower() == 'true'

    # KDocs config
    config.kdocs = KDocsConfig(
        enabled=_flag('kdocs_enabled'),
        # An empty string also falls back to the default link
        doc_url=settings.get('kdocs_doc_url', '') or DEFAULT_KDOCS_URL,
        sheet_name=settings.get('kdocs_sheet_name', 'Sheet1'),
        sheet_index=_int('kdocs_sheet_index', 0),
        unit_column=settings.get('kdocs_unit_column', 'A'),
        image_column=settings.get('kdocs_image_column', 'D'),
        unit=settings.get('kdocs_unit', ''),
        name_column=settings.get('kdocs_name_column', 'C'),
        row_start=_int('kdocs_row_start', 0),
        row_end=_int('kdocs_row_end', 0),
    )

    # Screenshot config
    config.screenshot = ScreenshotConfig(
        dir=settings.get('screenshot_dir', str(SCREENSHOTS_DIR)),
        quality=_int('screenshot_quality', 95),
        width=_int('screenshot_width', 1920),
        height=_int('screenshot_height', 1080),
        js_delay_ms=_int('screenshot_js_delay_ms', 3000),
        timeout_seconds=_int('screenshot_timeout_seconds', 60),
        wkhtmltoimage_path=settings.get('screenshot_wkhtmltoimage_path', ''),
    )

    # Proxy config
    config.proxy = ProxyConfig(
        enabled=_flag('proxy_enabled'),
        server=settings.get('proxy_server', ''),
    )

    # ZSGL config
    config.zsgl = ZSGLConfig(
        base_url=settings.get('zsgl_base_url', 'https://postoa.aidunsoft.com'),
        login_url=settings.get('zsgl_login_url', 'https://postoa.aidunsoft.com/admin/login.aspx'),
        index_url_pattern=settings.get('zsgl_index_url_pattern', 'index.aspx'),
    )

    # Theme
    config.theme = settings.get('theme', 'light')
    return config
def _sync_accounts(cursor, accounts) -> None:
    """Make the ``accounts`` table match ``accounts`` (keyed by username)."""
    cursor.execute('SELECT username FROM accounts')
    stored_usernames = {row['username'] for row in cursor.fetchall()}

    wanted_usernames = set()
    for account in accounts:
        enabled_flag = 1 if account.enabled else 0
        wanted_usernames.add(account.username)
        if account.username in stored_usernames:
            # Update existing
            cursor.execute('''
UPDATE accounts
SET password = ?, remark = ?, enabled = ?, updated_at = CURRENT_TIMESTAMP
WHERE username = ?
''', (account.password, account.remark, enabled_flag, account.username))
        else:
            # Insert new
            cursor.execute('''
INSERT INTO accounts (username, password, remark, enabled)
VALUES (?, ?, ?, ?)
''', (account.username, account.password, account.remark, enabled_flag))
            # Remember the insert so that a repeated username later in the
            # list becomes an UPDATE (last entry wins) instead of violating
            # the UNIQUE constraint and rolling back the whole save.
            stored_usernames.add(account.username)

    # Delete accounts that are stored but no longer present in the config
    removed = stored_usernames - wanted_usernames
    cursor.executemany(
        'DELETE FROM accounts WHERE username = ?',
        [(username,) for username in removed],
    )


def _settings_from_config(config) -> dict:
    """Flatten the config sections into the key/value pairs that are stored."""
    return {
        # KDocs
        'kdocs_enabled': str(config.kdocs.enabled).lower(),
        'kdocs_doc_url': config.kdocs.doc_url,
        'kdocs_sheet_name': config.kdocs.sheet_name,
        'kdocs_sheet_index': str(config.kdocs.sheet_index),
        'kdocs_unit_column': config.kdocs.unit_column,
        'kdocs_image_column': config.kdocs.image_column,
        'kdocs_unit': config.kdocs.unit,
        'kdocs_name_column': config.kdocs.name_column,
        'kdocs_row_start': str(config.kdocs.row_start),
        'kdocs_row_end': str(config.kdocs.row_end),
        # Screenshot
        'screenshot_dir': config.screenshot.dir,
        'screenshot_quality': str(config.screenshot.quality),
        'screenshot_width': str(config.screenshot.width),
        'screenshot_height': str(config.screenshot.height),
        'screenshot_js_delay_ms': str(config.screenshot.js_delay_ms),
        'screenshot_timeout_seconds': str(config.screenshot.timeout_seconds),
        'screenshot_wkhtmltoimage_path': config.screenshot.wkhtmltoimage_path,
        # Proxy
        'proxy_enabled': str(config.proxy.enabled).lower(),
        'proxy_server': config.proxy.server,
        # ZSGL
        'zsgl_base_url': config.zsgl.base_url,
        'zsgl_login_url': config.zsgl.login_url,
        'zsgl_index_url_pattern': config.zsgl.index_url_pattern,
        # Theme
        'theme': config.theme,
    }


def save_config(config: "AppConfig") -> bool:
    """
    Save config to SQLite database.

    Accounts are synchronized by username: existing rows are updated, new
    ones inserted, and rows whose username is no longer in
    ``config.accounts`` are deleted. If the same username appears more
    than once in ``config.accounts``, the last entry wins. All settings
    are then written with INSERT OR REPLACE. Everything happens on a
    single connection, so a failure rolls the whole save back.

    Returns:
        True when the save was committed; False when any exception
        occurred (the error is printed, not raised).
    """
    _ensure_db()
    try:
        with get_connection() as conn:
            cursor = conn.cursor()
            _sync_accounts(cursor, config.accounts)
            cursor.executemany('''
INSERT OR REPLACE INTO settings (key, value, updated_at)
VALUES (?, ?, CURRENT_TIMESTAMP)
''', list(_settings_from_config(config).items()))
        return True
    except Exception as e:
        print(f"[Error] Save config failed: {e}")
        return False
def backup_config() -> bool:
    """Back up the database to a sibling ``.db.bak`` file.

    The copy is made with SQLite's online backup API rather than a raw
    file copy, so the backup is a consistent snapshot even if another
    connection is writing at the same moment. The snapshot is first
    written to a temporary ``.tmp`` file and only then moved over the
    previous backup, so a failed attempt does not destroy an existing
    good backup.

    Returns:
        True on success; False when the database file does not exist or
        the backup failed (the error is printed, not raised).
    """
    db_path = _get_db_path()
    if not db_path.exists():
        return False

    backup_path = db_path.with_suffix('.db.bak')
    staging_path = backup_path.with_name(backup_path.name + '.tmp')
    try:
        # Start from a clean staging file in case an earlier run left one.
        if staging_path.exists():
            staging_path.unlink()

        source = sqlite3.connect(str(db_path))
        try:
            target = sqlite3.connect(str(staging_path))
            try:
                source.backup(target)
            finally:
                target.close()
        finally:
            source.close()

        # Swap the finished snapshot into place in one step.
        staging_path.replace(backup_path)
        return True
    except (OSError, sqlite3.Error) as e:
        print(f"[Error] Backup failed: {e}")
        # Best-effort cleanup of the partial staging file.
        try:
            if staging_path.exists():
                staging_path.unlink()
        except OSError:
            pass
        return False
def restore_config() -> bool:
    """Copy the ``.db.bak`` backup over the live database file.

    Returns:
        True when the copy succeeded; False when no backup file exists or
        the copy raised an I/O error (the error is printed, not raised).
    """
    import shutil

    live_path = _get_db_path()
    saved_copy = live_path.with_suffix('.db.bak')

    if saved_copy.exists():
        try:
            shutil.copy2(saved_copy, live_path)
        except IOError as e:
            print(f"[Error] Restore failed: {e}")
            return False
        return True
    return False
def migrate_from_json() -> bool:
    """Migrate data from old JSON config to SQLite.

    Reads ``CONFIG_FILE``, converts it with ``AppConfig.from_dict`` and
    stores it through ``save_config``. Only after the save has succeeded is
    the JSON file renamed to ``*.json.migrated``. If the save fails the
    JSON file is left untouched so the migration can be retried.

    Returns:
        True when the data was saved and the old file renamed; False when
        there is no JSON file, the save failed, or any exception occurred
        (errors are printed, not raised).
    """
    from config import CONFIG_FILE
    if not CONFIG_FILE.exists():
        return False
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Load using old format
        from config import AppConfig
        old_config = AppConfig.from_dict(data)

        # Save to SQLite. save_config reports failure through its return
        # value, not an exception, so it must be checked explicitly.
        if not save_config(old_config):
            print("[Error] Migration failed: could not save config to SQLite")
            return False

        # Rename old file. replace() overwrites a leftover target from an
        # earlier migration, where rename() would raise on Windows.
        backup = CONFIG_FILE.with_suffix('.json.migrated')
        CONFIG_FILE.replace(backup)
        print(f"[Info] Migrated from JSON to SQLite, old file renamed to {backup}")
        return True
    except Exception as e:
        print(f"[Error] Migration failed: {e}")
        return False