#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 知识管理平台自动化工具 - 多用户版本 支持用户注册登录、后台管理、数据隔离 """ # 设置时区为中国标准时间(CST, UTC+8) import os os.environ['TZ'] = 'Asia/Shanghai' try: import time time.tzset() except AttributeError: pass # Windows系统不支持tzset() import pytz from datetime import datetime from flask import Flask, render_template, request, jsonify, send_from_directory, redirect, url_for, session from flask_socketio import SocketIO, emit, join_room, leave_room from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user import threading import time import json import os import secrets # 安全修复: 使用加密安全的随机数生成 from datetime import datetime, timedelta, timezone from functools import wraps # 导入数据库模块和核心模块 import database import requests from browser_pool import get_browser_pool, init_browser_pool from browser_pool_worker import get_browser_worker_pool, init_browser_worker_pool, shutdown_browser_worker_pool from playwright_automation import PlaywrightBrowserManager, PlaywrightAutomation, BrowseResult from api_browser import APIBrowser, APIBrowseResult from browser_installer import check_and_install_browser # ========== 优化模块导入 ========== from app_config import get_config from app_logger import init_logging, get_logger, audit_logger from app_security import ( ip_rate_limiter, require_ip_not_locked, validate_username, validate_password, validate_email, is_safe_path, sanitize_filename, get_client_ip ) from app_utils import verify_and_consume_captcha from crypto_utils import encrypt_password as encrypt_account_password # ========== 时区辅助函数 (Bug #2 fix) ========== BEIJING_TZ = pytz.timezone('Asia/Shanghai') def get_beijing_now(): """获取北京时间的当前时间(统一时区处理)""" return datetime.now(BEIJING_TZ) # ========== 初始化配置 ========== config = get_config() app = Flask(__name__) app.config.from_object(config) # 确保SECRET_KEY已设置 if not app.config.get('SECRET_KEY'): raise RuntimeError("SECRET_KEY未配置,请检查app_config.py") # 安全修复:从环境变量获取允许的CORS源,不再使用"*" cors_origins = os.environ.get('CORS_ALLOWED_ORIGINS', '').strip() if cors_origins: cors_allowed = [origin.strip() for origin in cors_origins.split(',') if origin.strip()] else: # 默认只允许同源,生产环��应该明确配置 cors_allowed = [] socketio = SocketIO( app, cors_allowed_origins=cors_allowed if cors_allowed else None, # None表示只允许同源 async_mode='threading', # 明确指定async模式 ping_timeout=60, # ping超时60秒 ping_interval=25, # 每25秒ping一次 logger=False, # 禁用socketio debug日志 engineio_logger=False ) # ========== 初始化日志系统 ========== init_logging(log_level=config.LOG_LEVEL, log_file=config.LOG_FILE) logger = get_logger('app') logger.info("="*60) logger.info("知识管理平台自动化工具 - 多用户版") logger.info("="*60) logger.info(f"Session配置: COOKIE_NAME={app.config.get('SESSION_COOKIE_NAME', 'session')}, " f"SAMESITE={app.config.get('SESSION_COOKIE_SAMESITE', 'None')}, " f"HTTPONLY={app.config.get('SESSION_COOKIE_HTTPONLY', 'None')}, " f"SECURE={app.config.get('SESSION_COOKIE_SECURE', 'None')}, " f"PATH={app.config.get('SESSION_COOKIE_PATH', 'None')}") # Flask-Login 配置 login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login_page' @login_manager.unauthorized_handler def unauthorized(): """处理未授权访问 - API请求返回JSON,页面请求重定向""" if request.path.startswith('/api/') or request.path.startswith('/yuyx/api/'): return jsonify({"error": "请先登录", "code": "unauthorized"}), 401 return redirect(url_for('login_page', next=request.url)) # 截图目录 SCREENSHOTS_DIR = config.SCREENSHOTS_DIR os.makedirs(SCREENSHOTS_DIR, exist_ok=True) # 全局变量 browser_manager = None user_accounts = {} # {user_id: {account_id: Account对象}} active_tasks = {} # {account_id: Thread对象} task_status = {} # {account_id: {"user_id": x, "username": y, "status": "排队中/运行中", "detail_status": "具体状态", "browse_type": z, "start_time": t, "source": s, "progress": {...}, "is_vip": bool}} # 线程安全锁 - 修复Bug #12 active_tasks_lock = threading.Lock() # 保护 active_tasks 字典 task_status_lock = threading.Lock() # 保护 task_status 字典 user_accounts_lock = threading.Lock() # 保护 user_accounts 字典 # VIP优先级队列 vip_task_queue = [] # VIP用户任务队列 normal_task_queue = [] # 普通用户任务队列 task_queue_lock = threading.Lock() log_cache = {} # {user_id: [logs]} 每个用户独立的日志缓存 log_cache_lock = threading.Lock() # 保护 log_cache 字典 log_cache_total_count = 0 # 全局日志总数,防止无限增长 # 批次任务截图收集 - 用于定时任务打包发送邮件 batch_task_screenshots = {} # {batch_id: {'user_id': x, 'browse_type': y, 'screenshots': [{'account_name': a, 'path': p, 'items': n, 'attachments': m}], 'total_accounts': n, 'completed': n}} batch_task_lock = threading.Lock() # 随机延迟任务存储 pending_random_schedules = {} pending_random_lock = threading.Lock() # 日志缓存限制 MAX_LOGS_PER_USER = config.MAX_LOGS_PER_USER # 每个用户最多100条 MAX_TOTAL_LOGS = config.MAX_TOTAL_LOGS # 全局最多1000条,防止内存泄漏 # 安全修复: 内存清理配置 USER_ACCOUNTS_EXPIRE_SECONDS = 3600 # 用户账号缓存1小时过期 user_accounts_last_access = {} # {user_id: last_access_timestamp} # 并发控制:每个用户同时最多运行1个账号(避免内存不足) # 验证码存储:{session_id: {"code": "1234", "expire_time": timestamp, "failed_attempts": 0}} captcha_storage = {} captcha_storage_lock = threading.Lock() # 安全修复: 保护captcha_storage的线程安全 # IP限流存储:{ip: {"attempts": count, "lock_until": timestamp, "first_attempt": timestamp}} ip_rate_limit = {} ip_rate_limit_lock = threading.Lock() # Bug fix: 保护 ip_rate_limit 字典的线程安全 # 限流配置 - 从 config 读取,避免硬编码 MAX_CAPTCHA_ATTEMPTS = config.MAX_CAPTCHA_ATTEMPTS MAX_IP_ATTEMPTS_PER_HOUR = config.MAX_IP_ATTEMPTS_PER_HOUR IP_LOCK_DURATION = config.IP_LOCK_DURATION # 全局限制:整个系统同时最多运行N个账号(线程本地架构,每个线程独立浏览器,内存占用约200MB/浏览器) max_concurrent_per_account = config.MAX_CONCURRENT_PER_ACCOUNT max_concurrent_global = config.MAX_CONCURRENT_GLOBAL user_semaphores = {} # {user_id: Semaphore} global_semaphore = threading.Semaphore(max_concurrent_global) # 截图专用信号量:限制同时进行的截图任务数量(避免资源竞争) # 信号量将在首次使用时初始化,并支持动态更新 screenshot_semaphore = None screenshot_semaphore_lock = threading.Lock() screenshot_semaphore_size = 0 # 记录当前信号量大小 def get_screenshot_semaphore(): """获取截图信号量(懒加载,根据配置动态创建,支持配置更新)""" global screenshot_semaphore, screenshot_semaphore_size with screenshot_semaphore_lock: config = database.get_system_config() max_concurrent = config.get('max_screenshot_concurrent', 3) # 安全修复:支持配置变更后重新创建信号量 if screenshot_semaphore is None or screenshot_semaphore_size != max_concurrent: screenshot_semaphore = threading.Semaphore(max_concurrent) screenshot_semaphore_size = max_concurrent print(f"[截图信号量] 已更新为 {max_concurrent} 并发") return screenshot_semaphore, max_concurrent # ==================== 内存清理函数 ==================== def cleanup_expired_data(): """定期清理过期数据,防止内存泄漏 清理内容: - 过期的验证码 - 过期的IP限流记录 - 长时间未访问的用户账号缓存 - 完成的任务状态 """ global log_cache_total_count current_time = time.time() # 1. 清理过期验证码 with captcha_storage_lock: expired_captchas = [k for k, v in captcha_storage.items() if v.get("expire_time", 0) < current_time] for k in expired_captchas: del captcha_storage[k] if expired_captchas: logger.debug(f"已清理 {len(expired_captchas)} 个过期验证码") # 2. 清理过期IP限流记录 with ip_rate_limit_lock: expired_ips = [] for ip, data in ip_rate_limit.items(): # 如果锁定已过期且首次尝试超过1小时,则清理 lock_until = data.get("lock_until", 0) first_attempt = data.get("first_attempt", 0) if lock_until < current_time and (current_time - first_attempt) > 3600: expired_ips.append(ip) for ip in expired_ips: del ip_rate_limit[ip] if expired_ips: logger.debug(f"已清理 {len(expired_ips)} 个过期IP限流记录") # 3. 清理长时间未访问的用户账号缓存 with user_accounts_lock: expired_users = [] for user_id, last_access in list(user_accounts_last_access.items()): if (current_time - last_access) > USER_ACCOUNTS_EXPIRE_SECONDS: # 检查该用户是否有活跃任务 has_active_task = False with task_status_lock: for task_data in task_status.values(): if task_data.get("user_id") == user_id: has_active_task = True break if not has_active_task and user_id in user_accounts: expired_users.append(user_id) for user_id in expired_users: del user_accounts[user_id] del user_accounts_last_access[user_id] if expired_users: logger.debug(f"已清理 {len(expired_users)} 个过期用户账号缓存") # 4. 清理已完成任务的状态(保留最近10分钟的) with task_status_lock: completed_tasks = [] for account_id, status_data in list(task_status.items()): if status_data.get("status") in ["已完成", "失败", "已停止"]: start_time = status_data.get("start_time", 0) if (current_time - start_time) > 600: # 10分钟 completed_tasks.append(account_id) for account_id in completed_tasks: del task_status[account_id] if completed_tasks: logger.debug(f"已清理 {len(completed_tasks)} 个已完成任务状态") def start_cleanup_scheduler(): """启动定期清理调度器""" def cleanup_loop(): while True: try: time.sleep(300) # 每5分钟执行一次清理 cleanup_expired_data() except Exception as e: logger.error(f"清理任务执行失败: {e}") cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="cleanup-scheduler") cleanup_thread.start() logger.info("内存清理调度器已启动") class User(UserMixin): """Flask-Login 用户类""" def __init__(self, user_id): self.id = user_id class Admin(UserMixin): """管理员类""" def __init__(self, admin_id): self.id = admin_id self.is_admin = True class Account: """账号类 安全注意事项: - 密码以私有属性存储,避免意外泄露 - __repr__不包含密码,防止日志意外记录 - 生产环境应考虑使用加密存储(如Fernet加密) """ __slots__ = ['id', 'user_id', 'username', '_password', 'remember', 'remark', 'status', 'is_running', 'should_stop', 'total_items', 'total_attachments', 'automation', 'last_browse_type', 'proxy_config'] def __init__(self, account_id, user_id, username, password, remember=True, remark=''): self.id = account_id self.user_id = user_id self.username = username self._password = password # 安全修复:使用私有属性存储密码 self.remember = remember self.remark = remark self.status = "未开始" self.is_running = False self.should_stop = False self.total_items = 0 self.total_attachments = 0 self.automation = None self.last_browse_type = "注册前未读" self.proxy_config = None # 保存代理配置,浏览和截图共用 @property def password(self): """获取密码(仅供自动化登录使用)""" return self._password def __repr__(self): """安全的字符串表示,不包含密码""" return f"Account(id={self.id}, username={self.username}, status={self.status})" def to_dict(self): result = { "id": self.id, "username": self.username, "status": self.status, "remark": self.remark, "total_items": self.total_items, "total_attachments": self.total_attachments, "is_running": self.is_running } # 添加详细进度信息(如果有) if self.id in task_status: ts = task_status[self.id] progress = ts.get('progress', {}) result['detail_status'] = ts.get('detail_status', '') result['progress_items'] = progress.get('items', 0) result['progress_attachments'] = progress.get('attachments', 0) result['start_time'] = ts.get('start_time', 0) # 计算运行时长 if ts.get('start_time'): import time elapsed = int(time.time() - ts['start_time']) result['elapsed_seconds'] = elapsed mins, secs = divmod(elapsed, 60) result['elapsed_display'] = f"{mins}分{secs}秒" else: # 非运行状态下,根据status设置detail_status status_map = { '已完成': '任务完成', '截图中': '正在截图', '浏览完成': '浏览完成', '登录失败': '登录失败', '已暂停': '任务已暂停' } for key, val in status_map.items(): if key in self.status: result['detail_status'] = val break return result # ========== 线程安全辅助函数 - 修复Bug #12 ========== def safe_set_task(account_id, thread): """线程安全地设置任务""" with active_tasks_lock: active_tasks[account_id] = thread def safe_get_task(account_id): """线程安全地获取任务""" with active_tasks_lock: return active_tasks.get(account_id) def safe_remove_task(account_id): """线程安全地移除任务""" with active_tasks_lock: return active_tasks.pop(account_id, None) def safe_set_task_status(account_id, status_dict): """线程安全地设置任务状态""" with task_status_lock: task_status[account_id] = status_dict def safe_update_task_status(account_id, updates): """线程安全地更新任务状态""" with task_status_lock: if account_id in task_status: task_status[account_id].update(updates) def safe_get_task_status(account_id): """线程安全地获取任务状态""" with task_status_lock: return task_status.get(account_id, {}).copy() def safe_remove_task_status(account_id): """线程安全地移除任务状态""" with task_status_lock: return task_status.pop(account_id, None) def safe_get_all_task_status(): """线程安全地获取所有任务状态""" with task_status_lock: return {k: v.copy() for k, v in task_status.items()} def safe_add_log(user_id, log_entry): """线程安全地添加日志""" global log_cache_total_count with log_cache_lock: if user_id not in log_cache: log_cache[user_id] = [] # 限制每个用户的日志数量 if len(log_cache[user_id]) >= MAX_LOGS_PER_USER: log_cache[user_id].pop(0) log_cache_total_count = max(0, log_cache_total_count - 1) log_cache[user_id].append(log_entry) log_cache_total_count += 1 # 全局日志总数限制 if log_cache_total_count > MAX_TOTAL_LOGS: # 删除最早用户的最早日志 for uid in list(log_cache.keys()): if log_cache[uid]: log_cache[uid].pop(0) log_cache_total_count -= 1 break @login_manager.user_loader def load_user(user_id): """Flask-Login 用户加载""" user = database.get_user_by_id(int(user_id)) if user: return User(user['id']) return None def admin_required(f): """管理员权限装饰器""" @wraps(f) def decorated_function(*args, **kwargs): # 安全修复:不再记录完整的session和cookies内容,防止敏感信息泄露 logger.debug(f"[admin_required] 检查会话,admin_id存在: {'admin_id' in session}") if 'admin_id' not in session: logger.warning(f"[admin_required] 拒绝访问 {request.path} - session中无admin_id") return jsonify({"error": "需要管理员权限"}), 403 logger.info(f"[admin_required] 管理员 {session.get('admin_username')} 访问 {request.path}") return f(*args, **kwargs) return decorated_function def log_to_client(message, user_id=None, account_id=None): """发送日志到Web客户端(用户隔离)""" from app_security import escape_html timestamp = get_beijing_now().strftime('%H:%M:%S') # 安全修复:转义HTML特殊字符,防止XSS攻击 log_data = { 'timestamp': timestamp, 'message': escape_html(str(message)) if message else '', 'account_id': account_id } # 如果指定了user_id,则缓存到该用户的日志 if user_id: # 安全修复:使用锁保护日志缓存操作,防止竞态条件 global log_cache_total_count with log_cache_lock: if user_id not in log_cache: log_cache[user_id] = [] log_cache[user_id].append(log_data) log_cache_total_count += 1 # 单用户限制 if len(log_cache[user_id]) > MAX_LOGS_PER_USER: log_cache[user_id].pop(0) log_cache_total_count -= 1 # 全局限制 - 如果超过总数限制,清理日志最多的用户 while log_cache_total_count > MAX_TOTAL_LOGS: if log_cache: max_user = max(log_cache.keys(), key=lambda u: len(log_cache[u])) if log_cache[max_user]: log_cache[max_user].pop(0) log_cache_total_count -= 1 else: break else: break # 发送到该用户的room(在锁外执行,避免死锁) socketio.emit('log', log_data, room=f'user_{user_id}') # 控制台日志:添加账号短标识便于区分 if account_id: # 显示账号ID前4位作为标识 short_id = account_id[:4] if len(account_id) >= 4 else account_id print(f"[{timestamp}] U{user_id}:{short_id} | {message}") else: print(f"[{timestamp}] U{user_id} | {message}") def validate_ip_port(ip_port_str): """验证IP:PORT格式是否有效 Bug fix: 验证IP范围(0-255)和端口范围(1-65535) Args: ip_port_str: 格式为 "IP:PORT" 的字符串 Returns: bool: 是否有效 """ import re pattern = re.compile(r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3}):(\d{1,5})$') match = pattern.match(ip_port_str) if not match: return False # 验证IP每个部分在0-255范围内 for i in range(1, 5): octet = int(match.group(i)) if octet < 0 or octet > 255: return False # 验证端口在1-65535范围内 port = int(match.group(5)) if port < 1 or port > 65535: return False return True def get_proxy_from_api(api_url, max_retries=3): """从API获取代理IP(支持重试) Args: api_url: 代理API地址 max_retries: 最大重试次数 Returns: 代理服务器地址(格式: http://IP:PORT)或 None """ import re # IP:PORT 格式正则(基础格式检查) ip_port_pattern = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}$') for attempt in range(max_retries): try: response = requests.get(api_url, timeout=10) if response.status_code == 200: text = response.text.strip() # 尝试解析JSON响应 try: import json data = json.loads(text) # 检查是否是错误响应 if isinstance(data, dict): if data.get('status') != 200 and data.get('status') != 0: error_msg = data.get('msg', data.get('message', '未知错误')) print(f"✗ 代理API返回错误: {error_msg} (尝试 {attempt + 1}/{max_retries})") if attempt < max_retries - 1: time.sleep(1) continue # 尝试从JSON中获取IP ip_port = data.get('ip') or data.get('proxy') or data.get('data') if ip_port: text = str(ip_port).strip() except (json.JSONDecodeError, ValueError): # 不是JSON,继续使用原始文本 pass # 验证IP:PORT格式(基础格式检查 + 范围验证) if ip_port_pattern.match(text) and validate_ip_port(text): proxy_server = f"http://{text}" print(f"✓ 获取代理成功: {proxy_server} (尝试 {attempt + 1}/{max_retries})") return proxy_server else: print(f"✗ 代理格式或范围无效: {text[:50]} (尝试 {attempt + 1}/{max_retries})") else: print(f"✗ 获取代理失败: HTTP {response.status_code} (尝试 {attempt + 1}/{max_retries})") except Exception as e: print(f"✗ 获取代理异常: {str(e)} (尝试 {attempt + 1}/{max_retries})") if attempt < max_retries - 1: time.sleep(1) print(f"✗ 获取代理失败,已重试 {max_retries} 次,将不使用代理继续") return None def init_browser_manager(): """初始化浏览器管理器""" global browser_manager if browser_manager is None: print("正在初始化Playwright浏览器管理器...") if not check_and_install_browser(log_callback=lambda msg, account_id=None: print(msg)): print("浏览器环境检查失败!") return False browser_manager = PlaywrightBrowserManager( headless=True, log_callback=lambda msg, account_id=None: print(msg) ) try: # 不再需要initialize(),每个账号会创建独立浏览器 print("Playwright浏览器管理器创建成功!") return True except Exception as e: print(f"Playwright初始化失败: {str(e)}") return False return True # ==================== 前端路由 ==================== @app.route('/') def index(): """主页 - 重定向到登录或应用""" if current_user.is_authenticated: return redirect(url_for('app_page')) return redirect(url_for('login_page')) @app.route('/login') def login_page(): """登录页面""" return render_template('login.html') @app.route('/register') def register_page(): """注册页面""" return render_template('register.html') @app.route('/app') @login_required def app_page(): """主应用页面""" return render_template('index.html') @app.route('/yuyx') def admin_login_page(): """后台登录页面""" if 'admin_id' in session: return redirect(url_for('admin_page')) return render_template('admin_login.html') @app.route('/yuyx/admin') @admin_required def admin_page(): """后台管理页面""" return render_template('admin.html') @app.route('/yuyx/vip') @admin_required def vip_admin_page(): """VIP管理页面""" return render_template('vip_admin.html') # ==================== 用户认证API ==================== @app.route('/api/register', methods=['POST']) @require_ip_not_locked # IP限流保护 def register(): """用户注册""" data = request.json username = data.get('username', '').strip() password = data.get('password', '').strip() email = data.get('email', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() if not username or not password: return jsonify({"error": "用户名和密码不能为空"}), 400 # 获取客户端IP(用于IP限流检查) client_ip = get_client_ip() # 检查IP限流 allowed, error_msg = check_ip_rate_limit(client_ip) if not allowed: return jsonify({"error": error_msg}), 429 # 验证验证码 success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage, MAX_CAPTCHA_ATTEMPTS) if not success: # 验证失败,记录IP失败尝试(注册特有的IP限流逻辑) is_locked = record_failed_captcha(client_ip) if is_locked: return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429 return jsonify({"error": message}), 400 # 检查邮箱验证是否启用 email_settings = email_service.get_email_settings() email_verify_enabled = email_settings.get('register_verify_enabled', False) and email_settings.get('enabled', False) # 如果启用了邮箱验证,邮箱必填 if email_verify_enabled and not email: return jsonify({"error": "启用邮箱验证后,邮箱为必填项"}), 400 # 简单邮箱格式验证 if email and '@' not in email: return jsonify({"error": "邮箱格式不正确"}), 400 # 获取自动审核配置 system_config = database.get_system_config() auto_approve_enabled = system_config.get('auto_approve_enabled', 0) == 1 auto_approve_hourly_limit = system_config.get('auto_approve_hourly_limit', 10) auto_approve_vip_days = system_config.get('auto_approve_vip_days', 7) # 检查每小时注册限制(同时适用于自动审核和邮箱验证) if auto_approve_enabled or email_verify_enabled: hourly_count = database.get_hourly_registration_count() if hourly_count >= auto_approve_hourly_limit: return jsonify({"error": f"注册人数过多,请稍后再试(每小时限制{auto_approve_hourly_limit}人)"}), 429 user_id = database.create_user(username, password, email) if user_id: # 优先级:邮箱验证 > 自动审核 > 手动审核 if email_verify_enabled and email: # 发送验证邮件 result = email_service.send_register_verification_email( email=email, username=username, user_id=user_id ) if result['success']: return jsonify({ "success": True, "message": "注册成功!验证邮件已发送,请查收邮箱并点击链接完成验证", "need_verify": True }) else: # 邮件发送失败,但用户已创建,返回提示 logger.error(f"注册验证邮件发送失败: {result['error']}") return jsonify({ "success": True, "message": f"注册成功,但验证邮件发送失败({result['error']})。请稍后在登录页面重新发送验证邮件", "need_verify": True }) elif auto_approve_enabled: # 自动审核通过 database.approve_user(user_id) # 赠送VIP天数 if auto_approve_vip_days > 0: database.set_user_vip(user_id, auto_approve_vip_days) return jsonify({"success": True, "message": f"注册成功!已自动审核通过,赠送{auto_approve_vip_days}天VIP"}) else: return jsonify({"success": True, "message": "注册成功!已自动审核通过"}) else: return jsonify({"success": True, "message": "注册成功,请等待管理员审核"}) else: return jsonify({"error": "用户名已存在"}), 400 @app.route('/api/verify-email/') def verify_email(token): """验证邮箱 - 用户点击邮件中的链接""" result = email_service.verify_email_token(token, email_service.EMAIL_TYPE_REGISTER) if result: user_id = result['user_id'] email = result['email'] # 验证成功,激活用户 database.approve_user(user_id) # 获取自动审核配置,检查是否赠送VIP system_config = database.get_system_config() auto_approve_vip_days = system_config.get('auto_approve_vip_days', 7) if auto_approve_vip_days > 0: database.set_user_vip(user_id, auto_approve_vip_days) logger.info(f"用户邮箱验���成功: user_id={user_id}, email={email}") return render_template('verify_success.html') else: logger.warning(f"邮箱验证失败: token={token[:20]}...") return render_template('verify_failed.html', error_message="验证链接无效或已过期,请重新注册或申请重发验证邮件") @app.route('/api/resend-verify-email', methods=['POST']) @require_ip_not_locked def resend_verify_email(): """重发验证邮件""" data = request.json email = data.get('email', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() if not email: return jsonify({"error": "请输入邮箱"}), 400 # 获取客户端IP(用于IP限流检查) client_ip = get_client_ip() # 检查IP限流 allowed, error_msg = check_ip_rate_limit(client_ip) if not allowed: return jsonify({"error": error_msg}), 429 # 验证验证码 success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage, MAX_CAPTCHA_ATTEMPTS) if not success: is_locked = record_failed_captcha(client_ip) if is_locked: return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429 return jsonify({"error": message}), 400 # 查找待验证的用户 user = database.get_user_by_email(email) if not user: return jsonify({"error": "该邮箱未注册"}), 404 if user['status'] == 'approved': return jsonify({"error": "该账号已验证通过,请直接登录"}), 400 # 发送验证邮件 result = email_service.resend_register_verification_email( user_id=user['id'], email=email, username=user['username'] ) if result['success']: return jsonify({"success": True, "message": "验证邮件已重新发送,请查收"}) else: return jsonify({"error": result['error']}), 500 @app.route('/api/email/verify-status') def get_email_verify_status(): """获取邮箱验证功能状态(公开API)""" try: settings = email_service.get_email_settings() return jsonify({ 'email_enabled': settings.get('enabled', False), 'register_verify_enabled': settings.get('register_verify_enabled', False) and settings.get('enabled', False) }) except Exception as e: return jsonify({ 'email_enabled': False, 'register_verify_enabled': False }) # ==================== 密码重置(邮件方式)API ==================== @app.route('/api/forgot-password', methods=['POST']) @require_ip_not_locked def forgot_password(): """发送密码重置邮件""" data = request.json email = data.get('email', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() if not email: return jsonify({"error": "请输入邮箱"}), 400 # 获取客户端IP client_ip = get_client_ip() # 检查IP限流 allowed, error_msg = check_ip_rate_limit(client_ip) if not allowed: return jsonify({"error": error_msg}), 429 # 验证验证码 success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage, MAX_CAPTCHA_ATTEMPTS) if not success: is_locked = record_failed_captcha(client_ip) if is_locked: return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429 return jsonify({"error": message}), 400 # 检查邮件功能是否启用 email_settings = email_service.get_email_settings() if not email_settings.get('enabled', False): return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400 # 查找用户(防止用户枚举,统一返回成功消息) user = database.get_user_by_email(email) if user and user.get('status') == 'approved': # 发送重置邮件 result = email_service.send_password_reset_email( email=email, username=user['username'], user_id=user['id'] ) if not result['success']: logger.error(f"密码重置邮件发送失败: {result['error']}") # 即使失败也返回统一消息,防止信息泄露 # 统一返回成功消息 return jsonify({ "success": True, "message": "如果该邮箱已注册,您将收到密码重置邮件" }) @app.route('/reset-password/') def reset_password_page(token): """密码重置页面""" result = email_service.verify_password_reset_token(token) if result: return render_template('reset_password.html', token=token, valid=True, error_message='') else: return render_template('reset_password.html', token=token, valid=False, error_message='重置链接无效或已过期,请重新申请密码重置') @app.route('/api/reset-password-confirm', methods=['POST']) def reset_password_confirm(): """确认密码重置""" data = request.json token = data.get('token', '').strip() new_password = data.get('new_password', '').strip() if not token or not new_password: return jsonify({"error": "参数不完整"}), 400 # 验证密码强度 is_valid, error_msg = validate_password(new_password) if not is_valid: return jsonify({"error": error_msg}), 400 # 验证并消费token result = email_service.confirm_password_reset(token) if not result: return jsonify({"error": "重置链接无效或已过期"}), 400 # 更新用户密码 user_id = result['user_id'] if database.admin_reset_user_password(user_id, new_password): logger.info(f"用户密码重置成功: user_id={user_id}") return jsonify({"success": True, "message": "密码重置成功"}) else: return jsonify({"error": "密码重置失败"}), 500 # ==================== 验证码API ==================== import random from task_checkpoint import get_checkpoint_manager, TaskStage checkpoint_mgr = None # 任务断点管理器 def check_ip_rate_limit(ip_address): """检查IP是否被限流 Bug fix: 使用线程锁保护 ip_rate_limit 字典操作,防止竞态条件 """ current_time = time.time() with ip_rate_limit_lock: # 安全修复:修正过期IP清理逻辑 # 原问题:first_attempt不存在时默认使用current_time,导致永远不会被清理 expired_ips = [] for ip, data in ip_rate_limit.items(): lock_expired = data.get("lock_until", 0) < current_time first_attempt = data.get("first_attempt") # 修复:如果first_attempt不存在或超过1小时,视为过期 attempt_expired = first_attempt is None or (current_time - first_attempt > 3600) if lock_expired and attempt_expired: expired_ips.append(ip) for ip in expired_ips: del ip_rate_limit[ip] # 检查IP是否被锁定 if ip_address in ip_rate_limit: ip_data = ip_rate_limit[ip_address] # 如果IP被锁定且未到解锁时间 if ip_data.get("lock_until", 0) > current_time: remaining_time = int(ip_data["lock_until"] - current_time) return False, "IP已被锁定,请{}分钟后再试".format(remaining_time // 60 + 1) # 如果超过1小时,重置计数 first_attempt = ip_data.get("first_attempt") if first_attempt is None or current_time - first_attempt > 3600: ip_rate_limit[ip_address] = { "attempts": 0, "first_attempt": current_time } return True, None def record_failed_captcha(ip_address): """记录验证码失败尝试 Bug fix: 使用线程锁保护 ip_rate_limit 字典操作,防止竞态条件 """ current_time = time.time() with ip_rate_limit_lock: if ip_address not in ip_rate_limit: ip_rate_limit[ip_address] = { "attempts": 1, "first_attempt": current_time } else: ip_rate_limit[ip_address]["attempts"] += 1 # 检查是否超过限制 if ip_rate_limit[ip_address]["attempts"] >= MAX_IP_ATTEMPTS_PER_HOUR: ip_rate_limit[ip_address]["lock_until"] = current_time + IP_LOCK_DURATION return True # 表示IP已被锁定 return False # 表示还未锁定 @app.route("/api/generate_captcha", methods=["POST"]) def generate_captcha(): """生成4位数字验证码图片 安全修复:验证码不再以明文返回,而是生成base64图片 """ import uuid import base64 from io import BytesIO session_id = str(uuid.uuid4()) # 安全修复: 使用加密安全的随机数生成验证码 code = "".join([str(secrets.randbelow(10)) for _ in range(4)]) # 存储验证码,5分钟过期 captcha_storage[session_id] = { "code": code, "expire_time": time.time() + 300, "failed_attempts": 0 } # 清理过期验证码 expired_keys = [k for k, v in captcha_storage.items() if v["expire_time"] < time.time()] for k in expired_keys: del captcha_storage[k] # 生成验证码图片 try: from PIL import Image, ImageDraw, ImageFont import io # 创建图片 - 增大尺寸以便显示更大的字体 width, height = 160, 60 image = Image.new('RGB', (width, height), color=(255, 255, 255)) draw = ImageDraw.Draw(image) # 添加干扰线 for _ in range(6): x1 = random.randint(0, width) y1 = random.randint(0, height) x2 = random.randint(0, width) y2 = random.randint(0, height) draw.line([(x1, y1), (x2, y2)], fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)), width=1) # 添加干扰点 for _ in range(80): x = random.randint(0, width) y = random.randint(0, height) draw.point((x, y), fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200))) # 绘制验证码文字 - 增大字体 font = None font_paths = [ "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", "/usr/share/fonts/truetype/freefont/FreeSansBold.ttf", ] for font_path in font_paths: try: font = ImageFont.truetype(font_path, 42) break except: continue if font is None: font = ImageFont.load_default() for i, char in enumerate(code): x = 12 + i * 35 + random.randint(-3, 3) y = random.randint(5, 12) color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150)) draw.text((x, y), char, font=font, fill=color) # 转换为base64 buffer = io.BytesIO() image.save(buffer, format='PNG') img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') return jsonify({ "session_id": session_id, "captcha_image": f"data:image/png;base64,{img_base64}" }) except ImportError as e: # 如果没有PIL,不再降级服务,直接返回错误 # 安全修复:不返回任何验证码相关信息 logger.error(f"PIL库未安装,验证码功能不可用: {e}") # 清理刚创建的验证码记录 with captcha_storage_lock: if session_id in captcha_storage: del captcha_storage[session_id] return jsonify({ "error": "验证码服务暂不可用,请联系管理员安装PIL库" }), 503 # Service Unavailable @app.route('/api/login', methods=['POST']) @require_ip_not_locked # IP限流保护 def login(): """用户登录""" data = request.json username = data.get('username', '').strip() password = data.get('password', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() need_captcha = data.get('need_captcha', False) # 如果需要验证码,验证验证码 if need_captcha: success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage) if not success: return jsonify({"error": message}), 400 # 安全修复:使用通用错误消息防止用户枚举 # 不再分开检查用户是否存在和密码是否正确 user = database.verify_user(username, password) if not user: # 返回通用错误消息,不透露账号是否存在 return jsonify({"error": "用户名或密码错误", "need_captcha": True}), 401 # 检查审核状态 if user['status'] != 'approved': return jsonify({"error": "账号未审核,请等待管理员审核", "need_captcha": False}), 401 # 登录成功 user_obj = User(user['id']) login_user(user_obj) load_user_accounts(user['id']) return jsonify({"success": True}) @app.route('/api/logout', methods=['POST']) @login_required def logout(): """用户登出""" logout_user() return jsonify({"success": True}) # ==================== 管理员认证API ==================== @app.route('/yuyx/api/debug-config', methods=['GET']) @admin_required # 安全修复:添加管理员认证 def debug_config(): """调试配置信息(仅管理员可访问,生产环境应禁用)""" # 安全修复:生产环境禁用调试端点 if not app.debug: return jsonify({"error": "调试端点已在生产环境禁用"}), 403 return jsonify({ "secret_key_set": bool(app.secret_key), "secret_key_length": len(app.secret_key) if app.secret_key else 0, "session_config": { "SESSION_COOKIE_NAME": app.config.get('SESSION_COOKIE_NAME'), "SESSION_COOKIE_SECURE": app.config.get('SESSION_COOKIE_SECURE'), "SESSION_COOKIE_HTTPONLY": app.config.get('SESSION_COOKIE_HTTPONLY'), "SESSION_COOKIE_SAMESITE": app.config.get('SESSION_COOKIE_SAMESITE'), "PERMANENT_SESSION_LIFETIME": str(app.config.get('PERMANENT_SESSION_LIFETIME')), }, # 安全修复:移除敏感的session内容,只显示有无 "has_session": bool(session), "cookies_received": list(request.cookies.keys()) }) @app.route('/yuyx/api/login', methods=['POST']) @require_ip_not_locked # IP限流保护 def admin_login(): """管理员登录(支持JSON和form-data两种格式)""" # 兼容JSON和form-data两种提交方式 if request.is_json: data = request.json else: data = request.form username = data.get('username', '').strip() password = data.get('password', '').strip() captcha_session = data.get('captcha_session', '') captcha_code = data.get('captcha', '').strip() need_captcha = data.get('need_captcha', False) # 如果需要验证码,验证验证码 if need_captcha: success, message = verify_and_consume_captcha(captcha_session, captcha_code, captcha_storage) if not success: if request.is_json: return jsonify({"error": message}), 400 else: return redirect(url_for('admin_login_page')) admin = database.verify_admin(username, password) if admin: # 安全修复: 只清除管理员相关的session,保留用户登录状态 # 这样用户和管理员可以在同一浏览器中共存 session.pop('admin_id', None) session.pop('admin_username', None) # 设置管理员session session['admin_id'] = admin['id'] session['admin_username'] = admin['username'] session.permanent = True # 设置为永久会话(使用PERMANENT_SESSION_LIFETIME配置) session.modified = True # 强制标记session为已修改,确保保存 logger.info(f"[admin_login] 管理员 {username} 登录成功") # 安全修复:不再记录完整session内容 # 根据请求类型返回不同响应 if request.is_json: # JSON请求:返回JSON响应(给JavaScript使用) response = jsonify({"success": True, "redirect": "/yuyx/admin"}) return response else: # form-data请求:直接重定向到后台页面 return redirect(url_for('admin_page')) else: logger.warning(f"[admin_login] 管理员 {username} 登录失败 - 用户名或密码错误") if request.is_json: return jsonify({"error": "管理员用户名或密码错误", "need_captcha": True}), 401 else: # form提交失败,重定向回登录页(TODO: 可以添加flash消息) return redirect(url_for('admin_login_page')) @app.route('/yuyx/api/logout', methods=['POST']) @admin_required def admin_logout(): """管理员登出""" session.pop('admin_id', None) session.pop('admin_username', None) return jsonify({"success": True}) @app.route('/yuyx/api/users', methods=['GET']) @admin_required def get_all_users(): """获取所有用户""" users = database.get_all_users() return jsonify(users) @app.route('/yuyx/api/users/pending', methods=['GET']) @admin_required def get_pending_users(): """获取待审核用户""" users = database.get_pending_users() return jsonify(users) @app.route('/yuyx/api/users//approve', methods=['POST']) @admin_required def approve_user_route(user_id): """审核通过用户""" if database.approve_user(user_id): return jsonify({"success": True}) return jsonify({"error": "审核失败"}), 400 @app.route('/yuyx/api/users//reject', methods=['POST']) @admin_required def reject_user_route(user_id): """拒绝用户""" if database.reject_user(user_id): return jsonify({"success": True}) return jsonify({"error": "拒绝失败"}), 400 @app.route('/yuyx/api/users/', methods=['DELETE']) @admin_required def delete_user_route(user_id): """删除用户""" if database.delete_user(user_id): # 清理内存中的账号数据 if user_id in user_accounts: del user_accounts[user_id] # 清理用户信号量,防止内存泄漏 if user_id in user_semaphores: del user_semaphores[user_id] # 清理用户日志缓存,防止内存泄漏 global log_cache_total_count if user_id in log_cache: log_cache_total_count -= len(log_cache[user_id]) del log_cache[user_id] return jsonify({"success": True}) return jsonify({"error": "删除失败"}), 400 @app.route('/yuyx/api/stats', methods=['GET']) @admin_required def get_system_stats(): """获取系统统计""" stats = database.get_system_stats() # 从session获取管理员用户名 stats["admin_username"] = session.get('admin_username', 'admin') return jsonify(stats) @app.route('/yuyx/api/docker_stats', methods=['GET']) @admin_required def get_docker_stats(): """获取Docker容器运行状态""" import subprocess docker_status = { 'running': False, 'container_name': 'N/A', 'uptime': 'N/A', 'memory_usage': 'N/A', 'memory_limit': 'N/A', 'memory_percent': 'N/A', 'cpu_percent': 'N/A', 'status': 'Unknown' } try: # 检查是否在Docker容器内 if os.path.exists('/.dockerenv'): docker_status['running'] = True # 获取容器名称 try: with open('/etc/hostname', 'r') as f: docker_status['container_name'] = f.read().strip() except Exception as e: logger.debug(f"读取容器名称失败: {e}") # 获取内存使用情况 (cgroup v2) try: # 尝试cgroup v2路径 if os.path.exists('/sys/fs/cgroup/memory.current'): # Read total memory with open('/sys/fs/cgroup/memory.current', 'r') as f: mem_total = int(f.read().strip()) # Read cache from memory.stat cache = 0 if os.path.exists('/sys/fs/cgroup/memory.stat'): with open('/sys/fs/cgroup/memory.stat', 'r') as f: for line in f: if line.startswith('inactive_file '): cache = int(line.split()[1]) break # Actual memory = total - cache mem_bytes = mem_total - cache docker_status['memory_usage'] = "{:.2f} MB".format(mem_bytes / 1024 / 1024) # 获取内存限制 if os.path.exists('/sys/fs/cgroup/memory.max'): with open('/sys/fs/cgroup/memory.max', 'r') as f: limit_str = f.read().strip() if limit_str != 'max': limit_bytes = int(limit_str) docker_status['memory_limit'] = "{:.2f} GB".format(limit_bytes / 1024 / 1024 / 1024) docker_status['memory_percent'] = "{:.2f}%".format(mem_bytes / limit_bytes * 100) # 尝试cgroup v1路径 elif os.path.exists('/sys/fs/cgroup/memory/memory.usage_in_bytes'): # 从 memory.stat 读取内存信息 mem_bytes = 0 if os.path.exists('/sys/fs/cgroup/memory/memory.stat'): with open('/sys/fs/cgroup/memory/memory.stat', 'r') as f: rss = 0 cache = 0 for line in f: if line.startswith('total_rss '): rss = int(line.split()[1]) elif line.startswith('total_cache '): cache = int(line.split()[1]) # 使用 RSS + (一部分活跃的cache),更接近docker stats的计算 # 但为了准确性,我们只用RSS mem_bytes = rss # 如果找不到,则使用总内存减去缓存作为后备 if mem_bytes == 0: with open('/sys/fs/cgroup/memory/memory.usage_in_bytes', 'r') as f: total_mem = int(f.read().strip()) cache = 0 if os.path.exists('/sys/fs/cgroup/memory/memory.stat'): with open('/sys/fs/cgroup/memory/memory.stat', 'r') as f: for line in f: if line.startswith('total_inactive_file '): cache = int(line.split()[1]) break mem_bytes = total_mem - cache docker_status['memory_usage'] = "{:.2f} MB".format(mem_bytes / 1024 / 1024) # 获取内存限制 if os.path.exists('/sys/fs/cgroup/memory/memory.limit_in_bytes'): with open('/sys/fs/cgroup/memory/memory.limit_in_bytes', 'r') as f: limit_bytes = int(f.read().strip()) # 检查是否是实际限制(不是默认的超大值) if limit_bytes < 9223372036854771712: docker_status['memory_limit'] = "{:.2f} GB".format(limit_bytes / 1024 / 1024 / 1024) docker_status['memory_percent'] = "{:.2f}%".format(mem_bytes / limit_bytes * 100) except Exception as e: docker_status['memory_usage'] = 'Error: {}'.format(str(e)) # 获取容器运行时间(基于PID 1的启动时间) try: # Get PID 1 start time with open('/proc/1/stat', 'r') as f: stat_data = f.read().split() starttime_ticks = int(stat_data[21]) # Get system uptime with open('/proc/uptime', 'r') as f: system_uptime = float(f.read().split()[0]) # Get clock ticks per second import os as os_module ticks_per_sec = os_module.sysconf(os_module.sysconf_names['SC_CLK_TCK']) # Calculate container uptime process_start = starttime_ticks / ticks_per_sec uptime_seconds = int(system_uptime - process_start) days = uptime_seconds // 86400 hours = (uptime_seconds % 86400) // 3600 minutes = (uptime_seconds % 3600) // 60 if days > 0: docker_status['uptime'] = "{}天 {}小时 {}分钟".format(days, hours, minutes) elif hours > 0: docker_status['uptime'] = "{}小时 {}分钟".format(hours, minutes) else: docker_status['uptime'] = "{}分钟".format(minutes) except Exception as e: logger.debug(f"读取容器运行时间失败: {e}") docker_status['status'] = 'Running' else: docker_status['status'] = 'Not in Docker' except Exception as e: docker_status['status'] = 'Error: {}'.format(str(e)) return jsonify(docker_status) @app.route('/yuyx/api/admin/password', methods=['PUT']) @admin_required def update_admin_password(): """修改管理员密码""" data = request.json new_password = data.get('new_password', '').strip() if not new_password: return jsonify({"error": "密码不能为空"}), 400 username = session.get('admin_username') if database.update_admin_password(username, new_password): return jsonify({"success": True}) return jsonify({"error": "修改失败"}), 400 @app.route('/yuyx/api/admin/username', methods=['PUT']) @admin_required def update_admin_username(): """修改管理员用户名""" data = request.json new_username = data.get('new_username', '').strip() if not new_username: return jsonify({"error": "用户名不能为空"}), 400 old_username = session.get('admin_username') if database.update_admin_username(old_username, new_username): session['admin_username'] = new_username return jsonify({"success": True}) return jsonify({"error": "修改失败,用户名可能已存在"}), 400 # ==================== 密码重置API ==================== # 管理员直接重置用户密码 @app.route('/yuyx/api/users//reset_password', methods=['POST']) @admin_required def admin_reset_password_route(user_id): """管理员直接重置用户密码(无需审核)""" data = request.json new_password = data.get('new_password', '').strip() if not new_password: return jsonify({"error": "新密码不能为空"}), 400 # 安全修复:统一密码强度要求为8位以上且包含字母和数字 is_valid, error_msg = validate_password(new_password) if not is_valid: return jsonify({"error": error_msg}), 400 if database.admin_reset_user_password(user_id, new_password): return jsonify({"message": "密码重置成功"}) return jsonify({"error": "重置失败,用户不存在"}), 400 # 获取密码重置申请列表 @app.route('/yuyx/api/password_resets', methods=['GET']) @admin_required def get_password_resets_route(): """获取所有待审核的密码重置申请""" resets = database.get_pending_password_resets() return jsonify(resets) # 批准密码重置申请 @app.route('/yuyx/api/password_resets//approve', methods=['POST']) @admin_required def approve_password_reset_route(request_id): """批准密码重置申请""" if database.approve_password_reset(request_id): return jsonify({"message": "密码重置申请已批准"}) return jsonify({"error": "批准失败"}), 400 # 拒绝密码重置申请 @app.route('/yuyx/api/password_resets//reject', methods=['POST']) @admin_required def reject_password_reset_route(request_id): """拒绝密码重置申请""" if database.reject_password_reset(request_id): return jsonify({"message": "密码重置申请已拒绝"}) return jsonify({"error": "拒绝失败"}), 400 # 用户申请重置密码(需要审核) @app.route('/api/reset_password_request', methods=['POST']) def request_password_reset(): """用户申请重置密码""" data = request.json username = data.get('username', '').strip() email = data.get('email', '').strip() new_password = data.get('new_password', '').strip() if not username or not new_password: return jsonify({"error": "用户名和新密码不能为空"}), 400 # 安全修复:统一密码强度要求为8位以上且包含字母和数字 is_valid, error_msg = validate_password(new_password) if not is_valid: return jsonify({"error": error_msg}), 400 # 安全修复:防止用户枚举,统一返回成功消息 # 无论用户是否存在都返回相同消息 user = database.get_user_by_username(username) # 如果用户存在且邮箱匹配(或未提供邮箱),则创建重置申请 if user: # 如果提供了邮箱,验证邮箱是否匹配 if email and user.get('email') != email: # 邮箱不匹配,但不透露具体原因 pass else: # 创建重置申请 database.create_password_reset_request(user['id'], new_password) # 无论成功与否都返回相同消息,防止用户枚举 return jsonify({"message": "如果账号存在,密码重置申请已提交,请等待管理员审核"}) # ==================== 账号管理API (用户隔离) ==================== def load_user_accounts(user_id): """从数据库加载用户的账号到内存""" if user_id not in user_accounts: user_accounts[user_id] = {} accounts_data = database.get_user_accounts(user_id) for acc_data in accounts_data: account = Account( account_id=acc_data['id'], user_id=user_id, username=acc_data['username'], password=acc_data['password'], remember=bool(acc_data['remember']), remark=acc_data['remark'] or '' ) user_accounts[user_id][account.id] = account # ==================== Bug反馈API(用户端) ==================== @app.route('/api/feedback', methods=['POST']) @login_required def submit_feedback(): """用户提交Bug反馈""" data = request.get_json() title = data.get('title', '').strip() description = data.get('description', '').strip() contact = data.get('contact', '').strip() if not title or not description: return jsonify({"error": "标题和描述不能为空"}), 400 if len(title) > 100: return jsonify({"error": "标题不能超过100个字符"}), 400 if len(description) > 2000: return jsonify({"error": "描述不能超过2000个字符"}), 400 # 从数据库获取用户名 user_info = database.get_user_by_id(current_user.id) username = user_info['username'] if user_info else f'用户{current_user.id}' feedback_id = database.create_bug_feedback( user_id=current_user.id, username=username, title=title, description=description, contact=contact ) return jsonify({"message": "反馈提交成功", "id": feedback_id}) @app.route('/api/feedback', methods=['GET']) @login_required def get_my_feedbacks(): """获取当前用户的反馈列表""" feedbacks = database.get_user_feedbacks(current_user.id) return jsonify(feedbacks) # ==================== Bug反馈API(管理端) ==================== @app.route('/yuyx/api/feedbacks', methods=['GET']) @admin_required def get_all_feedbacks(): """管理员获取所有反馈""" status = request.args.get('status') # 安全修复:参数类型验证,防止类型转换异常 try: limit = int(request.args.get('limit', 100)) offset = int(request.args.get('offset', 0)) # 限制最大值防止DoS limit = min(max(1, limit), 1000) offset = max(0, offset) except (ValueError, TypeError): return jsonify({"error": "无效的分页参数"}), 400 feedbacks = database.get_bug_feedbacks(limit=limit, offset=offset, status_filter=status) stats = database.get_feedback_stats() return jsonify({ "feedbacks": feedbacks, "stats": stats }) @app.route('/yuyx/api/feedbacks//reply', methods=['POST']) @admin_required def reply_to_feedback(feedback_id): """管理员回复反馈""" data = request.get_json() reply = data.get('reply', '').strip() if not reply: return jsonify({"error": "回复内容不能为空"}), 400 if database.reply_feedback(feedback_id, reply): return jsonify({"message": "回复成功"}) else: return jsonify({"error": "反馈不存在"}), 404 @app.route('/yuyx/api/feedbacks//close', methods=['POST']) @admin_required def close_feedback_api(feedback_id): """管理员关闭反馈""" if database.close_feedback(feedback_id): return jsonify({"message": "已关闭"}) else: return jsonify({"error": "反馈不存在"}), 404 @app.route('/yuyx/api/feedbacks/', methods=['DELETE']) @admin_required def delete_feedback_api(feedback_id): """管理员删除反馈""" if database.delete_feedback(feedback_id): return jsonify({"message": "已删除"}) else: return jsonify({"error": "反馈不存在"}), 404 # ==================== 账号管理API ==================== @app.route('/api/accounts', methods=['GET']) @login_required def get_accounts(): """获取当前用户的所有账号""" user_id = current_user.id refresh = request.args.get('refresh', 'false').lower() == 'true' # 安全修复:使用锁保护检查和加载操作,防止竞态条件 with user_accounts_lock: # 如果user_accounts中没有数据或者请求刷新,则从数据库加载 if user_id not in user_accounts or len(user_accounts.get(user_id, {})) == 0 or refresh: load_user_accounts(user_id) accounts = user_accounts.get(user_id, {}) return jsonify([acc.to_dict() for acc in accounts.values()]) @app.route('/api/accounts', methods=['POST']) @login_required def add_account(): """添加账号""" user_id = current_user.id # 账号数量限制检查:VIP不限制,普通用户最多3个 current_count = len(database.get_user_accounts(user_id)) is_vip = database.is_user_vip(user_id) if not is_vip and current_count >= 3: return jsonify({"error": "普通用户最多添加3个账号,升级VIP可无限添加"}), 403 data = request.json username = data.get('username', '').strip() password = data.get('password', '').strip() remark = data.get("remark", "").strip()[:200] # 限制200字符 if not username or not password: return jsonify({"error": "用户名和密码不能为空"}), 400 # 检查当前用户是否已存在该账号 if user_id in user_accounts: for acc in user_accounts[user_id].values(): if acc.username == username: return jsonify({"error": f"账号 '{username}' 已存在"}), 400 # 生成账号ID import uuid account_id = str(uuid.uuid4())[:8] # 设置remember默认值为True remember = data.get('remember', True) # 保存到数据库 database.create_account(user_id, account_id, username, password, remember, remark) # 加载到内存 account = Account(account_id, user_id, username, password, remember, remark) if user_id not in user_accounts: user_accounts[user_id] = {} user_accounts[user_id][account_id] = account log_to_client(f"添加账号: {username}", user_id) return jsonify(account.to_dict()) @app.route('/api/accounts/', methods=['PUT']) @login_required def update_account(account_id): """更新账号信息(密码等)""" user_id = current_user.id # 验证账号所有权 if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] # 如果账号正在运行,不允许修改 if account.is_running: return jsonify({"error": "账号正在运行中,请先停止"}), 400 data = request.json new_password = data.get('password', '').strip() new_remember = data.get('remember', account.remember) if not new_password: return jsonify({"error": "密码不能为空"}), 400 # 安全修复:加密存储第三方账号密码 encrypted_password = encrypt_account_password(new_password) # 更新数据库 with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(''' UPDATE accounts SET password = ?, remember = ? WHERE id = ? ''', (encrypted_password, new_remember, account_id)) conn.commit() # 重置账号登录状态(密码修改后恢复active状态) database.reset_account_login_status(account_id) logger.info(f"[账号更新] 用户 {user_id} 修改了账号 {account.username} 的密码,已重置登录状态") # 更新内存中的账号信息(内存中保存明文,用于自动化登录) account._password = new_password account.remember = new_remember log_to_client(f"账号 {account.username} 信息已更新,登录状态已重置", user_id) return jsonify({"message": "账号更新成功", "account": account.to_dict()}) @app.route('/api/accounts/', methods=['DELETE']) @login_required def delete_account(account_id): """删除账号""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] # 停止正在运行的任务 if account.is_running: account.should_stop = True if account.automation: account.automation.close() username = account.username # 从数据库删除 database.delete_account(account_id) # 从内存删除 del user_accounts[user_id][account_id] log_to_client(f"删除账号: {username}", user_id) return jsonify({"success": True}) @app.route('/api/accounts//remark', methods=['PUT']) @login_required def update_remark(account_id): """更新备注""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 data = request.json remark = data.get('remark', '').strip()[:200] # 更新数据库 database.update_account_remark(account_id, remark) # 更新内存 user_accounts[user_id][account_id].remark = remark log_to_client(f"更新备注: {user_accounts[user_id][account_id].username} -> {remark}", user_id) return jsonify({"success": True}) @app.route('/api/accounts//start', methods=['POST']) @login_required def start_account(account_id): """启动账号任务""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] if account.is_running: return jsonify({"error": "任务已在运行中"}), 400 data = request.json browse_type = data.get('browse_type', '应读') enable_screenshot = data.get('enable_screenshot', True) # 默认启用截图 # 确保浏览器管理器已初始化 if not init_browser_manager(): return jsonify({"error": "浏览器初始化失败"}), 500 # 启动任务线程 account.is_running = True account.should_stop = False account.status = "运行中" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, 'manual'), daemon=True ) thread.start() active_tasks[account_id] = thread log_to_client(f"启动任务: {account.username} - {browse_type}", user_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({"success": True}) @app.route('/api/accounts//stop', methods=['POST']) @login_required def stop_account(account_id): """停止账号任务""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] if not account.is_running: return jsonify({"error": "任务未在运行"}), 400 account.should_stop = True account.status = "正在停止" log_to_client(f"停止任务: {account.username}", user_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({"success": True}) def get_user_semaphore(user_id): """获取或创建用户的信号量""" if user_id not in user_semaphores: user_semaphores[user_id] = threading.Semaphore(max_concurrent_per_account) return user_semaphores[user_id] def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="manual"): """运行自动化任务""" print(f"[DEBUG run_task] account={account_id}, enable_screenshot={enable_screenshot} (类型:{type(enable_screenshot).__name__}), source={source}") if user_id not in user_accounts or account_id not in user_accounts[user_id]: return account = user_accounts[user_id][account_id] # 导入time模块 import time as time_module # 注意:不在此处记录开始时间,因为要排除排队等待时间 # 两级并发控制:用户级 + 全局级(VIP优先) user_sem = get_user_semaphore(user_id) # 检查是否VIP用户 is_vip_user = database.is_user_vip(user_id) vip_label = " [VIP优先]" if is_vip_user else "" # 获取用户级信号量(同一用户的账号排队) log_to_client(f"等待资源分配...{vip_label}", user_id, account_id) account.status = "排队中" + (" (VIP)" if is_vip_user else "") socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 记录任务状态为排队中 import time as time_mod task_status[account_id] = { "user_id": user_id, "username": account.username, "status": "排队中", "detail_status": "等待资源" + vip_label, "browse_type": browse_type, "start_time": time_mod.time(), "source": source, "progress": {"items": 0, "attachments": 0}, "is_vip": is_vip_user } # 加入优先级队列 with task_queue_lock: if is_vip_user: vip_task_queue.append(account_id) else: normal_task_queue.append(account_id) # VIP优先排队机制 acquired = False while not acquired: with task_queue_lock: # VIP用户直接尝试获取; 普通用户需等VIP队列为空 can_try = is_vip_user or len(vip_task_queue) == 0 if can_try and user_sem.acquire(blocking=False): acquired = True with task_queue_lock: if account_id in vip_task_queue: vip_task_queue.remove(account_id) if account_id in normal_task_queue: normal_task_queue.remove(account_id) break # 检查是否被停止 if account.should_stop: with task_queue_lock: if account_id in vip_task_queue: vip_task_queue.remove(account_id) if account_id in normal_task_queue: normal_task_queue.remove(account_id) log_to_client(f"任务已取消", user_id, account_id) account.status = "已停止" account.is_running = False if account_id in task_status: del task_status[account_id] socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return time_module.sleep(0.3) try: # 如果在排队期间被停止,直接返回 if account.should_stop: log_to_client(f"任务已取消", user_id, account_id) account.status = "已停止" account.is_running = False socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return # 获取全局信号量(防止所有用户同时运行导致资源耗尽) global_semaphore.acquire() try: # 再次检查是否被停止 if account.should_stop: log_to_client(f"任务已取消", user_id, account_id) account.status = "已停止" account.is_running = False socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return # ====== 创建任务断点 ====== task_id = checkpoint_mgr.create_checkpoint( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type ) logger.info(f"[断点] 任务 {task_id} 已创建") # ====== 在此处记录任务真正的开始时间(排除排队等待时间) ====== task_start_time = time_module.time() account.status = "运行中" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') account.last_browse_type = browse_type # 更新任务状态为运行中 if account_id in task_status: task_status[account_id]["status"] = "运行中" task_status[account_id]["detail_status"] = "初始化" task_status[account_id]["start_time"] = task_start_time # 重试机制:最多尝试3次,超时则换IP重试 max_attempts = 3 last_error = None for attempt in range(1, max_attempts + 1): try: if attempt > 1: log_to_client(f"🔄 第 {attempt} 次尝试(共{max_attempts}次)...", user_id, account_id) # 检查是否需要使用代理 proxy_config = None config = database.get_system_config() if config.get('proxy_enabled') == 1: proxy_api_url = config.get('proxy_api_url', '').strip() if proxy_api_url: log_to_client(f"正在获取代理IP...", user_id, account_id) proxy_server = get_proxy_from_api(proxy_api_url, max_retries=3) if proxy_server: proxy_config = {'server': proxy_server} log_to_client(f"✓ 将使用代理: {proxy_server}", user_id, account_id) account.proxy_config = proxy_config # 保存代理配置供截图使用 else: log_to_client(f"✗ 代理获取失败,将不使用代理继续", user_id, account_id) else: log_to_client(f"⚠ 代理已启用但未配置API地址", user_id, account_id) # 使用 API 方式浏览(不启动浏览器,节省内存) checkpoint_mgr.update_stage(task_id, TaskStage.STARTING, progress_percent=10) def custom_log(message: str): log_to_client(message, user_id, account_id) log_to_client(f"开始登录...", user_id, account_id) if account_id in task_status: task_status[account_id]["detail_status"] = "正在登录" checkpoint_mgr.update_stage(task_id, TaskStage.LOGGING_IN, progress_percent=25) # 使用 API 方式登录和浏览(不启动浏览器) api_browser = APIBrowser(log_callback=custom_log, proxy_config=proxy_config) if api_browser.login(account.username, account.password): log_to_client(f"✓ 登录成功!", user_id, account_id) # 登录成功,清除失败计数 # 保存cookies供截图使用 api_browser.save_cookies_for_playwright(account.username) database.reset_account_login_status(account_id) if account_id in task_status: task_status[account_id]["detail_status"] = "正在浏览" log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id) def should_stop(): return account.should_stop checkpoint_mgr.update_stage(task_id, TaskStage.BROWSING, progress_percent=50) result = api_browser.browse_content( browse_type=browse_type, should_stop_callback=should_stop ) # APIBrowseResult和BrowseResult字段完全相同,无需转换 api_browser.close() else: # API 登录失败 error_message = "登录失败" log_to_client(f"❌ {error_message}", user_id, account_id) # 增加失败计数(假设密码错误) is_suspended = database.increment_account_login_fail(account_id, error_message) if is_suspended: log_to_client(f"⚠ 该账号连续3次密码错误,已自动暂停", user_id, account_id) log_to_client(f"请在前台修改密码后才能继续使用", user_id, account_id) retry_action = checkpoint_mgr.record_error(task_id, error_message) if retry_action == "paused": logger.warning(f"[断点] 任务 {task_id} 已暂停(登录失败)") account.status = "登录失败" account.is_running = False # 记录登录失败日志 database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=0, total_attachments=0, error_message=error_message, duration=int(time_module.time() - task_start_time), source=source ) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') api_browser.close() return account.total_items = result.total_items account.total_attachments = result.total_attachments if result.success: log_to_client(f"浏览完成! 共 {result.total_items} 条内容,{result.total_attachments} 个附件", user_id, account_id) if account_id in task_status: task_status[account_id]["detail_status"] = "浏览完成" task_status[account_id]["progress"] = {"items": result.total_items, "attachments": result.total_attachments} account.status = "已完成" checkpoint_mgr.update_stage(task_id, TaskStage.COMPLETING, progress_percent=95) checkpoint_mgr.complete_task(task_id, success=True) logger.info(f"[断点] 任务 {task_id} 已完成") # 记录成功日志(如果不截图则在此记录,截图时在截图完成后记录) if not enable_screenshot: database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='success', total_items=result.total_items, total_attachments=result.total_attachments, error_message='', duration=int(time_module.time() - task_start_time), source=source ) # 发送任务完成邮件通知(不截图时在此发送) # 只对定时任务发送邮件通知,手动执行不发送 if source and source.startswith('user_scheduled'): try: user_info = database.get_user_by_id(user_id) # 检查用户是否开启了邮件通知 if user_info and user_info.get('email') and database.get_user_email_notify(user_id): account_name = account.remark if account.remark else account.username email_service.send_task_complete_email_async( user_id=user_id, email=user_info['email'], username=user_info['username'], account_name=account_name, browse_type=browse_type, total_items=result.total_items, total_attachments=result.total_attachments, screenshot_path=None, log_callback=lambda msg: log_to_client(msg, user_id, account_id) ) except Exception as email_error: logger.warning(f"发送任务完成邮件失败: {email_error}") # 成功则跳出重试循环 break else: # 浏览出错,检查是否是超时错误 error_msg = result.error_message if 'Timeout' in error_msg or 'timeout' in error_msg: last_error = error_msg log_to_client(f"⚠ 检测到超时错误: {error_msg}", user_id, account_id) # 关闭当前浏览器 if account.automation: try: account.automation.close() log_to_client(f"已关闭超时的浏览器实例", user_id, account_id) except Exception as e: logger.debug(f"关闭超时浏览器实例失败: {e}") account.automation = None if attempt < max_attempts: log_to_client(f"⚠ 代理可能速度过慢,将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id) time_module.sleep(2) # 等待2秒再重试 continue else: log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=result.total_items, total_attachments=result.total_attachments, error_message=f"重试{max_attempts}次后仍失败: {error_msg}", duration=int(time_module.time() - task_start_time) ) break else: # 非超时错误,直接失败不重试 log_to_client(f"浏览出错: {error_msg}", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=result.total_items, total_attachments=result.total_attachments, error_message=error_msg, duration=int(time_module.time() - task_start_time), source=source ) break except Exception as retry_error: # 捕获重试过程中的异常 error_msg = str(retry_error) last_error = error_msg # 关闭可能存在的浏览器实例 if account.automation: try: account.automation.close() except Exception as e: logger.debug(f"关闭浏览器实例失败: {e}") account.automation = None if 'Timeout' in error_msg or 'timeout' in error_msg: log_to_client(f"⚠ 执行超时: {error_msg}", user_id, account_id) if attempt < max_attempts: log_to_client(f"⚠ 将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id) time_module.sleep(2) continue else: log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=account.total_items, total_attachments=account.total_attachments, error_message=f"重试{max_attempts}次后仍失败: {error_msg}", duration=int(time_module.time() - task_start_time), source=source ) break else: # 非超时异常,直接失败 log_to_client(f"任务执行异常: {error_msg}", user_id, account_id) account.status = "出错" database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=account.total_items, total_attachments=account.total_attachments, error_message=error_msg, duration=int(time_module.time() - task_start_time), source=source ) break except Exception as e: error_msg = str(e) log_to_client(f"任务执行出错: {error_msg}", user_id, account_id) account.status = "出错" # 记录异常失败日志 database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='failed', total_items=account.total_items, total_attachments=account.total_attachments, error_message=error_msg, duration=int(time_module.time() - task_start_time), source=source ) finally: # 先关闭浏览器,再释放信号量(避免并发创建/关闭浏览器导致资源竞争) account.is_running = False # 如果状态不是已完成(需要截图),则重置为未开始 if account.status not in ["已完成"]: account.status = "未开始" if account.automation: try: account.automation.close() # log_to_client(f"主任务浏览器已关闭", user_id, account_id) # 精简 except Exception as e: log_to_client(f"关闭主任务浏览器时出错: {str(e)}", user_id, account_id) finally: account.automation = None # 浏览器关闭后再释放全局信号量,确保新任务创建浏览器时旧浏览器已完全关闭 global_semaphore.release() if account_id in active_tasks: del active_tasks[account_id] if account_id in task_status: del task_status[account_id] socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 任务完成后自动截图(增加2秒延迟,确保资源完全释放) # 根据enable_screenshot参数决定是否截图 if account.status == "已完成" and not account.should_stop: if enable_screenshot: log_to_client(f"等待2秒后开始截图...", user_id, account_id) # 更新账号状态为等待截图 account.status = "等待截图" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 重新添加截图状态 import time as time_mod task_status[account_id] = { "user_id": user_id, "username": account.username, "status": "排队中", "detail_status": "等待截图资源", "browse_type": browse_type, "start_time": time_mod.time(), "source": source, "progress": {"items": result.total_items if result else 0, "attachments": result.total_attachments if result else 0} } time.sleep(2) # 延迟启动截图,确保主任务资源已完全释放 browse_result_dict = {'total_items': result.total_items, 'total_attachments': result.total_attachments} threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id, browse_type, source, task_start_time, browse_result_dict), daemon=True).start() else: # 不截图时,重置状态为未开始 account.status = "未开始" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') log_to_client(f"截图功能已禁用,跳过截图", user_id, account_id) else: # 任务非正常完成,重置状态为未开始 if account.status not in ["登录失败", "出错"]: account.status = "未开始" socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') finally: # 释放用户级信号量 user_sem.release() def take_screenshot_for_account(user_id, account_id, browse_type="应读", source="manual", task_start_time=None, browse_result=None): """为账号任务完成后截图(使用工作线程池,真正的浏览器复用)""" if user_id not in user_accounts or account_id not in user_accounts[user_id]: return account = user_accounts[user_id][account_id] # 标记账号正在截图(防止重复提交截图任务) account.is_running = True def screenshot_task(browser_instance, user_id, account_id, account, browse_type, source, task_start_time, browse_result): """在worker线程中执行的截图任务""" # ✅ 获得worker后,立即更新状态为"截图中" if user_id in user_accounts and account_id in user_accounts[user_id]: acc = user_accounts[user_id][account_id] acc.status = "截图中" if account_id in task_status: task_status[account_id]["status"] = "运行中" task_status[account_id]["detail_status"] = "正在截图" socketio.emit('account_update', acc.to_dict(), room=f'user_{user_id}') max_retries = 3 for attempt in range(1, max_retries + 1): automation = None try: # 更新状态 if account_id in task_status: task_status[account_id]["detail_status"] = f"正在截图{f' (第{attempt}次)' if attempt > 1 else ''}" if attempt > 1: log_to_client(f"🔄 第 {attempt} 次截图尝试...", user_id, account_id) log_to_client(f"使用Worker-{browser_instance['worker_id']}的浏览器(已使用{browser_instance['use_count']}次)", user_id, account_id) # 使用worker的浏览器创建PlaywrightAutomation proxy_config = account.proxy_config if hasattr(account, 'proxy_config') else None automation = PlaywrightAutomation(browser_manager, account_id, proxy_config=proxy_config) automation.playwright = browser_instance['playwright'] automation.browser = browser_instance['browser'] def custom_log(message: str): log_to_client(message, user_id, account_id) automation.log = custom_log # 登录 log_to_client(f"登录中...", user_id, account_id) login_result = automation.quick_login(account.username, account.password, account.remember) if not login_result["success"]: error_message = login_result.get("message", "截图登录失败") log_to_client(f"截图登录失败: {error_message}", user_id, account_id) if attempt < max_retries: log_to_client(f"将重试...", user_id, account_id) time.sleep(2) continue else: log_to_client(f"❌ 截图失败: 登录失败", user_id, account_id) return {'success': False, 'error': '登录失败'} browse_type = account.last_browse_type log_to_client(f"导航到 '{browse_type}' 页面...", user_id, account_id) # 导航到指定页面 result = automation.browse_content( navigate_only=True, browse_type=browse_type, auto_next_page=False, auto_view_attachments=False, interval=0, should_stop_callback=None ) if not result.success and result.error_message: log_to_client(f"导航警告: {result.error_message}", user_id, account_id) # 等待页面稳定 time.sleep(2) # 生成截图文件名 timestamp = get_beijing_now().strftime('%Y%m%d_%H%M%S') user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" login_account = account.remark if account.remark else account.username screenshot_filename = f"{username_prefix}_{login_account}_{browse_type}_{timestamp}.jpg" screenshot_path = os.path.join(SCREENSHOTS_DIR, screenshot_filename) # 尝试截图 if automation.take_screenshot(screenshot_path): # 验证截图文件 if os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000: log_to_client(f"✓ 截图成功: {screenshot_filename}", user_id, account_id) return {'success': True, 'filename': screenshot_filename} else: log_to_client(f"截图文件异常,将重试", user_id, account_id) if os.path.exists(screenshot_path): os.remove(screenshot_path) else: log_to_client(f"截图保存失败", user_id, account_id) if attempt < max_retries: log_to_client(f"将重试...", user_id, account_id) time.sleep(2) except Exception as e: log_to_client(f"截图出错: {str(e)}", user_id, account_id) if attempt < max_retries: log_to_client(f"将重试...", user_id, account_id) time.sleep(2) finally: # 只关闭context,不关闭浏览器(由worker管理) if automation: try: if automation.context: automation.context.close() automation.context = None automation.page = None except Exception as e: logger.debug(f"关闭context时出错: {e}") return {'success': False, 'error': '截图失败,已重试3次'} def screenshot_callback(result, error): """截图完成回调""" try: # 重置账号状态 account.is_running = False account.status = "未开始" # 先清除任务状态(这样to_dict()不会包含detail_status) if account_id in task_status: del task_status[account_id] # 然后发送更新 socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') if error: log_to_client(f"❌ 截图失败: {error}", user_id, account_id) elif not result or not result.get('success'): error_msg = result.get('error', '未知错误') if result else '未知错误' log_to_client(f"❌ 截图失败: {error_msg}", user_id, account_id) # 记录任务日志 if task_start_time and browse_result: import time as time_module total_elapsed = int(time_module.time() - task_start_time) database.create_task_log( user_id=user_id, account_id=account_id, username=account.username, browse_type=browse_type, status='success', total_items=browse_result.get('total_items', 0), total_attachments=browse_result.get('total_attachments', 0), duration=total_elapsed, source=source ) # 处理邮件通知 - 批次任务收集截图,非批次任务直接发送 try: # 检查是否是批次任务 (source格式: user_scheduled:batch_xxx) batch_id = None if source and source.startswith('user_scheduled:batch_'): batch_id = source.split(':', 1)[1] screenshot_path = None if result and result.get('success') and result.get('filename'): screenshot_path = os.path.join(SCREENSHOTS_DIR, result['filename']) account_name = account.remark if account.remark else account.username if batch_id: # 批次任务:收集截图信息,当所有截图完成时发送打包邮件 should_send_email = False batch_info = None with batch_task_lock: if batch_id in batch_task_screenshots: batch_task_screenshots[batch_id]['screenshots'].append({ 'account_name': account_name, 'path': screenshot_path, 'items': browse_result.get('total_items', 0), 'attachments': browse_result.get('total_attachments', 0) }) batch_task_screenshots[batch_id]['completed'] += 1 # 检查是否所有截图都完成了 if batch_task_screenshots[batch_id]['completed'] >= batch_task_screenshots[batch_id]['total_accounts']: should_send_email = True batch_info = batch_task_screenshots.pop(batch_id) print(f"[批次邮件] 批次 {batch_id} 所有 {batch_info['total_accounts']} 个截图已完成,准备发送邮件") # 在锁外发送邮件 if should_send_email and batch_info and batch_info['screenshots']: try: batch_user_id = batch_info['user_id'] user_info = database.get_user_by_id(batch_user_id) if user_info and user_info.get('email') and database.get_user_email_notify(batch_user_id): email_service.send_batch_task_complete_email_async( user_id=batch_user_id, email=user_info['email'], username=user_info['username'], schedule_name=batch_info['schedule_name'], browse_type=batch_info['browse_type'], screenshots=batch_info['screenshots'] ) print(f"[批次邮件] 已发送打包邮件,包含 {len(batch_info['screenshots'])} 个截图") except Exception as batch_email_err: print(f"[批次邮件] 发送失败: {batch_email_err}") elif source and source.startswith('user_scheduled'): # 非批次的定时任务:直接发送邮件(手动执行不发送邮件通知) user_info = database.get_user_by_id(user_id) if user_info and user_info.get('email') and database.get_user_email_notify(user_id): email_service.send_task_complete_email_async( user_id=user_id, email=user_info['email'], username=user_info['username'], account_name=account_name, browse_type=browse_type, total_items=browse_result.get('total_items', 0), total_attachments=browse_result.get('total_attachments', 0), screenshot_path=screenshot_path, log_callback=lambda msg: log_to_client(msg, user_id, account_id) ) except Exception as email_error: logger.warning(f"发送任务完成邮件失败: {email_error}") except Exception as e: logger.error(f"截图回调出错: {e}") # 提交任务到工作线程池 pool = get_browser_worker_pool() pool.submit_task( screenshot_task, screenshot_callback, user_id, account_id, account, browse_type, source, task_start_time, browse_result ) def manual_screenshot(account_id): """手动为指定账号截图""" user_id = current_user.id if user_id not in user_accounts or account_id not in user_accounts[user_id]: return jsonify({"error": "账号不存在"}), 404 account = user_accounts[user_id][account_id] if account.is_running: return jsonify({"error": "任务运行中,无法截图"}), 400 data = request.json or {} browse_type = data.get('browse_type', account.last_browse_type) account.last_browse_type = browse_type threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id), daemon=True).start() log_to_client(f"手动截图: {account.username} - {browse_type}", user_id) return jsonify({"success": True}) # ==================== 截图管理API ==================== @app.route('/api/screenshots', methods=['GET']) @login_required def get_screenshots(): """获取当前用户的截图列表""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" try: screenshots = [] if os.path.exists(SCREENSHOTS_DIR): for filename in os.listdir(SCREENSHOTS_DIR): # 只显示属于当前用户的截图(支持png和jpg格式) if (filename.lower().endswith(('.png', '.jpg', '.jpeg'))) and filename.startswith(username_prefix + '_'): filepath = os.path.join(SCREENSHOTS_DIR, filename) stat = os.stat(filepath) # 转换为北京时间 created_time = datetime.fromtimestamp(stat.st_mtime, tz=BEIJING_TZ) # 解析文件名获取显示名称 # 文件名格式:用户名_登录账号_浏览类型_时间.jpg parts = filename.rsplit('.', 1)[0].split('_', 1) # 移除扩展名并分割 if len(parts) > 1: # 显示名称:登录账号_浏览类型_时间.jpg display_name = parts[1] + '.' + filename.rsplit('.', 1)[1] else: display_name = filename screenshots.append({ 'filename': filename, 'display_name': display_name, 'size': stat.st_size, 'created': created_time.strftime('%Y-%m-%d %H:%M:%S') }) screenshots.sort(key=lambda x: x['created'], reverse=True) return jsonify(screenshots) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/screenshots/') @login_required def serve_screenshot(filename): """提供截图文件访问""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" # 验证文件属于当前用户 if not filename.startswith(username_prefix + '_'): return jsonify({"error": "无权访问"}), 403 # 防止路径遍历攻击 (Bug #19 fix) if not is_safe_path(SCREENSHOTS_DIR, filename): return jsonify({"error": "非法路径"}), 403 return send_from_directory(SCREENSHOTS_DIR, filename) @app.route('/api/screenshots/', methods=['DELETE']) @login_required def delete_screenshot(filename): """删除指定截图""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" # 验证文件属于当前用户 if not filename.startswith(username_prefix + '_'): return jsonify({"error": "无权删除"}), 403 try: filepath = os.path.join(SCREENSHOTS_DIR, filename) if os.path.exists(filepath): os.remove(filepath) log_to_client(f"删除截图: {filename}", user_id) return jsonify({"success": True}) else: return jsonify({"error": "文件不存在"}), 404 except Exception as e: return jsonify({"error": str(e)}), 500 @app.route('/api/screenshots/clear', methods=['POST']) @login_required def clear_all_screenshots(): """清空当前用户的所有截图""" user_id = current_user.id user_info = database.get_user_by_id(user_id) username_prefix = user_info['username'] if user_info else f"user{user_id}" try: deleted_count = 0 if os.path.exists(SCREENSHOTS_DIR): for filename in os.listdir(SCREENSHOTS_DIR): if (filename.lower().endswith(('.png', '.jpg', '.jpeg'))) and filename.startswith(username_prefix + '_'): filepath = os.path.join(SCREENSHOTS_DIR, filename) os.remove(filepath) deleted_count += 1 log_to_client(f"清理了 {deleted_count} 个截图文件", user_id) return jsonify({"success": True, "deleted": deleted_count}) except Exception as e: return jsonify({"error": str(e)}), 500 # ==================== WebSocket事件 ==================== @socketio.on('connect') def handle_connect(): """客户端连接""" if current_user.is_authenticated: user_id = current_user.id join_room(f'user_{user_id}') log_to_client("客户端已连接", user_id) # 如果user_accounts中没有该用户的账号,从数据库加载 if user_id not in user_accounts or len(user_accounts[user_id]) == 0: db_accounts = database.get_user_accounts(user_id) if db_accounts: user_accounts[user_id] = {} for acc_data in db_accounts: account = Account( account_id=acc_data['id'], user_id=acc_data['user_id'], username=acc_data['username'], password=acc_data['password'], remember=bool(acc_data.get('remember', 1)), remark=acc_data.get('remark', '') ) user_accounts[user_id][acc_data['id']] = account log_to_client(f"已从数据库恢复 {len(db_accounts)} 个账号", user_id) # 发送账号列表 accounts = user_accounts.get(user_id, {}) emit('accounts_list', [acc.to_dict() for acc in accounts.values()]) # 发送历史日志 if user_id in log_cache: for log_entry in log_cache[user_id]: emit('log', log_entry) @socketio.on('disconnect') def handle_disconnect(): """客户端断开""" if current_user.is_authenticated: user_id = current_user.id leave_room(f'user_{user_id}') # ==================== 静态文件 ==================== @app.route('/static/') def serve_static(filename): """提供静态文件访问""" # 防止路径遍历攻击 (Bug #19 fix) if not is_safe_path('static', filename): return jsonify({"error": "非法路径"}), 403 response = send_from_directory('static', filename) # 禁用缓存,强制浏览器每次都重新加载 response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response # ==================== 启动 ==================== # ==================== 管理员VIP管理API ==================== @app.route('/yuyx/api/vip/config', methods=['GET']) @admin_required # 安全修复:使用统一的装饰器 def get_vip_config_api(): """获取VIP配置""" config = database.get_vip_config() return jsonify(config) @app.route('/yuyx/api/vip/config', methods=['POST']) @admin_required # 安全修复:使用统一的装饰器 def set_vip_config_api(): """设置默认VIP天数""" data = request.json days = data.get('default_vip_days', 0) if not isinstance(days, int) or days < 0: return jsonify({"error": "VIP天数必须是非负整数"}), 400 database.set_default_vip_days(days) return jsonify({"message": "VIP配置已更新", "default_vip_days": days}) @app.route('/yuyx/api/users//vip', methods=['POST']) @admin_required # 安全修复:使用统一的装饰器 def set_user_vip_api(user_id): """设置用户VIP""" data = request.json days = data.get('days', 30) # 验证days参数 valid_days = [7, 30, 365, 999999] if days not in valid_days: return jsonify({"error": "VIP天数必须是 7/30/365/999999 之一"}), 400 if database.set_user_vip(user_id, days): vip_type = {7: "一周", 30: "一个月", 365: "一年", 999999: "永久"}[days] return jsonify({"message": f"VIP设置成功: {vip_type}"}) return jsonify({"error": "设置失败,用户不存在"}), 400 @app.route('/yuyx/api/users//vip', methods=['DELETE']) @admin_required # 安全修复:使用统一的装饰器 def remove_user_vip_api(user_id): """移除用户VIP""" if database.remove_user_vip(user_id): return jsonify({"message": "VIP已移除"}) return jsonify({"error": "移除失败"}), 400 @app.route('/yuyx/api/users//vip', methods=['GET']) @admin_required # 安全修复:使用统一的装饰器 def get_user_vip_info_api(user_id): """获取用户VIP信息(管理员)""" vip_info = database.get_user_vip_info(user_id) return jsonify(vip_info) # ==================== 用户端VIP查询API ==================== @app.route('/api/user/vip', methods=['GET']) @login_required def get_current_user_vip(): """获取当前用户VIP信息""" vip_info = database.get_user_vip_info(current_user.id) # 添加用户名 user_info = database.get_user_by_id(current_user.id) vip_info['username'] = user_info['username'] if user_info else 'Unknown' return jsonify(vip_info) @app.route('/api/user/password', methods=['POST']) @login_required def change_user_password(): """用户修改自己的密码""" data = request.get_json() current_password = data.get('current_password') new_password = data.get('new_password') if not current_password or not new_password: return jsonify({"error": "请填写完整信息"}), 400 if len(new_password) < 6: return jsonify({"error": "新密码至少6位"}), 400 # 验证当前密码 user = database.get_user_by_id(current_user.id) if not user or not check_password_hash(user['password_hash'], current_password): return jsonify({"error": "当前密码错误"}), 400 # 更新密码(使用管理员重置密码的函数,因为已经验证过当前密码了) if database.admin_reset_user_password(current_user.id, new_password): return jsonify({"success": True}) else: return jsonify({"error": "密码更新失败"}), 500 @app.route('/api/user/email', methods=['GET']) @login_required def get_user_email(): """获取当前用户的邮箱信息""" user = database.get_user_by_id(current_user.id) if not user: return jsonify({"error": "用户不存在"}), 404 return jsonify({ "email": user.get('email', ''), "email_verified": user.get('email_verified', False) }) @app.route('/api/user/bind-email', methods=['POST']) @login_required @require_ip_not_locked def bind_user_email(): """发送邮箱绑定验证邮件""" data = request.get_json() email = data.get('email', '').strip().lower() # 验证邮箱格式 if not email or not validate_email(email): return jsonify({"error": "请输入有效的邮箱地址"}), 400 # 检查邮件功能是否启用 settings = email_service.get_email_settings() if not settings.get('enabled', False): return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400 # 检查邮箱是否已被其他用户使用 existing_user = database.get_user_by_email(email) if existing_user and existing_user['id'] != current_user.id: return jsonify({"error": "该邮箱已被其他用户绑定"}), 400 # 获取当前用户信息 user = database.get_user_by_id(current_user.id) if not user: return jsonify({"error": "用户不存在"}), 404 # 如果已经绑定了相同邮箱且已验证,无需重复绑定 if user.get('email') == email and user.get('email_verified'): return jsonify({"error": "该邮箱已绑定并验证"}), 400 # 发送验证邮件 result = email_service.send_bind_email_verification( user_id=current_user.id, email=email, username=user['username'] ) if result['success']: return jsonify({ "success": True, "message": "验证邮件已发送,请查收" }) else: return jsonify({"error": result['error']}), 500 @app.route('/api/verify-bind-email/') def verify_bind_email(token): """验证邮箱绑定Token""" result = email_service.verify_bind_email_token(token) if result: user_id = result['user_id'] email = result['email'] # 更新用户邮箱 if database.update_user_email(user_id, email, verified=True): # 返回成功页面 return render_template('verify_success.html', title='邮箱绑定成功', message=f'邮箱 {email} 已成功绑定到您的账号!', redirect_url='/' ) else: return render_template('verify_failed.html', title='绑定失败', message='邮箱绑定失败,请重试' ) else: return render_template('verify_failed.html', title='链接无效', message='验证链接已过期或无效,请重新发送验证邮件' ) @app.route('/api/user/unbind-email', methods=['POST']) @login_required def unbind_user_email(): """解绑用户邮箱""" user = database.get_user_by_id(current_user.id) if not user: return jsonify({"error": "用户不存在"}), 404 if not user.get('email'): return jsonify({"error": "当前未绑定邮箱"}), 400 # 解绑邮箱 if database.update_user_email(current_user.id, None, verified=False): return jsonify({"success": True, "message": "邮箱已解绑"}) else: return jsonify({"error": "解绑失败"}), 500 @app.route('/api/user/email-notify', methods=['GET']) @login_required def get_user_email_notify(): """获取用户邮件通知偏好""" enabled = database.get_user_email_notify(current_user.id) return jsonify({"enabled": enabled}) @app.route('/api/user/email-notify', methods=['POST']) @login_required def update_user_email_notify(): """更新用户邮件通知偏好""" data = request.get_json() enabled = data.get('enabled', True) if database.update_user_email_notify(current_user.id, enabled): return jsonify({"success": True}) else: return jsonify({"error": "更新失败"}), 500 @app.route('/api/run_stats', methods=['GET']) @login_required def get_run_stats(): """获取当前用户的运行统计""" user_id = current_user.id # 获取今日任务统计 stats = database.get_user_run_stats(user_id) # 计算当前正在运行的账号数 current_running = 0 if user_id in user_accounts: current_running = sum(1 for acc in user_accounts[user_id].values() if acc.is_running) return jsonify({ 'today_completed': stats.get('completed', 0), 'current_running': current_running, 'today_failed': stats.get('failed', 0), 'today_items': stats.get('total_items', 0), 'today_attachments': stats.get('total_attachments', 0) }) # ==================== 系统配置API ==================== @app.route('/yuyx/api/system/config', methods=['GET']) @admin_required def get_system_config_api(): """获取系统配置""" config = database.get_system_config() return jsonify(config) @app.route('/yuyx/api/system/config', methods=['POST']) @admin_required def update_system_config_api(): """更新系统配置""" global max_concurrent_global, global_semaphore, max_concurrent_per_account data = request.json max_concurrent = data.get('max_concurrent_global') schedule_enabled = data.get('schedule_enabled') schedule_time = data.get('schedule_time') schedule_browse_type = data.get('schedule_browse_type') schedule_weekdays = data.get('schedule_weekdays') new_max_concurrent_per_account = data.get('max_concurrent_per_account') new_max_screenshot_concurrent = data.get('max_screenshot_concurrent') auto_approve_enabled = data.get('auto_approve_enabled') auto_approve_hourly_limit = data.get('auto_approve_hourly_limit') auto_approve_vip_days = data.get('auto_approve_vip_days') # 验证参数 if max_concurrent is not None: if not isinstance(max_concurrent, int) or max_concurrent < 1: return jsonify({"error": "全局并发数必须大于0(建议:小型服务器2-5,中型5-10,大型10-20)"}), 400 if new_max_concurrent_per_account is not None: if not isinstance(new_max_concurrent_per_account, int) or new_max_concurrent_per_account < 1: return jsonify({"error": "单账号并发数必须大于0(建议设为1,避免同一用户任务相互影响)"}), 400 if new_max_screenshot_concurrent is not None: if not isinstance(new_max_screenshot_concurrent, int) or new_max_screenshot_concurrent < 1: return jsonify({"error": "截图并发数必须大于0(建议根据服务器配置设置,每个浏览器约占用200MB内存)"}), 400 if schedule_time is not None: # 验证时间格式 HH:MM import re if not re.match(r'^([01]\d|2[0-3]):([0-5]\d)$', schedule_time): return jsonify({"error": "时间格式错误,应为 HH:MM"}), 400 if schedule_browse_type is not None: if schedule_browse_type not in ['注册前未读', '应读', '未读']: return jsonify({"error": "浏览类型无效"}), 400 if schedule_weekdays is not None: # 验证星期格式,应该是逗号分隔的数字字符串 "1,2,3,4,5,6,7" try: days = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()] if not all(1 <= d <= 7 for d in days): return jsonify({"error": "星期数字必须在1-7之间"}), 400 except (ValueError, AttributeError): return jsonify({"error": "星期格式错误"}), 400 if auto_approve_hourly_limit is not None: if not isinstance(auto_approve_hourly_limit, int) or auto_approve_hourly_limit < 1: return jsonify({"error": "每小时注册限制必须大于0"}), 400 if auto_approve_vip_days is not None: if not isinstance(auto_approve_vip_days, int) or auto_approve_vip_days < 0: return jsonify({"error": "注册赠送VIP天数不能为负数"}), 400 # 更新数据库 if database.update_system_config( max_concurrent=max_concurrent, schedule_enabled=schedule_enabled, schedule_time=schedule_time, schedule_browse_type=schedule_browse_type, schedule_weekdays=schedule_weekdays, max_concurrent_per_account=new_max_concurrent_per_account, max_screenshot_concurrent=new_max_screenshot_concurrent, auto_approve_enabled=auto_approve_enabled, auto_approve_hourly_limit=auto_approve_hourly_limit, auto_approve_vip_days=auto_approve_vip_days ): # 如果修改了并发数,更新全局变量和信号量 if max_concurrent is not None and max_concurrent != max_concurrent_global: max_concurrent_global = max_concurrent global_semaphore = threading.Semaphore(max_concurrent) print(f"全局并发数已更新为: {max_concurrent}") # 如果修改了单用户并发数,更新全局变量(已有的信号量会在下次创建时使用新值) if new_max_concurrent_per_account is not None and new_max_concurrent_per_account != max_concurrent_per_account: max_concurrent_per_account = new_max_concurrent_per_account print(f"单用户并发数已更新为: {max_concurrent_per_account}") # 如果修改了截图并发数,更新信号量 if new_max_screenshot_concurrent is not None: global screenshot_semaphore screenshot_semaphore = threading.Semaphore(new_max_screenshot_concurrent) print(f"截图并发数已更新为: {new_max_screenshot_concurrent}") return jsonify({"message": "系统配置已更新"}) return jsonify({"error": "更新失败"}), 400 @app.route('/yuyx/api/schedule/execute', methods=['POST']) @admin_required def execute_schedule_now(): """立即执行定时任务(无视定时时间和星期限制)""" try: # 在新线程中执行任务,避免阻塞请求 # 传入 skip_weekday_check=True 跳过星期检查 thread = threading.Thread(target=run_scheduled_task, args=(True,), daemon=True) thread.start() logger.info("[立即执行定时任务] 管理员手动触发定时任务执行(跳过星期检查)") return jsonify({"message": "定时任务已开始执行,请查看任务列表获取进度"}) except Exception as e: logger.error(f"[立即执行定时任务] 启动失败: {str(e)}") return jsonify({"error": f"启动失败: {str(e)}"}), 500 # ==================== 代理配置API ==================== @app.route('/yuyx/api/proxy/config', methods=['GET']) @admin_required def get_proxy_config_api(): """获取代理配置""" config = database.get_system_config() return jsonify({ 'proxy_enabled': config.get('proxy_enabled', 0), 'proxy_api_url': config.get('proxy_api_url', ''), 'proxy_expire_minutes': config.get('proxy_expire_minutes', 3) }) @app.route('/yuyx/api/proxy/config', methods=['POST']) @admin_required def update_proxy_config_api(): """更新代理配置""" data = request.json proxy_enabled = data.get('proxy_enabled') proxy_api_url = data.get('proxy_api_url', '').strip() proxy_expire_minutes = data.get('proxy_expire_minutes') if proxy_enabled is not None and proxy_enabled not in [0, 1]: return jsonify({"error": "proxy_enabled必须是0或1"}), 400 if proxy_expire_minutes is not None: if not isinstance(proxy_expire_minutes, int) or proxy_expire_minutes < 1: return jsonify({"error": "代理有效期必须是大于0的整数"}), 400 if database.update_system_config( proxy_enabled=proxy_enabled, proxy_api_url=proxy_api_url, proxy_expire_minutes=proxy_expire_minutes ): return jsonify({"message": "代理配置已更新"}) return jsonify({"error": "更新失败"}), 400 @app.route('/yuyx/api/proxy/test', methods=['POST']) @admin_required def test_proxy_api(): """测试代理连接""" data = request.json api_url = data.get('api_url', '').strip() if not api_url: return jsonify({"error": "请提供API地址"}), 400 try: response = requests.get(api_url, timeout=10) if response.status_code == 200: ip_port = response.text.strip() if ip_port and ':' in ip_port: return jsonify({ "success": True, "proxy": ip_port, "message": f"代理获取成功: {ip_port}" }) else: return jsonify({ "success": False, "message": f"代理格式错误: {ip_port}" }), 400 else: return jsonify({ "success": False, "message": f"HTTP错误: {response.status_code}" }), 400 except Exception as e: return jsonify({ "success": False, "message": f"连接失败: {str(e)}" }), 500 # ==================== 服务器信息API ==================== @app.route('/yuyx/api/server/info', methods=['GET']) @admin_required def get_server_info_api(): """获取服务器信息""" import psutil # CPU使用率 cpu_percent = psutil.cpu_percent(interval=1) # 内存信息 memory = psutil.virtual_memory() memory_total = f"{memory.total / (1024**3):.1f}GB" memory_used = f"{memory.used / (1024**3):.1f}GB" memory_percent = memory.percent # 磁盘信息 disk = psutil.disk_usage('/') disk_total = f"{disk.total / (1024**3):.1f}GB" disk_used = f"{disk.used / (1024**3):.1f}GB" disk_percent = disk.percent # 运行时长 boot_time = datetime.fromtimestamp(psutil.boot_time(), tz=BEIJING_TZ) uptime_delta = get_beijing_now() - boot_time days = uptime_delta.days hours = uptime_delta.seconds // 3600 uptime = f"{days}天{hours}小时" return jsonify({ 'cpu_percent': cpu_percent, 'memory_total': memory_total, 'memory_used': memory_used, 'memory_percent': memory_percent, 'disk_total': disk_total, 'disk_used': disk_used, 'disk_percent': disk_percent, 'uptime': uptime }) # ==================== 任务统计和日志API ==================== @app.route('/yuyx/api/task/stats', methods=['GET']) @admin_required def get_task_stats_api(): """获取任务统计数据""" date_filter = request.args.get('date') # YYYY-MM-DD格式 stats = database.get_task_stats(date_filter) return jsonify(stats) @app.route('/yuyx/api/task/running', methods=['GET']) @admin_required def get_running_tasks_api(): """获取当前运行中和排队中的任务""" import time as time_mod current_time = time_mod.time() running = [] queuing = [] for account_id, info in task_status.items(): elapsed = int(current_time - info.get("start_time", current_time)) # 获取用户名 user = database.get_user_by_id(info.get("user_id")) user_username = user['username'] if user else 'N/A' # 获取进度信息 progress = info.get("progress", {"items": 0, "attachments": 0}) task_info = { "account_id": account_id, "user_id": info.get("user_id"), "user_username": user_username, "username": info.get("username"), "browse_type": info.get("browse_type"), "source": info.get("source", "manual"), "detail_status": info.get("detail_status", "未知"), "progress_items": progress.get("items", 0), "progress_attachments": progress.get("attachments", 0), "elapsed_seconds": elapsed, "elapsed_display": f"{elapsed // 60}分{elapsed % 60}秒" if elapsed >= 60 else f"{elapsed}秒" } if info.get("status") == "运行中": running.append(task_info) else: queuing.append(task_info) # 按开始时间排序 running.sort(key=lambda x: x["elapsed_seconds"], reverse=True) queuing.sort(key=lambda x: x["elapsed_seconds"], reverse=True) return jsonify({ "running": running, "queuing": queuing, "running_count": len(running), "queuing_count": len(queuing), "max_concurrent": max_concurrent_global }) @app.route('/yuyx/api/task/logs', methods=['GET']) @admin_required def get_task_logs_api(): """获取任务日志列表(支持分页和多种筛选)""" limit = int(request.args.get('limit', 20)) offset = int(request.args.get('offset', 0)) date_filter = request.args.get('date') # YYYY-MM-DD格式 status_filter = request.args.get('status') # success/failed source_filter = request.args.get('source') # manual/scheduled/immediate/resumed user_id_filter = request.args.get('user_id') # 用户ID account_filter = request.args.get('account') # 账号关键字 # 转换user_id为整数 if user_id_filter: try: user_id_filter = int(user_id_filter) except ValueError: user_id_filter = None result = database.get_task_logs( limit=limit, offset=offset, date_filter=date_filter, status_filter=status_filter, source_filter=source_filter, user_id_filter=user_id_filter, account_filter=account_filter ) return jsonify(result) @app.route('/yuyx/api/task/logs/clear', methods=['POST']) @admin_required def clear_old_task_logs_api(): """清理旧的任务日志""" data = request.json or {} days = data.get('days', 30) if not isinstance(days, int) or days < 1: return jsonify({"error": "天数必须是大于0的整数"}), 400 deleted_count = database.delete_old_task_logs(days) return jsonify({"message": f"已删除{days}天前的{deleted_count}条日志"}) @app.route('/yuyx/api/docker/restart', methods=['POST']) @admin_required def restart_docker_container(): """重启Docker容器""" import subprocess import os try: # 检查是否在Docker容器中运行 if not os.path.exists('/.dockerenv'): return jsonify({"error": "当前不在Docker容器中运行"}), 400 # 记录日志 app_logger.info("[系统] 管理员触发Docker容器重启") # 使用nohup在后台执行重启命令,避免阻塞 # 容器重启会导致当前进程终止,所以需要延迟执行 restart_script = """ import os import time # 延迟3秒让响应返回给客户端 time.sleep(3) # 退出Python进程,让Docker自动重启容器(restart: unless-stopped) os._exit(0) """ # 写入临时脚本 with open('/tmp/restart_container.py', 'w') as f: f.write(restart_script) # 在后台执行重启脚本 subprocess.Popen(['python', '/tmp/restart_container.py'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True) return jsonify({ "success": True, "message": "容器将在3秒后重启,请稍后刷新页面" }) except Exception as e: app_logger.error(f"[系统] Docker容器重启失败: {str(e)}") return jsonify({"error": f"重启失败: {str(e)}"}), 500 # ==================== 定时任务调度器 ==================== def run_scheduled_task(skip_weekday_check=False): """执行所有账号的浏览任务(可被手动调用,过滤重复账号) Args: skip_weekday_check: 是否跳过星期检查(立即执行时为True) """ try: config = database.get_system_config() browse_type = config.get('schedule_browse_type', '应读') # 检查今天是否在允许执行的星期列表中(立即执行时跳过此检查) if not skip_weekday_check: # 获取北京时间的星期几 (1=周一, 7=周日) now_beijing = get_beijing_now() current_weekday = now_beijing.isoweekday() # 1-7 # 获取配置的星期列表 schedule_weekdays = config.get('schedule_weekdays', '1,2,3,4,5,6,7') allowed_weekdays = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()] if current_weekday not in allowed_weekdays: weekday_names = ['', '周一', '周二', '周三', '周四', '周五', '周六', '周日'] print(f"[定时任务] 今天是{weekday_names[current_weekday]},不在执行日期内,跳过执行") return else: print(f"[立即执行] 跳过星期检查,强制执行任务") print(f"[定时任务] 开始执行 - 浏览类型: {browse_type}") # 获取所有已审核用户的所有账号 all_users = database.get_all_users() approved_users = [u for u in all_users if u['status'] == 'approved'] # 用于记录已执行的账号用户名,避免重复 executed_usernames = set() total_accounts = 0 skipped_duplicates = 0 executed_accounts = 0 for user in approved_users: user_id = user['id'] if user_id not in user_accounts: load_user_accounts(user_id) accounts = user_accounts.get(user_id, {}) for account_id, account in accounts.items(): total_accounts += 1 # 跳过正在运行的账号 if account.is_running: continue # 检查账号状态,跳过已暂停的账号 account_status_info = database.get_account_status(account_id) if account_status_info: status = account_status_info['status'] if 'status' in account_status_info.keys() else 'active' if status == 'suspended': fail_count = account_status_info['login_fail_count'] if 'login_fail_count' in account_status_info.keys() else 0 print(f"[定时任务] 跳过暂停账号: {account.username} (用户:{user['username']}) - 连续{fail_count}次密码错误,需修改密码") continue # 检查账号用户名是否已经执行过(重复账号过滤) if account.username in executed_usernames: skipped_duplicates += 1 print(f"[定时任务] 跳过重复账号: {account.username} (用户:{user['username']}) - 该账号已被其他用户执行") continue # 记录该账号用户名,避免后续重复执行 executed_usernames.add(account.username) print(f"[定时任务] 启动账号: {account.username} (用户:{user['username']})") # 启动任务 account.is_running = True account.should_stop = False account.status = "运行中" # 获取系统配置的截图开关 cfg = database.get_system_config() enable_screenshot_scheduled = cfg.get("enable_screenshot", 0) == 1 thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot_scheduled, 'scheduled'), daemon=True ) thread.start() active_tasks[account_id] = thread executed_accounts += 1 # 发送更新到用户 socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 间隔启动,避免瞬间并发过高 time.sleep(2) print(f"[定时任务] 执行完成 - 总账号数:{total_accounts}, 已执行:{executed_accounts}, 跳过重复:{skipped_duplicates}") except Exception as e: print(f"[定时任务] 执行出错: {str(e)}") logger.error(f"[定时任务] 执行异常: {str(e)}") def status_push_worker(): """后台线程:每秒推送运行中任务的状态更新""" while True: try: # 遍历所有运行中的任务状态 for account_id, status_info in list(task_status.items()): user_id = status_info.get('user_id') if user_id: # 获取账号对象 if user_id in user_accounts and account_id in user_accounts[user_id]: account = user_accounts[user_id][account_id] account_data = account.to_dict() # 推送账号状态更新 socketio.emit('account_update', account_data, room=f'user_{user_id}') # 同时推送详细进度事件(方便前端分别处理) progress = status_info.get('progress', {}) progress_data = { 'account_id': account_id, 'stage': status_info.get('detail_status', ''), 'total_items': account.total_items, 'browsed_items': progress.get('items', 0), 'total_attachments': account.total_attachments, 'viewed_attachments': progress.get('attachments', 0), 'start_time': status_info.get('start_time', 0), 'elapsed_seconds': account_data.get('elapsed_seconds', 0), 'elapsed_display': account_data.get('elapsed_display', '') } socketio.emit('task_progress', progress_data, room=f'user_{user_id}') time.sleep(1) # 每秒推送一次 except Exception as e: logger.debug(f"状态推送出错: {e}") time.sleep(1) def scheduled_task_worker(): """定时任务工作线程""" import schedule def cleanup_expired_captcha(): """清理过期验证码,防止内存泄漏""" try: current_time = time.time() expired_keys = [k for k, v in captcha_storage.items() if v["expire_time"] < current_time] deleted_count = len(expired_keys) for k in expired_keys: del captcha_storage[k] if deleted_count > 0: print(f"[定时清理] 已清理 {deleted_count} 个过期验证码") except Exception as e: print(f"[定时清理] 清理验证码出错: {str(e)}") def cleanup_old_data(): """清理旧数据:7天前截图和任务日志,30天前操作日志和定时任务执行日志""" try: print(f"[定时清理] 开始清理旧数据...") # 清理7天前的任务日志 deleted_logs = database.delete_old_task_logs(7) print(f"[定时清理] 已删除 {deleted_logs} 条任务日志") # 清理30天前的操作日志 deleted_operation_logs = database.clean_old_operation_logs(30) print(f"[定时清理] 已删除 {deleted_operation_logs} 条操作日志") # 清理30天前的定时任务执行日志 deleted_schedule_logs = database.clean_old_schedule_logs(30) print(f"[定时清理] 已删除 {deleted_schedule_logs} 条定时任务执行日志") # 清理7天前的截图 deleted_screenshots = 0 if os.path.exists(SCREENSHOTS_DIR): cutoff_time = time.time() - (7 * 24 * 60 * 60) # 7天前的时间戳 for filename in os.listdir(SCREENSHOTS_DIR): if filename.lower().endswith(('.png', '.jpg', '.jpeg')): filepath = os.path.join(SCREENSHOTS_DIR, filename) try: # 检查文件修改时间 if os.path.getmtime(filepath) < cutoff_time: os.remove(filepath) deleted_screenshots += 1 except Exception as e: print(f"[定时清理] 删除截图失败 {filename}: {str(e)}") print(f"[定时清理] 已删除 {deleted_screenshots} 个截图文件") print(f"[定时清理] 清理完成!") except Exception as e: print(f"[定时清理] 清理任务出错: {str(e)}") def check_user_schedules(): """检查并执行用户定时任务""" import json try: now = get_beijing_now() current_time = now.strftime('%H:%M') current_weekday = now.isoweekday() # 获取所有启用的用户定时任务 enabled_schedules = database.get_enabled_user_schedules() for schedule_config in enabled_schedules: schedule_time = schedule_config['schedule_time'].strip() schedule_name = schedule_config.get('name', '未命名任务') schedule_id = schedule_config['id'] # 标准化时间格式(处理 "8:00" -> "08:00") if ':' in schedule_time: h, m = schedule_time.split(':') schedule_time = f"{int(h):02d}:{int(m):02d}" # 获取随机延迟设置 random_delay = schedule_config.get('random_delay', 0) # 处理随机延迟逻辑 should_execute = False if random_delay == 1: import random as rand_module with pending_random_lock: if schedule_id in pending_random_schedules: # 检查是否到达随机执行时间 pending_info = pending_random_schedules[schedule_id] scheduled_for = pending_info['scheduled_for'] if now >= scheduled_for: del pending_random_schedules[schedule_id] should_execute = True print(f"[定时任务] 任务#{schedule_id} 随机延迟到达,开始执行") else: # 计算窗口时间 set_hour, set_minute = map(int, schedule_time.split(':')) set_time_today = now.replace(hour=set_hour, minute=set_minute, second=0, microsecond=0) window_start = set_time_today - timedelta(minutes=15) # 当前时间刚好到达窗口开始 if now.strftime('%H:%M') == window_start.strftime('%H:%M'): random_minutes = rand_module.randint(0, 30) random_time = window_start + timedelta(minutes=random_minutes) pending_random_schedules[schedule_id] = { 'scheduled_for': random_time, 'config': schedule_config } print(f"[定时任务] 任务#{schedule_id} 安排随机时间: {random_time.strftime('%H:%M')}") else: # 精确时间匹配 if schedule_time == current_time: should_execute = True if not should_execute: continue # 检查星期是否匹配 weekdays_str = schedule_config.get('weekdays', '1,2,3,4,5') try: allowed_weekdays = [int(d) for d in weekdays_str.split(',') if d.strip()] except Exception as e: print(f"[定时任务] 任务#{schedule_id} 解析weekdays失败: {e}") continue if current_weekday not in allowed_weekdays: continue # 检查今天是否已经执行过(同一天同一时间只执行一次) last_run = schedule_config.get('last_run_at') if last_run: try: last_run_datetime = datetime.strptime(last_run, '%Y-%m-%d %H:%M:%S') last_run_date = last_run_datetime.date() last_run_time = last_run_datetime.strftime('%H:%M') if last_run_date == now.date() and last_run_time == schedule_time: continue except Exception as e: pass # 解析失败则继续执行 print(f"[定时任务] 任务#{schedule_id} '{schedule_name}' 开始执行 (时间:{schedule_time})") # 执行用户定时任务 user_id = schedule_config['user_id'] schedule_id = schedule_config['id'] browse_type = schedule_config.get('browse_type', '应读') enable_screenshot = schedule_config.get('enable_screenshot', 1) try: account_ids_raw = schedule_config.get('account_ids', '[]') or '[]' account_ids = json.loads(account_ids_raw) except Exception as e: print(f"[定时任务] 任务#{schedule_id} 解析account_ids失败: {e}") account_ids = [] if not account_ids: continue # 创建执行日志 import time as time_mod import uuid execution_start_time = time_mod.time() log_id = database.create_schedule_execution_log( schedule_id=schedule_id, user_id=user_id, schedule_name=schedule_config.get('name', '未命名任务') ) # 创建批次ID用于收集截图 batch_id = f"batch_{uuid.uuid4().hex[:12]}" with batch_task_lock: batch_task_screenshots[batch_id] = { 'user_id': user_id, 'browse_type': browse_type, 'schedule_name': schedule_config.get('name', '未命名任务'), 'screenshots': [], 'total_accounts': 0, 'completed': 0 } started_count = 0 skipped_count = 0 task_threads = [] # 收集所有启动的任务线程 for account_id in account_ids: if user_id not in user_accounts: skipped_count += 1 continue if account_id not in user_accounts[user_id]: skipped_count += 1 continue account = user_accounts[user_id][account_id] if account.is_running: skipped_count += 1 continue account.is_running = True account.should_stop = False account.status = "排队中" # 传递批次ID,格式: user_scheduled:batch_xxx task_source = f"user_scheduled:{batch_id}" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, task_source), daemon=True ) thread.start() active_tasks[account_id] = thread task_threads.append(thread) started_count += 1 socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') # 更新批次的总账号数 with batch_task_lock: if batch_id in batch_task_screenshots: batch_task_screenshots[batch_id]['total_accounts'] = started_count # 更新最后执行时间 database.update_schedule_last_run(schedule_id) print(f"[用户定时任务] 已启动 {started_count} 个账号,跳过 {skipped_count} 个账号,批次ID: {batch_id}") # 启动监控线程,等待所有任务完成后更新日志 # 注意:邮件发送已移至截图回调中,当所有截图完成时自动发送 def wait_and_update_log(threads, start_time, lid, total, success, sid): for t in threads: t.join() # 等待每个任务完成(注意:这只是run_task线程,截图是异步的) execution_duration = int(time_mod.time() - start_time) database.update_schedule_execution_log( lid, total_accounts=total, success_accounts=success, failed_accounts=total - success, duration_seconds=execution_duration, status='completed' ) print(f"[用户定时任务] 任务#{sid}浏览阶段完成,耗时{execution_duration}秒,等待截图完成后发送邮件") if task_threads: monitor_thread = threading.Thread( target=wait_and_update_log, args=(task_threads, execution_start_time, log_id, len(account_ids), started_count, schedule_id), daemon=True ) monitor_thread.start() else: # 没有启动任何任务,直接更新日志 database.update_schedule_execution_log( log_id, total_accounts=len(account_ids), success_accounts=0, failed_accounts=len(account_ids), duration_seconds=0, status='completed' ) if started_count == 0 and len(account_ids) > 0: print(f"[用户定时任务] ⚠️ 警告:所有账号都被跳过了!请检查user_accounts状态") except Exception as e: print(f"[用户定时任务] 检查出错: {str(e)}") import traceback traceback.print_exc() # 每分钟检查一次配置 def check_and_schedule(): config = database.get_system_config() # 清除旧的任务 schedule.clear() # 时区转换函数:将CST时间转换为UTC时间(容器使用UTC) def cst_to_utc_time(cst_time_str): """将CST时间字符串(HH:MM)转换为UTC时间字符串 Args: cst_time_str: CST时间字符串,格式为 HH:MM Returns: UTC时间字符串,格式为 HH:MM """ # 解析CST时间 hour, minute = map(int, cst_time_str.split(':')) # CST是UTC+8,所以UTC时间 = CST时间 - 8小时 utc_hour = (hour - 8) % 24 return f"{utc_hour:02d}:{minute:02d}" # 始终添加每天凌晨3点(CST)的数据清理任务 cleanup_utc_time = cst_to_utc_time("03:00") schedule.every().day.at(cleanup_utc_time).do(cleanup_old_data) print(f"[定时任务] 已设置数据清理任务: 每天 CST 03:00 (UTC {cleanup_utc_time})") # 每小时清理过期验证码 schedule.every().hour.do(cleanup_expired_captcha) print(f"[定时任务] 已设置验证码清理任务: 每小时执行一次") # 如果启用了定时浏览任务,则添加 if config.get('schedule_enabled'): schedule_time_cst = config.get('schedule_time', '02:00') schedule_time_utc = cst_to_utc_time(schedule_time_cst) schedule.every().day.at(schedule_time_utc).do(run_scheduled_task) print(f"[定时任务] 已设置浏览任务: 每天 CST {schedule_time_cst} (UTC {schedule_time_utc})") # 初始检查 check_and_schedule() last_check = time.time() while True: try: # 执行待执行的任务 schedule.run_pending() # 每5秒重新检查一次配置(提高检查频率以确保定时任务准时执行) if time.time() - last_check > 5: check_and_schedule() check_user_schedules() # 检查用户定时任务 last_check = time.time() time.sleep(1) except Exception as e: print(f"[定时任务] 调度器出错: {str(e)}") time.sleep(5) # ========== 断点续传API ========== @app.route('/yuyx/api/checkpoint/paused') @admin_required def checkpoint_get_paused(): try: user_id = request.args.get('user_id', type=int) tasks = checkpoint_mgr.get_paused_tasks(user_id=user_id) return jsonify({'success': True, 'tasks': tasks}) except Exception as e: logger.error(f"获取暂停任务失败: {e}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/yuyx/api/checkpoint//resume', methods=['POST']) @admin_required def checkpoint_resume(task_id): try: checkpoint = checkpoint_mgr.get_checkpoint(task_id) if not checkpoint: return jsonify({'success': False, 'message': '任务不存在'}), 404 if checkpoint['status'] != 'paused': return jsonify({'success': False, 'message': '任务未暂停'}), 400 if checkpoint_mgr.resume_task(task_id): import threading threading.Thread( target=run_task, args=(checkpoint['user_id'], checkpoint['account_id'], checkpoint['browse_type'], True, 'resumed'), daemon=True ).start() return jsonify({'success': True}) return jsonify({'success': False}), 500 except Exception as e: logger.error(f"恢复任务失败: {e}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/yuyx/api/checkpoint//abandon', methods=['POST']) @admin_required def checkpoint_abandon(task_id): try: if checkpoint_mgr.abandon_task(task_id): return jsonify({'success': True}) return jsonify({'success': False}), 404 except Exception as e: return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 邮件服务API ==================== import email_service @app.route('/yuyx/api/email/settings', methods=['GET']) @admin_required def get_email_settings_api(): """获取全局邮件设置""" try: settings = email_service.get_email_settings() return jsonify(settings) except Exception as e: logger.error(f"获取邮件设置失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/email/settings', methods=['POST']) @admin_required def update_email_settings_api(): """更新全局邮件设置""" try: data = request.json enabled = data.get('enabled', False) failover_enabled = data.get('failover_enabled', True) register_verify_enabled = data.get('register_verify_enabled') base_url = data.get('base_url') task_notify_enabled = data.get('task_notify_enabled') email_service.update_email_settings( enabled=enabled, failover_enabled=failover_enabled, register_verify_enabled=register_verify_enabled, base_url=base_url, task_notify_enabled=task_notify_enabled ) return jsonify({'success': True}) except Exception as e: logger.error(f"更新邮件设置失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/smtp/configs', methods=['GET']) @admin_required def get_smtp_configs_api(): """获取所有SMTP配置列表""" try: configs = email_service.get_smtp_configs(include_password=False) return jsonify(configs) except Exception as e: logger.error(f"获取SMTP配置失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/smtp/configs', methods=['POST']) @admin_required def create_smtp_config_api(): """创建SMTP配置""" try: data = request.json # 验证必填字段 if not data.get('host'): return jsonify({'error': 'SMTP服务器地址不能为空'}), 400 if not data.get('username'): return jsonify({'error': 'SMTP用户名不能为空'}), 400 config_id = email_service.create_smtp_config(data) return jsonify({'success': True, 'id': config_id}) except Exception as e: logger.error(f"创建SMTP配置失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/smtp/configs/', methods=['GET']) @admin_required def get_smtp_config_api(config_id): """获取单个SMTP配置详情""" try: config = email_service.get_smtp_config(config_id, include_password=False) if not config: return jsonify({'error': '配置不存在'}), 404 return jsonify(config) except Exception as e: logger.error(f"获取SMTP配置失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/smtp/configs/', methods=['PUT']) @admin_required def update_smtp_config_api(config_id): """更新SMTP配置""" try: data = request.json if email_service.update_smtp_config(config_id, data): return jsonify({'success': True}) return jsonify({'error': '更新失败'}), 400 except Exception as e: logger.error(f"更新SMTP配置��败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/smtp/configs/', methods=['DELETE']) @admin_required def delete_smtp_config_api(config_id): """删除SMTP配置""" try: if email_service.delete_smtp_config(config_id): return jsonify({'success': True}) return jsonify({'error': '删除失败'}), 400 except Exception as e: logger.error(f"删除SMTP配置失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/smtp/configs//test', methods=['POST']) @admin_required def test_smtp_config_api(config_id): """测试SMTP配置""" try: data = request.json test_email = data.get('email', '') if not test_email: return jsonify({'error': '请提供测试邮箱'}), 400 # 简单验证邮箱格式 import re if not re.match(r'^[^\s@]+@[^\s@]+\.[^\s@]+$', test_email): return jsonify({'error': '邮箱格式不正确'}), 400 result = email_service.test_smtp_config(config_id, test_email) return jsonify(result) except Exception as e: logger.error(f"测试SMTP配置失败: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/yuyx/api/smtp/configs//primary', methods=['POST']) @admin_required def set_primary_smtp_config_api(config_id): """设置主SMTP配置""" try: if email_service.set_primary_smtp_config(config_id): return jsonify({'success': True}) return jsonify({'error': '设置失败'}), 400 except Exception as e: logger.error(f"设置主SMTP配置失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/email/stats', methods=['GET']) @admin_required def get_email_stats_api(): """获取邮件发送统计""" try: stats = email_service.get_email_stats() return jsonify(stats) except Exception as e: logger.error(f"获取邮件统计失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/email/logs', methods=['GET']) @admin_required def get_email_logs_api(): """获取邮件发送日志""" try: page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 20, type=int) email_type = request.args.get('type', None) status = request.args.get('status', None) # 限制page_size范围 page_size = min(max(page_size, 10), 100) result = email_service.get_email_logs(page, page_size, email_type, status) return jsonify(result) except Exception as e: logger.error(f"获取邮件日志失败: {e}") return jsonify({'error': str(e)}), 500 @app.route('/yuyx/api/email/logs/cleanup', methods=['POST']) @admin_required def cleanup_email_logs_api(): """清理过期邮件日志""" try: data = request.json or {} days = data.get('days', 30) # 限制days范围 days = min(max(days, 7), 365) deleted = email_service.cleanup_email_logs(days) return jsonify({'success': True, 'deleted': deleted}) except Exception as e: logger.error(f"清理邮件日志失败: {e}") return jsonify({'error': str(e)}), 500 # 初始化浏览器池(在后台线程中预热,不阻塞启动) # ==================== 用户定时任务API ==================== @app.route('/api/schedules', methods=['GET']) @login_required def get_user_schedules_api(): """获取当前用户的所有定时任务""" schedules = database.get_user_schedules(current_user.id) import json for s in schedules: try: s['account_ids'] = json.loads(s.get('account_ids', '[]') or '[]') except: s['account_ids'] = [] return jsonify(schedules) @app.route('/api/schedules', methods=['POST']) @login_required def create_user_schedule_api(): """创建用户定时任务""" data = request.json name = data.get('name', '我的定时任务') schedule_time = data.get('schedule_time', '08:00') weekdays = data.get('weekdays', '1,2,3,4,5') browse_type = data.get('browse_type', '应读') enable_screenshot = data.get('enable_screenshot', 1) account_ids = data.get('account_ids', []) import re if not re.match(r'^\d{2}:\d{2}$', schedule_time): return jsonify({"error": "时间格式不正确,应为 HH:MM"}), 400 schedule_id = database.create_user_schedule( user_id=current_user.id, name=name, schedule_time=schedule_time, weekdays=weekdays, browse_type=browse_type, enable_screenshot=enable_screenshot, account_ids=account_ids ) if schedule_id: return jsonify({"success": True, "id": schedule_id}) return jsonify({"error": "创建失败"}), 500 @app.route('/api/schedules/', methods=['GET']) @login_required def get_schedule_detail_api(schedule_id): """获取定时任务详情""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 import json try: schedule['account_ids'] = json.loads(schedule.get('account_ids', '[]') or '[]') except: schedule['account_ids'] = [] return jsonify(schedule) @app.route('/api/schedules/', methods=['PUT']) @login_required def update_schedule_api(schedule_id): """更新定时任务""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 data = request.json allowed_fields = ['name', 'schedule_time', 'weekdays', 'browse_type', 'enable_screenshot', 'account_ids', 'enabled'] update_data = {k: v for k, v in data.items() if k in allowed_fields} if 'schedule_time' in update_data: import re if not re.match(r'^\d{2}:\d{2}$', update_data['schedule_time']): return jsonify({"error": "时间格式不正确"}), 400 success = database.update_user_schedule(schedule_id, **update_data) if success: return jsonify({"success": True}) return jsonify({"error": "更新失败"}), 500 @app.route('/api/schedules/', methods=['DELETE']) @login_required def delete_schedule_api(schedule_id): """删除定时任务""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 success = database.delete_user_schedule(schedule_id) if success: return jsonify({"success": True}) return jsonify({"error": "删除失败"}), 500 @app.route('/api/schedules//toggle', methods=['POST']) @login_required def toggle_schedule_api(schedule_id): """启用/禁用定时任务""" schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 data = request.json enabled = data.get('enabled', not schedule['enabled']) success = database.toggle_user_schedule(schedule_id, enabled) if success: return jsonify({"success": True, "enabled": enabled}) return jsonify({"error": "操作失败"}), 500 @app.route('/api/schedules//run', methods=['POST']) @login_required def run_schedule_now_api(schedule_id): """立即执行定时任务""" import json schedule = database.get_schedule_by_id(schedule_id) if not schedule: return jsonify({"error": "定时任务不存在"}), 404 if schedule['user_id'] != current_user.id: return jsonify({"error": "无权访问"}), 403 try: account_ids = json.loads(schedule.get('account_ids', '[]') or '[]') except: account_ids = [] if not account_ids: return jsonify({"error": "没有配置账号"}), 400 user_id = current_user.id browse_type = schedule['browse_type'] enable_screenshot = schedule['enable_screenshot'] started = [] for account_id in account_ids: if user_id not in user_accounts or account_id not in user_accounts[user_id]: continue account = user_accounts[user_id][account_id] if account.is_running: continue account.is_running = True account.should_stop = False account.status = "排队中" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, 'user_scheduled'), daemon=True ) thread.start() active_tasks[account_id] = thread started.append(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') database.update_schedule_last_run(schedule_id) return jsonify({ "success": True, "started_count": len(started), "message": f"已启动 {len(started)} 个账号" }) # ==================== 定时任务执行日志API ==================== @app.route('/api/schedules//logs', methods=['GET']) @login_required def get_schedule_logs_api(schedule_id): """获取定时任务执行日志""" try: schedule = database.get_schedule_by_id(schedule_id) if not schedule or schedule['user_id'] != current_user.id: return jsonify([]) limit = request.args.get('limit', 20, type=int) logs = database.get_schedule_execution_logs(schedule_id, limit) return jsonify(logs if logs else []) except Exception as e: return jsonify([]) @app.route('/api/schedules//logs', methods=['DELETE']) @login_required def delete_schedule_logs_api(schedule_id): """清空定时任务执行日志""" try: schedule = database.get_schedule_by_id(schedule_id) if not schedule or schedule['user_id'] != current_user.id: return jsonify({"error": "无权限"}), 403 deleted = database.delete_schedule_logs(schedule_id, current_user.id) return jsonify({"success": True, "deleted": deleted}) except Exception as e: return jsonify({"error": str(e)}), 500 # ==================== 批量操作API ==================== @app.route('/api/accounts/batch/start', methods=['POST']) @login_required def batch_start_accounts(): """批量启动账号""" user_id = current_user.id data = request.json account_ids = data.get('account_ids', []) browse_type = data.get('browse_type', '应读') enable_screenshot = data.get('enable_screenshot', True) if not account_ids: return jsonify({"error": "请选择要启动的账号"}), 400 started = [] failed = [] for account_id in account_ids: if user_id not in user_accounts or account_id not in user_accounts[user_id]: failed.append({'id': account_id, 'reason': '账号不存在'}) continue account = user_accounts[user_id][account_id] if account.is_running: failed.append({'id': account_id, 'reason': '已在运行中'}) continue account.is_running = True account.should_stop = False account.status = "排队中" thread = threading.Thread( target=run_task, args=(user_id, account_id, browse_type, enable_screenshot, 'batch'), daemon=True ) thread.start() active_tasks[account_id] = thread started.append(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({ "success": True, "started_count": len(started), "failed_count": len(failed), "started": started, "failed": failed }) @app.route('/api/accounts/batch/stop', methods=['POST']) @login_required def batch_stop_accounts(): """批量停止账号""" user_id = current_user.id data = request.json account_ids = data.get('account_ids', []) if not account_ids: return jsonify({"error": "请选择要停止的账号"}), 400 stopped = [] for account_id in account_ids: if user_id not in user_accounts or account_id not in user_accounts[user_id]: continue account = user_accounts[user_id][account_id] if not account.is_running: continue account.should_stop = True account.status = "正在停止" stopped.append(account_id) socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}') return jsonify({ "success": True, "stopped_count": len(stopped), "stopped": stopped }) def cleanup_on_exit(): """应用退出时的清理函数""" import signal import sys print("\n正在清理资源...") # 1. 停止所有运行中的任务 print("- 停止运行中的任务...") for account_id, thread in list(active_tasks.items()): try: # 设置停止标志 if account_id in user_accounts: for user_id in user_accounts: if account_id in user_accounts[user_id]: user_accounts[user_id][account_id].should_stop = True except: pass # 2. 等待所有线程完成(最多等待5秒) print("- 等待线程退出...") for account_id, thread in list(active_tasks.items()): try: if thread and thread.is_alive(): thread.join(timeout=2) except: pass # 3. 关闭浏览器工作线程池 print("- 关闭浏览器线程池...") try: shutdown_browser_worker_pool() except: pass # 3.5 关闭邮件队列 print("- 关闭邮件队列...") try: email_service.shutdown_email_queue() except: pass # 4. 关闭数据库连接池 print("- 关闭数据库连接池...") try: db_pool._pool.close_all() if db_pool._pool else None except: pass print("✓ 资源清理完成") if __name__ == '__main__': import signal import atexit # 注册退出清理函数 atexit.register(cleanup_on_exit) # 注册信号处理器 def signal_handler(sig, frame): print("\n\n收到退出信号,正在关闭...") cleanup_on_exit() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) print("=" * 60) print("知识管理平台自动化工具 - 多用户版") print("=" * 60) # 初始化数据库 database.init_database() checkpoint_mgr = get_checkpoint_manager() print("✓ 任务断点管理器已初始化") # 初始化邮件服务 try: email_service.init_email_service() print("✓ 邮件服务已初始化") except Exception as e: print(f"警告: 邮件服务初始化失败: {e}") # 启动内存清理调度器 start_cleanup_scheduler() # 加载系统配置(并发设置) try: system_config = database.get_system_config() if system_config: # 使用globals()修改全局变量 globals()['max_concurrent_global'] = system_config.get('max_concurrent_global', 2) globals()['max_concurrent_per_account'] = system_config.get('max_concurrent_per_account', 1) # 重新创建信号量 globals()['global_semaphore'] = threading.Semaphore(globals()['max_concurrent_global']) print(f"✓ 已加载并发配置: 全局={globals()['max_concurrent_global']}, 单账号={globals()['max_concurrent_per_account']}") except Exception as e: print(f"警告: 加载并发配置失败,使用默认值: {e}") # 主线程初始化浏览器(Playwright不支持跨线程) print("\n正在初始化浏览器管理器...") init_browser_manager() # 启动定时任务调度器 print("\n启动定时任务调度器...") scheduler_thread = threading.Thread(target=scheduled_task_worker, daemon=True) scheduler_thread.start() print("✓ 定时任务调度器已启动") # 启动状态推送线程(每秒推送运行中任务状态) status_thread = threading.Thread(target=status_push_worker, daemon=True) status_thread.start() print("✓ 状态推送线程已启动(1秒/次)") # 启动Web服务器 print("\n服务器启动中...") print(f"用户访问地址: http://{config.SERVER_HOST}:{config.SERVER_PORT}") print(f"后台管理地址: http://{config.SERVER_HOST}:{config.SERVER_PORT}/yuyx") print("默认管理员: admin/admin") print("=" * 60 + "\n") # 初始化浏览器工作线程池(按需模式,启动时不创建浏览器) try: system_cfg = database.get_system_config() pool_size = system_cfg.get('max_screenshot_concurrent', 3) if system_cfg else 3 print(f"初始化截图线程池({pool_size}个worker,按需启动浏览器,空闲5分钟后自动关闭)...") init_browser_worker_pool(pool_size=pool_size) print("✓ 截图线程池初始化完成") except Exception as e: print(f"警告: 截图线程池初始化失败: {e}") socketio.run(app, host=config.SERVER_HOST, port=config.SERVER_PORT, debug=config.DEBUG, allow_unsafe_werkzeug=True)