Files
zsglpt/database.py

2225 lines
78 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库模块 - 使用SQLite进行数据持久化
支持VIP功能
优化内容:
1. 清理所有注释掉的代码
2. 统一使用bcrypt密码哈希
3. 优化数据库索引
4. 规范化事务处理
5. 添加数据迁移功能
6. 改进错误处理
"""
import sqlite3
import time
from datetime import datetime, timedelta
import pytz
import threading
import db_pool
from password_utils import (
hash_password_bcrypt,
verify_password_bcrypt,
is_sha256_hash,
verify_password_sha256
)
from app_config import get_config
from crypto_utils import encrypt_password, decrypt_password, migrate_password
# Import the shared sanitizers at module top level to avoid a circular import.
# NOTE: if a circular import does appear, check whether app_security imports
# this database module.
try:
    from app_security import escape_html, sanitize_sql_like_pattern
except ImportError:
    # app_security could not be imported: provide minimal local equivalents.
    import html

    def escape_html(text):
        """Return ``str(text)`` HTML-escaped; ``None`` yields an empty string."""
        return '' if text is None else html.escape(str(text))

    def sanitize_sql_like_pattern(pattern):
        """Backslash-escape the backslash, percent and underscore characters.

        ``None`` yields an empty string; any other value is converted with
        ``str()`` first.
        """
        if pattern is None:
            return ''
        # One pass over the text; equivalent to escaping the backslash first
        # and the LIKE wildcards afterwards.
        escapes = str.maketrans({'\\': '\\\\', '%': '\\%', '_': '\\_'})
        return str(pattern).translate(escapes)
# Load the application configuration
config = get_config()
# Database file path - read from the configuration instead of being hard-coded
DB_FILE = config.DB_FILE
# Schema version (used by the migration logic)
DB_VERSION = 7
# ==================== Timezone helpers ====================
# Bug fix: handle timezones in one place so they are not mixed up
CST_TZ = pytz.timezone("Asia/Shanghai")
def get_cst_now():
    """Return the current time as a timezone-aware datetime in ``CST_TZ``."""
    current = datetime.now(tz=CST_TZ)
    return current
def get_cst_now_str():
    """Return the current CST time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return f"{get_cst_now():%Y-%m-%d %H:%M:%S}"
