Fix KDocs login detection
This commit is contained in:
@@ -10,6 +10,7 @@ import threading
|
||||
import time
|
||||
from io import BytesIO
|
||||
from typing import Any, Dict, Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import database
|
||||
import email_service
|
||||
@@ -46,6 +47,7 @@ class KDocsUploader:
|
||||
self._last_qr_image: Optional[bytes] = None
|
||||
self._last_login_check: float = 0.0
|
||||
self._last_login_ok: Optional[bool] = None
|
||||
self._doc_url: Optional[str] = None
|
||||
|
||||
def start(self) -> None:
|
||||
if self._running:
|
||||
@@ -199,45 +201,115 @@ class KDocsUploader:
|
||||
|
||||
def _open_document(self, doc_url: str) -> bool:
|
||||
try:
|
||||
self._doc_url = doc_url
|
||||
self._page.goto(doc_url)
|
||||
time.sleep(1)
|
||||
doc_pages = self._find_doc_pages(doc_url)
|
||||
if doc_pages and doc_pages[0] is not self._page:
|
||||
self._page = doc_pages[0]
|
||||
return True
|
||||
except Exception as e:
|
||||
self._last_error = f"打开文档失败: {e}"
|
||||
return False
|
||||
|
||||
def _is_logged_in(self) -> bool:
|
||||
def _normalize_doc_url(self, url: str) -> str:
|
||||
if not url:
|
||||
return ""
|
||||
return url.split("#", 1)[0].split("?", 1)[0].rstrip("/")
|
||||
|
||||
def _list_pages(self) -> list:
|
||||
pages = []
|
||||
if self._context:
|
||||
pages.extend(self._context.pages)
|
||||
if self._page and self._page not in pages:
|
||||
pages.insert(0, self._page)
|
||||
return pages
|
||||
|
||||
def _is_login_url(self, url: str) -> bool:
|
||||
if not url:
|
||||
return False
|
||||
lower = url.lower()
|
||||
try:
|
||||
login_btn = self._page.get_by_role("button", name="登录并加入编辑")
|
||||
if login_btn.is_visible(timeout=1500):
|
||||
return False
|
||||
host = urlparse(lower).netloc
|
||||
except Exception:
|
||||
return False
|
||||
try:
|
||||
login_btn = self._page.get_by_role("button", name="立即登录")
|
||||
if login_btn.is_visible(timeout=1200):
|
||||
return False
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
login_btn = self._page.get_by_role("button", name="登录")
|
||||
if login_btn.is_visible(timeout=1200):
|
||||
return False
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
login_link = self._page.get_by_role("link", name="登录")
|
||||
if login_link.is_visible(timeout=1200):
|
||||
return False
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
wechat_btn = self._page.get_by_role("button", name="微信登录")
|
||||
if wechat_btn.is_visible(timeout=1200):
|
||||
return False
|
||||
except Exception:
|
||||
pass
|
||||
host = ""
|
||||
if "account.wps.cn" in host:
|
||||
return True
|
||||
if "passport" in lower:
|
||||
return True
|
||||
if "login" in lower and "kdocs.cn" not in host:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _find_doc_pages(self, doc_url: Optional[str]) -> list:
|
||||
doc_key = self._normalize_doc_url(doc_url or "")
|
||||
pages = self._list_pages()
|
||||
matches = []
|
||||
for page in pages:
|
||||
url = getattr(page, "url", "") or ""
|
||||
if not url:
|
||||
continue
|
||||
if self._is_login_url(url):
|
||||
continue
|
||||
norm_url = self._normalize_doc_url(url)
|
||||
if doc_key and doc_key in norm_url:
|
||||
matches.append(page)
|
||||
continue
|
||||
try:
|
||||
host = urlparse(url).netloc.lower()
|
||||
except Exception:
|
||||
host = ""
|
||||
if "kdocs.cn" in host:
|
||||
matches.append(page)
|
||||
return matches
|
||||
|
||||
def _page_has_login_gate(self, page) -> bool:
|
||||
url = getattr(page, "url", "") or ""
|
||||
if self._is_login_url(url):
|
||||
return True
|
||||
login_texts = [
|
||||
"登录并加入编辑",
|
||||
"立即登录",
|
||||
"微信登录",
|
||||
"扫码登录",
|
||||
"确认登录",
|
||||
"确认登陆",
|
||||
"账号登录",
|
||||
"登录",
|
||||
]
|
||||
for text in login_texts:
|
||||
try:
|
||||
if page.get_by_role("button", name=text).is_visible(timeout=800):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if page.get_by_role("link", name=text).is_visible(timeout=800):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if page.get_by_text(text, exact=True).is_visible(timeout=800):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if page.locator("text=登录并加入编辑").first.is_visible(timeout=800):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
def _is_logged_in(self) -> bool:
|
||||
doc_pages = self._find_doc_pages(self._doc_url)
|
||||
if not doc_pages:
|
||||
if self._page and not self._page.is_closed() and not self._page_has_login_gate(self._page):
|
||||
return False
|
||||
return False
|
||||
page = doc_pages[0]
|
||||
if self._page is None or self._page.is_closed() or self._page.url != page.url:
|
||||
self._page = page
|
||||
return not self._page_has_login_gate(page)
|
||||
|
||||
def _has_saved_login_state(self) -> bool:
|
||||
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
||||
|
||||
Reference in New Issue
Block a user