Files
zsglpt/app.py.broken
Yu Yon b5344cd55e 修复所有bug并添加新功能
- 修复添加账号按钮无反应问题
- 添加账号备注字段(可选)
- 添加账号设置按钮(修改密码/备注)
- 修复用户反馈功能
- 添加定时任务执行日志
- 修复容器重启后账号加载问题
- 修复所有JavaScript语法错误
- 优化账号加载机制(4层保障)

🤖 Generated with Claude Code
2025-12-10 11:19:16 +08:00

2255 lines
84 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
知识管理平台自动化工具 - 多用户版本
支持用户注册登录、后台管理、数据隔离
"""
# Pin the process timezone to China Standard Time (CST, UTC+8) so naive
# time/datetime calls throughout the app report Beijing time.
import os
import time

os.environ['TZ'] = 'Asia/Shanghai'
if hasattr(time, 'tzset'):
    # tzset() re-reads TZ; it does not exist on Windows builds of Python.
    time.tzset()
import pytz
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_from_directory, redirect, url_for, session
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import threading
import time
import json
import os
from datetime import datetime, timedelta, timezone
from functools import wraps
# 导入数据库模块和核心模块
import database
import requests
from playwright_automation import PlaywrightBrowserManager, PlaywrightAutomation, BrowseResult
from browser_installer import check_and_install_browser
# ========== 优化模块导入 ==========
from app_config import get_config
from app_logger import init_logging, get_logger, audit_logger
from app_security import (
ip_rate_limiter, require_ip_not_locked,
validate_username, validate_password, validate_email,
is_safe_path, sanitize_filename, get_client_ip
)
# ========== Configuration ==========
config = get_config()
app = Flask(__name__)

# Persist SECRET_KEY on disk so a restart does not invalidate every session
# cookie (which would log all users out).
SECRET_KEY_FILE = 'data/secret_key.txt'
SECRET_KEY = ''
if os.path.exists(SECRET_KEY_FILE):
    with open(SECRET_KEY_FILE, 'r') as f:
        SECRET_KEY = f.read().strip()
if not SECRET_KEY:
    # Missing or empty key file: generate a fresh key and store it.
    SECRET_KEY = os.urandom(24).hex()
    os.makedirs(os.path.dirname(SECRET_KEY_FILE), exist_ok=True)
    with open(SECRET_KEY_FILE, 'w') as f:
        f.write(SECRET_KEY)
    print(f"✓ 已生成新的SECRET_KEY并保存")

app.config.from_object(config)
# Apply the persisted key AFTER from_object so the config object cannot
# overwrite it. Previously the key was computed but never handed to Flask.
# NOTE(review): if app_config is meant to supply its own SECRET_KEY (e.g.
# from the environment), revisit this precedence.
app.config['SECRET_KEY'] = SECRET_KEY

# NOTE(review): cors_allowed_origins="*" accepts Socket.IO connections from
# any origin; confirm this is intended.
socketio = SocketIO(app, cors_allowed_origins="*")
# ========== Logging ==========
init_logging(log_level=config.LOG_LEVEL, log_file=config.LOG_FILE)
logger = get_logger('app')
logger.info("=" * 60)
logger.info("知识管理平台自动化工具 - 多用户版")
logger.info("=" * 60)

# ========== Flask-Login ==========
# Passing the app to the constructor runs init_app() right away.
login_manager = LoginManager(app)
# @login_required views redirect anonymous visitors to this endpoint.
login_manager.login_view = 'login_page'

# Screenshot output directory, created eagerly so later writes succeed.
SCREENSHOTS_DIR = config.SCREENSHOTS_DIR
os.makedirs(SCREENSHOTS_DIR, exist_ok=True)
# ========== Shared in-memory state ==========
browser_manager = None      # PlaywrightBrowserManager, created lazily
user_accounts = {}          # {user_id: {account_id: Account}}
active_tasks = {}           # {account_id: threading.Thread}
log_cache = {}              # {user_id: [log dicts]} - separate buffer per user
log_cache_total_count = 0   # entries across all log_cache lists (bounds memory)

# Log-cache bounds, taken from the app config.
MAX_LOGS_PER_USER = config.MAX_LOGS_PER_USER
MAX_TOTAL_LOGS = config.MAX_TOTAL_LOGS

# Captcha store: {session_id: {"code": str, "expire_time": float, "failed_attempts": int}}
captcha_storage = {}
# Per-IP wrong-captcha tracking: {ip: {"attempts": int, "lock_until": float, "first_attempt": float}}
ip_rate_limit = {}

# Rate-limit settings
MAX_CAPTCHA_ATTEMPTS = 5        # wrong guesses allowed for one captcha
MAX_IP_ATTEMPTS_PER_HOUR = 10   # wrong guesses per IP per hour before it is locked
IP_LOCK_DURATION = 3600         # lock length in seconds (1 hour)

# Concurrency limits. Each task runs its own browser (the original notes
# estimate ~200MB each), so both caps are kept small.
max_concurrent_per_account = 1  # despite the name, used as the per-USER cap
max_concurrent_global = 2       # whole-process cap
user_semaphores = {}            # {user_id: threading.Semaphore}
global_semaphore = threading.Semaphore(max_concurrent_global)
# Only one screenshot job at a time, to avoid resource contention.
screenshot_semaphore = threading.Semaphore(1)
class User(UserMixin):
    """Identity of a regular platform user as seen by Flask-Login.

    Only the database primary key is kept; UserMixin supplies
    is_authenticated / get_id() on top of ``self.id``.
    """

    def __init__(self, user_id):
        self.id = user_id


class Admin(UserMixin):
    """Identity object for an administrator, flagged with ``is_admin``."""

    def __init__(self, admin_id):
        self.id = admin_id
        self.is_admin = True
class Account:
    """In-memory record of one automation account owned by a platform user.

    Holds the credentials plus runtime progress. ``to_dict`` returns the
    subset that is sent to the browser; the password is never included.
    """

    def __init__(self, account_id, user_id, username, password, remember=True, remark=''):
        # --- identity / credentials ---
        self.id = account_id
        self.user_id = user_id
        self.username = username
        self.password = password
        self.remember = remember
        self.remark = remark
        # --- runtime state ---
        self.status = "未开始"
        self.is_running = False
        self.should_stop = False
        self.total_items = 0
        self.total_attachments = 0
        self.automation = None
        self.last_browse_type = "注册前未读"
        self.proxy_config = None  # proxy shared by browsing and screenshots

    def to_dict(self):
        """Return the JSON-serialisable public view of this account."""
        # Each output key mirrors the attribute of the same name, in this order.
        public_fields = ("id", "username", "status", "remark",
                         "total_items", "total_attachments", "is_running")
        return {field: getattr(self, field) for field in public_fields}
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login loader: rebuild a User from the id stored in the session.

    Returns None (treated by Flask-Login as anonymous) when the stored id is
    not an integer or no longer matches a database user. Previously a
    malformed id raised ValueError and turned the request into a 500.
    """
    try:
        user_pk = int(user_id)
    except (TypeError, ValueError):
        return None
    user = database.get_user_by_id(user_pk)
    if user:
        return User(user['id'])
    return None