def parse_cst_datetime(datetime_str):
    """Parse a CST timestamp string into a timezone-aware datetime.

    Args:
        datetime_str: text in the form 'YYYY-MM-DD HH:MM:SS'.

    Returns:
        The parsed datetime localized to ``CST_TZ``.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    parsed = datetime.strptime(datetime_str, timestamp_format)
    return CST_TZ.localize(parsed)
def hash_password(password):
    """Hash *password* by delegating to ``hash_password_bcrypt``."""
    hashed = hash_password_bcrypt(password)
    return hashed
def init_database():
    """Create the base schema, seed the singleton rows, then run migrations.

    Steps:
        1. Initialise the connection pool for ``DB_FILE``.
        2. Create every base table and index (all ``IF NOT EXISTS``).
        3. Seed ``vip_config``, ``system_config`` and ``db_version`` when
           their single row (id = 1) is missing.
        4. Call ``migrate_database()`` and ``ensure_default_admin()``.

    A brand-new database is stamped with schema version 0 rather than
    ``DB_VERSION``. The base tables below do not contain everything the
    migrations add (for example ``accounts.status`` /
    ``accounts.login_fail_count`` from v3 and ``schedule_execution_logs``
    from v5), so a fresh install must still walk the migration chain;
    stamping it with ``DB_VERSION`` made ``migrate_database()`` skip all of
    them. Existing databases keep whatever version they already recorded.
    """
    # Version recorded for a database that has no db_version row yet.
    initial_version = 0

    table_statements = (
        # Administrators
        '''
        CREATE TABLE IF NOT EXISTS admins (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Users
        '''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            email TEXT,
            status TEXT DEFAULT 'pending',
            vip_expire_time TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            approved_at TIMESTAMP
        )
        ''',
        # Accounts (owned by a user)
        '''
        CREATE TABLE IF NOT EXISTS accounts (
            id TEXT PRIMARY KEY,
            user_id INTEGER NOT NULL,
            username TEXT NOT NULL,
            password TEXT NOT NULL,
            remember INTEGER DEFAULT 1,
            remark TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
        )
        ''',
        # VIP configuration (single row, id = 1)
        '''
        CREATE TABLE IF NOT EXISTS vip_config (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            default_vip_days INTEGER DEFAULT 0,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # System configuration (single row, id = 1)
        '''
        CREATE TABLE IF NOT EXISTS system_config (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            max_concurrent_global INTEGER DEFAULT 2,
            max_concurrent_per_account INTEGER DEFAULT 1,
            max_screenshot_concurrent INTEGER DEFAULT 3,
            schedule_enabled INTEGER DEFAULT 0,
            schedule_time TEXT DEFAULT '02:00',
            schedule_browse_type TEXT DEFAULT '应读',
            schedule_weekdays TEXT DEFAULT '1,2,3,4,5,6,7',
            proxy_enabled INTEGER DEFAULT 0,
            proxy_api_url TEXT DEFAULT '',
            proxy_expire_minutes INTEGER DEFAULT 3,
            enable_screenshot INTEGER DEFAULT 1,
            auto_approve_enabled INTEGER DEFAULT 0,
            auto_approve_hourly_limit INTEGER DEFAULT 10,
            auto_approve_vip_days INTEGER DEFAULT 7,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Task logs
        '''
        CREATE TABLE IF NOT EXISTS task_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            account_id TEXT NOT NULL,
            username TEXT NOT NULL,
            browse_type TEXT NOT NULL,
            status TEXT NOT NULL,
            total_items INTEGER DEFAULT 0,
            total_attachments INTEGER DEFAULT 0,
            error_message TEXT,
            duration INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            source TEXT DEFAULT 'manual',
            FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
        )
        ''',
        # Password reset requests
        '''
        CREATE TABLE IF NOT EXISTS password_reset_requests (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            new_password_hash TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'pending',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            processed_at TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
        )
        ''',
        # Schema version (single row, id = 1)
        '''
        CREATE TABLE IF NOT EXISTS db_version (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            version INTEGER NOT NULL,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Bug feedback
        '''
        CREATE TABLE IF NOT EXISTS bug_feedbacks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            username TEXT NOT NULL,
            title TEXT NOT NULL,
            description TEXT NOT NULL,
            contact TEXT,
            status TEXT DEFAULT 'pending',
            admin_reply TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            replied_at TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
        )
        ''',
        # Announcements
        '''
        CREATE TABLE IF NOT EXISTS announcements (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            content TEXT NOT NULL,
            is_active INTEGER DEFAULT 1,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Per-user permanent dismissals of announcements
        '''
        CREATE TABLE IF NOT EXISTS announcement_dismissals (
            user_id INTEGER NOT NULL,
            announcement_id INTEGER NOT NULL,
            dismissed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (user_id, announcement_id),
            FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE,
            FOREIGN KEY (announcement_id) REFERENCES announcements (id) ON DELETE CASCADE
        )
        ''',
        # Per-user scheduled tasks
        '''
        CREATE TABLE IF NOT EXISTS user_schedules (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            name TEXT DEFAULT '我的定时任务',
            enabled INTEGER DEFAULT 0,
            schedule_time TEXT NOT NULL DEFAULT '08:00',
            weekdays TEXT NOT NULL DEFAULT '1,2,3,4,5',
            browse_type TEXT NOT NULL DEFAULT '应读',
            enable_screenshot INTEGER DEFAULT 1,
            account_ids TEXT,
            last_run_at TIMESTAMP,
            next_run_at TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
        )
        ''',
    )

    index_statements = (
        # users
        'CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)',
        'CREATE INDEX IF NOT EXISTS idx_users_status ON users(status)',
        'CREATE INDEX IF NOT EXISTS idx_users_vip_expire ON users(vip_expire_time)',
        # accounts
        'CREATE INDEX IF NOT EXISTS idx_accounts_user_id ON accounts(user_id)',
        'CREATE INDEX IF NOT EXISTS idx_accounts_username ON accounts(username)',
        # task_logs
        'CREATE INDEX IF NOT EXISTS idx_task_logs_user_id ON task_logs(user_id)',
        'CREATE INDEX IF NOT EXISTS idx_task_logs_status ON task_logs(status)',
        'CREATE INDEX IF NOT EXISTS idx_task_logs_created_at ON task_logs(created_at)',
        'CREATE INDEX IF NOT EXISTS idx_task_logs_user_date ON task_logs(user_id, created_at)',
        # password_reset_requests
        'CREATE INDEX IF NOT EXISTS idx_password_reset_status ON password_reset_requests(status)',
        'CREATE INDEX IF NOT EXISTS idx_password_reset_user_id ON password_reset_requests(user_id)',
        # bug_feedbacks
        'CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_user_id ON bug_feedbacks(user_id)',
        'CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_status ON bug_feedbacks(status)',
        'CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_created_at ON bug_feedbacks(created_at)',
        # announcements
        'CREATE INDEX IF NOT EXISTS idx_announcements_active ON announcements(is_active)',
        'CREATE INDEX IF NOT EXISTS idx_announcements_created_at ON announcements(created_at)',
        'CREATE INDEX IF NOT EXISTS idx_announcement_dismissals_user ON announcement_dismissals(user_id)',
        # user_schedules
        'CREATE INDEX IF NOT EXISTS idx_user_schedules_user_id ON user_schedules(user_id)',
        'CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled ON user_schedules(enabled)',
        'CREATE INDEX IF NOT EXISTS idx_user_schedules_next_run ON user_schedules(next_run_at)',
    )

    db_pool.init_pool(DB_FILE, pool_size=config.DB_POOL_SIZE)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()

        for statement in table_statements:
            cursor.execute(statement)
        for statement in index_statements:
            cursor.execute(statement)

        # Seed the VIP configuration row.
        try:
            cursor.execute('INSERT INTO vip_config (id, default_vip_days) VALUES (1, 0)')
            conn.commit()
            print("✓ 已创建VIP配置(默认不赠送)")
        except sqlite3.IntegrityError:
            # Row already exists: nothing to do.
            pass

        # Seed the system configuration row.
        try:
            cursor.execute('''
                INSERT INTO system_config (
                    id, max_concurrent_global, max_concurrent_per_account, max_screenshot_concurrent,
                    schedule_enabled, schedule_time, schedule_browse_type, schedule_weekdays,
                    proxy_enabled, proxy_api_url, proxy_expire_minutes, enable_screenshot,
                    auto_approve_enabled, auto_approve_hourly_limit, auto_approve_vip_days
                ) VALUES (1, 2, 1, 3, 0, '02:00', '应读', '1,2,3,4,5,6,7', 0, '', 3, 1, 0, 10, 7)
            ''')
            conn.commit()
            print("✓ 已创建系统配置(默认并发2,定时任务关闭)")
        except sqlite3.IntegrityError:
            # Row already exists: nothing to do.
            pass

        # Seed the schema version row. Version 0 means "no migration applied
        # yet", so migrate_database() below brings the schema up to date.
        try:
            cursor.execute('INSERT INTO db_version (id, version) VALUES (1, ?)', (initial_version,))
            conn.commit()
            print(f"✓ 数据库版本: {initial_version}")
        except sqlite3.IntegrityError:
            # A version is already recorded: keep it.
            pass

        conn.commit()
        print("✓ 数据库初始化完成")

    # Apply pending migrations.
    migrate_database()
    # Make sure a default administrator exists.
    ensure_default_admin()
def migrate_database():
    """Apply every pending schema migration and record the new version.

    Reads the version stored in ``db_version`` (0 when the row is missing)
    and runs each ``_migrate_to_vN`` step whose number is greater than it,
    in ascending order.

    When the stored version is already at or above ``DB_VERSION`` nothing is
    written, so a database created by newer code is never stamped back to an
    older version. Otherwise the version row is written with an upsert (so a
    missing row is created instead of silently skipped) and a confirmation
    message is printed.
    """
    # (target version, migration step), ascending.
    migrations = (
        (1, _migrate_to_v1),
        (2, _migrate_to_v2),
        (3, _migrate_to_v3),
        (4, _migrate_to_v4),
        (5, _migrate_to_v5),
        (6, _migrate_to_v6),
        (7, _migrate_to_v7),
    )

    with db_pool.get_db() as conn:
        cursor = conn.cursor()

        cursor.execute('SELECT version FROM db_version WHERE id = 1')
        row = cursor.fetchone()
        starting_version = row['version'] if row else 0
        print(f"当前数据库版本: {starting_version}, 目标版本: {DB_VERSION}")

        current_version = starting_version
        for target_version, migrate in migrations:
            if current_version < target_version:
                migrate(conn)
                current_version = target_version

        # Compare against the version found at start-up: current_version has
        # been advanced by the loop and can no longer tell us whether any
        # migration actually ran.
        if starting_version >= DB_VERSION:
            return

        cursor.execute(
            'INSERT OR REPLACE INTO db_version (id, version, updated_at) VALUES (1, ?, ?)',
            (DB_VERSION, get_cst_now_str()),
        )
        conn.commit()
        print(f"✓ 数据库已迁移到版本 {DB_VERSION}")
def _migrate_to_v1(conn):
    """Migration v1: add columns missing from ``system_config`` and ``task_logs``.

    Each column is added only when ``PRAGMA table_info`` does not list it, so
    the step can be re-run safely. String defaults use single quotes:
    double-quoted string literals are a SQLite compatibility quirk that can
    be disabled at build time, in which case the statement would fail.

    Args:
        conn: an open sqlite3 connection; it is committed before returning.
    """
    cursor = conn.cursor()

    # (column name, column definition) pairs for system_config.
    system_config_columns = (
        ('schedule_weekdays', "TEXT DEFAULT '1,2,3,4,5,6,7'"),
        ('max_screenshot_concurrent', 'INTEGER DEFAULT 3'),
        ('max_concurrent_per_account', 'INTEGER DEFAULT 1'),
        ('auto_approve_enabled', 'INTEGER DEFAULT 0'),
        ('auto_approve_hourly_limit', 'INTEGER DEFAULT 10'),
        ('auto_approve_vip_days', 'INTEGER DEFAULT 7'),
    )

    cursor.execute("PRAGMA table_info(system_config)")
    existing = {col[1] for col in cursor.fetchall()}
    for name, definition in system_config_columns:
        if name not in existing:
            cursor.execute(f'ALTER TABLE system_config ADD COLUMN {name} {definition}')
            print(f" ✓ 添加 {name} 字段")

    # task_logs.duration
    cursor.execute("PRAGMA table_info(task_logs)")
    existing = {col[1] for col in cursor.fetchall()}
    if 'duration' not in existing:
        cursor.execute('ALTER TABLE task_logs ADD COLUMN duration INTEGER')
        print(" ✓ 添加 duration 字段到 task_logs")

    conn.commit()
def _migrate_to_v2(conn):
    """Migration v2: add the proxy and screenshot columns to ``system_config``.

    Columns already present are left alone. The empty-string default is
    written with single quotes because double-quoted string literals are a
    SQLite compatibility quirk that can be disabled at build time.

    Args:
        conn: an open sqlite3 connection; it is committed before returning.
    """
    cursor = conn.cursor()

    # (column name, column definition) pairs.
    new_columns = (
        ('proxy_enabled', 'INTEGER DEFAULT 0'),
        ('proxy_api_url', "TEXT DEFAULT ''"),
        ('proxy_expire_minutes', 'INTEGER DEFAULT 3'),
        ('enable_screenshot', 'INTEGER DEFAULT 1'),
    )

    cursor.execute("PRAGMA table_info(system_config)")
    existing = {col[1] for col in cursor.fetchall()}
    for name, definition in new_columns:
        if name not in existing:
            cursor.execute(f'ALTER TABLE system_config ADD COLUMN {name} {definition}')
            print(f" ✓ 添加 {name} 字段")

    conn.commit()
def _migrate_to_v3(conn):
    """Migration v3: add status and login-failure tracking columns to ``accounts``.

    Adds ``status`` (default 'active'), ``login_fail_count`` (default 0) and
    ``last_login_error`` when they are missing. The string default is written
    with single quotes because double-quoted string literals are a SQLite
    compatibility quirk that can be disabled at build time.

    Args:
        conn: an open sqlite3 connection; it is committed before returning.
    """
    cursor = conn.cursor()

    # (column name, column definition, progress message) triples.
    new_columns = (
        ('status', "TEXT DEFAULT 'active'", " ✓ 添加 accounts.status 字段 (账号状态)"),
        ('login_fail_count', 'INTEGER DEFAULT 0', " ✓ 添加 accounts.login_fail_count 字段 (登录失败计数)"),
        ('last_login_error', 'TEXT', " ✓ 添加 accounts.last_login_error 字段 (最后登录错误)"),
    )

    cursor.execute("PRAGMA table_info(accounts)")
    existing = {col[1] for col in cursor.fetchall()}
    for name, definition, message in new_columns:
        if name not in existing:
            cursor.execute(f'ALTER TABLE accounts ADD COLUMN {name} {definition}')
            print(message)

    conn.commit()
def _migrate_to_v4(conn):
    """Migration v4: add the ``source`` column to ``task_logs``.

    The column defaults to 'manual'; the progress message lists
    manual/scheduled/immediate as the expected values. The default is written
    with single quotes because double-quoted string literals are a SQLite
    compatibility quirk that can be disabled at build time.

    Args:
        conn: an open sqlite3 connection; it is committed before returning,
            like every other migration step.
    """
    cursor = conn.cursor()

    cursor.execute("PRAGMA table_info(task_logs)")
    existing = {col[1] for col in cursor.fetchall()}
    if 'source' not in existing:
        cursor.execute("ALTER TABLE task_logs ADD COLUMN source TEXT DEFAULT 'manual'")
        print(" ✓ 添加 task_logs.source 字段 (任务来源: manual/scheduled/immediate)")

    conn.commit()
def _migrate_to_v5(conn):
    """Migration v5: create the per-user schedule tables and their indexes.

    Creates ``user_schedules`` and ``schedule_execution_logs``. Each table
    has its own existence check, so the execution-log table is still created
    when ``user_schedules`` already exists, and the "created" messages are
    printed only for tables that were actually missing. The indexes use
    ``IF NOT EXISTS`` and are issued unconditionally.

    Args:
        conn: an open sqlite3 connection; it is committed before returning.
    """
    cursor = conn.cursor()

    def table_missing(table_name):
        """Return True when *table_name* is not listed in sqlite_master."""
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        )
        return cursor.fetchone() is None

    if table_missing('user_schedules'):
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS user_schedules (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                name TEXT DEFAULT '我的定时任务',
                enabled INTEGER DEFAULT 0,
                schedule_time TEXT NOT NULL DEFAULT '08:00',
                weekdays TEXT NOT NULL DEFAULT '1,2,3,4,5',
                browse_type TEXT NOT NULL DEFAULT '应读',
                enable_screenshot INTEGER DEFAULT 1,
                account_ids TEXT,
                last_run_at TIMESTAMP,
                next_run_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
            )
        ''')
        print(" ✓ 创建 user_schedules 表 (用户定时任务)")

    # Execution log for scheduled runs.
    if table_missing('schedule_execution_logs'):
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS schedule_execution_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                schedule_id INTEGER NOT NULL,
                user_id INTEGER NOT NULL,
                schedule_name TEXT,
                execute_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                total_accounts INTEGER DEFAULT 0,
                success_accounts INTEGER DEFAULT 0,
                failed_accounts INTEGER DEFAULT 0,
                total_items INTEGER DEFAULT 0,
                total_attachments INTEGER DEFAULT 0,
                total_screenshots INTEGER DEFAULT 0,
                duration_seconds INTEGER DEFAULT 0,
                status TEXT DEFAULT 'running',
                error_message TEXT,
                FOREIGN KEY (schedule_id) REFERENCES user_schedules (id) ON DELETE CASCADE,
                FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
            )
        ''')
        print(" ✓ 创建 schedule_execution_logs 表 (定时任务执行日志)")

    # Indexes on user_schedules.
    for statement in (
        'CREATE INDEX IF NOT EXISTS idx_user_schedules_user_id ON user_schedules(user_id)',
        'CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled ON user_schedules(enabled)',
        'CREATE INDEX IF NOT EXISTS idx_user_schedules_next_run ON user_schedules(next_run_at)',
    ):
        cursor.execute(statement)
    print(" ✓ 创建 user_schedules 表索引")

    conn.commit()
def _migrate_to_v6(conn):
    """Migration v6: create the announcement tables.

    ``announcements`` and ``announcement_dismissals`` are each created,
    together with their indexes, only when the table is not yet listed in
    ``sqlite_master``.

    Args:
        conn: an open sqlite3 connection; it is committed before returning.
    """
    cursor = conn.cursor()

    def has_table(table_name):
        """Return True when a table with this literal name exists."""
        cursor.execute(
            f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'"
        )
        return bool(cursor.fetchone())

    if not has_table('announcements'):
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS announcements (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                is_active INTEGER DEFAULT 1,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        print(" ✓ 创建 announcements 表 (公告)")
        for index_sql in (
            'CREATE INDEX IF NOT EXISTS idx_announcements_active ON announcements(is_active)',
            'CREATE INDEX IF NOT EXISTS idx_announcements_created_at ON announcements(created_at)',
        ):
            cursor.execute(index_sql)
        print(" ✓ 创建 announcements 表索引")

    if not has_table('announcement_dismissals'):
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS announcement_dismissals (
                user_id INTEGER NOT NULL,
                announcement_id INTEGER NOT NULL,
                dismissed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (user_id, announcement_id),
                FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE,
                FOREIGN KEY (announcement_id) REFERENCES announcements (id) ON DELETE CASCADE
            )
        ''')
        print(" ✓ 创建 announcement_dismissals 表 (公告永久关闭记录)")
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_announcement_dismissals_user ON announcement_dismissals(user_id)')
        print(" ✓ 创建 announcement_dismissals 表索引")

    conn.commit()
def _migrate_to_v7(conn):
    """Migration v7: shift historical UTC timestamps to Beijing time (+8 hours).

    For each (table, column) pair listed below that actually exists, every
    non-empty value SQLite can parse as a date/time is replaced by
    ``datetime(value, '+8 hours')``.

    Values SQLite cannot parse are left untouched: ``datetime()`` returns
    NULL for them, so shifting them blindly would overwrite the stored text
    with NULL.

    Args:
        conn: an open sqlite3 connection; it is committed before returning.
    """
    cursor = conn.cursor()

    def table_exists(table_name: str) -> bool:
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
        return cursor.fetchone() is not None

    def column_exists(table_name: str, column_name: str) -> bool:
        cursor.execute(f"PRAGMA table_info({table_name})")
        return any(row[1] == column_name for row in cursor.fetchall())

    def shift_utc_to_cst(table_name: str, column_name: str) -> None:
        # Identifiers come only from the hard-coded lists below, so
        # interpolating them into the statement is safe.
        if not table_exists(table_name):
            return
        if not column_exists(table_name, column_name):
            return
        cursor.execute(
            f"""
            UPDATE {table_name}
            SET {column_name} = datetime({column_name}, '+8 hours')
            WHERE {column_name} IS NOT NULL
              AND {column_name} != ''
              AND datetime({column_name}) IS NOT NULL
            """
        )

    timestamp_columns = [
        # Main tables (historically filled by CURRENT_TIMESTAMP, i.e. UTC).
        ("users", "created_at"),
        ("users", "approved_at"),
        ("admins", "created_at"),
        ("accounts", "created_at"),
        ("password_reset_requests", "created_at"),
        ("password_reset_requests", "processed_at"),
        # Mail module (same database; may be disabled, but tables can exist).
        ("smtp_configs", "created_at"),
        ("smtp_configs", "updated_at"),
        ("smtp_configs", "last_success_at"),
        ("email_settings", "updated_at"),
        ("email_tokens", "created_at"),
        ("email_logs", "created_at"),
        ("email_stats", "last_updated"),
        # Task checkpoints (same database; table created by task_checkpoint.py).
        ("task_checkpoints", "created_at"),
        ("task_checkpoints", "updated_at"),
        ("task_checkpoints", "completed_at"),
    ]
    for table, col in timestamp_columns:
        shift_utc_to_cst(table, col)

    conn.commit()
    print(" ✓ 时区迁移历史UTC时间已转换为北京时间")
# ==================== 管理员相关 ====================
def ensure_default_admin():
    """Create the default ``admin`` account when the admins table is empty.

    Security fix: a random 12-character password drawn from ASCII letters
    and digits is generated instead of a fixed weak password. The password
    is printed to the console once, on creation, and should be changed
    promptly.

    Returns:
        True when an administrator was created, False when one already existed.
    """
    import secrets
    import string

    with db_pool.get_db() as conn:
        cursor = conn.cursor()

        # Nothing to do when any administrator already exists.
        cursor.execute('SELECT COUNT(*) as count FROM admins')
        if cursor.fetchone()['count'] != 0:
            return False

        charset = string.ascii_letters + string.digits
        random_password = ''.join([secrets.choice(charset) for _ in range(12)])
        default_password_hash = hash_password_bcrypt(random_password)

        cursor.execute(
            'INSERT INTO admins (username, password_hash, created_at) VALUES (?, ?, ?)',
            ('admin', default_password_hash, get_cst_now_str())
        )
        conn.commit()

        separator = "=" * 60
        print(separator)
        print("安全提醒:已创建默认管理员账号")
        print("用户名: admin")
        print(f"密码: {random_password}")
        print("请立即登录后修改密码!")
        print(separator)
        return True
def verify_admin(username, password):
    """Check administrator credentials, upgrading legacy SHA256 hashes to bcrypt.

    Returns:
        The admin row as a dict when the password matches, otherwise None.
        After a successful SHA256 login the stored hash is replaced with a
        bcrypt hash; the returned dict still carries the hash that was read.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM admins WHERE username = ?', (username,))
        record = cursor.fetchone()
        if not record:
            return None

        admin_dict = dict(record)
        stored_hash = admin_dict['password_hash']

        # Current format: plain bcrypt verification.
        if not is_sha256_hash(stored_hash):
            return admin_dict if verify_password_bcrypt(password, stored_hash) else None

        # Legacy format: verify with SHA256, then upgrade in place.
        if not verify_password_sha256(password, stored_hash):
            return None

        upgraded_hash = hash_password_bcrypt(password)
        cursor.execute('UPDATE admins SET password_hash = ? WHERE username = ?',
                       (upgraded_hash, username))
        conn.commit()
        print(f"管理员 {username} 密码已自动升级到bcrypt")
        return admin_dict
