Files
zsglpt/task_checkpoint.py
yuyx 7cfb76abf2 修复12项安全漏洞和代码质量问题
安全修复:
- 使用secrets替代random生成验证码,提升安全性
- 添加内存清理调度器,防止内存泄漏
- PIL缺失时返回503而非降级服务
- 改进会话安全配置,支持环境自动检测
- 密钥文件路径支持环境变量配置

Bug修复:
- 改进异常处理,不再吞掉SystemExit/KeyboardInterrupt
- 清理死代码(if False占位符)
- 改进浏览器资源释放逻辑,使用try-finally确保关闭
- 重构数据库连接池归还逻辑,修复竞态条件
- 添加安全的JSON解析方法,处理损坏数据
- 日志级别默认值改为INFO
- 提取魔法数字为可配置常量

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 20:00:19 +08:00

378 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
任务断点续传模块
功能:
1. 记录任务执行进度(每个步骤的状态)
2. 任务异常时自动保存断点
3. 重启后自动恢复未完成任务
4. 智能重试机制
"""
import time
import json
from datetime import datetime
from enum import Enum
import db_pool
class TaskStage(Enum):
    """Lifecycle stages a task passes through; each value is the string stored in the DB."""

    # Waiting in the queue.
    QUEUED = 'queued'
    # Launching the browser.
    STARTING = 'starting'
    # Logging in.
    LOGGING_IN = 'logging_in'
    # Browsing.
    BROWSING = 'browsing'
    # Downloading.
    DOWNLOADING = 'downloading'
    # Finishing up.
    COMPLETING = 'completing'
    # Finished.
    COMPLETED = 'completed'
    # Failed.
    FAILED = 'failed'
    # Paused, waiting to be resumed.
    PAUSED = 'paused'
class TaskCheckpoint:
    """Manager for the ``task_checkpoints`` table (task progress / resume data).

    Every method borrows a connection through ``with db_pool.get_db() as conn``
    and commits explicitly. The SQL uses ``?`` placeholders and SQLite-style
    DDL (``AUTOINCREMENT``, ``datetime('now', ...)``).
    NOTE(review): presumably ``db_pool`` hands out sqlite3 connections; confirm.
    """

    # Counters that update_progress() may write. This tuple is also the
    # whitelist of column names interpolated into its UPDATE statement.
    _PROGRESS_COLUMNS = (
        'current_page', 'total_pages', 'processed_items', 'downloaded_files',
    )

    # Column order for get_checkpoint(); the returned dict uses these as keys.
    _CHECKPOINT_COLUMNS = (
        'task_id', 'user_id', 'account_id', 'username', 'browse_type',
        'stage', 'status', 'progress_percent',
        'current_page', 'total_pages', 'processed_items', 'downloaded_files',
        'retry_count', 'max_retries', 'last_error', 'error_count',
        'checkpoint_data', 'created_at', 'updated_at', 'completed_at',
    )

    # Column order for get_paused_tasks(); the returned dicts use these as keys.
    _PAUSED_COLUMNS = (
        'task_id', 'user_id', 'account_id', 'username', 'browse_type',
        'stage', 'progress_percent', 'last_error', 'retry_count',
        'updated_at',
    )

    def __init__(self):
        """Create the table and indexes if they do not exist yet."""
        self._init_table()

    def _safe_json_loads(self, data):
        """Parse a JSON string, tolerating missing or corrupted data.

        Args:
            data: JSON text, or None.

        Returns:
            The decoded object, or None when ``data`` is empty or cannot be
            parsed (a warning is printed in the latter case).
        """
        if not data:
            return None
        try:
            return json.loads(data)
        except (json.JSONDecodeError, TypeError, ValueError) as e:
            print(f"[警告] JSON解析失败: {e}, 数据: {data[:100] if isinstance(data, str) else data}")
            return None

    @staticmethod
    def _compute_progress_percent(current_page, total_pages):
        """Return the page progress as an integer clamped to 0..100.

        Returns None when either value is missing or ``total_pages`` is not
        positive, so the caller leaves ``progress_percent`` untouched.
        """
        if current_page is None or total_pages is None:
            return None
        if total_pages <= 0:
            return None
        percent = int((current_page / total_pages) * 100)
        return max(0, min(percent, 100))

    def _init_table(self):
        """Create ``task_checkpoints`` and its two lookup indexes if absent."""
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            # The inline SQL comments are part of the statement text and are
            # kept verbatim.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS task_checkpoints (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task_id TEXT UNIQUE NOT NULL, -- 任务唯一ID (user_id:account_id:timestamp)
                    user_id INTEGER NOT NULL,
                    account_id TEXT NOT NULL,
                    username TEXT NOT NULL,
                    browse_type TEXT NOT NULL,
                    -- 任务状态
                    stage TEXT NOT NULL, -- 当前阶段
                    status TEXT NOT NULL, -- running/paused/completed/failed
                    progress_percent INTEGER DEFAULT 0, -- 进度百分比
                    -- 进度详情
                    current_page INTEGER DEFAULT 0, -- 当前浏览到第几页
                    total_pages INTEGER DEFAULT 0, -- 总页数(如果已知)
                    processed_items INTEGER DEFAULT 0, -- 已处理条目数
                    downloaded_files INTEGER DEFAULT 0, -- 已下载文件数
                    -- 错误处理
                    retry_count INTEGER DEFAULT 0, -- 重试次数
                    max_retries INTEGER DEFAULT 3, -- 最大重试次数
                    last_error TEXT, -- 最后一次错误信息
                    error_count INTEGER DEFAULT 0, -- 累计错误次数
                    -- 断点数据(JSON格式存储上下文)
                    checkpoint_data TEXT, -- 断点上下文数据
                    -- 时间戳
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    completed_at TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
                )
            """)
            # Indexes for the status and per-user/account lookups.
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_task_status
                ON task_checkpoints(status, stage)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_task_user
                ON task_checkpoints(user_id, account_id)
            """)
            conn.commit()

    def create_checkpoint(self, user_id, account_id, username, browse_type):
        """Insert a new checkpoint in stage QUEUED with status 'running'.

        Returns:
            The generated task id, ``"<user_id>:<account_id>:<unix seconds>"``.
        """
        # NOTE(review): the timestamp has one-second resolution, so two tasks
        # created for the same user/account within a second produce the same
        # task_id and the second INSERT violates the UNIQUE constraint.
        task_id = f"{user_id}:{account_id}:{int(time.time())}"
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO task_checkpoints
                (task_id, user_id, account_id, username, browse_type, stage, status)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (task_id, user_id, account_id, username, browse_type,
                  TaskStage.QUEUED.value, 'running'))
            conn.commit()
        return task_id

    def update_stage(self, task_id, stage, progress_percent=None, checkpoint_data=None):
        """Set the task's stage and optionally its progress and context data.

        Args:
            task_id: Task identifier.
            stage: A TaskStage member or the raw stage string.
            progress_percent: Written only when not None.
            checkpoint_data: JSON-serialisable object; written only when not None.
        """
        updates = ['stage = ?', 'updated_at = CURRENT_TIMESTAMP']
        params = [stage.value if isinstance(stage, TaskStage) else stage]
        if progress_percent is not None:
            updates.append('progress_percent = ?')
            params.append(progress_percent)
        if checkpoint_data is not None:
            updates.append('checkpoint_data = ?')
            params.append(json.dumps(checkpoint_data, ensure_ascii=False))
        params.append(task_id)
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            # Only fixed column fragments are interpolated; values are bound.
            cursor.execute(f"""
                UPDATE task_checkpoints
                SET {', '.join(updates)}
                WHERE task_id = ?
            """, params)
            conn.commit()

    def update_progress(self, task_id, **kwargs):
        """Update the task's progress counters.

        Args:
            task_id: Task identifier.
            current_page: Current page number.
            total_pages: Total number of pages.
            processed_items: Number of items processed so far.
            downloaded_files: Number of files downloaded so far.

        Other keyword arguments are ignored. When both ``current_page`` and
        ``total_pages`` are given and ``total_pages`` is positive,
        ``progress_percent`` is recomputed and clamped to 0..100.
        """
        updates = ['updated_at = CURRENT_TIMESTAMP']
        params = []
        for column in self._PROGRESS_COLUMNS:
            if column in kwargs:
                updates.append(f'{column} = ?')
                params.append(kwargs[column])
        # A missing or None page count skips the percentage instead of raising.
        percent = self._compute_progress_percent(
            kwargs.get('current_page'), kwargs.get('total_pages'))
        if percent is not None:
            updates.append('progress_percent = ?')
            params.append(percent)
        params.append(task_id)
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute(f"""
                UPDATE task_checkpoints
                SET {', '.join(updates)}
                WHERE task_id = ?
            """, params)
            conn.commit()

    def record_error(self, task_id, error_message, pause=False):
        """Record an error and decide whether the task should be paused.

        Increments ``retry_count`` and ``error_count`` and stores
        ``error_message`` as ``last_error``.

        Args:
            task_id: Task identifier.
            error_message: Text stored in ``last_error``.
            pause: Force the task into the paused state regardless of retries.

        Returns:
            'paused' when the retry budget is used up or ``pause`` is true,
            'retry' when another attempt is allowed, 'unknown' when no row
            matches ``task_id``.
        """
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT retry_count, max_retries, error_count
                FROM task_checkpoints
                WHERE task_id = ?
            """, (task_id,))
            result = cursor.fetchone()
            if not result:
                return 'unknown'
            retry_count, max_retries, error_count = result
            retry_count += 1
            error_count += 1
            if retry_count >= max_retries or pause:
                # Retry budget used up (or pause forced): park the task.
                cursor.execute("""
                    UPDATE task_checkpoints
                    SET status = 'paused',
                        stage = ?,
                        retry_count = ?,
                        error_count = ?,
                        last_error = ?,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE task_id = ?
                """, (TaskStage.PAUSED.value, retry_count, error_count,
                      error_message, task_id))
                conn.commit()
                return 'paused'
            # Retries remain: record the error and leave status/stage alone.
            cursor.execute("""
                UPDATE task_checkpoints
                SET retry_count = ?,
                    error_count = ?,
                    last_error = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE task_id = ?
            """, (retry_count, error_count, error_message, task_id))
            conn.commit()
            return 'retry'

    def complete_task(self, task_id, success=True):
        """Mark the task completed (``success``) or failed and stamp completed_at.

        NOTE(review): progress_percent is set to 100 in both cases, including
        failure; confirm that is intended.
        """
        status = 'completed' if success else 'failed'
        stage = TaskStage.COMPLETED.value if success else TaskStage.FAILED.value
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE task_checkpoints
                SET status = ?,
                    stage = ?,
                    progress_percent = 100,
                    completed_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                WHERE task_id = ?
            """, (status, stage, task_id))
            conn.commit()

    def get_checkpoint(self, task_id):
        """Return the checkpoint row for ``task_id`` as a dict, or None.

        ``checkpoint_data`` is decoded from JSON; it is None when the stored
        value is empty or unparsable.
        """
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute(f"""
                SELECT {', '.join(self._CHECKPOINT_COLUMNS)}
                FROM task_checkpoints
                WHERE task_id = ?
            """, (task_id,))
            row = cursor.fetchone()
        if not row:
            return None
        record = dict(zip(self._CHECKPOINT_COLUMNS, row))
        record['checkpoint_data'] = self._safe_json_loads(record['checkpoint_data'])
        return record

    def get_paused_tasks(self, user_id=None):
        """Return paused (resumable) tasks, most recently updated first.

        Args:
            user_id: Restrict the result to this user; None means all users.

        Returns:
            A list of dicts, one per paused task.
        """
        conditions = ["status = 'paused'"]
        params = []
        # Compare with None so a falsy id such as 0 still filters by user
        # instead of returning every user's tasks.
        if user_id is not None:
            conditions.append('user_id = ?')
            params.append(user_id)
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute(f"""
                SELECT {', '.join(self._PAUSED_COLUMNS)}
                FROM task_checkpoints
                WHERE {' AND '.join(conditions)}
                ORDER BY updated_at DESC
            """, params)
            rows = cursor.fetchall()
        return [dict(zip(self._PAUSED_COLUMNS, row)) for row in rows]

    def resume_task(self, task_id):
        """Set a paused task back to 'running' and reset its retry counter.

        Returns:
            True when a paused row with this ``task_id`` was updated.
        """
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE task_checkpoints
                SET status = 'running',
                    retry_count = 0,
                    updated_at = CURRENT_TIMESTAMP
                WHERE task_id = ? AND status = 'paused'
            """, (task_id,))
            conn.commit()
            return cursor.rowcount > 0

    def abandon_task(self, task_id):
        """Give up on a paused task by marking it failed.

        Returns:
            True when a paused row with this ``task_id`` was updated.
        """
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE task_checkpoints
                SET status = 'failed',
                    stage = ?,
                    completed_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                WHERE task_id = ? AND status = 'paused'
            """, (TaskStage.FAILED.value, task_id))
            conn.commit()
            return cursor.rowcount > 0

    def cleanup_old_checkpoints(self, days=7):
        """Delete completed/failed checkpoints finished more than ``days`` days ago.

        Returns:
            The number of rows deleted.
        """
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                DELETE FROM task_checkpoints
                WHERE status IN ('completed', 'failed')
                AND datetime(completed_at) < datetime('now', '-' || ? || ' days')
            """, (days,))
            deleted = cursor.rowcount
            conn.commit()
            return deleted
# Module-level singleton, built lazily on first request.
_checkpoint_manager = None


def get_checkpoint_manager():
    """Return the shared TaskCheckpoint instance, creating it on first call."""
    global _checkpoint_manager
    if _checkpoint_manager is not None:
        return _checkpoint_manager
    _checkpoint_manager = TaskCheckpoint()
    return _checkpoint_manager