def admin_required(f):
    """Decorator: answer 403 JSON unless an admin is logged in.

    Admin login state lives in ``session['admin_id']`` (set by admin_login),
    separate from Flask-Login's regular user session.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'admin_id' in session:
            return f(*args, **kwargs)
        return jsonify({"error": "需要管理员权限"}), 403
    return decorated_function
# Guards log_cache and log_cache_total_count: log_to_client is called from
# the run_task worker threads as well as from request handlers.
_log_cache_lock = threading.Lock()


def log_to_client(message, user_id=None, account_id=None):
    """Push a log line to one user's web clients and keep it in their cache.

    Args:
        message: text to display.
        user_id: owner of the message; when falsy the line is only printed.
        account_id: optional account the message refers to (sent to the client).

    The per-user cache holds at most MAX_LOGS_PER_USER entries. Once the
    total exceeds MAX_TOTAL_LOGS, the oldest entries of whichever user holds
    the most logs are dropped. (Database persistence of these lines is
    disabled; the task_logs table is used instead.)
    """
    global log_cache_total_count
    beijing_tz = timezone(timedelta(hours=8))
    timestamp = datetime.now(beijing_tz).strftime('%H:%M:%S')
    log_data = {
        'timestamp': timestamp,
        'message': message,
        'account_id': account_id
    }

    if user_id:
        with _log_cache_lock:
            user_logs = log_cache.setdefault(user_id, [])
            user_logs.append(log_data)
            log_cache_total_count += 1

            # Per-user cap: drop this user's oldest entry.
            if len(user_logs) > MAX_LOGS_PER_USER:
                user_logs.pop(0)
                log_cache_total_count -= 1

            # Global cap: trim whichever user currently holds the most logs.
            while log_cache_total_count > MAX_TOTAL_LOGS and log_cache:
                max_user = max(log_cache, key=lambda u: len(log_cache[u]))
                if not log_cache[max_user]:
                    break
                log_cache[max_user].pop(0)
                log_cache_total_count -= 1

        # Emit outside the lock so a slow socket cannot block other loggers.
        socketio.emit('log', log_data, room=f'user_{user_id}')

    print(f"[{timestamp}] User:{user_id} {message}")
def get_proxy_from_api(api_url, max_retries=3):
    """Fetch a proxy address from a proxy-provider HTTP API, with retries.

    The API is expected to answer with plain-text ``IP:PORT``. If several
    proxies come back (one per line), the first non-empty line is used. A
    line that already contains a scheme (``http://...``) is used as-is rather
    than being prefixed a second time.

    Args:
        api_url: URL of the proxy API.
        max_retries: number of attempts, with a 1 s pause between them.

    Returns:
        The proxy server URL (e.g. ``http://IP:PORT``), or None if every
        attempt failed.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(api_url, timeout=10)
            if response.status_code == 200:
                lines = [line.strip() for line in response.text.splitlines() if line.strip()]
                ip_port = lines[0] if lines else ''
                if ip_port and ':' in ip_port:
                    proxy_server = ip_port if '://' in ip_port else f"http://{ip_port}"
                    print(f"✓ 获取代理成功: {proxy_server} (尝试 {attempt + 1}/{max_retries})")
                    return proxy_server
                print(f"✗ 代理格式错误: {ip_port} (尝试 {attempt + 1}/{max_retries})")
            else:
                print(f"✗ 获取代理失败: HTTP {response.status_code} (尝试 {attempt + 1}/{max_retries})")
        except Exception as e:
            # Best-effort: network/HTTP errors are logged, then retried.
            print(f"✗ 获取代理异常: {str(e)} (尝试 {attempt + 1}/{max_retries})")
        if attempt < max_retries - 1:
            time.sleep(1)
    print(f"✗ 获取代理失败,已重试 {max_retries} 次,将不使用代理继续")
    return None
def init_browser_manager():
    """Create the shared PlaywrightBrowserManager on first use.

    First makes sure the Playwright browser is installed
    (check_and_install_browser), then constructs the manager. No
    initialize() call is made, because each account creates its own browser
    later.

    Returns:
        True if the manager already existed or was created, False on failure.
    """
    global browser_manager
    if browser_manager is not None:
        return True

    print("正在初始化Playwright浏览器管理器...")
    if not check_and_install_browser(log_callback=lambda msg, account_id=None: print(msg)):
        print("浏览器环境检查失败!")
        return False
    try:
        # Construction must be inside the try: previously only a print was
        # guarded, so a constructor failure escaped to the caller.
        browser_manager = PlaywrightBrowserManager(
            headless=True,
            log_callback=lambda msg, account_id=None: print(msg)
        )
    except Exception as e:
        print(f"Playwright初始化失败: {str(e)}")
        return False
    print("Playwright浏览器管理器创建成功")
    return True
# ==================== Frontend pages ====================
@app.route('/')
def index():
    """Landing URL: logged-in users go to the app, everyone else to login."""
    target = 'app_page' if current_user.is_authenticated else 'login_page'
    return redirect(url_for(target))


@app.route('/login')
def login_page():
    """Render the user login page."""
    return render_template('login.html')


@app.route('/register')
def register_page():
    """Render the user registration page."""
    return render_template('register.html')


@app.route('/app')
@login_required
def app_page():
    """Render the main application (logged-in users only)."""
    return render_template('index.html')


@app.route('/yuyx')
def admin_login_page():
    """Admin login page; an admin who is already logged in goes to the dashboard."""
    if 'admin_id' not in session:
        return render_template('admin_login.html')
    return redirect(url_for('admin_page'))


@app.route('/yuyx/admin')
@admin_required
def admin_page():
    """Render the admin dashboard."""
    return render_template('admin.html')


@app.route('/yuyx/vip')
@admin_required
def vip_admin_page():
    """Render the VIP management page."""
    return render_template('vip_admin.html')
# ==================== User authentication API ====================
@app.route('/api/register', methods=['POST'])
@require_ip_not_locked  # IP rate-limit protection
def register():
    """Register a new platform user; the account then awaits admin approval.

    Expects JSON with ``username``, ``password``, optional ``email`` and the
    ``captcha_session`` / ``captcha`` pair issued by generate_captcha. Wrong
    captcha guesses are counted per captcha (MAX_CAPTCHA_ATTEMPTS) and per
    client IP (record_failed_captcha); the IP is locked once it exceeds its
    hourly budget.
    """
    # get_json(silent=True) yields None for a missing or non-JSON body.
    # Falling back to {} gives the normal 400 below instead of a 500.
    data = request.get_json(silent=True) or {}
    username = data.get('username', '').strip()
    password = data.get('password', '').strip()
    email = data.get('email', '').strip()
    captcha_session = data.get('captcha_session', '')
    captcha_code = data.get('captcha', '').strip()

    if not username or not password:
        return jsonify({"error": "用户名和密码不能为空"}), 400

    # The captcha must exist and be unexpired.
    captcha_data = captcha_storage.get(captcha_session) if captcha_session else None
    if captcha_data is None:
        return jsonify({"error": "验证码已过期,请重新获取"}), 400
    if captcha_data["expire_time"] < time.time():
        # pop() instead of del: generate_captcha may have purged it concurrently.
        captcha_storage.pop(captcha_session, None)
        return jsonify({"error": "验证码已过期,请重新获取"}), 400

    # First address of X-Forwarded-For, else X-Real-IP, else the socket peer.
    # NOTE(review): these headers are client-controlled unless a trusted
    # proxy overwrites them, so the IP limit can be bypassed by spoofing.
    # Consider the imported get_client_ip helper (TODO confirm its semantics).
    client_ip = request.headers.get('X-Forwarded-For', request.headers.get('X-Real-IP', request.remote_addr))
    if client_ip and ',' in client_ip:
        client_ip = client_ip.split(',')[0].strip()

    allowed, error_msg = check_ip_rate_limit(client_ip)
    if not allowed:
        return jsonify({"error": error_msg}), 429

    if captcha_data.get("failed_attempts", 0) >= MAX_CAPTCHA_ATTEMPTS:
        captcha_storage.pop(captcha_session, None)
        return jsonify({"error": "验证码尝试次数过多,请重新获取"}), 400

    if captcha_data["code"] != captcha_code:
        captcha_data["failed_attempts"] = captcha_data.get("failed_attempts", 0) + 1
        if record_failed_captcha(client_ip):
            return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
        return jsonify({"error": "验证码错误(剩余{}次机会)".format(
            MAX_CAPTCHA_ATTEMPTS - captcha_data["failed_attempts"])}), 400

    # Captcha accepted; it is single-use.
    captcha_storage.pop(captcha_session, None)

    user_id = database.create_user(username, password, email)
    if user_id:
        return jsonify({"success": True, "message": "注册成功,请等待管理员审核"})
    return jsonify({"error": "用户名已存在"}), 400
# ==================== 验证码API ====================
import random
def check_ip_rate_limit(ip_address):
    """Decide whether ``ip_address`` may attempt a captcha right now.

    As a side effect, drops records whose lock has lapsed and whose one-hour
    window has passed. It also restarts the counter of an unlocked IP whose
    window has expired.

    Returns:
        (True, None) if allowed, otherwise (False, user-facing message).
    """
    now = time.time()
    window_seconds = 3600

    def _is_stale(record):
        lock_over = record.get("lock_until", 0) < now
        window_over = now - record.get("first_attempt", now) > window_seconds
        return lock_over and window_over

    for stale_ip in [ip for ip, record in ip_rate_limit.items() if _is_stale(record)]:
        del ip_rate_limit[stale_ip]

    record = ip_rate_limit.get(ip_address)
    if record is None:
        return True, None

    lock_until = record.get("lock_until", 0)
    if lock_until > now:
        minutes_left = int(lock_until - now) // 60 + 1
        return False, "IP已被锁定,请{}分钟后再试".format(minutes_left)

    if now - record.get("first_attempt", now) > window_seconds:
        ip_rate_limit[ip_address] = {"attempts": 0, "first_attempt": now}
    return True, None
def record_failed_captcha(ip_address):
    """Count one wrong captcha guess for ``ip_address``; lock it at the limit.

    When the IP reaches MAX_IP_ATTEMPTS_PER_HOUR failures, its record gets a
    ``lock_until`` timestamp IP_LOCK_DURATION seconds in the future.

    Returns:
        True if the IP is now locked, else False.
    """
    now = time.time()
    record = ip_rate_limit.get(ip_address)
    if record is None:
        record = {"attempts": 0, "first_attempt": now}
        ip_rate_limit[ip_address] = record
    record["attempts"] += 1

    if record["attempts"] < MAX_IP_ATTEMPTS_PER_HOUR:
        return False
    record["lock_until"] = now + IP_LOCK_DURATION
    return True
@app.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
    """Issue a new 4-digit numeric captcha that expires after 5 minutes.

    The code is stored under a fresh UUID session id, and expired captchas
    are purged on every call. Responds with ``session_id`` and ``captcha``.

    NOTE(review): the code itself is returned in the response, so it gives no
    bot protection; presumably the frontend only displays it. Confirm intent.
    """
    import uuid
    session_id = str(uuid.uuid4())
    code = "".join(str(random.randint(0, 9)) for _ in range(4))

    captcha_storage[session_id] = {
        "code": code,
        "expire_time": time.time() + 300,
        "failed_attempts": 0,
    }

    # Purge everything that has already expired.
    for expired_key in [k for k, v in captcha_storage.items() if v["expire_time"] < time.time()]:
        del captcha_storage[expired_key]

    return jsonify({"session_id": session_id, "captcha": code})
@app.route('/api/login', methods=['POST'])
@require_ip_not_locked  # IP rate-limit protection
def login():
    """Log a platform user in and load their accounts into memory.

    Expects JSON with ``username`` and ``password``. When the client sets
    ``need_captcha``, it must also send ``captcha_session`` / ``captcha``.
    Failure responses include ``need_captcha`` so the client knows whether
    to show a captcha next time.
    """
    # Missing or non-JSON body -> {} (clean 4xx below instead of a 500).
    data = request.get_json(silent=True) or {}
    username = data.get('username', '').strip()
    password = data.get('password', '').strip()
    captcha_session = data.get('captcha_session', '')
    captcha_code = data.get('captcha', '').strip()
    need_captcha = data.get('need_captcha', False)

    # NOTE(review): the captcha is only checked when the client itself sends
    # need_captcha, so a scripted client can skip it. Enforcing it would need
    # server-side failure tracking (TODO).
    if need_captcha:
        captcha_data = captcha_storage.get(captcha_session) if captcha_session else None
        if captcha_data is None:
            return jsonify({"error": "验证码已过期,请重新获取"}), 400
        if captcha_data["expire_time"] < time.time():
            captcha_storage.pop(captcha_session, None)
            return jsonify({"error": "验证码已过期,请重新获取"}), 400
        if captcha_data["code"] != captcha_code:
            return jsonify({"error": "验证码错误"}), 400
        # Captcha accepted; single-use. pop() tolerates a concurrent purge.
        captcha_storage.pop(captcha_session, None)

    # NOTE(review): separate "not registered" / "wrong password" messages let
    # callers enumerate usernames.
    if not database.get_user_by_username(username):
        return jsonify({"error": "账号未注册", "need_captcha": True}), 401

    user = database.verify_user(username, password)
    if not user:
        return jsonify({"error": "密码错误", "need_captcha": True}), 401

    if user['status'] != 'approved':
        return jsonify({"error": "账号未审核,请等待管理员审核", "need_captcha": False}), 401

    login_user(User(user['id']))
    load_user_accounts(user['id'])
    return jsonify({"success": True})
@app.route('/api/logout', methods=['POST'])
@login_required
def logout():
    """End the current user's Flask-Login session."""
    logout_user()
    return jsonify(success=True)
# ==================== Admin authentication API ====================
@app.route('/yuyx/api/login', methods=['POST'])
@require_ip_not_locked  # IP rate-limit protection
def admin_login():
    """Log an administrator in by storing admin_id / admin_username in the session.

    Expects JSON with ``username`` and ``password``. When the client sets
    ``need_captcha``, it must also send ``captcha_session`` / ``captcha``.
    """
    # Missing or non-JSON body -> {} (clean 4xx below instead of a 500).
    data = request.get_json(silent=True) or {}
    username = data.get('username', '').strip()
    password = data.get('password', '').strip()
    captcha_session = data.get('captcha_session', '')
    captcha_code = data.get('captcha', '').strip()
    need_captcha = data.get('need_captcha', False)

    # NOTE(review): as in login(), the captcha is only enforced when the
    # client asks for it.
    if need_captcha:
        captcha_data = captcha_storage.get(captcha_session) if captcha_session else None
        if captcha_data is None:
            return jsonify({"error": "验证码已过期,请重新获取"}), 400
        if captcha_data["expire_time"] < time.time():
            captcha_storage.pop(captcha_session, None)
            return jsonify({"error": "验证码已过期,请重新获取"}), 400
        if captcha_data["code"] != captcha_code:
            return jsonify({"error": "验证码错误"}), 400
        # Captcha accepted; single-use. pop() tolerates a concurrent purge.
        captcha_storage.pop(captcha_session, None)

    admin = database.verify_admin(username, password)
    if not admin:
        return jsonify({"error": "管理员用户名或密码错误", "need_captcha": True}), 401
    session['admin_id'] = admin['id']
    session['admin_username'] = admin['username']
    return jsonify({"success": True})
@app.route('/yuyx/api/logout', methods=['POST'])
@admin_required
def admin_logout():
    """Remove the admin markers from the session (a user login is left untouched)."""
    for session_key in ('admin_id', 'admin_username'):
        session.pop(session_key, None)
    return jsonify(success=True)
@app.route('/yuyx/api/users', methods=['GET'])
@admin_required
def get_all_users():
    """Return every registered user (admin only)."""
    return jsonify(database.get_all_users())


@app.route('/yuyx/api/users/pending', methods=['GET'])
@admin_required
def get_pending_users():
    """Return the users still awaiting approval (admin only)."""
    return jsonify(database.get_pending_users())
@app.route('/yuyx/api/users/<int:user_id>/approve', methods=['POST'])
@admin_required
def approve_user_route(user_id):
    """Approve the pending registration of ``user_id``."""
    if not database.approve_user(user_id):
        return jsonify({"error": "审核失败"}), 400
    return jsonify(success=True)


@app.route('/yuyx/api/users/<int:user_id>/reject', methods=['POST'])
@admin_required
def reject_user_route(user_id):
    """Reject the pending registration of ``user_id``."""
    if not database.reject_user(user_id):
        return jsonify({"error": "拒绝失败"}), 400
    return jsonify(success=True)
@app.route('/yuyx/api/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user_route(user_id):
    """Delete a user from the database and drop all of their in-memory state.

    Any of the user's accounts that are still running are flagged with
    should_stop first. The run_task worker polls that flag, so the task winds
    down instead of running on for a user that no longer exists.
    """
    global log_cache_total_count
    if not database.delete_user(user_id):
        return jsonify({"error": "删除失败"}), 400

    for account in user_accounts.pop(user_id, {}).values():
        if account.is_running:
            account.should_stop = True

    # Free the per-user semaphore and log cache so they do not leak.
    user_semaphores.pop(user_id, None)
    removed_logs = log_cache.pop(user_id, None)
    if removed_logs:
        log_cache_total_count -= len(removed_logs)
    return jsonify({"success": True})
@app.route('/yuyx/api/stats', methods=['GET'])
@admin_required
def get_system_stats():
    """Return database-wide statistics plus the logged-in admin's username."""
    stats = database.get_system_stats()
    admin_name = session.get('admin_username', 'admin')
    stats["admin_username"] = admin_name
    return jsonify(stats)
def _docker_read_int(path):
    """Return the single integer stored in a cgroup/proc pseudo-file."""
    with open(path, 'r') as f:
        return int(f.read().strip())


def _docker_read_memory_stat(path, *keys):
    """Return ``{key: int}`` for the requested keys of a cgroup memory.stat file.

    A missing file, or a key that is not present, is simply left out.
    """
    values = {}
    if os.path.exists(path):
        with open(path, 'r') as f:
            for line in f:
                name, _, value = line.partition(' ')
                if name in keys:
                    values[name] = int(value)
    return values


def _docker_fill_memory(docker_status):
    """Fill memory_usage / memory_limit / memory_percent from cgroup v2 or v1.

    cgroup v2: usage = memory.current - inactive_file.
    cgroup v1: usage = total_rss, falling back to
    memory.usage_in_bytes - total_inactive_file when RSS is 0 or missing.
    If neither hierarchy is present, the fields are left untouched.
    """
    limit_bytes = None
    if os.path.exists('/sys/fs/cgroup/memory.current'):
        stat = _docker_read_memory_stat('/sys/fs/cgroup/memory.stat', 'inactive_file')
        mem_bytes = _docker_read_int('/sys/fs/cgroup/memory.current') - stat.get('inactive_file', 0)
        if os.path.exists('/sys/fs/cgroup/memory.max'):
            with open('/sys/fs/cgroup/memory.max', 'r') as f:
                limit_str = f.read().strip()
            if limit_str != 'max':  # 'max' means no limit
                limit_bytes = int(limit_str)
    elif os.path.exists('/sys/fs/cgroup/memory/memory.usage_in_bytes'):
        stat = _docker_read_memory_stat('/sys/fs/cgroup/memory/memory.stat',
                                        'total_rss', 'total_inactive_file')
        mem_bytes = stat.get('total_rss', 0)
        if mem_bytes == 0:
            mem_bytes = (_docker_read_int('/sys/fs/cgroup/memory/memory.usage_in_bytes')
                         - stat.get('total_inactive_file', 0))
        if os.path.exists('/sys/fs/cgroup/memory/memory.limit_in_bytes'):
            candidate = _docker_read_int('/sys/fs/cgroup/memory/memory.limit_in_bytes')
            # An unlimited v1 cgroup reports a huge sentinel close to 2**63.
            if candidate < 9223372036854771712:
                limit_bytes = candidate
    else:
        return

    docker_status['memory_usage'] = "{:.2f} MB".format(mem_bytes / 1024 / 1024)
    if limit_bytes:  # also guards against dividing by a zero limit
        docker_status['memory_limit'] = "{:.2f} GB".format(limit_bytes / 1024 / 1024 / 1024)
        docker_status['memory_percent'] = "{:.2f}%".format(mem_bytes / limit_bytes * 100)


def _docker_uptime_seconds():
    """Return the number of seconds since PID 1 (the container's init) started."""
    with open('/proc/1/stat', 'r') as f:
        stat_text = f.read()
    # Field 2 (comm) is parenthesised and may contain spaces, so split only
    # after the last ')'. Index 0 is then field 3, and field 22 (starttime,
    # in clock ticks since boot) is index 19.
    fields = stat_text.rsplit(')', 1)[1].split()
    starttime_ticks = int(fields[19])
    with open('/proc/uptime', 'r') as f:
        system_uptime = float(f.read().split()[0])
    ticks_per_sec = os.sysconf('SC_CLK_TCK')
    return int(system_uptime - starttime_ticks / ticks_per_sec)


def _docker_format_uptime(uptime_seconds):
    """Format seconds as 'N天N小时 N分钟', omitting leading zero units."""
    days, remainder = divmod(uptime_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes = remainder // 60
    if days > 0:
        # The days unit was previously missing, so "1 day 3 h" printed as "13小时".
        return "{}天{}小时 {}分钟".format(days, hours, minutes)
    if hours > 0:
        return "{}小时 {}分钟".format(hours, minutes)
    return "{}分钟".format(minutes)


@app.route('/yuyx/api/docker_stats', methods=['GET'])
@admin_required
def get_docker_stats():
    """Report container name, memory usage and uptime (admin only).

    Docker is detected via /.dockerenv. Each probe is best-effort: a failed
    probe leaves its field at 'N/A', and a memory failure is reported in
    memory_usage. A failed probe never fails the request.
    """
    docker_status = {
        'running': False,
        'container_name': 'N/A',
        'uptime': 'N/A',
        'memory_usage': 'N/A',
        'memory_limit': 'N/A',
        'memory_percent': 'N/A',
        'cpu_percent': 'N/A',
        'status': 'Unknown'
    }
    try:
        if os.path.exists('/.dockerenv'):
            docker_status['running'] = True

            try:
                with open('/etc/hostname', 'r') as f:
                    docker_status['container_name'] = f.read().strip()
            except (OSError, ValueError):
                pass

            try:
                _docker_fill_memory(docker_status)
            except Exception as e:
                docker_status['memory_usage'] = 'Error: {}'.format(str(e))

            try:
                docker_status['uptime'] = _docker_format_uptime(_docker_uptime_seconds())
            except Exception:
                pass  # best-effort: keep 'N/A'

            docker_status['status'] = 'Running'
        else:
            docker_status['status'] = 'Not in Docker'
    except Exception as e:
        docker_status['status'] = 'Error: {}'.format(str(e))
    return jsonify(docker_status)
@app.route('/yuyx/api/admin/password', methods=['PUT'])
@admin_required
def update_admin_password():
    """Change the logged-in admin's password.

    Expects JSON ``new_password``. Applies the same 6-character minimum used
    by the other password endpoints in this module.
    """
    # Missing or non-JSON body -> {} (clean 400 below instead of a 500).
    data = request.get_json(silent=True) or {}
    new_password = data.get('new_password', '').strip()
    if not new_password:
        return jsonify({"error": "密码不能为空"}), 400
    if len(new_password) < 6:
        return jsonify({"error": "密码长度不能少于6位"}), 400

    username = session.get('admin_username')
    if database.update_admin_password(username, new_password):
        return jsonify({"success": True})
    return jsonify({"error": "修改失败"}), 400
@app.route('/yuyx/api/admin/username', methods=['PUT'])
@admin_required
def update_admin_username():
    """Rename the logged-in admin and keep the session's username in sync.

    Expects JSON ``new_username``. Any database failure is reported as
    "username already exists".
    """
    # Missing or non-JSON body -> {} (clean 400 below instead of a 500).
    data = request.get_json(silent=True) or {}
    new_username = data.get('new_username', '').strip()
    if not new_username:
        return jsonify({"error": "用户名不能为空"}), 400

    old_username = session.get('admin_username')
    if database.update_admin_username(old_username, new_username):
        session['admin_username'] = new_username
        return jsonify({"success": True})
    return jsonify({"error": "用户名已存在"}), 400
# ==================== Password reset API ====================
# Admin resets a user's password directly
@app.route('/yuyx/api/users/<int:user_id>/reset_password', methods=['POST'])
@admin_required
def admin_reset_password_route(user_id):
    """Set a new password for ``user_id`` immediately (no review step).

    Expects JSON ``new_password`` of at least 6 characters.
    """
    # Missing or non-JSON body -> {} (clean 400 below instead of a 500).
    data = request.get_json(silent=True) or {}
    new_password = data.get('new_password', '').strip()
    if not new_password:
        return jsonify({"error": "新密码不能为空"}), 400
    if len(new_password) < 6:
        return jsonify({"error": "密码长度不能少于6位"}), 400

    if database.admin_reset_user_password(user_id, new_password):
        return jsonify({"message": "密码重置成功"})
    return jsonify({"error": "重置失败,用户不存在"}), 400
# List pending password-reset requests
@app.route('/yuyx/api/password_resets', methods=['GET'])
@admin_required
def get_password_resets_route():
    """Return every password-reset request still awaiting review."""
    return jsonify(database.get_pending_password_resets())


# Approve a password-reset request
@app.route('/yuyx/api/password_resets/<int:request_id>/approve', methods=['POST'])
@admin_required
def approve_password_reset_route(request_id):
    """Approve reset request ``request_id``."""
    if not database.approve_password_reset(request_id):
        return jsonify({"error": "批准失败"}), 400
    return jsonify(message="密码重置申请已批准")


# Reject a password-reset request
@app.route('/yuyx/api/password_resets/<int:request_id>/reject', methods=['POST'])
@admin_required
def reject_password_reset_route(request_id):
    """Reject reset request ``request_id``."""
    if not database.reject_password_reset(request_id):
        return jsonify({"error": "拒绝失败"}), 400
    return jsonify(message="密码重置申请已拒绝")
# User asks for a password reset (applied only after admin approval)
@app.route('/api/reset_password_request', methods=['POST'])
@require_ip_not_locked  # same IP lock protection as the other unauthenticated endpoints
def request_password_reset():
    """File a password-reset request for an admin to approve.

    Expects JSON ``username``, ``new_password`` (at least 6 characters) and
    an optional ``email``; when given, it must match the stored email.

    NOTE(review): the distinct "user not found" / "email mismatch" responses
    let callers enumerate usernames.
    """
    # Missing or non-JSON body -> {} (clean 400 below instead of a 500).
    data = request.get_json(silent=True) or {}
    username = data.get('username', '').strip()
    email = data.get('email', '').strip()
    new_password = data.get('new_password', '').strip()

    if not username or not new_password:
        return jsonify({"error": "用户名和新密码不能为空"}), 400
    if len(new_password) < 6:
        return jsonify({"error": "密码长度不能少于6位"}), 400

    user = database.get_user_by_username(username)
    if not user:
        return jsonify({"error": "用户不存在"}), 404
    if email and user.get('email') != email:
        return jsonify({"error": "邮箱不匹配"}), 400

    request_id = database.create_password_reset_request(user['id'], new_password)
    if request_id:
        return jsonify({"message": "密码重置申请已提交,请等待管理员审核"})
    return jsonify({"error": "申请提交失败"}), 500
# ==================== Account management API (per-user isolation) ====================
def load_user_accounts(user_id):
    """Load ``user_id``'s accounts from the database into ``user_accounts``.

    Accounts already held in memory are kept as the same objects; only their
    stored fields (username, password, remember, remark) are refreshed from
    the database. Replacing the objects would detach a running task: run_task
    holds a reference to the old Account, while stop_account looks the
    account up in user_accounts, so it could no longer signal that task and
    the UI would show the task as not running.
    """
    accounts = user_accounts.setdefault(user_id, {})
    for acc_data in database.get_user_accounts(user_id):
        remark = acc_data['remark'] or ''
        existing = accounts.get(acc_data['id'])
        if existing is not None:
            existing.username = acc_data['username']
            existing.password = acc_data['password']
            existing.remember = bool(acc_data['remember'])
            existing.remark = remark
            continue
        accounts[acc_data['id']] = Account(
            account_id=acc_data['id'],
            user_id=user_id,
            username=acc_data['username'],
            password=acc_data['password'],
            remember=bool(acc_data['remember']),
            remark=remark
        )
@app.route('/api/accounts', methods=['GET'])
@login_required
def get_accounts():
    """List the current user's accounts, loading them from the DB on first access."""
    user_id = current_user.id
    if user_id not in user_accounts:
        load_user_accounts(user_id)
    owned = user_accounts.get(user_id, {})
    return jsonify([account.to_dict() for account in owned.values()])
@app.route('/api/accounts', methods=['POST'])
@login_required
def add_account():
    """Add an automation account for the current user.

    Non-VIP users may own only one account. Expects JSON with ``username``,
    ``password``, optional ``remember`` (default True) and optional
    ``remark`` (trimmed to 200 characters, like update_remark).
    """
    user_id = current_user.id

    # Load the accounts already in the DB first. Otherwise (e.g. right after
    # a restart) this call would create a cache entry holding only the new
    # account, and get_accounts would never load the rest because the
    # user_id key already exists. It also makes the duplicate check complete.
    if user_id not in user_accounts:
        load_user_accounts(user_id)

    if not database.is_user_vip(user_id):
        current_count = len(database.get_user_accounts(user_id))
        if current_count >= 1:
            return jsonify({"error": "非VIP用户只能添加1个账号,请联系管理员开通VIP"}), 403

    # Missing or non-JSON body -> {} (clean 400 below instead of a 500).
    data = request.get_json(silent=True) or {}
    username = data.get('username', '').strip()
    password = data.get('password', '').strip()
    remember = data.get('remember', True)
    remark = str(data.get('remark') or '').strip()[:200]

    if not username or not password:
        return jsonify({"error": "用户名和密码不能为空"}), 400

    accounts = user_accounts.setdefault(user_id, {})
    if any(acc.username == username for acc in accounts.values()):
        return jsonify({"error": f"账号 '{username}' 已存在"}), 400

    import uuid
    account_id = str(uuid.uuid4())[:8]

    database.create_account(user_id, account_id, username, password, remember, remark)
    account = Account(account_id, user_id, username, password, remember, remark)
    accounts[account_id] = account

    log_to_client(f"添加账号: {username}", user_id)
    return jsonify(account.to_dict())
@app.route('/api/accounts/<account_id>', methods=['DELETE'])
@login_required
def delete_account(account_id):
    """Delete one of the current user's accounts, stopping its task if running."""
    user_id = current_user.id
    if user_id not in user_accounts:
        # e.g. after a restart while the session cookie is still valid
        load_user_accounts(user_id)
    account = user_accounts.get(user_id, {}).get(account_id)
    if account is None:
        return jsonify({"error": "账号不存在"}), 404

    if account.is_running:
        account.should_stop = True
        if account.automation:
            try:
                account.automation.close()
            except Exception as e:
                # The automation is driven by the worker thread, so closing it
                # from here can fail. The deletion must still go ahead; the
                # worker is expected to exit via should_stop.
                print(f"关闭浏览器实例失败: {e}")

    username = account.username
    database.delete_account(account_id)
    user_accounts[user_id].pop(account_id, None)

    log_to_client(f"删除账号: {username}", user_id)
    return jsonify({"success": True})
@app.route('/api/accounts/<account_id>/remark', methods=['PUT'])
@login_required
def update_remark(account_id):
    """Set the remark of one of the current user's accounts.

    Expects JSON ``remark``; it is stripped and cut to 200 characters, and a
    null or missing value clears it.
    """
    user_id = current_user.id
    if user_id not in user_accounts:
        load_user_accounts(user_id)
    account = user_accounts.get(user_id, {}).get(account_id)
    if account is None:
        return jsonify({"error": "账号不存在"}), 404

    # Missing or non-JSON body -> {}; a null remark becomes "".
    data = request.get_json(silent=True) or {}
    remark = str(data.get('remark') or '').strip()[:200]

    database.update_account_remark(account_id, remark)
    account.remark = remark

    log_to_client(f"更新备注: {account.username} -> {remark}", user_id)
    return jsonify({"success": True})
@app.route('/api/accounts/<account_id>/start', methods=['POST'])
@login_required
def start_account(account_id):
    """Start the browsing task for one of the current user's accounts.

    Optional JSON: ``browse_type`` (default '应读') and ``enable_screenshot``
    (default True). The work runs in a daemon thread (run_task), which first
    queues on the per-user and global semaphores.
    """
    user_id = current_user.id
    if user_id not in user_accounts:
        # e.g. after a restart while the session cookie is still valid
        load_user_accounts(user_id)
    account = user_accounts.get(user_id, {}).get(account_id)
    if account is None:
        return jsonify({"error": "账号不存在"}), 404
    if account.is_running:
        return jsonify({"error": "任务已在运行中"}), 400

    # Missing or non-JSON body -> defaults below instead of a 500.
    data = request.get_json(silent=True) or {}
    browse_type = data.get('browse_type', '应读')
    enable_screenshot = data.get('enable_screenshot', True)  # screenshots on by default

    if not init_browser_manager():
        return jsonify({"error": "浏览器初始化失败"}), 500

    account.is_running = True
    account.should_stop = False
    account.status = "运行中"

    thread = threading.Thread(
        target=run_task,
        args=(user_id, account_id, browse_type, enable_screenshot),
        daemon=True
    )
    thread.start()
    active_tasks[account_id] = thread

    log_to_client(f"启动任务: {account.username} - {browse_type}", user_id)
    socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
    return jsonify({"success": True})
@app.route('/api/accounts/<account_id>/stop', methods=['POST'])
@login_required
def stop_account(account_id):
    """Ask a running account task to stop by setting its should_stop flag."""
    user_id = current_user.id
    account = user_accounts.get(user_id, {}).get(account_id)
    if account is None:
        return jsonify({"error": "账号不存在"}), 404
    if not account.is_running:
        return jsonify({"error": "任务未在运行"}), 400

    account.should_stop = True
    account.status = "正在停止"

    log_to_client(f"停止任务: {account.username}", user_id)
    socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
    return jsonify({"success": True})
def get_user_semaphore(user_id):
    """Return the semaphore capping concurrent tasks for ``user_id``, creating it on first use.

    Tasks call this from their own worker threads. With the old check-then-
    insert, two threads could each create a semaphore for the same user and
    both run, despite the per-user cap. dict.setdefault does the lookup and
    insert in one call (atomic in CPython for int/str keys), so every thread
    gets the same semaphore.
    """
    semaphore = user_semaphores.get(user_id)
    if semaphore is None:
        semaphore = user_semaphores.setdefault(
            user_id, threading.Semaphore(max_concurrent_per_account))
    return semaphore
def run_task(user_id, account_id, browse_type, enable_screenshot=True):
"""运行自动化任务"""
if user_id not in user_accounts or account_id not in user_accounts[user_id]:
return
account = user_accounts[user_id][account_id]
# 两级并发控制:用户级 + 全局级
user_sem = get_user_semaphore(user_id)
# 获取用户级信号量(同一用户的账号排队)
log_to_client(f"等待资源分配...", user_id, account_id)
account.status = "排队中"
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
user_sem.acquire()
try:
# 如果在排队期间被停止,直接返回
if account.should_stop:
log_to_client(f"任务已取消", user_id, account_id)
account.status = "已停止"
account.is_running = False
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return
# 获取全局信号量(防止所有用户同时运行导致资源耗尽)
global_semaphore.acquire()
try:
# 再次检查是否被停止
if account.should_stop:
log_to_client(f"任务已取消", user_id, account_id)
account.status = "已停止"
account.is_running = False
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return
account.status = "运行中"
# 记录任务真正开始执行的时间(不包括排队时间)
import time as time_module
task_start_time = time_module.time()
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
account.last_browse_type = browse_type
# 重试机制最多尝试3次超时则换IP重试
max_attempts = 3
last_error = None
for attempt in range(1, max_attempts + 1):
try:
if attempt > 1:
log_to_client(f"🔄 第 {attempt} 次尝试(共{max_attempts}次)...", user_id, account_id)
# 检查是否需要使用代理
proxy_config = None
config = database.get_system_config()
if config.get('proxy_enabled') == 1:
proxy_api_url = config.get('proxy_api_url', '').strip()
if proxy_api_url:
log_to_client(f"正在获取代理IP...", user_id, account_id)
proxy_server = get_proxy_from_api(proxy_api_url, max_retries=3)
if proxy_server:
proxy_config = {'server': proxy_server}
log_to_client(f"✓ 将使用代理: {proxy_server}", user_id, account_id)
account.proxy_config = proxy_config # 保存代理配置供截图使用
else:
log_to_client(f"✗ 代理获取失败,将不使用代理继续", user_id, account_id)
else:
log_to_client(f"⚠ 代理已启用但未配置API地址", user_id, account_id)
log_to_client(f"创建自动化实例...", user_id, account_id)
account.automation = PlaywrightAutomation(browser_manager, account_id, proxy_config=proxy_config)
# 为automation注入包含user_id的自定义log方法,使其能够实时发送日志到WebSocket
def custom_log(message: str):
log_to_client(message, user_id, account_id)
account.automation.log = custom_log
log_to_client(f"开始登录...", user_id, account_id)
if not account.automation.login(account.username, account.password, account.remember):
log_to_client(f"❌ 登录失败,请检查用户名和密码", user_id, account_id)
account.status = "登录失败"
account.is_running = False
# 记录登录失败日志
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=0,
total_attachments=0,
error_message='登录失败,请检查用户名和密码',
duration=int(time_module.time() - task_start_time)
)
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
return
log_to_client(f"✓ 登录成功!", user_id, account_id)
log_to_client(f"开始浏览 '{browse_type}' 内容...", user_id, account_id)
def should_stop():
return account.should_stop
result = account.automation.browse_content(
browse_type=browse_type,
auto_next_page=True,
auto_view_attachments=True,
interval=2.0,
should_stop_callback=should_stop
)
account.total_items = result.total_items
account.total_attachments = result.total_attachments
if result.success:
log_to_client(f"浏览完成! 共 {result.total_items} 条内容,{result.total_attachments} 个附件", user_id, account_id)
account.status = "已完成"
# 记录成功日志
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='success',
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message='',
duration=int(time_module.time() - task_start_time)
)
# 成功则跳出重试循环
break
else:
# 浏览出错,检查是否是超时错误
error_msg = result.error_message
if 'Timeout' in error_msg or 'timeout' in error_msg:
last_error = error_msg
log_to_client(f"⚠ 检测到超时错误: {error_msg}", user_id, account_id)
# 关闭当前浏览器
if account.automation:
try:
account.automation.close()
log_to_client(f"已关闭超时的浏览器实例", user_id, account_id)
except:
pass
account.automation = None
if attempt < max_attempts:
log_to_client(f"⚠ 代理可能速度过慢将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
time_module.sleep(2) # 等待2秒再重试
continue
else:
log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message=f"重试{max_attempts}次后仍失败: {error_msg}",
duration=int(time_module.time() - task_start_time)
)
break
else:
# 非超时错误,直接失败不重试
log_to_client(f"浏览出错: {error_msg}", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=result.total_items,
total_attachments=result.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time)
)
break
except Exception as retry_error:
# 捕获重试过程中的异常
error_msg = str(retry_error)
last_error = error_msg
# 关闭可能存在的浏览器实例
if account.automation:
try:
account.automation.close()
except:
pass
account.automation = None
if 'Timeout' in error_msg or 'timeout' in error_msg:
log_to_client(f"⚠ 执行超时: {error_msg}", user_id, account_id)
if attempt < max_attempts:
log_to_client(f"⚠ 将换新IP重试 ({attempt}/{max_attempts})", user_id, account_id)
time_module.sleep(2)
continue
else:
log_to_client(f"❌ 已达到最大重试次数({max_attempts}),任务失败", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=f"重试{max_attempts}次后仍失败: {error_msg}",
duration=int(time_module.time() - task_start_time)
)
break
else:
# 非超时异常,直接失败
log_to_client(f"任务执行异常: {error_msg}", user_id, account_id)
account.status = "出错"
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time)
)
break
except Exception as e:
error_msg = str(e)
log_to_client(f"任务执行出错: {error_msg}", user_id, account_id)
account.status = "出错"
# 记录异常失败日志
database.create_task_log(
user_id=user_id,
account_id=account_id,
username=account.username,
browse_type=browse_type,
status='failed',
total_items=account.total_items,
total_attachments=account.total_attachments,
error_message=error_msg,
duration=int(time_module.time() - task_start_time)
)
finally:
# 释放全局信号量
global_semaphore.release()
account.is_running = False
if account.automation:
try:
account.automation.close()
log_to_client(f"主任务浏览器已关闭", user_id, account_id)
except Exception as e:
log_to_client(f"关闭主任务浏览器时出错: {str(e)}", user_id, account_id)
finally:
account.automation = None
if account_id in active_tasks:
del active_tasks[account_id]
socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
# 任务完成后自动截图(增加2秒延迟,确保资源完全释放)
# 根据enable_screenshot参数决定是否截图
if account.status == "已完成" and not account.should_stop:
if enable_screenshot:
log_to_client(f"等待2秒后开始截图...", user_id, account_id)
time.sleep(2) # 延迟启动截图,确保主任务资源已完全释放
threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id), daemon=True).start()
else:
log_to_client(f"截图功能已禁用,跳过截图", user_id, account_id)
finally:
# 释放用户级信号量
user_sem.release()
def take_screenshot_for_account(user_id, account_id):
    """Re-open an account's last browse page in a fresh browser and save a screenshot.

    Concurrency is limited by ``screenshot_semaphore``; when it cannot be
    acquired within 300 seconds the screenshot is skipped. The proxy stored on
    the account by the browse task (if any) is reused. The image is written to
    ``SCREENSHOTS_DIR`` as
    ``<site username>_<login account>_<browse type>_<YYYYmmdd_HHMMSS>.jpg``
    using Beijing time. Progress and errors are reported through the client log.

    Args:
        user_id: Owner of the account (key of ``user_accounts``).
        account_id: Account identifier inside that user's account map.

    Returns:
        None. Returns immediately if the account is not loaded.
    """
    if not (user_id in user_accounts and account_id in user_accounts[user_id]):
        return
    account = user_accounts[user_id][account_id]

    def report(message: str):
        # Send to this account's client log; also installed as the automation's logger.
        log_to_client(message, user_id, account_id)

    report("等待截图资源分配...")
    if not screenshot_semaphore.acquire(blocking=True, timeout=300):
        report("截图资源获取超时,跳过截图")
        return

    automation = None
    try:
        report("开始截图流程...")
        # Reuse the browse task's proxy when that task recorded one on the account.
        proxy_config = getattr(account, 'proxy_config', None)
        if proxy_config:
            report(f"截图将使用相同代理: {proxy_config.get('server', 'Unknown')}")
        automation = PlaywrightAutomation(browser_manager, account_id, proxy_config=proxy_config)
        automation.log = report

        report("重新登录以进行截图...")
        if not automation.login(account.username, account.password, account.remember):
            report("截图登录失败")
            return

        browse_type = account.last_browse_type
        report(f"导航到 '{browse_type}' 页面...")
        # No stop callback: let the page finish loading (e.g. "暂无记录") before capturing.
        result = automation.browse_content(
            browse_type=browse_type,
            auto_next_page=False,
            auto_view_attachments=False,
            interval=0,
            should_stop_callback=None,
        )
        if not result.success and result.error_message != "":
            report(f"导航失败: {result.error_message}")
        time.sleep(2)

        stamp = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y%m%d_%H%M%S')
        owner = database.get_user_by_id(user_id)
        owner_name = owner['username'] if owner else f"user{user_id}"
        # The login account (not the remark) identifies the screenshot.
        shot_name = f"{owner_name}_{account.username}_{browse_type}_{stamp}.jpg"
        shot_path = os.path.join(SCREENSHOTS_DIR, shot_name)
        if automation.take_screenshot(shot_path):
            report(f"✓ 截图已保存: {shot_name}")
        else:
            report("✗ 截图失败")
    except Exception as exc:
        report(f"✗ 截图过程中出错: {exc}")
    finally:
        # Always close the browser and hand the semaphore back, on every exit path.
        if automation:
            try:
                automation.close()
                report("截图浏览器已关闭")
            except Exception as exc:
                report(f"关闭截图浏览器时出错: {exc}")
        screenshot_semaphore.release()
        report("截图资源已释放")
