#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 知识管理平台自动化工具 - 多用户版本 支持用户注册登录、后台管理、数据隔离 """ # 设置时区为中国标准时间(CST, UTC+8) import os os.environ['TZ'] = 'Asia/Shanghai' try: import time time.tzset() except AttributeError: pass # Windows系统不支持tzset() import pytz from datetime import datetime from flask import Flask, render_template, request, jsonify, send_from_directory, redirect, url_for, session from flask_socketio import SocketIO, emit, join_room, leave_room from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user import threading import time import json import os from datetime import datetime, timedelta, timezone from functools import wraps # 导入数据库模块和核心模块 import database import requests from browser_pool import get_browser_pool, init_browser_pool from browser_pool_worker import get_browser_worker_pool, init_browser_worker_pool, shutdown_browser_worker_pool from playwright_automation import PlaywrightBrowserManager, PlaywrightAutomation, BrowseResult from api_browser import APIBrowser, APIBrowseResult from browser_installer import check_and_install_browser # ========== 优化模块导入 ========== from app_config import get_config from app_logger import init_logging, get_logger, audit_logger from app_security import ( ip_rate_limiter, require_ip_not_locked, validate_username, validate_password, validate_email, is_safe_path, sanitize_filename, get_client_ip ) from app_utils import verify_and_consume_captcha # ========== 初始化配置 ========== config = get_config() app = Flask(__name__) app.config.from_object(config) # 确保SECRET_KEY已设置 if not app.config.get('SECRET_KEY'): raise RuntimeError("SECRET_KEY未配置,请检查app_config.py") socketio = SocketIO( app, cors_allowed_origins="*", async_mode='threading', # 明确指定async模式 ping_timeout=60, # ping超时60秒 ping_interval=25, # 每25秒ping一次 logger=False, # 禁用socketio debug日志 engineio_logger=False ) # ========== 初始化日志系统 ========== init_logging(log_level=config.LOG_LEVEL, log_file=config.LOG_FILE) logger = get_logger('app') logger.info("="*60) logger.info("知识管理平台自动化工具 - 多用户版") logger.info("="*60) logger.info(f"Session配置: COOKIE_NAME={app.config.get('SESSION_COOKIE_NAME', 'session')}, " f"SAMESITE={app.config.get('SESSION_COOKIE_SAMESITE', 'None')}, " f"HTTPONLY={app.config.get('SESSION_COOKIE_HTTPONLY', 'None')}, " f"SECURE={app.config.get('SESSION_COOKIE_SECURE', 'None')}, " f"PATH={app.config.get('SESSION_COOKIE_PATH', 'None')}") # Flask-Login 配置 login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login_page' @login_manager.unauthorized_handler def unauthorized(): """处理未授权访问 - API请求返回JSON,页面请求重定向""" if request.path.startswith('/api/') or request.path.startswith('/yuyx/api/'): return jsonify({"error": "请先登录", "code": "unauthorized"}), 401 return redirect(url_for('login_page', next=request.url)) # 截图目录 SCREENSHOTS_DIR = config.SCREENSHOTS_DIR os.makedirs(SCREENSHOTS_DIR, exist_ok=True) # 全局变量 browser_manager = None user_accounts = {} # {user_id: {account_id: Account对象}} active_tasks = {} # {account_id: Thread对象} task_status = {} # {account_id: {"user_id": x, "username": y, "status": "排队中/运行中", "detail_status": "具体状态", "browse_type": z, "start_time": t, "source": s, "progress": {...}, "is_vip": bool}} # VIP优先级队列 vip_task_queue = [] # VIP用户任务队列 normal_task_queue = [] # 普通用户任务队列 task_queue_lock = threading.Lock() log_cache = {} # {user_id: [logs]} 每个用户独立的日志缓存 log_cache_total_count = 0 # 全局日志总数,防止无限增长 # 日志缓存限制 MAX_LOGS_PER_USER = config.MAX_LOGS_PER_USER # 每个用户最多100条 MAX_TOTAL_LOGS = config.MAX_TOTAL_LOGS # 全局最多1000条,防止内存泄漏 # 并发控制:每个用户同时最多运行1个账号(避免内存不足) # 验证码存储:{session_id: {"code": "1234", "expire_time": timestamp, "failed_attempts": 0}} captcha_storage = {} # IP限流存储:{ip: {"attempts": count, "lock_until": timestamp, "first_attempt": timestamp}} ip_rate_limit = {} # 限流配置 - 从 config 读取,避免硬编码 MAX_CAPTCHA_ATTEMPTS = config.MAX_CAPTCHA_ATTEMPTS MAX_IP_ATTEMPTS_PER_HOUR = config.MAX_IP_ATTEMPTS_PER_HOUR IP_LOCK_DURATION = config.IP_LOCK_DURATION # 全局限制:整个系统同时最多运行N个账号(线程本地架构,每个线程独立浏览器,内存占用约200MB/浏览器) max_concurrent_per_account = config.MAX_CONCURRENT_PER_ACCOUNT max_concurrent_global = config.MAX_CONCURRENT_GLOBAL user_semaphores = {} # {user_id: Semaphore} global_semaphore = threading.Semaphore(max_concurrent_global) # 截图专用信号量:限制同时进行的截图任务数量为1(避免资源竞争) # ���图信号量将在首次使用时初始化 screenshot_semaphore = None screenshot_semaphore_lock = threading.Lock() def get_screenshot_semaphore(): """获取截图信号量(懒加载,根据配置动态创建)""" global screenshot_semaphore with screenshot_semaphore_lock: config = database.get_system_config() max_concurrent = config.get('max_screenshot_concurrent', 3) if screenshot_semaphore is None: screenshot_semaphore = threading.Semaphore(max_concurrent) return screenshot_semaphore, max_concurrent class User(UserMixin): """Flask-Login 用户类""" def __init__(self, user_id): self.id = user_id class Admin(UserMixin): """管理员类""" def __init__(self, admin_id): self.id = admin_id self.is_admin = True class Account: """账号类""" def __init__(self, account_id, user_id, username, password, remember=True, remark=''): self.id = account_id self.user_id = user_id self.username = username self.password = password self.remember = remember self.remark = remark self.status = "未开始" self.is_running = False self.should_stop = False self.total_items = 0 self.total_attachments = 0 self.automation = None self.last_browse_type = "注册前未读" self.proxy_config = None # 保存代理配置,浏览和截图共用 def to_dict(self): result = { "id": self.id, "username": self.username, "status": self.status, "remark": self.remark, "total_items": self.total_items, "total_attachments": self.total_attachments, "is_running": self.is_running } # 添加详细进度信息(如果有) if self.id in task_status: ts = task_status[self.id] progress = ts.get('progress', {}) result['detail_status'] = ts.get('detail_status', '') result['progress_items'] = progress.get('items', 0) result['progress_attachments'] = progress.get('attachments', 0) result['start_time'] = ts.get('start_time', 0) # 计算运行时长 if ts.get('start_time'): import time elapsed = int(time.time() - ts['start_time']) result['elapsed_seconds'] = elapsed mins, secs = divmod(elapsed, 60) result['elapsed_display'] = f"{mins}分{secs}秒" else: # 非运行状态下,根据status设置detail_status status_map = { '已完成': '任务完成', '截图中': '正在截图', '浏览完成': '浏览完成', '登录失败': '登录失败', '已暂停': '任务已暂停' } for key, val in status_map.items(): if key in self.status: result['detail_status'] = val break return result @login_manager.user_loader def load_user(user_id): """Flask-Login 用户加载""" user = database.get_user_by_id(int(user_id)) if user: return User(user['id']) return None def admin_required(f): """管理员权限装饰器""" @wraps(f) def decorated_function(*args, **kwargs): logger.debug(f"[admin_required] Session内容: {dict(session)}") logger.debug(f"[admin_required] Cookies: {request.cookies}") if 'admin_id' not in session: logger.warning(f"[admin_required] 拒绝访问 {request.path} - session中无admin_id") return jsonify({"error": "需要管理员权限"}), 403 logger.info(f"[admin_required] 管理员 {session.get('admin_username')} 访问 {request.path}") return f(*args, **kwargs) return decorated_function def log_to_client(message, user_id=None, account_id=None): """发送日志到Web客户端(用户隔离)""" beijing_tz = timezone(timedelta(hours=8)) timestamp = datetime.now(beijing_tz).strftime('%H:%M:%S') log_data = { 'timestamp': timestamp, 'message': message, 'account_id': account_id } # 如果指定了user_id,则缓存到该用户的日志 if user_id: global log_cache_total_count if user_id not in log_cache: log_cache[user_id] = [] log_cache[user_id].append(log_data) log_cache_total_count += 1 # 持久化到数据库 (已禁用,使用task_logs表代替) # try: # database.save_operation_log(user_id, message, account_id, 'INFO') # except Exception as e: # print(f"保存日志到数据库失败: {e}") # 单用户限制 if len(log_cache[user_id]) > MAX_LOGS_PER_USER: log_cache[user_id].pop(0) log_cache_total_count -= 1 # 全局限制 - 如果超过总数限制,清理日志最多的用户 while log_cache_total_count > MAX_TOTAL_LOGS: if log_cache: max_user = max(log_cache.keys(), key=lambda u: len(log_cache[u])) if log_cache[max_user]: log_cache[max_user].pop(0) log_cache_total_count -= 1 else: break else: break # 发送到该用户的room socketio.emit('log', log_data, room=f'user_{user_id}') # 控制台日志:添加账号短标识便于区分 if account_id: # 显示账号ID前4位作为标识 short_id = account_id[:4] if len(account_id) >= 4 else account_id print(f"[{timestamp}] U{user_id}:{short_id} | {message}") else: print(f"[{timestamp}] U{user_id} | {message}") def get_proxy_from_api(api_url, max_retries=3): """从API获取代理IP(支持重试) Args: api_url: 代理API地址 max_retries: 最大重试次数 Returns: 代理服务器地址(格式: http://IP:PORT)或 None """ import re # IP:PORT 格式正则 ip_port_pattern = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}$') for attempt in range(max_retries): try: response = requests.get(api_url, timeout=10) if response.status_code == 200: text = response.text.strip() # 尝试解析JSON响应 try: import json data = json.loads(text) # 检查是否是错误响应 if isinstance(data, dict): if data.get('status') != 200 and data.get('status') != 0: error_msg = data.get('msg', data.get('message', '未知错误')) print(f"✗ 代理API返回错误: {error_msg} (尝试 {attempt + 1}/{max_retries})") if attempt < max_retries - 1: time.sleep(1) continue # 尝试从JSON中获取IP ip_port = data.get('ip') or data.get('proxy') or data.get('data') if ip_port: text = str(ip_port).strip() except (json.JSONDecodeError, ValueError): # 不是JSON,继续使用原始文本 pass # 验证IP:PORT格式 if ip_port_pattern.match(text): proxy_server = f"http://{text}" print(f"✓ 获取代理成功: {proxy_server} (尝试 {attempt + 1}/{max_retries})") return proxy_server else: print(f"✗ 代理格式无效: {text[:50]} (尝试 {attempt + 1}/{max_retries})") else: print(f"✗ 获取代理失败: HTTP {response.status_code} (尝试 {attempt + 1}/{max_retries})") except Exception as e: print(f"✗ 获取代理异常: {str(e)} (尝试 {attempt + 1}/{max_retries})") if attempt < max_retries - 1: time.sleep(1) print(f"✗ 获取代理失败,已重试 {max_retries} 次,将不使用代理继续") return None def init_browser_manager(): """初始化浏览器管理器""" global browser_manager if browser_manager is None: print("正在初始化Playwright浏览器管理器...") if not check_and_install_browser(log_callback=lambda msg, account_id=None: print(msg)): print("浏览器环境检查失败!") return False browser_manager = PlaywrightBrowserManager( headless=True, log_callback=lambda msg, account_id=None: print(msg) ) try: # 不再需要initialize(),每个账号会创建独立浏览器 print("Playwright浏览器管理器创建成功!") return True except Exception as e: print(f"Playwright初始化失败: {str(e)}") return False return True # ==================== 前端路由 ==================== @app.route('/') def index(): """主页 - 重定向到登录或应用""" if current_user.is_authenticated: return redirect(url_for('app_page')) return redirect(url_for('login_page')) @app.route('/login') def login_page(): """登录页面""" return render_template('login.html') @app.route('/register') def register_page(): """注册页面""" return render_template('register.html') @app.route('/app') @login_required def app_page(): """主应用页面""" return render_template('index.html') @app.route('/yuyx') def admin_login_page(): """后台登录页面""" if 'admin_id' in session: return redirect(url_for('admin_page')) return render_template('admin_login.html') @app.route('/yuyx/admin') @admin_required def admin_page(): """后台管理页面""" return render_template('admin.html') @app.route('/yuyx/vip') @admin_required def vip_admin_page(): """VIP管理页面""" return render_template('vip_admin.html') # ==================== 用户认证API ==================== @app.route('/api/register', methods=['POST']) @require_ip_not_locked # IP限流保护 def register(): """用户注册""" data = request.json username = data.get('username', '').strip() password = data.get('password', '').strip() email = data.get('email', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() if not username or not password: return jsonify({"error": "用户名和密码不能为空"}), 400 # 获取客户端IP(用于IP限流检查) client_ip = get_client_ip() # 检查IP限流 allowed, error_msg = check_ip_rate_limit(client_ip) if not allowed: return jsonify({"error": error_msg}), 429 # 验证验证码 success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage, MAX_CAPTCHA_ATTEMPTS) if not success: # 验证失败,记录IP失败尝试(注册特有的IP限流逻辑) is_locked = record_failed_captcha(client_ip) if is_locked: return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429 return jsonify({"error": message}), 400 user_id = database.create_user(username, password, email) if user_id: return jsonify({"success": True, "message": "注册成功,请等待管理员审核"}) else: return jsonify({"error": "用户名已存在"}), 400 # ==================== 验证码API ==================== import random from task_checkpoint import get_checkpoint_manager, TaskStage checkpoint_mgr = None # 任务断点管理器 def check_ip_rate_limit(ip_address): """检查IP是否被限流""" current_time = time.time() # 清理过期的IP记录 expired_ips = [ip for ip, data in ip_rate_limit.items() if data.get("lock_until", 0) < current_time and current_time - data.get("first_attempt", current_time) > 3600] for ip in expired_ips: del ip_rate_limit[ip] # 检查IP是否被锁定 if ip_address in ip_rate_limit: ip_data = ip_rate_limit[ip_address] # 如果IP被锁定且未到解锁时间 if ip_data.get("lock_until", 0) > current_time: remaining_time = int(ip_data["lock_until"] - current_time) return False, "IP已被锁定,请{}分钟后再试".format(remaining_time // 60 + 1) # 如果超过1小时,重置计数 if current_time - ip_data.get("first_attempt", current_time) > 3600: ip_rate_limit[ip_address] = { "attempts": 0, "first_attempt": current_time } return True, None def record_failed_captcha(ip_address): """记录验证码失败尝试""" current_time = time.time() if ip_address not in ip_rate_limit: ip_rate_limit[ip_address] = { "attempts": 1, "first_attempt": current_time } else: ip_rate_limit[ip_address]["attempts"] += 1 # 检查是否超过限制 if ip_rate_limit[ip_address]["attempts"] >= MAX_IP_ATTEMPTS_PER_HOUR: ip_rate_limit[ip_address]["lock_until"] = current_time + IP_LOCK_DURATION return True # 表示IP已被锁定 return False # 表示还未锁定 @app.route("/api/generate_captcha", methods=["POST"]) def generate_captcha(): """生成4位数字验证码""" import uuid session_id = str(uuid.uuid4()) # 生成4位随机数字 code = "".join([str(random.randint(0, 9)) for _ in range(4)]) # 存储验证码,5分钟过期 captcha_storage[session_id] = { "code": code, "expire_time": time.time() + 300, "failed_attempts": 0 } # 清理过期验证码 expired_keys = [k for k, v in captcha_storage.items() if v["expire_time"] < time.time()] for k in expired_keys: del captcha_storage[k] return jsonify({"session_id": session_id, "captcha": code}) @app.route('/api/login', methods=['POST']) @require_ip_not_locked # IP限流保护 def login(): """用户登录""" data = request.json username = data.get('username', '').strip() password = data.get('password', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() need_captcha = data.get('need_captcha', False) # 如果需要验证码,验证验证码 if need_captcha: success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage) if not success: return jsonify({"error": message}), 400 # 先检查用户是否存在 user_exists = database.get_user_by_username(username) if not user_exists: return jsonify({"error": "账号未注册", "need_captcha": True}), 401 # 检查密码是否正确 user = database.verify_user(username, password) if not user: # 密码错误 return jsonify({"error": "密码错误", "need_captcha": True}), 401 # 检查审核状态 if user['status'] != 'approved': return jsonify({"error": "账号未审核,请等待管理员审核", "need_captcha": False}), 401 # 登录成功 user_obj = User(user['id']) login_user(user_obj) load_user_accounts(user['id']) return jsonify({"success": True}) @app.route('/api/logout', methods=['POST']) @login_required def logout(): """用户登出""" logout_user() return jsonify({"success": True}) # ==================== 管理员认证API ==================== @app.route('/yuyx/api/debug-config', methods=['GET']) def debug_config(): """调试配置信息""" return jsonify({ "secret_key_set": bool(app.secret_key), "secret_key_length": len(app.secret_key) if app.secret_key else 0, "session_config": { "SESSION_COOKIE_NAME": app.config.get('SESSION_COOKIE_NAME'), "SESSION_COOKIE_SECURE": app.config.get('SESSION_COOKIE_SECURE'), "SESSION_COOKIE_HTTPONLY": app.config.get('SESSION_COOKIE_HTTPONLY'), "SESSION_COOKIE_SAMESITE": app.config.get('SESSION_COOKIE_SAMESITE'), "PERMANENT_SESSION_LIFETIME": str(app.config.get('PERMANENT_SESSION_LIFETIME')), }, "current_session": dict(session), "cookies_received": list(request.cookies.keys()) }) @app.route('/yuyx/api/login', methods=['POST']) @require_ip_not_locked # IP限流保护 def admin_login(): """管理员登录(支持JSON和form-data两种格式)""" # 兼容JSON和form-data两种提交方式 if request.is_json: data = request.json else: data = request.form username = data.get('username', '').strip() password = data.get('password', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() need_captcha = data.get('need_captcha', False) # 如果需要验证码,验证验证码 if need_captcha: success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage) if not success: if request.is_json: return jsonify({"error": message}), 400 else: return redirect(url_for('admin_login_page')) admin = database.verify_admin(username, password) if admin: # 清除旧session,确保干净的状态 session.clear() # 设置管理员session session['admin_id'] = admin['id'] session['admin_username'] = admin['username'] session.permanent = True # 设置为永久会话(使用PERMANENT_SESSION_LIFETIME配置) session.modified = True # 强制标记session为已修改,确保保存 logger.info(f"[admin_login] 管理员 {username} 登录成功, session已设置: admin_id={admin['id']}") logger.debug(f"[admin_login] Session内容: {dict(session)}") logger.debug(f"[admin_login] Cookie将被设置: name={app.config.get('SESSION_COOKIE_NAME', 'session')}") # 根据请求类型返回不同响应 if request.is_json: # JSON请求:返回JSON响应(给JavaScript使用) response = jsonify({"success": True, "redirect": "/yuyx/admin"}) return response else: # form-data请求:直接重定向到后台页面 return redirect(url_for('admin_page')) else: logger.warning(f"[admin_login] 管理员 {username} 登录失败 - 用户名或密码错误") if request.is_json: return jsonify({"error": "管理员用户名或密码错误", "need_captcha": True}), 401 else: # form提交失败,重定向回登录页(TODO: 可以添加flash消息) return redirect(url_for('admin_login_page')) @app.route('/yuyx/api/logout', methods=['POST']) @admin_required def admin_logout(): """管理员登出""" session.pop('admin_id', None) session.pop('admin_username', None) return jsonify({"success": True}) @app.route('/yuyx/api/users', methods=['GET']) @admin_required def get_all_users(): """获取所有用户""" users = database.get_all_users() return jsonify(users) @app.route('/yuyx/api/users/pending', methods=['GET']) @admin_required def get_pending_users(): """获取待审核用户""" users = database.get_pending_users() return jsonify(users) @app.route('/yuyx/api/users//approve', methods=['POST']) @admin_required def approve_user_route(user_id): """审核通过用户""" if database.approve_user(user_id): return jsonify({"success": True}) return jsonify({"error": "审核失败"}), 400 @app.route('/yuyx/api/users//reject', methods=['POST']) @admin_required def reject_user_route(user_id): """拒绝用户""" if database.reject_user(user_id): return jsonify({"success": True}) return jsonify({"error": "拒绝失败"}), 400 @app.route('/yuyx/api/users/', methods=['DELETE']) @admin_required def delete_user_route(user_id): """删除用户""" if database.delete_user(user_id): # 清理内存中的账号数据 if user_id in user_accounts: del user_accounts[user_id] # 清理用户信号量,防止内存泄漏 if user_id in user_semaphores: del user_semaphores[user_id] # 清理用户日志缓存,防止内存泄漏 global log_cache_total_count if user_id in log_cache: log_cache_total_count -= len(log_cache[user_id]) del log_cache[user_id] return jsonify({"success": True}) return jsonify({"error": "删除失败"}), 400 @app.route('/yuyx/api/stats', methods=['GET']) @admin_required def get_system_stats(): """获取系统统计""" stats = database.get_system_stats() # 从session获取管理员用户名 stats["admin_username"] = session.get('admin_username', 'admin') return jsonify(stats) @app.route('/yuyx/api/docker_stats', methods=['GET']) @admin_required def get_docker_stats(): """获取Docker容器运行状态""" import subprocess docker_status = { 'running': False, 'container_name': 'N/A', 'uptime': 'N/A', 'memory_usage': 'N/A', 'memory_limit': 'N/A', 'memory_percent': 'N/A', 'cpu_percent': 'N/A', 'status': 'Unknown' } try: # 检查是否在Docker容器内 if os.path.exists('/.dockerenv'): docker_status['running'] = True # 获取容器名称 try: with open('/etc/hostname', 'r') as f: docker_status['container_name'] = f.read().strip() except Exception as e: logger.debug(f"读取容器名称失败: {e}") # 获取内存使用情况 (cgroup v2) try: # 尝试cgroup v2路径 if os.path.exists('/sys/fs/cgroup/memory.current'): # Read total memory with open('/sys/fs/cgroup/memory.current', 'r') as f: mem_total = int(f.read().strip()) # Read cache from memory.stat cache = 0 if os.path.exists('/sys/fs/cgroup/memory.stat'): with open('/sys/fs/cgroup/memory.stat', 'r') as f: for line in f: if line.startswith('inactive_file '): cache = int(line.split()[1]) break # Actual memory = total - cache mem_bytes = mem_total - cache docker_status['memory_usage'] = "{:.2f} MB".format(mem_bytes / 1024 / 1024) # 获取内存限制 if os.path.exists('/sys/fs/cgroup/memory.max'): with open('/sys/fs/cgroup/memory.max', 'r') as f: limit_str = f.read().strip() if limit_str != 'max': limit_bytes = int(limit_str) docker_status['memory_limit'] = "{:.2f} GB".format(limit_bytes / 1024 / 1024 / 1024) docker_status['memory_percent'] = "{:.2f}%".format(mem_bytes / limit_bytes * 100) # 尝试cgroup v1路径 elif os.path.exists('/sys/fs/cgroup/memory/memory.usage_in_bytes'): # 从 memory.stat 读取内存信息 mem_bytes = 0 if os.path.exists('/sys/fs/cgroup/memory/memory.stat'): with open('/sys/fs/cgroup/memory/memory.stat', 'r') as f: rss = 0 cache = 0 for line in f: if line.startswith('total_rss '): rss = int(line.split()[1]) elif line.startswith('total_cache '): cache = int(line.split()[1]) # 使用 RSS + (一部分活跃的cache),更接近docker stats的计算 # 但为了准确性,我们只用RSS mem_bytes = rss # 如果找不到,则使用总内存减去缓存作为后备 if mem_bytes == 0: with open('/sys/fs/cgroup/memory/memory.usage_in_bytes', 'r') as f: total_mem = int(f.read().strip()) cache = 0 if os.path.exists('/sys/fs/cgroup/memory/memory.stat'): with open('/sys/fs/cgroup/memory/memory.stat', 'r') as f: for line in f: if line.startswith('total_inactive_file '): cache = int(line.split()[1]) break mem_bytes = total_mem - cache docker_status['memory_usage'] = "{:.2f} MB".format(mem_bytes / 1024 / 1024) # 获取内存限制 if os.path.exists('/sys/fs/cgroup/memory/memory.limit_in_bytes'): with open('/sys/fs/cgroup/memory/memory.limit_in_bytes', 'r') as f: limit_bytes = int(f.read().strip()) # 检查是否是实际限制(不是默认的超大值) if limit_bytes < 9223372036854771712: docker_status['memory_limit'] = "{:.2f} GB".format(limit_bytes / 1024 / 1024 / 1024) docker_status['memory_percent'] = "{:.2f}%".format(mem_bytes / limit_bytes * 100) except Exception as e: docker_status['memory_usage'] = 'Error: {}'.format(str(e)) # 获取容器运行时间(基于PID 1的启动时间) try: # Get PID 1 start time with open('/proc/1/stat', 'r') as f: stat_data = f.read().split() starttime_ticks = int(stat_data[21]) # Get system uptime with open('/proc/uptime', 'r') as f: system_uptime = float(f.read().split()[0]) # Get clock ticks per second import os as os_module ticks_per_sec = os_module.sysconf(os_module.sysconf_names['SC_CLK_TCK']) # Calculate container uptime process_start = starttime_ticks / ticks_per_sec uptime_seconds = int(system_uptime - process_start) days = uptime_seconds // 86400 hours = (uptime_seconds % 86400) // 3600 minutes = (uptime_seconds % 3600) // 60 if days > 0: docker_status['uptime'] = "{}天 {}小时 {}分钟".format(days, hours, minutes) elif hours > 0: docker_status['uptime'] = "{}小时 {}分钟".format(hours, minutes) else: docker_status['uptime'] = "{}分钟".format(minutes) except Exception as e: logger.debug(f"读取容器运行时间失败: {e}") docker_status['status'] = 'Running' else: docker_status['status'] = 'Not in Docker' except Exception as e: docker_status['status'] = 'Error: {}'.format(str(e)) return jsonify(docker_status) @app.route('/yuyx/api/admin/password', methods=['PUT']) @admin_required def update_admin_password(): """修改管理员密码""" data = request.json new_password = data.get('new_password', '').strip() if not new_password: return jsonify({"error": "密码不能为空"}), 400 username = session.get('admin_username') if database.update_admin_password(username, new_password): return jsonify({"success": True}) return jsonify({"error": "修改失败"}), 400 @app.route('/yuyx/api/admin/username', methods=['PUT']) @admin_required def update_admin_username(): """修改管理员用户名""" data = request.json new_username = data.get('new_username', '').strip() if not new_username: return jsonify({"error": "用户名不能为空"}), 400 old_username = session.get('admin_username') if database.update_admin_username(old_username, new_username): session['admin_username'] = new_username return jsonify({"success": True}) return jsonify({"error": "修改失败,用户名可能已存在"}), 400 def update_admin_username(): """修改管理员用户名""" data = request.json new_username = data.get('new_username', '').strip() if not new_username: return jsonify({"error": "用户名不能为空"}), 400 old_username = session.get('admin_username') if database.update_admin_username(old_username, new_username): session['admin_username'] = new_username return jsonify({"success": True}) return jsonify({"error": "用户名已存在"}), 400 # ==================== 密码重置API ==================== # 管理员直接重置用户密码 @app.route('/yuyx/api/users//reset_password', methods=['POST']) @admin_required def admin_reset_password_route(user_id): """管理员直接重置用户密码(无需审核)""" data = request.json new_password = data.get('new_password', '').strip() if not new_password: return jsonify({"error": "新密码不能为空"}), 400 if len(new_password) < 6: return jsonify({"error": "密码长度不能少于6位"}), 400 if database.admin_reset_user_password(user_id, new_password): return jsonify({"message": "密码重置成功"}) return jsonify({"error": "重置失败,用户不存在"}), 400 # 获取密码重置申请列表 @app.route('/yuyx/api/password_resets', methods=['GET']) @admin_required def get_password_resets_route(): """获取所有待审核的密码重置申请""" resets = database.get_pending_password_resets() return jsonify(resets) # 批准密码重置申请 @app.route('/yuyx/api/password_resets//approve', methods=['POST']) @admin_required def approve_password_reset_route(request_id): """批准密码重置申请""" if database.approve_password_reset(request_id): return jsonify({"message": "密码重置申请已批准"}) return jsonify({"error": "批准失败"}), 400 # 拒绝密码重置申请 @app.route('/yuyx/api/password_resets//reject', methods=['POST']) @admin_required def reject_password_reset_route(request_id): """拒绝密码重置申请""" if database.reject_password_reset(request_id): return jsonify({"message": "密码重置申请已拒绝"}) return jsonify({"error": "拒绝失败"}), 400 # 用户申请重置密码(需要审核) @app.route('/api/reset_password_request', methods=['POST']) def request_password_reset(): """用户申请重置密码""" data = request.json username = data.get('username', '').strip() email = data.get('email', '').strip() new_password = data.get('new_password', '').strip() if not username or not new_password: return jsonify({"error": "用户名和新密码不能为空"}), 400 if len(new_password) < 6: return jsonify({"error": "密码长度不能少于6位"}), 400 # 验证用户存在 user = database.get_user_by_username(username) if not user: return jsonify({"error": "用户不存在"}), 404 # 如果提供了邮箱,验证邮箱是否匹配 if email and user.get('email') != email: return jsonify({"error": "邮箱不匹配"}), 400 # 创建重置申请 request_id = database.create_password_reset_request(user['id'], new_password) if request_id: return jsonify({"message": "密码重置申请已提交,请等待管理员审核"}) else: return jsonify({"error": "申请提交失败"}), 500 # ==================== 账号管理API (用户隔离) ==================== def load_user_accounts(user_id): """从数据库加载用户的账号到内存""" if user_id not in user_accounts: user_accounts[user_id] = {} accounts_data = database.get_user_accounts(user_id) for acc_data in accounts_data: account = Account( account_id=acc_data['id'], user_id=user_id, username=acc_data['username'], password=acc_data['password'], remember=bool(acc_data['remember']), remark=acc_data['remark'] or '' ) user_accounts[user_id][account.id] = account # ==================== Bug反馈API(用户端) ==================== @app.route('/api/feedback', methods=['POST']) @login_required def submit_feedback(): """用户提交Bug反馈""" data = request.get_json() title = data.get('title', '').strip() description = data.get('description', '').strip() contact = data.get('contact', '').strip() if not title or not description: return jsonify({"error": "标题和描述不能为空"}), 400 if len(title) > 100: return jsonify({"error": "标题不能超过100个字符"}), 400 if len(description) > 2000: return jsonify({"error": "描述不能超过2000个字符"}), 400 # 从数据库获取用户名 user_info = database.get_user_by_id(current_user.id) username = user_info['username'] if user_info else f'用户{current_user.id}' feedback_id = database.create_bug_feedback( user_id=current_user.id, username=username, title=title, description=description, contact=contact ) return jsonify({"message": "反馈提交成功", "id": feedback_id}) @app.route('/api/feedback', methods=['GET']) @login_required def get_my_feedbacks(): """获取当前用户的反馈列表""" feedbacks = database.get_user_feedbacks(current_user.id) return jsonify(feedbacks) # ==================== Bug反馈API(管理端) ==================== @app.route('/yuyx/api/feedbacks', methods=['GET']) @admin_required def get_all_feedbacks(): """管理员获取所有反馈""" status = request.args.get('status') limit = int(request.args.get('limit', 100)) offset = int(request.args.get('offset', 0)) feedbacks = database.get_bug_feedbacks(limit=limit, offset=offset, status_filter=status) stats = database.get_feedback_stats() return jsonify({ "feedbacks": feedbacks, "stats": stats }) @app.route('/yuyx/api/feedbacks//reply', methods=['POST']) @admin_required def reply_to_feedback(feedback_id): """管理员回复反馈""" data = request.get_json() reply = data.get('reply', '').strip() if not reply: return jsonify({"error": "回复内容不能为空"}), 400 if database.reply_feedback(feedback_id, reply): return jsonify({"message": "回复成功"}) else: return jsonify({"error": "反馈不存在"}), 404 @app.route('/yuyx/api/feedbacks//close', methods=['POST']) @admin_required def close_feedback_api(feedback_id): """管理员关闭反馈""" if database.close_feedback(feedback_id): return jsonify({"message": "已关闭"}) else: return jsonify({"error": "反馈不存在"}), 404 @app.route('/yuyx/api/feedbacks/', methods=['DELETE']) @admin_required def delete_feedback_api(feedback_id): """管理员删除反馈""" if database.delete_feedback(feedback_id): return jsonify({"message": "已删除"}) else: return jsonify({"error": "反馈不存在"}), 404 # ==================== 账号管理API ==================== @app.route('/api/accounts', methods=['GET']) @login_required def get_accounts(): """获取当前用户的所有账号""" user_id = current_user.id # 检查是否需要强制刷新(容器重启后内存数据丢失) refresh = request.args.get('refresh', 'false').lower() == 'true' # 如果user_accounts中没有数据或者请求刷新,则从数据库加载 if user_id not in user_accounts or len(user_accounts.get(user_id, {})) == 0 or refresh: logger.debug(f"[API] 用户 {user_id} 请求账号列表,从数据库加载(refresh={refresh})") load_user_accounts(user_id) accounts = user_accounts.get(user_id, {}) logger.debug(f"[API] 返回用户 {user_id} 的 {len(accounts)} 个账号") return jsonify([acc.to_dict() for acc in accounts.values()]) @app.route('/api/accounts', methods=['POST']) @login_required def add_account(): """添加账号""" user_id = current_user.id # 账号数量限制检查:VIP不限制,普通用户最多3个 current_count = len(database.get_user_accounts(user_id)) is_vip = database.is_user_vip(user_id) if not is_vip and current_count >= 3: return jsonify({"error": "普通用户最多添加3个账号,升级VIP可无限添加"}), 403 data = request.json username = data.get('username', '').strip() password = data.get('password', '').strip() remark = data.get("remark", "").strip()[:200] # 限制200字符 if not username or not password: return jsonify({"error": "用户名和密码不能为空"}), 400 # 检查当前用户是否已存在该账号 if user_id in user_accounts: for acc in user_accounts[user_id].values(): if acc.username == username: return jsonify({"error": f"账号 '{username}' 已存在"}), 400 # 生成账号ID import uuid account_id = str(uuid.uuid4())[:8] # 设置remember默认值为True remember = data.get('remember', True) # 保存到数据库 database.create_account(user_id, account_id, username, password, remember, remark) # 加载到内存 account = Account(account_id, user_id, username, password, remember, remark) if user_id not in user_accounts: user_accounts[user_id] = {} user_accounts[user_id][account_id] = account log_to_client(f"添加账号: {username}", user_id) return jsonify(account.to_dict()) @app.route('/api/accounts/', methods=['PUT']) @login_required def update_account(account_id): """更新账号信息(密码等)""" user_id = current_user.id # 验证账号所有权 if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] # 如果账号正在运行,不允许修改 if account.is_running: return jsonify({"error": "账号正在运行中,请先停止"}), 400 data = request.json new_password = data.get('password', '').strip() new_remember = data.get('remember', account.remember) if not new_password: return jsonify({"error": "密码不能为空"}), 400 # 更新数据库 with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(''' UPDATE accounts SET password = ?, remember = ? WHERE id = ? ''', (new_password, new_remember, account_id)) conn.commit() # 重置账号登录状态(密码修改后恢复active状态) database.reset_account_login_status(account_id) logger.info(f"[账号更新] 用户 {user_id} 修改了账号 {account.username} 的密码,已重置登录状态") # 更新内存中的账号信息 account.password = new_password account.remember = new_remember log_to_client(f"账号 {account.username} 信息已更新,登录状态已重置", user_id) return jsonify({"message": "账号更新成功", "account": account.to_dict()}) @app.route('/api/accounts/', methods=['DELETE']) @login_required def delete_account(account_id): """删除账号""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] # 停止正在运行的任务 if account.is_running: account.should_stop = True if account.automation: account.automation.close() username = account.username # 从数据库删除 database.delete_account(account_id) # 从内存删除 del user_accounts[user_id][account_id] log_to_client(f"删除账号: {username}", user_id) return jsonify({"success": True}) @app.route('/api/accounts//remark', methods=['PUT']) @login_required def update_remark(account_id): """更新备注""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 data = request.json remark = data.get('remark', '').strip()[:200] # 更新数据库 database.update_account_remark(account_id, remark) # 更新内存 user_accounts[user_id][account_id].remark = remark log_to_client(f"更新备注: {user_accounts[user_id][account_id].username} -> {remark}", user_id) return jsonify({"success": True}) @app.route('/api/accounts//start', methods=['POST']) @login_required def start_account(account_id): """启动账号任务""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] if account.is_running: return jsonify({"error": "任务已在运行中"}), 400 data = request.json browse_type = data.get('browse_type', '应读') enable_screenshot = data.get('enable_screenshot', True) # 默认启用截图 # 确保浏览器管理器已初始化 if not init_browser_manager(): return jsonify({"error": "浏览器初始化失败"}), 500 # 启动任务线程 account.is_running = True account.should_stop = False account.status = "运行中" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, 'manual'), daemon=True ) thread.start() active_tasks[account_id] = thread log_to_client(f"启动任务: {account.username} - {browse_type}", user_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({"success": True}) @app.route('/api/accounts//stop', methods=['POST']) @login_required def stop_account(account_id): """停止账号任务""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] if not account.is_running: return jsonify({"error": "任务未在运行"}), 400 account.should_stop = True account.status = "正在停止" log_to_client(f"停止任务: {account.username}", user_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({"success": True}) def get_user_semaphore(user_id): """获取或创建用户的信号量""" if user_id not in user_semaphores: user_semaphores[user_id] = threading.Semaphore(max_concurrent_per_account) return user_semaphores[user_id] def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="manual"): """运行自动化任务""" print(f"[DEBUG run_task] account={account_id}, enable_screenshot={enable_screenshot} (类型:{type(enable_screenshot).__name__}), source={source}") if user_id not in user_accounts or account_id not in user_accounts[user_id]: return account = user_accounts[user_id][account_id] # 导入time模块 import time as time_module # 注意:不在此处记录开始时间,因为要排除排队等待时间 # 两级并发控制:用户级 + 全局级(VIP优先) user_sem = get_user_semaphore(user_id) # 检查是否VIP用户 is_vip_user = database.is_user_vip(user_id) vip_label = " [VIP优先]" if is_vip_user else "" # 获取用户级信号量(同一用户的账号排队) log_to_client(f"等待资源分配...{vip_label}", user_id, account_id) account.status = "排队中" + (" (VIP)" if is_vip_user else "") socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 记录任务状态为排队中 import time as time_mod task_status[account_id] = { "user_id": user_id, "username": account.username, "status": "排队中", "detail_status": "等待资源" + vip_label, "browse_type": browse_type, "start_time": time_mod.time(), "source": source, "progress": {"items": 0, "attachments": 0}, "is_vip": is_vip_user } # 加入优先级队列 with task_queue_lock: if is_vip_user: vip_task_queue.append(account_id) else: normal_task_queue.append(account_id) # VIP优先排队机制 acquired = False while not acquired: with task_queue_lock: # VIP用户直接尝试获取; 普通用户需等VIP队列为空 can_try = is_vip_user or len(vip_task_queue) == 0 if can_try and user_sem.acquire(blocking=False): acquired = True with task_queue_lock: if account_id in vip_task_queue: vip_task_queue.remove(account_id) if account_id in normal_task_queue: normal_task_queue.remove(account_id) break # 检查是否被停止 if account.should_stop: with task_queue_lock: if account_id in vip_task_queue: vip_task_queue.remove(account_id) if account_id in normal_task_queue: normal_task_queue.remove(account_id) log_to_client(f"任务已取消", user_id, account_id) account.status = "已停止" account.is_running = False if account_id in task_status: del task_status[account_id] socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return time_module.sleep(0.3) try: # 如果在排队期间被停止,直接返回 if account.should_stop: log_to_client(f"任务已取消", user_id, account_id) account.status = "已停止" account.is_running = False socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return # 获取全局信号量(防止所有用户同时运行导致资源耗尽) global_semaphore.acquire() try: # 再次检查是否被停止 if account.should_stop: log_to_client(f"任务已取消", user_id, account_id) account.status = "已停止" account.is_running = False socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return # ====== 创建任务断点 ====== task_id = checkpoint_mgr.create_checkpoint( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type ) logger.info(f"[断点] 任务 {task_id} 已创建") # ====== 在此处记录任务真正的开始时间(排除排队等待时间) ====== task_start_time = time_module.time() account.status = "运行中" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') account.last_browse_type = browse_type # 更新任务状态为运行中 if account_id in task_status: task_status[account_id]["status"] = "运行中" task_status[account_id]["detail_status"] = "初始化" task_status[account_id]["start_time"] = task_start_time # 重试机制:最多尝试3次,超时则换IP重试 max_attempts = 3 last_error = None for attempt in range(1, max_attempts + 1): try: if attempt > 1: log_to_client(f"🔄 第 {attempt} 次尝试(共{max_attempts}次)...", user_id, account_id) # 检查是否需要使用代理 proxy_config = None config = database.get_system_config() if config.get('proxy_enabled') == 1: proxy_api_url = config.get('proxy_api_url', '').strip() if proxy_api_url: log_to_client(f"正在获取代理IP...", user_id, account_id) proxy_server = get_proxy_from_api(proxy_api_url, max_retries=3) if proxy_server: proxy_config = {'server': proxy_server} log_to_client(f"✓ 将使用代理: {proxy_server}", user_id, account_id) account.proxy_config = proxy_config # 保存代理配置供截图使用 else: log_to_client(f"✗ 代理获取失败,将不使用代理继续", user_id, account_id) else: log_to_client(f"⚠ 代理已启用但未配置API地址", user_id, account_id) # 使用 API 方式浏览(不启动浏览器,节省内存) checkpoint_mgr.update_stage(task_id, TaskStage.STARTING, progress_percent=10) def custom_log(message: str): log_to_client(message, user_id, account_id) log_to_client(f"开始登录...", user_id, account_id) if account_id in task_status: task_status[account_id]["detail_status"] = "正在登录" checkpoint_mgr.update_stage(task_id, TaskStage.LOGGING_IN, progress_percent=25) # 使用 API 方式登录和浏览(不启动浏览器) api_browser = APIBrowser(log_callback=custom_log, proxy_config=proxy_config) if api_browser.login(account.username, account.password): log_to_client(f"✓ 登录成功!", user_id, account_id) # 登录成功,清除失败计数 # 保存cookies供截图使用 api_browser.save_cookies_for_playwright(account.username) database.reset_account_login_status(account_id) if account_id in task_status: task_status[account_id]["detail_status"] = "正在浏览" log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id) def should_stop(): return account.should_stop checkpoint_mgr.update_stage(task_id, TaskStage.BROWSING, progress_percent=50) result = api_browser.browse_content( browse_type=browse_type, should_stop_callback=should_stop ) # 转换结果类型以兼容后续代码 result = BrowseResult( success=result.success, total_items=result.total_items, total_attachments=result.total_attachments, error_message=result.error_message ) api_browser.close() else: # API 登录失败 error_message = "登录失败" log_to_client(f"❌ {error_message}", user_id, account_id) # 增加失败计数(假设密码错误) is_suspended = database.increment_account_login_fail(account_id, error_message) if is_suspended: log_to_client(f"⚠ 该账号连续3次密码错误,已自动暂停", user_id, account_id) log_to_client(f"请在前台修改密码后才能继续使用", user_id, account_id) retry_action = checkpoint_mgr.record_error(task_id, error_message) if retry_action == "paused": logger.warning(f"[断点] 任务 {task_id} 已暂停(登录失败)") account.status = "登录失败" account.is_running = False # 记录登录失败日志 database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=0, total_attachments=0, error_message=error_message, duration=int(time_module.time() - task_start_time), source=source ) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') api_browser.close() return account.total_items = result.total_items account.total_attachments = result.total_attachments if result.success: log_to_client(f"浏览完成! 共 {result.total_items} 条内容,{result.total_attachments} 个附件", user_id, account_id) if account_id in task_status: task_status[account_id]["detail_status"] = "浏览完成" task_status[account_id]["progress"] = {"items": result.total_items, "attachments": result.total_attachments} account.status = "已完成" checkpoint_mgr.update_stage(task_id, TaskStage.COMPLETING, progress_percent=95) checkpoint_mgr.complete_task(task_id, success=True) logger.info(f"[断点] 任务 {task_id} 已完成") # 记录成功日志(如果不截图则在此记录,截图时在截图完成后记录) if not enable_screenshot: database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='success', total_items=result.total_items, total_attachments=result.total_attachments, error_message='', duration=int(time_module.time() - task_start_time), source=source ) # 成功则跳出重试循环 break else: # 浏览出错,检查是否是超时错误 error_msg = result.error_message if 'Timeout' in error_msg or 'timeout' in error_msg: last_error = error_msg log_to_client(f"⚠ 检测到超时错误: {error_msg}", user_id, account_id) # 关闭当前浏览器 if account.automation: try: account.automation.close() log_to_client(f"已关闭超时的浏览器实例", user_id, account_id) except Exception as e: logger.debug(f"关闭超时浏览器实例失败: {e}") account.automation = None if attempt < max_attempts: log_to_client(f"⚠ 代理可能速度过慢,将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id) time_module.sleep(2) # 等待2秒再重试 continue else: log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=result.total_items, total_attachments=result.total_attachments, error_message=f"重试{max_attempts}次后仍失败: {error_msg}", duration=int(time_module.time() - task_start_time) ) break else: # 非超时错误,直接失败不重试 log_to_client(f"浏览出错: {error_msg}", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=result.total_items, total_attachments=result.total_attachments, error_message=error_msg, duration=int(time_module.time() - task_start_time), source=source ) break except Exception as retry_error: # 捕获重试过程中的异常 error_msg = str(retry_error) last_error = error_msg # 关闭可能存在的浏览器实例 if account.automation: try: account.automation.close() except Exception as e: logger.debug(f"关闭浏览器实例失败: {e}") account.automation = None if 'Timeout' in error_msg or 'timeout' in error_msg: log_to_client(f"⚠ 执行超时: {error_msg}", user_id, account_id) if attempt < max_attempts: log_to_client(f"⚠ 将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id) time_module.sleep(2) continue else: log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=account.total_items, total_attachments=account.total_attachments, error_message=f"重试{max_attempts}次后仍失败: {error_msg}", duration=int(time_module.time() - task_start_time), source=source ) break else: # 非超时异常,直接失败 log_to_client(f"任务执行异常: {error_msg}", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=account.total_items, total_attachments=account.total_attachments, error_message=error_msg, duration=int(time_module.time() - task_start_time), source=source ) break except Exception as e: error_msg = str(e) log_to_client(f"任务执行出错: {error_msg}", user_id, account_id) account.status = "出错" # 记录异常失败日志 database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=account.total_items, total_attachments=account.total_attachments, error_message=error_msg, duration=int(time_module.time() - task_start_time), source=source ) finally: # 先关闭浏览器,再释放信号量(避免并发创建/关闭浏览器导致资源竞争) account.is_running = False # 如果状态不是已完成(需要截图),则重置为未开始 if account.status not in ["已完成"]: account.status = "未开始" if account.automation: try: account.automation.close() # log_to_client(f"主任务浏览器已关闭", user_id, account_id) # 精简 except Exception as e: log_to_client(f"关闭主任务浏览器时出错: {str(e)}", user_id, account_id) finally: account.automation = None # 浏览器关闭后再释放全局信号量,确保新任务创建浏览器时旧浏览器已完全关闭 global_semaphore.release() if account_id in active_tasks: del active_tasks[account_id] if account_id in task_status: del task_status[account_id] socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 任务完成后自动截图(增加2秒延迟,确保资源完全释放) # 根据enable_screenshot参数决定是否截图 if account.status == "已完成" and not account.should_stop: if enable_screenshot: log_to_client(f"等待2秒后开始截图...", user_id, account_id) # 更新账号状态为等待截图 account.status = "等待截图" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 重新添加截图状态 import time as time_mod task_status[account_id] = { "user_id": user_id, "username": account.username, "status": "排队中", "detail_status": "等待截图资源", "browse_type": browse_type, "start_time": time_mod.time(), "source": source, "progress": {"items": result.total_items if result else 0, "attachments": result.total_attachments if result else 0} } time.sleep(2) # 延迟启动截图,确保主任务资源已完全释放 browse_result_dict = {'total_items': result.total_items, 'total_attachments': result.total_attachments} threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id, browse_type, source, task_start_time, browse_result_dict), daemon=True).start() else: # 不截图时,重置状态为未开始 account.status = "未开始" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') log_to_client(f"截图功能已禁用,跳过截图", user_id, account_id) else: # 任务非正常完成,重置状态为未开始 if account.status not in ["登录失败", "出错"]: account.status = "未开始" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') finally: # 释放用户级信号量 user_sem.release() def take_screenshot_for_account(user_id, account_id, browse_type="应读", source="manual", task_start_time=None, browse_result=None): """为账号任务完成后截图(使用工作线程池,真正的浏览器复用)""" if user_id not in user_accounts or account_id not in user_accounts[user_id]: return account = user_accounts[user_id][account_id] # 标记账号正在截图(防止重复提交截图任务) account.is_running = True def screenshot_task(browser_instance, user_id, account_id, account, browse_type, source, task_start_time, browse_result): """在worker线程中执行的截图任务""" # ✅ 获得worker后,立即更新状态为"截图中" if user_id in user_accounts and account_id in user_accounts[user_id]: acc = user_accounts[user_id][account_id] acc.status = "截图中" if account_id in task_status: task_status[account_id]["status"] = "运行中" task_status[account_id]["detail_status"] = "正在截图" socketio.emit('account_update', acc.to_dict(), room=f'user_{user_id}') max_retries = 3 for attempt in range(1, max_retries + 1): automation = None try: # 更新状态 if account_id in task_status: task_status[account_id]["detail_status"] = f"正在截图{f' (第{attempt}次)' if attempt > 1 else ''}" if attempt > 1: log_to_client(f"🔄 第 {attempt} 次截图尝试...", user_id, account_id) log_to_client(f"使用Worker-{browser_instance['worker_id']}的浏览器(已使用{browser_instance['use_count']}次)", user_id, account_id) # 使用worker的浏览器创建PlaywrightAutomation proxy_config = account.proxy_config if hasattr(account, 'proxy_config') else None automation = PlaywrightAutomation(browser_manager, account_id, proxy_config=proxy_config) automation.playwright = browser_instance['playwright'] automation.browser = browser_instance['browser'] def custom_log(message: str): log_to_client(message, user_id, account_id) automation.log = custom_log # 登录 log_to_client(f"登录中...", user_id, account_id) login_result = automation.quick_login(account.username, account.password, account.remember) if not login_result["success"]: error_message = login_result.get("message", "截图登录失败") log_to_client(f"截图登录失败: {error_message}", user_id, account_id) if attempt < max_retries: log_to_client(f"将重试...", user_id, account_id) time.sleep(2) continue else: log_to_client(f"❌ 截图失败: 登录失败", user_id, account_id) return {'success': False, 'error': '登录失败'} browse_type = account.last_browse_type log_to_client(f"导航到 '{browse_type}' 页面...", user_id, account_id) # 导航到指定页面 result = automation.browse_content( navigate_only=True, browse_type=browse_type, auto_next_page=False, auto_view_attachments=False, interval=0, should_stop_callback=None ) if not result.success and result.error_message: log_to_client(f"导航警告: {result.error_message}", user_id, account_id) # 等待页面稳定 time.sleep(2) # 生成截图文件名 beijing_tz = pytz.timezone('Asia/Shanghai') now_beijing = datetime.now(beijing_tz) timestamp = now_beijing.strftime('%Y%m%d_%H%M%S') user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" login_account = account.remark if account.remark else account.username screenshot_filename = f"{username_prefix}_{login_account}_{browse_type}_{timestamp}.jpg" screenshot_path = os.path.join(SCREENSHOTS_DIR, screenshot_filename) # 尝试截图 if automation.take_screenshot(screenshot_path): # 验证截图文件 if os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000: log_to_client(f"✓ 截图成功: {screenshot_filename}", user_id, account_id) return {'success': True, 'filename': screenshot_filename} else: log_to_client(f"截图文件异常,将重试", user_id, account_id) if os.path.exists(screenshot_path): os.remove(screenshot_path) else: log_to_client(f"截图保存失败", user_id, account_id) if attempt < max_retries: log_to_client(f"将重试...", user_id, account_id) time.sleep(2) except Exception as e: log_to_client(f"截图出错: {str(e)}", user_id, account_id) if attempt < max_retries: log_to_client(f"将重试...", user_id, account_id) time.sleep(2) finally: # 只关闭context,不关闭浏览器(由worker管理) if automation: try: if automation.context: automation.context.close() automation.context = None automation.page = None except Exception as e: logger.debug(f"关闭context时出错: {e}") return {'success': False, 'error': '截图失败,已重试3次'} def screenshot_callback(result, error): """截图完成回调""" try: # 重置账号状态 account.is_running = False account.status = "未开始" # 先清除任务状态(这样to_dict()不会包含detail_status) if account_id in task_status: del task_status[account_id] # 然后发送更新 socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') if error: log_to_client(f"❌ 截图失败: {error}", user_id, account_id) elif not result or not result.get('success'): error_msg = result.get('error', '未知错误') if result else '未知错误' log_to_client(f"❌ 截图失败: {error_msg}", user_id, account_id) # 记录任务日志 if task_start_time and browse_result: import time as time_module total_elapsed = int(time_module.time() - task_start_time) database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='success', total_items=browse_result.get('total_items', 0), total_attachments=browse_result.get('total_attachments', 0), duration=total_elapsed, source=source ) except Exception as e: logger.error(f"截图回调出错: {e}") # 提交任务到工作线程池 pool = get_browser_worker_pool() pool.submit_task( screenshot_task, screenshot_callback, user_id, account_id, account, browse_type, source, task_start_time, browse_result ) def manual_screenshot(account_id): """手动为指定账号截图""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] if account.is_running: return jsonify({"error": "任务运行中,无法截图"}), 400 data = request.json or {} browse_type = data.get('browse_type', account.last_browse_type) account.last_browse_type = browse_type threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id), daemon=True).start() log_to_client(f"手动截图: {account.username} - {browse_type}", user_id) return jsonify({"success": True}) # ==================== 截图管理API ==================== @app.route('/api/screenshots', methods=['GET']) @login_required def get_screenshots(): """获取当前用户的截图列表""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" try: screenshots = [] if os.path.exists(SCREENSHOTS_DIR): for filename in os.listdir(SCREENSHOTS_DIR): # 只显示属于当前用户的截图(支持png和jpg格式) if (filename.lower().endswith(('.png', '.jpg', '.jpeg'))) and filename.startswith(username_prefix + '_'): filepath = os.path.join(SCREENSHOTS_DIR, filename) stat = os.stat(filepath) # 转换为北京时间 beijing_tz = pytz.timezone('Asia/Shanghai') created_time = datetime.fromtimestamp(stat.st_mtime, tz=beijing_tz) # 解析文件名获取显示名称 # 文件名格式:用户名_登录账号_浏览类型_时间.jpg parts = filename.rsplit('.', 1)[0].split('_', 1) # 移除扩展名并分割 if len(parts) > 1: # 显示名称:登录账号_浏览类型_时间.jpg display_name = parts[1] + '.' + filename.rsplit('.', 1)[1] else: display_name = filename screenshots.append({ 'filename': filename, 'display_name': display_name, 'size': stat.st_size, 'created': created_time.strftime('%Y-%m-%d %H:%M:%S') }) screenshots.sort(key=lambda x: x['created'], reverse=True) return jsonify(screenshots) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/screenshots/') @login_required def serve_screenshot(filename): """提供截图文件访问""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" # 验证文件属于当前用户 if not filename.startswith(username_prefix + '_'): return jsonify({"error": "无权访问"}), 403 return send_from_directory(SCREENSHOTS_DIR, filename) @app.route('/api/screenshots/', methods=['DELETE']) @login_required def delete_screenshot(filename): """删除指定截图""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" # 验证文件属于当前用户 if not filename.startswith(username_prefix + '_'): return jsonify({"error": "无权删除"}), 403 try: filepath = os.path.join(SCREENSHOTS_DIR, filename) if os.path.exists(filepath): os.remove(filepath) log_to_client(f"删除截图: {filename}", user_id) return jsonify({"success": True}) else: return jsonify({"error": "文件不存在"}), 404 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/screenshots/clear', methods=['POST']) @login_required def clear_all_screenshots(): """清空当前用户的所有截图""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" try: deleted_count = 0 if os.path.exists(SCREENSHOTS_DIR): for filename in os.listdir(SCREENSHOTS_DIR): if (filename.lower().endswith(('.png', '.jpg', '.jpeg'))) and filename.startswith(username_prefix + '_'): filepath = os.path.join(SCREENSHOTS_DIR, filename) os.remove(filepath) deleted_count += 1 log_to_client(f"清理了 {deleted_count} 个截图文件", user_id) return jsonify({"success": True, "deleted": deleted_count}) except Exception as e: return jsonify({"error": str(e)}), 500 # ==================== WebSocket事件 ==================== @socketio.on('connect') def handle_connect(): """客户端连接""" if current_user.is_authenticated: user_id = current_user.id join_room(f'user_{user_id}') log_to_client("客户端已连接", user_id) # 如果user_accounts中没有该用户的账号,从数据库加载 if user_id not in user_accounts or len(user_accounts[user_id]) == 0: db_accounts = database.get_user_accounts(user_id) if db_accounts: user_accounts[user_id] = {} for acc_data in db_accounts: account = Account( account_id=acc_data['id'], user_id=acc_data['user_id'], username=acc_data['username'], password=acc_data['password'], remember=bool(acc_data.get('remember', 1)), remark=acc_data.get('remark', '') ) user_accounts[user_id][acc_data['id']] = account log_to_client(f"已从数据库恢复 {len(db_accounts)} 个账号", user_id) # 发送账号列表 accounts = user_accounts.get(user_id, {}) emit('accounts_list', [acc.to_dict() for acc in accounts.values()]) # 发送历史日志 if user_id in log_cache: for log_entry in log_cache[user_id]: emit('log', log_entry) @socketio.on('disconnect') def handle_disconnect(): """客户端断开""" if current_user.is_authenticated: user_id = current_user.id leave_room(f'user_{user_id}') # ==================== 静态文件 ==================== @app.route('/static/') def serve_static(filename): """提供静态文件访问""" response = send_from_directory('static', filename) # 禁用缓存,强制浏览器每次都重新加载 response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response # ==================== 启动 ==================== # ==================== 管理员VIP管理API ==================== @app.route('/yuyx/api/vip/config', methods=['GET']) def get_vip_config_api(): """获取VIP配置""" if 'admin_id' not in session: return jsonify({"error": "需要管理员权限"}), 403 config = database.get_vip_config() return jsonify(config) @app.route('/yuyx/api/vip/config', methods=['POST']) def set_vip_config_api(): """设置默认VIP天数""" if 'admin_id' not in session: return jsonify({"error": "需要管理员权限"}), 403 data = request.json days = data.get('default_vip_days', 0) if not isinstance(days, int) or days < 0: return jsonify({"error": "VIP天数必须是非负整数"}), 400 database.set_default_vip_days(days) return jsonify({"message": "VIP配置已更新", "default_vip_days": days}) @app.route('/yuyx/api/users//vip', methods=['POST']) def set_user_vip_api(user_id): """设置用户VIP""" if 'admin_id' not in session: return jsonify({"error": "需要管理员权限"}), 403 data = request.json days = data.get('days', 30) # 验证days参数 valid_days = [7, 30, 365, 999999] if days not in valid_days: return jsonify({"error": "VIP天数必须是 7/30/365/999999 之一"}), 400 if database.set_user_vip(user_id, days): vip_type = {7: "一周", 30: "一个月", 365: "一年", 999999: "永久"}[days] return jsonify({"message": f"VIP设置成功: {vip_type}"}) return jsonify({"error": "设置失败,用户不存在"}), 400 @app.route('/yuyx/api/users//vip', methods=['DELETE']) def remove_user_vip_api(user_id): """移除用户VIP""" if 'admin_id' not in session: return jsonify({"error": "需要管理员权限"}), 403 if database.remove_user_vip(user_id): return jsonify({"message": "VIP已移除"}) return jsonify({"error": "移除失败"}), 400 @app.route('/yuyx/api/users//vip', methods=['GET']) def get_user_vip_info_api(user_id): """获取用户VIP信息(管理员)""" if 'admin_id' not in session: return jsonify({"error": "需要管理员权限"}), 403 vip_info = database.get_user_vip_info(user_id) return jsonify(vip_info) # ==================== 用户端VIP查询API ==================== @app.route('/api/user/vip', methods=['GET']) @login_required def get_current_user_vip(): """获取当前用户VIP信息""" vip_info = database.get_user_vip_info(current_user.id) # 添加用户名 user_info = database.get_user_by_id(current_user.id) vip_info['username'] = user_info['username'] if user_info else 'Unknown' return jsonify(vip_info) @app.route('/api/run_stats', methods=['GET']) @login_required def get_run_stats(): """获取当前用户的运行统计""" user_id = current_user.id # 获取今日任务统计 stats = database.get_user_run_stats(user_id) # 计算当前正在运行的账号数 current_running = 0 if user_id in user_accounts: current_running = sum(1 for acc in user_accounts[user_id].values() if acc.is_running) return jsonify({ 'today_completed': stats.get('completed', 0), 'current_running': current_running, 'today_failed': stats.get('failed', 0), 'today_items': stats.get('total_items', 0), 'today_attachments': stats.get('total_attachments', 0) }) # ==================== 系统配置API ==================== @app.route('/yuyx/api/system/config', methods=['GET']) @admin_required def get_system_config_api(): """获取系统配置""" config = database.get_system_config() return jsonify(config) @app.route('/yuyx/api/system/config', methods=['POST']) @admin_required def update_system_config_api(): """更新系统配置""" global max_concurrent_global, global_semaphore, max_concurrent_per_account data = request.json max_concurrent = data.get('max_concurrent_global') schedule_enabled = data.get('schedule_enabled') schedule_time = data.get('schedule_time') schedule_browse_type = data.get('schedule_browse_type') schedule_weekdays = data.get('schedule_weekdays') new_max_concurrent_per_account = data.get('max_concurrent_per_account') new_max_screenshot_concurrent = data.get('max_screenshot_concurrent') # 验证参数 if max_concurrent is not None: if not isinstance(max_concurrent, int) or max_concurrent < 1: return jsonify({"error": "全局并发数必须大于0(建议:小型服务器2-5,中型5-10,大型10-20)"}), 400 if new_max_concurrent_per_account is not None: if not isinstance(new_max_concurrent_per_account, int) or new_max_concurrent_per_account < 1: return jsonify({"error": "单账号并发数必须大于0(建议设为1,避免同一用户任务相互影响)"}), 400 if new_max_screenshot_concurrent is not None: if not isinstance(new_max_screenshot_concurrent, int) or new_max_screenshot_concurrent < 1: return jsonify({"error": "截图并发数必须大于0(建议根据服务器配置设置,每个浏览器约占用200MB内存)"}), 400 if schedule_time is not None: # 验证时间格式 HH:MM import re if not re.match(r'^([01]\d|2[0-3]):([0-5]\d)$', schedule_time): return jsonify({"error": "时间格式错误,应为 HH:MM"}), 400 if schedule_browse_type is not None: if schedule_browse_type not in ['注册前未读', '应读', '未读']: return jsonify({"error": "浏览类型无效"}), 400 if schedule_weekdays is not None: # 验证星期格式,应该是逗号分隔的数字字符串 "1,2,3,4,5,6,7" try: days = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()] if not all(1 <= d <= 7 for d in days): return jsonify({"error": "星期数字必须在1-7之间"}), 400 except (ValueError, AttributeError): return jsonify({"error": "星期格式错误"}), 400 # 更新数据库 if database.update_system_config( max_concurrent=max_concurrent, schedule_enabled=schedule_enabled, schedule_time=schedule_time, schedule_browse_type=schedule_browse_type, schedule_weekdays=schedule_weekdays, max_concurrent_per_account=new_max_concurrent_per_account, max_screenshot_concurrent=new_max_screenshot_concurrent ): # 如果修改了并发数,更新全局变量和信号量 if max_concurrent is not None and max_concurrent != max_concurrent_global: max_concurrent_global = max_concurrent global_semaphore = threading.Semaphore(max_concurrent) print(f"全局并发数已更新为: {max_concurrent}") # 如果修改了单用户并发数,更新全局变量(已有的信号量会在下次创建时使用新值) if new_max_concurrent_per_account is not None and new_max_concurrent_per_account != max_concurrent_per_account: max_concurrent_per_account = new_max_concurrent_per_account print(f"单用户并发数已更新为: {max_concurrent_per_account}") # 如果修改了截图并发数,更新信号量 if new_max_screenshot_concurrent is not None: global screenshot_semaphore screenshot_semaphore = threading.Semaphore(new_max_screenshot_concurrent) print(f"截图并发数已更新为: {new_max_screenshot_concurrent}") return jsonify({"message": "系统配置已更新"}) return jsonify({"error": "更新失败"}), 400 @app.route('/yuyx/api/schedule/execute', methods=['POST']) @admin_required def execute_schedule_now(): """立即执行定时任务(无视定时时间和星期限制)""" try: # 在新线程中执行任务,避免阻塞请求 # 传入 skip_weekday_check=True 跳过星期检查 thread = threading.Thread(target=run_scheduled_task, args=(True,), daemon=True) thread.start() logger.info("[立即执行定时任务] 管理员手动触发定时任务执行(跳过星期检查)") return jsonify({"message": "定时任务已开始执行,请查看任务列表获取进度"}) except Exception as e: logger.error(f"[立即执行定时任务] 启动失败: {str(e)}") return jsonify({"error": f"启动失败: {str(e)}"}), 500 # ==================== 代理配置API ==================== @app.route('/yuyx/api/proxy/config', methods=['GET']) @admin_required def get_proxy_config_api(): """获取代理配置""" config = database.get_system_config() return jsonify({ 'proxy_enabled': config.get('proxy_enabled', 0), 'proxy_api_url': config.get('proxy_api_url', ''), 'proxy_expire_minutes': config.get('proxy_expire_minutes', 3) }) @app.route('/yuyx/api/proxy/config', methods=['POST']) @admin_required def update_proxy_config_api(): """更新代理配置""" data = request.json proxy_enabled = data.get('proxy_enabled') proxy_api_url = data.get('proxy_api_url', '').strip() proxy_expire_minutes = data.get('proxy_expire_minutes') if proxy_enabled is not None and proxy_enabled not in [0, 1]: return jsonify({"error": "proxy_enabled必须是0或1"}), 400 if proxy_expire_minutes is not None: if not isinstance(proxy_expire_minutes, int) or proxy_expire_minutes < 1: return jsonify({"error": "代理有效期必须是大于0的整数"}), 400 if database.update_system_config( proxy_enabled=proxy_enabled, proxy_api_url=proxy_api_url, proxy_expire_minutes=proxy_expire_minutes ): return jsonify({"message": "代理配置已更新"}) return jsonify({"error": "更新失败"}), 400 @app.route('/yuyx/api/proxy/test', methods=['POST']) @admin_required def test_proxy_api(): """测试代理连接""" data = request.json api_url = data.get('api_url', '').strip() if not api_url: return jsonify({"error": "请提供API地址"}), 400 try: response = requests.get(api_url, timeout=10) if response.status_code == 200: ip_port = response.text.strip() if ip_port and ':' in ip_port: return jsonify({ "success": True, "proxy": ip_port, "message": f"代理获取成功: {ip_port}" }) else: return jsonify({ "success": False, "message": f"代理格式错误: {ip_port}" }), 400 else: return jsonify({ "success": False, "message": f"HTTP错误: {response.status_code}" }), 400 except Exception as e: return jsonify({ "success": False, "message": f"连接失败: {str(e)}" }), 500 # ==================== 服务器信息API ==================== @app.route('/yuyx/api/server/info', methods=['GET']) @admin_required def get_server_info_api(): """获取服务器信息""" import psutil import datetime # CPU使用率 cpu_percent = psutil.cpu_percent(interval=1) # 内存信息 memory = psutil.virtual_memory() memory_total = f"{memory.total / (1024**3):.1f}GB" memory_used = f"{memory.used / (1024**3):.1f}GB" memory_percent = memory.percent # 磁盘信息 disk = psutil.disk_usage('/') disk_total = f"{disk.total / (1024**3):.1f}GB" disk_used = f"{disk.used / (1024**3):.1f}GB" disk_percent = disk.percent # 运行时长 boot_time = datetime.datetime.fromtimestamp(psutil.boot_time()) uptime_delta = datetime.datetime.now() - boot_time days = uptime_delta.days hours = uptime_delta.seconds // 3600 uptime = f"{days}天{hours}小时" return jsonify({ 'cpu_percent': cpu_percent, 'memory_total': memory_total, 'memory_used': memory_used, 'memory_percent': memory_percent, 'disk_total': disk_total, 'disk_used': disk_used, 'disk_percent': disk_percent, 'uptime': uptime }) # ==================== 任务统计和日志API ==================== @app.route('/yuyx/api/task/stats', methods=['GET']) @admin_required def get_task_stats_api(): """获取任务统计数据""" date_filter = request.args.get('date') # YYYY-MM-DD格式 stats = database.get_task_stats(date_filter) return jsonify(stats) @app.route('/yuyx/api/task/running', methods=['GET']) @admin_required def get_running_tasks_api(): """获取当前运行中和排队中的任务""" import time as time_mod current_time = time_mod.time() running = [] queuing = [] for account_id, info in task_status.items(): elapsed = int(current_time - info.get("start_time", current_time)) # 获取用户名 user = database.get_user_by_id(info.get("user_id")) user_username = user['username'] if user else 'N/A' # 获取进度信息 progress = info.get("progress", {"items": 0, "attachments": 0}) task_info = { "account_id": account_id, "user_id": info.get("user_id"), "user_username": user_username, "username": info.get("username"), "browse_type": info.get("browse_type"), "source": info.get("source", "manual"), "detail_status": info.get("detail_status", "未知"), "progress_items": progress.get("items", 0), "progress_attachments": progress.get("attachments", 0), "elapsed_seconds": elapsed, "elapsed_display": f"{elapsed // 60}分{elapsed % 60}秒" if elapsed >= 60 else f"{elapsed}秒" } if info.get("status") == "运行中": running.append(task_info) else: queuing.append(task_info) # 按开始时间排序 running.sort(key=lambda x: x["elapsed_seconds"], reverse=True) queuing.sort(key=lambda x: x["elapsed_seconds"], reverse=True) return jsonify({ "running": running, "queuing": queuing, "running_count": len(running), "queuing_count": len(queuing), "max_concurrent": max_concurrent_global }) @app.route('/yuyx/api/task/logs', methods=['GET']) @admin_required def get_task_logs_api(): """获取任务日志列表(支持分页和多种筛选)""" limit = int(request.args.get('limit', 20)) offset = int(request.args.get('offset', 0)) date_filter = request.args.get('date') # YYYY-MM-DD格式 status_filter = request.args.get('status') # success/failed source_filter = request.args.get('source') # manual/scheduled/immediate/resumed user_id_filter = request.args.get('user_id') # 用户ID account_filter = request.args.get('account') # 账号关键字 # 转换user_id为整数 if user_id_filter: try: user_id_filter = int(user_id_filter) except ValueError: user_id_filter = None result = database.get_task_logs( limit=limit, offset=offset, date_filter=date_filter, status_filter=status_filter, source_filter=source_filter, user_id_filter=user_id_filter, account_filter=account_filter ) return jsonify(result) @app.route('/yuyx/api/task/logs/clear', methods=['POST']) @admin_required def clear_old_task_logs_api(): """清理旧的任务日志""" data = request.json or {} days = data.get('days', 30) if not isinstance(days, int) or days < 1: return jsonify({"error": "天数必须是大于0的整数"}), 400 deleted_count = database.delete_old_task_logs(days) return jsonify({"message": f"已删除{days}天前的{deleted_count}条日志"}) @app.route('/yuyx/api/docker/restart', methods=['POST']) @admin_required def restart_docker_container(): """重启Docker容器""" import subprocess import os try: # 检查是否在Docker容器中运行 if not os.path.exists('/.dockerenv'): return jsonify({"error": "当前不在Docker容器中运行"}), 400 # 记录日志 app_logger.info("[系统] 管理员触发Docker容器重启") # 使用nohup在后台执行重启命令,避免阻塞 # 容器重启会导致当前进程终止,所以需要延迟执行 restart_script = """ import os import time # 延迟3秒让响应返回给客户端 time.sleep(3) # 退出Python进程,让Docker自动重启容器(restart: unless-stopped) os._exit(0) """ # 写入临时脚本 with open('/tmp/restart_container.py', 'w') as f: f.write(restart_script) # 在后台执行重启脚本 subprocess.Popen(['python', '/tmp/restart_container.py'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True) return jsonify({ "success": True, "message": "容器将在3秒后重启,请稍后刷新页面" }) except Exception as e: app_logger.error(f"[系统] Docker容器重启失败: {str(e)}") return jsonify({"error": f"重启失败: {str(e)}"}), 500 # ==================== 定时任务调度器 ==================== def run_scheduled_task(skip_weekday_check=False): """执行所有账号的浏览任务(可被手动调用,过滤重复账号) Args: skip_weekday_check: 是否跳过星期检查(立即执行时为True) """ try: from datetime import datetime import pytz config = database.get_system_config() browse_type = config.get('schedule_browse_type', '应读') # 检查今天是否在允许执行的星期列表中(立即执行时跳过此检查) if not skip_weekday_check: # 获取北京时间的星期几 (1=周一, 7=周日) beijing_tz = pytz.timezone('Asia/Shanghai') now_beijing = datetime.now(beijing_tz) current_weekday = now_beijing.isoweekday() # 1-7 # 获取配置的星期列表 schedule_weekdays = config.get('schedule_weekdays', '1,2,3,4,5,6,7') allowed_weekdays = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()] if current_weekday not in allowed_weekdays: weekday_names = ['', '周一', '周二', '周三', '周四', '周五', '周六', '周日'] print(f"[定时任务] 今天是{weekday_names[current_weekday]},不在执行日期内,跳过执行") return else: print(f"[立即执行] 跳过星期检查,强制执行任务") print(f"[定时任务] 开始执行 - 浏览类型: {browse_type}") # 获取所有已审核用户的所有账号 all_users = database.get_all_users() approved_users = [u for u in all_users if u['status'] == 'approved'] # 用于记录已执行的账号用户名,避免重复 executed_usernames = set() total_accounts = 0 skipped_duplicates = 0 executed_accounts = 0 for user in approved_users: user_id = user['id'] if user_id not in user_accounts: load_user_accounts(user_id) accounts = user_accounts.get(user_id, {}) for account_id, account in accounts.items(): total_accounts += 1 # 跳过正在运行的账号 if account.is_running: continue # 检查账号状态,跳过已暂停的账号 account_status_info = database.get_account_status(account_id) if account_status_info: status = account_status_info['status'] if 'status' in account_status_info.keys() else 'active' if status == 'suspended': fail_count = account_status_info['login_fail_count'] if 'login_fail_count' in account_status_info.keys() else 0 print(f"[定时任务] 跳过暂停账号: {account.username} (用户:{user['username']}) - 连续{fail_count}次密码错误,需修改密码") continue # 检查账号用户名是否已经执行过(重复账号过滤) if account.username in executed_usernames: skipped_duplicates += 1 print(f"[定时任务] 跳过重复账号: {account.username} (用户:{user['username']}) - 该账号已被其他用户执行") continue # 记录该账号用户名,避免后续重复执行 executed_usernames.add(account.username) print(f"[定时任务] 启动账号: {account.username} (用户:{user['username']})") # 启动任务 account.is_running = True account.should_stop = False account.status = "运行中" # 获取系统配置的截图开关 cfg = database.get_system_config() enable_screenshot_scheduled = cfg.get("enable_screenshot", 0) == 1 thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot_scheduled, 'scheduled'), daemon=True ) thread.start() active_tasks[account_id] = thread executed_accounts += 1 # 发送更新到用户 socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 间隔启动,避免瞬间并发过高 time.sleep(2) print(f"[定时任务] 执行完成 - 总账号数:{total_accounts}, 已执行:{executed_accounts}, 跳过重复:{skipped_duplicates}") except Exception as e: print(f"[定时任务] 执行出错: {str(e)}") logger.error(f"[定时任务] 执行异常: {str(e)}") def status_push_worker(): """后台线程:每秒推送运行中任务的状态更新""" while True: try: # 遍历所有运行中的任务状态 for account_id, status_info in list(task_status.items()): user_id = status_info.get('user_id') if user_id: # 获取账号对象 if user_id in user_accounts and account_id in user_accounts[user_id]: account = user_accounts[user_id][account_id] account_data = account.to_dict() # 推送账号状态更新 socketio.emit('account_update', account_data, room=f'user_{user_id}') # 同时推送详细进度事件(方便前端分别处理) progress = status_info.get('progress', {}) progress_data = { 'account_id': account_id, 'stage': status_info.get('detail_status', ''), 'total_items': account.total_items, 'browsed_items': progress.get('items', 0), 'total_attachments': account.total_attachments, 'viewed_attachments': progress.get('attachments', 0), 'start_time': status_info.get('start_time', 0), 'elapsed_seconds': account_data.get('elapsed_seconds', 0), 'elapsed_display': account_data.get('elapsed_display', '') } socketio.emit('task_progress', progress_data, room=f'user_{user_id}') time.sleep(1) # 每秒推送一次 except Exception as e: logger.debug(f"状态推送出错: {e}") time.sleep(1) def scheduled_task_worker(): """定时任务工作线程""" import schedule def cleanup_expired_captcha(): """清理过期验证码,防止内存泄漏""" try: current_time = time.time() expired_keys = [k for k, v in captcha_storage.items() if v["expire_time"] < current_time] deleted_count = len(expired_keys) for k in expired_keys: del captcha_storage[k] if deleted_count > 0: print(f"[定时清理] 已清理 {deleted_count} 个过期验证码") except Exception as e: print(f"[定时清理] 清理验证码出错: {str(e)}") def cleanup_old_data(): """清理7天前的截图和日志""" try: print(f"[定时清理] 开始清理7天前的数据...") # 清理7天前的任务日志 deleted_logs = database.delete_old_task_logs(7) print(f"[定时清理] 已删除 {deleted_logs} 条任务日志") # 清理30天前的操作日志 deleted_operation_logs = database.clean_old_operation_logs(30) print(f"[定时清理] 已删除 {deleted_operation_logs} 条操作日志") # 清理7天前的截图 deleted_screenshots = 0 if os.path.exists(SCREENSHOTS_DIR): cutoff_time = time.time() - (7 * 24 * 60 * 60) # 7天前的时间戳 for filename in os.listdir(SCREENSHOTS_DIR): if filename.lower().endswith(('.png', '.jpg', '.jpeg')): filepath = os.path.join(SCREENSHOTS_DIR, filename) try: # 检查文件修改时间 if os.path.getmtime(filepath) < cutoff_time: os.remove(filepath) deleted_screenshots += 1 except Exception as e: print(f"[定时清理] 删除截图失败 {filename}: {str(e)}") print(f"[定时清理] 已删除 {deleted_screenshots} 个截图文件") print(f"[定时清理] 清理完成!") except Exception as e: print(f"[定时清理] 清理任务出错: {str(e)}") def check_user_schedules(): """检查并执行用户定时任务""" import json try: from datetime import datetime beijing_tz = pytz.timezone('Asia/Shanghai') now = datetime.now(beijing_tz) current_time = now.strftime('%H:%M') current_weekday = now.isoweekday() # 获取所有启用的用户定时任务 enabled_schedules = database.get_enabled_user_schedules() for schedule_config in enabled_schedules: # 检查时间是否匹配 if schedule_config['schedule_time'] != current_time: continue # 检查星期是否匹配 allowed_weekdays = [int(d) for d in schedule_config.get('weekdays', '1,2,3,4,5').split(',') if d.strip()] if current_weekday not in allowed_weekdays: continue # 检查今天是否已经执行过 last_run = schedule_config.get('last_run_at') if last_run: try: last_run_date = datetime.strptime(last_run, '%Y-%m-%d %H:%M:%S').date() if last_run_date == now.date(): continue # 今天已执行过 except: pass # 执行用户定时任务 user_id = schedule_config['user_id'] schedule_id = schedule_config['id'] browse_type = schedule_config.get('browse_type', '应读') enable_screenshot = schedule_config.get('enable_screenshot', 1) # 调试日志 print(f"[DEBUG] 定时任务 {schedule_config.get('name')}: enable_screenshot={enable_screenshot} (类型:{type(enable_screenshot).__name__})") try: account_ids = json.loads(schedule_config.get('account_ids', '[]') or '[]') except: account_ids = [] if not account_ids: continue print(f"[用户定时任务] 用户 {schedule_config.get('user_username', user_id)} 的任务 '{schedule_config.get('name', '')}' 开始执行") # 创建执行日志 import time as time_mod execution_start_time = time_mod.time() log_id = database.create_schedule_execution_log( schedule_id=schedule_id, user_id=user_id, schedule_name=schedule_config.get('name', '未命名任务') ) started_count = 0 for account_id in account_ids: if user_id not in user_accounts or account_id not in user_accounts[user_id]: continue account = user_accounts[user_id][account_id] if account.is_running: continue account.is_running = True account.should_stop = False account.status = "排队中" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'), daemon=True ) thread.start() active_tasks[account_id] = thread started_count += 1 socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 更新最后执行时间 database.update_schedule_last_run(schedule_id) # 更新执行日志 execution_duration = int(time_mod.time() - execution_start_time) database.update_schedule_execution_log( log_id, total_accounts=len(account_ids), success_accounts=started_count, failed_accounts=len(account_ids) - started_count, duration_seconds=execution_duration, status='completed' ) print(f"[用户定时任务] 已启动 {started_count} 个账号") except Exception as e: print(f"[用户定时任务] 检查出错: {str(e)}") import traceback traceback.print_exc() # 每分钟检查一次配置 def check_and_schedule(): config = database.get_system_config() # 清除旧的任务 schedule.clear() # 时区转换函数:将CST时间转换为UTC时间(容器使用UTC) def cst_to_utc_time(cst_time_str): """将CST时间字符串(HH:MM)转换为UTC时间字符串 Args: cst_time_str: CST时间字符串,格式为 HH:MM Returns: UTC时间字符串,格式为 HH:MM """ from datetime import datetime, timedelta # 解析CST时间 hour, minute = map(int, cst_time_str.split(':')) # CST是UTC+8,所以UTC时间 = CST时间 - 8小时 utc_hour = (hour - 8) % 24 return f"{utc_hour:02d}:{minute:02d}" # 始终添加每天凌晨3点(CST)的数据清理任务 cleanup_utc_time = cst_to_utc_time("03:00") schedule.every().day.at(cleanup_utc_time).do(cleanup_old_data) print(f"[定时任务] 已设置数据清理任务: 每天 CST 03:00 (UTC {cleanup_utc_time})") # 每小时清理过期验证码 schedule.every().hour.do(cleanup_expired_captcha) print(f"[定时任务] 已设置验证码清理任务: 每小时执行一次") # 如果启用了定时浏览任务,则添加 if config.get('schedule_enabled'): schedule_time_cst = config.get('schedule_time', '02:00') schedule_time_utc = cst_to_utc_time(schedule_time_cst) schedule.every().day.at(schedule_time_utc).do(run_scheduled_task) print(f"[定时任务] 已设置浏览任务: 每天 CST {schedule_time_cst} (UTC {schedule_time_utc})") # 初始检查 check_and_schedule() last_check = time.time() while True: try: # 执行待执行的任务 schedule.run_pending() # 每60秒重新检查一次配置 if time.time() - last_check > 60: check_and_schedule() check_user_schedules() # 检查用户定时任务 last_check = time.time() time.sleep(1) except Exception as e: print(f"[定时任务] 调度器出错: {str(e)}") time.sleep(5) # ========== 断点续传API ========== @app.route('/yuyx/api/checkpoint/paused') @admin_required def checkpoint_get_paused(): try: user_id = request.args.get('user_id', type=int) tasks = checkpoint_mgr.get_paused_tasks(user_id=user_id) return jsonify({'success': True, 'tasks': tasks}) except Exception as e: logger.error(f"获取暂停任务失败: {e}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/yuyx/api/checkpoint//resume', methods=['POST']) @admin_required def checkpoint_resume(task_id): try: checkpoint = checkpoint_mgr.get_checkpoint(task_id) if not checkpoint: return jsonify({'success': False, 'message': '任务不存在'}), 404 if checkpoint['status'] != 'paused': return jsonify({'success': False, 'message': '任务未暂停'}), 400 if checkpoint_mgr.resume_task(task_id): import threading threading.Thread( target=run_task, args=(checkpoint['user_id'], checkpoint['account_id'], checkpoint['browse_type'], True, 'resumed'), daemon=True ).start() return jsonify({'success': True}) return jsonify({'success': False}), 500 except Exception as e: logger.error(f"恢复任务失败: {e}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/yuyx/api/checkpoint//abandon', methods=['POST']) @admin_required def checkpoint_abandon(task_id): try: if checkpoint_mgr.abandon_task(task_id): return jsonify({'success': True}) return jsonify({'success': False}), 404 except Exception as e: return jsonify({'success': False, 'message': str(e)}), 500 # 初始化浏览器池(在后台线程中预热,不阻塞启动) # ==================== 用户定时任务API ==================== @app.route('/api/schedules', methods=['GET']) @login_required def get_user_schedules_api(): """获取当前用户的所有定时任务""" schedules = database.get_user_schedules(current_user.id) import json for s in schedules: try: s['account_ids'] = json.loads(s.get('account_ids', '[]') or '[]') except: s['account_ids'] = [] return jsonify(schedules) @app.route('/api/schedules', methods=['POST']) @login_required def create_user_schedule_api(): """创建用户定时任务""" data = request.json name = data.get('name', '我的定时任务') schedule_time = data.get('schedule_time', '08:00') weekdays = data.get('weekdays', '1,2,3,4,5') browse_type = data.get('browse_type', '应读') enable_screenshot = data.get('enable_screenshot', 1) account_ids = data.get('account_ids', []) import re if not re.match(r'^\d{2}:\d{2}$', schedule_time): return jsonify({"error": "时间格式不正确,应为 HH:MM"}), 400 schedule_id = database.create_user_schedule( user_id=current_user.id, name=name, schedule_time=schedule_time, weekdays=weekdays, browse_type=browse_type, enable_screenshot=enable_screenshot, account_ids=account_ids ) if schedule_id: return jsonify({"success": True, "id": schedule_id}) return jsonify({"error": "创建失败"}), 500 @app.route('/api/schedules/', methods=['GET']) @login_required def get_schedule_detail_api(schedule_id): """获取定时任务详情""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 import json try: schedule['account_ids'] = json.loads(schedule.get('account_ids', '[]') or '[]') except: schedule['account_ids'] = [] return jsonify(schedule) @app.route('/api/schedules/', methods=['PUT']) @login_required def update_schedule_api(schedule_id): """更新定时任务""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 data = request.json allowed_fields = ['name', 'schedule_time', 'weekdays', 'browse_type', 'enable_screenshot', 'account_ids', 'enabled'] update_data = {k: v for k, v in data.items() if k in allowed_fields} if 'schedule_time' in update_data: import re if not re.match(r'^\d{2}:\d{2}$', update_data['schedule_time']): return jsonify({"error": "时间格式不正确"}), 400 success = database.update_user_schedule(schedule_id, **update_data) if success: return jsonify({"success": True}) return jsonify({"error": "更新失败"}), 500 @app.route('/api/schedules/', methods=['DELETE']) @login_required def delete_schedule_api(schedule_id): """删除定时任务""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 success = database.delete_user_schedule(schedule_id) if success: return jsonify({"success": True}) return jsonify({"error": "删除失败"}), 500 @app.route('/api/schedules//toggle', methods=['POST']) @login_required def toggle_schedule_api(schedule_id): """启用/禁用定时任务""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 data = request.json enabled = data.get('enabled', not schedule['enabled']) success = database.toggle_user_schedule(schedule_id, enabled) if success: return jsonify({"success": True, "enabled": enabled}) return jsonify({"error": "操作失败"}), 500 @app.route('/api/schedules//run', methods=['POST']) @login_required def run_schedule_now_api(schedule_id): """立即执行定时任务""" import json schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 try: account_ids = json.loads(schedule.get('account_ids', '[]') or '[]') except: account_ids = [] if not account_ids: return jsonify({"error": "没有配置账号"}), 400 user_id = current_user.id browse_type = schedule['browse_type'] enable_screenshot = schedule['enable_screenshot'] started = [] for account_id in account_ids: if user_id not in user_accounts or account_id not in user_accounts[user_id]: continue account = user_accounts[user_id][account_id] if account.is_running: continue account.is_running = True account.should_stop = False account.status = "排队中" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'), daemon=True ) thread.start() active_tasks[account_id] = thread started.append(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') database.update_schedule_last_run(schedule_id) return jsonify({ "success": True, "started_count": len(started), "message": f"已启动 {len(started)} 个账号" }) # ==================== 定时任务执行日志API ==================== @app.route('/api/schedules//logs', methods=['GET']) @login_required def get_schedule_logs_api(schedule_id): """获取定时任务执行日志""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 limit = request.args.get('limit', 10, type=int) logs = database.get_schedule_execution_logs(schedule_id, limit) return jsonify(logs) # ==================== 批量操作API ==================== @app.route('/api/accounts/batch/start', methods=['POST']) @login_required def batch_start_accounts(): """批量启动账号""" user_id = current_user.id data = request.json account_ids = data.get('account_ids', []) browse_type = data.get('browse_type', '应读') enable_screenshot = data.get('enable_screenshot', True) if not account_ids: return jsonify({"error": "请选择要启动的账号"}), 400 started = [] failed = [] for account_id in account_ids: if user_id not in user_accounts or account_id not in user_accounts[user_id]: failed.append({'id': account_id, 'reason': '账号不存在'}) continue account = user_accounts[user_id][account_id] if account.is_running: failed.append({'id': account_id, 'reason': '已在运行中'}) continue account.is_running = True account.should_stop = False account.status = "排队中" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, 'batch'), daemon=True ) thread.start() active_tasks[account_id] = thread started.append(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({ "success": True, "started_count": len(started), "failed_count": len(failed), "started": started, "failed": failed }) @app.route('/api/accounts/batch/stop', methods=['POST']) @login_required def batch_stop_accounts(): """批量停止账号""" user_id = current_user.id data = request.json account_ids = data.get('account_ids', []) if not account_ids: return jsonify({"error": "请选择要停止的账号"}), 400 stopped = [] for account_id in account_ids: if user_id not in user_accounts or account_id not in user_accounts[user_id]: continue account = user_accounts[user_id][account_id] if not account.is_running: continue account.should_stop = True account.status = "正在停止" stopped.append(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({ "success": True, "stopped_count": len(stopped), "stopped": stopped }) if __name__ == '__main__': print("=" * 60) print("知识管理平台自动化工具 - 多用户版") print("=" * 60) # 初始化数据库 database.init_database() checkpoint_mgr = get_checkpoint_manager() print("✓ 任务断点管理器已初始化") # 加载系统配置(并发设置) try: system_config = database.get_system_config() if system_config: # 使用globals()修改全局变量 globals()['max_concurrent_global'] = system_config.get('max_concurrent_global', 2) globals()['max_concurrent_per_account'] = system_config.get('max_concurrent_per_account', 1) # 重新创建信号量 globals()['global_semaphore'] = threading.Semaphore(globals()['max_concurrent_global']) print(f"✓ 已加载并发配置: 全局={globals()['max_concurrent_global']}, 单账号={globals()['max_concurrent_per_account']}") except Exception as e: print(f"警告: 加载并发配置失败,使用默认值: {e}") # 主线程初始化浏览器(Playwright不支持跨线程) print("\n正在初始化浏览器管理器...") init_browser_manager() # 启动定时任务调度器 print("\n启动定时任务调度器...") scheduler_thread = threading.Thread(target=scheduled_task_worker, daemon=True) scheduler_thread.start() print("✓ 定时任务调度器已启动") # 启动状态推送线程(每秒推送运行中任务状态) status_thread = threading.Thread(target=status_push_worker, daemon=True) status_thread.start() print("✓ 状态推送线程已启动(1秒/次)") # 启动Web服务器 print("\n服务器启动中...") print(f"用户访问地址: http://{config.SERVER_HOST}:{config.SERVER_PORT}") print(f"后台管理地址: http://{config.SERVER_HOST}:{config.SERVER_PORT}/yuyx") print("默认管理员: admin/admin") print("=" * 60 + "\n") # 同步初始化浏览器池(必须在socketio.run之前,否则eventlet会导致asyncio冲突) try: system_cfg = database.get_system_config() pool_size = system_cfg.get('max_screenshot_concurrent', 3) if system_cfg else 3 print(f"正在预热 {pool_size} 个浏览器实例(截图加速)...") init_browser_worker_pool(pool_size=pool_size) print("✓ 浏览器池初始化完成") except Exception as e: print(f"警告: 浏览器池初始化失败: {e}") socketio.run(app, host=config.SERVER_HOST, port=config.SERVER_PORT, debug=config.DEBUG, allow_unsafe_werkzeug=True)