fix: 内存溢出与任务调度优化

This commit is contained in:
2025-12-13 17:36:02 +08:00
parent 9e761140c1
commit d7d878dc08
5 changed files with 1128 additions and 480 deletions

View File

@@ -10,12 +10,27 @@ from bs4 import BeautifulSoup
import re
import time
import atexit
import weakref
from typing import Optional, Callable
from dataclasses import dataclass
BASE_URL = "https://postoa.aidunsoft.com"
_api_browser_instances: "weakref.WeakSet[APIBrowser]" = weakref.WeakSet()
def _cleanup_api_browser_instances():
"""进程退出时清理残留的API浏览器实例弱引用不阻止GC"""
for inst in list(_api_browser_instances):
try:
inst.close()
except Exception:
pass
atexit.register(_cleanup_api_browser_instances)
@dataclass
class APIBrowseResult:
@@ -52,8 +67,7 @@ class APIBrowser:
else:
self.proxy_server = None
# 注册退出清理函数
atexit.register(self._cleanup_on_exit)
_api_browser_instances.add(self)
def log(self, message: str):
"""记录日志"""
@@ -427,14 +441,10 @@ class APIBrowser:
self.session.close()
except:
pass
def _cleanup_on_exit(self):
"""进程退出时的清理函数由atexit调用"""
if not self._closed:
finally:
try:
self.session.close()
self._closed = True
except:
_api_browser_instances.discard(self)
except Exception:
pass
def __enter__(self):

1306
app.py

File diff suppressed because it is too large Load Diff

View File

