- Add dependency detection module: checks for wkhtmltoimage and Playwright Chromium at startup
- Add dependency install dialog: prompts the user to download and install missing dependencies in one click
- Fix option persistence: the browse type, auto-screenshot, and auto-upload options are now saved
- Improve KDocs login detection: when logged out, automatically switch to the KDocs page and show the QR code
- Simplify log output: drop debug noise, keep user-friendly status messages
- Add account-change signal: edits on the account management page now sync to the browse task page automatically

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API browser - lite edition

Implements browsing with plain HTTP requests, roughly 30-60x faster than
browser automation.

Trimmed down from the original project; caching, diagnostic logging, and
other heavyweight features were removed.
"""

import os
import re
import time
import random
import hashlib
from typing import Optional, Callable, Dict
from dataclasses import dataclass
from urllib.parse import urlsplit

import requests
from bs4 import BeautifulSoup


@dataclass
class APIBrowseResult:
    """Result of an API browse run."""
    success: bool
    total_items: int = 0
    total_attachments: int = 0
    error_message: str = ""


def get_cookie_jar_path(username: str) -> str:
    """Return the path of the per-user cookies file used for screenshots (Netscape cookie format)."""
    from config import COOKIES_DIR

    COOKIES_DIR.mkdir(exist_ok=True)
    filename = hashlib.sha256(username.encode()).hexdigest()[:32] + ".cookies.txt"
    return str(COOKIES_DIR / filename)


def is_cookie_jar_fresh(cookie_path: str, max_age_seconds: int = 86400) -> bool:
    """Check that the cookies file exists and has not expired (default: 24 hours)."""
    if not cookie_path or not os.path.exists(cookie_path):
        return False
    try:
        file_age = time.time() - os.path.getmtime(cookie_path)
        return file_age <= max(0, int(max_age_seconds or 0))
    except Exception:
        return False

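# Usage sketch (illustrative, not part of the module): reuse a saved cookie
# jar only while it is fresh, otherwise log in again and re-export it. The
# credentials below are hypothetical placeholders.
#
#     jar = get_cookie_jar_path("alice")
#     if not is_cookie_jar_fresh(jar, max_age_seconds=6 * 3600):
#         with APIBrowser(log_callback=print) as browser:
#             if browser.login("alice", "secret"):
#                 browser.save_cookies_for_screenshot("alice")
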
class APIBrowser:
    """
    API browser - drives the site with plain HTTP requests.

    Usage:
        with APIBrowser(log_callback=print) as browser:
            if browser.login(username, password):
                result = browser.browse_content("应读")  # the site's "required reading" list
    """

    def __init__(self, log_callback: Optional[Callable] = None, proxy_config: Optional[dict] = None):
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        })
        self.logged_in = False
        self.log_callback = log_callback
        self.stop_flag = False
        self._closed = False
        self.last_total_records = 0
        self._username = ""

        # Load configuration
        from config import get_config
        config = get_config()
        self.base_url = config.zsgl.base_url
        self.login_url = config.zsgl.login_url
        self.index_url_pattern = config.zsgl.index_url_pattern

        # Configure the proxy, if one was supplied
        if proxy_config and proxy_config.get("server"):
            proxy_server = proxy_config["server"]
            self.session.proxies = {"http": proxy_server, "https": proxy_server}
            self.proxy_server = proxy_server
        else:
            self.proxy_server = None

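    # Usage sketch: proxy_config is a plain dict with a "server" key, e.g.
    #     APIBrowser(proxy_config={"server": "http://127.0.0.1:8080"})
    # (the address is a hypothetical example)
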
    def log(self, message: str):
        """Emit a message through the log callback, if one was provided."""
        if self.log_callback:
            self.log_callback(message)

    def _request_with_retry(self, method: str, url: str, max_retries: int = 3,
                            retry_delay: float = 1, **kwargs) -> requests.Response:
        """Issue a GET/POST request, retrying on failure."""
        kwargs.setdefault("timeout", 10.0)
        last_error = None

        for attempt in range(1, max_retries + 1):
            try:
                if method.lower() == "get":
                    resp = self.session.get(url, **kwargs)
                else:
                    resp = self.session.post(url, **kwargs)
                return resp
            except Exception as e:
                last_error = e
                if attempt < max_retries:
                    self.log(f" Request failed, retrying in {retry_delay}s ({attempt}/{max_retries})...")
                    time.sleep(retry_delay)
                else:
                    self.log(f" Request failed after {max_retries} attempts: {e}")

        raise last_error

    def _get_aspnet_fields(self, soup: BeautifulSoup) -> Dict[str, str]:
        """Collect the hidden ASP.NET form fields required for a postback."""
        fields = {}
        for name in ["__VIEWSTATE", "__VIEWSTATEGENERATOR", "__EVENTVALIDATION"]:
            field = soup.find("input", {"name": name})
            if field:
                fields[name] = field.get("value", "")
        return fields

    def login(self, username: str, password: str) -> bool:
        """Log in with the given credentials."""
        self.log(f" Logging in: {username}")
        self._username = username

        try:
            resp = self._request_with_retry("get", self.login_url)
            soup = BeautifulSoup(resp.text, "html.parser")
            fields = self._get_aspnet_fields(soup)

            data = fields.copy()
            data["txtUserName"] = username
            data["txtPassword"] = password
            data["btnSubmit"] = "登 录"  # literal button caption the server expects; keep as-is

            resp = self._request_with_retry(
                "post",
                self.login_url,
                data=data,
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "Origin": self.base_url,
                    "Referer": self.login_url,
                },
                allow_redirects=True,
            )

            if self.index_url_pattern in resp.url:
                self.logged_in = True
                self.log(" Login succeeded")
                return True
            else:
                soup = BeautifulSoup(resp.text, "html.parser")
                error = soup.find(id="lblMsg")
                error_msg = error.get_text().strip() if error else "unknown error"
                self.log(f" Login failed: {error_msg}")
                return False

        except Exception as e:
            self.log(f" Login error: {e}")
            return False

    def get_real_name(self) -> Optional[str]:
        """Fetch the user's real name from the profile page."""
        if not self.logged_in:
            return None

        try:
            url = f"{self.base_url}/admin/center.aspx"
            resp = self._request_with_retry("get", url)
            soup = BeautifulSoup(resp.text, "html.parser")

            nlist = soup.find("div", {"class": "nlist-5"})
            if nlist:
                first_li = nlist.find("li")
                if first_li:
                    text = first_li.get_text()
                    # Matches the "姓名:" (name) label on the page; keep the Chinese literal
                    match = re.search(r"姓名[::]\s*([^\((]+)", text)
                    if match:
                        return match.group(1).strip()
            return None
        except Exception:
            return None

    def save_cookies_for_screenshot(self, username: str) -> bool:
        """Export session cookies for wkhtmltoimage (Netscape cookie format)."""
        cookies_path = get_cookie_jar_path(username)
        try:
            parsed = urlsplit(self.base_url)
            cookie_domain = parsed.hostname or "postoa.aidunsoft.com"

            lines = [
                "# Netscape HTTP Cookie File",
                "# Generated by zsglpt-lite",
            ]
            for cookie in self.session.cookies:
                domain = cookie.domain or cookie_domain
                include_subdomains = "TRUE" if domain.startswith(".") else "FALSE"
                path = cookie.path or "/"
                secure = "TRUE" if getattr(cookie, "secure", False) else "FALSE"
                expires = int(getattr(cookie, "expires", 0) or 0)
                lines.append("\t".join([
                    domain,
                    include_subdomains,
                    path,
                    secure,
                    str(expires),
                    cookie.name,
                    cookie.value or "",
                ]))

            with open(cookies_path, "w", encoding="utf-8") as f:
                f.write("\n".join(lines) + "\n")

            self.log(" Cookies saved for screenshot use")
            return True
        except Exception as e:
            self.log(f" Failed to save cookies: {e}")
            return False

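    # Usage sketch (illustrative): pass the exported Netscape jar to
    # wkhtmltoimage so the screenshot renders with the logged-in session.
    # Assumes the installed wkhtmltoimage build supports --cookie-jar; the
    # output path is a hypothetical example.
    #
    #     import subprocess
    #     jar = get_cookie_jar_path("alice")
    #     subprocess.run(
    #         ["wkhtmltoimage", "--cookie-jar", jar,
    #          f"{browser.base_url}/admin/center.aspx", "out.png"],
    #         check=True,
    #     )
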
    def get_article_list_page(self, bz: int = 0, page: int = 1) -> tuple:
        """Fetch one page of the article list."""
        if not self.logged_in:
            return [], 0, None

        if page > 1:
            url = f"{self.base_url}/admin/center.aspx?bz={bz}&page={page}"
        else:
            url = f"{self.base_url}/admin/center.aspx?bz={bz}"

        resp = self._request_with_retry("get", url)
        soup = BeautifulSoup(resp.text, "html.parser")
        articles = []

        ltable = soup.find("table", {"class": "ltable"})
        if ltable:
            rows = ltable.find_all("tr")[1:]  # skip the header row
            for row in rows:
                if "暂无记录" in row.get_text():  # "no records" placeholder row
                    continue

                link = row.find("a", href=True)
                if link:
                    href = link.get("href", "")
                    title = link.get_text().strip()
                    match = re.search(r"id=(\d+)", href)
                    article_id = match.group(1) if match else None
                    articles.append({
                        "title": title,
                        "href": href,
                        "article_id": article_id,
                    })

        # Parse the total record count from the pager ("共N记录" = "N records in total")
        total_pages = 1
        total_records = 0

        page_content = soup.find(id="PageContent")
        if page_content:
            text = page_content.get_text()
            total_match = re.search(r"共(\d+)记录", text)
            if total_match:
                total_records = int(total_match.group(1))
                total_pages = (total_records + 9) // 10  # 10 records per page, rounded up

        self.last_total_records = total_records
        return articles, total_pages, None

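    # Usage sketch: walk every page of the list up front. (browse_content()
    # instead pages incrementally, because marking items read shifts the list.)
    #
    #     articles, total_pages, _ = browser.get_article_list_page(bz=0, page=1)
    #     for page in range(2, total_pages + 1):
    #         more, _, _ = browser.get_article_list_page(bz=0, page=page)
    #         articles.extend(more)
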
    def get_article_attachments(self, article_href: str) -> tuple:
        """Fetch an article's attachment list along with its channel/article ids."""
        if not article_href.startswith("http"):
            url = f"{self.base_url}/admin/{article_href}"
        else:
            url = article_href

        resp = self._request_with_retry("get", url)
        soup = BeautifulSoup(resp.text, "html.parser")

        attachments = []
        article_info = {"channel_id": None, "article_id": None}

        # Pull channel_id and article_id from the saveread button's onclick handler
        for elem in soup.find_all(["button", "input"]):
            onclick = elem.get("onclick", "")
            match = re.search(r"saveread\((\d+),(\d+)\)", onclick)
            if match:
                article_info["channel_id"] = match.group(1)
                article_info["article_id"] = match.group(2)
                break

        attach_list = soup.find("div", {"class": "attach-list2"})
        if attach_list:
            items = attach_list.find_all("li")
            for item in items:
                download_links = item.find_all("a", onclick=re.compile(r"download2?\.ashx"))
                for link in download_links:
                    onclick = link.get("onclick", "")
                    id_match = re.search(r"id=(\d+)", onclick)
                    channel_match = re.search(r"channel_id=(\d+)", onclick)
                    if id_match:
                        attach_id = id_match.group(1)
                        channel_id = channel_match.group(1) if channel_match else "1"
                        h3 = item.find("h3")
                        # Fallback display name ("附件" = attachment)
                        filename = h3.get_text().strip() if h3 else f"附件{attach_id}"
                        attachments.append({
                            "id": attach_id,
                            "channel_id": channel_id,
                            "filename": filename
                        })
                        break

        return attachments, article_info

    def mark_article_read(self, channel_id: str, article_id: str) -> bool:
        """Mark an article as read via the saveread API."""
        if not channel_id or not article_id:
            return False

        saveread_url = (
            f"{self.base_url}/tools/submit_ajax.ashx?action=saveread"
            f"&time={random.random()}&fl={channel_id}&id={article_id}"
        )

        try:
            resp = self._request_with_retry("post", saveread_url)
            if resp.status_code == 200:
                try:
                    data = resp.json()
                    return data.get("status") == 1
                except ValueError:
                    # Non-JSON body with a 200 status still counts as success
                    return True
            return False
        except Exception:
            return False

    def mark_attachment_read(self, attach_id: str, channel_id: str = "1") -> bool:
        """Mark an attachment as read by hitting its preview/download endpoint."""
        download_url = f"{self.base_url}/tools/download2.ashx?site=main&id={attach_id}&channel_id={channel_id}"

        try:
            resp = self._request_with_retry("get", download_url, stream=True)
            resp.close()
            return resp.status_code == 200
        except Exception:
            return False

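    # Usage sketch: mark a single article and its attachments read, mirroring
    # what browse_content() does for each item.
    #
    #     attachments, info = browser.get_article_attachments(article["href"])
    #     if info["channel_id"] and info["article_id"]:
    #         browser.mark_article_read(info["channel_id"], info["article_id"])
    #     for attach in attachments:
    #         browser.mark_attachment_read(attach["id"], attach["channel_id"])
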
    def browse_content(
        self,
        browse_type: str,
        should_stop_callback: Optional[Callable] = None,
        progress_callback: Optional[Callable] = None,
    ) -> APIBrowseResult:
        """
        Browse content and mark it read.

        Args:
            browse_type: browse category label ("应读" = required reading /
                "注册前未读" = unread from before registration)
            should_stop_callback: callback polled to check whether to stop
            progress_callback: progress callback, reports the live browse count;
                called with {"total_items": int, "browsed_items": int}

        Returns:
            The browse result.
        """
        result = APIBrowseResult(success=False)

        if not self.logged_in:
            result.error_message = "not logged in"
            return result

        # Map the browse type to the bz parameter (since the site update, bz=0 is the required-reading list)
        bz = 0

        self.log(f" Browsing '{browse_type}' (bz={bz})...")

        try:
            total_items = 0
            total_attachments = 0

            # Fetch the first page
            articles, total_pages, _ = self.get_article_list_page(bz, 1)

            if not articles:
                self.log(f" '{browse_type}' has no pending items")
                result.success = True
                return result

            total_records = self.last_total_records
            self.log(f" {total_records} records in total, processing...")

            # Report initial progress
            if progress_callback:
                progress_callback({"total_items": total_records, "browsed_items": 0})

            processed_hrefs = set()
            current_page = 1
            max_iterations = total_records + 20  # safety bound against endless re-paging

            for _ in range(max_iterations):
                if should_stop_callback and should_stop_callback():
                    self.log(" Stop signal received")
                    break

                if not articles:
                    break

                new_articles_in_page = 0

                for article in articles:
                    if should_stop_callback and should_stop_callback():
                        break

                    article_href = article["href"]
                    if article_href in processed_hrefs:
                        continue

                    processed_hrefs.add(article_href)
                    new_articles_in_page += 1
                    title = article["title"][:30]

                    # Fetch attachments and article info
                    try:
                        attachments, article_info = self.get_article_attachments(article_href)
                    except Exception as e:
                        self.log(f" Failed to fetch article: {title} | {e}")
                        continue

                    total_items += 1

                    # Mark the article itself read
                    article_marked = False
                    if article_info.get("channel_id") and article_info.get("article_id"):
                        article_marked = self.mark_article_read(
                            article_info["channel_id"],
                            article_info["article_id"]
                        )

                    # Process the attachments
                    if attachments:
                        for attach in attachments:
                            if self.mark_attachment_read(attach["id"], attach["channel_id"]):
                                total_attachments += 1
                        self.log(f" [{total_items}] {title} - {len(attachments)} attachment(s)")
                    else:
                        status = "marked" if article_marked else "mark failed"
                        self.log(f" [{total_items}] {title} - no attachments ({status})")

                    # Report progress
                    if progress_callback:
                        progress_callback({"total_items": total_records, "browsed_items": total_items})

                    # Small delay to avoid hammering the server
                    time.sleep(0.05)

                # Decide the next step: marking items read shifts the unread list,
                # so re-read page 1 while this page still yielded new items;
                # otherwise advance to the next page.
                if new_articles_in_page > 0:
                    current_page = 1
                else:
                    current_page += 1
                    if current_page > total_pages:
                        break

                # Fetch the next page
                try:
                    articles, new_total_pages, _ = self.get_article_list_page(bz, current_page)
                    if new_total_pages > 0:
                        total_pages = new_total_pages
                except Exception as e:
                    self.log(f" Failed to fetch page {current_page} of the list: {e}")
                    break

            self.log(f" Browse finished: {total_items} items, {total_attachments} attachments")
            result.success = True
            result.total_items = total_items
            result.total_attachments = total_attachments
            return result

        except Exception as e:
            result.error_message = str(e)
            self.log(f" Browse error: {e}")
            return result

    def close(self):
        """Close the HTTP session."""
        if self._closed:
            return
        self._closed = True
        try:
            self.session.close()
        except Exception:
            pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False
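
# Usage sketch (illustrative): run a full browse pass with a stop flag and a
# progress callback. The credentials are hypothetical placeholders; "应读" is
# the site's required-reading list label.
#
#     import threading
#
#     stop_event = threading.Event()
#
#     def on_progress(info: dict) -> None:
#         print(f"{info['browsed_items']}/{info['total_items']}")
#
#     with APIBrowser(log_callback=print) as browser:
#         if browser.login("alice", "secret"):
#             result = browser.browse_content(
#                 "应读",
#                 should_stop_callback=stop_event.is_set,
#                 progress_callback=on_progress,
#             )
#             print(result.success, result.total_items, result.total_attachments)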