def update_admin_password(username, new_password):
    """Store a freshly hashed password for the given administrator.

    Returns:
        True when a row was updated, False when no admin has that username.
    """
    statement = 'UPDATE admins SET password_hash = ? WHERE username = ?'
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        new_hash = hash_password(new_password)
        cursor.execute(statement, (new_hash, username))
        conn.commit()
        updated = cursor.rowcount > 0
        return updated
def update_admin_username(old_username, new_username):
    """Rename an administrator.

    Returns:
        True when a row was renamed. False when no administrator is called
        ``old_username`` (previously reported as success) or when
        ``new_username`` violates the UNIQUE constraint.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute('UPDATE admins SET username = ? WHERE username = ?',
                           (new_username, old_username))
            conn.commit()
        except sqlite3.IntegrityError:
            return False
        # Same success criterion as the other update helpers in this module.
        return cursor.rowcount > 0
# ==================== VIP管理 ====================
def get_vip_config():
    """Return the VIP configuration row (id = 1) as a dict.

    Falls back to ``{'default_vip_days': 0}`` when the row does not exist.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM vip_config WHERE id = 1')
        vip_row = cursor.fetchone()
        if vip_row:
            return dict(vip_row)
        return {'default_vip_days': 0}
def set_default_vip_days(days):
    """Write the default VIP days into the single ``vip_config`` row.

    The row (id = 1) is inserted or replaced and ``updated_at`` is set to the
    current CST time. Always returns True.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        params = (days, get_cst_now_str())
        cursor.execute('''
            INSERT OR REPLACE INTO vip_config (id, default_vip_days, updated_at)
            VALUES (1, ?, ?)
        ''', params)
        conn.commit()
    return True
def set_user_vip(user_id, days):
    """Set a user's VIP expiry to *days* from now.

    ``days`` examples: 7 = one week, 30 = one month, 365 = one year,
    999999 = permanent (stored as '2099-12-31 23:59:59').

    Returns:
        True when a user row was updated.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        if days == 999999:
            expire_time = '2099-12-31 23:59:59'
        else:
            expires_at = get_cst_now() + timedelta(days=days)
            expire_time = expires_at.strftime('%Y-%m-%d %H:%M:%S')
        cursor.execute('UPDATE users SET vip_expire_time = ? WHERE id = ?', (expire_time, user_id))
        conn.commit()
        return cursor.rowcount > 0
