#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Kingsoft Docs (KDocs) upload module - slim edition.

Uses Playwright automation to upload screenshots into a KDocs spreadsheet.
Queueing and concurrency control were removed; work is executed as a single
task, sequentially.
"""

import base64
import os
import re
import time
from io import BytesIO
from typing import Any, Dict, Optional, Callable
from urllib.parse import urlparse

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except ImportError:
    # Playwright is optional at import time; _ensure_playwright() reports the
    # missing dependency through _last_error instead of failing on import.
    sync_playwright = None

    class PlaywrightTimeoutError(Exception):
        pass


class KDocsUploader:
    """Drive a browser through Playwright to log in to KDocs and upload images.

    The instance owns at most one Playwright driver, browser, context and
    page; they are created lazily by ``_ensure_playwright`` and released by
    ``close`` (or by leaving a ``with`` block).
    """

    # Button captions tried, in order, when confirming a login on the PC side.
    _CONFIRM_BUTTON_NAMES = ("确认登录", "确定登录", "登录", "确定", "确认", "同意并登录")

    # CSS fallbacks used when neither role nor exact-text lookup finds a button.
    _CONFIRM_BUTTON_SELECTORS = (
        "button.ant-btn-primary",
        "button[type='primary']",
        ".confirm-btn",
        ".login-confirm",
        ".btn-primary",
        ".wps-btn-primary",
        "a.confirm",
        "div.confirm",
        "[class*='confirm']",
        "[class*='login-btn']",
    )

    def __init__(self, log_callback: Optional[Callable] = None):
        self._playwright = None
        self._browser = None
        self._context = None
        self._page = None
        self._doc_url: Optional[str] = None
        self._last_error: Optional[str] = None
        self._logged_in = False
        self._log_callback = log_callback

    def log(self, msg: str):
        """Forward ``msg`` to the log callback, if one was supplied."""
        if self._log_callback:
            self._log_callback(msg)

    def _note(self, msg: str) -> None:
        """Log a diagnostic without ever raising into the automation flow."""
        try:
            self.log(msg)
        except Exception:
            # A broken log callback must not turn a best-effort step into a crash.
            pass

    def _find_visible_element(self, text: str, use_role: bool = False, role: str = "button"):
        """Find the first visible element matching ``text``.

        Args:
            text: Text to look for.
            use_role: When True, look up by ARIA role and accessible name;
                otherwise use a ``text=`` locator (the default).
            role: Role used when ``use_role`` is True.

        Returns:
            The first matching element that is visible and has a non-zero
            bounding box, or None when there is no page, no match, or the
            lookup raised.
        """
        if not self._page:
            return None
        try:
            if use_role:
                els = self._page.get_by_role(role, name=text)
            else:
                els = self._page.locator(f"text={text}")

            count = els.count()
            for i in range(count):
                el = els.nth(i)
                try:
                    if el.is_visible(timeout=500):
                        box = el.bounding_box()
                        if box and box.get('width', 0) > 0 and box.get('height', 0) > 0:
                            return el
                except Exception:
                    pass
        except Exception:
            pass
        return None

    def _ensure_playwright(self, use_storage_state: bool = True) -> bool:
        """Make sure the Playwright driver, browser, context and page exist.

        Args:
            use_storage_state: When True and the saved login-state file
                exists, the new context is created from it.

        Returns:
            True when a usable page is available. On failure ``_last_error``
            is set, all browser resources are released and False is returned.
        """
        if sync_playwright is None:
            self._last_error = "playwright 未安装"
            return False

        try:
            from config import KDOCS_LOGIN_STATE_FILE

            if self._playwright is None:
                self._playwright = sync_playwright().start()

            if self._browser is None:
                # Headless by default; set KDOCS_HEADLESS=false to debug with a
                # visible browser window.
                headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false"

                chrome_args = [
                    "--disable-blink-features=AutomationControlled",  # hide automation fingerprint
                    "--disable-features=DialMediaRouteProvider",  # suppress local network discovery prompt
                    "--allow-running-insecure-content",
                ]

                try:
                    # channel='chrome' uses the system-installed Chrome rather
                    # than Playwright's bundled Chromium (per the original
                    # author, needed for WeChat quick login).
                    self._browser = self._playwright.chromium.launch(
                        headless=headless,
                        channel='chrome',
                        args=chrome_args
                    )
                except Exception as e:
                    # No usable system Chrome: fall back to bundled Chromium.
                    self._note(f"系统Chrome启动失败,回退到Chromium: {e}")
                    self._browser = self._playwright.chromium.launch(headless=headless, args=chrome_args)

            if self._context is None:
                storage_state = str(KDOCS_LOGIN_STATE_FILE)

                # Options shared by every context we create.
                context_options = {
                    "permissions": ["clipboard-read", "clipboard-write"],  # clipboard access
                    "ignore_https_errors": True,
                }

                if use_storage_state and os.path.exists(storage_state):
                    context_options["storage_state"] = storage_state

                self._context = self._browser.new_context(**context_options)

                # Also grant clipboard permissions explicitly for the account
                # (login) origin; best-effort.
                try:
                    self._context.grant_permissions(
                        ["clipboard-read", "clipboard-write"],
                        origin="https://account.wps.cn"
                    )
                except Exception:
                    pass

            if self._page is None or self._page.is_closed():
                self._page = self._context.new_page()
                self._page.set_default_timeout(60000)

            return True
        except Exception as e:
            self._last_error = f"浏览器启动失败: {e}"
            self._cleanup_browser()
            return False

    def _cleanup_browser(self):
        """Release page, context, browser and driver, innermost first.

        Each object is closed with ``close()`` when it has one, otherwise
        with ``stop()``; errors are ignored and the attribute is always
        reset to None.
        """
        for attr in ['_page', '_context', '_browser', '_playwright']:
            obj = getattr(self, attr, None)
            if obj is not None:
                try:
                    if hasattr(obj, 'close'):
                        obj.close()
                    elif hasattr(obj, 'stop'):
                        obj.stop()
                except Exception:
                    pass
            setattr(self, attr, None)

    def _open_document(self, doc_url: str) -> bool:
        """Navigate the page to ``doc_url``.

        Returns:
            True on success; False (with ``_last_error`` set) when navigation
            raised.
        """
        try:
            self._doc_url = doc_url
            self._ensure_clipboard_permissions(doc_url)
            self._page.goto(doc_url, wait_until="domcontentloaded", timeout=30000)
            time.sleep(3)  # give the page (including any login button) time to render
            return True
        except Exception as e:
            self._last_error = f"打开文档失败: {e}"
            return False

    def _ensure_clipboard_permissions(self, doc_url: str):
        """Grant clipboard read/write permissions for the origin of ``doc_url``.

        Does nothing when there is no context, the URL is empty or it lacks
        a scheme/host. Errors are ignored.
        """
        if not self._context or not doc_url:
            return
        try:
            parsed = urlparse(doc_url)
            if not parsed.scheme or not parsed.netloc:
                return
            origin = f"{parsed.scheme}://{parsed.netloc}"
            self._context.grant_permissions(["clipboard-read", "clipboard-write"], origin=origin)
        except Exception:
            pass

    def _is_login_url(self, url: str) -> bool:
        """Return True when ``url`` looks like a login page.

        A URL counts as a login page when it contains ``account.wps.cn`` or
        ``passport``, or contains ``login`` without containing ``kdocs.cn``
        (case-insensitive).
        """
        if not url:
            return False
        lower = url.lower()
        if "account.wps.cn" in lower or "passport" in lower:
            return True
        if "login" in lower and "kdocs.cn" not in lower:
            return True
        return False

    @staticmethod
    def _is_document_url(url: str) -> bool:
        """Return True when ``url`` is a document page rather than the account site.

        The account site can carry the document link inside its own URL, so
        a bare ``kdocs.cn/l/`` substring test is not sufficient.
        """
        return "kdocs.cn/l/" in url and "account.wps.cn" not in url

    def _dismiss_join_dialog(self, page) -> None:
        """Click the "登录并加入编辑" button when it is visible (best-effort)."""
        try:
            join_btn = page.get_by_role("button", name="登录并加入编辑")
            if join_btn.count() > 0 and join_btn.first.is_visible(timeout=500):
                join_btn.first.click()
                time.sleep(1)
        except Exception:
            pass

    def _page_has_login_gate(self, page) -> bool:
        """Return True when ``page`` still requires the user to log in.

        A document URL is treated as logged in (after trying to dismiss the
        join dialog). Otherwise the page is gated when its URL is a login
        URL, a login button is visible, or a QR-sized element is visible.
        """
        url = getattr(page, "url", "") or ""

        # Already on the document page: counts as logged in, but an invitation
        # dialog may still be showing, so try to click through it first.
        if self._is_document_url(url):
            self._dismiss_join_dialog(page)
            return False

        if self._is_login_url(url):
            return True

        # Only buttons that appear on the login page itself (the document
        # page's invitation dialog is handled above).
        login_buttons = ["立即登录", "去登录"]
        for text in login_buttons:
            try:
                btn = page.get_by_role("button", name=text)
                if btn.count() > 0 and btn.first.is_visible(timeout=500):
                    return True
            except Exception:
                pass

        # A visible QR-code-sized element means we are still waiting for a scan.
        try:
            qr_selectors = ["canvas", "img[class*='qr']", "div[class*='qrcode']"]
            for selector in qr_selectors:
                qr = page.locator(selector)
                if qr.count() > 0:
                    for i in range(min(qr.count(), 3)):
                        el = qr.nth(i)
                        try:
                            if el.is_visible(timeout=200):
                                box = el.bounding_box()
                                if box and 80 <= box.get("width", 0) <= 400:
                                    return True
                        except Exception:
                            pass
        except Exception:
            pass

        return False

    def _is_logged_in(self) -> bool:
        """Return True when an open page exists and shows no login gate."""
        if not self._page or self._page.is_closed():
            return False
        return not self._page_has_login_gate(self._page)

    def _save_login_state(self):
        """Persist the context's storage state to the configured file.

        Best-effort: a failure is logged, never raised.
        """
        try:
            from config import KDOCS_LOGIN_STATE_FILE
            storage_state = str(KDOCS_LOGIN_STATE_FILE)
            KDOCS_LOGIN_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
            self._context.storage_state(path=storage_state)
        except Exception as e:
            self._note(f"保存登录状态失败: {e}")

    def _ensure_login_dialog(self, use_quick_login: bool = False):
        """Click through login buttons until the QR login view is showing.

        Gives up after a fixed number of rounds. Returns early when the
        document page shows no login elements, or when a QR login caption is
        visible.

        Args:
            use_quick_login: Accepted for interface compatibility; not read
                by this method.
        """
        buttons_priority = [
            "登录并加入编辑",
            "立即登录",
            "去登录",
        ]

        max_clicks = 12
        for _ in range(max_clicks):
            clicked = False
            current_url = self._page.url

            # Already on the document page: done unless login UI is still visible.
            if self._is_document_url(current_url):
                time.sleep(1)
                has_login_elements = (
                    self._find_visible_element("立即登录", use_role=True) is not None
                    or self._find_visible_element("登录并加入编辑", use_role=True) is not None
                    or self._find_visible_element("微信扫码登录") is not None
                    or self._find_visible_element("微信快捷登录") is not None
                )
                if not has_login_elements:
                    return

            # QR login view reached.
            qr_page_indicators = ["微信扫码登录", "微信快捷登录"]
            for indicator in qr_page_indicators:
                if self._find_visible_element(indicator) is not None:
                    return

            # Click the highest-priority login button, by role first ...
            for btn_name in buttons_priority:
                el = self._find_visible_element(btn_name, use_role=True)
                if el is not None:
                    el.click(force=True)
                    time.sleep(2)
                    clicked = True
                    break

            # ... then by plain text.
            if not clicked:
                for btn_name in buttons_priority:
                    el = self._find_visible_element(btn_name)
                    if el is not None:
                        el.click(force=True)
                        time.sleep(2)
                        clicked = True
                        break

            if not clicked:
                time.sleep(1)

    def _capture_qr_image(self) -> Optional[bytes]:
        """Screenshot the login QR code.

        Searches the main page with every selector, then each child frame
        with the first five selectors.

        Returns:
            Screenshot bytes of the first QR-like element, or None.
        """
        selectors = [
            "canvas",
            "img[src*='qr']",
            "img[class*='qr']",
            "img[class*='code']",
            "div[class*='qr'] img",
            "div[class*='qrcode'] img",
            "div[class*='scan'] img",
            ".qrcode img",
            ".qr-code img",
            "img",  # last resort: any image
        ]

        # Main page first.
        for selector in selectors:
            result = self._try_capture_qr_with_selector(self._page, selector)
            if result:
                return result

        # Then child frames, using only the more specific selectors.
        try:
            frames = self._page.frames
            for frame in frames:
                if frame == self._page.main_frame:
                    continue
                for selector in selectors[:5]:
                    result = self._try_capture_qr_with_selector(frame, selector)
                    if result:
                        return result
        except Exception:
            pass

        return None

    def _try_capture_qr_with_selector(self, page_or_frame, selector: str) -> Optional[bytes]:
        """Screenshot the first QR-like element matching ``selector``.

        Checks at most the first 10 matches. An element qualifies when it is
        visible, both sides are within 80..400 px, the sides differ by less
        than 60 px, and its screenshot is larger than 500 bytes.

        Returns:
            Screenshot bytes, or None when nothing qualified or lookup raised.
        """
        try:
            locator = page_or_frame.locator(selector)
            count = locator.count()
            for i in range(min(count, 10)):
                el = locator.nth(i)
                try:
                    if not el.is_visible(timeout=300):
                        continue
                    box = el.bounding_box()
                    if not box:
                        continue
                    w, h = box.get("width", 0), box.get("height", 0)
                    # QR codes are roughly square and moderately sized.
                    if 80 <= w <= 400 and 80 <= h <= 400 and abs(w - h) < 60:
                        screenshot = el.screenshot()
                        if screenshot and len(screenshot) > 500:
                            return screenshot
                except Exception:
                    continue
        except Exception:
            pass
        return None

    def request_qr(self, force: bool = False) -> Dict[str, Any]:
        """Open the document and return a login QR code if login is needed.

        Args:
            force: When True, delete the saved login state, restart the
                browser without it and always go through the QR flow.

        Returns:
            A dict with ``success`` (bool) and, depending on the outcome:
            ``logged_in`` (bool), ``qr_image`` (base64 string of the QR
            screenshot, empty when already logged in) and ``error`` (str).
        """
        from config import get_config, KDOCS_LOGIN_STATE_FILE

        config = get_config()
        doc_url = config.kdocs.doc_url.strip()

        if not doc_url:
            return {"success": False, "error": "未配置金山文档链接"}

        if force:
            # Drop the persisted login state and start from a clean browser.
            try:
                if KDOCS_LOGIN_STATE_FILE.exists():
                    KDOCS_LOGIN_STATE_FILE.unlink()
            except Exception:
                pass
            self._cleanup_browser()

        if not self._ensure_playwright(use_storage_state=not force):
            return {"success": False, "error": self._last_error or "浏览器不可用"}

        if not self._open_document(doc_url):
            return {"success": False, "error": self._last_error or "打开文档失败"}

        if not force and self._is_logged_in():
            self._logged_in = True
            self._save_login_state()
            return {"success": True, "logged_in": True, "qr_image": ""}

        # Login required: bring up the QR view and capture it.
        self._ensure_login_dialog()
        time.sleep(2)

        qr_image = None
        for _ in range(15):
            qr_image = self._capture_qr_image()
            # Retry while the capture is missing or suspiciously small; the
            # last capture is still used if every attempt stays small.
            if qr_image and len(qr_image) > 1024:
                break
            time.sleep(1)

        if not qr_image:
            return {"success": False, "error": "二维码获取失败,请检查网络"}

        return {
            "success": True,
            "logged_in": False,
            "qr_image": base64.b64encode(qr_image).decode("ascii"),
        }

    def _click_confirm_in_frame(self, frame) -> bool:
        """Click a login-confirmation button inside ``frame``.

        Tries role lookup, then exact-text lookup, then CSS selectors whose
        element text contains a confirm/login keyword. Waits 3 seconds after
        a successful click.

        Returns:
            True when something was clicked, otherwise False.
        """
        # 1) Buttons by ARIA role and accessible name.
        for name in self._CONFIRM_BUTTON_NAMES:
            try:
                confirm_btn = frame.get_by_role("button", name=name)
                if confirm_btn.count() > 0 and confirm_btn.first.is_visible(timeout=200):
                    confirm_btn.first.click()
                    time.sleep(3)
                    return True
            except Exception:
                pass

        # 2) Any element whose text matches exactly.
        for name in self._CONFIRM_BUTTON_NAMES:
            try:
                el = frame.get_by_text(name, exact=True)
                if el.count() > 0 and el.first.is_visible(timeout=200):
                    el.first.click()
                    time.sleep(3)
                    return True
            except Exception:
                pass

        # 3) CSS fallbacks, filtered by keyword so unrelated primary buttons
        #    are not clicked. A lookup error abandons the remaining selectors.
        try:
            for selector in self._CONFIRM_BUTTON_SELECTORS:
                btns = frame.locator(selector)
                if btns.count() > 0:
                    for i in range(min(btns.count(), 3)):
                        btn = btns.nth(i)
                        try:
                            if btn.is_visible(timeout=100):
                                btn_text = btn.inner_text() or ""
                                if any(kw in btn_text for kw in ["确认", "登录", "确定"]):
                                    btn.click()
                                    time.sleep(3)
                                    return True
                        except Exception:
                            pass
        except Exception:
            pass

        return False

    def check_login_status(self) -> Dict[str, Any]:
        """Check login state on the current page without reloading it.

        Clicks a pending confirmation button if one is found (after a WeChat
        scan the PC side needs one more click), then decides from the page
        URL / login gate. Saves the login state when logged in.

        Returns:
            ``{"success": True, "logged_in": bool}`` on a completed check, or
            ``{"success": False, "logged_in": False, "error": str}`` when no
            page is open or the check raised.
        """
        # No page (or a closed one) means the login flow has not started.
        if not self._page or self._page.is_closed():
            return {"success": False, "logged_in": False, "error": "页面未打开"}

        try:
            # The page itself covers the main frame, so only add child frames.
            main_frame = self._page.main_frame
            frames_to_check = [self._page] + [
                frame for frame in self._page.frames if frame != main_frame
            ]

            clicked_confirm = False
            for frame in frames_to_check:
                if self._click_confirm_in_frame(frame):
                    clicked_confirm = True
                    break

            if clicked_confirm:
                # Extra settle time for the post-confirmation redirect.
                time.sleep(3)

            current_url = self._page.url

            if self._is_document_url(current_url):
                logged_in = True
                self._dismiss_join_dialog(self._page)
            else:
                logged_in = self._is_logged_in()

            self._logged_in = logged_in

            if logged_in:
                self._save_login_state()

            return {"success": True, "logged_in": logged_in}
        except Exception as e:
            return {"success": False, "logged_in": False, "error": str(e)}

    def _navigate_to_cell(self, cell_address: str):
        """Jump to ``cell_address`` by typing it into the name box.

        Falls back to the first text input under ``#root`` when the primary
        name-box locator fails; an error in the fallback propagates.
        """
        try:
            name_box = self._page.locator("input.edit-box").first
            name_box.click()
            name_box.fill(cell_address)
            name_box.press("Enter")
        except Exception:
            name_box = self._page.locator('#root input[type="text"]').first
            name_box.click()
            name_box.fill(cell_address)
            name_box.press("Enter")
        time.sleep(0.3)

    def _get_current_cell_address(self) -> str:
        """Return the selected cell address (e.g. ``"C12"``) from the name box.

        Returns:
            The upper-cased address, or ``""`` when it cannot be read or does
            not look like letters followed by digits.
        """
        try:
            name_box = self._page.locator("input.edit-box").first
            value = name_box.input_value()
            if value and re.match(r"^[A-Z]+\d+$", value.upper()):
                return value.upper()
        except Exception:
            pass
        return ""

    def _search_and_get_row(self, search_text: str, expected_col: Optional[str] = None,
                            row_start: int = 0, row_end: int = 0) -> int:
        """Find ``search_text`` with the sheet's find dialog and return its row.

        Args:
            search_text: Text to search for.
            expected_col: When given, the hit must be in this column letter.
            row_start: When > 0, the hit must be on or after this row.
            row_end: When > 0, the hit must be on or before this row.

        Returns:
            The 1-based row number of the selected cell after the search, or
            -1 when the search text could not be entered, the address could
            not be read, or the column/row constraints are not met.
        """
        # Open the find dialog.
        self._page.keyboard.press("Control+f")
        time.sleep(0.3)

        # Type the search text. If this fails we must not continue: running
        # "find" on stale or empty text would select an unrelated cell and the
        # image would end up in the wrong row.
        typed = False
        try:
            search_input = self._page.get_by_role("textbox").nth(3)
            if search_input.is_visible(timeout=500):
                search_input.fill(search_text)
                typed = True
        except Exception:
            pass

        if not typed:
            self._note("未找到搜索输入框,无法定位行")
            try:
                self._page.keyboard.press("Escape")  # close the find dialog
            except Exception:
                pass
            return -1

        time.sleep(0.2)

        # Trigger the search (button, or Enter as a fallback).
        try:
            find_btn = self._page.get_by_role("button", name="查找").first
            find_btn.click()
        except Exception:
            self._page.keyboard.press("Enter")

        time.sleep(0.3)

        # Close the dialog and read which cell is now selected.
        # NOTE(review): a search with no hit presumably leaves the previous
        # selection in place; the column/row checks below are the only guard.
        self._page.keyboard.press("Escape")
        time.sleep(0.3)

        address = self._get_current_cell_address()
        if not address:
            return -1

        match = re.search(r"(\d+)$", address)
        if not match:
            return -1

        row_num = int(match.group(1))
        col_letter = "".join(c for c in address if c.isalpha()).upper()

        if expected_col and col_letter != expected_col.upper():
            return -1

        if row_start > 0 and row_num < row_start:
            return -1
        if row_end > 0 and row_num > row_end:
            return -1

        return row_num

    def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
        """Insert ``image_path`` as a cell image at ``{image_col}{row_num}``.

        Returns:
            True after the file was handed to the file chooser; False (with
            ``_last_error`` set) when the insert menu flow raised.
        """
        cell_address = f"{image_col}{row_num}"
        self._navigate_to_cell(cell_address)
        time.sleep(0.3)

        # Clear whatever the cell currently holds.
        try:
            self._page.keyboard.press("Escape")
            time.sleep(0.2)
            self._page.keyboard.press("Delete")
            time.sleep(0.3)
        except Exception:
            pass

        # Menu path: Insert -> Image -> Cell image -> Local.
        try:
            insert_btn = self._page.get_by_role("button", name="插入")
            insert_btn.click()
            time.sleep(0.3)

            image_btn = self._page.get_by_role("button", name="图片")
            image_btn.click()
            time.sleep(0.3)

            cell_image_option = self._page.get_by_role("option", name="单元格图片")
            cell_image_option.click()
            time.sleep(0.2)

            local_option = self._page.get_by_role("option", name="本地")
            with self._page.expect_file_chooser() as fc_info:
                local_option.click()
            file_chooser = fc_info.value
            file_chooser.set_files(image_path)

            time.sleep(2)
            return True
        except Exception as e:
            self._last_error = f"上传图片失败: {e}"
            return False

    def upload_image(
        self,
        image_path: str,
        unit: str,
        name: str,
    ) -> Dict[str, Any]:
        """Upload a screenshot into the row that matches ``name``.

        Args:
            image_path: Path of the image file; must exist.
            unit: District/county name. Must be non-empty, but this method
                locates the row by ``name`` only.
            name: Person name searched for in the sheet.

        Returns:
            ``{"success": True}`` on success, otherwise
            ``{"success": False, "error": str}``.
        """
        from config import get_config

        config = get_config()
        kdocs_config = config.kdocs

        if not kdocs_config.enabled:
            return {"success": False, "error": "金山文档上传未启用"}

        doc_url = kdocs_config.doc_url.strip()
        if not doc_url:
            return {"success": False, "error": "未配置金山文档链接"}

        if not unit or not name:
            return {"success": False, "error": "缺少县区或姓名"}

        if not image_path or not os.path.exists(image_path):
            return {"success": False, "error": "图片文件不存在"}

        if not self._ensure_playwright():
            return {"success": False, "error": self._last_error or "浏览器不可用"}

        if not self._open_document(doc_url):
            return {"success": False, "error": self._last_error or "打开文档失败"}

        if not self._is_logged_in():
            return {"success": False, "error": "未登录,请先扫码登录"}

        try:
            # Switch to the configured worksheet tab, if any (best-effort).
            if kdocs_config.sheet_name:
                try:
                    tab = self._page.locator("[role='tab']").filter(has_text=kdocs_config.sheet_name)
                    if tab.count() > 0:
                        tab.first.click()
                        time.sleep(0.5)
                except Exception:
                    pass

            # Locate the row by searching for the name.
            row_num = self._search_and_get_row(
                name,
                expected_col=kdocs_config.name_column,
                row_start=kdocs_config.row_start,
                row_end=kdocs_config.row_end,
            )

            if row_num < 0:
                return {"success": False, "error": f"未找到人员: {name}"}

            if self._upload_image_to_cell(row_num, image_path, kdocs_config.image_column):
                return {"success": True}
            return {"success": False, "error": self._last_error or "上传失败"}

        except Exception as e:
            return {"success": False, "error": str(e)}

    def clear_login(self):
        """Delete the saved login state file and shut the browser down."""
        from config import KDOCS_LOGIN_STATE_FILE
        try:
            if KDOCS_LOGIN_STATE_FILE.exists():
                KDOCS_LOGIN_STATE_FILE.unlink()
        except Exception:
            pass
        self._logged_in = False
        self._cleanup_browser()

    def close(self):
        """Release all browser resources held by this uploader."""
        self._cleanup_browser()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False  # never suppress exceptions from the with-body


# Module-level singleton, created lazily by get_kdocs_uploader().
_uploader: Optional[KDocsUploader] = None


def get_kdocs_uploader() -> KDocsUploader:
    """Return the shared KDocsUploader instance, creating it on first use."""
    global _uploader
    if _uploader is None:
        _uploader = KDocsUploader()
    return _uploader