#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ API 浏览器 - 用纯 HTTP 请求实现浏览功能 比传统浏览器自动化快 30-60 倍 """ import requests from bs4 import BeautifulSoup import os import re import time import atexit import weakref from typing import Optional, Callable from dataclasses import dataclass from urllib.parse import urlsplit from app_config import get_config import time as _time_module _MODULE_START_TIME = _time_module.time() _WARMUP_PERIOD_SECONDS = 60 # 启动后 60 秒内使用更长超时 _WARMUP_TIMEOUT_SECONDS = 15.0 # 预热期间的超时时间 config = get_config() BASE_URL = getattr(config, "ZSGL_BASE_URL", "https://postoa.aidunsoft.com") LOGIN_URL = getattr(config, "ZSGL_LOGIN_URL", f"{BASE_URL}/admin/login.aspx") INDEX_URL_PATTERN = getattr(config, "ZSGL_INDEX_URL_PATTERN", "index.aspx") COOKIES_DIR = getattr(config, "COOKIES_DIR", "data/cookies") try: _API_REQUEST_TIMEOUT_SECONDS = float(os.environ.get("API_REQUEST_TIMEOUT_SECONDS") or os.environ.get("API_REQUEST_TIMEOUT") or "5") except Exception: _API_REQUEST_TIMEOUT_SECONDS = 5.0 _API_REQUEST_TIMEOUT_SECONDS = max(3.0, _API_REQUEST_TIMEOUT_SECONDS) _API_DIAGNOSTIC_LOG = str(os.environ.get("API_DIAGNOSTIC_LOG", "")).strip().lower() in ("1", "true", "yes", "on") try: _API_DIAGNOSTIC_SLOW_MS = int(os.environ.get("API_DIAGNOSTIC_SLOW_MS", "0") or "0") except Exception: _API_DIAGNOSTIC_SLOW_MS = 0 _API_DIAGNOSTIC_SLOW_MS = max(0, _API_DIAGNOSTIC_SLOW_MS) _cookie_domain_fallback = urlsplit(BASE_URL).hostname or "postoa.aidunsoft.com" _COOKIE_JAR_MAX_AGE_SECONDS = 24 * 60 * 60 def get_cookie_jar_path(username: str) -> str: """获取截图用的 cookies 文件路径(Netscape Cookie 格式)""" import hashlib os.makedirs(COOKIES_DIR, exist_ok=True) filename = hashlib.sha256(username.encode()).hexdigest()[:32] + ".cookies.txt" return os.path.join(COOKIES_DIR, filename) def is_cookie_jar_fresh(cookie_path: str, max_age_seconds: int = _COOKIE_JAR_MAX_AGE_SECONDS) -> bool: """判断 cookies 文件是否存在且未过期""" if not cookie_path or not os.path.exists(cookie_path): return False try: file_age = time.time() - os.path.getmtime(cookie_path) return file_age <= max(0, int(max_age_seconds or 0)) except Exception: return False _api_browser_instances: "weakref.WeakSet[APIBrowser]" = weakref.WeakSet() def _cleanup_api_browser_instances(): """进程退出时清理残留的API浏览器实例(弱引用,不阻止GC)""" for inst in list(_api_browser_instances): try: inst.close() except Exception: pass atexit.register(_cleanup_api_browser_instances) @dataclass class APIBrowseResult: """API 浏览结果""" success: bool total_items: int = 0 total_attachments: int = 0 error_message: str = "" class APIBrowser: """API 浏览器 - 使用纯 HTTP 请求实现浏览""" def __init__(self, log_callback: Optional[Callable] = None, proxy_config: Optional[dict] = None): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', }) self.logged_in = False self.log_callback = log_callback self.stop_flag = False self._closed = False # 防止重复关闭 self.last_total_records = 0 # 设置代理 if proxy_config and proxy_config.get("server"): proxy_server = proxy_config["server"] self.session.proxies = { "http": proxy_server, "https": proxy_server } self.proxy_server = proxy_server else: self.proxy_server = None _api_browser_instances.add(self) def log(self, message: str): """记录日志""" if self.log_callback: self.log_callback(message) def save_cookies_for_screenshot(self, username: str): """保存 cookies 供 wkhtmltoimage 使用(Netscape Cookie 格式)""" cookies_path = get_cookie_jar_path(username) try: lines = [ "# Netscape HTTP Cookie File", "# This file was generated by zsglpt", ] for cookie in self.session.cookies: domain = cookie.domain or _cookie_domain_fallback include_subdomains = "TRUE" if domain.startswith(".") else "FALSE" path = cookie.path or "/" secure = "TRUE" if getattr(cookie, "secure", False) else "FALSE" expires = int(getattr(cookie, "expires", 0) or 0) lines.append( "\t".join( [ domain, include_subdomains, path, secure, str(expires), cookie.name, cookie.value, ] ) ) with open(cookies_path, "w", encoding="utf-8") as f: f.write("\n".join(lines) + "\n") self.log(f"[API] Cookies已保存供截图使用") return True except Exception as e: self.log(f"[API] 保存cookies失败: {e}") return False def _request_with_retry(self, method, url, max_retries=3, retry_delay=1, **kwargs): """带重试机制的请求方法""" # 启动后 60 秒内使用更长超时(15秒),之后使用配置的超时 if (_time_module.time() - _MODULE_START_TIME) < _WARMUP_PERIOD_SECONDS: kwargs.setdefault('timeout', _WARMUP_TIMEOUT_SECONDS) else: kwargs.setdefault('timeout', _API_REQUEST_TIMEOUT_SECONDS) last_error = None timeout_value = kwargs.get("timeout") diag_enabled = _API_DIAGNOSTIC_LOG slow_ms = _API_DIAGNOSTIC_SLOW_MS for attempt in range(1, max_retries + 1): start_ts = _time_module.time() try: if method.lower() == 'get': resp = self.session.get(url, **kwargs) else: resp = self.session.post(url, **kwargs) if diag_enabled: elapsed_ms = int((_time_module.time() - start_ts) * 1000) if slow_ms <= 0 or elapsed_ms >= slow_ms: self.log( f"[API][trace] {method.upper()} {url} ok status={resp.status_code} elapsed_ms={elapsed_ms} timeout={timeout_value} attempt={attempt}/{max_retries}" ) return resp except Exception as e: last_error = e if diag_enabled: elapsed_ms = int((_time_module.time() - start_ts) * 1000) self.log( f"[API][trace] {method.upper()} {url} err={type(e).__name__} elapsed_ms={elapsed_ms} timeout={timeout_value} attempt={attempt}/{max_retries}" ) if attempt < max_retries: self.log(f"[API] 请求超时,{retry_delay}秒后重试 ({attempt}/{max_retries})...") import time time.sleep(retry_delay) else: self.log(f"[API] 请求失败,已重试{max_retries}次: {str(e)}") raise last_error def _get_aspnet_fields(self, soup): """获取 ASP.NET 隐藏字段""" fields = {} for name in ['__VIEWSTATE', '__VIEWSTATEGENERATOR', '__EVENTVALIDATION']: field = soup.find('input', {'name': name}) if field: fields[name] = field.get('value', '') return fields def get_real_name(self) -> Optional[str]: """ 获取用户真实姓名 从 center.aspx 页面解析姓名信息 返回: 姓名字符串,失败返回 None """ if not self.logged_in: return None try: url = f"{BASE_URL}/admin/center.aspx" resp = self._request_with_retry('get', url) soup = BeautifulSoup(resp.text, 'html.parser') # 查找包含"姓名:"的元素 # 页面格式:
  • 姓名:喻勇祥(19174616018) 人力资源编码: ...

  • nlist = soup.find('div', {'class': 'nlist-5'}) if nlist: first_li = nlist.find('li') if first_li: text = first_li.get_text() # 解析姓名:格式为 "姓名:XXX(手机号)" match = re.search(r'姓名[::]\s*([^\((]+)', text) if match: real_name = match.group(1).strip() if real_name: return real_name return None except Exception as e: return None def login(self, username: str, password: str) -> bool: """登录""" self.log(f"[API] 登录: {username}") try: resp = self._request_with_retry('get', LOGIN_URL) soup = BeautifulSoup(resp.text, 'html.parser') fields = self._get_aspnet_fields(soup) data = fields.copy() data['txtUserName'] = username data['txtPassword'] = password data['btnSubmit'] = '登 录' resp = self._request_with_retry( 'post', LOGIN_URL, data=data, headers={ 'Content-Type': 'application/x-www-form-urlencoded', 'Origin': BASE_URL, 'Referer': LOGIN_URL, }, allow_redirects=True ) if INDEX_URL_PATTERN in resp.url: self.logged_in = True self.log(f"[API] 登录成功") return True else: soup = BeautifulSoup(resp.text, 'html.parser') error = soup.find(id='lblMsg') error_msg = error.get_text().strip() if error else '未知错误' self.log(f"[API] 登录失败: {error_msg}") return False except Exception as e: self.log(f"[API] 登录异常: {str(e)}") return False def get_article_list_page(self, bz: int = 2, page: int = 1, base_url: str = None): """获取单页文章列表""" if not self.logged_in: return [], 0, None if base_url and page > 1: url = re.sub(r'page=\d+', f'page={page}', base_url) elif page > 1: # 兼容兜底:若没有 next_url(极少数情况下页面不提供“下一页”链接),尝试直接拼 page 参数 url = f"{BASE_URL}/admin/center.aspx?bz={bz}&page={page}" else: url = f"{BASE_URL}/admin/center.aspx?bz={bz}" resp = self._request_with_retry('get', url) soup = BeautifulSoup(resp.text, 'html.parser') articles = [] ltable = soup.find('table', {'class': 'ltable'}) if ltable: rows = ltable.find_all('tr')[1:] for row in rows: # 检查是否是"暂无记录" if '暂无记录' in row.get_text(): continue link = row.find('a', href=True) if link: href = link.get('href', '') title = link.get_text().strip() match = re.search(r'id=(\d+)', href) article_id = match.group(1) if match else None articles.append({ 'title': title, 'href': href, 'article_id': article_id, }) # 获取总页数 total_pages = 1 next_page_url = None total_records = 0 page_content = soup.find(id='PageContent') if page_content: text = page_content.get_text() total_match = re.search(r'共(\d+)记录', text) if total_match: total_records = int(total_match.group(1)) total_pages = (total_records + 9) // 10 next_link = page_content.find('a', string=re.compile('下一页')) if next_link: next_href = next_link.get('href', '') if next_href: next_page_url = f"{BASE_URL}/admin/{next_href}" try: self.last_total_records = int(total_records or 0) except Exception: self.last_total_records = 0 return articles, total_pages, next_page_url def get_article_attachments(self, article_href: str): """获取文章的附件列表""" if not article_href.startswith('http'): url = f"{BASE_URL}/admin/{article_href}" else: url = article_href resp = self._request_with_retry('get', url) soup = BeautifulSoup(resp.text, 'html.parser') attachments = [] attach_list = soup.find('div', {'class': 'attach-list2'}) if attach_list: items = attach_list.find_all('li') for item in items: download_links = item.find_all('a', onclick=re.compile(r'download2?\.ashx')) for link in download_links: onclick = link.get('onclick', '') id_match = re.search(r'id=(\d+)', onclick) channel_match = re.search(r'channel_id=(\d+)', onclick) if id_match: attach_id = id_match.group(1) channel_id = channel_match.group(1) if channel_match else '1' h3 = item.find('h3') filename = h3.get_text().strip() if h3 else f'附件{attach_id}' attachments.append({ 'id': attach_id, 'channel_id': channel_id, 'filename': filename }) break return attachments def mark_read(self, attach_id: str, channel_id: str = '1') -> bool: """通过访问预览通道标记已读""" download_url = f"{BASE_URL}/tools/download2.ashx?site=main&id={attach_id}&channel_id={channel_id}" try: resp = self._request_with_retry("get", download_url, stream=True) resp.close() return resp.status_code == 200 except: return False def browse_content( self, browse_type: str, should_stop_callback: Optional[Callable] = None, progress_callback: Optional[Callable] = None, ) -> APIBrowseResult: """ 浏览内容并标记已读 Args: browse_type: 浏览类型 (应读/注册前未读) should_stop_callback: 检查是否应该停止的回调函数 progress_callback: 进度回调(可选),用于实时上报已浏览内容数量 Returns: 浏览结果 """ result = APIBrowseResult(success=False) if not self.logged_in: result.error_message = "未登录" return result # 根据浏览类型确定 bz 参数 # 网站更新后参数: 0=应读, 1=已读(注册前未读需通过页面交互切换) # 当前前端选项: 注册前未读、应读(默认应读) browse_type_text = str(browse_type or "") if '注册前' in browse_type_text: bz = 0 # 注册前未读(暂与应读相同,网站通过页面状态区分) else: bz = 0 # 应读 self.log(f"[API] 开始浏览 '{browse_type}' (bz={bz})...") try: total_items = 0 total_attachments = 0 page = 1 base_url = None skipped_items = 0 consecutive_failures = 0 max_consecutive_failures = 3 # 获取第一页 try: articles, total_pages, next_url = self.get_article_list_page(bz, page) consecutive_failures = 0 except Exception as e: result.error_message = str(e) self.log(f"[API] 获取第1页列表失败: {str(e)}") return result if not articles: self.log(f"[API] '{browse_type}' 没有待处理内容") result.success = True return result self.log(f"[API] 共 {total_pages} 页,开始处理...") if next_url: base_url = next_url elif total_pages > 1: base_url = f"{BASE_URL}/admin/center.aspx?bz={bz}&page=2" total_records = int(getattr(self, "last_total_records", 0) or 0) last_report_ts = 0.0 def report_progress(force: bool = False): nonlocal last_report_ts if not progress_callback: return now_ts = time.time() if not force and now_ts - last_report_ts < 1.0: return last_report_ts = now_ts try: progress_callback({"total_items": total_records, "browsed_items": total_items}) except Exception: pass report_progress(force=True) # 处理所有页面 while page <= total_pages: if should_stop_callback and should_stop_callback(): self.log("[API] 收到停止信号") break # page==1 已取过,后续页在这里获取 if page > 1: try: articles, _, next_url = self.get_article_list_page(bz, page, base_url) consecutive_failures = 0 if next_url: base_url = next_url except Exception as e: self.log(f"[API] 获取第{page}页列表失败,终止本次浏览: {str(e)}") raise for article in articles: if should_stop_callback and should_stop_callback(): break title = article['title'][:30] # 获取附件(文章详情页) try: attachments = self.get_article_attachments(article['href']) consecutive_failures = 0 except Exception as e: skipped_items += 1 consecutive_failures += 1 self.log( f"[API] 获取文章失败,跳过(连续失败{consecutive_failures}/{max_consecutive_failures}): {title} | {str(e)}" ) if consecutive_failures >= max_consecutive_failures: raise continue total_items += 1 report_progress() if attachments: for attach in attachments: if self.mark_read(attach['id'], attach['channel_id']): total_attachments += 1 self.log(f"[API] [{total_items}] {title} - {len(attachments)}个附件") time.sleep(0.1) page += 1 time.sleep(0.2) report_progress(force=True) if skipped_items: self.log(f"[API] 浏览完成: {total_items} 条内容,{total_attachments} 个附件(跳过 {skipped_items} 条内容)") else: self.log(f"[API] 浏览完成: {total_items} 条内容,{total_attachments} 个附件") result.success = True result.total_items = total_items result.total_attachments = total_attachments return result except Exception as e: result.error_message = str(e) self.log(f"[API] 浏览出错: {str(e)}") return result def close(self): """关闭会话""" if self._closed: return self._closed = True try: self.session.close() except: pass finally: try: _api_browser_instances.discard(self) except Exception: pass def __enter__(self): """Context manager支持 - 进入""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager支持 - 退出""" self.close() return False # 不抑制异常 def warmup_api_connection(proxy_config: Optional[dict] = None, log_callback: Optional[Callable] = None): """预热 API 连接 - 建立 TCP/TLS 连接池""" def log(msg: str): if log_callback: log_callback(msg) else: print(f"[API预热] {msg}") log("正在预热 API 连接...") try: session = requests.Session() if proxy_config and proxy_config.get("server"): session.proxies = {"http": proxy_config["server"], "https": proxy_config["server"]} # 发送一个轻量级请求建立连接 resp = session.get(f"{BASE_URL}/admin/login.aspx", timeout=10, allow_redirects=False) log(f"✓ API 连接预热完成 (status={resp.status_code})") session.close() return True except Exception as e: log(f"API 连接预热失败: {e}") return False