def extend_user_vip(user_id, days):
    """Extend a user's VIP period by *days*.

    The extension starts from the current expiry when that is still in the
    future, otherwise from now. A missing or unparseable expiry is treated
    as "now".

    Permanent VIP (expiry '2099-12-31 23:59:59') is left untouched:
    previously it fell through to the "no expiry" branch and was downgraded
    to now + days. ``days == 999999`` grants permanent VIP, matching
    ``set_user_vip``.

    Returns:
        False when the user does not exist; otherwise True when the user row
        was updated (or was already permanent).
    """
    permanent_expire = '2099-12-31 23:59:59'
    time_format = '%Y-%m-%d %H:%M:%S'

    user = get_user_by_id(user_id)
    if not user:
        return False

    current_expire = user.get('vip_expire_time')
    if current_expire == permanent_expire:
        # Already permanent: there is nothing to extend.
        return True

    if days == 999999:
        new_expire = permanent_expire
    else:
        now = get_cst_now()
        base_time = now
        if current_expire:
            try:
                parsed = CST_TZ.localize(datetime.strptime(current_expire, time_format))
                # An expiry in the past restarts from the current time.
                base_time = max(parsed, now)
            except (ValueError, AttributeError, TypeError) as e:
                # Stored expiry is malformed: fall back to the current time.
                print(f"解析VIP过期时间失败: {e}, 使用当前时间")
        new_expire = (base_time + timedelta(days=days)).strftime(time_format)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('UPDATE users SET vip_expire_time = ? WHERE id = ?', (new_expire, user_id))
        conn.commit()
        return cursor.rowcount > 0
def remove_user_vip(user_id):
    """Clear a user's VIP expiry (sets ``vip_expire_time`` to NULL).

    Returns:
        True when a user row was updated.
    """
    statement = 'UPDATE users SET vip_expire_time = NULL WHERE id = ?'
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(statement, (user_id,))
        conn.commit()
        removed = cursor.rowcount > 0
        return removed
def is_user_vip(user_id):
    """Return True when the user's VIP expiry lies in the future.

    Stored expiry strings are interpreted as CST (Asia/Shanghai). If storage
    ever switches to UTC this comparison must be changed accordingly.
    A missing user, a missing expiry, or an unparseable expiry yields False.
    """
    cst_tz = pytz.timezone("Asia/Shanghai")
    user = get_user_by_id(user_id)
    if not user:
        return False
    if not user.get('vip_expire_time'):
        return False

    try:
        naive_expiry = datetime.strptime(user['vip_expire_time'], '%Y-%m-%d %H:%M:%S')
        expires_at = cst_tz.localize(naive_expiry)
        return datetime.now(cst_tz) < expires_at
    except (ValueError, AttributeError) as e:
        print(f"检查VIP状态失败 (user_id={user_id}): {e}")
        return False
def get_user_vip_info(user_id):
    """Return a summary of a user's VIP state.

    Returns:
        A dict with ``is_vip``, ``expire_time``, ``days_left`` and
        ``username``. Unknown users, users without an expiry, and any error
        while evaluating the expiry produce the "not VIP" form
        (``expire_time`` None, ``days_left`` 0).
    """
    def not_vip(username):
        return {'is_vip': False, 'expire_time': None, 'days_left': 0, 'username': username}

    cst_tz = pytz.timezone("Asia/Shanghai")
    user = get_user_by_id(user_id)
    if not user:
        return not_vip('')

    vip_expire_time = user.get('vip_expire_time')
    if not vip_expire_time:
        return not_vip(user.get('username', ''))

    try:
        naive_expiry = datetime.strptime(vip_expire_time, '%Y-%m-%d %H:%M:%S')
        expires_at = cst_tz.localize(naive_expiry)
        now = datetime.now(cst_tz)
        is_vip = now < expires_at
        # Whole days remaining; 0 once expired.
        days_left = (expires_at - now).days if is_vip else 0
        return {
            "username": user.get("username", ""),
            'is_vip': is_vip,
            'expire_time': vip_expire_time,
            'days_left': max(0, days_left)
        }
    except Exception as e:
        print(f"VIP信息获取错误: {e}")
        return not_vip(user.get('username', ''))
# ==================== 用户相关 ====================
def create_user(username, password, email=''):
    """Create a user in 'pending' status, granting the default VIP period.

    The password hash and the default VIP days are computed before a pooled
    connection is acquired: ``get_vip_config()`` opens its own connection,
    and hashing is slow, so doing either while holding a connection ties up
    the pool (and nests acquisitions) for no benefit.

    Returns:
        The new user's row id, or None when the insert violates a
        constraint (for example a duplicate username).
    """
    password_hash = hash_password(password)
    created_at = get_cst_now_str()

    # Default VIP days: 0/None = no gift, 999999 = permanent.
    default_vip_days = get_vip_config().get('default_vip_days') or 0
    vip_expire_time = None
    if default_vip_days > 0:
        if default_vip_days == 999999:
            vip_expire_time = '2099-12-31 23:59:59'
        else:
            expires_at = get_cst_now() + timedelta(days=default_vip_days)
            vip_expire_time = expires_at.strftime('%Y-%m-%d %H:%M:%S')

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute('''
                INSERT INTO users (username, password_hash, email, status, vip_expire_time, created_at)
                VALUES (?, ?, ?, 'pending', ?, ?)
            ''', (username, password_hash, email, vip_expire_time, created_at))
            conn.commit()
            return cursor.lastrowid
        except sqlite3.IntegrityError:
            return None
def verify_user(username, password):
    """Check the credentials of an approved user, upgrading SHA256 hashes to bcrypt.

    Only users whose status is 'approved' can log in.

    Returns:
        The user row as a dict when the password matches, otherwise None.
        After a successful SHA256 login the stored hash is replaced with a
        bcrypt hash; the returned dict still carries the hash that was read.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users WHERE username = ? AND status = 'approved'", (username,))
        record = cursor.fetchone()
        if not record:
            return None

        user_dict = dict(record)
        stored_hash = user_dict['password_hash']

        # Current format: plain bcrypt verification.
        if not is_sha256_hash(stored_hash):
            return user_dict if verify_password_bcrypt(password, stored_hash) else None

        # Legacy format: verify with SHA256, then upgrade in place.
        if not verify_password_sha256(password, stored_hash):
            return None

        upgraded_hash = hash_password_bcrypt(password)
        cursor.execute('UPDATE users SET password_hash = ? WHERE id = ?',
                       (upgraded_hash, user_dict['id']))
        conn.commit()
        print(f"用户 {username} 密码已自动升级到bcrypt")
        return user_dict
def get_user_by_id(user_id):
    """Return the user with this id as a dict, or None when not found."""
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
        record = cursor.fetchone()
        if not record:
            return None
        return dict(record)
def get_user_by_username(username):
    """Return the user with this username as a dict, or None when not found."""
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM users WHERE username = ?', (username,))
        record = cursor.fetchone()
        if not record:
            return None
        return dict(record)
def get_user_by_email(email):
    """Return the first user with this email as a dict, or None when not found."""
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM users WHERE email = ?', (email,))
        record = cursor.fetchone()
        if not record:
            return None
        return dict(record)
def update_user_email(user_id, email, verified=False):
    """Set a user's email address and its verified flag.

    The ``email_verified`` column is added on first use when it is missing.
    Only ``sqlite3.OperationalError`` (what SQLite raises for an unknown
    column) triggers that path; the previous bare ``except`` also swallowed
    unrelated failures and interpreter-exit signals before attempting the
    ALTER TABLE.

    Returns:
        True when a user row was updated.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Probe for the column; add it when the probe fails.
        try:
            cursor.execute('SELECT email_verified FROM users LIMIT 1')
        except sqlite3.OperationalError:
            cursor.execute('ALTER TABLE users ADD COLUMN email_verified INTEGER DEFAULT 0')
            conn.commit()
        cursor.execute('''
            UPDATE users
            SET email = ?, email_verified = ?
            WHERE id = ?
        ''', (email, int(verified), user_id))
        conn.commit()
        return cursor.rowcount > 0
def update_user_email_notify(user_id, enabled):
    """Store a user's email-notification preference.

    The ``email_notify_enabled`` column is added on first use when it is
    missing. Only ``sqlite3.OperationalError`` (what SQLite raises for an
    unknown column) triggers that path; the previous bare ``except`` also
    swallowed unrelated failures and interpreter-exit signals.

    Returns:
        True when a user row was updated.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Probe for the column; add it when the probe fails.
        try:
            cursor.execute('SELECT email_notify_enabled FROM users LIMIT 1')
        except sqlite3.OperationalError:
            cursor.execute('ALTER TABLE users ADD COLUMN email_notify_enabled INTEGER DEFAULT 1')
            conn.commit()
        cursor.execute('''
            UPDATE users
            SET email_notify_enabled = ?
            WHERE id = ?
        ''', (int(enabled), user_id))
        conn.commit()
        return cursor.rowcount > 0
def get_user_email_notify(user_id):
    """Return a user's email-notification preference (enabled by default).

    Returns:
        The stored flag as a bool. True when the user does not exist, the
        stored value is NULL, or the query fails with a database error
        (for example because the column has not been added yet).
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute('SELECT email_notify_enabled FROM users WHERE id = ?', (user_id,))
            row = cursor.fetchone()
        except sqlite3.Error:
            # Best effort: any database error means "use the default", but
            # non-database exceptions are no longer swallowed.
            return True
        if row is None or row[0] is None:
            return True
        return bool(row[0])
def get_all_users():
    """Return every user as a list of dicts, newest ``created_at`` first."""
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM users ORDER BY created_at DESC')
        users = []
        for record in cursor.fetchall():
            users.append(dict(record))
        return users
def get_pending_users():
    """Return users whose status is 'pending' as dicts, newest first."""
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users WHERE status = 'pending' ORDER BY created_at DESC")
        pending = []
        for record in cursor.fetchall():
            pending.append(dict(record))
        return pending
