diff --git a/core/kdocs_uploader.py b/core/kdocs_uploader.py index 4c58b1c..9f2fa01 100644 --- a/core/kdocs_uploader.py +++ b/core/kdocs_uploader.py @@ -4,6 +4,7 @@ 金山文档上传模块 - 精简版 使用Playwright自动化上传截图到金山文档表格 移除了队列、并发控制,改为单任务顺序执行 +修复登录逻辑问题 """ import base64 @@ -42,16 +43,7 @@ class KDocsUploader: self._log_callback(msg) def _find_visible_element(self, text: str, use_role: bool = False, role: str = "button"): - """找到包含指定文本的可见元素 - - Args: - text: 要查找的文本 - use_role: 是否使用role查找(默认False,使用文本查找) - role: 使用role查找时的角色类型 - - Returns: - 可见的元素,或None - """ + """找到包含指定文本的可见元素""" if not self._page: return None @@ -87,43 +79,29 @@ class KDocsUploader: if self._playwright is None: self._playwright = sync_playwright().start() if self._browser is None: - # 默认无头模式,设置环境变量 KDOCS_HEADLESS=false 可切换为有头模式调试 headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false" - # 使用系统安装的Chrome浏览器(支持微信快捷登录) - # channel='chrome' 会使用系统Chrome,而不是Playwright自带的Chromium chrome_args = [ - "--disable-blink-features=AutomationControlled", # 隐藏自动化特征 - "--disable-features=DialMediaRouteProvider", # 禁用本地网络发现提示 + "--disable-blink-features=AutomationControlled", + "--disable-features=DialMediaRouteProvider", "--allow-running-insecure-content", ] try: self._browser = self._playwright.chromium.launch( headless=headless, - channel='chrome', # 使用系统Chrome + channel='chrome', args=chrome_args ) - except Exception as e: - # 如果系统没有Chrome,回退到Chromium + except Exception: self._browser = self._playwright.chromium.launch(headless=headless, args=chrome_args) if self._context is None: storage_state = str(KDOCS_LOGIN_STATE_FILE) - # 创建context时的通用配置 context_options = { - "permissions": ["clipboard-read", "clipboard-write"], # 剪贴板权限 + "permissions": ["clipboard-read", "clipboard-write"], "ignore_https_errors": True, } if use_storage_state and os.path.exists(storage_state): context_options["storage_state"] = storage_state self._context = self._browser.new_context(**context_options) - - # 授予本地网络访问权限(用于微信快捷登录检测) - try: - self._context.grant_permissions( - ["clipboard-read", "clipboard-write"], - origin="https://account.wps.cn" - ) - except Exception: - pass if self._page is None or self._page.is_closed(): self._page = self._context.new_page() self._page.set_default_timeout(60000) @@ -151,94 +129,68 @@ class KDocsUploader: """打开金山文档""" try: self._doc_url = doc_url - self._ensure_clipboard_permissions(doc_url) self._page.goto(doc_url, wait_until="domcontentloaded", timeout=30000) - time.sleep(3) # 等待页面完全加载,包括登录按钮 + time.sleep(3) return True except Exception as e: self._last_error = f"打开文档失败: {e}" return False - def _ensure_clipboard_permissions(self, doc_url: str): - """授予剪贴板权限""" - if not self._context or not doc_url: - return - try: - parsed = urlparse(doc_url) - if not parsed.scheme or not parsed.netloc: - return - origin = f"{parsed.scheme}://{parsed.netloc}" - self._context.grant_permissions(["clipboard-read", "clipboard-write"], origin=origin) - except Exception: - pass - def _is_login_url(self, url: str) -> bool: """检查是否是登录页面""" if not url: return False lower = url.lower() - if "account.wps.cn" in lower or "passport" in lower: - return True - if "login" in lower and "kdocs.cn" not in lower: + return "account.wps.cn" in lower or "passport" in lower + + def _check_login_by_url(self) -> bool: + """通过URL判断是否已登录(纯检查,无副作用)""" + if not self._page or self._page.is_closed(): + return False + + url = self._page.url + + # 如果URL是文档页面,说明已登录 + if "kdocs.cn/l/" in url and "account.wps.cn" not in url: return True + return False - def _page_has_login_gate(self, page) -> bool: - """检查页面是否需要登录""" - url = getattr(page, "url", "") or "" - - # 如果URL已经是文档页面,说明已登录成功 - if "kdocs.cn/l/" in url or "www.kdocs.cn/l/" in url: - # 但可能有邀请对话框,先尝试点击关闭 - try: - join_btn = page.get_by_role("button", name="登录并加入编辑") - if join_btn.count() > 0 and join_btn.first.is_visible(timeout=500): - join_btn.first.click() - time.sleep(1) - except Exception: - pass - # 已经在文档页面,算作已登录 - return False - - # 检查是否在登录页面 + def _check_needs_login(self) -> bool: + """检查是否需要登录(纯检查,无副作用)""" + if not self._page or self._page.is_closed(): + return True + + url = self._page.url + + # 在登录页面,需要登录 if self._is_login_url(url): return True - - # 只检查登录页面上的登录按钮(排除文档页面的邀请对话框) + + # 如果已经是文档页面,不需要登录 + if "kdocs.cn/l/" in url and "account.wps.cn" not in url: + return False + + # 检查是否有登录按钮(排除邀请对话框) login_buttons = ["立即登录", "去登录"] for text in login_buttons: try: - btn = page.get_by_role("button", name=text) + btn = self._page.get_by_role("button", name=text) if btn.count() > 0 and btn.first.is_visible(timeout=500): return True except Exception: pass - - # 检查是否有二维码元素可见(说明还在等待扫码) - try: - qr_selectors = ["canvas", "img[class*='qr']", "div[class*='qrcode']"] - for selector in qr_selectors: - qr = page.locator(selector) - if qr.count() > 0: - for i in range(min(qr.count(), 3)): - el = qr.nth(i) - try: - if el.is_visible(timeout=200): - box = el.bounding_box() - if box and 80 <= box.get("width", 0) <= 400: - return True - except Exception: - pass - except Exception: - pass - + return False def _is_logged_in(self) -> bool: """检查是否已登录""" - if not self._page or self._page.is_closed(): - return False - return not self._page_has_login_gate(self._page) + # 优先用URL判断 + if self._check_login_by_url(): + return True + + # URL判断不了,再用按钮检查 + return not self._check_needs_login() def _save_login_state(self): """保存登录状态""" @@ -250,7 +202,7 @@ class KDocsUploader: except Exception: pass - def _ensure_login_dialog(self, use_quick_login: bool = False): + def _ensure_login_dialog(self): """确保打开登录对话框并进入扫码页面""" buttons_priority = [ "登录并加入编辑", @@ -258,21 +210,30 @@ class KDocsUploader: "去登录", ] - max_clicks = 12 + max_clicks = 8 for _ in range(max_clicks): clicked = False current_url = self._page.url - # 检查是否已经到达文档页面(登录成功) + # 如果已经在登录页面(account.wps.cn),说明需要扫码 + if self._is_login_url(current_url): + # 检查是否已经显示了二维码 + if self._is_qr_page(): + return + # 等待二维码加载 + time.sleep(2) + continue + + # 如果已经是文档页面 if "kdocs.cn/l/" in current_url and "account.wps.cn" not in current_url: - time.sleep(1) - has_login_elements = ( - self._find_visible_element("立即登录", use_role=True) or - self._find_visible_element("登录并加入编辑", use_role=True) or - self._find_visible_element("微信扫码登录") or - self._find_visible_element("微信快捷登录") - ) - if not has_login_elements: + # 检查是否需要点击"登录并加入编辑"(邀请对话框) + invite_btn = self._find_visible_element("登录并加入编辑", use_role=True) + if invite_btn: + invite_btn.click(force=True) + time.sleep(2) + continue # 等待页面跳转到登录页 + else: + # 没有登录按钮,说明已登录完成 return # 检查是否已经到达登录二维码页面 @@ -302,23 +263,42 @@ class KDocsUploader: if not clicked: time.sleep(1) + def _click_confirm_after_scan(self): + """扫码后点击确认登录(如果有的话)""" + confirm_names = ["确认登录", "确定登录", "登录", "确定", "确认", "同意并登录"] + + for name in confirm_names: + try: + confirm_btn = self._page.get_by_role("button", name=name) + if confirm_btn.count() > 0 and confirm_btn.first.is_visible(timeout=500): + confirm_btn.first.click() + time.sleep(2) + return True + except Exception: + pass + + return False + + def _is_qr_page(self) -> bool: + """检查是否在二维码页面""" + qr_indicators = ["微信扫码登录", "微信快捷登录"] + for indicator in qr_indicators: + if self._find_visible_element(indicator): + return True + return False + def _capture_qr_image(self) -> Optional[bytes]: """捕获登录二维码图片""" - # 查找二维码元素的选择器 + # 精准的二维码选择器 selectors = [ - "canvas", - "img[src*='qr']", - "img[class*='qr']", - "img[class*='code']", - "div[class*='qr'] img", - "div[class*='qrcode'] img", - "div[class*='scan'] img", - ".qrcode img", - ".qr-code img", - "img", # 最后尝试所有图片 + "canvas", # Canvas 绘制二维码 + "img[src*='qrcode']", # 带 qrcode 的图片 + "img[src*='wxqr']", # 微信二维码 + "[class*='qrcode'] img", # qrcode 容器内的图片 + "[class*='qr-code'] img", + "[class*='scan-code'] img", ] - # 先在主页面查找 for selector in selectors: result = self._try_capture_qr_with_selector(self._page, selector) if result: @@ -330,7 +310,7 @@ class KDocsUploader: for frame in frames: if frame == self._page.main_frame: continue - for selector in selectors[:5]: # 只用前几个选择器 + for selector in selectors[:3]: result = self._try_capture_qr_with_selector(frame, selector) if result: return result @@ -344,7 +324,7 @@ class KDocsUploader: try: locator = page_or_frame.locator(selector) count = locator.count() - for i in range(min(count, 10)): + for i in range(min(count, 5)): el = locator.nth(i) try: if not el.is_visible(timeout=300): @@ -354,7 +334,7 @@ class KDocsUploader: continue w, h = box.get("width", 0), box.get("height", 0) # 二维码通常是正方形,大小在100-400之间 - if 80 <= w <= 400 and 80 <= h <= 400 and abs(w - h) < 60: + if 80 <= w <= 400 and 80 <= h <= 400 and abs(w - h) < 50: screenshot = el.screenshot() if screenshot and len(screenshot) > 500: return screenshot @@ -365,20 +345,7 @@ class KDocsUploader: return None def request_qr(self, force: bool = False) -> Dict[str, Any]: - """ - 请求登录二维码 - - Args: - force: 是否强制重新登录 - - Returns: - { - "success": bool, - "logged_in": bool, # 是否已登录 - "qr_image": str, # base64编码的二维码图片 - "error": str # 错误信息 - } - """ + """请求登录二维码""" from config import get_config, KDOCS_LOGIN_STATE_FILE config = get_config() @@ -388,7 +355,6 @@ class KDocsUploader: return {"success": False, "error": "未配置金山文档链接"} if force: - # 清除登录状态 try: if KDOCS_LOGIN_STATE_FILE.exists(): KDOCS_LOGIN_STATE_FILE.unlink() @@ -412,11 +378,13 @@ class KDocsUploader: self._ensure_login_dialog() time.sleep(2) + # 等待二维码出现 qr_image = None - for _ in range(15): - qr_image = self._capture_qr_image() - if qr_image and len(qr_image) > 1024: - break + for _ in range(10): + if self._is_qr_page(): + qr_image = self._capture_qr_image() + if qr_image and len(qr_image) > 1024: + break time.sleep(1) if not qr_image: @@ -429,99 +397,20 @@ class KDocsUploader: } def check_login_status(self) -> Dict[str, Any]: - """检查登录状态(不重新打开页面,只检查当前状态)""" - # 如果页面不存在或已关闭,说明还没开始登录流程 + """检查登录状态""" if not self._page or self._page.is_closed(): return {"success": False, "logged_in": False, "error": "页面未打开"} try: - clicked_confirm = False + # 扫码后尝试点击确认按钮 + self._click_confirm_after_scan() - # 在主页面和所有iframe中查找确认按钮 - frames_to_check = [self._page] + list(self._page.frames) + # 统一用 URL 判断是否已登录 + logged_in = self._check_login_by_url() - for frame in frames_to_check: - if clicked_confirm: - break - - # 尝试点击确认登录按钮(微信扫码后PC端需要再点一下确认) - confirm_names = ["确认登录", "确定登录", "登录", "确定", "确认", "同意并登录"] - for name in confirm_names: - try: - confirm_btn = frame.get_by_role("button", name=name) - if confirm_btn.count() > 0 and confirm_btn.first.is_visible(timeout=200): - confirm_btn.first.click() - clicked_confirm = True - time.sleep(3) - break - except Exception: - pass - - # 如果按钮角色没找到,尝试用文本查找 - if not clicked_confirm: - for name in confirm_names: - try: - el = frame.get_by_text(name, exact=True) - if el.count() > 0 and el.first.is_visible(timeout=200): - el.first.click() - clicked_confirm = True - time.sleep(3) - break - except Exception: - pass - - # 尝试用CSS选择器查找 - if not clicked_confirm: - try: - selectors = [ - "button.ant-btn-primary", - "button[type='primary']", - ".confirm-btn", - ".login-confirm", - ".btn-primary", - ".wps-btn-primary", - "a.confirm", - "div.confirm", - "[class*='confirm']", - "[class*='login-btn']" - ] - for selector in selectors: - btns = frame.locator(selector) - if btns.count() > 0: - for i in range(min(btns.count(), 3)): - btn = btns.nth(i) - try: - if btn.is_visible(timeout=100): - btn_text = btn.inner_text() or "" - if any(kw in btn_text for kw in ["确认", "登录", "确定"]): - btn.click() - clicked_confirm = True - time.sleep(3) - break - except Exception: - pass - if clicked_confirm: - break - except Exception: - pass - - if clicked_confirm: - time.sleep(3) - - current_url = self._page.url - - # 直接检查URL判断是否已登录 - if "kdocs.cn/l/" in current_url and "account.wps.cn" not in current_url: - logged_in = True - try: - join_btn = self._page.get_by_role("button", name="登录并加入编辑") - if join_btn.count() > 0 and join_btn.first.is_visible(timeout=500): - join_btn.first.click() - time.sleep(1) - except Exception: - pass - else: - logged_in = self._is_logged_in() + # 如果URL判断不确定,再检查按钮 + if not logged_in: + logged_in = not self._check_needs_login() self._logged_in = logged_in @@ -561,11 +450,9 @@ class KDocsUploader: def _search_and_get_row(self, search_text: str, expected_col: str = None, row_start: int = 0, row_end: int = 0) -> int: """搜索并获取行号""" - # 打开搜索 self._page.keyboard.press("Control+f") time.sleep(0.3) - # 输入搜索内容 try: search_input = self._page.get_by_role("textbox").nth(3) if search_input.is_visible(timeout=500): @@ -575,7 +462,6 @@ class KDocsUploader: time.sleep(0.2) - # 点击查找 try: find_btn = self._page.get_by_role("button", name="查找").first find_btn.click() @@ -583,8 +469,6 @@ class KDocsUploader: self._page.keyboard.press("Enter") time.sleep(0.3) - - # 获取当前位置 self._page.keyboard.press("Escape") time.sleep(0.3) @@ -592,7 +476,6 @@ class KDocsUploader: if not address: return -1 - # 提取行号 match = re.search(r"(\d+)$", address) if not match: return -1 @@ -600,11 +483,9 @@ class KDocsUploader: row_num = int(match.group(1)) col_letter = "".join(c for c in address if c.isalpha()).upper() - # 检查列 if expected_col and col_letter != expected_col.upper(): return -1 - # 检查行范围 if row_start > 0 and row_num < row_start: return -1 if row_end > 0 and row_num > row_end: @@ -618,7 +499,6 @@ class KDocsUploader: self._navigate_to_cell(cell_address) time.sleep(0.3) - # 清除单元格内容 try: self._page.keyboard.press("Escape") time.sleep(0.2) @@ -627,7 +507,6 @@ class KDocsUploader: except Exception: pass - # 插入 -> 图片 -> 单元格图片 try: insert_btn = self._page.get_by_role("button", name="插入") insert_btn.click() @@ -660,17 +539,7 @@ class KDocsUploader: unit: str, name: str, ) -> Dict[str, Any]: - """ - 上传截图到金山文档 - - Args: - image_path: 图片路径 - unit: 县区名(用于定位行) - name: 姓名(用于定位行) - - Returns: - {"success": bool, "error": str} - """ + """上传截图到金山文档""" from config import get_config config = get_config() @@ -709,7 +578,6 @@ class KDocsUploader: except Exception: pass - # 搜索姓名找到行 row_num = self._search_and_get_row( name, expected_col=kdocs_config.name_column, @@ -720,7 +588,6 @@ class KDocsUploader: if row_num < 0: return {"success": False, "error": f"未找到人员: {name}"} - # 上传图片 if self._upload_image_to_cell(row_num, image_path, kdocs_config.image_column): return {"success": True} else: