#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
API 浏览器 - 用纯 HTTP 请求实现浏览功能
比 Playwright 快 30-60 倍
"""

import requests
from bs4 import BeautifulSoup
import re
import time
import atexit
import weakref
from typing import Optional, Callable
from dataclasses import dataclass
from urllib.parse import urlsplit

from app_config import get_config

config = get_config()
BASE_URL = getattr(config, "ZSGL_BASE_URL", "https://postoa.aidunsoft.com")
LOGIN_URL = getattr(config, "ZSGL_LOGIN_URL", f"{BASE_URL}/admin/login.aspx")
INDEX_URL_PATTERN = getattr(config, "ZSGL_INDEX_URL_PATTERN", "index.aspx")
COOKIES_DIR = getattr(config, "COOKIES_DIR", "data/cookies")

_cookie_domain_fallback = urlsplit(BASE_URL).hostname or "postoa.aidunsoft.com"

_api_browser_instances: "weakref.WeakSet[APIBrowser]" = weakref.WeakSet()


def _cleanup_api_browser_instances():
    """进程退出时清理残留的API浏览器实例(弱引用,不阻止GC)"""
    for inst in list(_api_browser_instances):
        try:
            inst.close()
        except Exception:
            pass


atexit.register(_cleanup_api_browser_instances)


@dataclass
class APIBrowseResult:
    """API 浏览结果"""
    success: bool
    total_items: int = 0
    total_attachments: int = 0
    error_message: str = ""


class APIBrowser:
    """API 浏览器 - 使用纯 HTTP 请求实现浏览"""

    def __init__(self, log_callback: Optional[Callable] = None, proxy_config: Optional[dict] = None):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })
        self.logged_in = False
        self.log_callback = log_callback
        self.stop_flag = False
        self._closed = False  # 防止重复关闭

        # 设置代理
        if proxy_config and proxy_config.get("server"):
            proxy_server = proxy_config["server"]
            self.session.proxies = {
                "http": proxy_server,
                "https": proxy_server
            }
            self.proxy_server = proxy_server
        else:
            self.proxy_server = None

        _api_browser_instances.add(self)

    def log(self, message: str):
        """记录日志"""
        if self.log_callback:
            self.log_callback(message)

    def save_cookies_for_playwright(self, username: str):
        """保存cookies供Playwright使用"""
        import os
        import json
        import hashlib

        os.makedirs(COOKIES_DIR, exist_ok=True)
        # 安全修复:使用SHA256代替MD5作为文件名哈希
        filename = hashlib.sha256(username.encode()).hexdigest()[:32] + '.json'
        cookies_path = os.path.join(COOKIES_DIR, filename)

        try:
            # 获取requests session的cookies
            cookies_list = []
            for cookie in self.session.cookies:
                cookies_list.append({
                    'name': cookie.name,
                    'value': cookie.value,
                    'domain': cookie.domain or _cookie_domain_fallback,
                    'path': cookie.path or '/',
                })

            # Playwright storage_state 格式
            storage_state = {
                'cookies': cookies_list,
                'origins': []
            }

            with open(cookies_path, 'w', encoding='utf-8') as f:
                json.dump(storage_state, f)

            self.log("[API] Cookies已保存供截图使用")
            return True
        except Exception as e:
            self.log(f"[API] 保存cookies失败: {e}")
            return False
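    # 示意:保存的 storage_state 文件可直接交给 Playwright 复用登录态
    # (假设调用方使用 Playwright 同步 API,cookies_path 即本方法写入的 JSON 路径):
    #     from playwright.sync_api import sync_playwright
    #     with sync_playwright() as p:
    #         browser = p.chromium.launch()
    #         context = browser.new_context(storage_state=cookies_path)
    #         page = context.new_page()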
    def _request_with_retry(self, method, url, max_retries=3, retry_delay=1, **kwargs):
        """带重试机制的请求方法"""
        kwargs.setdefault('timeout', 10)
        last_error = None
        for attempt in range(1, max_retries + 1):
            try:
                if method.lower() == 'get':
                    resp = self.session.get(url, **kwargs)
                else:
                    resp = self.session.post(url, **kwargs)
                return resp
            except Exception as e:
                last_error = e
                if attempt < max_retries:
                    self.log(f"[API] 请求超时,{retry_delay}秒后重试 ({attempt}/{max_retries})...")
                    time.sleep(retry_delay)
                else:
                    self.log(f"[API] 请求失败,已重试{max_retries}次: {str(e)}")
        raise last_error

    def _get_aspnet_fields(self, soup):
        """获取 ASP.NET 隐藏字段"""
        fields = {}
        for name in ['__VIEWSTATE', '__VIEWSTATEGENERATOR', '__EVENTVALIDATION']:
            field = soup.find('input', {'name': name})
            if field:
                fields[name] = field.get('value', '')
        return fields
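    # 示意:登录 POST 需要把上述隐藏字段原样回传,提交数据大致形如
    # (字段值为假设的占位内容,实际以页面解析结果为准):
    #     {
    #         '__VIEWSTATE': '...',
    #         '__VIEWSTATEGENERATOR': '...',
    #         '__EVENTVALIDATION': '...',
    #         'txtUserName': 'user001',
    #         'txtPassword': '******',
    #         'btnSubmit': '登 录',
    #     }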
    def get_real_name(self) -> Optional[str]:
        """
        获取用户真实姓名

        从 center.aspx 页面解析姓名信息

        返回: 姓名字符串,失败返回 None
        """
        if not self.logged_in:
            return None

        try:
            url = f"{BASE_URL}/admin/center.aspx"
            resp = self._request_with_retry('get', url)
            soup = BeautifulSoup(resp.text, 'html.parser')

            # 查找包含"姓名:"的元素
            # 页面格式: <div class="nlist-5"> 下首个 <li> 的文本形如
            #   "姓名:喻勇祥(19174616018) 人力资源编码: ..."
            nlist = soup.find('div', {'class': 'nlist-5'})
            if nlist:
                first_li = nlist.find('li')
                if first_li:
                    text = first_li.get_text()
                    # 解析姓名:格式为 "姓名:XXX(手机号)"
                    match = re.search(r'姓名[::]\s*([^\((]+)', text)
                    if match:
                        real_name = match.group(1).strip()
                        if real_name:
                            return real_name
            return None
        except Exception:
            return None

    def login(self, username: str, password: str) -> bool:
        """登录"""
        self.log(f"[API] 登录: {username}")
        try:
            resp = self._request_with_retry('get', LOGIN_URL)
            soup = BeautifulSoup(resp.text, 'html.parser')
            fields = self._get_aspnet_fields(soup)

            data = fields.copy()
            data['txtUserName'] = username
            data['txtPassword'] = password
            data['btnSubmit'] = '登 录'

            resp = self._request_with_retry(
                'post', LOGIN_URL,
                data=data,
                headers={
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Origin': BASE_URL,
                    'Referer': LOGIN_URL,
                },
                allow_redirects=True
            )

            if INDEX_URL_PATTERN in resp.url:
                self.logged_in = True
                self.log("[API] 登录成功")
                return True
            else:
                soup = BeautifulSoup(resp.text, 'html.parser')
                error = soup.find(id='lblMsg')
                error_msg = error.get_text().strip() if error else '未知错误'
                self.log(f"[API] 登录失败: {error_msg}")
                return False
        except Exception as e:
            self.log(f"[API] 登录异常: {str(e)}")
            return False

    def get_article_list_page(self, bz: int = 2, page: int = 1, base_url: str = None):
        """获取单页文章列表"""
        if not self.logged_in:
            return [], 0, None, 0

        try:
            if base_url and page > 1:
                url = re.sub(r'page=\d+', f'page={page}', base_url)
            else:
                url = f"{BASE_URL}/admin/center.aspx?bz={bz}"

            resp = self._request_with_retry('get', url)
            soup = BeautifulSoup(resp.text, 'html.parser')

            articles = []
            ltable = soup.find('table', {'class': 'ltable'})
            if ltable:
                rows = ltable.find_all('tr')[1:]
                for row in rows:
                    # 检查是否是"暂无记录"
                    if '暂无记录' in row.get_text():
                        continue
                    link = row.find('a', href=True)
                    if link:
                        href = link.get('href', '')
                        title = link.get_text().strip()
                        match = re.search(r'id=(\d+)', href)
                        article_id = match.group(1) if match else None
                        articles.append({
                            'title': title,
                            'href': href,
                            'article_id': article_id,
                        })

            # 获取总页数 / 总记录数
            total_pages = 1
            next_page_url = None
            total_records = 0
            page_content = soup.find(id='PageContent')
            if page_content:
                text = page_content.get_text()
                total_match = re.search(r'共(\d+)记录', text)
                if total_match:
                    total_records = int(total_match.group(1))
                    total_pages = (total_records + 9) // 10  # 每页 10 条,向上取整
                next_link = page_content.find('a', string=re.compile('下一页'))
                if next_link:
                    next_href = next_link.get('href', '')
                    if next_href:
                        next_page_url = f"{BASE_URL}/admin/{next_href}"

            return articles, total_pages, next_page_url, total_records
        except Exception as e:
            self.log(f"[API] 获取列表失败: {str(e)}")
            return [], 0, None, 0
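    # 示意:get_article_list_page 返回 (articles, total_pages, next_page_url, total_records) 四元组,
    # articles 中每项形如下面的字典(字段值为假设的示例,href 以实际页面链接为准):
    #     {'title': '某条通知标题', 'href': '...id=123', 'article_id': '123'}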
    def get_article_attachments(self, article_href: str):
        """获取文章的附件列表"""
        try:
            if not article_href.startswith('http'):
                url = f"{BASE_URL}/admin/{article_href}"
            else:
                url = article_href

            resp = self._request_with_retry('get', url)
            soup = BeautifulSoup(resp.text, 'html.parser')

            attachments = []
            attach_list = soup.find('div', {'class': 'attach-list2'})
            if attach_list:
                items = attach_list.find_all('li')
                for item in items:
                    download_links = item.find_all('a', onclick=re.compile(r'download\.ashx'))
                    for link in download_links:
                        onclick = link.get('onclick', '')
                        id_match = re.search(r'id=(\d+)', onclick)
                        channel_match = re.search(r'channel_id=(\d+)', onclick)
                        if id_match:
                            attach_id = id_match.group(1)
                            channel_id = channel_match.group(1) if channel_match else '1'
                            h3 = item.find('h3')
                            filename = h3.get_text().strip() if h3 else f'附件{attach_id}'
                            attachments.append({
                                'id': attach_id,
                                'channel_id': channel_id,
                                'filename': filename
                            })
                            break

            return attachments
        except Exception:
            return []

    def mark_read(self, attach_id: str, channel_id: str = '1') -> bool:
        """通过访问下载链接标记已读"""
        download_url = f"{BASE_URL}/tools/download.ashx?site=main&id={attach_id}&channel_id={channel_id}"
        try:
            resp = self._request_with_retry("get", download_url, stream=True)
            resp.close()
            return resp.status_code == 200
        except Exception:
            return False

    def browse_content(
        self,
        browse_type: str,
        should_stop_callback: Optional[Callable] = None,
        progress_callback: Optional[Callable] = None,
    ) -> APIBrowseResult:
        """
        浏览内容并标记已读

        Args:
            browse_type: 浏览类型 (应读/注册前未读)
            should_stop_callback: 检查是否应该停止的回调函数
            progress_callback: 进度回调(可选),用于实时上报已处理/总数

        Returns:
            浏览结果
        """
        result = APIBrowseResult(success=False)

        if not self.logged_in:
            result.error_message = "未登录"
            return result

        # 根据浏览类型确定 bz 参数
        # 网页实际参数: 0=注册前未读, 2=应读(历史上曾存在 1=已读,但当前逻辑不再使用)
        # 当前前端选项: 注册前未读、应读(默认应读)
        browse_type_text = str(browse_type or "")
        if '注册前' in browse_type_text:
            bz = 0  # 注册前未读
        else:
            bz = 2  # 应读

        self.log(f"[API] 开始浏览 '{browse_type}' (bz={bz})...")

        try:
            browsed_items = 0
            viewed_attachments = 0
            discovered_attachments = 0
            page = 1
            base_url = None

            # 获取第一页
            articles, total_pages, next_url, total_records = self.get_article_list_page(bz, page)

            if not articles:
                self.log(f"[API] '{browse_type}' 没有待处理内容")
                result.success = True
                return result

            self.log(f"[API] 共 {total_pages} 页,开始处理...")

            if next_url:
                base_url = next_url

            last_report_ts = 0.0

            def report_progress(force: bool = False):
                nonlocal last_report_ts
                if not progress_callback:
                    return
                now_ts = time.time()
                if not force and now_ts - last_report_ts < 1.0:
                    return
                last_report_ts = now_ts
                try:
                    progress_callback(
                        {
                            "total_items": int(total_records or 0),
                            "browsed_items": int(browsed_items or 0),
                            "total_attachments": int(discovered_attachments or 0),
                            "viewed_attachments": int(viewed_attachments or 0),
                            "page": int(page or 0),
                            "total_pages": int(total_pages or 0),
                        }
                    )
                except Exception:
                    pass

            report_progress(force=True)

            # 处理所有页面
            while True:
                if should_stop_callback and should_stop_callback():
                    self.log("[API] 收到停止信号")
                    break

                for article in articles:
                    if should_stop_callback and should_stop_callback():
                        break

                    title = article['title'][:30]
                    browsed_items += 1

                    # 获取附件
                    attachments = self.get_article_attachments(article['href'])
                    if attachments:
                        discovered_attachments += len(attachments)
                        for attach in attachments:
                            if self.mark_read(attach['id'], attach['channel_id']):
                                viewed_attachments += 1
                        self.log(f"[API] [{browsed_items}] {title} - {len(attachments)}个附件")

                    report_progress()
                    time.sleep(0.1)

                # 下一页
                page += 1
                if page > total_pages:
                    break

                articles, _, next_url, _ = self.get_article_list_page(bz, page, base_url)
                if not articles:
                    break
                if next_url:
                    base_url = next_url

                report_progress(force=True)
                time.sleep(0.2)

            report_progress(force=True)
            self.log(f"[API] 浏览完成: {browsed_items} 条内容,{viewed_attachments} 个附件")

            result.success = True
            result.total_items = browsed_items
            result.total_attachments = viewed_attachments
            return result
        except Exception as e:
            result.error_message = str(e)
            self.log(f"[API] 浏览出错: {str(e)}")
            return result

    def close(self):
        """关闭会话"""
        if self._closed:
            return
        self._closed = True
        try:
            self.session.close()
        except Exception:
            pass
        finally:
            try:
                _api_browser_instances.discard(self)
            except Exception:
                pass

    def __enter__(self):
        """Context manager支持 - 进入"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager支持 - 退出"""
        self.close()
        return False  # 不抑制异常
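

# 用法示意(非正式入口;账号、密码均为占位,仅演示上下文管理器与两个回调的配合方式):
if __name__ == "__main__":
    def _print_log(msg: str):
        print(msg)

    def _print_progress(stats: dict):
        print(f"进度: {stats.get('browsed_items', 0)}/{stats.get('total_items', 0)}")

    with APIBrowser(log_callback=_print_log) as browser:
        if browser.login("用户名占位", "密码占位"):
            browser.save_cookies_for_playwright("用户名占位")
            result = browser.browse_content(
                "应读",
                should_stop_callback=lambda: False,
                progress_callback=_print_progress,
            )
            print(f"成功: {result.success}, 内容: {result.total_items}, 附件: {result.total_attachments}")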