def approve_user(user_id):
    """Mark a user as approved and stamp ``approved_at`` with the current CST time.

    Returns:
        True when a user row was updated.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        params = (get_cst_now_str(), user_id)
        cursor.execute('''
            UPDATE users
            SET status = 'approved', approved_at = ?
            WHERE id = ?
        ''', params)
        conn.commit()
        approved = cursor.rowcount > 0
        return approved
def reject_user(user_id):
    """Mark a user as rejected.

    Returns:
        True when a user row was updated.
    """
    statement = "UPDATE users SET status = 'rejected' WHERE id = ?"
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(statement, (user_id,))
        conn.commit()
        rejected = cursor.rowcount > 0
        return rejected
def delete_user(user_id):
    """Delete a user row.

    Related rows are declared with ``ON DELETE CASCADE`` in the schema.
    NOTE(review): SQLite only enforces that when foreign keys are enabled on
    the connection; confirm that db_pool turns them on.

    Returns:
        True when a user row was deleted.
    """
    statement = 'DELETE FROM users WHERE id = ?'
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(statement, (user_id,))
        conn.commit()
        deleted = cursor.rowcount > 0
        return deleted
# ==================== 账号相关 ====================
def create_account(user_id, account_id, username, password, remember=True, remark=''):
    """Insert an account for a user, storing the password encrypted.

    Security fix: the third-party account password is passed through
    ``encrypt_password`` before it is written.

    Returns:
        ``cursor.lastrowid`` of the insert.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        created_at = get_cst_now_str()
        stored_password = encrypt_password(password)
        remember_flag = 1 if remember else 0
        values = (account_id, user_id, username, stored_password, remember_flag, remark, created_at)
        cursor.execute('''
            INSERT INTO accounts (id, user_id, username, password, remember, remark, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', values)
        conn.commit()
        return cursor.lastrowid
def get_user_accounts(user_id):
    """Return a user's accounts as dicts with decrypted passwords, newest first.

    Security fix: each stored password is passed through
    ``decrypt_password`` (noted in the original as compatible with legacy
    data).
    """
    def with_plain_password(record):
        account = dict(record)
        account['password'] = decrypt_password(account.get('password', ''))
        return account

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM accounts WHERE user_id = ? ORDER BY created_at DESC', (user_id,))
        return [with_plain_password(record) for record in cursor.fetchall()]
def get_account(account_id):
    """Return one account as a dict with its password decrypted, or None.

    Security fix: the stored password is passed through ``decrypt_password``
    (noted in the original as compatible with legacy data).
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM accounts WHERE id = ?', (account_id,))
        record = cursor.fetchone()
        if not record:
            return None
        account = dict(record)
        account['password'] = decrypt_password(account.get('password', ''))
        return account
def update_account_remark(account_id, remark):
    """Replace the remark stored for an account.

    Returns:
        True when an account row was updated.
    """
    statement = 'UPDATE accounts SET remark = ? WHERE id = ?'
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(statement, (remark, account_id))
        conn.commit()
        updated = cursor.rowcount > 0
        return updated
def delete_account(account_id):
    """Delete one account.

    Returns:
        True when an account row was deleted.
    """
    statement = 'DELETE FROM accounts WHERE id = ?'
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(statement, (account_id,))
        conn.commit()
        deleted = cursor.rowcount > 0
        return deleted
def increment_account_login_fail(account_id, error_message, max_fail_count=3):
    """Record a failed login and suspend the account once the limit is reached.

    The counter is incremented inside the UPDATE statement itself rather
    than being read, incremented in Python and written back, so failures
    reported concurrently for the same account are not lost.

    Args:
        account_id: id of the account that failed to log in.
        error_message: stored in ``last_login_error``.
        max_fail_count: number of failures at which the account is
            suspended (default 3, the previously hard-coded limit).

    Returns:
        True when the account is now suspended; False when it is still below
        the limit or does not exist.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()

        # A NULL counter is treated as 0.
        cursor.execute('''
            UPDATE accounts
            SET login_fail_count = COALESCE(login_fail_count, 0) + 1,
                last_login_error = ?
            WHERE id = ?
        ''', (error_message, account_id))
        if cursor.rowcount == 0:
            # Unknown account: nothing was changed.
            return False

        cursor.execute('SELECT login_fail_count FROM accounts WHERE id = ?', (account_id,))
        fail_count = cursor.fetchone()[0]

        suspended = fail_count >= max_fail_count
        if suspended:
            cursor.execute("UPDATE accounts SET status = 'suspended' WHERE id = ?", (account_id,))

        conn.commit()
        return suspended
def reset_account_login_status(account_id):
    """Clear the login-failure state of an account.

    Resets ``login_fail_count`` to 0, clears ``last_login_error`` and sets
    ``status`` back to ``'active'``. The original docstring notes it is meant
    to be called after the password has been changed.

    Returns:
        True when a row was updated.
    """
    statement = '''
        UPDATE accounts
        SET login_fail_count = 0,
            last_login_error = NULL,
            status = 'active'
        WHERE id = ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(statement, (account_id,))
        conn.commit()
        affected = cur.rowcount
        return affected > 0
def get_account_status(account_id):
    """Return the status columns of an account.

    Returns:
        The raw row (``status``, ``login_fail_count``, ``last_login_error``)
        as produced by the cursor, or ``None`` when the id does not exist.
    """
    query = '''
        SELECT status, login_fail_count, last_login_error
        FROM accounts
        WHERE id = ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (account_id,))
        status_row = cur.fetchone()
        return status_row
def delete_user_accounts(user_id):
    """Delete every account that belongs to ``user_id``.

    Returns:
        The number of rows removed, as reported by the cursor.
    """
    statement = 'DELETE FROM accounts WHERE user_id = ?'
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(statement, (user_id,))
        conn.commit()
        removed = cur.rowcount
        return removed
# ==================== 统计相关 ====================
def get_user_stats(user_id):
    """Return per-user statistics.

    Returns:
        ``{'account_count': <number of accounts owned by user_id>}``.
    """
    query = 'SELECT COUNT(*) as count FROM accounts WHERE user_id = ?'
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id,))
        owned = cur.fetchone()['count']
        return {'account_count': owned}
def get_system_stats():
    """Return system-wide counters for users, accounts and active VIPs.

    Returns:
        A dict with ``total_users``, ``approved_users``, ``pending_users``,
        ``total_accounts`` and ``vip_users``.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()

        def count_rows(sql):
            # Run a COUNT(*) query aliased as "count" and return its value.
            cur.execute(sql)
            return cur.fetchone()['count']

        # NOTE(review): the VIP check compares against SQLite's 'localtime';
        # confirm vip_expire_time is written in the server's local zone.
        return {
            'total_users': count_rows('SELECT COUNT(*) as count FROM users'),
            'approved_users': count_rows(
                "SELECT COUNT(*) as count FROM users WHERE status = 'approved'"
            ),
            'pending_users': count_rows(
                "SELECT COUNT(*) as count FROM users WHERE status = 'pending'"
            ),
            'total_accounts': count_rows('SELECT COUNT(*) as count FROM accounts'),
            'vip_users': count_rows('''
                SELECT COUNT(*) as count FROM users
                WHERE vip_expire_time IS NOT NULL
                AND datetime(vip_expire_time) > datetime('now', 'localtime')
            '''),
        }
# ==================== 系统配置管理 ====================
def get_system_config():
    """Return the system configuration row (``id = 1``) as a dict.

    When the row does not exist, a dict of built-in default values is
    returned instead.
    """
    fallback = {
        'max_concurrent_global': 2,
        'max_concurrent_per_account': 1,
        'max_screenshot_concurrent': 3,
        'schedule_enabled': 0,
        'schedule_time': '02:00',
        'schedule_browse_type': '应读',
        'schedule_weekdays': '1,2,3,4,5,6,7',
        'proxy_enabled': 0,
        'proxy_api_url': '',
        'proxy_expire_minutes': 3,
        'enable_screenshot': 1,
        'auto_approve_enabled': 0,
        'auto_approve_hourly_limit': 10,
        'auto_approve_vip_days': 7
    }
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute('SELECT * FROM system_config WHERE id = 1')
        stored = cur.fetchone()
        # Fall back to the defaults only when the row is missing.
        return dict(stored) if stored else fallback
def update_system_config(max_concurrent=None, schedule_enabled=None, schedule_time=None,
                         schedule_browse_type=None, schedule_weekdays=None,
                         max_concurrent_per_account=None, max_screenshot_concurrent=None,
                         proxy_enabled=None, proxy_api_url=None, proxy_expire_minutes=None,
                         auto_approve_enabled=None, auto_approve_hourly_limit=None,
                         auto_approve_vip_days=None):
    """Update the system configuration row (``id = 1``).

    Only arguments that are not ``None`` are written; ``updated_at`` is set
    to the current CST time whenever at least one field changes.

    Column names come exclusively from the fixed table below and are never
    derived from caller input, so the table itself is the whitelist; values
    are always bound as SQL parameters.

    Returns:
        True when an UPDATE was issued, False when every argument was
        ``None`` (in that case no database connection is acquired).
    """
    # (column name, new value) pairs; the column names are literals.
    candidates = (
        ('max_concurrent_global', max_concurrent),
        ('schedule_enabled', schedule_enabled),
        ('schedule_time', schedule_time),
        ('schedule_browse_type', schedule_browse_type),
        ('max_concurrent_per_account', max_concurrent_per_account),
        ('max_screenshot_concurrent', max_screenshot_concurrent),
        ('schedule_weekdays', schedule_weekdays),
        ('proxy_enabled', proxy_enabled),
        ('proxy_api_url', proxy_api_url),
        ('proxy_expire_minutes', proxy_expire_minutes),
        ('auto_approve_enabled', auto_approve_enabled),
        ('auto_approve_hourly_limit', auto_approve_hourly_limit),
        ('auto_approve_vip_days', auto_approve_vip_days),
    )
    changes = [(column, value) for column, value in candidates if value is not None]
    if not changes:
        # Nothing to write: avoid taking a pooled connection for a no-op.
        return False

    changes.append(('updated_at', get_cst_now_str()))
    set_clause = ', '.join(f'{column} = ?' for column, _ in changes)
    params = [value for _, value in changes]

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(f"UPDATE system_config SET {set_clause} WHERE id = 1", params)
        conn.commit()
        return True
def get_hourly_registration_count():
    """Count users whose ``created_at`` falls within the last hour.

    The one-hour window is computed by SQLite relative to its 'localtime'.
    """
    # NOTE(review): confirm users.created_at is written in the same zone as
    # SQLite's 'localtime'; otherwise the window is shifted.
    query = '''
        SELECT COUNT(*) FROM users
        WHERE created_at >= datetime('now', 'localtime', '-1 hour')
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query)
        first_row = cur.fetchone()
        return first_row[0]
