#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API Browser - implements browsing with plain HTTP requests.

Roughly 30-60x faster than traditional browser automation.
"""
import requests
from bs4 import BeautifulSoup
import os
import re
import time
import atexit
import weakref
from typing import Optional, Callable
from dataclasses import dataclass
from urllib.parse import urlsplit
import threading

from app_config import get_config
import time as _time_module

_MODULE_START_TIME = _time_module.time()
_WARMUP_PERIOD_SECONDS = 60  # use a longer timeout during the first 60 seconds after startup
_WARMUP_TIMEOUT_SECONDS = 15.0  # timeout used during the warm-up period


class HTMLParseCache:
    """Cache for parsed HTML results."""

    def __init__(self, ttl: int = 300, maxsize: int = 1000):
        self.cache = {}
        self.ttl = ttl
        self.maxsize = maxsize
        self._access_times = {}
        self._lock = threading.RLock()

    def _make_key(self, url: str, content_hash: str) -> str:
        return f"{url}:{content_hash}"

    def get(self, key: str) -> Optional[tuple]:
        """Return the cached value if it exists and has not expired."""
        with self._lock:
            if key in self.cache:
                value, timestamp = self.cache[key]
                if time.time() - timestamp < self.ttl:
                    self._access_times[key] = time.time()
                    return value
                # Expired: drop the entry
                del self.cache[key]
                del self._access_times[key]
            return None

    def set(self, key: str, value: tuple):
        """Store a value in the cache."""
        with self._lock:
            # If the cache is full, evict the least recently accessed entry (simple LRU)
            if len(self.cache) >= self.maxsize and self._access_times:
                oldest_key = min(self._access_times, key=self._access_times.get)
                del self.cache[oldest_key]
                del self._access_times[oldest_key]
            self.cache[key] = (value, time.time())
            self._access_times[key] = time.time()

    def clear(self):
        """Clear the cache."""
        with self._lock:
            self.cache.clear()
            self._access_times.clear()

    def get_lru_key(self) -> Optional[str]:
        """Return the least recently accessed key."""
        if not self._access_times:
            return None
        return min(self._access_times.keys(), key=lambda k: self._access_times[k])


config = get_config()
BASE_URL = getattr(config, "ZSGL_BASE_URL", "https://postoa.aidunsoft.com")
LOGIN_URL = getattr(config, "ZSGL_LOGIN_URL", f"{BASE_URL}/admin/login.aspx")
INDEX_URL_PATTERN = getattr(config, "ZSGL_INDEX_URL_PATTERN", "index.aspx")
COOKIES_DIR = getattr(config, "COOKIES_DIR", "data/cookies")

try:
    _API_REQUEST_TIMEOUT_SECONDS = float(
        os.environ.get("API_REQUEST_TIMEOUT_SECONDS") or os.environ.get("API_REQUEST_TIMEOUT") or "5"
    )
except Exception:
    _API_REQUEST_TIMEOUT_SECONDS = 5.0
_API_REQUEST_TIMEOUT_SECONDS = max(3.0, _API_REQUEST_TIMEOUT_SECONDS)

_API_DIAGNOSTIC_LOG = str(os.environ.get("API_DIAGNOSTIC_LOG", "")).strip().lower() in ("1", "true", "yes", "on")
try:
    _API_DIAGNOSTIC_SLOW_MS = int(os.environ.get("API_DIAGNOSTIC_SLOW_MS", "0") or "0")
except Exception:
    _API_DIAGNOSTIC_SLOW_MS = 0
_API_DIAGNOSTIC_SLOW_MS = max(0, _API_DIAGNOSTIC_SLOW_MS)

_cookie_domain_fallback = urlsplit(BASE_URL).hostname or "postoa.aidunsoft.com"
_COOKIE_JAR_MAX_AGE_SECONDS = 24 * 60 * 60


def get_cookie_jar_path(username: str) -> str:
    """Return the cookies file path used for screenshots (Netscape cookie format)."""
    import hashlib

    os.makedirs(COOKIES_DIR, exist_ok=True)
    filename = hashlib.sha256(username.encode()).hexdigest()[:32] + ".cookies.txt"
    return os.path.join(COOKIES_DIR, filename)


def is_cookie_jar_fresh(cookie_path: str, max_age_seconds: int = _COOKIE_JAR_MAX_AGE_SECONDS) -> bool:
    """Return True if the cookies file exists and has not expired."""
    if not cookie_path or not os.path.exists(cookie_path):
        return False
    try:
        file_age = time.time() - os.path.getmtime(cookie_path)
        return file_age <= max(0, int(max_age_seconds or 0))
    except Exception:
        return False


# Weak references, so lingering instances can be closed at exit without preventing GC.
_api_browser_instances: "weakref.WeakSet[APIBrowser]" = weakref.WeakSet()


def _cleanup_api_browser_instances():
    """Close any remaining APIBrowser instances at process exit (weak refs do not block GC)."""
    for inst in list(_api_browser_instances):
        try:
            inst.close()
        except Exception:
            pass


atexit.register(_cleanup_api_browser_instances)


@dataclass
class APIBrowseResult:
    """Result of an API browsing run."""

    success: bool
    total_items: int = 0
    total_attachments: int = 0
    error_message: str = ""


class APIBrowser:
    """API browser - performs browsing with plain HTTP requests."""

    def __init__(self, log_callback: Optional[Callable] = None, proxy_config: Optional[dict] = None):
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            }
        )
        self.logged_in = False
        self.log_callback = log_callback
        self.stop_flag = False
        self._closed = False  # guard against double close
        self.last_total_records = 0
        # HTML parse cache: 5-minute TTL, at most 500 entries
        self._parse_cache = HTMLParseCache(ttl=300, maxsize=500)
        # Proxy configuration
        if proxy_config and proxy_config.get("server"):
            proxy_server = proxy_config["server"]
            self.session.proxies = {"http": proxy_server, "https": proxy_server}
            self.proxy_server = proxy_server
        else:
            self.proxy_server = None
        _api_browser_instances.add(self)

    def _calculate_adaptive_delay(self, iteration: int, consecutive_failures: int) -> float:
        """
        Adaptive delay for per-article processing.

        The delay is adjusted from the iteration count and the number of
        consecutive failures.
        """
        base_delay = 0.03  # base delay, kept deliberately small
        # Back off on consecutive failures, with an upper bound
        if consecutive_failures > 0:
            delay = base_delay * (1.5 ** min(consecutive_failures, 3))
            return min(delay, 0.2)  # at most 200 ms
        # Scale with progress: start slower, speed up later
        progress_factor = min(iteration / 100.0, 1.0)  # fully optimized after 100 articles
        optimized_delay = base_delay * (1.2 - 0.4 * progress_factor)  # ramps from 120% down to 80%
        return max(optimized_delay, 0.02)  # at least 20 ms

    def _calculate_page_delay(self, current_page: int, new_articles_in_page: int) -> float:
        """
        Adaptive delay for per-page processing.

        The delay is adjusted from the page position and the number of new
        articles found on the page.
        """
        base_delay = 0.08  # base delay (halved from the earlier value)
        # Many new articles on this page: wait slightly longer
        if new_articles_in_page > 10:
            return base_delay * 1.2
        # Early pages: wait slightly longer, the content may still be settling
        if current_page <= 3:
            return base_delay * 1.1
        # Later pages can go faster
        return base_delay * 0.8

    def log(self, message: str):
        """Emit a log message through the callback, if one is configured."""
        if self.log_callback:
            self.log_callback(message)

    def save_cookies_for_screenshot(self, username: str):
        """Save cookies for wkhtmltoimage (Netscape cookie format)."""
        cookies_path = get_cookie_jar_path(username)
        try:
            lines = [
                "# Netscape HTTP Cookie File",
                "# This file was generated by zsglpt",
            ]
            for cookie in self.session.cookies:
                domain = cookie.domain or _cookie_domain_fallback
                include_subdomains = "TRUE" if domain.startswith(".") else "FALSE"
                path = cookie.path or "/"
                secure = "TRUE" if getattr(cookie, "secure", False) else "FALSE"
                expires = int(getattr(cookie, "expires", 0) or 0)
                lines.append(
                    "\t".join(
                        [
                            domain,
                            include_subdomains,
                            path,
                            secure,
                            str(expires),
                            cookie.name,
                            cookie.value,
                        ]
                    )
                )
            with open(cookies_path, "w", encoding="utf-8") as f:
                f.write("\n".join(lines) + "\n")
            self.log("[API] Cookies saved for screenshot use")
            return True
        except Exception as e:
            self.log(f"[API] Failed to save cookies: {e}")
            return False

    def _request_with_retry(self, method, url, max_retries=3, retry_delay=1, **kwargs):
        """Issue a request with retries."""
        # Use a longer timeout (15 s) within 60 s of startup, then fall back to the configured timeout
        if (_time_module.time() - _MODULE_START_TIME) < _WARMUP_PERIOD_SECONDS:
            kwargs.setdefault("timeout", _WARMUP_TIMEOUT_SECONDS)
        else:
            kwargs.setdefault("timeout", _API_REQUEST_TIMEOUT_SECONDS)
        last_error = None
        timeout_value = kwargs.get("timeout")
        diag_enabled = _API_DIAGNOSTIC_LOG
        slow_ms = _API_DIAGNOSTIC_SLOW_MS
        for attempt in range(1, max_retries + 1):
            start_ts = _time_module.time()
            try:
                if method.lower() == "get":
                    resp = self.session.get(url, **kwargs)
                else:
                    resp = self.session.post(url, **kwargs)
                if diag_enabled:
                    elapsed_ms = int((_time_module.time() - start_ts) * 1000)
                    if slow_ms <= 0 or elapsed_ms >= slow_ms:
                        self.log(
                            f"[API][trace] {method.upper()} {url} ok status={resp.status_code} "
                            f"elapsed_ms={elapsed_ms} timeout={timeout_value} attempt={attempt}/{max_retries}"
                        )
                return resp
            except Exception as e:
                last_error = e
                if diag_enabled:
                    elapsed_ms = int((_time_module.time() - start_ts) * 1000)
                    self.log(
                        f"[API][trace] {method.upper()} {url} err={type(e).__name__} "
                        f"elapsed_ms={elapsed_ms} timeout={timeout_value} attempt={attempt}/{max_retries}"
                    )
                if attempt < max_retries:
                    self.log(f"[API] Request failed, retrying in {retry_delay}s ({attempt}/{max_retries})...")
                    time.sleep(retry_delay)
                else:
                    self.log(f"[API] Request failed after {max_retries} attempts: {str(e)}")
                    raise last_error

    def _get_aspnet_fields(self, soup):
        """Collect the ASP.NET hidden form fields."""
        fields = {}
        for name in ["__VIEWSTATE", "__VIEWSTATEGENERATOR", "__EVENTVALIDATION"]:
            field = soup.find("input", {"name": name})
            if field:
                fields[name] = field.get("value", "")
        return fields

    def get_real_name(self) -> Optional[str]:
        """
        Fetch the user's real name.

        The name is parsed from the center.aspx page.

        Returns:
            The name string, or None on failure.
        """
        if not self.logged_in:
            return None
        try:
            url = f"{BASE_URL}/admin/center.aspx"
            resp = self._request_with_retry("get", url)
            soup = BeautifulSoup(resp.text, "html.parser")
            # Look for the element containing "姓名:". Page format: the first <li> inside
            # <div class="nlist-5">, e.g.
            #   姓名:喻勇祥(19174616018) 人力资源编码: ...
            nlist = soup.find("div", {"class": "nlist-5"})
            if nlist:
                first_li = nlist.find("li")
                if first_li:
                    text = first_li.get_text()
                    # Parse the name: the format is "姓名:XXX(phone number)"
                    match = re.search(r"姓名[::]\s*([^\((]+)", text)
                    if match:
                        real_name = match.group(1).strip()
                        if real_name:
                            return real_name
            return None
        except Exception:
            return None

    def login(self, username: str, password: str) -> bool:
        """Log in."""
        self.log(f"[API] Logging in: {username}")
        try:
            resp = self._request_with_retry("get", LOGIN_URL)
            soup = BeautifulSoup(resp.text, "html.parser")
            fields = self._get_aspnet_fields(soup)
            data = fields.copy()
            data["txtUserName"] = username
            data["txtPassword"] = password
            data["btnSubmit"] = "登 录"
            resp = self._request_with_retry(
                "post",
                LOGIN_URL,
                data=data,
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                    "Origin": BASE_URL,
                    "Referer": LOGIN_URL,
                },
                allow_redirects=True,
            )
            if INDEX_URL_PATTERN in resp.url:
                self.logged_in = True
                self.log("[API] Login succeeded")
                return True
            else:
                soup = BeautifulSoup(resp.text, "html.parser")
                error = soup.find(id="lblMsg")
                error_msg = error.get_text().strip() if error else "unknown error"
                self.log(f"[API] Login failed: {error_msg}")
                return False
        except Exception as e:
            self.log(f"[API] Login error: {str(e)}")
            return False

    def get_article_list_page(self, bz: int = 2, page: int = 1, base_url: Optional[str] = None):
        """Fetch a single page of the article list."""
        if not self.logged_in:
            return [], 0, None
        if base_url and page > 1:
            url = re.sub(r"page=\d+", f"page={page}", base_url)
        elif page > 1:
            # Compatibility fallback: if there is no next_url (in rare cases the page
            # provides no "下一页" link), try appending the page parameter directly
            url = f"{BASE_URL}/admin/center.aspx?bz={bz}&page={page}"
        else:
            url = f"{BASE_URL}/admin/center.aspx?bz={bz}"
        resp = self._request_with_retry("get", url)
        soup = BeautifulSoup(resp.text, "html.parser")
        articles = []
        ltable = soup.find("table", {"class": "ltable"})
        if ltable:
            rows = ltable.find_all("tr")[1:]
            for row in rows:
                # Skip the "暂无记录" (no records) placeholder row
                if "暂无记录" in row.get_text():
                    continue
                link = row.find("a", href=True)
                if link:
                    href = link.get("href", "")
                    title = link.get_text().strip()
                    match = re.search(r"id=(\d+)", href)
                    article_id = match.group(1) if match else None
                    articles.append(
                        {
                            "title": title,
                            "href": href,
                            "article_id": article_id,
                        }
                    )
        # Determine the total number of pages
        total_pages = 1
        next_page_url = None
        total_records = 0
        page_content = soup.find(id="PageContent")
        if page_content:
            text = page_content.get_text()
            total_match = re.search(r"共(\d+)记录", text)
            if total_match:
                total_records = int(total_match.group(1))
                total_pages = (total_records + 9) // 10
            next_link = page_content.find("a", string=re.compile("下一页"))
            if next_link:
                next_href = next_link.get("href", "")
                if next_href:
                    next_page_url = f"{BASE_URL}/admin/{next_href}"
        try:
            self.last_total_records = int(total_records or 0)
        except Exception:
            self.last_total_records = 0
        return articles, total_pages, next_page_url

    def get_article_attachments(self, article_href: str):
        """Fetch an article's attachment list and article info."""
        if not article_href.startswith("http"):
            url = f"{BASE_URL}/admin/{article_href}"
        else:
            url = article_href
        # Check the cache first to avoid unnecessary requests.
        # The URL is used as the cache key (simplified scheme).
        cache_key = f"attachments_{hash(url)}"
        cached_result = self._parse_cache.get(cache_key)
        if cached_result:
            return cached_result
        resp = self._request_with_retry("get", url)
        soup = BeautifulSoup(resp.text, "html.parser")
        attachments = []
        article_info = {"channel_id": None, "article_id": None}
        # Read channel_id and article_id from the saveread button
        for elem in soup.find_all(["button", "input"]):
            onclick = elem.get("onclick", "")
            match = re.search(r"saveread\((\d+),(\d+)\)", onclick)
            if match:
                article_info["channel_id"] = match.group(1)
                article_info["article_id"] = match.group(2)
                break
        attach_list = soup.find("div", {"class": "attach-list2"})
        if attach_list:
            items = attach_list.find_all("li")
            for item in items:
                download_links = item.find_all("a", onclick=re.compile(r"download2?\.ashx"))
                for link in download_links:
                    onclick = link.get("onclick", "")
                    id_match = re.search(r"id=(\d+)", onclick)
                    channel_match = re.search(r"channel_id=(\d+)", onclick)
                    if id_match:
                        attach_id = id_match.group(1)
                        channel_id = channel_match.group(1) if channel_match else "1"
                        h3 = item.find("h3")
                        filename = h3.get_text().strip() if h3 else f"附件{attach_id}"
                        attachments.append({"id": attach_id, "channel_id": channel_id, "filename": filename})
                        break
        result = (attachments, article_info)
        # Store the parsed result in the cache
        self._parse_cache.set(cache_key, result)
        return result

    def mark_article_read(self, channel_id: str, article_id: str) -> bool:
        """Mark an article as read through the saveread API."""
        if not channel_id or not article_id:
            return False
        import random

        saveread_url = (
            f"{BASE_URL}/tools/submit_ajax.ashx?action=saveread&time={random.random()}"
            f"&fl={channel_id}&id={article_id}"
        )
        try:
            resp = self._request_with_retry("post", saveread_url)
            # Check whether the response indicates success
            if resp.status_code == 200:
                try:
                    data = resp.json()
                    return data.get("status") == 1
                except Exception:
                    return True  # not JSON but HTTP 200: still treat as success
            return False
        except Exception:
            return False

    def mark_read(self, attach_id: str, channel_id: str = "1") -> bool:
        """Mark an attachment as read by hitting the preview/download endpoint."""
        download_url = f"{BASE_URL}/tools/download2.ashx?site=main&id={attach_id}&channel_id={channel_id}"
        try:
            resp = self._request_with_retry("get", download_url, stream=True)
            resp.close()
            return resp.status_code == 200
        except Exception:
            return False

    def browse_content(
        self,
        browse_type: str,
        should_stop_callback: Optional[Callable] = None,
        progress_callback: Optional[Callable] = None,
    ) -> APIBrowseResult:
        """
        Browse content and mark it as read.

        Args:
            browse_type: Browse type (应读 "should read" / 注册前未读 "unread before registration")
            should_stop_callback: Callback that reports whether processing should stop
            progress_callback: Optional progress callback for reporting the number of browsed items in real time

        Returns:
            The browse result.
        """
        result = APIBrowseResult(success=False)
        if not self.logged_in:
            result.error_message = "not logged in"
            return result
        # Map the browse type to the bz parameter.
        # After the site update: 0 = 应读 (should read), 1 = 已读 (read);
        # 注册前未读 is switched through page interaction rather than a parameter.
        # Current frontend options: 注册前未读, 应读 (default 应读).
        browse_type_text = str(browse_type or "")
        if "注册前" in browse_type_text:
            bz = 0  # unread before registration (same as 应读 for now; the site distinguishes by page state)
        else:
            bz = 0  # should read (应读)
        self.log(f"[API] Starting to browse '{browse_type}' (bz={bz})...")
        try:
            total_items = 0
            total_attachments = 0
            skipped_items = 0
            consecutive_failures = 0
            max_consecutive_failures = 3
            # Fetch the first page to learn the total record count
            try:
                articles, total_pages, _ = self.get_article_list_page(bz, 1)
                consecutive_failures = 0
            except Exception as e:
                result.error_message = str(e)
                self.log(f"[API] Failed to fetch page 1 of the list: {str(e)}")
                return result
            if not articles:
                self.log(f"[API] '{browse_type}' has no pending content")
                result.success = True
                return result
            total_records = int(getattr(self, "last_total_records", 0) or 0)
            self.log(f"[API] {total_records} records in total, starting...")
            last_report_ts = 0.0

            def report_progress(force: bool = False):
                nonlocal last_report_ts
                if not progress_callback:
                    return
                now_ts = time.time()
                if not force and now_ts - last_report_ts < 1.0:
                    return
                last_report_ts = now_ts
                try:
                    progress_callback({"total_items": total_records, "browsed_items": total_items})
                except Exception:
                    pass

            report_progress(force=True)
            # Main loop: walk all pages, tracking processed articles to avoid duplicates
            max_iterations = total_records + 20  # guard against infinite loops
            iteration = 0
            processed_hrefs = set()  # articles already handled, to prevent reprocessing
            current_page = 1
            while articles and iteration < max_iterations:
                iteration += 1
                if should_stop_callback and should_stop_callback():
                    self.log("[API] Stop signal received")
                    break
                new_articles_in_page = 0  # number of newly processed articles in this iteration
                for article in articles:
                    if should_stop_callback and should_stop_callback():
                        break
                    article_href = article["href"]
                    # Skip articles that were already processed
                    if article_href in processed_hrefs:
                        continue
                    processed_hrefs.add(article_href)
                    new_articles_in_page += 1
                    title = article["title"][:30]
                    # Fetch attachments and article info (article detail page)
                    try:
                        attachments, article_info = self.get_article_attachments(article_href)
                        consecutive_failures = 0
                    except Exception as e:
                        skipped_items += 1
                        consecutive_failures += 1
                        self.log(
                            f"[API] Failed to fetch article, skipping "
                            f"({consecutive_failures}/{max_consecutive_failures} consecutive failures): {title} | {str(e)}"
                        )
                        if consecutive_failures >= max_consecutive_failures:
                            raise
                        continue
                    total_items += 1
                    report_progress()
                    # Mark the article as read (via the saveread API)
                    article_marked = False
                    if article_info.get("channel_id") and article_info.get("article_id"):
                        article_marked = self.mark_article_read(article_info["channel_id"], article_info["article_id"])
                    # Handle attachments, if any
                    if attachments:
                        for attach in attachments:
                            if self.mark_read(attach["id"], attach["channel_id"]):
                                total_attachments += 1
                        self.log(f"[API] [{total_items}] {title} - {len(attachments)} attachment(s)")
                    else:
                        # Article without attachments: just record the marking status
                        status = "marked" if article_marked else "marking failed"
                        self.log(f"[API] [{total_items}] {title} - no attachments ({status})")
                    # Adaptive delay: adjusted from the consecutive-failure count and article count
                    time.sleep(self._calculate_adaptive_delay(total_items, consecutive_failures))
                time.sleep(self._calculate_page_delay(current_page, new_articles_in_page))
                # Decide which page to fetch next
                if new_articles_in_page > 0:
                    # New articles were processed: re-fetch page 1 (read articles disappear
                    # from the list, so the remaining items shift forward)
                    current_page = 1
                else:
                    # No new articles on the current page: try the next page
                    current_page += 1
                    if current_page > total_pages:
                        self.log(f"[API] All {total_pages} pages visited, stopping")
                        break
                try:
                    articles, new_total_pages, _ = self.get_article_list_page(bz, current_page)
                    if new_total_pages > 0:
                        total_pages = new_total_pages
                except Exception as e:
                    self.log(f"[API] Failed to fetch page {current_page} of the list: {str(e)}")
                    break
            report_progress(force=True)
            if skipped_items:
                self.log(
                    f"[API] Browsing finished: {total_items} items, {total_attachments} attachments "
                    f"({skipped_items} items skipped)"
                )
            else:
                self.log(f"[API] Browsing finished: {total_items} items, {total_attachments} attachments")
            result.success = True
            result.total_items = total_items
            result.total_attachments = total_attachments
            return result
        except Exception as e:
            result.error_message = str(e)
            self.log(f"[API] Browsing error: {str(e)}")
            return result

    def close(self):
        """Close the session."""
        if self._closed:
            return
        self._closed = True
        try:
            self.session.close()
        except Exception:
            pass
        finally:
            try:
                _api_browser_instances.discard(self)
            except Exception:
                pass

    def __enter__(self):
        """Context manager support - enter."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager support - exit."""
        self.close()
        return False  # do not suppress exceptions


def warmup_api_connection(proxy_config: Optional[dict] = None, log_callback: Optional[Callable] = None):
    """Warm up the API connection - establish the TCP/TLS connection pool."""

    def log(msg: str):
        if log_callback:
            log_callback(msg)
        else:
            print(f"[API warmup] {msg}")

    log("Warming up API connection...")
    try:
        session = requests.Session()
        if proxy_config and proxy_config.get("server"):
            session.proxies = {"http": proxy_config["server"], "https": proxy_config["server"]}
        # Send a lightweight request to establish the connection
        resp = session.get(f"{BASE_URL}/admin/login.aspx", timeout=10, allow_redirects=False)
        log(f"[OK] API connection warm-up complete (status={resp.status_code})")
        session.close()
        return True
    except Exception as e:
        log(f"API connection warm-up failed: {e}")
        return False
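

# Minimal usage sketch (illustrative only, not one of the production entry points):
# it shows the intended call order warmup -> login -> browse_content using the classes
# defined above. The ZSGL_USERNAME / ZSGL_PASSWORD environment variables are an
# assumption made here for the demo; real callers supply credentials their own way.
if __name__ == "__main__":
    demo_username = os.environ.get("ZSGL_USERNAME", "")
    demo_password = os.environ.get("ZSGL_PASSWORD", "")
    warmup_api_connection(log_callback=print)
    with APIBrowser(log_callback=print) as browser:
        if demo_username and browser.login(demo_username, demo_password):
            browser.save_cookies_for_screenshot(demo_username)
            outcome = browser.browse_content("应读")
            print(f"success={outcome.success} items={outcome.total_items} attachments={outcome.total_attachments}")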