@app.route('/api/accounts/<account_id>/screenshot', methods=['POST'])
@login_required
def manual_screenshot(account_id):
    """Manually trigger a screenshot for one of the current user's accounts.

    Optional JSON body: ``{"browse_type": "..."}``. When it is omitted, the
    account's last browse type is reused. The screenshot runs on a daemon thread.

    Returns:
        ``{"success": True}`` once the background job has been started.
        404 if the account is unknown. 400 if a task is running or the browse
        type is not one of the accepted values.
    """
    user_id = current_user.id
    if user_id not in user_accounts or account_id not in user_accounts[user_id]:
        return jsonify({"error": "账号不存在"}), 404
    account = user_accounts[user_id][account_id]
    if account.is_running:
        return jsonify({"error": "任务运行中,无法截图"}), 400

    # silent=True: a missing or non-JSON body means "no options", not a crash/415.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    requested_type = data.get('browse_type')
    if requested_type is not None:
        # The browse type becomes part of the screenshot file path, so an
        # arbitrary client string could inject path separators. Accept only the
        # known types (same list the system-config endpoint validates against).
        if requested_type not in ('注册前未读', '应读', '未读'):
            return jsonify({"error": "浏览类型无效"}), 400
        account.last_browse_type = requested_type
    browse_type = account.last_browse_type

    threading.Thread(target=take_screenshot_for_account, args=(user_id, account_id), daemon=True).start()
    log_to_client(f"手动截图: {account.username} - {browse_type}", user_id)
    return jsonify({"success": True})
