Files
zsglpt/api_browser.py
yuyx 606cad43dc fix: 修复无附件文章无法标记已读的问题
- 发现标记已读的正确 API: /tools/submit_ajax.ashx?action=saveread
- 添加 mark_article_read 方法调用 saveread API 标记文章已读
- 修改 get_article_attachments 返回文章的 channel_id 和 article_id
- 对每篇文章都调用 mark_article_read,无论是否有附件
- 解决米米88、黄紫夏99等账号文章无法标记已读的问题

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 13:24:29 +08:00

665 lines
25 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API 浏览器 - 用纯 HTTP 请求实现浏览功能
比传统浏览器自动化快 30-60 倍
"""
import requests
from bs4 import BeautifulSoup
import os
import re
import time
import atexit
import weakref
from typing import Optional, Callable
from dataclasses import dataclass
from urllib.parse import urlsplit
from app_config import get_config
import time as _time_module
_MODULE_START_TIME = _time_module.time()
_WARMUP_PERIOD_SECONDS = 60 # 启动后 60 秒内使用更长超时
_WARMUP_TIMEOUT_SECONDS = 15.0 # 预热期间的超时时间
config = get_config()
BASE_URL = getattr(config, "ZSGL_BASE_URL", "https://postoa.aidunsoft.com")
LOGIN_URL = getattr(config, "ZSGL_LOGIN_URL", f"{BASE_URL}/admin/login.aspx")
INDEX_URL_PATTERN = getattr(config, "ZSGL_INDEX_URL_PATTERN", "index.aspx")
COOKIES_DIR = getattr(config, "COOKIES_DIR", "data/cookies")
try:
_API_REQUEST_TIMEOUT_SECONDS = float(os.environ.get("API_REQUEST_TIMEOUT_SECONDS") or os.environ.get("API_REQUEST_TIMEOUT") or "5")
except Exception:
_API_REQUEST_TIMEOUT_SECONDS = 5.0
_API_REQUEST_TIMEOUT_SECONDS = max(3.0, _API_REQUEST_TIMEOUT_SECONDS)
_API_DIAGNOSTIC_LOG = str(os.environ.get("API_DIAGNOSTIC_LOG", "")).strip().lower() in ("1", "true", "yes", "on")
try:
_API_DIAGNOSTIC_SLOW_MS = int(os.environ.get("API_DIAGNOSTIC_SLOW_MS", "0") or "0")
except Exception:
_API_DIAGNOSTIC_SLOW_MS = 0
_API_DIAGNOSTIC_SLOW_MS = max(0, _API_DIAGNOSTIC_SLOW_MS)
_cookie_domain_fallback = urlsplit(BASE_URL).hostname or "postoa.aidunsoft.com"
_COOKIE_JAR_MAX_AGE_SECONDS = 24 * 60 * 60
def get_cookie_jar_path(username: str) -> str:
"""获取截图用的 cookies 文件路径Netscape Cookie 格式)"""
import hashlib
os.makedirs(COOKIES_DIR, exist_ok=True)
filename = hashlib.sha256(username.encode()).hexdigest()[:32] + ".cookies.txt"
return os.path.join(COOKIES_DIR, filename)
def is_cookie_jar_fresh(cookie_path: str, max_age_seconds: int = _COOKIE_JAR_MAX_AGE_SECONDS) -> bool:
"""判断 cookies 文件是否存在且未过期"""
if not cookie_path or not os.path.exists(cookie_path):
return False
try:
file_age = time.time() - os.path.getmtime(cookie_path)
return file_age <= max(0, int(max_age_seconds or 0))
except Exception:
return False
_api_browser_instances: "weakref.WeakSet[APIBrowser]" = weakref.WeakSet()
def _cleanup_api_browser_instances():
"""进程退出时清理残留的API浏览器实例弱引用不阻止GC"""
for inst in list(_api_browser_instances):
try:
inst.close()
except Exception:
pass
atexit.register(_cleanup_api_browser_instances)
@dataclass
class APIBrowseResult:
"""API 浏览结果"""
success: bool
total_items: int = 0
total_attachments: int = 0
error_message: str = ""
class APIBrowser:
"""API 浏览器 - 使用纯 HTTP 请求实现浏览"""
def __init__(self, log_callback: Optional[Callable] = None, proxy_config: Optional[dict] = None):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
})
self.logged_in = False
self.log_callback = log_callback
self.stop_flag = False
self._closed = False # 防止重复关闭
self.last_total_records = 0
# 设置代理
if proxy_config and proxy_config.get("server"):
proxy_server = proxy_config["server"]
self.session.proxies = {
"http": proxy_server,
"https": proxy_server
}
self.proxy_server = proxy_server
else:
self.proxy_server = None
_api_browser_instances.add(self)
def log(self, message: str):
"""记录日志"""
if self.log_callback:
self.log_callback(message)
def save_cookies_for_screenshot(self, username: str):
"""保存 cookies 供 wkhtmltoimage 使用Netscape Cookie 格式)"""
cookies_path = get_cookie_jar_path(username)
try:
lines = [
"# Netscape HTTP Cookie File",
"# This file was generated by zsglpt",
]
for cookie in self.session.cookies:
domain = cookie.domain or _cookie_domain_fallback
include_subdomains = "TRUE" if domain.startswith(".") else "FALSE"
path = cookie.path or "/"
secure = "TRUE" if getattr(cookie, "secure", False) else "FALSE"
expires = int(getattr(cookie, "expires", 0) or 0)
lines.append(
"\t".join(
[
domain,
include_subdomains,
path,
secure,
str(expires),
cookie.name,
cookie.value,
]
)
)
with open(cookies_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
self.log(f"[API] Cookies已保存供截图使用")
return True
except Exception as e:
self.log(f"[API] 保存cookies失败: {e}")
return False
def _request_with_retry(self, method, url, max_retries=3, retry_delay=1, **kwargs):
"""带重试机制的请求方法"""
# 启动后 60 秒内使用更长超时15秒之后使用配置的超时
if (_time_module.time() - _MODULE_START_TIME) < _WARMUP_PERIOD_SECONDS:
kwargs.setdefault('timeout', _WARMUP_TIMEOUT_SECONDS)
else:
kwargs.setdefault('timeout', _API_REQUEST_TIMEOUT_SECONDS)
last_error = None
timeout_value = kwargs.get("timeout")
diag_enabled = _API_DIAGNOSTIC_LOG
slow_ms = _API_DIAGNOSTIC_SLOW_MS
for attempt in range(1, max_retries + 1):
start_ts = _time_module.time()
try:
if method.lower() == 'get':
resp = self.session.get(url, **kwargs)
else:
resp = self.session.post(url, **kwargs)
if diag_enabled:
elapsed_ms = int((_time_module.time() - start_ts) * 1000)
if slow_ms <= 0 or elapsed_ms >= slow_ms:
self.log(
f"[API][trace] {method.upper()} {url} ok status={resp.status_code} elapsed_ms={elapsed_ms} timeout={timeout_value} attempt={attempt}/{max_retries}"
)
return resp
except Exception as e:
last_error = e
if diag_enabled:
elapsed_ms = int((_time_module.time() - start_ts) * 1000)
self.log(
f"[API][trace] {method.upper()} {url} err={type(e).__name__} elapsed_ms={elapsed_ms} timeout={timeout_value} attempt={attempt}/{max_retries}"
)
if attempt < max_retries:
self.log(f"[API] 请求超时,{retry_delay}秒后重试 ({attempt}/{max_retries})...")
import time
time.sleep(retry_delay)
else:
self.log(f"[API] 请求失败,已重试{max_retries}次: {str(e)}")
raise last_error
def _get_aspnet_fields(self, soup):
"""获取 ASP.NET 隐藏字段"""
fields = {}
for name in ['__VIEWSTATE', '__VIEWSTATEGENERATOR', '__EVENTVALIDATION']:
field = soup.find('input', {'name': name})
if field:
fields[name] = field.get('value', '')
return fields
def get_real_name(self) -> Optional[str]:
"""
获取用户真实姓名
从 center.aspx 页面解析姓名信息
返回: 姓名字符串,失败返回 None
"""
if not self.logged_in:
return None
try:
url = f"{BASE_URL}/admin/center.aspx"
resp = self._request_with_retry('get', url)
soup = BeautifulSoup(resp.text, 'html.parser')
# 查找包含"姓名:"的元素
# 页面格式: <li><p>姓名:喻勇祥(19174616018) 人力资源编码: ...</p></li>
nlist = soup.find('div', {'class': 'nlist-5'})
if nlist:
first_li = nlist.find('li')
if first_li:
text = first_li.get_text()
# 解析姓名:格式为 "姓名XXX(手机号)"
match = re.search(r'姓名[:]\s*([^\(]+)', text)
if match:
real_name = match.group(1).strip()
if real_name:
return real_name
return None
except Exception as e:
return None
def login(self, username: str, password: str) -> bool:
"""登录"""
self.log(f"[API] 登录: {username}")
try:
resp = self._request_with_retry('get', LOGIN_URL)
soup = BeautifulSoup(resp.text, 'html.parser')
fields = self._get_aspnet_fields(soup)
data = fields.copy()
data['txtUserName'] = username
data['txtPassword'] = password
data['btnSubmit'] = '登 录'
resp = self._request_with_retry(
'post',
LOGIN_URL,
data=data,
headers={
'Content-Type': 'application/x-www-form-urlencoded',
'Origin': BASE_URL,
'Referer': LOGIN_URL,
},
allow_redirects=True
)
if INDEX_URL_PATTERN in resp.url:
self.logged_in = True
self.log(f"[API] 登录成功")
return True
else:
soup = BeautifulSoup(resp.text, 'html.parser')
error = soup.find(id='lblMsg')
error_msg = error.get_text().strip() if error else '未知错误'
self.log(f"[API] 登录失败: {error_msg}")
return False
except Exception as e:
self.log(f"[API] 登录异常: {str(e)}")
return False
def get_article_list_page(self, bz: int = 2, page: int = 1, base_url: str = None):
"""获取单页文章列表"""
if not self.logged_in:
return [], 0, None
if base_url and page > 1:
url = re.sub(r'page=\d+', f'page={page}', base_url)
elif page > 1:
# 兼容兜底:若没有 next_url极少数情况下页面不提供“下一页”链接尝试直接拼 page 参数
url = f"{BASE_URL}/admin/center.aspx?bz={bz}&page={page}"
else:
url = f"{BASE_URL}/admin/center.aspx?bz={bz}"
resp = self._request_with_retry('get', url)
soup = BeautifulSoup(resp.text, 'html.parser')
articles = []
ltable = soup.find('table', {'class': 'ltable'})
if ltable:
rows = ltable.find_all('tr')[1:]
for row in rows:
# 检查是否是"暂无记录"
if '暂无记录' in row.get_text():
continue
link = row.find('a', href=True)
if link:
href = link.get('href', '')
title = link.get_text().strip()
match = re.search(r'id=(\d+)', href)
article_id = match.group(1) if match else None
articles.append({
'title': title,
'href': href,
'article_id': article_id,
})
# 获取总页数
total_pages = 1
next_page_url = None
total_records = 0
page_content = soup.find(id='PageContent')
if page_content:
text = page_content.get_text()
total_match = re.search(r'共(\d+)记录', text)
if total_match:
total_records = int(total_match.group(1))
total_pages = (total_records + 9) // 10
next_link = page_content.find('a', string=re.compile('下一页'))
if next_link:
next_href = next_link.get('href', '')
if next_href:
next_page_url = f"{BASE_URL}/admin/{next_href}"
try:
self.last_total_records = int(total_records or 0)
except Exception:
self.last_total_records = 0
return articles, total_pages, next_page_url
def get_article_attachments(self, article_href: str):
"""
获取文章的附件列表和文章信息
Returns:
tuple: (attachments_list, article_info)
- attachments_list: 附件列表
- article_info: 包含 channel_id 和 article_id 的字典,用于标记文章已读
"""
if not article_href.startswith('http'):
url = f"{BASE_URL}/admin/{article_href}"
else:
url = article_href
resp = self._request_with_retry('get', url)
soup = BeautifulSoup(resp.text, 'html.parser')
attachments = []
article_info = {'channel_id': None, 'article_id': None}
# 从 saveread 按钮获取 channel_id 和 article_id
for elem in soup.find_all(['button', 'input']):
onclick = elem.get('onclick', '')
match = re.search(r'saveread\((\d+),(\d+)\)', onclick)
if match:
article_info['channel_id'] = match.group(1)
article_info['article_id'] = match.group(2)
break
attach_list = soup.find('div', {'class': 'attach-list2'})
if attach_list:
items = attach_list.find_all('li')
for item in items:
download_links = item.find_all('a', onclick=re.compile(r'download2?\.ashx'))
for link in download_links:
onclick = link.get('onclick', '')
id_match = re.search(r'id=(\d+)', onclick)
channel_match = re.search(r'channel_id=(\d+)', onclick)
if id_match:
attach_id = id_match.group(1)
channel_id = channel_match.group(1) if channel_match else '1'
h3 = item.find('h3')
filename = h3.get_text().strip() if h3 else f'附件{attach_id}'
attachments.append({
'id': attach_id,
'channel_id': channel_id,
'filename': filename
})
break
return attachments, article_info
def mark_article_read(self, channel_id: str, article_id: str) -> bool:
"""通过 saveread API 标记文章已读"""
if not channel_id or not article_id:
return False
import random
saveread_url = f"{BASE_URL}/tools/submit_ajax.ashx?action=saveread&time={random.random()}&fl={channel_id}&id={article_id}"
try:
resp = self._request_with_retry("post", saveread_url)
# 检查响应是否成功
if resp.status_code == 200:
try:
data = resp.json()
return data.get('status') == 1
except:
return True # 如果不是 JSON 但状态码 200也认为成功
return False
except:
return False
def mark_read(self, attach_id: str, channel_id: str = '1') -> bool:
"""通过访问预览通道标记附件已读"""
download_url = f"{BASE_URL}/tools/download2.ashx?site=main&id={attach_id}&channel_id={channel_id}"
try:
resp = self._request_with_retry("get", download_url, stream=True)
resp.close()
return resp.status_code == 200
except:
return False
def browse_content(
self,
browse_type: str,
should_stop_callback: Optional[Callable] = None,
progress_callback: Optional[Callable] = None,
) -> APIBrowseResult:
"""
浏览内容并标记已读
Args:
browse_type: 浏览类型 (应读/注册前未读)
should_stop_callback: 检查是否应该停止的回调函数
progress_callback: 进度回调(可选),用于实时上报已浏览内容数量
Returns:
浏览结果
"""
result = APIBrowseResult(success=False)
if not self.logged_in:
result.error_message = "未登录"
return result
# 根据浏览类型确定 bz 参数
# 网站更新后参数: 0=应读, 1=已读(注册前未读需通过页面交互切换)
# 当前前端选项: 注册前未读、应读(默认应读)
browse_type_text = str(browse_type or "")
if '注册前' in browse_type_text:
bz = 0 # 注册前未读(暂与应读相同,网站通过页面状态区分)
else:
bz = 0 # 应读
self.log(f"[API] 开始浏览 '{browse_type}' (bz={bz})...")
try:
total_items = 0
total_attachments = 0
skipped_items = 0
consecutive_failures = 0
max_consecutive_failures = 3
# 获取第一页,了解总记录数
try:
articles, total_pages, _ = self.get_article_list_page(bz, 1)
consecutive_failures = 0
except Exception as e:
result.error_message = str(e)
self.log(f"[API] 获取第1页列表失败: {str(e)}")
return result
if not articles:
self.log(f"[API] '{browse_type}' 没有待处理内容")
result.success = True
return result
total_records = int(getattr(self, "last_total_records", 0) or 0)
self.log(f"[API] 共 {total_records} 条记录,开始处理...")
last_report_ts = 0.0
def report_progress(force: bool = False):
nonlocal last_report_ts
if not progress_callback:
return
now_ts = time.time()
if not force and now_ts - last_report_ts < 1.0:
return
last_report_ts = now_ts
try:
progress_callback({"total_items": total_records, "browsed_items": total_items})
except Exception:
pass
report_progress(force=True)
# 循环处理:遍历所有页面,跟踪已处理文章防止重复
max_iterations = total_records + 20 # 防止无限循环
iteration = 0
processed_hrefs = set() # 跟踪已处理的文章,防止重复处理
current_page = 1
while articles and iteration < max_iterations:
iteration += 1
if should_stop_callback and should_stop_callback():
self.log("[API] 收到停止信号")
break
new_articles_in_page = 0 # 本次迭代中新处理的文章数
for article in articles:
if should_stop_callback and should_stop_callback():
break
article_href = article['href']
# 跳过已处理的文章
if article_href in processed_hrefs:
continue
processed_hrefs.add(article_href)
new_articles_in_page += 1
title = article['title'][:30]
# 获取附件和文章信息(文章详情页)
try:
attachments, article_info = self.get_article_attachments(article_href)
consecutive_failures = 0
except Exception as e:
skipped_items += 1
consecutive_failures += 1
self.log(
f"[API] 获取文章失败,跳过(连续失败{consecutive_failures}/{max_consecutive_failures}: {title} | {str(e)}"
)
if consecutive_failures >= max_consecutive_failures:
raise
continue
total_items += 1
report_progress()
# 标记文章已读(调用 saveread API
article_marked = False
if article_info.get('channel_id') and article_info.get('article_id'):
article_marked = self.mark_article_read(
article_info['channel_id'],
article_info['article_id']
)
# 处理附件(如果有)
if attachments:
for attach in attachments:
if self.mark_read(attach['id'], attach['channel_id']):
total_attachments += 1
self.log(f"[API] [{total_items}] {title} - {len(attachments)}个附件")
else:
# 没有附件的文章,只记录标记状态
status = "已标记" if article_marked else "标记失败"
self.log(f"[API] [{total_items}] {title} - 无附件({status})")
time.sleep(0.1)
time.sleep(0.2)
# 决定下一步获取哪一页
if new_articles_in_page > 0:
# 有新文章被处理重新获取第1页因为已读文章会从列表消失页面会上移
current_page = 1
else:
# 当前页没有新文章,尝试下一页
current_page += 1
if current_page > total_pages:
self.log(f"[API] 已遍历所有 {total_pages} 页,结束循环")
break
try:
articles, new_total_pages, _ = self.get_article_list_page(bz, current_page)
if new_total_pages > 0:
total_pages = new_total_pages
except Exception as e:
self.log(f"[API] 获取第{current_page}页列表失败: {str(e)}")
break
report_progress(force=True)
if skipped_items:
self.log(f"[API] 浏览完成: {total_items} 条内容,{total_attachments} 个附件(跳过 {skipped_items} 条内容)")
else:
self.log(f"[API] 浏览完成: {total_items} 条内容,{total_attachments} 个附件")
result.success = True
result.total_items = total_items
result.total_attachments = total_attachments
return result
except Exception as e:
result.error_message = str(e)
self.log(f"[API] 浏览出错: {str(e)}")
return result
def close(self):
"""关闭会话"""
if self._closed:
return
self._closed = True
try:
self.session.close()
except:
pass
finally:
try:
_api_browser_instances.discard(self)
except Exception:
pass
def __enter__(self):
"""Context manager支持 - 进入"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager支持 - 退出"""
self.close()
return False # 不抑制异常
def warmup_api_connection(proxy_config: Optional[dict] = None, log_callback: Optional[Callable] = None):
"""预热 API 连接 - 建立 TCP/TLS 连接池"""
def log(msg: str):
if log_callback:
log_callback(msg)
else:
print(f"[API预热] {msg}")
log("正在预热 API 连接...")
try:
session = requests.Session()
if proxy_config and proxy_config.get("server"):
session.proxies = {"http": proxy_config["server"], "https": proxy_config["server"]}
# 发送一个轻量级请求建立连接
resp = session.get(f"{BASE_URL}/admin/login.aspx", timeout=10, allow_redirects=False)
log(f"✓ API 连接预热完成 (status={resp.status_code})")
session.close()
return True
except Exception as e:
log(f"API 连接预热失败: {e}")
return False