# ==================== 任务日志管理 ====================
def create_task_log(user_id, account_id, username, browse_type, status,
                    total_items=0, total_attachments=0, error_message='', duration=None,
                    source='manual'):
    """Insert one task log record.

    Args:
        source: Origin of the task - 'manual' (run by hand), 'scheduled'
            (scheduled job) or 'immediate' (run immediately).

    Returns:
        The row id of the inserted record.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Use the module's single CST time helper instead of rebuilding the
        # timezone object locally; the timestamp format is unchanged.
        cst_time = get_cst_now_str()
        cursor.execute('''
            INSERT INTO task_logs (
                user_id, account_id, username, browse_type, status,
                total_items, total_attachments, error_message, duration, created_at, source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (user_id, account_id, username, browse_type, status,
              total_items, total_attachments, error_message, duration, cst_time, source))
        conn.commit()
        return cursor.lastrowid
def get_task_logs(limit=100, offset=0, date_filter=None, status_filter=None,
                  source_filter=None, user_id_filter=None, account_filter=None):
    """Return one page of task logs plus the total number of matches.

    Every filter is optional and ignored when falsy. ``account_filter`` is a
    substring match on the log's ``username``; the other filters are exact.

    Returns:
        ``{'logs': [dict, ...], 'total': int}`` where each log also carries
        the owning user's name as ``user_username``.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()

        # Assemble the WHERE clause; "1=1" keeps the AND-join uniform.
        conditions = ["1=1"]
        bind_values = []
        exact_matches = (
            ("date(tl.created_at) = ?", date_filter),
            ("tl.status = ?", status_filter),
            ("tl.source = ?", source_filter),
            ("tl.user_id = ?", user_id_filter),
        )
        for clause, wanted in exact_matches:
            if wanted:
                conditions.append(clause)
                bind_values.append(wanted)

        if account_filter:
            # Escape LIKE wildcards so the filter cannot be bypassed.
            escaped = sanitize_sql_like_pattern(account_filter)
            conditions.append("tl.username LIKE ? ESCAPE '\\'")
            bind_values.append(f"%{escaped}%")

        where_sql = " AND ".join(conditions)

        # Total number of matching rows.
        count_sql = f'''
            SELECT COUNT(*) as total
            FROM task_logs tl
            LEFT JOIN users u ON tl.user_id = u.id
            WHERE {where_sql}
        '''
        cur.execute(count_sql, bind_values)
        total = cur.fetchone()['total']

        # The requested page, newest first.
        data_sql = f'''
            SELECT
                tl.*,
                u.username as user_username
            FROM task_logs tl
            LEFT JOIN users u ON tl.user_id = u.id
            WHERE {where_sql}
            ORDER BY tl.created_at DESC
            LIMIT ? OFFSET ?
        '''
        cur.execute(data_sql, bind_values + [limit, offset])
        page = [dict(record) for record in cur.fetchall()]

        return {
            'logs': page,
            'total': total
        }
def get_task_stats(date_filter=None):
    """Return task statistics for one day and for all time.

    Args:
        date_filter: Day to report as ``'YYYY-MM-DD'``; defaults to the
            current date in CST.

    Returns:
        ``{'today': {...}, 'total': {...}}`` where each inner dict holds
        ``total_tasks``, ``success_tasks``, ``failed_tasks``, ``total_items``
        and ``total_attachments``. NULL aggregates are reported as 0.
    """
    if date_filter is None:
        # Use the module's single CST time helper for the default day.
        date_filter = get_cst_now().strftime('%Y-%m-%d')

    stat_keys = ('total_tasks', 'success_tasks', 'failed_tasks',
                 'total_items', 'total_attachments')
    aggregate_sql = '''
        SELECT
            COUNT(*) as total_tasks,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
            SUM(total_items) as total_items,
            SUM(total_attachments) as total_attachments
        FROM task_logs
    '''

    def summarize(row):
        # SUM() yields NULL when no row matches; report that as 0.
        return {key: row[key] or 0 for key in stat_keys}

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Statistics for the requested day.
        cursor.execute(aggregate_sql + ' WHERE date(created_at) = ?', (date_filter,))
        today_stats = summarize(cursor.fetchone())
        # Cumulative statistics over the whole table.
        cursor.execute(aggregate_sql)
        total_stats = summarize(cursor.fetchone())
        return {'today': today_stats, 'total': total_stats}
def delete_old_task_logs(days=30, batch_size=1000):
    """Delete task logs older than ``days`` days, in batches.

    Deleting in batches (one connection and one commit per batch) avoids
    holding a long write lock on the table.

    The cutoff is computed in CST because ``task_logs.created_at`` is written
    as a CST string; comparing against SQLite's 'localtime' would shift the
    cutoff on a server whose local zone is not Asia/Shanghai.

    Args:
        days: Age threshold in days.
        batch_size: Maximum number of rows deleted per batch.

    Returns:
        int: Total number of rows deleted.
    """
    cutoff = (get_cst_now() - timedelta(days=float(days))).strftime('%Y-%m-%d %H:%M:%S')
    total_deleted = 0
    while True:
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            # LIMIT inside the sub-select bounds the size of each batch.
            cursor.execute('''
                DELETE FROM task_logs
                WHERE rowid IN (
                    SELECT rowid FROM task_logs
                    WHERE created_at < ?
                    LIMIT ?
                )
            ''', (cutoff, batch_size))
            deleted = cursor.rowcount
            conn.commit()
        if deleted <= 0:
            break
        total_deleted += deleted
    return total_deleted
def get_user_run_stats(user_id, date_filter=None):
    """Return one user's task statistics for a single day.

    Args:
        user_id: Owner of the task logs to aggregate.
        date_filter: Day to report as ``'YYYY-MM-DD'``; defaults to the
            current date in CST.

    Returns:
        A dict with ``completed``, ``failed``, ``total_items`` and
        ``total_attachments``; NULL aggregates are reported as 0.
    """
    if date_filter is None:
        # Use the module's single CST time helper for the default day.
        date_filter = get_cst_now().strftime('%Y-%m-%d')

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as completed,
                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
                SUM(total_items) as total_items,
                SUM(total_attachments) as total_attachments
            FROM task_logs
            WHERE user_id = ? AND date(created_at) = ?
        ''', (user_id, date_filter))
        stats = cursor.fetchone()
        # SUM() yields NULL when no row matches; report that as 0.
        return {
            key: stats[key] or 0
            for key in ('completed', 'failed', 'total_items', 'total_attachments')
        }
# ==================== 密码重置功能 ====================
def create_password_reset_request(user_id, new_password):
    """Create a pending password reset request.

    The new password is stored as a bcrypt hash together with the current
    CST timestamp.

    Returns:
        The id of the new request, or ``None`` when the insert fails (the
        error is printed).
    """
    insert_sql = '''
        INSERT INTO password_reset_requests (user_id, new_password_hash, status, created_at)
        VALUES (?, ?, 'pending', ?)
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        hashed = hash_password_bcrypt(new_password)
        requested_at = get_cst_now_str()
        try:
            cur.execute(insert_sql, (user_id, hashed, requested_at))
            conn.commit()
            return cur.lastrowid
        except Exception as e:
            print(f"创建密码重置申请失败: {e}")
            return None
def get_pending_password_resets():
    """List all password reset requests that are still pending.

    Returns:
        A list of dicts (request id, user id, created_at, status, plus the
        user's username and email), newest request first.
    """
    query = '''
        SELECT r.id, r.user_id, r.created_at, r.status,
               u.username, u.email
        FROM password_reset_requests r
        JOIN users u ON r.user_id = u.id
        WHERE r.status = 'pending'
        ORDER BY r.created_at DESC
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query)
        pending = []
        for record in cur.fetchall():
            pending.append(dict(record))
        return pending
def approve_password_reset(request_id):
    """Approve a pending password reset request.

    Copies the stored password hash onto the user and marks the request as
    ``'approved'`` with the current CST time as ``processed_at``.

    Returns:
        True on success; False when the request does not exist, is not
        pending, or a database error occurs (the error is printed).
    """
    lookup_sql = '''
        SELECT user_id, new_password_hash
        FROM password_reset_requests
        WHERE id = ? AND status = 'pending'
    '''
    mark_sql = '''
        UPDATE password_reset_requests
        SET status = 'approved', processed_at = ?
        WHERE id = ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        processed_at = get_cst_now_str()
        try:
            # Load the pending request.
            cur.execute(lookup_sql, (request_id,))
            pending = cur.fetchone()
            if not pending:
                return False

            target_user = pending['user_id']
            stored_hash = pending['new_password_hash']

            # Apply the new password hash to the user.
            cur.execute('UPDATE users SET password_hash = ? WHERE id = ?',
                        (stored_hash, target_user))
            # Mark the request as processed.
            cur.execute(mark_sql, (processed_at, request_id))
            conn.commit()
            return True
        except Exception as e:
            print(f"批准密码重置失败: {e}")
            return False