# ==================== 截图管理API ====================
@app.route('/api/screenshots', methods=['GET'])
@login_required
def get_screenshots():
    """List the current user's screenshots, newest first.

    Ownership is decided by the ``<username>_`` file-name prefix. Each entry has
    the real file name, a display name with the username prefix removed, the
    size in bytes, and the modification time in Beijing time.

    NOTE(review): a pure prefix match lets user ``bob`` also see files of a user
    named ``bob_x``. Confirm whether usernames may contain ``_``.
    """
    user_id = current_user.id
    owner = database.get_user_by_id(user_id)
    owner_prefix = (owner['username'] if owner else f"user{user_id}") + '_'
    try:
        entries = []
        if os.path.exists(SCREENSHOTS_DIR):
            shanghai = pytz.timezone('Asia/Shanghai')
            for name in os.listdir(SCREENSHOTS_DIR):
                is_image = name.lower().endswith(('.png', '.jpg', '.jpeg'))
                if not (is_image and name.startswith(owner_prefix)):
                    continue
                info = os.stat(os.path.join(SCREENSHOTS_DIR, name))
                # Name layout: <username>_<login account>_<browse type>_<time>.<ext>
                stem, ext = name.rsplit('.', 1)
                _, sep, rest = stem.partition('_')
                display = f"{rest}.{ext}" if sep else name
                modified = datetime.fromtimestamp(info.st_mtime, tz=shanghai)
                entries.append({
                    'filename': name,
                    'display_name': display,
                    'size': info.st_size,
                    'created': modified.strftime('%Y-%m-%d %H:%M:%S'),
                })
        # The timestamp format sorts lexicographically in chronological order.
        entries.sort(key=lambda entry: entry['created'], reverse=True)
        return jsonify(entries)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/screenshots/<filename>')