@@ -10,9 +10,11 @@ from typing import Callable, Optional, Dict, Any
import nest_asyncio
nest_asyncio.apply()
# 安全修复: 将魔法数字提取为可配置常量
BROWSER_IDLE_TIMEOUT = int(os.environ.get('BROWSER_IDLE_TIMEOUT', '300')) # 空闲超时(秒)默认5分钟
TASK_QUEUE_TIMEOUT = int(os.environ.get('TASK_QUEUE_TIMEOUT', '10')) # 队列获取超时(秒)
# 安全修复: 将魔法数字提取为可配置常量
BROWSER_IDLE_TIMEOUT = int(os.environ.get('BROWSER_IDLE_TIMEOUT', '300')) # 空闲超时(秒)默认5分钟
TASK_QUEUE_TIMEOUT = int(os.environ.get('TASK_QUEUE_TIMEOUT', '10')) # 队列获取超时(秒)
TASK_QUEUE_MAXSIZE = int(os.environ.get('BROWSER_TASK_QUEUE_MAXSIZE', '200')) # 队列最大长度(0表示无限制)
BROWSER_MAX_USE_COUNT = int(os.environ.get('BROWSER_MAX_USE_COUNT', '0')) # 每个浏览器最大复用次数(0表示不限制)
class BrowserWorker(threading.Thread):
@@ -146,27 +148,33 @@ class BrowserWorker(threading.Thread):
self.log(f"开始执行任务(第{self.browser_instance['use_count']}次使用浏览器)")
try:
# 将浏览器实例传递给任务函数
result = task_func(self.browser_instance, *task_args, **task_kwargs)
callback(result, None)
self.log(f"任务执行成功")
last_task_time = time.time()
except Exception as e:
self.log(f"任务执行失败: {e}")
callback(None, str(e))
self.failed_tasks += 1
last_task_time = time.time()
# 任务失败后,检查浏览器健康
if not self._check_browser_health():
self.log("任务失败导致浏览器异常,将在下次任务前重建")
self._close_browser()
except Exception as e:
self.log(f"Worker出错: {e}")
time.sleep(1)
try:
# 将浏览器实例传递给任务函数
result = task_func(self.browser_instance, *task_args, **task_kwargs)
callback(result, None)
self.log(f"任务执行成功")
last_task_time = time.time()
except Exception as e:
self.log(f"任务执行失败: {e}")
callback(None, str(e))
self.failed_tasks += 1
last_task_time = time.time()
# 任务失败后,检查浏览器健康
if not self._check_browser_health():
self.log("任务失败导致浏览器异常,将在下次任务前重建")
self._close_browser()
# 定期重启浏览器释放Chromium可能累积的内存
if self.browser_instance and BROWSER_MAX_USE_COUNT > 0:
if self.browser_instance.get('use_count', 0) >= BROWSER_MAX_USE_COUNT:
self.log(f"浏览器已复用{self.browser_instance['use_count']}次,重启释放资源")
self._close_browser()
except Exception as e:
self.log(f"Worker出错: {e}")
time.sleep(1)
# 清理资源
self._close_browser()
@@ -177,16 +185,17 @@ class BrowserWorker(threading.Thread):
self.running = False
class BrowserWorkerPool:
"""浏览器工作线程池"""
def __init__(self, pool_size: int = 3, log_callback: Optional[Callable] = None):
self.pool_size = pool_size
self.log_callback = log_callback
self.task_queue = queue.Queue()
self.workers = []
self.initialized = False
self.lock = threading.Lock()
class BrowserWorkerPool:
"""浏览器工作线程池"""
def __init__(self, pool_size: int = 3, log_callback: Optional[Callable] = None):
self.pool_size = pool_size
self.log_callback = log_callback
maxsize = TASK_QUEUE_MAXSIZE if TASK_QUEUE_MAXSIZE > 0 else 0
self.task_queue = queue.Queue(maxsize=maxsize)
self.workers = []
self.initialized = False
self.lock = threading.Lock()
def log(self, message: str):
"""日志输出"""
@@ -215,7 +224,7 @@ class BrowserWorkerPool:
self.initialized = True
self.log(f"✓ 工作线程池初始化完成({self.pool_size}个worker就绪浏览器将在有任务时按需启动")
def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool:
def submit_task(self, task_func: Callable, callback: Callable, *args, **kwargs) -> bool:
"""
提交任务到队列
@@ -238,8 +247,12 @@ class BrowserWorkerPool:
'callback': callback
}
self.task_queue.put(task)
return True
try:
self.task_queue.put(task, timeout=1)
return True
except queue.Full:
self.log(f"警告任务队列已满maxsize={self.task_queue.maxsize}),拒绝提交任务")
return False
def get_stats(self) -> Dict[str, Any]:
"""获取线程池统计信息"""

View File

@@ -1623,6 +1623,15 @@ class EmailQueue:
def _process_task(self, task: Dict):
"""处理邮件任务"""
try:
func = task.get('callable')
if callable(func):
args = task.get('args') or ()
kwargs = task.get('kwargs') or {}
result = func(*args, **kwargs)
if task.get('callback'):
task['callback'](result)
return
result = send_email(
to_email=task['to_email'],
subject=task['subject'],
@@ -1670,6 +1679,20 @@ class EmailQueue:
print("[邮件服务] 邮件队列已满")
return False
def enqueue_callable(self, func: Callable, args=None, kwargs=None, callback: Callable = None) -> bool:
"""将可调用任务加入队列(用于复杂邮件/打包等逻辑异步化)"""
try:
self.queue.put({
'callable': func,
'args': args or (),
'kwargs': kwargs or {},
'callback': callback
}, timeout=5)
return True
except queue.Full:
print("[邮件服务] 邮件队列已满")
return False
@property
def pending_count(self) -> int:
"""队列中待处理的任务数"""
@@ -1958,14 +1981,14 @@ def send_task_complete_email_async(
log_callback: Callable = None
):
"""异步发送任务完成通知邮件"""
import threading
thread = threading.Thread(
target=send_task_complete_email,
queue = get_email_queue()
ok = queue.enqueue_callable(
send_task_complete_email,
args=(user_id, email, username, account_name, browse_type,
total_items, total_attachments, screenshot_path, log_callback),
daemon=True
)
thread.start()
if (not ok) and log_callback:
log_callback("[邮件] 邮件队列已满,任务通知未发送")
def send_batch_task_complete_email(
@@ -2058,32 +2081,55 @@ def send_batch_task_complete_email(
</html>
"""
# 收集所有截图文件
screenshot_files = []
# 收集可用截图文件路径(避免把所有图片一次性读入内存)
screenshot_paths = []
for s in screenshots:
if s.get('path') and os.path.exists(s['path']):
try:
with open(s['path'], 'rb') as f:
screenshot_files.append({
'filename': f"{s.get('account_name', 'screenshot')}_{os.path.basename(s['path'])}",
'data': f.read()
})
except Exception as e:
print(f"[邮件] 读取截图文件失败: {e}")
path = s.get('path')
if path and os.path.exists(path):
arcname = f"{s.get('account_name', 'screenshot')}_{os.path.basename(path)}"
screenshot_paths.append((path, arcname))
# 如果有截图,打包ZIP
# 如果有截图,优先落盘打包ZIP,再按大小决定是否附加(降低内存峰值)
zip_data = None
zip_filename = None
if screenshot_files:
attachment_note = ""
if screenshot_paths:
import tempfile
zip_path = None
try:
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for sf in screenshot_files:
zf.writestr(sf['filename'], sf['data'])
zip_data = zip_buffer.getvalue()
zip_filename = f"screenshots_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
with tempfile.NamedTemporaryFile(prefix="screenshots_", suffix=".zip", delete=False) as tmp:
zip_path = tmp.name
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for file_path, arcname in screenshot_paths:
try:
zf.write(file_path, arcname=arcname)
except Exception as e:
print(f"[邮件] 写入ZIP失败: {e}")
zip_size = os.path.getsize(zip_path) if zip_path and os.path.exists(zip_path) else 0
if zip_size <= 0:
attachment_note = "本次无可用截图文件(可能截图失败或文件不存在)。"
elif zip_size > MAX_ATTACHMENT_SIZE:
attachment_note = f"截图打包文件过大({zip_size} bytes本次不附加附件。"
else:
with open(zip_path, 'rb') as f:
zip_data = f.read()
zip_filename = f"screenshots_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
attachment_note = "截图已打包为ZIP附件请查收。"
except Exception as e:
print(f"[邮件] 打包截图失败: {e}")
attachment_note = "截图打包失败,本次不附加附件。"
finally:
if zip_path and os.path.exists(zip_path):
try:
os.remove(zip_path)
except Exception:
pass
else:
attachment_note = "本次无可用截图文件(可能截图失败或未启用截图)。"
# 将附件说明写入邮件内容
html_content = html_content.replace("截图已打包为ZIP附件请查收。", attachment_note)
# 发送邮件
attachments = []
@@ -2132,13 +2178,13 @@ def send_batch_task_complete_email_async(
screenshots: List[Dict[str, Any]]
):
"""异步发送批次任务完成通知邮件"""
import threading
thread = threading.Thread(
target=send_batch_task_complete_email,
queue = get_email_queue()
ok = queue.enqueue_callable(
send_batch_task_complete_email,
args=(user_id, email, username, schedule_name, browse_type, screenshots),
daemon=True
)
thread.start()
if not ok:
print("[邮件] 邮件队列已满,批次任务邮件未发送")
# ============ 初始化 ============

View File

@@ -12,6 +12,7 @@ import time
import json
import threading
import atexit
import weakref
from typing import Optional, Callable
from dataclasses import dataclass
from app_config import get_config
@@ -28,6 +29,20 @@ else:
# 获取配置
config = get_config()
_playwright_automation_instances: "weakref.WeakSet[PlaywrightAutomation]" = weakref.WeakSet()
def _cleanup_playwright_automation_instances():
"""进程退出时清理残留的自动化实例弱引用不阻止GC"""
for inst in list(_playwright_automation_instances):
try:
inst._force_cleanup()
except Exception:
pass
atexit.register(_cleanup_playwright_automation_instances)
@dataclass
class BrowseResult:
@@ -151,8 +166,7 @@ class PlaywrightAutomation:
self._closed = False # 防止重复关闭
self._lock = threading.Lock() # Bug #13 fix: 保护浏览器资源访问
# 注册退出清理函数,确保进程异常退出时也能关闭浏览器
atexit.register(self._cleanup_on_exit)
_playwright_automation_instances.add(self)
def log(self, message: str):
"""记录日志"""
@@ -215,6 +229,52 @@ class PlaywrightAutomation:
except Exception as e:
self.log(f"加载cookies失败: {e}")
return False
def load_cookies_into_current_browser(self, username: str) -> bool:
"""在“已连接的现有 browser”上加载 cookies 创建 context用于浏览器池复用"""
import os
if not self.browser or not self.browser.is_connected():
return False
cookies_path = self.get_cookies_path(username)
if not os.path.exists(cookies_path):
return False
try:
# 检查cookies文件是否过期24小时
import time as time_module
file_age = time_module.time() - os.path.getmtime(cookies_path)
if file_age > 24 * 3600:
self.log("Cookies已过期需要重新登录")
os.remove(cookies_path)
return False
with open(cookies_path, 'r', encoding='utf-8') as f:
storage = json.load(f)
context_options = {
'viewport': {'width': 1920, 'height': 1080},
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'device_scale_factor': 2,
'storage_state': storage
}
self.context = self.browser.new_context(**context_options)
self.context.set_default_timeout(config.DEFAULT_TIMEOUT)
self.context.set_default_navigation_timeout(config.PAGE_LOAD_TIMEOUT)
self.page = self.context.new_page()
self.main_page = self.page
return True
except Exception as e:
self.log(f"加载cookies失败: {e}")
try:
if self.context:
self.context.close()
except Exception:
pass
self.context = None
self.page = None
self.main_page = None
return False
def check_login_state(self) -> bool:
"""检查当前是否处于登录状态"""
@@ -235,9 +295,24 @@ class PlaywrightAutomation:
def quick_login(self, username: str, password: str, remember: bool = True):
"""快速登录 - 使用池中浏览器时直接登录否则尝试cookies"""
# 如果已有浏览器实例(从池中获取),直接使用该浏览器登录
# 不尝试加载cookies因为load_cookies会创建新浏览器覆盖池中的
# 如果已有浏览器实例(从池中获取),优先尝试复用cookies避免重复登录/减少耗时)
if self.browser and self.browser.is_connected():
if self.load_cookies_into_current_browser(username):
self.log("使用池中浏览器,尝试使用已保存的登录态...")
if self.check_login_state():
self.log("✓ 登录态有效,跳过登录")
return {"success": True, "message": "使用已保存的登录态", "used_cookies": True}
else:
self.log("登录态已失效,重新登录")
try:
if self.context:
self.context.close()
except Exception:
pass
self.context = None
self.page = None
self.main_page = None
self.log("使用池中浏览器,直接登录")
result = self.login(username, password, remember)
if result.get('success'):