zsglpt/api_browser.py
yuyx 114a4107bb Fix all resource leak issues (P0-level bugs)
Fixed bugs:
- Bug #21: Playwright browser instances leaked
- Bug #22: Database connections leaked (already resolved by the connection pool)
- Bug #23: Screenshot file handles leaked
- Bug #24: Thread resources were never cleaned up
- Bug #25: requests.Session objects leaked

Main improvements:
1. PlaywrightAutomation class:
   - Register an atexit handler so the browser is closed when the process exits
   - Add __enter__/__exit__ for context manager support
   - Add a _closed flag to prevent double-closing
   - Add a _cleanup_on_exit method for silent cleanup

2. APIBrowser class:
   - Register an atexit handler so the Session is closed properly
   - Add __enter__/__exit__ for context manager support
   - Add a _closed flag to prevent double-closing

3. Screenshot hardening (see the first sketch after this list):
   - Write screenshots through a temporary file
   - Validate the resulting file size
   - Remove the temporary file automatically on failure
   - Guarantee no junk files are left behind

4. Application shutdown cleanup (see the second sketch after this list):
   - Add a cleanup_on_exit() function
   - Register SIGINT/SIGTERM signal handlers
   - Stop all running tasks
   - Wait for threads to exit gracefully
   - Shut down the browser thread pool
   - Close the database connection pool
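
The screenshot changes in item 3 live in playwright_automation.py and are not visible in this file. Below is a minimal sketch of the temp-file pattern described there; the helper name save_screenshot_safely and the min_size threshold are invented for illustration, not taken from the repo:

import os
import tempfile

def save_screenshot_safely(png_bytes: bytes, final_path: str, min_size: int = 100) -> bool:
    # Write to a temporary file in the target directory, validate its size,
    # then move it into place; remove the temp file on any failure.
    tmp_fd, tmp_path = tempfile.mkstemp(suffix='.png', dir=os.path.dirname(final_path) or '.')
    try:
        with os.fdopen(tmp_fd, 'wb') as f:
            f.write(png_bytes)
        if os.path.getsize(tmp_path) < min_size:
            raise ValueError("screenshot too small, capture likely failed")
        os.replace(tmp_path, final_path)  # atomic move on the same filesystem
        return True
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # never leave junk files behind
        return False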
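
Item 4's shutdown path lives in app.py, which is also not shown here. The following is a rough sketch of the SIGINT/SIGTERM plus atexit wiring it describes; the executor below is a stand-in for the app's browser thread pool, and the task/DB steps are only indicated by comments:

import atexit
import signal
import sys
from concurrent.futures import ThreadPoolExecutor

browser_executor = ThreadPoolExecutor(max_workers=2)  # stand-in for the real browser thread pool
_cleaned_up = False

def cleanup_on_exit():
    # Best-effort cleanup; guarded so it is safe to call more than once.
    global _cleaned_up
    if _cleaned_up:
        return
    _cleaned_up = True
    # 1. Signal running browse tasks to stop (application-specific, omitted here)
    # 2. Wait for worker threads to exit gracefully
    browser_executor.shutdown(wait=True)
    # 3. Close the database connection pool (application-specific, omitted here)

def _handle_signal(signum, frame):
    cleanup_on_exit()
    sys.exit(0)

atexit.register(cleanup_on_exit)
signal.signal(signal.SIGINT, _handle_signal)
signal.signal(signal.SIGTERM, _handle_signal)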

Impact:
- Prevents memory leaks caused by long-running operation
- Ensures resources are cleaned up correctly when the process exits abnormally
- Improves system stability and reliability

Affected files:
- playwright_automation.py
- api_browser.py
- app.py

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 13:48:06 +08:00

417 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API browser - implements the browsing flow with plain HTTP requests.
Roughly 30-60x faster than driving Playwright.
"""
import requests
from bs4 import BeautifulSoup
import re
import time
import atexit
from typing import Optional, Callable
from dataclasses import dataclass

BASE_URL = "https://postoa.aidunsoft.com"


@dataclass
class APIBrowseResult:
    """Result of one API browse run."""
    success: bool
    total_items: int = 0
    total_attachments: int = 0
    error_message: str = ""


class APIBrowser:
    """API browser - performs the browsing flow with plain HTTP requests."""

    def __init__(self, log_callback: Optional[Callable] = None, proxy_config: Optional[dict] = None):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })
        self.logged_in = False
        self.log_callback = log_callback
        self.stop_flag = False
        self._closed = False  # guards against double-closing

        # Configure the proxy, if one was supplied
        if proxy_config and proxy_config.get("server"):
            proxy_server = proxy_config["server"]
            self.session.proxies = {
                "http": proxy_server,
                "https": proxy_server
            }
            self.proxy_server = proxy_server
        else:
            self.proxy_server = None

        # Register the exit-time cleanup hook
        atexit.register(self._cleanup_on_exit)

    def log(self, message: str):
        """Emit a log message through the callback, if one is set."""
        if self.log_callback:
            self.log_callback(message)

    def save_cookies_for_playwright(self, username: str):
        """Save the session cookies so Playwright can reuse them."""
        import os
        import json
        import hashlib
        cookies_dir = '/app/data/cookies'
        os.makedirs(cookies_dir, exist_ok=True)
        # Use a hash of the username as the file name
        filename = hashlib.md5(username.encode()).hexdigest() + '.json'
        cookies_path = os.path.join(cookies_dir, filename)
        try:
            # Collect the cookies held by the requests session
            cookies_list = []
            for cookie in self.session.cookies:
                cookies_list.append({
                    'name': cookie.name,
                    'value': cookie.value,
                    'domain': cookie.domain or 'postoa.aidunsoft.com',
                    'path': cookie.path or '/',
                })
            # Playwright storage_state format
            storage_state = {
                'cookies': cookies_list,
                'origins': []
            }
            with open(cookies_path, 'w', encoding='utf-8') as f:
                json.dump(storage_state, f)
            self.log(f"[API] Cookies已保存供截图使用")
            return True
        except Exception as e:
            self.log(f"[API] 保存cookies失败: {e}")
            return False

    def _request_with_retry(self, method, url, max_retries=3, retry_delay=1, **kwargs):
        """Issue a request with a simple retry loop."""
        kwargs.setdefault('timeout', 10)
        last_error = None
        for attempt in range(1, max_retries + 1):
            try:
                if method.lower() == 'get':
                    resp = self.session.get(url, **kwargs)
                else:
                    resp = self.session.post(url, **kwargs)
                return resp
            except Exception as e:
                last_error = e
                if attempt < max_retries:
                    self.log(f"[API] 请求超时,{retry_delay}秒后重试 ({attempt}/{max_retries})...")
                    time.sleep(retry_delay)
                else:
                    self.log(f"[API] 请求失败,已重试{max_retries}次: {str(e)}")
                    raise last_error

    def _get_aspnet_fields(self, soup):
        """Extract the ASP.NET hidden form fields."""
        fields = {}
        for name in ['__VIEWSTATE', '__VIEWSTATEGENERATOR', '__EVENTVALIDATION']:
            field = soup.find('input', {'name': name})
            if field:
                fields[name] = field.get('value', '')
        return fields

    def login(self, username: str, password: str) -> bool:
        """Log in to the admin backend."""
        self.log(f"[API] 登录: {username}")
        try:
            login_url = f"{BASE_URL}/admin/login.aspx"
            resp = self._request_with_retry('get', login_url)
            soup = BeautifulSoup(resp.text, 'html.parser')
            fields = self._get_aspnet_fields(soup)

            data = fields.copy()
            data['txtUserName'] = username
            data['txtPassword'] = password
            data['btnSubmit'] = '登 录'  # the site's submit button value

            resp = self._request_with_retry(
                'post',
                login_url,
                data=data,
                headers={
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Origin': BASE_URL,
                    'Referer': login_url,
                },
                allow_redirects=True
            )
            if 'index.aspx' in resp.url:
                self.logged_in = True
                self.log(f"[API] 登录成功")
                return True
            else:
                soup = BeautifulSoup(resp.text, 'html.parser')
                error = soup.find(id='lblMsg')
                error_msg = error.get_text().strip() if error else '未知错误'
                self.log(f"[API] 登录失败: {error_msg}")
                return False
        except Exception as e:
            self.log(f"[API] 登录异常: {str(e)}")
            return False

    def get_article_list_page(self, bz: int = 2, page: int = 1, base_url: str = None):
        """Fetch a single page of the article list."""
        if not self.logged_in:
            return [], 0, None
        try:
            if base_url and page > 1:
                url = re.sub(r'page=\d+', f'page={page}', base_url)
            else:
                url = f"{BASE_URL}/admin/center.aspx?bz={bz}"
            resp = self._request_with_retry('get', url)
            soup = BeautifulSoup(resp.text, 'html.parser')

            articles = []
            ltable = soup.find('table', {'class': 'ltable'})
            if ltable:
                rows = ltable.find_all('tr')[1:]
                for row in rows:
                    # Skip the "no records" placeholder row
                    if '暂无记录' in row.get_text():
                        continue
                    link = row.find('a', href=True)
                    if link:
                        href = link.get('href', '')
                        title = link.get_text().strip()
                        match = re.search(r'id=(\d+)', href)
                        article_id = match.group(1) if match else None
                        articles.append({
                            'title': title,
                            'href': href,
                            'article_id': article_id,
                        })

            # Work out the total page count
            total_pages = 1
            next_page_url = None
            page_content = soup.find(id='PageContent')
            if page_content:
                text = page_content.get_text()
                total_match = re.search(r'共(\d+)记录', text)
                if total_match:
                    total_records = int(total_match.group(1))
                    total_pages = (total_records + 9) // 10  # 10 records per page
                next_link = page_content.find('a', string=re.compile('下一页'))
                if next_link:
                    next_href = next_link.get('href', '')
                    if next_href:
                        next_page_url = f"{BASE_URL}/admin/{next_href}"
            return articles, total_pages, next_page_url
        except Exception as e:
            self.log(f"[API] 获取列表失败: {str(e)}")
            return [], 0, None

    def get_article_attachments(self, article_href: str):
        """Fetch the attachment list for an article."""
        try:
            if not article_href.startswith('http'):
                url = f"{BASE_URL}/admin/{article_href}"
            else:
                url = article_href
            resp = self._request_with_retry('get', url)
            soup = BeautifulSoup(resp.text, 'html.parser')
            attachments = []
            attach_list = soup.find('div', {'class': 'attach-list2'})
            if attach_list:
                items = attach_list.find_all('li')
                for item in items:
                    download_links = item.find_all('a', onclick=re.compile(r'download\.ashx'))
                    for link in download_links:
                        onclick = link.get('onclick', '')
                        id_match = re.search(r'id=(\d+)', onclick)
                        channel_match = re.search(r'channel_id=(\d+)', onclick)
                        if id_match:
                            attach_id = id_match.group(1)
                            channel_id = channel_match.group(1) if channel_match else '1'
                            h3 = item.find('h3')
                            filename = h3.get_text().strip() if h3 else f'附件{attach_id}'
                            attachments.append({
                                'id': attach_id,
                                'channel_id': channel_id,
                                'filename': filename
                            })
                            # Only the first valid download link per item is needed
                            break
            return attachments
        except Exception:
            return []

    def mark_read(self, attach_id: str, channel_id: str = '1') -> bool:
        """Mark an attachment as read by hitting its download link."""
        download_url = f"{BASE_URL}/tools/download.ashx?site=main&id={attach_id}&channel_id={channel_id}"
        try:
            resp = self._request_with_retry("get", download_url, stream=True)
            resp.close()
            return resp.status_code == 200
        except Exception:
            return False

    def browse_content(self, browse_type: str,
                       should_stop_callback: Optional[Callable] = None) -> APIBrowseResult:
        """
        Browse the content and mark it as read.

        Args:
            browse_type: browse category (e.g. 应读 / 注册前未读)
            should_stop_callback: callback that returns True when browsing should stop

        Returns:
            the browse result
        """
        result = APIBrowseResult(success=False)
        if not self.logged_in:
            result.error_message = "未登录"
            return result

        # Map the browse type to the bz query parameter.
        # Options on the site: 0 = unread before registration, 1 = read, 2 = required reading
        # Frontend options: 注册前未读, 应读, 未读, 已读
        if '注册前' in browse_type:
            bz = 0  # unread before registration
        elif browse_type == '已读':
            bz = 1  # already read
        else:
            bz = 2  # both 应读 and 未读 map to bz=2

        self.log(f"[API] 开始浏览 '{browse_type}' (bz={bz})...")
        try:
            total_items = 0
            total_attachments = 0
            page = 1
            base_url = None

            # Fetch the first page
            articles, total_pages, next_url = self.get_article_list_page(bz, page)
            if not articles:
                self.log(f"[API] '{browse_type}' 没有待处理内容")
                result.success = True
                return result
            self.log(f"[API] 共 {total_pages} 页,开始处理...")
            if next_url:
                base_url = next_url

            # Walk through every page
            while True:
                if should_stop_callback and should_stop_callback():
                    self.log("[API] 收到停止信号")
                    break
                for article in articles:
                    if should_stop_callback and should_stop_callback():
                        break
                    title = article['title'][:30]
                    total_items += 1
                    # Fetch the attachments and mark each one read
                    attachments = self.get_article_attachments(article['href'])
                    if attachments:
                        for attach in attachments:
                            if self.mark_read(attach['id'], attach['channel_id']):
                                total_attachments += 1
                        self.log(f"[API] [{total_items}] {title} - {len(attachments)}个附件")
                    time.sleep(0.1)

                # Move on to the next page
                page += 1
                if page > total_pages:
                    break
                articles, _, next_url = self.get_article_list_page(bz, page, base_url)
                if not articles:
                    break
                if next_url:
                    base_url = next_url
                time.sleep(0.2)

            self.log(f"[API] 浏览完成: {total_items} 条内容,{total_attachments} 个附件")
            result.success = True
            result.total_items = total_items
            result.total_attachments = total_attachments
            return result
        except Exception as e:
            result.error_message = str(e)
            self.log(f"[API] 浏览出错: {str(e)}")
            return result

    def close(self):
        """Close the session."""
        if self._closed:
            return
        self._closed = True
        try:
            self.session.close()
        except Exception:
            pass

    def _cleanup_on_exit(self):
        """Exit-time cleanup, invoked by atexit."""
        if not self._closed:
            try:
                self.session.close()
                self._closed = True
            except Exception:
                pass

    def __enter__(self):
        """Context manager support - enter."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager support - exit."""
        self.close()
        return False  # do not suppress exceptions
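

# Usage sketch (illustrative only, not part of the committed file; the credentials
# below are placeholders): the context-manager support added in this commit
# guarantees the Session is closed even if login or browsing raises.
if __name__ == "__main__":
    with APIBrowser(log_callback=print) as browser:
        if browser.login("your_username", "your_password"):
            print(browser.browse_content("应读"))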