@login_required
def serve_screenshot(filename):
    """Serve a screenshot file if its name carries the caller's ``<username>_`` prefix.

    Files are served through ``send_from_directory`` rooted at ``SCREENSHOTS_DIR``.
    Returns 403 for files that do not carry the caller's prefix.
    """
    user_id = current_user.id
    owner = database.get_user_by_id(user_id)
    owner_name = owner['username'] if owner else f"user{user_id}"
    if filename.startswith(f"{owner_name}_"):
        return send_from_directory(SCREENSHOTS_DIR, filename)
    return jsonify({"error": "无权访问"}), 403
@app.route('/api/screenshots/<filename>', methods=['DELETE'])
@login_required
def delete_screenshot(filename):
    """Delete one of the current user's screenshots.

    Ownership is checked through the ``<username>_`` file-name prefix.

    Returns:
        ``{"success": True}`` on success, 403 for foreign or non-plain file
        names, 404 if no such regular file exists, 500 on filesystem errors.

    NOTE(review): a prefix check lets user ``bob`` match files of a user named
    ``bob_x``. Confirm whether usernames may contain ``_``.
    """
    user_id = current_user.id
    user_info = database.get_user_by_id(user_id)
    username_prefix = user_info['username'] if user_info else f"user{user_id}"
    # Only bare file names may reach os.path.join (e.g. a "\"-separated path on
    # Windows would otherwise escape SCREENSHOTS_DIR), and only the caller's own files.
    if os.path.basename(filename) != filename or not filename.startswith(username_prefix + '_'):
        return jsonify({"error": "无权删除"}), 403
    try:
        filepath = os.path.join(SCREENSHOTS_DIR, filename)
        # isfile (not exists): a directory with a matching name must not reach os.remove.
        if not os.path.isfile(filepath):
            return jsonify({"error": "文件不存在"}), 404
        os.remove(filepath)
        log_to_client(f"删除截图: {filename}", user_id)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/api/screenshots/clear', methods=['POST'])
