Files
zsglpt/app.py
root f9aa511806 修复三个关键Bug
1. 修复定时任务设置时间不执行问题
   - 将定时任务检查频率从60秒提升到5秒
   - 确保定时任务在设定时间准时执行
   - 位置: app.py 第3010-3011行

2. 修复账号管理卡片设置按钮无法点击问题
   - 修正JavaScript引号转义错误
   - 位置: templates/index.html 第886行

3. 修复用户反馈和后台反馈进度不同步问题
   - 前端改为检查status字段而非reply字段
   - 新增closed状态支持
   - 正确显示待处理/已回复/已关闭三种状态
   - 位置: templates/index.html 第249-251行, 第1550-1567行

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-10 15:53:53 +08:00

3419 lines
131 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
知识管理平台自动化工具 - 多用户版本
支持用户注册登录、后台管理、数据隔离
"""
# 设置时区为中国标准时间CST, UTC+8
import os
os.environ['TZ'] = 'Asia/Shanghai'
try:
import time
time.tzset()
except AttributeError:
pass # Windows系统不支持tzset()
import pytz
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_from_directory, redirect, url_for, session
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import threading
import time
import json
import os
from datetime import datetime, timedelta, timezone
from functools import wraps
# 导入数据库模块和核心模块
import database
import requests
from browser_pool import get_browser_pool, init_browser_pool
from browser_pool_worker import get_browser_worker_pool, init_browser_worker_pool, shutdown_browser_worker_pool
from playwright_automation import PlaywrightBrowserManager, PlaywrightAutomation, BrowseResult
from api_browser import APIBrowser, APIBrowseResult
from browser_installer import check_and_install_browser
# ========== 优化模块导入 ==========
from app_config import get_config
from app_logger import init_logging, get_logger, audit_logger
from app_security import (
ip_rate_limiter, require_ip_not_locked,
validate_username, validate_password, validate_email,
is_safe_path, sanitize_filename, get_client_ip
)
from app_utils import verify_and_consume_captcha
# ========== 初始化配置 ==========
config = get_config()
app = Flask(__name__)
app.config.from_object(config)
# 确保SECRET_KEY已设置
if not app.config.get('SECRET_KEY'):
raise RuntimeError("SECRET_KEY未配置请检查app_config.py")
socketio = SocketIO(
app,
cors_allowed_origins="*",
async_mode='threading', # 明确指定async模式
ping_timeout=60, # ping超时60秒
ping_interval=25, # 每25秒ping一次
logger=False, # 禁用socketio debug日志
engineio_logger=False
)
# ========== 初始化日志系统 ==========
init_logging(log_level=config.LOG_LEVEL, log_file=config.LOG_FILE)
logger = get_logger('app')
logger.info("="*60)
logger.info("知识管理平台自动化工具 - 多用户版")
logger.info("="*60)
logger.info(f"Session配置: COOKIE_NAME={app.config.get('SESSION_COOKIE_NAME', 'session')}, "
f"SAMESITE={app.config.get('SESSION_COOKIE_SAMESITE', 'None')}, "
f"HTTPONLY={app.config.get('SESSION_COOKIE_HTTPONLY', 'None')}, "
f"SECURE={app.config.get('SESSION_COOKIE_SECURE', 'None')}, "
f"PATH={app.config.get('SESSION_COOKIE_PATH', 'None')}")
# Flask-Login 配置
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login_page'
@login_manager.unauthorized_handler
def unauthorized():
"""处理未授权访问 - API请求返回JSON,页面请求重定向"""
if request.path.startswith('/api/') or request.path.startswith('/yuyx/api/'):
return jsonify({"error": "请先登录", "code": "unauthorized"}), 401
return redirect(url_for('login_page', next=request.url))
# 截图目录
SCREENSHOTS_DIR = config.SCREENSHOTS_DIR
os.makedirs(SCREENSHOTS_DIR, exist_ok=True)
# 全局变量
browser_manager = None
user_accounts = {} # {user_id: {account_id: Account对象}}
active_tasks = {} # {account_id: Thread对象}
task_status = {} # {account_id: {"user_id": x, "username": y, "status": "排队中/运行中", "detail_status": "具体状态", "browse_type": z, "start_time": t, "source": s, "progress": {...}, "is_vip": bool}}
# VIP优先级队列
vip_task_queue = [] # VIP用户任务队列
normal_task_queue = [] # 普通用户任务队列
task_queue_lock = threading.Lock()
log_cache = {} # {user_id: [logs]} 每个用户独立的日志缓存
log_cache_total_count = 0 # 全局日志总数,防止无限增长
# 日志缓存限制
MAX_LOGS_PER_USER = config.MAX_LOGS_PER_USER # 每个用户最多100条
MAX_TOTAL_LOGS = config.MAX_TOTAL_LOGS # 全局最多1000条,防止内存泄漏
# 并发控制每个用户同时最多运行1个账号避免内存不足
# 验证码存储:{session_id: {"code": "1234", "expire_time": timestamp, "failed_attempts": 0}}
captcha_storage = {}
# IP限流存储:{ip: {"attempts": count, "lock_until": timestamp, "first_attempt": timestamp}}
ip_rate_limit = {}
# 限流配置 - 从 config 读取,避免硬编码
MAX_CAPTCHA_ATTEMPTS = config.MAX_CAPTCHA_ATTEMPTS
MAX_IP_ATTEMPTS_PER_HOUR = config.MAX_IP_ATTEMPTS_PER_HOUR
IP_LOCK_DURATION = config.IP_LOCK_DURATION
# 全局限制:整个系统同时最多运行N个账号线程本地架构,每个线程独立浏览器,内存占用约200MB/浏览器)
max_concurrent_per_account = config.MAX_CONCURRENT_PER_ACCOUNT
max_concurrent_global = config.MAX_CONCURRENT_GLOBAL
user_semaphores = {} # {user_id: Semaphore}
global_semaphore = threading.Semaphore(max_concurrent_global)
# 截图专用信号量:限制同时进行的截图任务数量为1(避免资源竞争)
# <20><><EFBFBD>图信号量将在首次使用时初始化
screenshot_semaphore = None
screenshot_semaphore_lock = threading.Lock()
def get_screenshot_semaphore():
"""获取截图信号量(懒加载,根据配置动态创建)"""
global screenshot_semaphore
with screenshot_semaphore_lock:
config = database.get_system_config()
max_concurrent = config.get('max_screenshot_concurrent', 3)
if screenshot_semaphore is None:
screenshot_semaphore = threading.Semaphore(max_concurrent)
return screenshot_semaphore, max_concurrent
class User(UserMixin):
"""Flask-Login 用户类"""
def __init__(self, user_id):
self.id = user_id
class Admin(UserMixin):
"""管理员类"""
def __init__(self, admin_id):
self.id = admin_id
self.is_admin = True
class Account:
"""账号类"""
def __init__(self, account_id, user_id, username, password, remember=True, remark=''):
self.id = account_id
self.user_id = user_id
self.username = username
self.password = password
self.remember = remember
self.remark = remark
self.status = "未开始"
self.is_running = False
self.should_stop = False
self.total_items = 0
self.total_attachments = 0
self.automation = None
self.last_browse_type = "注册前未读"
self.proxy_config = None # 保存代理配置,浏览和截图共用
def to_dict(self):
result = {
"id": self.id,
"username": self.username,
"status": self.status,
"remark": self.remark,
"total_items": self.total_items,
"total_attachments": self.total_attachments,
"is_running": self.is_running
}
# 添加详细进度信息(如果有)
if self.id in task_status:
ts = task_status[self.id]
progress = ts.get('progress', {})
result['detail_status'] = ts.get('detail_status', '')
result['progress_items'] = progress.get('items', 0)
result['progress_attachments'] = progress.get('attachments', 0)
result['start_time'] = ts.get('start_time', 0)
# 计算运行时长
if ts.get('start_time'):
import time
elapsed = int(time.time() - ts['start_time'])
result['elapsed_seconds'] = elapsed
mins, secs = divmod(elapsed, 60)
result['elapsed_display'] = f"{mins}{secs}"
else:
# 非运行状态下根据status设置detail_status
status_map = {
'已完成': '任务完成',
'截图中': '正在截图',
'浏览完成': '浏览完成',
'登录失败': '登录失败',
'已暂停': '任务已暂停'
}
for key, val in status_map.items():
if key in self.status:
result['detail_status'] = val
break
return result
@login_manager.user_loader
def load_user(user_id):
"""Flask-Login 用户加载"""
user = database.get_user_by_id(int(user_id))
if user:
return User(user['id'])
return None
def admin_required(f):
"""管理员权限装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
logger.debug(f"[admin_required] Session内容: {dict(session)}")
logger.debug(f"[admin_required] Cookies: {request.cookies}")
if 'admin_id' not in session:
logger.warning(f"[admin_required] 拒绝访问 {request.path} - session中无admin_id")
return jsonify({"error": "需要管理员权限"}), 403
logger.info(f"[admin_required] 管理员 {session.get('admin_username')} 访问 {request.path}")
return f(*args, **kwargs)
return decorated_function
def log_to_client(message, user_id=None, account_id=None):
"""发送日志到Web客户端(用户隔离)"""
beijing_tz = timezone(timedelta(hours=8))
timestamp = datetime.now(beijing_tz).strftime('%H:%M:%S')
log_data = {
'timestamp': timestamp,
'message': message,
'account_id': account_id
}
# 如果指定了user_id,则缓存到该用户的日志
if user_id:
global log_cache_total_count
if user_id not in log_cache:
log_cache[user_id] = []
log_cache[user_id].append(log_data)
log_cache_total_count += 1
# 持久化到数据库 (已禁用,使用task_logs表代替)
# try:
# database.save_operation_log(user_id, message, account_id, 'INFO')
# except Exception as e:
# print(f"保存日志到数据库失败: {e}")
# 单用户限制
if len(log_cache[user_id]) > MAX_LOGS_PER_USER:
log_cache[user_id].pop(0)
log_cache_total_count -= 1
# 全局限制 - 如果超过总数限制,清理日志最多的用户
while log_cache_total_count > MAX_TOTAL_LOGS:
if log_cache:
max_user = max(log_cache.keys(), key=lambda u: len(log_cache[u]))
if log_cache[max_user]:
log_cache[max_user].pop(0)
log_cache_total_count -= 1
else:
break
else:
break
# 发送到该用户的room
socketio.emit('log', log_data, room=f'user_{user_id}')
# 控制台日志:添加账号短标识便于区分
if account_id:
# 显示账号ID前4位作为标识
short_id = account_id[:4] if len(account_id) >= 4 else account_id
print(f"[{timestamp}] U{user_id}:{short_id} | {message}")
else:
print(f"[{timestamp}] U{user_id} | {message}")
def get_proxy_from_api(api_url, max_retries=3):
"""从API获取代理IP支持重试
Args:
api_url: 代理API地址
max_retries: 最大重试次数
Returns:
代理服务器地址(格式: http://IP:PORT或 None
"""
import re
# IP:PORT 格式正则
ip_port_pattern = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}$')
for attempt in range(max_retries):
try:
response = requests.get(api_url, timeout=10)
if response.status_code == 200:
text = response.text.strip()
# 尝试解析JSON响应
try:
import json
data = json.loads(text)
# 检查是否是错误响应
if isinstance(data, dict):
if data.get('status') != 200 and data.get('status') != 0:
error_msg = data.get('msg', data.get('message', '未知错误'))
print(f"✗ 代理API返回错误: {error_msg} (尝试 {attempt + 1}/{max_retries})")
if attempt < max_retries - 1:
time.sleep(1)
continue
# 尝试从JSON中获取IP
ip_port = data.get('ip') or data.get('proxy') or data.get('data')
if ip_port:
text = str(ip_port).strip()
except (json.JSONDecodeError, ValueError):
# 不是JSON继续使用原始文本
pass
# 验证IP:PORT格式
if ip_port_pattern.match(text):
proxy_server = f"http://{text}"
print(f"✓ 获取代理成功: {proxy_server} (尝试 {attempt + 1}/{max_retries})")
return proxy_server
else:
print(f"✗ 代理格式无效: {text[:50]} (尝试 {attempt + 1}/{max_retries})")
else:
print(f"✗ 获取代理失败: HTTP {response.status_code} (尝试 {attempt + 1}/{max_retries})")
except Exception as e:
print(f"✗ 获取代理异常: {str(e)} (尝试 {attempt + 1}/{max_retries})")
if attempt < max_retries - 1:
time.sleep(1)
print(f"✗ 获取代理失败,已重试 {max_retries} 次,将不使用代理继续")
return None
def init_browser_manager():
"""初始化浏览器管理器"""
global browser_manager
if browser_manager is None:
print("正在初始化Playwright浏览器管理器...")
if not check_and_install_browser(log_callback=lambda msg, account_id=None: print(msg)):
print("浏览器环境检查失败!")
return False
browser_manager = PlaywrightBrowserManager(
headless=True,
log_callback=lambda msg, account_id=None: print(msg)
)
try:
# 不再需要initialize(),每个账号会创建独立浏览器
print("Playwright浏览器管理器创建成功")
return True
except Exception as e:
print(f"Playwright初始化失败: {str(e)}")
return False
return True
# ==================== 前端路由 ====================
@app.route('/')
def index():
"""主页 - 重定向到登录或应用"""
if current_user.is_authenticated:
return redirect(url_for('app_page'))
return redirect(url_for('login_page'))
@app.route('/login')
def login_page():
"""登录页面"""
return render_template('login.html')
@app.route('/register')
def register_page():
"""注册页面"""
return render_template('register.html')
@app.route('/app')
@login_required
def app_page():
"""主应用页面"""
return render_template('index.html')
@app.route('/yuyx')
def admin_login_page():
"""后台登录页面"""
if 'admin_id' in session:
return redirect(url_for('admin_page'))
return render_template('admin_login.html')
@app.route('/yuyx/admin')
@admin_required
def admin_page():
"""后台管理页面"""
return render_template('admin.html')
@app.route('/yuyx/vip')
@admin_required
def vip_admin_page():
"""VIP管理页面"""
return render_template('vip_admin.html')
# ==================== 用户认证API ====================
@app.route('/api/register', methods=['POST'])
@require_ip_not_locked # IP限流保护
def register():
"""用户注册"""
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '').strip()
email = data.get('email', '').strip()
captcha_session = data.get('captcha_session', '')
captcha_code = data.get('captcha', '').strip()
if not username or not password:
return jsonify({"error": "用户名和密码不能为空"}), 400
# 获取客户端IP用于IP限流检查
client_ip = get_client_ip()
# 检查IP限流
allowed, error_msg = check_ip_rate_limit(client_ip)
if not allowed:
return jsonify({"error": error_msg}), 429
# 验证验证码
success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage, MAX_CAPTCHA_ATTEMPTS)
if not success:
# 验证失败记录IP失败尝试注册特有的IP限流逻辑
is_locked = record_failed_captcha(client_ip)
if is_locked:
return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
return jsonify({"error": message}), 400
user_id = database.create_user(username, password, email)
if user_id:
return jsonify({"success": True, "message": "注册成功,请等待管理员审核"})
else:
return jsonify({"error": "用户名已存在"}), 400
# ==================== 验证码API ====================
import random
from task_checkpoint import get_checkpoint_manager, TaskStage
checkpoint_mgr = None # 任务断点管理器
def check_ip_rate_limit(ip_address):
"""检查IP是否被限流"""
current_time = time.time()
# 清理过期的IP记录
expired_ips = [ip for ip, data in ip_rate_limit.items()
if data.get("lock_until", 0) < current_time and
current_time - data.get("first_attempt", current_time) > 3600]
for ip in expired_ips:
del ip_rate_limit[ip]
# 检查IP是否被锁定
if ip_address in ip_rate_limit:
ip_data = ip_rate_limit[ip_address]
# 如果IP被锁定且未到解锁时间
if ip_data.get("lock_until", 0) > current_time:
remaining_time = int(ip_data["lock_until"] - current_time)
return False, "IP已被锁定,请{}分钟后再试".format(remaining_time // 60 + 1)
# 如果超过1小时,重置计数
if current_time - ip_data.get("first_attempt", current_time) > 3600:
ip_rate_limit[ip_address] = {
"attempts": 0,
"first_attempt": current_time
}
return True, None
def record_failed_captcha(ip_address):
"""记录验证码失败尝试"""
current_time = time.time()
if ip_address not in ip_rate_limit:
ip_rate_limit[ip_address] = {
"attempts": 1,
"first_attempt": current_time
}
else:
ip_rate_limit[ip_address]["attempts"] += 1
# 检查是否超过限制
if ip_rate_limit[ip_address]["attempts"] >= MAX_IP_ATTEMPTS_PER_HOUR:
ip_rate_limit[ip_address]["lock_until"] = current_time + IP_LOCK_DURATION
return True # 表示IP已被锁定
return False # 表示还未锁定
@app.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
"""生成4位数字验证码"""
import uuid
session_id = str(uuid.uuid4())
# 生成4位随机数字
code = "".join([str(random.randint(0, 9)) for _ in range(4)])
# 存储验证码5分钟过期
captcha_storage[session_id] = {
"code": code,
"expire_time": time.time() + 300,
"failed_attempts": 0
}
# 清理过期验证码
expired_keys = [k for k, v in captcha_storage.items() if v["expire_time"] < time.time()]
for k in expired_keys:
del captcha_storage[k]
return jsonify({"session_id": session_id, "captcha": code})
@app.route('/api/login', methods=['POST'])
@require_ip_not_locked # IP限流保护
def login():
"""用户登录"""
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '').strip()
captcha_session = data.get('captcha_session', '')
captcha_code = data.get('captcha', '').strip()
need_captcha = data.get('need_captcha', False)
# 如果需要验证码,验证验证码
if need_captcha:
success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage)
if not success:
return jsonify({"error": message}), 400
# 先检查用户是否存在
user_exists = database.get_user_by_username(username)
if not user_exists:
return jsonify({"error": "账号未注册", "need_captcha": True}), 401
# 检查密码是否正确
user = database.verify_user(username, password)
if not user:
# 密码错误
return jsonify({"error": "密码错误", "need_captcha": True}), 401
# 检查审核状态
if user['status'] != 'approved':
return jsonify({"error": "账号未审核,请等待管理员审核", "need_captcha": False}), 401
# 登录成功
user_obj = User(user['id'])
login_user(user_obj)
load_user_accounts(user['id'])
return jsonify({"success": True})
@app.route('/api/logout', methods=['POST'])
@login_required
def logout():
"""用户登出"""
logout_user()
return jsonify({"success": True})
# ==================== 管理员认证API ====================
@app.route('/yuyx/api/debug-config', methods=['GET'])
def debug_config():
"""调试配置信息"""
return jsonify({
"secret_key_set": bool(app.secret_key),
"secret_key_length": len(app.secret_key) if app.secret_key else 0,
"session_config": {
"SESSION_COOKIE_NAME": app.config.get('SESSION_COOKIE_NAME'),
"SESSION_COOKIE_SECURE": app.config.get('SESSION_COOKIE_SECURE'),
"SESSION_COOKIE_HTTPONLY": app.config.get('SESSION_COOKIE_HTTPONLY'),
"SESSION_COOKIE_SAMESITE": app.config.get('SESSION_COOKIE_SAMESITE'),
"PERMANENT_SESSION_LIFETIME": str(app.config.get('PERMANENT_SESSION_LIFETIME')),
},
"current_session": dict(session),
"cookies_received": list(request.cookies.keys())
})
@app.route('/yuyx/api/login', methods=['POST'])
@require_ip_not_locked # IP限流保护
def admin_login():
"""管理员登录支持JSON和form-data两种格式"""
# 兼容JSON和form-data两种提交方式
if request.is_json:
data = request.json
else:
data = request.form
username = data.get('username', '').strip()
password = data.get('password', '').strip()
captcha_session = data.get('captcha_session', '')
captcha_code = data.get('captcha', '').strip()
need_captcha = data.get('need_captcha', False)
# 如果需要验证码,验证验证码
if need_captcha:
success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage)
if not success:
if request.is_json:
return jsonify({"error": message}), 400
else:
return redirect(url_for('admin_login_page'))
admin = database.verify_admin(username, password)
if admin:
# 清除旧session确保干净的状态
session.clear()
# 设置管理员session
session['admin_id'] = admin['id']
session['admin_username'] = admin['username']
session.permanent = True # 设置为永久会话使用PERMANENT_SESSION_LIFETIME配置
session.modified = True # 强制标记session为已修改确保保存
logger.info(f"[admin_login] 管理员 {username} 登录成功, session已设置: admin_id={admin['id']}")
logger.debug(f"[admin_login] Session内容: {dict(session)}")
logger.debug(f"[admin_login] Cookie将被设置: name={app.config.get('SESSION_COOKIE_NAME', 'session')}")
# 根据请求类型返回不同响应
if request.is_json:
# JSON请求返回JSON响应给JavaScript使用
response = jsonify({"success": True, "redirect": "/yuyx/admin"})
return response
else:
# form-data请求直接重定向到后台页面
return redirect(url_for('admin_page'))
else:
logger.warning(f"[admin_login] 管理员 {username} 登录失败 - 用户名或密码错误")
if request.is_json:
return jsonify({"error": "管理员用户名或密码错误", "need_captcha": True}), 401
else:
# form提交失败重定向回登录页TODO: 可以添加flash消息
return redirect(url_for('admin_login_page'))
@app.route('/yuyx/api/logout', methods=['POST'])
@admin_required
def admin_logout():
"""管理员登出"""
session.pop('admin_id', None)
session.pop('admin_username', None)
return jsonify({"success": True})
@app.route('/yuyx/api/users', methods=['GET'])
@admin_required
def get_all_users():
"""获取所有用户"""
users = database.get_all_users()
return jsonify(users)
@app.route('/yuyx/api/users/pending', methods=['GET'])
@admin_required
def get_pending_users():
"""获取待审核用户"""
users = database.get_pending_users()
return jsonify(users)
@app.route('/yuyx/api/users/<int:user_id>/approve', methods=['POST'])
@admin_required
def approve_user_route(user_id):
"""审核通过用户"""
if database.approve_user(user_id):
return jsonify({"success": True})
return jsonify({"error": "审核失败"}), 400
@app.route('/yuyx/api/users/<int:user_id>/reject', methods=['POST'])
@admin_required
def reject_user_route(user_id):
"""拒绝用户"""
if database.reject_user(user_id):
return jsonify({"success": True})
return jsonify({"error": "拒绝失败"}), 400
@app.route('/yuyx/api/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user_route(user_id):
"""删除用户"""
if database.delete_user(user_id):
# 清理内存中的账号数据
if user_id in user_accounts:
del user_accounts[user_id]
# 清理用户信号量,防止内存泄漏
if user_id in user_semaphores:
del user_semaphores[user_id]
# 清理用户日志缓存,防止内存泄漏
global log_cache_total_count
if user_id in log_cache:
log_cache_total_count -= len(log_cache[user_id])
del log_cache[user_id]
return jsonify({"success": True})
return jsonify({"error": "删除失败"}), 400
@app.route('/yuyx/api/stats', methods=['GET'])
@admin_required
def get_system_stats():
"""获取系统统计"""
stats = database.get_system_stats()
# 从session获取管理员用户名
stats["admin_username"] = session.get('admin_username', 'admin')
return jsonify(stats)
@app.route('/yuyx/api/docker_stats', methods=['GET'])
@admin_required
def get_docker_stats():
"""获取Docker容器运行状态"""
import subprocess
docker_status = {
'running': False,
'container_name': 'N/A',
'uptime': 'N/A',
'memory_usage': 'N/A',
'memory_limit': 'N/A',
'memory_percent': 'N/A',
'cpu_percent': 'N/A',
'status': 'Unknown'
}
try:
# 检查是否在Docker容器内
if os.path.exists('/.dockerenv'):
docker_status['running'] = True
# 获取容器名称
try:
with open('/etc/hostname', 'r') as f:
docker_status['container_name'] = f.read().strip()
except Exception as e:
logger.debug(f"读取容器名称失败: {e}")
# 获取内存使用情况 (cgroup v2)
try:
# 尝试cgroup v2路径
if os.path.exists('/sys/fs/cgroup/memory.current'):
# Read total memory
with open('/sys/fs/cgroup/memory.current', 'r') as f:
mem_total = int(f.read().strip())
# Read cache from memory.stat
cache = 0
if os.path.exists('/sys/fs/cgroup/memory.stat'):
with open('/sys/fs/cgroup/memory.stat', 'r') as f:
for line in f:
if line.startswith('inactive_file '):
cache = int(line.split()[1])
break
# Actual memory = total - cache
mem_bytes = mem_total - cache
docker_status['memory_usage'] = "{:.2f} MB".format(mem_bytes / 1024 / 1024)
# 获取内存限制
if os.path.exists('/sys/fs/cgroup/memory.max'):
with open('/sys/fs/cgroup/memory.max', 'r') as f:
limit_str = f.read().strip()
if limit_str != 'max':
limit_bytes = int(limit_str)
docker_status['memory_limit'] = "{:.2f} GB".format(limit_bytes / 1024 / 1024 / 1024)
docker_status['memory_percent'] = "{:.2f}%".format(mem_bytes / limit_bytes * 100)
# 尝试cgroup v1路径
elif os.path.exists('/sys/fs/cgroup/memory/memory.usage_in_bytes'):
# 从 memory.stat 读取内存信息
mem_bytes = 0
if os.path.exists('/sys/fs/cgroup/memory/memory.stat'):
with open('/sys/fs/cgroup/memory/memory.stat', 'r') as f:
rss = 0
cache = 0
for line in f:
if line.startswith('total_rss '):
rss = int(line.split()[1])
elif line.startswith('total_cache '):
cache = int(line.split()[1])
# 使用 RSS + (一部分活跃的cache),更接近docker stats的计算
# 但为了准确性,我们只用RSS
mem_bytes = rss
# 如果找不到,则使用总内存减去缓存作为后备
if mem_bytes == 0:
with open('/sys/fs/cgroup/memory/memory.usage_in_bytes', 'r') as f:
total_mem = int(f.read().strip())
cache = 0
if os.path.exists('/sys/fs/cgroup/memory/memory.stat'):
with open('/sys/fs/cgroup/memory/memory.stat', 'r') as f:
for line in f:
if line.startswith('total_inactive_file '):
cache = int(line.split()[1])
break
mem_bytes = total_mem - cache
docker_status['memory_usage'] = "{:.2f} MB".format(mem_bytes / 1024 / 1024)
# 获取内存限制
if os.path.exists('/sys/fs/cgroup/memory/memory.limit_in_bytes'):
with open('/sys/fs/cgroup/memory/memory.limit_in_bytes', 'r') as f:
limit_bytes = int(f.read().strip())
# 检查是否是实际限制(不是默认的超大值)
if limit_bytes < 9223372036854771712:
docker_status['memory_limit'] = "{:.2f} GB".format(limit_bytes / 1024 / 1024 / 1024)
docker_status['memory_percent'] = "{:.2f}%".format(mem_bytes / limit_bytes * 100)
except Exception as e:
docker_status['memory_usage'] = 'Error: {}'.format(str(e))
# 获取容器运行时间(基于PID 1的启动时间)
try:
# Get PID 1 start time
with open('/proc/1/stat', 'r') as f:
stat_data = f.read().split()
starttime_ticks = int(stat_data[21])
# Get system uptime
with open('/proc/uptime', 'r') as f:
system_uptime = float(f.read().split()[0])
# Get clock ticks per second
import os as os_module
ticks_per_sec = os_module.sysconf(os_module.sysconf_names['SC_CLK_TCK'])
# Calculate container uptime
process_start = starttime_ticks / ticks_per_sec
uptime_seconds = int(system_uptime - process_start)
days = uptime_seconds // 86400
hours = (uptime_seconds % 86400) // 3600
minutes = (uptime_seconds % 3600) // 60
if days > 0:
docker_status['uptime'] = "{}{}小时 {}分钟".format(days, hours, minutes)
elif hours > 0:
docker_status['uptime'] = "{}小时 {}分钟".format(hours, minutes)
else:
docker_status['uptime'] = "{}分钟".format(minutes)
except Exception as e:
logger.debug(f"读取容器运行时间失败: {e}")
docker_status['status'] = 'Running'
else:
docker_status['status'] = 'Not in Docker'
except Exception as e:
docker_status['status'] = 'Error: {}'.format(str(e))
return jsonify(docker_status)
@app.route('/yuyx/api/admin/password', methods=['PUT'])
@admin_required
def update_admin_password():
"""修改管理员密码"""
data = request.json
new_password = data.get('new_password', '').strip()
if not new_password:
return jsonify({"error": "密码不能为空"}), 400
username = session.get('admin_username')
if database.update_admin_password(username, new_password):
return jsonify({"success": True})
return jsonify({"error": "修改失败"}), 400
@app.route('/yuyx/api/admin/username', methods=['PUT'])
@admin_required
def update_admin_username():
"""修改管理员用户名"""
data = request.json
new_username = data.get('new_username', '').strip()
if not new_username:
return jsonify({"error": "用户名不能为空"}), 400
old_username = session.get('admin_username')
if database.update_admin_username(old_username, new_username):
session['admin_username'] = new_username
return jsonify({"success": True})
return jsonify({"error": "修改失败,用户名可能已存在"}), 400
def update_admin_username():
"""修改管理员用户名"""
data = request.json
new_username = data.get('new_username', '').strip()
if not new_username:
return jsonify({"error": "用户名不能为空"}), 400
old_username = session.get('admin_username')
if database.update_admin_username(old_username, new_username):
session['admin_username'] = new_username
return jsonify({"success": True})
return jsonify({"error": "用户名已存在"}), 400
# ==================== 密码重置API ====================
# 管理员直接重置用户密码
@app.route('/yuyx/api/users/<int:user_id>/reset_password', methods=['POST'])
@admin_required
def admin_reset_password_route(user_id):
"""管理员直接重置用户密码(无需审核)"""
data = request.json
new_password = data.get('new_password', '').strip()
if not new_password:
return jsonify({"error": "新密码不能为空"}), 400
if len(new_password) < 6:
return jsonify({"error": "密码长度不能少于6位"}), 400
if database.admin_reset_user_password(user_id, new_password):
return jsonify({"message": "密码重置成功"})
return jsonify({"error": "重置失败,用户不存在"}), 400
# 获取密码重置申请列表
@app.route('/yuyx/api/password_resets', methods=['GET'])
@admin_required
def get_password_resets_route():
"""获取所有待审核的密码重置申请"""
resets = database.get_pending_password_resets()
return jsonify(resets)
# 批准密码重置申请
@app.route('/yuyx/api/password_resets/<int:request_id>/approve', methods=['POST'])
@admin_required
def approve_password_reset_route(request_id):
"""批准密码重置申请"""
if database.approve_password_reset(request_id):
return jsonify({"message": "密码重置申请已批准"})
return jsonify({"error": "批准失败"}), 400
# 拒绝密码重置申请
@app.route('/yuyx/api/password_resets/<int:request_id>/reject', methods=['POST'])
@admin_required
def reject_password_reset_route(request_id):
"""拒绝密码重置申请"""
if database.reject_password_reset(request_id):
return jsonify({"message": "密码重置申请已拒绝"})
return jsonify({"error": "拒绝失败"}), 400
# 用户申请重置密码(需要审核)
@app.route('/api/reset_password_request', methods=['POST'])
def request_password_reset():
"""用户申请重置密码"""
data = request.json
username = data.get('username', '').strip()
email = data.get('email', '').strip()
new_password = data.get('new_password', '').strip()
if not username or not new_password:
return jsonify({"error": "用户名和新密码不能为空"}), 400
if len(new_password) < 6:
return jsonify({"error": "密码长度不能少于6位"}), 400
# 验证用户存在
user = database.get_user_by_username(username)
if not user:
return jsonify({"error": "用户不存在"}), 404
# 如果提供了邮箱,验证邮箱是否匹配
if email and user.get('email') != email:
return jsonify({"error": "邮箱不匹配"}), 400
# 创建重置申请
request_id = database.create_password_reset_request(user['id'], new_password)
if request_id:
return jsonify({"message": "密码重置申请已提交,请等待管理员审核"})
else:
return jsonify({"error": "申请提交失败"}), 500
# ==================== 账号管理API (用户隔离) ====================
def load_user_accounts(user_id):
"""从数据库加载用户的账号到内存"""
if user_id not in user_accounts:
user_accounts[user_id] = {}
accounts_data = database.get_user_accounts(user_id)
for acc_data in accounts_data:
account = Account(
account_id=acc_data['id'],
user_id=user_id,
username=acc_data['username'],
password=acc_data['password'],
remember=bool(acc_data['remember']),
remark=acc_data['remark'] or ''
)
user_accounts[user_id][account.id] = account
# ==================== Bug反馈API用户端 ====================
@app.route('/api/feedback', methods=['POST'])
@login_required
def submit_feedback():
"""用户提交Bug反馈"""
data = request.get_json()
title = data.get('title', '').strip()
description = data.get('description', '').strip()
contact = data.get('contact', '').strip()
if not title or not description:
return jsonify({"error": "标题和描述不能为空"}), 400
if len(title) > 100:
return jsonify({"error": "标题不能超过100个字符"}), 400
if len(description) > 2000:
return jsonify({"error": "描述不能超过2000个字符"}), 400
# 从数据库获取用户名
user_info = database.get_user_by_id(current_user.id)
username = user_info['username'] if user_info else f'用户{current_user.id}'
feedback_id = database.create_bug_feedback(
user_id=current_user.id,
username=username,
title=title,
description=description,
contact=contact
)
return jsonify({"message": "反馈提交成功", "id": feedback_id})
@app.route('/api/feedback', methods=['GET'])
@login_required
def get_my_feedbacks():
"""获取当前用户的反馈列表"""
feedbacks = database.get_user_feedbacks(current_user.id)
return jsonify(feedbacks)
# ==================== Bug反馈API管理端 ====================
@app.route('/yuyx/api/feedbacks', methods=['GET'])
@admin_required
def get_all_feedbacks():
"""管理员获取所有反馈"""
status = request.args.get('status')
limit = int(request.args.get('limit', 100))
offset = int(request.args.get('offset', 0))
feedbacks = database.get_bug_feedbacks(limit=limit, offset=offset, status_filter=status)
stats = database.get_feedback_stats()
return jsonify({
"feedbacks": feedbacks,
"stats": stats
})
@app.route('/yuyx/api/feedbacks/<int:feedback_id>/reply', methods=['POST'])
@admin_required
def reply_to_feedback(feedback_id):
"""管理员回复反馈"""
data = request.get_json()
reply = data.get('reply', '').strip()
if not reply:
return jsonify({"error": "回复内容不能为空"}), 400
if database.reply_feedback(feedback_id, reply):
return jsonify({"message": "回复成功"})
else:
return jsonify({"error": "反馈不存在"}), 404
@app.route('/yuyx/api/feedbacks/<int:feedback_id>/close', methods=['POST'])
@admin_required
def close_feedback_api(feedback_id):
"""管理员关闭反馈"""
if database.close_feedback(feedback_id):
return jsonify({"message": "已关闭"})
else:
return jsonify({"error": "反馈不存在"}), 404
@app.route('/yuyx/api/feedbacks/<int:feedback_id>', methods=['DELETE'])
@admin_required
def delete_feedback_api(feedback_id):
"""管理员删除反馈"""
if database.delete_feedback(feedback_id):
return jsonify({"message": "已删除"})
else:
return jsonify({"error": "反馈不存在"}), 404
# ==================== 账号管理API ====================
@app.route('/api/accounts', methods=['GET'])
@login_required
def get_accounts():
"""获取当前用户的所有账号"""
user_id = current_user.id
# 检查是否需要强制刷新(容器重启后内存数据丢失)
refresh = request.args.get('refresh', 'false').lower() == 'true'
# 如果user_accounts中没有数据或者请求刷新则从数据库加载
if user_id not in user_accounts or len(user_accounts.get(user_id, {})) == 0 or refresh:
logger.debug(f"[API] 用户 {user_id} 请求账号列表从数据库加载refresh={refresh}")
load_user_accounts(user_id)
accounts = user_accounts.get(user_id, {})
logger.debug(f"[API] 返回用户 {user_id}{len(accounts)} 个账号")
return jsonify([acc.to_dict() for acc in accounts.values()])
@app.route('/api/accounts', methods=['POST'])
@login_required
def add_account():
"""添加账号"""
user_id = current_user.id
# 账号数量限制检查VIP不限制普通用户最多3个
current_count = len(database.get_user_accounts(user_id))
is_vip = database.is_user_vip(user_id)
if not is_vip and current_count >= 3:
return jsonify({"error": "普通用户最多添加3个账号升级VIP可无限添加"}), 403
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '').strip()
remark = data.get("remark", "").strip()[:200] # 限制200字符
if not username or not password:
return jsonify({"error": "用户名和密码不能为空"}), 400
# 检查当前用户是否已存在该账号
if user_id in user_accounts:
for acc in user_accounts[user_id].values():
if acc.username == username:
return jsonify({"error": f"账号 '{username}' 已存在"}), 400
# 生成账号ID
import uuid
account_id = str(uuid.uuid4())[:8]
# 设置remember默认值为True
remember = data.get('remember', True)
# 保存到数据库
database.create_account(user_id, account_id, username, password, remember, remark)
# 加载到内存
account = Account(account_id, user_id, username, password, remember, remark)
if user_id not in user_accounts:
user_accounts[user_id] = {}
user_accounts[user_id][account_id] = account
log_to_client(f"添加账号: {username}", user_id)
return jsonify(account.to_dict())
@app.route('/api/accounts/<account_id>', methods=['PUT'])
@login_required
def update_account(account_id):
"""更新账号信息(密码等)"""
user_id = current_user.id
# 验证账号所有权
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return jsonify({"error": "账号不存在"}), 404
account = user_accounts[user_id][account_id]
# 如果账号正在运行,不允许修改
if account.is_running:
return jsonify({"error": "账号正在运行中,请先停止"}), 400
data = request.json
new_password = data.get('password', '').strip()
new_remember = data.get('remember', account.remember)
if not new_password:
return jsonify({"error": "密码不能为空"}), 400
# 更新数据库
with db_pool.get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE accounts
SET password = ?, remember = ?
WHERE id = ?
''', (new_password, new_remember, account_id))
conn.commit()
# 重置账号登录状态密码修改后恢复active状态
database.reset_account_login_status(account_id)
logger.info(f"[账号更新] 用户 {user_id} 修改了账号 {account.username} 的密码,已重置登录状态")
# 更新内存中的账号信息
account.password = new_password
account.remember = new_remember
log_to_client(f"账号 {account.username} 信息已更新,登录状态已重置", user_id)
return jsonify({"message": "账号更新成功", "account": account.to_dict()})
@app.route('/api/accounts/<account_id>', methods=['DELETE'])
@login_required
def delete_account(account_id):
"""删除账号"""
user_id = current_user.id
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return jsonify({"error": "账号不存在"}), 404
account = user_accounts[user_id][account_id]
# 停止正在运行的任务
if account.is_running:
account.should_stop = True
if account.automation:
account.automation.close()
username = account.username
# 从数据库删除
database.delete_account(account_id)
# 从内存删除
del user_accounts[user_id][account_id]
log_to_client(f"删除账号: {username}", user_id)
return jsonify({"success": True})
@app.route('/api/accounts/<account_id>/remark', methods=['PUT'])
@login_required
def update_remark(account_id):
"""更新备注"""
user_id = current_user.id
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return jsonify({"error": "账号不存在"}), 404
data = request.json
remark = data.get('remark', '').strip()[:200]
# 更新数据库
database.update_account_remark(account_id, remark)
# 更新内存
user_accounts[user_id][account_id].remark = remark
log_to_client(f"更新备注: {user_accounts[user_id][account_id].username} -> {remark}", user_id)
return jsonify({"success": True})
@app.route('/api/accounts/<account_id>/start', methods=['POST'])
@login_required
def start_account(account_id):
"""启动账号任务"""
user_id = current_user.id
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return jsonify({"error": "账号不存在"}), 404
account = user_accounts[user_id][account_id]
if account.is_running:
return jsonify({"error": "任务已在运行中"}), 400
data = request.json
browse_type = data.get('browse_type', '应读')
enable_screenshot = data.get('enable_screenshot', True) # 默认启用截图
# 确保浏览器管理器已初始化
if not init_browser_manager():
return jsonify({"error": "浏览器初始化失败"}), 500
# 启动任务线程
account.is_running = True
account.should_stop = False
account.status = "运行中"
thread = threading.Thread(
target=run_task,
args=(user_id, account_id, browse_type, enable_screenshot, 'manual'),
daemon=True
)
thread.start()
active_tasks[account_id] = thread
log_to_client(f"启动任务: {account.username} - {browse_type}", user_id)
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return jsonify({"success": True})
@app.route('/api/accounts/<account_id>/stop', methods=['POST'])
@login_required
def stop_account(account_id):
"""停止账号任务"""
user_id = current_user.id
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return jsonify({"error": "账号不存在"}), 404
account = user_accounts[user_id][account_id]
if not account.is_running:
return jsonify({"error": "任务未在运行"}), 400
account.should_stop = True
account.status = "正在停止"
log_to_client(f"停止任务: {account.username}", user_id)
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return jsonify({"success": True})
def get_user_semaphore(user_id):
"""获取或创建用户的信号量"""
if user_id not in user_semaphores:
user_semaphores[user_id] = threading.Semaphore(max_concurrent_per_account)
return user_semaphores[user_id]
def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="manual"):
"""运行自动化任务"""
print(f"[DEBUG run_task] account={account_id}, enable_screenshot={enable_screenshot} (类型:{type(enable_screenshot).__name__}), source={source}")
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return
account = user_accounts[user_id][account_id]
# 导入time模块
import time as time_module
# 注意:不在此处记录开始时间,因为要排除排队等待时间
# 两级并发控制:用户级 + 全局级VIP优先
user_sem = get_user_semaphore(user_id)
# 检查是否VIP用户
is_vip_user = database.is_user_vip(user_id)
vip_label = " [VIP优先]" if is_vip_user else ""
# 获取用户级信号量(同一用户的账号排队)
log_to_client(f"等待资源分配...{vip_label}", user_id, account_id)
account.status = "排队中" + (" (VIP)" if is_vip_user else "")
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
# 记录任务状态为排队中
import time as time_mod
task_status[account_id] = {
"user_id": user_id,
"username": account.username,
"status": "排队中",
"detail_status": "等待资源" + vip_label,
"browse_type": browse_type,
"start_time": time_mod.time(),
"source": source,
"progress": {"items": 0, "attachments": 0},
"is_vip": is_vip_user
}
# 加入优先级队列
with task_queue_lock:
if is_vip_user:
vip_task_queue.append(account_id)
else:
normal_task_queue.append(account_id)
# VIP优先排队机制
acquired = False
while not acquired:
with task_queue_lock:
# VIP用户直接尝试获取; 普通用户需等VIP队列为空
can_try = is_vip_user or len(vip_task_queue) == 0
if can_try and user_sem.acquire(blocking=False):
acquired = True
with task_queue_lock:
if account_id in vip_task_queue:
vip_task_queue.remove(account_id)
if account_id in normal_task_queue:
normal_task_queue.remove(account_id)
break
# 检查是否被停止
if account.should_stop:
with task_queue_lock:
if account_id in vip_task_queue:
vip_task_queue.remove(account_id)
if account_id in normal_task_queue:
normal_task_queue.remove(account_id)
log_to_client(f"任务已取消", user_id, account_id)
account.status = "已停止"
account.is_running = False
if account_id in task_status:
del task_status[account_id]
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return
time_module.sleep(0.3)
try:
# 如果在排队期间被停止,直接返回
if account.should_stop:
log_to_client(f"任务已取消", user_id, account_id)
account.status = "已停止"
account.is_running = False
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return
# 获取全局信号量(防止所有用户同时运行导致资源耗尽)
global_semaphore.acquire()
try:
# 再次检查是否被停止
if account.should_stop:
log_to_client(f"任务已取消", user_id, account_id)
account.status = "已停止"
account.is_running = False
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return
# ====== 创建任务断点 ======
task_id = checkpoint_mgr.create_checkpoint(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type
)
logger.info(f"[断点] 任务 {task_id} 已创建")
# ====== 在此处记录任务真正的开始时间(排除排队等待时间) ======
task_start_time = time_module.time()
account.status = "运行中"
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
account.last_browse_type = browse_type
# 更新任务状态为运行中
if account_id in task_status:
task_status[account_id]["status"] = "运行中"
task_status[account_id]["detail_status"] = "初始化"
task_status[account_id]["start_time"] = task_start_time
# 重试机制最多尝试3次超时则换IP重试
max_attempts = 3
last_error = None
for attempt in range(1, max_attempts + 1):
try:
if attempt > 1:
log_to_client(f"🔄 第 {attempt} 次尝试(共{max_attempts}次)...", user_id, account_id)
# 检查是否需要使用代理
proxy_config = None
config = database.get_system_config()
if config.get('proxy_enabled') == 1:
proxy_api_url = config.get('proxy_api_url', '').strip()
if proxy_api_url:
log_to_client(f"正在获取代理IP...", user_id, account_id)
proxy_server = get_proxy_from_api(proxy_api_url, max_retries=3)
if proxy_server:
proxy_config = {'server': proxy_server}
log_to_client(f"✓ 将使用代理: {proxy_server}", user_id, account_id)
account.proxy_config = proxy_config # 保存代理配置供截图使用
else:
log_to_client(f"✗ 代理获取失败,将不使用代理继续", user_id, account_id)
else:
log_to_client(f"⚠ 代理已启用但未配置API地址", user_id, account_id)
# 使用 API 方式浏览(不启动浏览器,节省内存)
checkpoint_mgr.update_stage(task_id, TaskStage.STARTING, progress_percent=10)
def custom_log(message: str):
log_to_client(message, user_id, account_id)
log_to_client(f"开始登录...", user_id, account_id)
if account_id in task_status:
task_status[account_id]["detail_status"] = "正在登录"
checkpoint_mgr.update_stage(task_id, TaskStage.LOGGING_IN, progress_percent=25)
# 使用 API 方式登录和浏览(不启动浏览器)
api_browser = APIBrowser(log_callback=custom_log, proxy_config=proxy_config)
if api_browser.login(account.username, account.password):
log_to_client(f"✓ 登录成功!", user_id, account_id)
# 登录成功,清除失败计数
# 保存cookies供截图使用
api_browser.save_cookies_for_playwright(account.username)
database.reset_account_login_status(account_id)
if account_id in task_status:
task_status[account_id]["detail_status"] = "正在浏览"
log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id)
def should_stop():
return account.should_stop
checkpoint_mgr.update_stage(task_id, TaskStage.BROWSING, progress_percent=50)
result = api_browser.browse_content(
browse_type=browse_type,
should_stop_callback=should_stop
)
# 转换结果类型以兼容后续代码
result = BrowseResult(
success=result.success,
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message=result.error_message
)
api_browser.close()
else:
# API 登录失败
error_message = "登录失败"
log_to_client(f"{error_message}", user_id, account_id)
# 增加失败计数(假设密码错误)
is_suspended = database.increment_account_login_fail(account_id, error_message)
if is_suspended:
log_to_client(f"⚠ 该账号连续3次密码错误已自动暂停", user_id, account_id)
log_to_client(f"请在前台修改密码后才能继续使用", user_id, account_id)
retry_action = checkpoint_mgr.record_error(task_id, error_message)
if retry_action == "paused":
logger.warning(f"[断点] 任务 {task_id} 已暂停(登录失败)")
account.status = "登录失败"
account.is_running = False
# 记录登录失败日志
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=0,
total_attachments=0,
error_message=error_message,
duration=int(time_module.time() - task_start_time),
source=source
)
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
api_browser.close()
return
account.total_items = result.total_items
account.total_attachments = result.total_attachments
if result.success:
log_to_client(f"浏览完成! 共 {result.total_items} 条内容,{result.total_attachments} 个附件", user_id, account_id)
if account_id in task_status:
task_status[account_id]["detail_status"] = "浏览完成"
task_status[account_id]["progress"] = {"items": result.total_items, "attachments": result.total_attachments}
account.status = "已完成"
checkpoint_mgr.update_stage(task_id, TaskStage.COMPLETING, progress_percent=95)
checkpoint_mgr.complete_task(task_id, success=True)
logger.info(f"[断点] 任务 {task_id} 已完成")
# 记录成功日志(如果不截图则在此记录,截图时在截图完成后记录)
if not enable_screenshot:
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='success',
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message='',
duration=int(time_module.time() - task_start_time),
source=source
)
# 成功则跳出重试循环
break
else:
# 浏览出错,检查是否是超时错误
error_msg = result.error_message
if 'Timeout' in error_msg or 'timeout' in error_msg:
last_error = error_msg
log_to_client(f"⚠ 检测到超时错误: {error_msg}", user_id, account_id)
# 关闭当前浏览器
if account.automation:
try:
account.automation.close()
log_to_client(f"已关闭超时的浏览器实例", user_id, account_id)
except Exception as e:
logger.debug(f"关闭超时浏览器实例失败: {e}")
account.automation = None
if attempt < max_attempts:
log_to_client(f"⚠ 代理可能速度过慢将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
time_module.sleep(2) # 等待2秒再重试
continue
else:
log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message=f"重试{max_attempts}次后仍失败: {error_msg}",
duration=int(time_module.time() - task_start_time)
)
break
else:
# 非超时错误,直接失败不重试
log_to_client(f"浏览出错: {error_msg}", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time),
source=source
)
break
except Exception as retry_error:
# 捕获重试过程中的异常
error_msg = str(retry_error)
last_error = error_msg
# 关闭可能存在的浏览器实例
if account.automation:
try:
account.automation.close()
except Exception as e:
logger.debug(f"关闭浏览器实例失败: {e}")
account.automation = None
if 'Timeout' in error_msg or 'timeout' in error_msg:
log_to_client(f"⚠ 执行超时: {error_msg}", user_id, account_id)
if attempt < max_attempts:
log_to_client(f"⚠ 将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
time_module.sleep(2)
continue
else:
log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=f"重试{max_attempts}次后仍失败: {error_msg}",
duration=int(time_module.time() - task_start_time),
source=source
)
break
else:
# 非超时异常,直接失败
log_to_client(f"任务执行异常: {error_msg}", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time),
source=source
)
break
except Exception as e:
error_msg = str(e)
log_to_client(f"任务执行出错: {error_msg}", user_id, account_id)
account.status = "出错"
# 记录异常失败日志
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time),
source=source
)
finally:
# 先关闭浏览器,再释放信号量(避免并发创建/关闭浏览器导致资源竞争)
account.is_running = False
# 如果状态不是已完成(需要截图),则重置为未开始
if account.status not in ["已完成"]:
account.status = "未开始"
if account.automation:
try:
account.automation.close()
# log_to_client(f"主任务浏览器已关闭", user_id, account_id) # 精简
except Exception as e:
log_to_client(f"关闭主任务浏览器时出错: {str(e)}", user_id, account_id)
finally:
account.automation = None
# 浏览器关闭后再释放全局信号量,确保新任务创建浏览器时旧浏览器已完全关闭
global_semaphore.release()
if account_id in active_tasks:
del active_tasks[account_id]
if account_id in task_status:
del task_status[account_id]
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
# 任务完成后自动截图(增加2秒延迟,确保资源完全释放)
# 根据enable_screenshot参数决定是否截图
if account.status == "已完成" and not account.should_stop:
if enable_screenshot:
log_to_client(f"等待2秒后开始截图...", user_id, account_id)
# 更新账号状态为等待截图
account.status = "等待截图"
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
# 重新添加截图状态
import time as time_mod
task_status[account_id] = {
"user_id": user_id,
"username": account.username,
"status": "排队中",
"detail_status": "等待截图资源",
"browse_type": browse_type,
"start_time": time_mod.time(),
"source": source,
"progress": {"items": result.total_items if result else 0, "attachments": result.total_attachments if result else 0}
}
time.sleep(2) # 延迟启动截图,确保主任务资源已完全释放
browse_result_dict = {'total_items': result.total_items, 'total_attachments': result.total_attachments}
threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id, browse_type, source, task_start_time, browse_result_dict), daemon=True).start()
else:
# 不截图时,重置状态为未开始
account.status = "未开始"
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
log_to_client(f"截图功能已禁用,跳过截图", user_id, account_id)
else:
# 任务非正常完成,重置状态为未开始
if account.status not in ["登录失败", "出错"]:
account.status = "未开始"
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
finally:
# 释放用户级信号量
user_sem.release()
def take_screenshot_for_account(user_id, account_id, browse_type="应读", source="manual", task_start_time=None, browse_result=None):
"""为账号任务完成后截图(使用工作线程池,真正的浏览器复用)"""
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return
account = user_accounts[user_id][account_id]
# 标记账号正在截图(防止重复提交截图任务)
account.is_running = True
def screenshot_task(browser_instance, user_id, account_id, account, browse_type, source, task_start_time, browse_result):
"""在worker线程中执行的截图任务"""
# ✅ 获得worker后立即更新状态为"截图中"
if user_id in user_accounts and account_id in user_accounts[user_id]:
acc = user_accounts[user_id][account_id]
acc.status = "截图中"
if account_id in task_status:
task_status[account_id]["status"] = "运行中"
task_status[account_id]["detail_status"] = "正在截图"
socketio.emit('account_update', acc.to_dict(), room=f'user_{user_id}')
max_retries = 3
for attempt in range(1, max_retries + 1):
automation = None
try:
# 更新状态
if account_id in task_status:
task_status[account_id]["detail_status"] = f"正在截图{f' (第{attempt}次)' if attempt > 1 else ''}"
if attempt > 1:
log_to_client(f"🔄 第 {attempt} 次截图尝试...", user_id, account_id)
log_to_client(f"使用Worker-{browser_instance['worker_id']}的浏览器(已使用{browser_instance['use_count']}次)", user_id, account_id)
# 使用worker的浏览器创建PlaywrightAutomation
proxy_config = account.proxy_config if hasattr(account, 'proxy_config') else None
automation = PlaywrightAutomation(browser_manager, account_id, proxy_config=proxy_config)
automation.playwright = browser_instance['playwright']
automation.browser = browser_instance['browser']
def custom_log(message: str):
log_to_client(message, user_id, account_id)
automation.log = custom_log
# 登录
log_to_client(f"登录中...", user_id, account_id)
login_result = automation.quick_login(account.username, account.password, account.remember)
if not login_result["success"]:
error_message = login_result.get("message", "截图登录失败")
log_to_client(f"截图登录失败: {error_message}", user_id, account_id)
if attempt < max_retries:
log_to_client(f"将重试...", user_id, account_id)
time.sleep(2)
continue
else:
log_to_client(f"❌ 截图失败: 登录失败", user_id, account_id)
return {'success': False, 'error': '登录失败'}
browse_type = account.last_browse_type
log_to_client(f"导航到 '{browse_type}' 页面...", user_id, account_id)
# 导航到指定页面
result = automation.browse_content(
navigate_only=True,
browse_type=browse_type,
auto_next_page=False,
auto_view_attachments=False,
interval=0,
should_stop_callback=None
)
if not result.success and result.error_message:
log_to_client(f"导航警告: {result.error_message}", user_id, account_id)
# 等待页面稳定
time.sleep(2)
# 生成截图文件名
beijing_tz = pytz.timezone('Asia/Shanghai')
now_beijing = datetime.now(beijing_tz)
timestamp = now_beijing.strftime('%Y%m%d_%H%M%S')
user_info = database.get_user_by_id(user_id)
username_prefix = user_info['username'] if user_info else f"user{user_id}"
login_account = account.remark if account.remark else account.username
screenshot_filename = f"{username_prefix}_{login_account}_{browse_type}_{timestamp}.jpg"
screenshot_path = os.path.join(SCREENSHOTS_DIR, screenshot_filename)
# 尝试截图
if automation.take_screenshot(screenshot_path):
# 验证截图文件
if os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000:
log_to_client(f"✓ 截图成功: {screenshot_filename}", user_id, account_id)
return {'success': True, 'filename': screenshot_filename}
else:
log_to_client(f"截图文件异常,将重试", user_id, account_id)
if os.path.exists(screenshot_path):
os.remove(screenshot_path)
else:
log_to_client(f"截图保存失败", user_id, account_id)
if attempt < max_retries:
log_to_client(f"将重试...", user_id, account_id)
time.sleep(2)
except Exception as e:
log_to_client(f"截图出错: {str(e)}", user_id, account_id)
if attempt < max_retries:
log_to_client(f"将重试...", user_id, account_id)
time.sleep(2)
finally:
# 只关闭context不关闭浏览器由worker管理
if automation:
try:
if automation.context:
automation.context.close()
automation.context = None
automation.page = None
except Exception as e:
logger.debug(f"关闭context时出错: {e}")
return {'success': False, 'error': '截图失败已重试3次'}
def screenshot_callback(result, error):
"""截图完成回调"""
try:
# 重置账号状态
account.is_running = False
account.status = "未开始"
# 先清除任务状态这样to_dict()不会包含detail_status
if account_id in task_status:
del task_status[account_id]
# 然后发送更新
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
if error:
log_to_client(f"❌ 截图失败: {error}", user_id, account_id)
elif not result or not result.get('success'):
error_msg = result.get('error', '未知错误') if result else '未知错误'
log_to_client(f"❌ 截图失败: {error_msg}", user_id, account_id)
# 记录任务日志
if task_start_time and browse_result:
import time as time_module
total_elapsed = int(time_module.time() - task_start_time)
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='success',
total_items=browse_result.get('total_items', 0),
total_attachments=browse_result.get('total_attachments', 0),
duration=total_elapsed,
source=source
)
except Exception as e:
logger.error(f"截图回调出错: {e}")
# 提交任务到工作线程池
pool = get_browser_worker_pool()
pool.submit_task(
screenshot_task,
screenshot_callback,
user_id, account_id, account, browse_type, source, task_start_time, browse_result
)
def manual_screenshot(account_id):
"""手动为指定账号截图"""
user_id = current_user.id
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return jsonify({"error": "账号不存在"}), 404
account = user_accounts[user_id][account_id]
if account.is_running:
return jsonify({"error": "任务运行中,无法截图"}), 400
data = request.json or {}
browse_type = data.get('browse_type', account.last_browse_type)
account.last_browse_type = browse_type
threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id), daemon=True).start()
log_to_client(f"手动截图: {account.username} - {browse_type}", user_id)
return jsonify({"success": True})
# ==================== 截图管理API ====================
@app.route('/api/screenshots', methods=['GET'])
@login_required
def get_screenshots():
"""获取当前用户的截图列表"""
user_id = current_user.id
user_info = database.get_user_by_id(user_id)
username_prefix = user_info['username'] if user_info else f"user{user_id}"
try:
screenshots = []
if os.path.exists(SCREENSHOTS_DIR):
for filename in os.listdir(SCREENSHOTS_DIR):
# 只显示属于当前用户的截图支持png和jpg格式
if (filename.lower().endswith(('.png', '.jpg', '.jpeg'))) and filename.startswith(username_prefix + '_'):
filepath = os.path.join(SCREENSHOTS_DIR, filename)
stat = os.stat(filepath)
# 转换为北京时间
beijing_tz = pytz.timezone('Asia/Shanghai')
created_time = datetime.fromtimestamp(stat.st_mtime, tz=beijing_tz)
# 解析文件名获取显示名称
# 文件名格式用户名_登录账号_浏览类型_时间.jpg
parts = filename.rsplit('.', 1)[0].split('_', 1) # 移除扩展名并分割
if len(parts) > 1:
# 显示名称登录账号_浏览类型_时间.jpg
display_name = parts[1] + '.' + filename.rsplit('.', 1)[1]
else:
display_name = filename
screenshots.append({
'filename': filename,
'display_name': display_name,
'size': stat.st_size,
'created': created_time.strftime('%Y-%m-%d %H:%M:%S')
})
screenshots.sort(key=lambda x: x['created'], reverse=True)
return jsonify(screenshots)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/screenshots/<filename>')
@login_required
def serve_screenshot(filename):
"""提供截图文件访问"""
user_id = current_user.id
user_info = database.get_user_by_id(user_id)
username_prefix = user_info['username'] if user_info else f"user{user_id}"
# 验证文件属于当前用户
if not filename.startswith(username_prefix + '_'):
return jsonify({"error": "无权访问"}), 403
return send_from_directory(SCREENSHOTS_DIR, filename)
@app.route('/api/screenshots/<filename>', methods=['DELETE'])
@login_required
def delete_screenshot(filename):
"""删除指定截图"""
user_id = current_user.id
user_info = database.get_user_by_id(user_id)
username_prefix = user_info['username'] if user_info else f"user{user_id}"
# 验证文件属于当前用户
if not filename.startswith(username_prefix + '_'):
return jsonify({"error": "无权删除"}), 403
try:
filepath = os.path.join(SCREENSHOTS_DIR, filename)
if os.path.exists(filepath):
os.remove(filepath)
log_to_client(f"删除截图: {filename}", user_id)
return jsonify({"success": True})
else:
return jsonify({"error": "文件不存在"}), 404
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/screenshots/clear', methods=['POST'])
@login_required
def clear_all_screenshots():
"""清空当前用户的所有截图"""
user_id = current_user.id
user_info = database.get_user_by_id(user_id)
username_prefix = user_info['username'] if user_info else f"user{user_id}"
try:
deleted_count = 0
if os.path.exists(SCREENSHOTS_DIR):
for filename in os.listdir(SCREENSHOTS_DIR):
if (filename.lower().endswith(('.png', '.jpg', '.jpeg'))) and filename.startswith(username_prefix + '_'):
filepath = os.path.join(SCREENSHOTS_DIR, filename)
os.remove(filepath)
deleted_count += 1
log_to_client(f"清理了 {deleted_count} 个截图文件", user_id)
return jsonify({"success": True, "deleted": deleted_count})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ==================== WebSocket事件 ====================
@socketio.on('connect')
def handle_connect():
"""客户端连接"""
if current_user.is_authenticated:
user_id = current_user.id
join_room(f'user_{user_id}')
log_to_client("客户端已连接", user_id)
# 如果user_accounts中没有该用户的账号从数据库加载
if user_id not in user_accounts or len(user_accounts[user_id]) == 0:
db_accounts = database.get_user_accounts(user_id)
if db_accounts:
user_accounts[user_id] = {}
for acc_data in db_accounts:
account = Account(
account_id=acc_data['id'],
user_id=acc_data['user_id'],
username=acc_data['username'],
password=acc_data['password'],
remember=bool(acc_data.get('remember', 1)),
remark=acc_data.get('remark', '')
)
user_accounts[user_id][acc_data['id']] = account
log_to_client(f"已从数据库恢复 {len(db_accounts)} 个账号", user_id)
# 发送账号列表
accounts = user_accounts.get(user_id, {})
emit('accounts_list', [acc.to_dict() for acc in accounts.values()])
# 发送历史日志
if user_id in log_cache:
for log_entry in log_cache[user_id]:
emit('log', log_entry)
@socketio.on('disconnect')
def handle_disconnect():
"""客户端断开"""
if current_user.is_authenticated:
user_id = current_user.id
leave_room(f'user_{user_id}')
# ==================== 静态文件 ====================
@app.route('/static/<path:filename>')
def serve_static(filename):
"""提供静态文件访问"""
response = send_from_directory('static', filename)
# 禁用缓存,强制浏览器每次都重新加载
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
# ==================== 启动 ====================
# ==================== 管理员VIP管理API ====================
@app.route('/yuyx/api/vip/config', methods=['GET'])
def get_vip_config_api():
"""获取VIP配置"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
config = database.get_vip_config()
return jsonify(config)
@app.route('/yuyx/api/vip/config', methods=['POST'])
def set_vip_config_api():
"""设置默认VIP天数"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
data = request.json
days = data.get('default_vip_days', 0)
if not isinstance(days, int) or days < 0:
return jsonify({"error": "VIP天数必须是非负整数"}), 400
database.set_default_vip_days(days)
return jsonify({"message": "VIP配置已更新", "default_vip_days": days})
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['POST'])
def set_user_vip_api(user_id):
"""设置用户VIP"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
data = request.json
days = data.get('days', 30)
# 验证days参数
valid_days = [7, 30, 365, 999999]
if days not in valid_days:
return jsonify({"error": "VIP天数必须是 7/30/365/999999 之一"}), 400
if database.set_user_vip(user_id, days):
vip_type = {7: "一周", 30: "一个月", 365: "一年", 999999: "永久"}[days]
return jsonify({"message": f"VIP设置成功: {vip_type}"})
return jsonify({"error": "设置失败,用户不存在"}), 400
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['DELETE'])
def remove_user_vip_api(user_id):
"""移除用户VIP"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
if database.remove_user_vip(user_id):
return jsonify({"message": "VIP已移除"})
return jsonify({"error": "移除失败"}), 400
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['GET'])
def get_user_vip_info_api(user_id):
"""获取用户VIP信息(管理员)"""
if 'admin_id' not in session:
return jsonify({"error": "需要管理员权限"}), 403
vip_info = database.get_user_vip_info(user_id)
return jsonify(vip_info)
# ==================== 用户端VIP查询API ====================
@app.route('/api/user/vip', methods=['GET'])
@login_required
def get_current_user_vip():
"""获取当前用户VIP信息"""
vip_info = database.get_user_vip_info(current_user.id)
# 添加用户名
user_info = database.get_user_by_id(current_user.id)
vip_info['username'] = user_info['username'] if user_info else 'Unknown'
return jsonify(vip_info)
@app.route('/api/run_stats', methods=['GET'])
@login_required
def get_run_stats():
"""获取当前用户的运行统计"""
user_id = current_user.id
# 获取今日任务统计
stats = database.get_user_run_stats(user_id)
# 计算当前正在运行的账号数
current_running = 0
if user_id in user_accounts:
current_running = sum(1 for acc in user_accounts[user_id].values() if acc.is_running)
return jsonify({
'today_completed': stats.get('completed', 0),
'current_running': current_running,
'today_failed': stats.get('failed', 0),
'today_items': stats.get('total_items', 0),
'today_attachments': stats.get('total_attachments', 0)
})
# ==================== 系统配置API ====================
@app.route('/yuyx/api/system/config', methods=['GET'])
@admin_required
def get_system_config_api():
"""获取系统配置"""
config = database.get_system_config()
return jsonify(config)
@app.route('/yuyx/api/system/config', methods=['POST'])
@admin_required
def update_system_config_api():
"""更新系统配置"""
global max_concurrent_global, global_semaphore, max_concurrent_per_account
data = request.json
max_concurrent = data.get('max_concurrent_global')
schedule_enabled = data.get('schedule_enabled')
schedule_time = data.get('schedule_time')
schedule_browse_type = data.get('schedule_browse_type')
schedule_weekdays = data.get('schedule_weekdays')
new_max_concurrent_per_account = data.get('max_concurrent_per_account')
new_max_screenshot_concurrent = data.get('max_screenshot_concurrent')
# 验证参数
if max_concurrent is not None:
if not isinstance(max_concurrent, int) or max_concurrent < 1:
return jsonify({"error": "全局并发数必须大于0建议小型服务器2-5中型5-10大型10-20"}), 400
if new_max_concurrent_per_account is not None:
if not isinstance(new_max_concurrent_per_account, int) or new_max_concurrent_per_account < 1:
return jsonify({"error": "单账号并发数必须大于0建议设为1避免同一用户任务相互影响"}), 400
if new_max_screenshot_concurrent is not None:
if not isinstance(new_max_screenshot_concurrent, int) or new_max_screenshot_concurrent < 1:
return jsonify({"error": "截图并发数必须大于0建议根据服务器配置设置每个浏览器约占用200MB内存"}), 400
if schedule_time is not None:
# 验证时间格式 HH:MM
import re
if not re.match(r'^([01]\d|2[0-3]):([0-5]\d)$', schedule_time):
return jsonify({"error": "时间格式错误,应为 HH:MM"}), 400
if schedule_browse_type is not None:
if schedule_browse_type not in ['注册前未读', '应读', '未读']:
return jsonify({"error": "浏览类型无效"}), 400
if schedule_weekdays is not None:
# 验证星期格式,应该是逗号分隔的数字字符串 "1,2,3,4,5,6,7"
try:
days = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()]
if not all(1 <= d <= 7 for d in days):
return jsonify({"error": "星期数字必须在1-7之间"}), 400
except (ValueError, AttributeError):
return jsonify({"error": "星期格式错误"}), 400
# 更新数据库
if database.update_system_config(
max_concurrent=max_concurrent,
schedule_enabled=schedule_enabled,
schedule_time=schedule_time,
schedule_browse_type=schedule_browse_type,
schedule_weekdays=schedule_weekdays,
max_concurrent_per_account=new_max_concurrent_per_account,
max_screenshot_concurrent=new_max_screenshot_concurrent
):
# 如果修改了并发数,更新全局变量和信号量
if max_concurrent is not None and max_concurrent != max_concurrent_global:
max_concurrent_global = max_concurrent
global_semaphore = threading.Semaphore(max_concurrent)
print(f"全局并发数已更新为: {max_concurrent}")
# 如果修改了单用户并发数,更新全局变量(已有的信号量会在下次创建时使用新值)
if new_max_concurrent_per_account is not None and new_max_concurrent_per_account != max_concurrent_per_account:
max_concurrent_per_account = new_max_concurrent_per_account
print(f"单用户并发数已更新为: {max_concurrent_per_account}")
# 如果修改了截图并发数,更新信号量
if new_max_screenshot_concurrent is not None:
global screenshot_semaphore
screenshot_semaphore = threading.Semaphore(new_max_screenshot_concurrent)
print(f"截图并发数已更新为: {new_max_screenshot_concurrent}")
return jsonify({"message": "系统配置已更新"})
return jsonify({"error": "更新失败"}), 400
@app.route('/yuyx/api/schedule/execute', methods=['POST'])
@admin_required
def execute_schedule_now():
"""立即执行定时任务(无视定时时间和星期限制)"""
try:
# 在新线程中执行任务,避免阻塞请求
# 传入 skip_weekday_check=True 跳过星期检查
thread = threading.Thread(target=run_scheduled_task, args=(True,), daemon=True)
thread.start()
logger.info("[立即执行定时任务] 管理员手动触发定时任务执行(跳过星期检查)")
return jsonify({"message": "定时任务已开始执行,请查看任务列表获取进度"})
except Exception as e:
logger.error(f"[立即执行定时任务] 启动失败: {str(e)}")
return jsonify({"error": f"启动失败: {str(e)}"}), 500
# ==================== 代理配置API ====================
@app.route('/yuyx/api/proxy/config', methods=['GET'])
@admin_required
def get_proxy_config_api():
"""获取代理配置"""
config = database.get_system_config()
return jsonify({
'proxy_enabled': config.get('proxy_enabled', 0),
'proxy_api_url': config.get('proxy_api_url', ''),
'proxy_expire_minutes': config.get('proxy_expire_minutes', 3)
})
@app.route('/yuyx/api/proxy/config', methods=['POST'])
@admin_required
def update_proxy_config_api():
"""更新代理配置"""
data = request.json
proxy_enabled = data.get('proxy_enabled')
proxy_api_url = data.get('proxy_api_url', '').strip()
proxy_expire_minutes = data.get('proxy_expire_minutes')
if proxy_enabled is not None and proxy_enabled not in [0, 1]:
return jsonify({"error": "proxy_enabled必须是0或1"}), 400
if proxy_expire_minutes is not None:
if not isinstance(proxy_expire_minutes, int) or proxy_expire_minutes < 1:
return jsonify({"error": "代理有效期必须是大于0的整数"}), 400
if database.update_system_config(
proxy_enabled=proxy_enabled,
proxy_api_url=proxy_api_url,
proxy_expire_minutes=proxy_expire_minutes
):
return jsonify({"message": "代理配置已更新"})
return jsonify({"error": "更新失败"}), 400
@app.route('/yuyx/api/proxy/test', methods=['POST'])
@admin_required
def test_proxy_api():
"""测试代理连接"""
data = request.json
api_url = data.get('api_url', '').strip()
if not api_url:
return jsonify({"error": "请提供API地址"}), 400
try:
response = requests.get(api_url, timeout=10)
if response.status_code == 200:
ip_port = response.text.strip()
if ip_port and ':' in ip_port:
return jsonify({
"success": True,
"proxy": ip_port,
"message": f"代理获取成功: {ip_port}"
})
else:
return jsonify({
"success": False,
"message": f"代理格式错误: {ip_port}"
}), 400
else:
return jsonify({
"success": False,
"message": f"HTTP错误: {response.status_code}"
}), 400
except Exception as e:
return jsonify({
"success": False,
"message": f"连接失败: {str(e)}"
}), 500
# ==================== 服务器信息API ====================
@app.route('/yuyx/api/server/info', methods=['GET'])
@admin_required
def get_server_info_api():
"""获取服务器信息"""
import psutil
import datetime
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存信息
memory = psutil.virtual_memory()
memory_total = f"{memory.total / (1024**3):.1f}GB"
memory_used = f"{memory.used / (1024**3):.1f}GB"
memory_percent = memory.percent
# 磁盘信息
disk = psutil.disk_usage('/')
disk_total = f"{disk.total / (1024**3):.1f}GB"
disk_used = f"{disk.used / (1024**3):.1f}GB"
disk_percent = disk.percent
# 运行时长
boot_time = datetime.datetime.fromtimestamp(psutil.boot_time())
uptime_delta = datetime.datetime.now() - boot_time
days = uptime_delta.days
hours = uptime_delta.seconds // 3600
uptime = f"{days}{hours}小时"
return jsonify({
'cpu_percent': cpu_percent,
'memory_total': memory_total,
'memory_used': memory_used,
'memory_percent': memory_percent,
'disk_total': disk_total,
'disk_used': disk_used,
'disk_percent': disk_percent,
'uptime': uptime
})
# ==================== 任务统计和日志API ====================
@app.route('/yuyx/api/task/stats', methods=['GET'])
@admin_required
def get_task_stats_api():
"""获取任务统计数据"""
date_filter = request.args.get('date') # YYYY-MM-DD格式
stats = database.get_task_stats(date_filter)
return jsonify(stats)
@app.route('/yuyx/api/task/running', methods=['GET'])
@admin_required
def get_running_tasks_api():
"""获取当前运行中和排队中的任务"""
import time as time_mod
current_time = time_mod.time()
running = []
queuing = []
for account_id, info in task_status.items():
elapsed = int(current_time - info.get("start_time", current_time))
# 获取用户名
user = database.get_user_by_id(info.get("user_id"))
user_username = user['username'] if user else 'N/A'
# 获取进度信息
progress = info.get("progress", {"items": 0, "attachments": 0})
task_info = {
"account_id": account_id,
"user_id": info.get("user_id"),
"user_username": user_username,
"username": info.get("username"),
"browse_type": info.get("browse_type"),
"source": info.get("source", "manual"),
"detail_status": info.get("detail_status", "未知"),
"progress_items": progress.get("items", 0),
"progress_attachments": progress.get("attachments", 0),
"elapsed_seconds": elapsed,
"elapsed_display": f"{elapsed // 60}{elapsed % 60}" if elapsed >= 60 else f"{elapsed}"
}
if info.get("status") == "运行中":
running.append(task_info)
else:
queuing.append(task_info)
# 按开始时间排序
running.sort(key=lambda x: x["elapsed_seconds"], reverse=True)
queuing.sort(key=lambda x: x["elapsed_seconds"], reverse=True)
return jsonify({
"running": running,
"queuing": queuing,
"running_count": len(running),
"queuing_count": len(queuing),
"max_concurrent": max_concurrent_global
})
@app.route('/yuyx/api/task/logs', methods=['GET'])
@admin_required
def get_task_logs_api():
"""获取任务日志列表(支持分页和多种筛选)"""
limit = int(request.args.get('limit', 20))
offset = int(request.args.get('offset', 0))
date_filter = request.args.get('date') # YYYY-MM-DD格式
status_filter = request.args.get('status') # success/failed
source_filter = request.args.get('source') # manual/scheduled/immediate/resumed
user_id_filter = request.args.get('user_id') # 用户ID
account_filter = request.args.get('account') # 账号关键字
# 转换user_id为整数
if user_id_filter:
try:
user_id_filter = int(user_id_filter)
except ValueError:
user_id_filter = None
result = database.get_task_logs(
limit=limit,
offset=offset,
date_filter=date_filter,
status_filter=status_filter,
source_filter=source_filter,
user_id_filter=user_id_filter,
account_filter=account_filter
)
return jsonify(result)
@app.route('/yuyx/api/task/logs/clear', methods=['POST'])
@admin_required
def clear_old_task_logs_api():
"""清理旧的任务日志"""
data = request.json or {}
days = data.get('days', 30)
if not isinstance(days, int) or days < 1:
return jsonify({"error": "天数必须是大于0的整数"}), 400
deleted_count = database.delete_old_task_logs(days)
return jsonify({"message": f"已删除{days}天前的{deleted_count}条日志"})
@app.route('/yuyx/api/docker/restart', methods=['POST'])
@admin_required
def restart_docker_container():
"""重启Docker容器"""
import subprocess
import os
try:
# 检查是否在Docker容器中运行
if not os.path.exists('/.dockerenv'):
return jsonify({"error": "当前不在Docker容器中运行"}), 400
# 记录日志
app_logger.info("[系统] 管理员触发Docker容器重启")
# 使用nohup在后台执行重启命令避免阻塞
# 容器重启会导致当前进程终止,所以需要延迟执行
restart_script = """
import os
import time
# 延迟3秒让响应返回给客户端
time.sleep(3)
# 退出Python进程让Docker自动重启容器restart: unless-stopped
os._exit(0)
"""
# 写入临时脚本
with open('/tmp/restart_container.py', 'w') as f:
f.write(restart_script)
# 在后台执行重启脚本
subprocess.Popen(['python', '/tmp/restart_container.py'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True)
return jsonify({
"success": True,
"message": "容器将在3秒后重启请稍后刷新页面"
})
except Exception as e:
app_logger.error(f"[系统] Docker容器重启失败: {str(e)}")
return jsonify({"error": f"重启失败: {str(e)}"}), 500
# ==================== 定时任务调度器 ====================
def run_scheduled_task(skip_weekday_check=False):
"""执行所有账号的浏览任务(可被手动调用,过滤重复账号)
Args:
skip_weekday_check: 是否跳过星期检查立即执行时为True
"""
try:
from datetime import datetime
import pytz
config = database.get_system_config()
browse_type = config.get('schedule_browse_type', '应读')
# 检查今天是否在允许执行的星期列表中(立即执行时跳过此检查)
if not skip_weekday_check:
# 获取北京时间的星期几 (1=周一, 7=周日)
beijing_tz = pytz.timezone('Asia/Shanghai')
now_beijing = datetime.now(beijing_tz)
current_weekday = now_beijing.isoweekday() # 1-7
# 获取配置的星期列表
schedule_weekdays = config.get('schedule_weekdays', '1,2,3,4,5,6,7')
allowed_weekdays = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()]
if current_weekday not in allowed_weekdays:
weekday_names = ['', '周一', '周二', '周三', '周四', '周五', '周六', '周日']
print(f"[定时任务] 今天是{weekday_names[current_weekday]},不在执行日期内,跳过执行")
return
else:
print(f"[立即执行] 跳过星期检查,强制执行任务")
print(f"[定时任务] 开始执行 - 浏览类型: {browse_type}")
# 获取所有已审核用户的所有账号
all_users = database.get_all_users()
approved_users = [u for u in all_users if u['status'] == 'approved']
# 用于记录已执行的账号用户名,避免重复
executed_usernames = set()
total_accounts = 0
skipped_duplicates = 0
executed_accounts = 0
for user in approved_users:
user_id = user['id']
if user_id not in user_accounts:
load_user_accounts(user_id)
accounts = user_accounts.get(user_id, {})
for account_id, account in accounts.items():
total_accounts += 1
# 跳过正在运行的账号
if account.is_running:
continue
# 检查账号状态,跳过已暂停的账号
account_status_info = database.get_account_status(account_id)
if account_status_info:
status = account_status_info['status'] if 'status' in account_status_info.keys() else 'active'
if status == 'suspended':
fail_count = account_status_info['login_fail_count'] if 'login_fail_count' in account_status_info.keys() else 0
print(f"[定时任务] 跳过暂停账号: {account.username} (用户:{user['username']}) - 连续{fail_count}次密码错误,需修改密码")
continue
# 检查账号用户名是否已经执行过(重复账号过滤)
if account.username in executed_usernames:
skipped_duplicates += 1
print(f"[定时任务] 跳过重复账号: {account.username} (用户:{user['username']}) - 该账号已被其他用户执行")
continue
# 记录该账号用户名,避免后续重复执行
executed_usernames.add(account.username)
print(f"[定时任务] 启动账号: {account.username} (用户:{user['username']})")
# 启动任务
account.is_running = True
account.should_stop = False
account.status = "运行中"
# 获取系统配置的截图开关
cfg = database.get_system_config()
enable_screenshot_scheduled = cfg.get("enable_screenshot", 0) == 1
thread = threading.Thread(
target=run_task,
args=(user_id, account_id, browse_type, enable_screenshot_scheduled, 'scheduled'),
daemon=True
)
thread.start()
active_tasks[account_id] = thread
executed_accounts += 1
# 发送更新到用户
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
# 间隔启动,避免瞬间并发过高
time.sleep(2)
print(f"[定时任务] 执行完成 - 总账号数:{total_accounts}, 已执行:{executed_accounts}, 跳过重复:{skipped_duplicates}")
except Exception as e:
print(f"[定时任务] 执行出错: {str(e)}")
logger.error(f"[定时任务] 执行异常: {str(e)}")
def status_push_worker():
"""后台线程:每秒推送运行中任务的状态更新"""
while True:
try:
# 遍历所有运行中的任务状态
for account_id, status_info in list(task_status.items()):
user_id = status_info.get('user_id')
if user_id:
# 获取账号对象
if user_id in user_accounts and account_id in user_accounts[user_id]:
account = user_accounts[user_id][account_id]
account_data = account.to_dict()
# 推送账号状态更新
socketio.emit('account_update', account_data, room=f'user_{user_id}')
# 同时推送详细进度事件(方便前端分别处理)
progress = status_info.get('progress', {})
progress_data = {
'account_id': account_id,
'stage': status_info.get('detail_status', ''),
'total_items': account.total_items,
'browsed_items': progress.get('items', 0),
'total_attachments': account.total_attachments,
'viewed_attachments': progress.get('attachments', 0),
'start_time': status_info.get('start_time', 0),
'elapsed_seconds': account_data.get('elapsed_seconds', 0),
'elapsed_display': account_data.get('elapsed_display', '')
}
socketio.emit('task_progress', progress_data, room=f'user_{user_id}')
time.sleep(1) # 每秒推送一次
except Exception as e:
logger.debug(f"状态推送出错: {e}")
time.sleep(1)
def scheduled_task_worker():
"""定时任务工作线程"""
import schedule
def cleanup_expired_captcha():
"""清理过期验证码,防止内存泄漏"""
try:
current_time = time.time()
expired_keys = [k for k, v in captcha_storage.items()
if v["expire_time"] < current_time]
deleted_count = len(expired_keys)
for k in expired_keys:
del captcha_storage[k]
if deleted_count > 0:
print(f"[定时清理] 已清理 {deleted_count} 个过期验证码")
except Exception as e:
print(f"[定时清理] 清理验证码出错: {str(e)}")
def cleanup_old_data():
"""清理7天前的截图和日志"""
try:
print(f"[定时清理] 开始清理7天前的数据...")
# 清理7天前的任务日志
deleted_logs = database.delete_old_task_logs(7)
print(f"[定时清理] 已删除 {deleted_logs} 条任务日志")
# 清理30天前的操作日志
deleted_operation_logs = database.clean_old_operation_logs(30)
print(f"[定时清理] 已删除 {deleted_operation_logs} 条操作日志")
# 清理7天前的截图
deleted_screenshots = 0
if os.path.exists(SCREENSHOTS_DIR):
cutoff_time = time.time() - (7 * 24 * 60 * 60) # 7天前的时间戳
for filename in os.listdir(SCREENSHOTS_DIR):
if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
filepath = os.path.join(SCREENSHOTS_DIR, filename)
try:
# 检查文件修改时间
if os.path.getmtime(filepath) < cutoff_time:
os.remove(filepath)
deleted_screenshots += 1
except Exception as e:
print(f"[定时清理] 删除截图失败 {filename}: {str(e)}")
print(f"[定时清理] 已删除 {deleted_screenshots} 个截图文件")
print(f"[定时清理] 清理完成!")
except Exception as e:
print(f"[定时清理] 清理任务出错: {str(e)}")
def check_user_schedules():
"""检查并执行用户定时任务"""
import json
try:
from datetime import datetime
beijing_tz = pytz.timezone('Asia/Shanghai')
now = datetime.now(beijing_tz)
current_time = now.strftime('%H:%M')
current_weekday = now.isoweekday()
# 获取所有启用的用户定时任务
enabled_schedules = database.get_enabled_user_schedules()
for schedule_config in enabled_schedules:
# 检查时间是否匹配
if schedule_config['schedule_time'] != current_time:
continue
# 检查星期是否匹配
allowed_weekdays = [int(d) for d in schedule_config.get('weekdays', '1,2,3,4,5').split(',') if d.strip()]
if current_weekday not in allowed_weekdays:
continue
# 检查今天是否已经执行过
last_run = schedule_config.get('last_run_at')
if last_run:
try:
last_run_date = datetime.strptime(last_run, '%Y-%m-%d %H:%M:%S').date()
if last_run_date == now.date():
continue # 今天已执行过
except:
pass
# 执行用户定时任务
user_id = schedule_config['user_id']
schedule_id = schedule_config['id']
browse_type = schedule_config.get('browse_type', '应读')
enable_screenshot = schedule_config.get('enable_screenshot', 1)
# 调试日志
print(f"[DEBUG] 定时任务 {schedule_config.get('name')}: enable_screenshot={enable_screenshot} (类型:{type(enable_screenshot).__name__})")
try:
account_ids = json.loads(schedule_config.get('account_ids', '[]') or '[]')
except:
account_ids = []
if not account_ids:
continue
print(f"[用户定时任务] 用户 {schedule_config.get('user_username', user_id)} 的任务 '{schedule_config.get('name', '')}' 开始执行")
# 创建执行日志
import time as time_mod
execution_start_time = time_mod.time()
log_id = database.create_schedule_execution_log(
schedule_id=schedule_id,
user_id=user_id,
schedule_name=schedule_config.get('name', '未命名任务')
)
started_count = 0
for account_id in account_ids:
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
continue
account = user_accounts[user_id][account_id]
if account.is_running:
continue
account.is_running = True
account.should_stop = False
account.status = "排队中"
thread = threading.Thread(
target=run_task,
args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'),
daemon=True
)
thread.start()
active_tasks[account_id] = thread
started_count += 1
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
# 更新最后执行时间
database.update_schedule_last_run(schedule_id)
# 更新执行日志
execution_duration = int(time_mod.time() - execution_start_time)
database.update_schedule_execution_log(
log_id,
total_accounts=len(account_ids),
success_accounts=started_count,
failed_accounts=len(account_ids) - started_count,
duration_seconds=execution_duration,
status='completed'
)
print(f"[用户定时任务] 已启动 {started_count} 个账号")
except Exception as e:
print(f"[用户定时任务] 检查出错: {str(e)}")
import traceback
traceback.print_exc()
# 每分钟检查一次配置
def check_and_schedule():
config = database.get_system_config()
# 清除旧的任务
schedule.clear()
# 时区转换函数将CST时间转换为UTC时间容器使用UTC
def cst_to_utc_time(cst_time_str):
"""将CST时间字符串HH:MM转换为UTC时间字符串
Args:
cst_time_str: CST时间字符串格式为 HH:MM
Returns:
UTC时间字符串格式为 HH:MM
"""
from datetime import datetime, timedelta
# 解析CST时间
hour, minute = map(int, cst_time_str.split(':'))
# CST是UTC+8所以UTC时间 = CST时间 - 8小时
utc_hour = (hour - 8) % 24
return f"{utc_hour:02d}:{minute:02d}"
# 始终添加每天凌晨3点CST的数据清理任务
cleanup_utc_time = cst_to_utc_time("03:00")
schedule.every().day.at(cleanup_utc_time).do(cleanup_old_data)
print(f"[定时任务] 已设置数据清理任务: 每天 CST 03:00 (UTC {cleanup_utc_time})")
# 每小时清理过期验证码
schedule.every().hour.do(cleanup_expired_captcha)
print(f"[定时任务] 已设置验证码清理任务: 每小时执行一次")
# 如果启用了定时浏览任务,则添加
if config.get('schedule_enabled'):
schedule_time_cst = config.get('schedule_time', '02:00')
schedule_time_utc = cst_to_utc_time(schedule_time_cst)
schedule.every().day.at(schedule_time_utc).do(run_scheduled_task)
print(f"[定时任务] 已设置浏览任务: 每天 CST {schedule_time_cst} (UTC {schedule_time_utc})")
# 初始检查
check_and_schedule()
last_check = time.time()
while True:
try:
# 执行待执行的任务
schedule.run_pending()
# 每5秒重新检查一次配置提高检查频率以确保定时任务准时执行
if time.time() - last_check > 5:
check_and_schedule()
check_user_schedules() # 检查用户定时任务
last_check = time.time()
time.sleep(1)
except Exception as e:
print(f"[定时任务] 调度器出错: {str(e)}")
time.sleep(5)
# ========== 断点续传API ==========
@app.route('/yuyx/api/checkpoint/paused')
@admin_required
def checkpoint_get_paused():
try:
user_id = request.args.get('user_id', type=int)
tasks = checkpoint_mgr.get_paused_tasks(user_id=user_id)
return jsonify({'success': True, 'tasks': tasks})
except Exception as e:
logger.error(f"获取暂停任务失败: {e}")
return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/yuyx/api/checkpoint/<task_id>/resume', methods=['POST'])
@admin_required
def checkpoint_resume(task_id):
try:
checkpoint = checkpoint_mgr.get_checkpoint(task_id)
if not checkpoint:
return jsonify({'success': False, 'message': '任务不存在'}), 404
if checkpoint['status'] != 'paused':
return jsonify({'success': False, 'message': '任务未暂停'}), 400
if checkpoint_mgr.resume_task(task_id):
import threading
threading.Thread(
target=run_task,
args=(checkpoint['user_id'], checkpoint['account_id'], checkpoint['browse_type'], True, 'resumed'),
daemon=True
).start()
return jsonify({'success': True})
return jsonify({'success': False}), 500
except Exception as e:
logger.error(f"恢复任务失败: {e}")
return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/yuyx/api/checkpoint/<task_id>/abandon', methods=['POST'])
@admin_required
def checkpoint_abandon(task_id):
try:
if checkpoint_mgr.abandon_task(task_id):
return jsonify({'success': True})
return jsonify({'success': False}), 404
except Exception as e:
return jsonify({'success': False, 'message': str(e)}), 500
# 初始化浏览器池(在后台线程中预热,不阻塞启动)
# ==================== 用户定时任务API ====================
@app.route('/api/schedules', methods=['GET'])
@login_required
def get_user_schedules_api():
"""获取当前用户的所有定时任务"""
schedules = database.get_user_schedules(current_user.id)
import json
for s in schedules:
try:
s['account_ids'] = json.loads(s.get('account_ids', '[]') or '[]')
except:
s['account_ids'] = []
return jsonify(schedules)
@app.route('/api/schedules', methods=['POST'])
@login_required
def create_user_schedule_api():
"""创建用户定时任务"""
data = request.json
name = data.get('name', '我的定时任务')
schedule_time = data.get('schedule_time', '08:00')
weekdays = data.get('weekdays', '1,2,3,4,5')
browse_type = data.get('browse_type', '应读')
enable_screenshot = data.get('enable_screenshot', 1)
account_ids = data.get('account_ids', [])
import re
if not re.match(r'^\d{2}:\d{2}$', schedule_time):
return jsonify({"error": "时间格式不正确,应为 HH:MM"}), 400
schedule_id = database.create_user_schedule(
user_id=current_user.id,
name=name,
schedule_time=schedule_time,
weekdays=weekdays,
browse_type=browse_type,
enable_screenshot=enable_screenshot,
account_ids=account_ids
)
if schedule_id:
return jsonify({"success": True, "id": schedule_id})
return jsonify({"error": "创建失败"}), 500
@app.route('/api/schedules/<int:schedule_id>', methods=['GET'])
@login_required
def get_schedule_detail_api(schedule_id):
"""获取定时任务详情"""
schedule = database.get_schedule_by_id(schedule_id)
if not schedule:
return jsonify({"error": "定时任务不存在"}), 404
if schedule['user_id'] != current_user.id:
return jsonify({"error": "无权访问"}), 403
import json
try:
schedule['account_ids'] = json.loads(schedule.get('account_ids', '[]') or '[]')
except:
schedule['account_ids'] = []
return jsonify(schedule)
@app.route('/api/schedules/<int:schedule_id>', methods=['PUT'])
@login_required
def update_schedule_api(schedule_id):
"""更新定时任务"""
schedule = database.get_schedule_by_id(schedule_id)
if not schedule:
return jsonify({"error": "定时任务不存在"}), 404
if schedule['user_id'] != current_user.id:
return jsonify({"error": "无权访问"}), 403
data = request.json
allowed_fields = ['name', 'schedule_time', 'weekdays', 'browse_type',
'enable_screenshot', 'account_ids', 'enabled']
update_data = {k: v for k, v in data.items() if k in allowed_fields}
if 'schedule_time' in update_data:
import re
if not re.match(r'^\d{2}:\d{2}$', update_data['schedule_time']):
return jsonify({"error": "时间格式不正确"}), 400
success = database.update_user_schedule(schedule_id, **update_data)
if success:
return jsonify({"success": True})
return jsonify({"error": "更新失败"}), 500
@app.route('/api/schedules/<int:schedule_id>', methods=['DELETE'])
@login_required
def delete_schedule_api(schedule_id):
"""删除定时任务"""
schedule = database.get_schedule_by_id(schedule_id)
if not schedule:
return jsonify({"error": "定时任务不存在"}), 404
if schedule['user_id'] != current_user.id:
return jsonify({"error": "无权访问"}), 403
success = database.delete_user_schedule(schedule_id)
if success:
return jsonify({"success": True})
return jsonify({"error": "删除失败"}), 500
@app.route('/api/schedules/<int:schedule_id>/toggle', methods=['POST'])
@login_required
def toggle_schedule_api(schedule_id):
"""启用/禁用定时任务"""
schedule = database.get_schedule_by_id(schedule_id)
if not schedule:
return jsonify({"error": "定时任务不存在"}), 404
if schedule['user_id'] != current_user.id:
return jsonify({"error": "无权访问"}), 403
data = request.json
enabled = data.get('enabled', not schedule['enabled'])
success = database.toggle_user_schedule(schedule_id, enabled)
if success:
return jsonify({"success": True, "enabled": enabled})
return jsonify({"error": "操作失败"}), 500
@app.route('/api/schedules/<int:schedule_id>/run', methods=['POST'])
@login_required
def run_schedule_now_api(schedule_id):
"""立即执行定时任务"""
import json
schedule = database.get_schedule_by_id(schedule_id)
if not schedule:
return jsonify({"error": "定时任务不存在"}), 404
if schedule['user_id'] != current_user.id:
return jsonify({"error": "无权访问"}), 403
try:
account_ids = json.loads(schedule.get('account_ids', '[]') or '[]')
except:
account_ids = []
if not account_ids:
return jsonify({"error": "没有配置账号"}), 400
user_id = current_user.id
browse_type = schedule['browse_type']
enable_screenshot = schedule['enable_screenshot']
started = []
for account_id in account_ids:
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
continue
account = user_accounts[user_id][account_id]
if account.is_running:
continue
account.is_running = True
account.should_stop = False
account.status = "排队中"
thread = threading.Thread(
target=run_task,
args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'),
daemon=True
)
thread.start()
active_tasks[account_id] = thread
started.append(account_id)
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
database.update_schedule_last_run(schedule_id)
return jsonify({
"success": True,
"started_count": len(started),
"message": f"已启动 {len(started)} 个账号"
})
# ==================== 定时任务执行日志API ====================
@app.route('/api/schedules/<int:schedule_id>/logs', methods=['GET'])
@login_required
def get_schedule_logs_api(schedule_id):
"""获取定时任务执行日志"""
schedule = database.get_schedule_by_id(schedule_id)
if not schedule:
return jsonify({"error": "定时任务不存在"}), 404
if schedule['user_id'] != current_user.id:
return jsonify({"error": "无权访问"}), 403
limit = request.args.get('limit', 10, type=int)
logs = database.get_schedule_execution_logs(schedule_id, limit)
return jsonify(logs)
# ==================== 批量操作API ====================
@app.route('/api/accounts/batch/start', methods=['POST'])
@login_required
def batch_start_accounts():
"""批量启动账号"""
user_id = current_user.id
data = request.json
account_ids = data.get('account_ids', [])
browse_type = data.get('browse_type', '应读')
enable_screenshot = data.get('enable_screenshot', True)
if not account_ids:
return jsonify({"error": "请选择要启动的账号"}), 400
started = []
failed = []
for account_id in account_ids:
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
failed.append({'id': account_id, 'reason': '账号不存在'})
continue
account = user_accounts[user_id][account_id]
if account.is_running:
failed.append({'id': account_id, 'reason': '已在运行中'})
continue
account.is_running = True
account.should_stop = False
account.status = "排队中"
thread = threading.Thread(
target=run_task,
args=(user_id, account_id, browse_type, enable_screenshot, 'batch'),
daemon=True
)
thread.start()
active_tasks[account_id] = thread
started.append(account_id)
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return jsonify({
"success": True,
"started_count": len(started),
"failed_count": len(failed),
"started": started,
"failed": failed
})
@app.route('/api/accounts/batch/stop', methods=['POST'])
@login_required
def batch_stop_accounts():
"""批量停止账号"""
user_id = current_user.id
data = request.json
account_ids = data.get('account_ids', [])
if not account_ids:
return jsonify({"error": "请选择要停止的账号"}), 400
stopped = []
for account_id in account_ids:
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
continue
account = user_accounts[user_id][account_id]
if not account.is_running:
continue
account.should_stop = True
account.status = "正在停止"
stopped.append(account_id)
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return jsonify({
"success": True,
"stopped_count": len(stopped),
"stopped": stopped
})
if __name__ == '__main__':
print("=" * 60)
print("知识管理平台自动化工具 - 多用户版")
print("=" * 60)
# 初始化数据库
database.init_database()
checkpoint_mgr = get_checkpoint_manager()
print("✓ 任务断点管理器已初始化")
# 加载系统配置(并发设置)
try:
system_config = database.get_system_config()
if system_config:
# 使用globals()修改全局变量
globals()['max_concurrent_global'] = system_config.get('max_concurrent_global', 2)
globals()['max_concurrent_per_account'] = system_config.get('max_concurrent_per_account', 1)
# 重新创建信号量
globals()['global_semaphore'] = threading.Semaphore(globals()['max_concurrent_global'])
print(f"✓ 已加载并发配置: 全局={globals()['max_concurrent_global']}, 单账号={globals()['max_concurrent_per_account']}")
except Exception as e:
print(f"警告: 加载并发配置失败,使用默认值: {e}")
# 主线程初始化浏览器Playwright不支持跨线程
print("\n正在初始化浏览器管理器...")
init_browser_manager()
# 启动定时任务调度器
print("\n启动定时任务调度器...")
scheduler_thread = threading.Thread(target=scheduled_task_worker, daemon=True)
scheduler_thread.start()
print("✓ 定时任务调度器已启动")
# 启动状态推送线程(每秒推送运行中任务状态)
status_thread = threading.Thread(target=status_push_worker, daemon=True)
status_thread.start()
print("✓ 状态推送线程已启动1秒/次)")
# 启动Web服务器
print("\n服务器启动中...")
print(f"用户访问地址: http://{config.SERVER_HOST}:{config.SERVER_PORT}")
print(f"后台管理地址: http://{config.SERVER_HOST}:{config.SERVER_PORT}/yuyx")
print("默认管理员: admin/admin")
print("=" * 60 + "\n")
# 同步初始化浏览器池必须在socketio.run之前否则eventlet会导致asyncio冲突
try:
system_cfg = database.get_system_config()
pool_size = system_cfg.get('max_screenshot_concurrent', 3) if system_cfg else 3
print(f"正在预热 {pool_size} 个浏览器实例(截图加速)...")
init_browser_worker_pool(pool_size=pool_size)
print("✓ 浏览器池初始化完成")
except Exception as e:
print(f"警告: 浏览器池初始化失败: {e}")
socketio.run(app, host=config.SERVER_HOST, port=config.SERVER_PORT, debug=config.DEBUG, allow_unsafe_werkzeug=True)