zsglpt/api_browser.py
yuyx de51e1b7c7 Fix multiple security vulnerabilities and bugs
1. Security fixes:
   - Fix user enumeration in the password reset endpoint; return a uniform message to prevent information disclosure
   - Unify password strength validation: at least 8 characters, containing both letters and digits
   - Store third-party account passwords encrypted (Fernet symmetric encryption; see the sketch after this commit message)
   - Replace the weak default administrator password with a randomly generated strong one
   - Fix XSS in administrator replies by adding HTML escaping
   - Replace MD5 hashing with SHA256

2. Concurrency bug fixes:
   - Fix a race condition in the log cache by adding lock protection
   - Fix the screenshot semaphore not picking up configuration changes

3. Other improvements:
   - Add API parameter type validation and bounds checking
   - Add the new crypto_utils.py encryption utility module

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 19:14:14 +08:00
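crypto_utils.py itself is not shown on this page, so the following is only a minimal sketch of the techniques the commit message names (Fernet symmetric encryption from the cryptography package, SHA256 via hashlib, and the 8-character letters-plus-digits password rule). The function names and key-handling strategy are assumptions for illustration, not the project's actual code.

import hashlib
import re
from cryptography.fernet import Fernet

# Hypothetical helpers illustrating the commit's approach.
def encrypt_password(plaintext: str, key: bytes) -> bytes:
    """Encrypt a third-party account password with Fernet (AES-128-CBC + HMAC)."""
    return Fernet(key).encrypt(plaintext.encode("utf-8"))

def decrypt_password(token: bytes, key: bytes) -> str:
    """Decrypt a Fernet token back to the plaintext password."""
    return Fernet(key).decrypt(token).decode("utf-8")

def sha256_hex(value: str) -> str:
    """SHA256 hex digest, used in place of MD5 (e.g. for cookie file names)."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

def is_strong_enough(password: str) -> bool:
    """At least 8 characters, containing both letters and digits."""
    return (len(password) >= 8
            and re.search(r"[A-Za-z]", password) is not None
            and re.search(r"\d", password) is not None)

# The Fernet key (e.g. Fernet.generate_key()) must be stored securely and reused for decryption.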

417 lines · 14 KiB · Python · Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API browser - implements browsing with plain HTTP requests.
30-60x faster than Playwright.
"""
import requests
from bs4 import BeautifulSoup
import re
import time
import atexit
from typing import Optional, Callable
from dataclasses import dataclass

BASE_URL = "https://postoa.aidunsoft.com"


@dataclass
class APIBrowseResult:
    """Result of an API browse run."""
    success: bool
    total_items: int = 0
    total_attachments: int = 0
    error_message: str = ""


class APIBrowser:
    """API browser - browses using plain HTTP requests."""

    def __init__(self, log_callback: Optional[Callable] = None, proxy_config: Optional[dict] = None):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })
        self.logged_in = False
        self.log_callback = log_callback
        self.stop_flag = False
        self._closed = False  # guard against closing twice
        # Configure a proxy if one was provided
        if proxy_config and proxy_config.get("server"):
            proxy_server = proxy_config["server"]
            self.session.proxies = {
                "http": proxy_server,
                "https": proxy_server
            }
            self.proxy_server = proxy_server
        else:
            self.proxy_server = None
        # Register cleanup to run at interpreter exit
        atexit.register(self._cleanup_on_exit)

    def log(self, message: str):
        """Emit a log message via the callback, if one was provided."""
        if self.log_callback:
            self.log_callback(message)

    def save_cookies_for_playwright(self, username: str):
        """Save the session cookies in Playwright storage_state format."""
        import os
        import json
        import hashlib
        cookies_dir = '/app/data/cookies'
        os.makedirs(cookies_dir, exist_ok=True)
        # Security fix: use SHA256 instead of MD5 for the file-name hash
        filename = hashlib.sha256(username.encode()).hexdigest()[:32] + '.json'
        cookies_path = os.path.join(cookies_dir, filename)
        try:
            # Collect the cookies held by the requests session
            cookies_list = []
            for cookie in self.session.cookies:
                cookies_list.append({
                    'name': cookie.name,
                    'value': cookie.value,
                    'domain': cookie.domain or 'postoa.aidunsoft.com',
                    'path': cookie.path or '/',
                })
            # Playwright storage_state format
            storage_state = {
                'cookies': cookies_list,
                'origins': []
            }
            with open(cookies_path, 'w', encoding='utf-8') as f:
                json.dump(storage_state, f)
            self.log("[API] Cookies已保存供截图使用")
            return True
        except Exception as e:
            self.log(f"[API] 保存cookies失败: {e}")
            return False
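
    # A hedged sketch (not part of this file) of how the storage_state file written
    # above could be consumed on the Playwright side; the launch options and the
    # cookies_path variable are assumptions:
    #
    #     from playwright.sync_api import sync_playwright
    #     with sync_playwright() as p:
    #         browser = p.chromium.launch()
    #         context = browser.new_context(storage_state=cookies_path)
    #         page = context.new_page()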

    def _request_with_retry(self, method, url, max_retries=3, retry_delay=1, **kwargs):
        """Issue a request with a simple retry loop."""
        kwargs.setdefault('timeout', 10)
        last_error = None
        for attempt in range(1, max_retries + 1):
            try:
                if method.lower() == 'get':
                    resp = self.session.get(url, **kwargs)
                else:
                    resp = self.session.post(url, **kwargs)
                return resp
            except Exception as e:
                last_error = e
                if attempt < max_retries:
                    self.log(f"[API] 请求超时,{retry_delay}秒后重试 ({attempt}/{max_retries})...")
                    time.sleep(retry_delay)
                else:
                    self.log(f"[API] 请求失败,已重试{max_retries}次: {str(e)}")
        raise last_error

    def _get_aspnet_fields(self, soup):
        """Extract the ASP.NET hidden form fields."""
        fields = {}
        for name in ['__VIEWSTATE', '__VIEWSTATEGENERATOR', '__EVENTVALIDATION']:
            field = soup.find('input', {'name': name})
            if field:
                fields[name] = field.get('value', '')
        return fields

    def login(self, username: str, password: str) -> bool:
        """Log in to the admin backend."""
        self.log(f"[API] 登录: {username}")
        try:
            login_url = f"{BASE_URL}/admin/login.aspx"
            resp = self._request_with_retry('get', login_url)
            soup = BeautifulSoup(resp.text, 'html.parser')
            fields = self._get_aspnet_fields(soup)
            data = fields.copy()
            data['txtUserName'] = username
            data['txtPassword'] = password
            data['btnSubmit'] = '登 录'
            resp = self._request_with_retry(
                'post',
                login_url,
                data=data,
                headers={
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Origin': BASE_URL,
                    'Referer': login_url,
                },
                allow_redirects=True
            )
            if 'index.aspx' in resp.url:
                self.logged_in = True
                self.log("[API] 登录成功")
                return True
            else:
                soup = BeautifulSoup(resp.text, 'html.parser')
                error = soup.find(id='lblMsg')
                error_msg = error.get_text().strip() if error else '未知错误'
                self.log(f"[API] 登录失败: {error_msg}")
                return False
        except Exception as e:
            self.log(f"[API] 登录异常: {str(e)}")
            return False

    def get_article_list_page(self, bz: int = 2, page: int = 1, base_url: Optional[str] = None):
        """Fetch a single page of the article list."""
        if not self.logged_in:
            return [], 0, None
        try:
            if base_url and page > 1:
                url = re.sub(r'page=\d+', f'page={page}', base_url)
            else:
                url = f"{BASE_URL}/admin/center.aspx?bz={bz}"
            resp = self._request_with_retry('get', url)
            soup = BeautifulSoup(resp.text, 'html.parser')
            articles = []
            ltable = soup.find('table', {'class': 'ltable'})
            if ltable:
                rows = ltable.find_all('tr')[1:]
                for row in rows:
                    # Skip the "no records" placeholder row
                    if '暂无记录' in row.get_text():
                        continue
                    link = row.find('a', href=True)
                    if link:
                        href = link.get('href', '')
                        title = link.get_text().strip()
                        match = re.search(r'id=(\d+)', href)
                        article_id = match.group(1) if match else None
                        articles.append({
                            'title': title,
                            'href': href,
                            'article_id': article_id,
                        })
            # Work out the total number of pages
            total_pages = 1
            next_page_url = None
            page_content = soup.find(id='PageContent')
            if page_content:
                text = page_content.get_text()
                total_match = re.search(r'共(\d+)记录', text)
                if total_match:
                    total_records = int(total_match.group(1))
                    total_pages = (total_records + 9) // 10
                next_link = page_content.find('a', string=re.compile('下一页'))
                if next_link:
                    next_href = next_link.get('href', '')
                    if next_href:
                        next_page_url = f"{BASE_URL}/admin/{next_href}"
            return articles, total_pages, next_page_url
        except Exception as e:
            self.log(f"[API] 获取列表失败: {str(e)}")
            return [], 0, None

    def get_article_attachments(self, article_href: str):
        """Get the attachment list for an article."""
        try:
            if not article_href.startswith('http'):
                url = f"{BASE_URL}/admin/{article_href}"
            else:
                url = article_href
            resp = self._request_with_retry('get', url)
            soup = BeautifulSoup(resp.text, 'html.parser')
            attachments = []
            attach_list = soup.find('div', {'class': 'attach-list2'})
            if attach_list:
                items = attach_list.find_all('li')
                for item in items:
                    download_links = item.find_all('a', onclick=re.compile(r'download\.ashx'))
                    for link in download_links:
                        onclick = link.get('onclick', '')
                        id_match = re.search(r'id=(\d+)', onclick)
                        channel_match = re.search(r'channel_id=(\d+)', onclick)
                        if id_match:
                            attach_id = id_match.group(1)
                            channel_id = channel_match.group(1) if channel_match else '1'
                            h3 = item.find('h3')
                            filename = h3.get_text().strip() if h3 else f'附件{attach_id}'
                            attachments.append({
                                'id': attach_id,
                                'channel_id': channel_id,
                                'filename': filename
                            })
                            break
            return attachments
        except Exception:
            return []

    def mark_read(self, attach_id: str, channel_id: str = '1') -> bool:
        """Mark an attachment as read by hitting its download URL."""
        download_url = f"{BASE_URL}/tools/download.ashx?site=main&id={attach_id}&channel_id={channel_id}"
        try:
            resp = self._request_with_retry("get", download_url, stream=True)
            resp.close()
            return resp.status_code == 200
        except Exception:
            return False

    def browse_content(self, browse_type: str,
                       should_stop_callback: Optional[Callable] = None) -> APIBrowseResult:
        """
        Browse content and mark it as read.
        Args:
            browse_type: browse category (应读/注册前未读)
            should_stop_callback: callback checked to decide whether to stop
        Returns:
            the browse result
        """
        result = APIBrowseResult(success=False)
        if not self.logged_in:
            result.error_message = "未登录"
            return result
        # Determine the bz parameter from the browse type
        # Actual options on the site: 0 = 注册前未读 (unread before registration), 1 = 已读 (read), 2 = 应读 (to-read)
        # Front-end options: 注册前未读, 应读, 未读, 已读
        if '注册前' in browse_type:
            bz = 0  # unread before registration
        elif browse_type == '已读':
            bz = 1  # read
        else:
            bz = 2  # 应读 and 未读 both map to bz=2
        self.log(f"[API] 开始浏览 '{browse_type}' (bz={bz})...")
        try:
            total_items = 0
            total_attachments = 0
            page = 1
            base_url = None
            # Fetch the first page
            articles, total_pages, next_url = self.get_article_list_page(bz, page)
            if not articles:
                self.log(f"[API] '{browse_type}' 没有待处理内容")
                result.success = True
                return result
            self.log(f"[API] 共 {total_pages} 页,开始处理...")
            if next_url:
                base_url = next_url
            # Walk through all pages
            while True:
                if should_stop_callback and should_stop_callback():
                    self.log("[API] 收到停止信号")
                    break
                for article in articles:
                    if should_stop_callback and should_stop_callback():
                        break
                    title = article['title'][:30]
                    total_items += 1
                    # Fetch the article's attachments and mark each one as read
                    attachments = self.get_article_attachments(article['href'])
                    if attachments:
                        for attach in attachments:
                            if self.mark_read(attach['id'], attach['channel_id']):
                                total_attachments += 1
                    self.log(f"[API] [{total_items}] {title} - {len(attachments)}个附件")
                    time.sleep(0.1)
                # Move on to the next page
                page += 1
                if page > total_pages:
                    break
                articles, _, next_url = self.get_article_list_page(bz, page, base_url)
                if not articles:
                    break
                if next_url:
                    base_url = next_url
                time.sleep(0.2)
            self.log(f"[API] 浏览完成: {total_items} 条内容,{total_attachments} 个附件")
            result.success = True
            result.total_items = total_items
            result.total_attachments = total_attachments
            return result
        except Exception as e:
            result.error_message = str(e)
            self.log(f"[API] 浏览出错: {str(e)}")
            return result

    def close(self):
        """Close the HTTP session."""
        if self._closed:
            return
        self._closed = True
        try:
            self.session.close()
        except Exception:
            pass

    def _cleanup_on_exit(self):
        """Cleanup hook invoked by atexit when the process exits."""
        if not self._closed:
            try:
                self.session.close()
                self._closed = True
            except Exception:
                pass

    def __enter__(self):
        """Context manager support - enter."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager support - exit."""
        self.close()
        return False  # do not suppress exceptions
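
For orientation, a minimal, hypothetical usage sketch of the class above; the import path, credentials, and callback are placeholders, not taken from the project:

from zsglpt.api_browser import APIBrowser

def on_log(message: str):
    print(message)

# The context manager guarantees the session is closed even if an error occurs.
with APIBrowser(log_callback=on_log) as browser:
    if browser.login("some_user", "some_password"):
        browser.save_cookies_for_playwright("some_user")
        result = browser.browse_content("应读", should_stop_callback=lambda: False)
        print(result.success, result.total_items, result.total_attachments)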