@login_required
def clear_all_screenshots():
    """Delete every image in ``SCREENSHOTS_DIR`` that carries the caller's ``<username>_`` prefix.

    Returns:
        ``{"success": True, "deleted": <count>}``, or 500 with the error text
        if any removal fails (files removed before the failure stay removed).
    """
    user_id = current_user.id
    owner = database.get_user_by_id(user_id)
    owner_prefix = (owner['username'] if owner else f"user{user_id}") + '_'
    image_suffixes = ('.png', '.jpg', '.jpeg')
    try:
        removed = 0
        if os.path.exists(SCREENSHOTS_DIR):
            owned_images = (
                name for name in os.listdir(SCREENSHOTS_DIR)
                if name.lower().endswith(image_suffixes) and name.startswith(owner_prefix)
            )
            for name in owned_images:
                os.remove(os.path.join(SCREENSHOTS_DIR, name))
                removed += 1
        log_to_client(f"清理了 {removed} 个截图文件", user_id)
        return jsonify({"success": True, "deleted": removed})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
# ==================== WebSocket事件 ====================
@socketio.on('connect')
def handle_connect():
    """On socket connect, join the user's room and replay the current state.

    Anonymous connections are accepted but receive nothing. Authenticated users
    get the ``accounts_list`` event followed by every cached ``log`` entry.
    """
    if not current_user.is_authenticated:
        return
    user_id = current_user.id
    join_room(f'user_{user_id}')
    log_to_client("客户端已连接", user_id)

    # Snapshot shared containers before iterating. Other threads (e.g. the
    # scheduler calling load_user_accounts, or log_to_client from running tasks)
    # can modify them meanwhile, which would break iteration of the live object.
    accounts = list(user_accounts.get(user_id, {}).values())
    emit('accounts_list', [acc.to_dict() for acc in accounts])

    if user_id in log_cache:
        for log_entry in list(log_cache[user_id]):
            emit('log', log_entry)
@socketio.on('disconnect')
def handle_disconnect():
    """On socket disconnect, leave the authenticated user's private room (no-op for anonymous clients)."""
    if not current_user.is_authenticated:
        return
    leave_room(f'user_{current_user.id}')
# ==================== 静态文件 ====================
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Serve a file from the relative ``static`` directory via ``send_from_directory``."""
    static_root = 'static'
    return send_from_directory(static_root, filename)
# ==================== 启动 ====================
# ==================== 管理员VIP管理API ====================
@app.route('/yuyx/api/vip/config', methods=['GET'])
def get_vip_config_api():
    """Return the VIP configuration; requires an admin session (403 otherwise)."""
    if 'admin_id' in session:
        return jsonify(database.get_vip_config())
    return jsonify({"error": "需要管理员权限"}), 403
@app.route('/yuyx/api/vip/config', methods=['POST'])
def set_vip_config_api():
    """Set the default number of VIP days (admin session required).

    JSON body: ``{"default_vip_days": <int >= 0>}``. A missing field means 0.

    Returns:
        The stored value on success; 403 without an admin session; 400 if the
        value is not a non-negative integer.
    """
    if 'admin_id' not in session:
        return jsonify({"error": "需要管理员权限"}), 403
    # silent=True: an absent/invalid body must not crash with AttributeError on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    days = data.get('default_vip_days', 0)
    # bool subclasses int, so JSON true/false would otherwise be accepted as 1/0.
    if isinstance(days, bool) or not isinstance(days, int) or days < 0:
        return jsonify({"error": "VIP天数必须是非负整数"}), 400
    database.set_default_vip_days(days)
    return jsonify({"message": "VIP配置已更新", "default_vip_days": days})
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['POST'])
def set_user_vip_api(user_id):
    """Grant VIP to a user for one of the fixed durations (admin session required).

    JSON body: ``{"days": 7 | 30 | 365 | 999999}``. A missing field means 30;
    999999 is labelled "permanent".

    Returns:
        Success message with the duration label. 403 without an admin session.
        400 for an invalid duration or when ``database.set_user_vip`` reports failure.
    """
    if 'admin_id' not in session:
        return jsonify({"error": "需要管理员权限"}), 403
    # silent=True: an absent/invalid body must not crash with AttributeError on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    days = data.get('days', 30)

    vip_labels = {7: "一周", 30: "一个月", 365: "一年", 999999: "永久"}
    # Type-check before the dict lookup: unhashable JSON values (list/object)
    # would raise TypeError, and bool would masquerade as an int.
    if isinstance(days, bool) or not isinstance(days, int) or days not in vip_labels:
        return jsonify({"error": "VIP天数必须是 7/30/365/999999 之一"}), 400

    if database.set_user_vip(user_id, days):
        return jsonify({"message": f"VIP设置成功: {vip_labels[days]}"})
    return jsonify({"error": "设置失败,用户不存在"}), 400
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['DELETE'])
def remove_user_vip_api(user_id):
    """Revoke a user's VIP status (admin session required).

    Returns 403 without an admin session, and 400 when ``database.remove_user_vip`` reports failure.
    """
    if 'admin_id' not in session:
        return jsonify({"error": "需要管理员权限"}), 403
    removed = database.remove_user_vip(user_id)
    if not removed:
        return jsonify({"error": "移除失败"}), 400
    return jsonify({"message": "VIP已移除"})
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['GET'])
def get_user_vip_info_api(user_id):
    """Return any user's VIP information (admin session required, 403 otherwise)."""
    if 'admin_id' in session:
        return jsonify(database.get_user_vip_info(user_id))
    return jsonify({"error": "需要管理员权限"}), 403
# ==================== 用户端VIP查询API ====================
@app.route('/api/user/vip', methods=['GET'])
@login_required
def get_current_user_vip():
    """Return the logged-in user's own VIP information."""
    return jsonify(database.get_user_vip_info(current_user.id))
@app.route('/api/run_stats', methods=['GET'])
@login_required
def get_run_stats():
    """Return today's task statistics plus how many of the user's accounts are running now."""
    user_id = current_user.id
    stats = database.get_user_run_stats(user_id)  # today's completed/failed/items/attachments

    accounts = user_accounts[user_id] if user_id in user_accounts else {}
    running_now = len([acc for acc in accounts.values() if acc.is_running])

    return jsonify({
        'today_completed': stats.get('completed', 0),
        'current_running': running_now,
        'today_failed': stats.get('failed', 0),
        'today_items': stats.get('total_items', 0),
        'today_attachments': stats.get('total_attachments', 0),
    })
# ==================== 系统配置API ====================
@app.route('/yuyx/api/system/config', methods=['GET'])
@admin_required
def get_system_config_api():
    """Return the whole system configuration as JSON (admin only)."""
    return jsonify(database.get_system_config())