def reject_password_reset(request_id):
    """Reject a pending password reset request.

    Returns:
        True when a pending request was marked ``'rejected'``; False when no
        pending request matched or a database error occurred (printed).
    """
    mark_sql = '''
        UPDATE password_reset_requests
        SET status = 'rejected', processed_at = ?
        WHERE id = ? AND status = 'pending'
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        processed_at = get_cst_now_str()
        try:
            cur.execute(mark_sql, (processed_at, request_id))
            conn.commit()
            changed = cur.rowcount
            return changed > 0
        except Exception as e:
            print(f"拒绝密码重置失败: {e}")
            return False
def admin_reset_user_password(user_id, new_password):
    """Directly overwrite a user's password with a bcrypt hash (admin action).

    Returns:
        True when a user row was updated; False when no row matched or a
        database error occurred (the error is printed).
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        hashed = hash_password_bcrypt(new_password)
        try:
            cur.execute('UPDATE users SET password_hash = ? WHERE id = ?',
                        (hashed, user_id))
            conn.commit()
            changed = cur.rowcount
            return changed > 0
        except Exception as e:
            print(f"管理员重置密码失败: {e}")
            return False
# ==================== 日志清理 ====================
def clean_old_operation_logs(days=30):
    """Delete operation logs older than ``days`` days.

    Does nothing when the ``operation_logs`` table does not exist.

    Returns:
        The number of rows deleted; 0 when the table is missing or the
        delete fails (the error is printed).
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        # The table is optional, so look it up in the schema first.
        cur.execute("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='operation_logs'
        """)
        table_exists = cur.fetchone()
        if not table_exists:
            return 0
        try:
            # NOTE(review): the cutoff uses SQLite's 'localtime'; confirm
            # operation_logs.created_at is written in the same zone.
            cur.execute('''
                DELETE FROM operation_logs
                WHERE created_at < datetime('now', 'localtime', '-' || ? || ' days')
            ''', (days,))
            removed = cur.rowcount
            conn.commit()
            print(f"已清理 {removed} 条旧操作日志 (>{days}天)")
            return removed
        except Exception as e:
            print(f"清理旧操作日志失败: {e}")
            return 0
# ==================== Bug反馈管理 ====================
def create_bug_feedback(user_id, username, title, description, contact=''):
    """Store a bug feedback entry with HTML-escaped text fields.

    ``username``, ``title``, ``description`` and ``contact`` are passed
    through ``escape_html`` before being stored (falsy values become ``''``)
    to guard against stored XSS.

    Returns:
        The row id of the new feedback entry.
    """
    def clean(text):
        # Escape user-supplied text; falsy input is stored as an empty string.
        return escape_html(text) if text else ''

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Use the module's single CST time helper instead of rebuilding the
        # timezone object locally; the timestamp format is unchanged.
        cst_time = get_cst_now_str()
        cursor.execute('''
            INSERT INTO bug_feedbacks (user_id, username, title, description, contact, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (user_id, clean(username), clean(title), clean(description),
              clean(contact), cst_time))
        conn.commit()
        return cursor.lastrowid
def get_bug_feedbacks(limit=100, offset=0, status_filter=None):
    """Return one page of bug feedback entries, newest first (admin view).

    Args:
        limit: Maximum number of rows returned.
        offset: Number of rows skipped.
        status_filter: When truthy, only entries with this status.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        fragments = ['SELECT * FROM bug_feedbacks WHERE 1=1']
        bind_values = []
        if status_filter:
            fragments.append(' AND status = ?')
            bind_values.append(status_filter)
        fragments.append(' ORDER BY created_at DESC LIMIT ? OFFSET ?')
        bind_values += [limit, offset]
        cur.execute(''.join(fragments), bind_values)
        return [dict(record) for record in cur.fetchall()]
def get_user_feedbacks(user_id, limit=50):
    """Return up to ``limit`` feedback entries submitted by ``user_id``, newest first."""
    query = '''
        SELECT * FROM bug_feedbacks
        WHERE user_id = ?
        ORDER BY created_at DESC
        LIMIT ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id, limit))
        entries = []
        for record in cur.fetchall():
            entries.append(dict(record))
        return entries
def get_feedback_by_id(feedback_id):
    """Return one feedback entry as a dict, or ``None`` when the id is unknown."""
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute('SELECT * FROM bug_feedbacks WHERE id = ?', (feedback_id,))
        record = cur.fetchone()
        if record:
            return dict(record)
        return None
def reply_feedback(feedback_id, admin_reply):
    """Store an admin reply on a feedback entry and mark it ``'replied'``.

    The reply text is passed through ``escape_html`` (falsy input becomes
    ``''``) to guard against stored XSS, and ``replied_at`` is set to the
    current CST time.

    Returns:
        True when a feedback row was updated.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Use the module's single CST time helper instead of rebuilding the
        # timezone object locally; the timestamp format is unchanged.
        cst_time = get_cst_now_str()
        safe_reply = escape_html(admin_reply) if admin_reply else ''
        cursor.execute('''
            UPDATE bug_feedbacks
            SET admin_reply = ?, status = 'replied', replied_at = ?
            WHERE id = ?
        ''', (safe_reply, cst_time, feedback_id))
        conn.commit()
        return cursor.rowcount > 0
def close_feedback(feedback_id):
    """Mark a feedback entry as ``'closed'``; return True when a row changed."""
    statement = '''
        UPDATE bug_feedbacks
        SET status = 'closed'
        WHERE id = ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(statement, (feedback_id,))
        conn.commit()
        changed = cur.rowcount
        return changed > 0
def delete_feedback(feedback_id):
    """Delete a feedback entry; return True when a row was removed."""
    statement = 'DELETE FROM bug_feedbacks WHERE id = ?'
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(statement, (feedback_id,))
        conn.commit()
        removed = cur.rowcount
        return removed > 0
def get_feedback_stats():
    """Return feedback counts by status.

    Returns:
        A dict with integer ``total``, ``pending``, ``replied`` and
        ``closed`` counts. An empty table yields zeros for every key: the
        SUM() aggregates return NULL when there are no rows, and those NULLs
        are normalized to 0 here.
    """
    stat_keys = ('total', 'pending', 'replied', 'closed')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT
                COUNT(*) as total,
                SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
                SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END) as replied,
                SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed
            FROM bug_feedbacks
        ''')
        row = cursor.fetchone()
        if not row:
            return dict.fromkeys(stat_keys, 0)
        return {key: row[key] or 0 for key in stat_keys}
# ==================== 公告管理 ====================
def create_announcement(title, content, is_active=True):
    """Create an announcement.

    ``title`` and ``content`` are stripped; if either ends up empty nothing
    is stored. When the new announcement is active, every currently active
    announcement is deactivated first.

    Returns:
        The id of the new announcement, or ``None`` when title or content
        is empty.
    """
    clean_title = (title or '').strip()
    clean_content = (content or '').strip()
    if not (clean_title and clean_content):
        return None

    with db_pool.get_db() as conn:
        cur = conn.cursor()
        now = get_cst_now_str()
        active_flag = 1 if is_active else 0
        if is_active:
            # Deactivate whatever is currently active before inserting.
            cur.execute(
                'UPDATE announcements SET is_active = 0, updated_at = ? WHERE is_active = 1',
                (now,),
            )
        cur.execute('''
            INSERT INTO announcements (title, content, is_active, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
        ''', (clean_title, clean_content, active_flag, now, now))
        conn.commit()
        return cur.lastrowid
def get_announcement_by_id(announcement_id):
    """Return one announcement as a dict, or ``None`` when the id is unknown."""
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute('SELECT * FROM announcements WHERE id = ?', (announcement_id,))
        record = cur.fetchone()
        if record:
            return dict(record)
        return None
def get_announcements(limit=50, offset=0):
    """Return one page of announcements (admin view).

    Rows are ordered by ``created_at`` descending, then ``id`` descending.
    """
    query = '''
        SELECT * FROM announcements
        ORDER BY created_at DESC, id DESC
        LIMIT ? OFFSET ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (limit, offset))
        page = []
        for record in cur.fetchall():
            page.append(dict(record))
        return page
def set_announcement_active(announcement_id, is_active):
    """Enable or disable an announcement.

    Enabling an announcement deactivates every other active announcement.
    The target is checked for existence first, so enabling an unknown id no
    longer deactivates the currently active announcement as a side effect.

    Returns:
        True when the target announcement row was updated, False when the
        id does not exist.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cst_time = get_cst_now_str()
        if is_active:
            # Verify the target exists before touching any other row.
            cursor.execute('SELECT 1 FROM announcements WHERE id = ?', (announcement_id,))
            if cursor.fetchone() is None:
                return False
            # Deactivate the other active announcements only.
            cursor.execute('''
                UPDATE announcements
                SET is_active = 0, updated_at = ?
                WHERE is_active = 1 AND id != ?
            ''', (cst_time, announcement_id))
        cursor.execute('''
            UPDATE announcements
            SET is_active = ?, updated_at = ?
            WHERE id = ?
        ''', (1 if is_active else 0, cst_time, announcement_id))
        conn.commit()
        return cursor.rowcount > 0
def delete_announcement(announcement_id):
    """Delete an announcement together with its per-user dismissal records.

    Returns:
        True when the announcement row itself was removed.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        # Remove the dismissal records first, then the announcement itself.
        cur.execute(
            'DELETE FROM announcement_dismissals WHERE announcement_id = ?',
            (announcement_id,),
        )
        cur.execute('DELETE FROM announcements WHERE id = ?', (announcement_id,))
        conn.commit()
        # rowcount reflects the last statement, i.e. the announcements DELETE.
        removed = cur.rowcount
        return removed > 0
def get_active_announcement_for_user(user_id):
    """Return the active announcement the user should see, if any.

    Announcements the user has dismissed are excluded. When several match,
    the one with the latest ``created_at`` (then highest ``id``) is returned.

    Returns:
        The announcement as a dict, or ``None`` when nothing qualifies.
    """
    query = '''
        SELECT a.*
        FROM announcements a
        LEFT JOIN announcement_dismissals d
            ON d.announcement_id = a.id AND d.user_id = ?
        WHERE a.is_active = 1 AND d.announcement_id IS NULL
        ORDER BY a.created_at DESC, a.id DESC
        LIMIT 1
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id,))
        record = cur.fetchone()
        if record:
            return dict(record)
        return None
def dismiss_announcement_for_user(user_id, announcement_id):
    """Record that a user permanently dismissed an announcement.

    Uses ``INSERT OR IGNORE`` so repeating the call is harmless.

    Returns:
        The result of ``cursor.rowcount >= 0`` after the insert.
    """
    statement = '''
        INSERT OR IGNORE INTO announcement_dismissals (user_id, announcement_id, dismissed_at)
        VALUES (?, ?, ?)
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        dismissed_at = get_cst_now_str()
        cur.execute(statement, (user_id, announcement_id, dismissed_at))
        conn.commit()
        inserted = cur.rowcount
        return inserted >= 0
# ==================== 用户定时任务管理 ====================
def get_user_schedules(user_id):
    """Return all scheduled tasks of a user as dicts, newest ``created_at`` first."""
    query = '''
        SELECT * FROM user_schedules
        WHERE user_id = ?
        ORDER BY created_at DESC
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id,))
        schedules = []
        for record in cur.fetchall():
            schedules.append(dict(record))
        return schedules
