Files
zsglpt/database.py
root c22e2f6a19 修复定时任务日志和增强功能
1. 修复定时任务日志字段映射问题
   - 修正execute_time → created_at
   - 修正success_accounts → success_count
   - 修正failed_accounts → failed_count
   - 修正duration_seconds → duration
   - 位置: database.py 第1661-1686行

2. 添加定时任务调试日志
   - 显示当前检查时间和任务匹配情况
   - 帮助诊断定时任务不执行问题
   - 位置: app.py 第2869-2875行

3. 新增VIP权限对比表格
   - 在VIP信息弹窗中添加权限对比
   - 对比普通用户和VIP用户的6项权限
   - 位置: templates/index.html 第549-593行

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-10 16:16:45 +08:00

1700 lines
58 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库模块 - 使用SQLite进行数据持久化
支持VIP功能
优化内容:
1. 清理所有注释掉的代码
2. 统一使用bcrypt密码哈希
3. 优化数据库索引
4. 规范化事务处理
5. 添加数据迁移功能
6. 改进错误处理
"""
import sqlite3
import time
from datetime import datetime, timedelta
import pytz
import threading
import db_pool
from password_utils import (
hash_password_bcrypt,
verify_password_bcrypt,
is_sha256_hash,
verify_password_sha256
)
from app_config import get_config
# 获取配置
config = get_config()
# 数据库文件路径 - 从配置读取,避免硬编码
DB_FILE = config.DB_FILE
# 数据库版本 (用于迁移管理)
DB_VERSION = 5
def hash_password(password):
"""Password hashing using bcrypt"""
return hash_password_bcrypt(password)
def init_database():
"""初始化数据库表结构"""
db_pool.init_pool(DB_FILE, pool_size=config.DB_POOL_SIZE)
with db_pool.get_db() as conn:
cursor = conn.cursor()
# 管理员表
cursor.execute('''
CREATE TABLE IF NOT EXISTS admins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
email TEXT,
status TEXT DEFAULT 'pending',
vip_expire_time TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
approved_at TIMESTAMP
)
''')
# 账号表(关联用户)
cursor.execute('''
CREATE TABLE IF NOT EXISTS accounts (
id TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
remember INTEGER DEFAULT 1,
remark TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
# VIP配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS vip_config (
id INTEGER PRIMARY KEY CHECK (id = 1),
default_vip_days INTEGER DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 系统配置表
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_config (
id INTEGER PRIMARY KEY CHECK (id = 1),
max_concurrent_global INTEGER DEFAULT 2,
max_concurrent_per_account INTEGER DEFAULT 1,
max_screenshot_concurrent INTEGER DEFAULT 3,
schedule_enabled INTEGER DEFAULT 0,
schedule_time TEXT DEFAULT '02:00',
schedule_browse_type TEXT DEFAULT '应读',
schedule_weekdays TEXT DEFAULT '1,2,3,4,5,6,7',
proxy_enabled INTEGER DEFAULT 0,
proxy_api_url TEXT DEFAULT '',
proxy_expire_minutes INTEGER DEFAULT 3,
enable_screenshot INTEGER DEFAULT 1,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 任务日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS task_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
account_id TEXT NOT NULL,
username TEXT NOT NULL,
browse_type TEXT NOT NULL,
status TEXT NOT NULL,
total_items INTEGER DEFAULT 0,
total_attachments INTEGER DEFAULT 0,
error_message TEXT,
duration INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
source TEXT DEFAULT 'manual',
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
# 密码重置申请表
cursor.execute('''
CREATE TABLE IF NOT EXISTS password_reset_requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
new_password_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
# 数据库版本表
cursor.execute('''
CREATE TABLE IF NOT EXISTS db_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Bug反馈表
cursor.execute('''
CREATE TABLE IF NOT EXISTS bug_feedbacks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
username TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT NOT NULL,
contact TEXT,
status TEXT DEFAULT 'pending',
admin_reply TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
replied_at TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
# 用户定时任务表
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_schedules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
name TEXT DEFAULT '我的定时任务',
enabled INTEGER DEFAULT 0,
schedule_time TEXT NOT NULL DEFAULT '08:00',
weekdays TEXT NOT NULL DEFAULT '1,2,3,4,5',
browse_type TEXT NOT NULL DEFAULT '应读',
enable_screenshot INTEGER DEFAULT 1,
account_ids TEXT,
last_run_at TIMESTAMP,
next_run_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
# ========== 创建索引 ==========
# 用户表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_status ON users(status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_users_vip_expire ON users(vip_expire_time)')
# 账号表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_accounts_user_id ON accounts(user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_accounts_username ON accounts(username)')
# 任务日志表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_task_logs_user_id ON task_logs(user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_task_logs_status ON task_logs(status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_task_logs_created_at ON task_logs(created_at)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_task_logs_user_date ON task_logs(user_id, created_at)')
# 密码重置表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_password_reset_status ON password_reset_requests(status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_password_reset_user_id ON password_reset_requests(user_id)')
# Bug反馈表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_user_id ON bug_feedbacks(user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_status ON bug_feedbacks(status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_bug_feedbacks_created_at ON bug_feedbacks(created_at)')
# 用户定时任务表索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_schedules_user_id ON user_schedules(user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled ON user_schedules(enabled)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_schedules_next_run ON user_schedules(next_run_at)')
# 初始化VIP配置
try:
cursor.execute('INSERT INTO vip_config (id, default_vip_days) VALUES (1, 0)')
conn.commit()
print("✓ 已创建VIP配置(默认不赠送)")
except sqlite3.IntegrityError:
# VIP配置已存在忽略
pass
# 初始化系统配置
try:
cursor.execute('''
INSERT INTO system_config (
id, max_concurrent_global, schedule_enabled,
schedule_time, schedule_browse_type, schedule_weekdays
) VALUES (1, 2, 0, '02:00', '应读', '1,2,3,4,5,6,7')
''')
conn.commit()
print("✓ 已创建系统配置(默认并发2,定时任务关闭)")
except sqlite3.IntegrityError:
# 系统配置已存在,忽略
pass
# 初始化数据库版本
try:
cursor.execute('INSERT INTO db_version (id, version) VALUES (1, ?)', (DB_VERSION,))
conn.commit()
print(f"✓ 数据库版本: {DB_VERSION}")
except sqlite3.IntegrityError:
# 数据库版本记录已存在,忽略
pass
conn.commit()
print("✓ 数据库初始化完成")
# 执行数据迁移
migrate_database()
# 确保存在默认管理员
ensure_default_admin()
def migrate_database():
"""数据库迁移 - 自动检测并应用必要的迁移"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
# 获取当前数据库版本
cursor.execute('SELECT version FROM db_version WHERE id = 1')
row = cursor.fetchone()
current_version = row['version'] if row else 0
print(f"当前数据库版本: {current_version}, 目标版本: {DB_VERSION}")
# 应用迁移
if current_version < 1:
_migrate_to_v1(conn)
current_version = 1
if current_version < 2:
_migrate_to_v2(conn)
current_version = 2
if current_version < 3:
_migrate_to_v3(conn)
current_version = 3
if current_version < 4:
_migrate_to_v4(conn)
current_version = 4
if current_version < 5:
_migrate_to_v5(conn)
current_version = 5
# 更新版本号
cursor.execute('UPDATE db_version SET version = ?, updated_at = CURRENT_TIMESTAMP WHERE id = 1',
(DB_VERSION,))
conn.commit()
if current_version < DB_VERSION:
print(f"✓ 数据库已迁移到版本 {DB_VERSION}")
def _migrate_to_v1(conn):
"""迁移到版本1 - 添加缺失字段"""
cursor = conn.cursor()
# 检查并添加 schedule_weekdays 字段
cursor.execute("PRAGMA table_info(system_config)")
columns = [col[1] for col in cursor.fetchall()]
if 'schedule_weekdays' not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN schedule_weekdays TEXT DEFAULT "1,2,3,4,5,6,7"')
print(" ✓ 添加 schedule_weekdays 字段")
if 'max_screenshot_concurrent' not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN max_screenshot_concurrent INTEGER DEFAULT 3')
print(" ✓ 添加 max_screenshot_concurrent 字段")
if 'max_concurrent_per_account' not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN max_concurrent_per_account INTEGER DEFAULT 1')
print(" ✓ 添加 max_concurrent_per_account 字段")
# 检查并添加 duration 字段到 task_logs
cursor.execute("PRAGMA table_info(task_logs)")
columns = [col[1] for col in cursor.fetchall()]
if 'duration' not in columns:
cursor.execute('ALTER TABLE task_logs ADD COLUMN duration INTEGER')
print(" ✓ 添加 duration 字段到 task_logs")
conn.commit()
def _migrate_to_v2(conn):
"""迁移到版本2 - 添加代理配置字段"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(system_config)")
columns = [col[1] for col in cursor.fetchall()]
if 'proxy_enabled' not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN proxy_enabled INTEGER DEFAULT 0')
print(" ✓ 添加 proxy_enabled 字段")
if 'proxy_api_url' not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN proxy_api_url TEXT DEFAULT ""')
print(" ✓ 添加 proxy_api_url 字段")
if 'proxy_expire_minutes' not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN proxy_expire_minutes INTEGER DEFAULT 3')
print(" ✓ 添加 proxy_expire_minutes 字段")
if 'enable_screenshot' not in columns:
cursor.execute('ALTER TABLE system_config ADD COLUMN enable_screenshot INTEGER DEFAULT 1')
print(" ✓ 添加 enable_screenshot 字段")
conn.commit()
def _migrate_to_v3(conn):
"""迁移到版本3 - 添加账号状态和登录失败计数字段"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(accounts)")
columns = [col[1] for col in cursor.fetchall()]
if 'status' not in columns:
cursor.execute('ALTER TABLE accounts ADD COLUMN status TEXT DEFAULT "active"')
print(" ✓ 添加 accounts.status 字段 (账号状态)")
if 'login_fail_count' not in columns:
cursor.execute('ALTER TABLE accounts ADD COLUMN login_fail_count INTEGER DEFAULT 0')
print(" ✓ 添加 accounts.login_fail_count 字段 (登录失败计数)")
if 'last_login_error' not in columns:
cursor.execute('ALTER TABLE accounts ADD COLUMN last_login_error TEXT')
print(" ✓ 添加 accounts.last_login_error 字段 (最后登录错误)")
conn.commit()
def _migrate_to_v4(conn):
"""迁移到版本4 - 添加任务来源字段"""
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(task_logs)")
columns = [col[1] for col in cursor.fetchall()]
if 'source' not in columns:
cursor.execute('ALTER TABLE task_logs ADD COLUMN source TEXT DEFAULT "manual"')
print(" ✓ 添加 task_logs.source 字段 (任务来源: manual/scheduled/immediate)")
def _migrate_to_v5(conn):
"""迁移到版本5 - 添加用户定时任务表"""
cursor = conn.cursor()
# 检查user_schedules表是否存在
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_schedules'")
if not cursor.fetchone():
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_schedules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
name TEXT DEFAULT '我的定时任务',
enabled INTEGER DEFAULT 0,
schedule_time TEXT NOT NULL DEFAULT '08:00',
weekdays TEXT NOT NULL DEFAULT '1,2,3,4,5',
browse_type TEXT NOT NULL DEFAULT '应读',
enable_screenshot INTEGER DEFAULT 1,
account_ids TEXT,
last_run_at TIMESTAMP,
next_run_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
print(" ✓ 创建 user_schedules 表 (用户定时任务)")
# 定时任务执行日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS schedule_execution_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
schedule_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
schedule_name TEXT,
execute_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
total_accounts INTEGER DEFAULT 0,
success_accounts INTEGER DEFAULT 0,
failed_accounts INTEGER DEFAULT 0,
total_items INTEGER DEFAULT 0,
total_attachments INTEGER DEFAULT 0,
total_screenshots INTEGER DEFAULT 0,
duration_seconds INTEGER DEFAULT 0,
status TEXT DEFAULT 'running',
error_message TEXT,
FOREIGN KEY (schedule_id) REFERENCES user_schedules (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
''')
print(" ✓ 创建 schedule_execution_logs 表 (定时任务执行日志)")
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_schedules_user_id ON user_schedules(user_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_schedules_enabled ON user_schedules(enabled)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_user_schedules_next_run ON user_schedules(next_run_at)')
print(" ✓ 创建 user_schedules 表索引")
conn.commit()
# ==================== 管理员相关 ====================
def ensure_default_admin():
"""确保存在默认管理员账号 admin/admin"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
# 检查是否已存在管理员
cursor.execute('SELECT COUNT(*) as count FROM admins')
result = cursor.fetchone()
if result['count'] == 0:
# 创建默认管理员 admin/admin
default_password_hash = hash_password_bcrypt('admin')
cursor.execute(
'INSERT INTO admins (username, password_hash) VALUES (?, ?)',
('admin', default_password_hash)
)
conn.commit()
print("✓ 已创建默认管理员账号 (admin/admin)")
return True
return False
def verify_admin(username, password):
"""验证管理员登录 - 自动从SHA256升级到bcrypt"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM admins WHERE username = ?', (username,))
admin = cursor.fetchone()
if not admin:
return None
admin_dict = dict(admin)
password_hash = admin_dict['password_hash']
# 检查是否为旧的SHA256哈希
if is_sha256_hash(password_hash):
if verify_password_sha256(password, password_hash):
# 自动升级到bcrypt
new_hash = hash_password_bcrypt(password)
cursor.execute('UPDATE admins SET password_hash = ? WHERE username = ?',
(new_hash, username))
conn.commit()
print(f"管理员 {username} 密码已自动升级到bcrypt")
return admin_dict
return None
else:
# bcrypt验证
if verify_password_bcrypt(password, password_hash):
return admin_dict
return None
def update_admin_password(username, new_password):
"""更新管理员密码"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
password_hash = hash_password(new_password)
cursor.execute('UPDATE admins SET password_hash = ? WHERE username = ?',
(password_hash, username))
conn.commit()
return cursor.rowcount > 0
def update_admin_username(old_username, new_username):
"""更新管理员用户名"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
try:
cursor.execute('UPDATE admins SET username = ? WHERE username = ?',
(new_username, old_username))
conn.commit()
return True
except sqlite3.IntegrityError:
return False
# ==================== VIP管理 ====================
def get_vip_config():
"""获取VIP配置"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM vip_config WHERE id = 1')
config = cursor.fetchone()
return dict(config) if config else {'default_vip_days': 0}
def set_default_vip_days(days):
"""设置默认VIP天数"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO vip_config (id, default_vip_days, updated_at)
VALUES (1, ?, CURRENT_TIMESTAMP)
''', (days,))
conn.commit()
return True
def set_user_vip(user_id, days):
"""设置用户VIP - days: 7=一周, 30=一个月, 365=一年, 999999=永久"""
with db_pool.get_db() as conn:
cst_tz = pytz.timezone("Asia/Shanghai")
cursor = conn.cursor()
if days == 999999:
expire_time = '2099-12-31 23:59:59'
else:
expire_time = (datetime.now(cst_tz) + timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('UPDATE users SET vip_expire_time = ? WHERE id = ?', (expire_time, user_id))
conn.commit()
return cursor.rowcount > 0
def extend_user_vip(user_id, days):
"""延长用户VIP时间"""
user = get_user_by_id(user_id)
cst_tz = pytz.timezone("Asia/Shanghai")
if not user:
return False
with db_pool.get_db() as conn:
cursor = conn.cursor()
current_expire = user.get('vip_expire_time')
if current_expire and current_expire != '2099-12-31 23:59:59':
try:
expire_time_naive = datetime.strptime(current_expire, '%Y-%m-%d %H:%M:%S')
expire_time = cst_tz.localize(expire_time_naive)
now = datetime.now(cst_tz)
if expire_time < now:
expire_time = now
new_expire = (expire_time + timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, AttributeError) as e:
# VIP过期时间格式错误使用当前时间
print(f"解析VIP过期时间失败: {e}, 使用当前时间")
new_expire = (datetime.now(cst_tz) + timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')
else:
new_expire = (datetime.now(cst_tz) + timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('UPDATE users SET vip_expire_time = ? WHERE id = ?', (new_expire, user_id))
conn.commit()
return cursor.rowcount > 0
def remove_user_vip(user_id):
"""移除用户VIP"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE users SET vip_expire_time = NULL WHERE id = ?', (user_id,))
conn.commit()
return cursor.rowcount > 0
def is_user_vip(user_id):
"""检查用户是否是VIP"""
cst_tz = pytz.timezone("Asia/Shanghai")
user = get_user_by_id(user_id)
if not user or not user.get('vip_expire_time'):
return False
try:
expire_time_naive = datetime.strptime(user['vip_expire_time'], '%Y-%m-%d %H:%M:%S')
expire_time = cst_tz.localize(expire_time_naive)
return datetime.now(cst_tz) < expire_time
except (ValueError, AttributeError) as e:
print(f"检查VIP状态失败 (user_id={user_id}): {e}")
return False
def get_user_vip_info(user_id):
"""获取用户VIP信息"""
cst_tz = pytz.timezone("Asia/Shanghai")
user = get_user_by_id(user_id)
if not user:
return {'is_vip': False, 'expire_time': None, 'days_left': 0, 'username': ''}
vip_expire_time = user.get('vip_expire_time')
if not vip_expire_time:
return {'is_vip': False, 'expire_time': None, 'days_left': 0, 'username': user.get('username', '')}
try:
expire_time_naive = datetime.strptime(vip_expire_time, '%Y-%m-%d %H:%M:%S')
expire_time = cst_tz.localize(expire_time_naive)
now = datetime.now(cst_tz)
is_vip = now < expire_time
days_left = (expire_time - now).days if is_vip else 0
return {
"username": user.get("username", ""),
'is_vip': is_vip,
'expire_time': vip_expire_time,
'days_left': max(0, days_left)
}
except Exception as e:
print(f"VIP信息获取错误: {e}")
return {'is_vip': False, 'expire_time': None, 'days_left': 0, 'username': user.get('username', '')}
# ==================== 用户相关 ====================
def create_user(username, password, email=''):
"""创建新用户(待审核状态,赠送默认VIP)"""
cst_tz = pytz.timezone("Asia/Shanghai")
with db_pool.get_db() as conn:
cursor = conn.cursor()
password_hash = hash_password(password)
# 获取默认VIP天数
default_vip_days = get_vip_config()['default_vip_days']
vip_expire_time = None
if default_vip_days > 0:
if default_vip_days == 999999:
vip_expire_time = '2099-12-31 23:59:59'
else:
vip_expire_time = (datetime.now(cst_tz) + timedelta(days=default_vip_days)).strftime('%Y-%m-%d %H:%M:%S')
try:
cursor.execute('''
INSERT INTO users (username, password_hash, email, status, vip_expire_time)
VALUES (?, ?, ?, 'pending', ?)
''', (username, password_hash, email, vip_expire_time))
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
return None
def verify_user(username, password):
"""验证用户登录 - 自动从SHA256升级到bcrypt"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE username = ? AND status = 'approved'", (username,))
user = cursor.fetchone()
if not user:
return None
user_dict = dict(user)
password_hash = user_dict['password_hash']
# 检查是否为旧的SHA256哈希
if is_sha256_hash(password_hash):
if verify_password_sha256(password, password_hash):
# 自动升级到bcrypt
new_hash = hash_password_bcrypt(password)
cursor.execute('UPDATE users SET password_hash = ? WHERE id = ?',
(new_hash, user_dict['id']))
conn.commit()
print(f"用户 {username} 密码已自动升级到bcrypt")
return user_dict
return None
else:
# bcrypt验证
if verify_password_bcrypt(password, password_hash):
return user_dict
return None
def get_user_by_id(user_id):
"""根据ID获取用户"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
user = cursor.fetchone()
return dict(user) if user else None
def get_user_by_username(username):
"""根据用户名获取用户"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE username = ?', (username,))
user = cursor.fetchone()
return dict(user) if user else None
def get_all_users():
"""获取所有用户"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM users ORDER BY created_at DESC')
return [dict(row) for row in cursor.fetchall()]
def get_pending_users():
"""获取待审核用户"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE status = 'pending' ORDER BY created_at DESC")
return [dict(row) for row in cursor.fetchall()]
def approve_user(user_id):
"""审核通过用户"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE users
SET status = 'approved', approved_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (user_id,))
conn.commit()
return cursor.rowcount > 0
def reject_user(user_id):
"""拒绝用户"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute("UPDATE users SET status = 'rejected' WHERE id = ?", (user_id,))
conn.commit()
return cursor.rowcount > 0
def delete_user(user_id):
"""删除用户(级联删除相关账号)"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM users WHERE id = ?', (user_id,))
conn.commit()
return cursor.rowcount > 0
# ==================== 账号相关 ====================
def create_account(user_id, account_id, username, password, remember=True, remark=''):
"""创建账号"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO accounts (id, user_id, username, password, remember, remark)
VALUES (?, ?, ?, ?, ?, ?)
''', (account_id, user_id, username, password, 1 if remember else 0, remark))
conn.commit()
return cursor.lastrowid
def get_user_accounts(user_id):
"""获取用户的所有账号"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM accounts WHERE user_id = ? ORDER BY created_at DESC', (user_id,))
return [dict(row) for row in cursor.fetchall()]
def get_account(account_id):
"""获取单个账号"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM accounts WHERE id = ?', (account_id,))
row = cursor.fetchone()
return dict(row) if row else None
def update_account_remark(account_id, remark):
"""更新账号备注"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('UPDATE accounts SET remark = ? WHERE id = ?', (remark, account_id))
conn.commit()
return cursor.rowcount > 0
def delete_account(account_id):
"""删除账号"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM accounts WHERE id = ?', (account_id,))
conn.commit()
return cursor.rowcount > 0
def increment_account_login_fail(account_id, error_message):
"""增加账号登录失败次数如果达到3次则暂停账号"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
# 获取当前失败次数
cursor.execute('SELECT login_fail_count FROM accounts WHERE id = ?', (account_id,))
row = cursor.fetchone()
if not row:
return False
fail_count = (row['login_fail_count'] or 0) + 1
# 更新失败次数和错误信息
if fail_count >= 3:
# 达到3次暂停账号
cursor.execute('''
UPDATE accounts
SET login_fail_count = ?,
last_login_error = ?,
status = 'suspended'
WHERE id = ?
''', (fail_count, error_message, account_id))
conn.commit()
return True # 返回True表示账号已被暂停
else:
# 未达到3次只更新计数
cursor.execute('''
UPDATE accounts
SET login_fail_count = ?,
last_login_error = ?
WHERE id = ?
''', (fail_count, error_message, account_id))
conn.commit()
return False # 返回False表示未暂停
def reset_account_login_status(account_id):
"""重置账号登录状态(修改密码后调用)"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE accounts
SET login_fail_count = 0,
last_login_error = NULL,
status = 'active'
WHERE id = ?
''', (account_id,))
conn.commit()
return cursor.rowcount > 0
def get_account_status(account_id):
"""获取账号状态信息"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT status, login_fail_count, last_login_error
FROM accounts
WHERE id = ?
''', (account_id,))
return cursor.fetchone()
def delete_user_accounts(user_id):
"""删除用户的所有账号"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM accounts WHERE user_id = ?', (user_id,))
conn.commit()
return cursor.rowcount
# ==================== 统计相关 ====================
def get_user_stats(user_id):
"""获取用户统计信息"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) as count FROM accounts WHERE user_id = ?', (user_id,))
account_count = cursor.fetchone()['count']
return {'account_count': account_count}
def get_system_stats():
"""获取系统统计信息"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) as count FROM users')
total_users = cursor.fetchone()['count']
cursor.execute("SELECT COUNT(*) as count FROM users WHERE status = 'approved'")
approved_users = cursor.fetchone()['count']
cursor.execute("SELECT COUNT(*) as count FROM users WHERE status = 'pending'")
pending_users = cursor.fetchone()['count']
cursor.execute('SELECT COUNT(*) as count FROM accounts')
total_accounts = cursor.fetchone()['count']
cursor.execute('''
SELECT COUNT(*) as count FROM users
WHERE vip_expire_time IS NOT NULL
AND datetime(vip_expire_time) > datetime('now')
''')
vip_users = cursor.fetchone()['count']
return {
'total_users': total_users,
'approved_users': approved_users,
'pending_users': pending_users,
'total_accounts': total_accounts,
'vip_users': vip_users
}
# ==================== 系统配置管理 ====================
def get_system_config():
"""获取系统配置"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM system_config WHERE id = 1')
row = cursor.fetchone()
if row:
return dict(row)
# 返回默认值
return {
'max_concurrent_global': 2,
'max_concurrent_per_account': 1,
'max_screenshot_concurrent': 3,
'schedule_enabled': 0,
'schedule_time': '02:00',
'schedule_browse_type': '应读',
'schedule_weekdays': '1,2,3,4,5,6,7',
'proxy_enabled': 0,
'proxy_api_url': '',
'proxy_expire_minutes': 3,
'enable_screenshot': 1
}
def update_system_config(max_concurrent=None, schedule_enabled=None, schedule_time=None,
schedule_browse_type=None, schedule_weekdays=None,
max_concurrent_per_account=None, max_screenshot_concurrent=None, proxy_enabled=None,
proxy_api_url=None, proxy_expire_minutes=None):
"""更新系统配置"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
updates = []
params = []
if max_concurrent is not None:
updates.append('max_concurrent_global = ?')
params.append(max_concurrent)
if schedule_enabled is not None:
updates.append('schedule_enabled = ?')
params.append(schedule_enabled)
if schedule_time is not None:
updates.append('schedule_time = ?')
params.append(schedule_time)
if schedule_browse_type is not None:
updates.append('schedule_browse_type = ?')
params.append(schedule_browse_type)
if max_concurrent_per_account is not None:
updates.append('max_concurrent_per_account = ?')
params.append(max_concurrent_per_account)
if schedule_weekdays is not None:
updates.append('schedule_weekdays = ?')
params.append(schedule_weekdays)
if proxy_enabled is not None:
updates.append('proxy_enabled = ?')
params.append(proxy_enabled)
if proxy_api_url is not None:
updates.append('proxy_api_url = ?')
params.append(proxy_api_url)
if proxy_expire_minutes is not None:
updates.append('proxy_expire_minutes = ?')
params.append(proxy_expire_minutes)
if updates:
updates.append('updated_at = CURRENT_TIMESTAMP')
sql = f"UPDATE system_config SET {', '.join(updates)} WHERE id = 1"
cursor.execute(sql, params)
conn.commit()
return True
return False
# ==================== 任务日志管理 ====================
def create_task_log(user_id, account_id, username, browse_type, status,
total_items=0, total_attachments=0, error_message='', duration=None, source='manual'):
"""创建任务日志记录
Args:
source: 任务来源 - 'manual'(手动执行), 'scheduled'(定时任务), 'immediate'(立即执行)
"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute('''
INSERT INTO task_logs (
user_id, account_id, username, browse_type, status,
total_items, total_attachments, error_message, duration, created_at, source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (user_id, account_id, username, browse_type, status,
total_items, total_attachments, error_message, duration, cst_time, source))
conn.commit()
return cursor.lastrowid
def get_task_logs(limit=100, offset=0, date_filter=None, status_filter=None,
source_filter=None, user_id_filter=None, account_filter=None):
"""获取任务日志列表(支持分页和多种筛选)"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
# 构建WHERE条件
where_clauses = ["1=1"]
params = []
if date_filter:
where_clauses.append("date(tl.created_at) = ?")
params.append(date_filter)
if status_filter:
where_clauses.append("tl.status = ?")
params.append(status_filter)
if source_filter:
where_clauses.append("tl.source = ?")
params.append(source_filter)
if user_id_filter:
where_clauses.append("tl.user_id = ?")
params.append(user_id_filter)
if account_filter:
where_clauses.append("tl.username LIKE ?")
params.append(f"%{account_filter}%")
where_sql = " AND ".join(where_clauses)
# 获取总数
count_sql = f'''
SELECT COUNT(*) as total
FROM task_logs tl
LEFT JOIN users u ON tl.user_id = u.id
WHERE {where_sql}
'''
cursor.execute(count_sql, params)
total = cursor.fetchone()['total']
# 获取分页数据
data_sql = f'''
SELECT
tl.*,
u.username as user_username
FROM task_logs tl
LEFT JOIN users u ON tl.user_id = u.id
WHERE {where_sql}
ORDER BY tl.created_at DESC
LIMIT ? OFFSET ?
'''
params.extend([limit, offset])
cursor.execute(data_sql, params)
logs = [dict(row) for row in cursor.fetchall()]
return {
'logs': logs,
'total': total
}
def get_task_stats(date_filter=None):
"""获取任务统计信息"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
if date_filter is None:
date_filter = datetime.now(cst_tz).strftime('%Y-%m-%d')
# 当日统计
cursor.execute('''
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
WHERE date(created_at) = ?
''', (date_filter,))
today_stats = cursor.fetchone()
# 历史累计统计
cursor.execute('''
SELECT
COUNT(*) as total_tasks,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success_tasks,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
''')
total_stats = cursor.fetchone()
return {
'today': {
'total_tasks': today_stats['total_tasks'] or 0,
'success_tasks': today_stats['success_tasks'] or 0,
'failed_tasks': today_stats['failed_tasks'] or 0,
'total_items': today_stats['total_items'] or 0,
'total_attachments': today_stats['total_attachments'] or 0
},
'total': {
'total_tasks': total_stats['total_tasks'] or 0,
'success_tasks': total_stats['success_tasks'] or 0,
'failed_tasks': total_stats['failed_tasks'] or 0,
'total_items': total_stats['total_items'] or 0,
'total_attachments': total_stats['total_attachments'] or 0
}
}
def delete_old_task_logs(days=30):
"""删除N天前的任务日志"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
DELETE FROM task_logs
WHERE created_at < datetime('now', '-' || ? || ' days')
''', (days,))
conn.commit()
return cursor.rowcount
def get_user_run_stats(user_id, date_filter=None):
"""获取用户的运行统计信息"""
with db_pool.get_db() as conn:
cst_tz = pytz.timezone("Asia/Shanghai")
cursor = conn.cursor()
if date_filter is None:
date_filter = datetime.now(cst_tz).strftime('%Y-%m-%d')
cursor.execute('''
SELECT
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
SUM(total_items) as total_items,
SUM(total_attachments) as total_attachments
FROM task_logs
WHERE user_id = ? AND date(created_at) = ?
''', (user_id, date_filter))
stats = cursor.fetchone()
return {
'completed': stats['completed'] or 0,
'failed': stats['failed'] or 0,
'total_items': stats['total_items'] or 0,
'total_attachments': stats['total_attachments'] or 0
}
# ==================== 密码重置功能 ====================
def create_password_reset_request(user_id, new_password):
"""创建密码重置申请 - 使用bcrypt哈希"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
password_hash = hash_password_bcrypt(new_password)
try:
cursor.execute('''
INSERT INTO password_reset_requests (user_id, new_password_hash, status)
VALUES (?, ?, 'pending')
''', (user_id, password_hash))
conn.commit()
return cursor.lastrowid
except Exception as e:
print(f"创建密码重置申请失败: {e}")
return None
def get_pending_password_resets():
"""获取所有待审核的密码重置申请"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT r.id, r.user_id, r.created_at, r.status,
u.username, u.email
FROM password_reset_requests r
JOIN users u ON r.user_id = u.id
WHERE r.status = 'pending'
ORDER BY r.created_at DESC
''')
return [dict(row) for row in cursor.fetchall()]
def approve_password_reset(request_id):
"""批准密码重置申请"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
try:
# 获取申请信息
cursor.execute('''
SELECT user_id, new_password_hash
FROM password_reset_requests
WHERE id = ? AND status = 'pending'
''', (request_id,))
result = cursor.fetchone()
if not result:
return False
user_id = result['user_id']
new_password_hash = result['new_password_hash']
# 更新用户密码
cursor.execute('UPDATE users SET password_hash = ? WHERE id = ?',
(new_password_hash, user_id))
# 更新申请状态
cursor.execute('''
UPDATE password_reset_requests
SET status = 'approved', processed_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (request_id,))
conn.commit()
return True
except Exception as e:
print(f"批准密码重置失败: {e}")
return False
def reject_password_reset(request_id):
"""拒绝密码重置申请"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
try:
cursor.execute('''
UPDATE password_reset_requests
SET status = 'rejected', processed_at = CURRENT_TIMESTAMP
WHERE id = ? AND status = 'pending'
''', (request_id,))
conn.commit()
return cursor.rowcount > 0
except Exception as e:
print(f"拒绝密码重置失败: {e}")
return False
def admin_reset_user_password(user_id, new_password):
"""管理员直接重置用户密码 - 使用bcrypt哈希"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
password_hash = hash_password_bcrypt(new_password)
try:
cursor.execute('UPDATE users SET password_hash = ? WHERE id = ?',
(password_hash, user_id))
conn.commit()
return cursor.rowcount > 0
except Exception as e:
print(f"管理员重置密码失败: {e}")
return False
# ==================== 日志清理 ====================
def clean_old_operation_logs(days=30):
"""清理指定天数前的操作日志如果存在operation_logs表"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
# 检查表是否存在
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='operation_logs'
""")
if not cursor.fetchone():
return 0
try:
cursor.execute('''
DELETE FROM operation_logs
WHERE created_at < datetime('now', '-' || ? || ' days')
''', (days,))
deleted_count = cursor.rowcount
conn.commit()
print(f"已清理 {deleted_count} 条旧操作日志 (>{days}天)")
return deleted_count
except Exception as e:
print(f"清理旧操作日志失败: {e}")
return 0
# ==================== Bug反馈管理 ====================
def create_bug_feedback(user_id, username, title, description, contact=''):
"""创建Bug反馈"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute('''
INSERT INTO bug_feedbacks (user_id, username, title, description, contact, created_at)
VALUES (?, ?, ?, ?, ?, ?)
''', (user_id, username, title, description, contact, cst_time))
conn.commit()
return cursor.lastrowid
def get_bug_feedbacks(limit=100, offset=0, status_filter=None):
"""获取Bug反馈列表管理员用"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
sql = 'SELECT * FROM bug_feedbacks WHERE 1=1'
params = []
if status_filter:
sql += ' AND status = ?'
params.append(status_filter)
sql += ' ORDER BY created_at DESC LIMIT ? OFFSET ?'
params.extend([limit, offset])
cursor.execute(sql, params)
return [dict(row) for row in cursor.fetchall()]
def get_user_feedbacks(user_id, limit=50):
"""获取用户自己的反馈列表"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM bug_feedbacks
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT ?
''', (user_id, limit))
return [dict(row) for row in cursor.fetchall()]
def get_feedback_by_id(feedback_id):
"""根据ID获取反馈详情"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM bug_feedbacks WHERE id = ?', (feedback_id,))
row = cursor.fetchone()
return dict(row) if row else None
def reply_feedback(feedback_id, admin_reply):
"""管理员回复反馈"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute('''
UPDATE bug_feedbacks
SET admin_reply = ?, status = 'replied', replied_at = ?
WHERE id = ?
''', (admin_reply, cst_time, feedback_id))
conn.commit()
return cursor.rowcount > 0
def close_feedback(feedback_id):
"""关闭反馈"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE bug_feedbacks
SET status = 'closed'
WHERE id = ?
''', (feedback_id,))
conn.commit()
return cursor.rowcount > 0
def delete_feedback(feedback_id):
"""删除反馈"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM bug_feedbacks WHERE id = ?', (feedback_id,))
conn.commit()
return cursor.rowcount > 0
def get_feedback_stats():
"""获取反馈统计"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END) as replied,
SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed
FROM bug_feedbacks
''')
row = cursor.fetchone()
return dict(row) if row else {'total': 0, 'pending': 0, 'replied': 0, 'closed': 0}
# ==================== 用户定时任务管理 ====================
def get_user_schedules(user_id):
"""获取用户的所有定时任务"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM user_schedules
WHERE user_id = ?
ORDER BY created_at DESC
''', (user_id,))
return [dict(row) for row in cursor.fetchall()]
def get_schedule_by_id(schedule_id):
"""根据ID获取定时任务"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM user_schedules WHERE id = ?', (schedule_id,))
row = cursor.fetchone()
return dict(row) if row else None
def create_user_schedule(user_id, name='我的定时任务', schedule_time='08:00',
weekdays='1,2,3,4,5', browse_type='应读',
enable_screenshot=1, account_ids=None):
"""创建用户定时任务"""
import json
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
account_ids_str = json.dumps(account_ids) if account_ids else '[]'
cursor.execute('''
INSERT INTO user_schedules (
user_id, name, enabled, schedule_time, weekdays,
browse_type, enable_screenshot, account_ids, created_at, updated_at
) VALUES (?, ?, 0, ?, ?, ?, ?, ?, ?, ?)
''', (user_id, name, schedule_time, weekdays, browse_type,
enable_screenshot, account_ids_str, cst_time, cst_time))
conn.commit()
return cursor.lastrowid
def update_user_schedule(schedule_id, **kwargs):
"""更新用户定时任务"""
import json
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
updates = []
params = []
allowed_fields = ['name', 'enabled', 'schedule_time', 'weekdays',
'browse_type', 'enable_screenshot', 'account_ids']
for field in allowed_fields:
if field in kwargs:
value = kwargs[field]
if field == 'account_ids' and isinstance(value, list):
value = json.dumps(value)
updates.append(f'{field} = ?')
params.append(value)
if not updates:
return False
updates.append('updated_at = ?')
params.append(datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S"))
params.append(schedule_id)
sql = f"UPDATE user_schedules SET {', '.join(updates)} WHERE id = ?"
cursor.execute(sql, params)
conn.commit()
return cursor.rowcount > 0
def delete_user_schedule(schedule_id):
"""删除用户定时任务"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM user_schedules WHERE id = ?', (schedule_id,))
conn.commit()
return cursor.rowcount > 0
def toggle_user_schedule(schedule_id, enabled):
"""启用/禁用用户定时任务"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute('''
UPDATE user_schedules
SET enabled = ?, updated_at = ?
WHERE id = ?
''', (1 if enabled else 0, cst_time, schedule_id))
conn.commit()
return cursor.rowcount > 0
def get_enabled_user_schedules():
"""获取所有启用的用户定时任务"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT us.*, u.username as user_username
FROM user_schedules us
JOIN users u ON us.user_id = u.id
WHERE us.enabled = 1
ORDER BY us.schedule_time
''')
return [dict(row) for row in cursor.fetchall()]
def update_schedule_last_run(schedule_id):
"""更新定时任务最后运行时间"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute('''
UPDATE user_schedules
SET last_run_at = ?, updated_at = ?
WHERE id = ?
''', (cst_time, cst_time, schedule_id))
conn.commit()
return cursor.rowcount > 0
# ==================== 定时任务执行日志 ====================
def create_schedule_execution_log(schedule_id, user_id, schedule_name):
"""创建定时任务执行日志"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cst_tz = pytz.timezone("Asia/Shanghai")
execute_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S")
cursor.execute('''
INSERT INTO schedule_execution_logs (
schedule_id, user_id, schedule_name, execute_time, status
) VALUES (?, ?, ?, ?, 'running')
''', (schedule_id, user_id, schedule_name, execute_time))
conn.commit()
return cursor.lastrowid
def update_schedule_execution_log(log_id, **kwargs):
"""更新定时任务执行日志"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
updates = []
params = []
allowed_fields = ['total_accounts', 'success_accounts', 'failed_accounts',
'total_items', 'total_attachments', 'total_screenshots',
'duration_seconds', 'status', 'error_message']
for field in allowed_fields:
if field in kwargs:
updates.append(f'{field} = ?')
params.append(kwargs[field])
if not updates:
return False
params.append(log_id)
sql = f"UPDATE schedule_execution_logs SET {', '.join(updates)} WHERE id = ?"
cursor.execute(sql, params)
conn.commit()
return cursor.rowcount > 0
def get_schedule_execution_logs(schedule_id, limit=10):
"""获取定时任务执行日志"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT
id,
schedule_id,
user_id,
schedule_name,
execute_time as created_at,
total_accounts,
success_accounts as success_count,
failed_accounts as failed_count,
total_items,
total_attachments,
total_screenshots,
duration_seconds as duration,
status,
error_message
FROM schedule_execution_logs
WHERE schedule_id = ?
ORDER BY execute_time DESC
LIMIT ?
''', (schedule_id, limit))
return [dict(row) for row in cursor.fetchall()]
def get_user_all_schedule_logs(user_id, limit=50):
"""获取用户所有定时任务的执行日志"""
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM schedule_execution_logs
WHERE user_id = ?
ORDER BY execute_time DESC
LIMIT ?
''', (user_id, limit))
return [dict(row) for row in cursor.fetchall()]