@app.route('/yuyx/api/system/config', methods=['POST'])
@admin_required
def update_system_config_api():
    """Validate and persist concurrency and schedule settings (admin only).

    JSON fields, all optional. Omitted fields are passed as None to
    ``database.update_system_config``:
      max_concurrent_global       int in 1..20
      max_concurrent_per_account  int in 1..5
      schedule_enabled            forwarded unchanged
      schedule_time               "HH:MM" (24-hour)
      schedule_browse_type        one of 注册前未读 / 应读 / 未读
      schedule_weekdays           comma-separated weekday numbers, each 1..7

    On a successful save, the in-process concurrency globals are updated too.

    Returns:
        Success message. 400 on validation failure or when the database update
        reports failure.
    """
    global max_concurrent_global, global_semaphore, max_concurrent_per_account
    import re

    def is_plain_int(value):
        # bool subclasses int, so JSON true/false would otherwise pass as 1/0.
        return isinstance(value, int) and not isinstance(value, bool)

    # silent=True: an absent/invalid body must not crash with AttributeError on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    max_concurrent = data.get('max_concurrent_global')
    schedule_enabled = data.get('schedule_enabled')
    schedule_time = data.get('schedule_time')
    schedule_browse_type = data.get('schedule_browse_type')
    schedule_weekdays = data.get('schedule_weekdays')
    new_max_concurrent_per_account = data.get('max_concurrent_per_account')

    if max_concurrent is not None:
        if not is_plain_int(max_concurrent) or not 1 <= max_concurrent <= 20:
            return jsonify({"error": "全局并发数必须在1-20之间"}), 400
    if new_max_concurrent_per_account is not None:
        if not is_plain_int(new_max_concurrent_per_account) or not 1 <= new_max_concurrent_per_account <= 5:
            return jsonify({"error": "单账号并发数必须在1-5之间"}), 400
    if schedule_time is not None:
        # Type-check first (re on a non-str raises TypeError). fullmatch rather
        # than match + "$", which would also accept a trailing newline.
        if not isinstance(schedule_time, str) or not re.fullmatch(r'([01]\d|2[0-3]):([0-5]\d)', schedule_time):
            return jsonify({"error": "时间格式错误,应为 HH:MM"}), 400
    if schedule_browse_type is not None:
        if schedule_browse_type not in ['注册前未读', '应读', '未读']:
            return jsonify({"error": "浏览类型无效"}), 400
    if schedule_weekdays is not None:
        # Expected form: "1,2,3,4,5,6,7" (1 = Monday ... 7 = Sunday).
        try:
            days = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()]
            if not all(1 <= d <= 7 for d in days):
                return jsonify({"error": "星期数字必须在1-7之间"}), 400
        except (ValueError, AttributeError):
            return jsonify({"error": "星期格式错误"}), 400

    if not database.update_system_config(
        max_concurrent=max_concurrent,
        schedule_enabled=schedule_enabled,
        schedule_time=schedule_time,
        schedule_browse_type=schedule_browse_type,
        schedule_weekdays=schedule_weekdays,
        max_concurrent_per_account=new_max_concurrent_per_account
    ):
        return jsonify({"error": "更新失败"}), 400

    if max_concurrent is not None and max_concurrent != max_concurrent_global:
        max_concurrent_global = max_concurrent
        # NOTE(review): run_task's finally calls global_semaphore.release() on
        # whatever object the global names at release time. Tasks that acquired
        # the old semaphore will therefore release this new one and can push its
        # count above max_concurrent. Consider having tasks keep a handle to the
        # semaphore they acquired.
        global_semaphore = threading.Semaphore(max_concurrent)
        print(f"全局并发数已更新为: {max_concurrent}")
    # Per-user semaphores that already exist keep their size; new ones use this value.
    if new_max_concurrent_per_account is not None and new_max_concurrent_per_account != max_concurrent_per_account:
        max_concurrent_per_account = new_max_concurrent_per_account
        print(f"单用户并发数已更新为: {max_concurrent_per_account}")
    return jsonify({"message": "系统配置已更新"})
# ==================== 代理配置API ====================
@app.route('/yuyx/api/proxy/config', methods=['GET'])
@admin_required
def get_proxy_config_api():
    """Return the proxy-related subset of the system config, with defaults (admin only)."""
    cfg = database.get_system_config()
    fields_with_defaults = (
        ('proxy_enabled', 0),
        ('proxy_api_url', ''),
        ('proxy_expire_minutes', 3),
    )
    return jsonify({key: cfg.get(key, fallback) for key, fallback in fields_with_defaults})
@app.route('/yuyx/api/proxy/config', methods=['POST'])
@admin_required
def update_proxy_config_api():
    """Update the proxy settings (admin only).

    JSON fields, all optional:
      proxy_enabled         0 or 1
      proxy_api_url         string; surrounding whitespace is stripped
      proxy_expire_minutes  integer > 0
    Omitted fields are passed to ``database.update_system_config`` as None.

    Returns:
        Success message. 400 on validation failure or when the database
        update reports failure.
    """
    # silent=True: an absent/invalid body must not crash with AttributeError on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    proxy_enabled = data.get('proxy_enabled')
    proxy_api_url = data.get('proxy_api_url')
    proxy_expire_minutes = data.get('proxy_expire_minutes')

    if proxy_enabled is not None and proxy_enabled not in [0, 1]:
        return jsonify({"error": "proxy_enabled必须是0或1"}), 400
    # An omitted URL stays None instead of becoming '', so a request that only
    # toggles proxy_enabled does not wipe the stored URL. This assumes
    # update_system_config skips None arguments, as the other config endpoints
    # rely on.
    if proxy_api_url is not None:
        if not isinstance(proxy_api_url, str):
            return jsonify({"error": "代理API地址格式错误"}), 400
        proxy_api_url = proxy_api_url.strip()
    if proxy_expire_minutes is not None:
        # bool subclasses int; reject JSON true/false explicitly.
        if (isinstance(proxy_expire_minutes, bool) or not isinstance(proxy_expire_minutes, int)
                or proxy_expire_minutes < 1):
            return jsonify({"error": "代理有效期必须是大于0的整数"}), 400

    if database.update_system_config(
        proxy_enabled=proxy_enabled,
        proxy_api_url=proxy_api_url,
        proxy_expire_minutes=proxy_expire_minutes
    ):
        return jsonify({"message": "代理配置已更新"})
    return jsonify({"error": "更新失败"}), 400
@app.route('/yuyx/api/proxy/test', methods=['POST'])
@admin_required
def test_proxy_api():
    """Fetch one proxy from a provider API URL and report the result (admin only).

    JSON body: ``{"api_url": "..."}``. The provider is expected to answer HTTP
    200 with a plain-text ``host:port`` body.

    Returns:
        ``success`` True with the proxy on success. 400 for a missing URL, a
        non-200 status or a body without ``:``. 500 if the request itself fails.
    """
    # silent=True: an absent/invalid body must not crash with AttributeError on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_url = data.get('api_url')
    api_url = raw_url.strip() if isinstance(raw_url, str) else ''
    if not api_url:
        return jsonify({"error": "请提供API地址"}), 400

    try:
        response = requests.get(api_url, timeout=10)
        if response.status_code != 200:
            return jsonify({
                "success": False,
                "message": f"HTTP错误: {response.status_code}"
            }), 400
        ip_port = response.text.strip()
        if ip_port and ':' in ip_port:
            return jsonify({
                "success": True,
                "proxy": ip_port,
                "message": f"代理获取成功: {ip_port}"
            })
        return jsonify({
            "success": False,
            "message": f"代理格式错误: {ip_port}"
        }), 400
    except Exception as e:
        # API boundary: report any network/URL error to the admin UI.
        return jsonify({
            "success": False,
            "message": f"连接失败: {str(e)}"
        }), 500
# ==================== 服务器信息API ====================
@app.route('/yuyx/api/server/info', methods=['GET'])
@admin_required
def get_server_info_api():
    """Return host CPU, memory and disk usage plus system uptime (admin only).

    Sizes are formatted in GiB with one decimal (labelled "GB"). Uptime is
    formatted as "<days>天<hours>小时".

    Note: ``psutil.cpu_percent(interval=1)`` blocks this request for about one
    second while it samples CPU usage.
    """
    import psutil

    gib = 1024 ** 3

    cpu_percent = psutil.cpu_percent(interval=1)

    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    # Seconds since host boot, from epoch timestamps (no naive-datetime math).
    # The days unit "天" must be present: without it, 0 days 3 hours renders as "03小时".
    uptime_seconds = max(0, int(time.time() - psutil.boot_time()))
    days, remainder = divmod(uptime_seconds, 86400)
    hours = remainder // 3600
    uptime = f"{days}天{hours}小时"

    return jsonify({
        'cpu_percent': cpu_percent,
        'memory_total': f"{memory.total / gib:.1f}GB",
        'memory_used': f"{memory.used / gib:.1f}GB",
        'memory_percent': memory.percent,
        'disk_total': f"{disk.total / gib:.1f}GB",
        'disk_used': f"{disk.used / gib:.1f}GB",
        'disk_percent': disk.percent,
        'uptime': uptime
    })
# ==================== 任务统计和日志API ====================
@app.route('/yuyx/api/task/stats', methods=['GET'])
@admin_required
def get_task_stats_api():
    """Return task statistics, optionally filtered by ``?date=YYYY-MM-DD`` (admin only)."""
    day = request.args.get('date')  # None when absent
    return jsonify(database.get_task_stats(day))
@app.route('/yuyx/api/task/logs', methods=['GET'])
@admin_required
def get_task_logs_api():
    """Return a page of task logs (admin only).

    Query parameters:
      limit   page size, default 100 (values below 1 are raised to 1)
      offset  rows to skip, default 0 (negative values become 0)
      date    optional YYYY-MM-DD filter
      status  optional filter, e.g. success / failed
    """
    # type=int falls back to the default on non-numeric input instead of
    # raising ValueError (which previously surfaced as a 500).
    limit = max(request.args.get('limit', 100, type=int), 1)
    offset = max(request.args.get('offset', 0, type=int), 0)
    date_filter = request.args.get('date')
    status_filter = request.args.get('status')
    logs = database.get_task_logs(limit, offset, date_filter, status_filter)
    return jsonify(logs)
@app.route('/yuyx/api/task/logs/clear', methods=['POST'])
@admin_required
def clear_old_task_logs_api():
    """Delete task logs older than ``days`` days (admin only).

    JSON body: ``{"days": <int >= 1>}``. A missing field means 30.

    Returns:
        A message with the number of deleted rows, or 400 for an invalid ``days``.
    """
    # silent=True: tolerate an absent/invalid body (treated as no options).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    days = data.get('days', 30)
    # bool subclasses int: JSON true would otherwise mean "older than 1 day".
    if isinstance(days, bool) or not isinstance(days, int) or days < 1:
        return jsonify({"error": "天数必须是大于0的整数"}), 400
    deleted_count = database.delete_old_task_logs(days)
    return jsonify({"message": f"已删除{days}天前的{deleted_count}条日志"})