def get_schedule_by_id(schedule_id):
    """Return one scheduled task as a dict, or ``None`` when the id is unknown."""
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute('SELECT * FROM user_schedules WHERE id = ?', (schedule_id,))
        record = cur.fetchone()
        if record:
            return dict(record)
        return None
def create_user_schedule(user_id, name='我的定时任务', schedule_time='08:00',
                         weekdays='1,2,3,4,5', browse_type='应读',
                         enable_screenshot=1, account_ids=None):
    """Create a scheduled task for a user.

    The task is inserted with ``enabled = 0``. ``account_ids`` is stored as
    a JSON string; a falsy value is stored as ``'[]'``.

    Returns:
        The row id of the new schedule.
    """
    import json

    account_ids_str = json.dumps(account_ids) if account_ids else '[]'
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Use the module's single CST time helper instead of rebuilding the
        # timezone object locally; the timestamp format is unchanged.
        cst_time = get_cst_now_str()
        cursor.execute('''
            INSERT INTO user_schedules (
                user_id, name, enabled, schedule_time, weekdays,
                browse_type, enable_screenshot, account_ids, created_at, updated_at
            ) VALUES (?, ?, 0, ?, ?, ?, ?, ?, ?, ?)
        ''', (user_id, name, schedule_time, weekdays, browse_type,
              enable_screenshot, account_ids_str, cst_time, cst_time))
        conn.commit()
        return cursor.lastrowid
def update_user_schedule(schedule_id, **kwargs):
    """Update selected fields of a user's scheduled task.

    Only ``name``, ``enabled``, ``schedule_time``, ``weekdays``,
    ``browse_type``, ``enable_screenshot`` and ``account_ids`` are honoured;
    other keyword arguments are ignored. A list passed as ``account_ids`` is
    stored as JSON. ``updated_at`` is set to the current CST time.

    Returns:
        True when a row was updated; False when no honoured field was given
        (in that case no database connection is acquired) or the id does not
        exist.
    """
    import json

    # Column names come from this fixed tuple only, never from the caller.
    allowed_fields = ('name', 'enabled', 'schedule_time', 'weekdays',
                      'browse_type', 'enable_screenshot', 'account_ids')
    updates = []
    params = []
    for field in allowed_fields:
        if field not in kwargs:
            continue
        value = kwargs[field]
        if field == 'account_ids' and isinstance(value, list):
            value = json.dumps(value)
        updates.append(f'{field} = ?')
        params.append(value)

    if not updates:
        # Nothing to write: avoid taking a pooled connection for a no-op.
        return False

    updates.append('updated_at = ?')
    params.append(get_cst_now_str())
    params.append(schedule_id)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        sql = f"UPDATE user_schedules SET {', '.join(updates)} WHERE id = ?"
        cursor.execute(sql, params)
        conn.commit()
        return cursor.rowcount > 0
def delete_user_schedule(schedule_id):
    """Delete a user's scheduled task; return True when a row was removed."""
    statement = 'DELETE FROM user_schedules WHERE id = ?'
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(statement, (schedule_id,))
        conn.commit()
        removed = cur.rowcount
        return removed > 0
def toggle_user_schedule(schedule_id, enabled):
    """Enable or disable a user's scheduled task.

    ``enabled`` is stored as 1 when truthy and 0 otherwise; ``updated_at``
    is set to the current CST time.

    Returns:
        True when a row was updated.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Use the module's single CST time helper instead of rebuilding the
        # timezone object locally; the timestamp format is unchanged.
        cst_time = get_cst_now_str()
        cursor.execute('''
            UPDATE user_schedules
            SET enabled = ?, updated_at = ?
            WHERE id = ?
        ''', (1 if enabled else 0, cst_time, schedule_id))
        conn.commit()
        return cursor.rowcount > 0
def get_enabled_user_schedules():
    """Return every enabled scheduled task, ordered by ``schedule_time``.

    Each dict also carries the owning user's name as ``user_username``.
    """
    query = '''
        SELECT us.*, u.username as user_username
        FROM user_schedules us
        JOIN users u ON us.user_id = u.id
        WHERE us.enabled = 1
        ORDER BY us.schedule_time
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query)
        enabled = []
        for record in cur.fetchall():
            enabled.append(dict(record))
        return enabled
def update_schedule_last_run(schedule_id):
    """Stamp a scheduled task's ``last_run_at`` and ``updated_at`` with the current CST time.

    Returns:
        True when a row was updated.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Use the module's single CST time helper instead of rebuilding the
        # timezone object locally; the timestamp format is unchanged.
        cst_time = get_cst_now_str()
        cursor.execute('''
            UPDATE user_schedules
            SET last_run_at = ?, updated_at = ?
            WHERE id = ?
        ''', (cst_time, cst_time, schedule_id))
        conn.commit()
        return cursor.rowcount > 0
# ==================== 定时任务执行日志 ====================
def create_schedule_execution_log(schedule_id, user_id, schedule_name):
    """Insert an execution log for a scheduled task with status ``'running'``.

    ``execute_time`` is set to the current CST time.

    Returns:
        The row id of the new log entry.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Use the module's single CST time helper instead of rebuilding the
        # timezone object locally; the timestamp format is unchanged.
        execute_time = get_cst_now_str()
        cursor.execute('''
            INSERT INTO schedule_execution_logs (
                schedule_id, user_id, schedule_name, execute_time, status
            ) VALUES (?, ?, ?, ?, 'running')
        ''', (schedule_id, user_id, schedule_name, execute_time))
        conn.commit()
        return cursor.lastrowid
def update_schedule_execution_log(log_id, **kwargs):
    """Update selected fields of a scheduled-task execution log.

    Only ``total_accounts``, ``success_accounts``, ``failed_accounts``,
    ``total_items``, ``total_attachments``, ``total_screenshots``,
    ``duration_seconds``, ``status`` and ``error_message`` are honoured;
    other keyword arguments are ignored.

    Returns:
        True when a row was updated; False when no honoured field was given
        (in that case no database connection is acquired) or the id does not
        exist.
    """
    # Column names come from this fixed tuple only, never from the caller.
    allowed_fields = ('total_accounts', 'success_accounts', 'failed_accounts',
                      'total_items', 'total_attachments', 'total_screenshots',
                      'duration_seconds', 'status', 'error_message')
    chosen = [field for field in allowed_fields if field in kwargs]
    if not chosen:
        # Nothing to write: avoid taking a pooled connection for a no-op.
        return False

    set_clause = ', '.join(f'{field} = ?' for field in chosen)
    params = [kwargs[field] for field in chosen]
    params.append(log_id)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        sql = f"UPDATE schedule_execution_logs SET {set_clause} WHERE id = ?"
        cursor.execute(sql, params)
        conn.commit()
        return cursor.rowcount > 0
def get_schedule_execution_logs(schedule_id, limit=10):
    """Return the most recent execution logs of a scheduled task.

    Each log dict keeps its database columns and additionally exposes the
    aliases ``created_at``, ``success_count``, ``failed_count`` and
    ``duration`` that the front end expects.

    Returns:
        A list of dicts ordered by ``execute_time`` descending. Any error is
        printed and an empty list is returned instead of raising.
    """
    # (alias, source column, default used when the column key is absent)
    aliases = (
        ('created_at', 'execute_time', None),
        ('success_count', 'success_accounts', 0),
        ('failed_count', 'failed_accounts', 0),
        ('duration', 'duration_seconds', 0),
    )
    query = '''
        SELECT * FROM schedule_execution_logs
        WHERE schedule_id = ?
        ORDER BY execute_time DESC
        LIMIT ?
    '''
    try:
        with db_pool.get_db() as conn:
            cur = conn.cursor()
            cur.execute(query, (schedule_id, limit))
            mapped = []
            for record in cur.fetchall():
                try:
                    entry = dict(record)
                    # Map database columns to the names the front end expects.
                    for alias, column, default in aliases:
                        entry[alias] = entry.get(column, default)
                    mapped.append(entry)
                except Exception as e:
                    print(f"[数据库] 处理日志行时出错: {e}")
                    continue
            return mapped
    except Exception as e:
        print(f"[数据库] 查询定时任务日志时出错: {e}")
        import traceback
        traceback.print_exc()
        # Return an empty list on failure rather than raising.
        return []
def get_user_all_schedule_logs(user_id, limit=50):
    """Return up to ``limit`` execution logs across all of a user's schedules, newest first."""
    query = '''
        SELECT * FROM schedule_execution_logs
        WHERE user_id = ?
        ORDER BY execute_time DESC
        LIMIT ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(query, (user_id, limit))
        history = []
        for record in cur.fetchall():
            history.append(dict(record))
        return history
def delete_schedule_logs(schedule_id, user_id):
    """Delete all execution logs of one schedule.

    The DELETE matches on both ``schedule_id`` and ``user_id``, so only logs
    belonging to that user are removed.

    Returns:
        The number of rows deleted.
    """
    statement = '''
        DELETE FROM schedule_execution_logs
        WHERE schedule_id = ? AND user_id = ?
    '''
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(statement, (schedule_id, user_id))
        conn.commit()
        removed = cur.rowcount
        return removed
def clean_old_schedule_logs(days=30):
    """Delete scheduled-task execution logs older than ``days`` days.

    The cutoff is computed in CST because ``execute_time`` is written as a
    CST string; comparing against SQLite's 'localtime' would shift the
    cutoff on a server whose local zone is not Asia/Shanghai.

    Returns:
        The number of rows deleted.
    """
    cutoff = (get_cst_now() - timedelta(days=float(days))).strftime('%Y-%m-%d %H:%M:%S')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            DELETE FROM schedule_execution_logs
            WHERE execute_time < ?
        ''', (cutoff,))
        deleted = cursor.rowcount
        conn.commit()
        return deleted