From 6bd00021b86d2f96f3ddc9146453382e8ae684bc Mon Sep 17 00:00:00 2001 From: yuyx <237899745@qq.com> Date: Wed, 7 Jan 2026 16:53:44 +0800 Subject: [PATCH] Fix KDocs login detection --- services/kdocs_uploader.py | 126 +++++++++++++++++++++++++++++-------- 1 file changed, 99 insertions(+), 27 deletions(-) diff --git a/services/kdocs_uploader.py b/services/kdocs_uploader.py index fa6365c..4d0ef41 100644 --- a/services/kdocs_uploader.py +++ b/services/kdocs_uploader.py @@ -10,6 +10,7 @@ import threading import time from io import BytesIO from typing import Any, Dict, Optional +from urllib.parse import urlparse import database import email_service @@ -46,6 +47,7 @@ class KDocsUploader: self._last_qr_image: Optional[bytes] = None self._last_login_check: float = 0.0 self._last_login_ok: Optional[bool] = None + self._doc_url: Optional[str] = None def start(self) -> None: if self._running: @@ -199,45 +201,115 @@ class KDocsUploader: def _open_document(self, doc_url: str) -> bool: try: + self._doc_url = doc_url self._page.goto(doc_url) time.sleep(1) + doc_pages = self._find_doc_pages(doc_url) + if doc_pages and doc_pages[0] is not self._page: + self._page = doc_pages[0] return True except Exception as e: self._last_error = f"打开文档失败: {e}" return False - def _is_logged_in(self) -> bool: - try: - login_btn = self._page.get_by_role("button", name="登录并加入编辑") - if login_btn.is_visible(timeout=1500): - return False - except Exception: + def _normalize_doc_url(self, url: str) -> str: + if not url: + return "" + return url.split("#", 1)[0].split("?", 1)[0].rstrip("/") + + def _list_pages(self) -> list: + pages = [] + if self._context: + pages.extend(self._context.pages) + if self._page and self._page not in pages: + pages.insert(0, self._page) + return pages + + def _is_login_url(self, url: str) -> bool: + if not url: return False + lower = url.lower() try: - login_btn = self._page.get_by_role("button", name="立即登录") - if login_btn.is_visible(timeout=1200): - return False + host = urlparse(lower).netloc + except Exception: + host = "" + if "account.wps.cn" in host: + return True + if "passport" in lower: + return True + if "login" in lower and "kdocs.cn" not in host: + return True + return False + + def _find_doc_pages(self, doc_url: Optional[str]) -> list: + doc_key = self._normalize_doc_url(doc_url or "") + pages = self._list_pages() + matches = [] + for page in pages: + url = getattr(page, "url", "") or "" + if not url: + continue + if self._is_login_url(url): + continue + norm_url = self._normalize_doc_url(url) + if doc_key and doc_key in norm_url: + matches.append(page) + continue + try: + host = urlparse(url).netloc.lower() + except Exception: + host = "" + if "kdocs.cn" in host: + matches.append(page) + return matches + + def _page_has_login_gate(self, page) -> bool: + url = getattr(page, "url", "") or "" + if self._is_login_url(url): + return True + login_texts = [ + "登录并加入编辑", + "立即登录", + "微信登录", + "扫码登录", + "确认登录", + "确认登陆", + "账号登录", + "登录", + ] + for text in login_texts: + try: + if page.get_by_role("button", name=text).is_visible(timeout=800): + return True + except Exception: + pass + try: + if page.get_by_role("link", name=text).is_visible(timeout=800): + return True + except Exception: + pass + try: + if page.get_by_text(text, exact=True).is_visible(timeout=800): + return True + except Exception: + pass + try: + if page.locator("text=登录并加入编辑").first.is_visible(timeout=800): + return True except Exception: pass - try: - login_btn = self._page.get_by_role("button", name="登录") - if login_btn.is_visible(timeout=1200): + return False + + def _is_logged_in(self) -> bool: + doc_pages = self._find_doc_pages(self._doc_url) + if not doc_pages: + if self._page and not self._page.is_closed() and not self._page_has_login_gate(self._page): return False - except Exception: - pass - try: - login_link = self._page.get_by_role("link", name="登录") - if login_link.is_visible(timeout=1200): - return False - except Exception: - pass - try: - wechat_btn = self._page.get_by_role("button", name="微信登录") - if wechat_btn.is_visible(timeout=1200): - return False - except Exception: - pass - return True + return False + page = doc_pages[0] + if self._page is None or self._page.is_closed() or self._page.url != page.url: + self._page = page + return not self._page_has_login_gate(page) def _has_saved_login_state(self) -> bool: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")