# ==================== 定时任务调度器 ====================
def scheduled_task_worker():
    """Background scheduler loop; never returns, so run it in a daemon thread.

    Jobs:
      * hourly: purge expired captchas, and run a memory / SQLite-WAL cleanup;
      * daily at 03:00 China Standard Time (CST, UTC+8): delete old task logs,
        operation logs and screenshots;
      * daily at ``schedule_time`` (CST), when ``schedule_enabled`` is set:
        start browse tasks for all accounts of approved users.

    The system config is re-read about every 60 seconds. The daily jobs are
    only re-registered when their effective local time or the enabled flag
    changes.
    """
    import schedule

    cst_offset = timedelta(hours=8)
    daily_tag = 'daily'

    def cleanup_memory_task():
        """Run gc, checkpoint/truncate the SQLite WAL, and print RSS before/after."""
        try:
            import gc
            import psutil
            process = psutil.Process(os.getpid())
            mem_before = process.memory_info().rss / 1024 / 1024
            collected = gc.collect()
            try:
                db = database.get_db()
                db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
                db.commit()
            except Exception as e:
                print(f"[内存清理] WAL清理失败: {e}")
            mem_after = process.memory_info().rss / 1024 / 1024
            freed = mem_before - mem_after
            print(f"[内存清理] 完成 - 回收对象:{collected}, 内存: {mem_before:.1f}MB -> {mem_after:.1f}MB (释放{freed:.1f}MB)")
        except Exception as e:
            print(f"[内存清理] 执行失败: {str(e)}")

    def run_all_accounts_task():
        """Start a browse task for every idle account of every approved user.

        The run is skipped when today's Beijing weekday (1 = Monday ... 7 =
        Sunday) is not in ``schedule_weekdays``. Accounts that are already
        running are skipped. A login name shared by several users is started
        only once.
        """
        try:
            config = database.get_system_config()
            browse_type = config.get('schedule_browse_type', '应读')

            now_beijing = datetime.now(pytz.timezone('Asia/Shanghai'))
            current_weekday = now_beijing.isoweekday()  # 1-7
            schedule_weekdays = config.get('schedule_weekdays')
            if schedule_weekdays is None:
                schedule_weekdays = '1,2,3,4,5,6,7'
            allowed_weekdays = [int(d.strip()) for d in schedule_weekdays.split(',') if d.strip()]
            if current_weekday not in allowed_weekdays:
                weekday_names = ['', '周一', '周二', '周三', '周四', '周五', '周六', '周日']
                print(f"[定时任务] 今天是{weekday_names[current_weekday]},不在执行日期内,跳过执行")
                return

            print(f"[定时任务] 开始执行 - 浏览类型: {browse_type}")
            # Read once per run instead of once per account.
            enable_screenshot_scheduled = config.get("enable_screenshot", 0) == 1

            all_users = database.get_all_users()
            approved_users = [u for u in all_users if u['status'] == 'approved']

            executed_usernames = set()  # login names already started in this run
            total_accounts = 0
            skipped_duplicates = 0
            executed_accounts = 0

            for user in approved_users:
                user_id = user['id']
                if user_id not in user_accounts:
                    load_user_accounts(user_id)
                # Snapshot: request threads may add/remove accounts while this
                # loop runs (it sleeps between starts).
                accounts = list(user_accounts.get(user_id, {}).items())
                for account_id, account in accounts:
                    total_accounts += 1
                    if account.is_running:
                        continue
                    if account.username in executed_usernames:
                        skipped_duplicates += 1
                        print(f"[定时任务] 跳过重复账号: {account.username} (用户:{user['username']}) - 该账号已被其他用户执行")
                        continue
                    executed_usernames.add(account.username)

                    print(f"[定时任务] 启动账号: {account.username} (用户:{user['username']})")
                    account.is_running = True
                    account.should_stop = False
                    account.status = "运行中"

                    thread = threading.Thread(
                        target=run_task,
                        args=(user_id, account_id, browse_type, enable_screenshot_scheduled),
                        daemon=True
                    )
                    thread.start()
                    active_tasks[account_id] = thread
                    executed_accounts += 1

                    socketio.emit('account_update', account.to_dict(), room=f'user_{user_id}')
                    # Stagger starts to avoid a burst of concurrent browsers.
                    time.sleep(2)

            print(f"[定时任务] 执行完成 - 总账号数:{total_accounts}, 已执行:{executed_accounts}, 跳过重复:{skipped_duplicates}")
        except Exception as e:
            print(f"[定时任务] 执行出错: {str(e)}")

    def cleanup_expired_captcha():
        """Remove captcha entries whose ``expire_time`` lies in the past."""
        try:
            current_time = time.time()
            # Snapshot first: request threads may add/remove captchas concurrently.
            expired_keys = [k for k, v in list(captcha_storage.items())
                            if v["expire_time"] < current_time]
            for k in expired_keys:
                captcha_storage.pop(k, None)  # may already be gone (consumed meanwhile)
            if expired_keys:
                print(f"[定时清理] 已清理 {len(expired_keys)} 个过期验证码")
        except Exception as e:
            print(f"[定时清理] 清理验证码出错: {str(e)}")

    def cleanup_old_data():
        """Delete task logs and screenshots older than 7 days, and operation logs older than 30 days."""
        try:
            print(f"[定时清理] 开始清理7天前的数据...")
            deleted_logs = database.delete_old_task_logs(7)
            print(f"[定时清理] 已删除 {deleted_logs} 条任务日志")
            deleted_operation_logs = database.clean_old_operation_logs(30)
            print(f"[定时清理] 已删除 {deleted_operation_logs} 条操作日志")

            deleted_screenshots = 0
            if os.path.exists(SCREENSHOTS_DIR):
                cutoff_time = time.time() - (7 * 24 * 60 * 60)
                for filename in os.listdir(SCREENSHOTS_DIR):
                    if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
                        filepath = os.path.join(SCREENSHOTS_DIR, filename)
                        try:
                            if os.path.getmtime(filepath) < cutoff_time:
                                os.remove(filepath)
                                deleted_screenshots += 1
                        except Exception as e:
                            print(f"[定时清理] 删除截图失败 {filename}: {str(e)}")
            print(f"[定时清理] 已删除 {deleted_screenshots} 个截图文件")
            print(f"[定时清理] 清理完成!")
        except Exception as e:
            print(f"[定时清理] 清理任务出错: {str(e)}")

    def cst_to_local_time(cst_time_str):
        """Convert an "HH:MM" wall-clock time in CST (UTC+8) to the process's local "HH:MM".

        ``schedule`` fires jobs by local time. The module sets TZ=Asia/Shanghai at
        start-up, but that only takes effect where time.tzset() exists and the
        zone database is present. The local offset is therefore measured here
        rather than assumed to be UTC.
        """
        hour, minute = map(int, cst_time_str.split(':'))
        local_offset = datetime.now().astimezone().utcoffset() or timedelta(0)
        shift_minutes = int((local_offset - cst_offset).total_seconds() // 60)
        total = (hour * 60 + minute + shift_minutes) % (24 * 60)
        return f"{total // 60:02d}:{total % 60:02d}"

    # Hourly jobs are registered exactly once. Re-creating an every().hour job
    # resets its next run to an hour from "now", so rebuilding them on each
    # 60-second config poll would mean they never fire.
    schedule.every().hour.do(cleanup_expired_captcha)
    schedule.every().hour.do(cleanup_memory_task)
    print("[定时任务] 已设置内存清理任务: 每小时执行一次")
    print(f"[定时任务] 已设置验证码清理任务: 每小时执行一次")

    registered_signature = None

    def check_and_schedule():
        """(Re)register the tagged daily jobs when their effective settings changed."""
        nonlocal registered_signature
        config = database.get_system_config() or {}
        cleanup_local = cst_to_local_time("03:00")
        browse_cst = (config.get('schedule_time') or '02:00') if config.get('schedule_enabled') else None
        browse_local = cst_to_local_time(browse_cst) if browse_cst else None

        signature = (cleanup_local, browse_cst, browse_local)
        if signature == registered_signature:
            return
        registered_signature = signature

        schedule.clear(daily_tag)
        schedule.every().day.at(cleanup_local).do(cleanup_old_data).tag(daily_tag)
        print(f"[定时任务] 已设置数据清理任务: 每天 CST 03:00 (本地 {cleanup_local})")
        if browse_cst:
            schedule.every().day.at(browse_local).do(run_all_accounts_task).tag(daily_tag)
            print(f"[定时任务] 已设置浏览任务: 每天 CST {browse_cst} (本地 {browse_local})")

    last_check = None
    while True:
        try:
            # Run due jobs before any re-registration, so a job that is due now
            # is not cleared and pushed to tomorrow.
            schedule.run_pending()
            if last_check is None or time.time() - last_check > 60:
                check_and_schedule()
                last_check = time.time()
            time.sleep(1)
        except Exception as e:
            print(f"[定时任务] 调度器出错: {str(e)}")
            time.sleep(5)
if __name__ == '__main__':
    # Entry point: init DB, load concurrency settings, start browser manager,
    # scheduler thread and the Socket.IO web server.
    print("=" * 60)
    print("知识管理平台自动化工具 - 多用户版")
    print("=" * 60)

    database.init_database()

    # Load concurrency settings from the DB. A distinct name is used because the
    # module-level ``config`` holds the application config from get_config(),
    # and rebinding it here would hand a plain dict to any later reader.
    try:
        system_config = database.get_system_config()
        if system_config:
            # At module scope, plain assignment rebinds the module globals that
            # run_task and the config API use (no globals() indirection needed).
            # A NULL/0 value falls back to the default instead of building an
            # unusable Semaphore(None) / Semaphore(0).
            max_concurrent_global = system_config.get('max_concurrent_global') or 2
            max_concurrent_per_account = system_config.get('max_concurrent_per_account') or 1
            global_semaphore = threading.Semaphore(max_concurrent_global)
            print(f"✓ 已加载并发配置: 全局={max_concurrent_global}, 单账号={max_concurrent_per_account}")
    except Exception as e:
        print(f"警告: 加载并发配置失败,使用默认值: {e}")

    # Playwright objects must be created on the main thread (no cross-thread use).
    print("\n正在初始化浏览器管理器...")
    init_browser_manager()

    print("\n启动定时任务调度器...")
    scheduler_thread = threading.Thread(target=scheduled_task_worker, daemon=True)
    scheduler_thread.start()
    print("✓ 定时任务调度器已启动")

    print("\n服务器启动中...")
    print("用户访问地址: http://0.0.0.0:5000")
    print("后台管理地址: http://0.0.0.0:5000/yuyx")
    print("默认管理员: admin/admin")
    print("=" * 60 + "\n")

    socketio.run(app, host='0.0.0.0', port=5000, debug=False)