#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import base64 import os import queue import re import threading import time from io import BytesIO from typing import Any, Dict, Optional from urllib.parse import urlparse import database import email_service from app_config import get_config from services.client_log import log_to_client from services.runtime import get_logger, get_socketio from services.state import safe_get_account try: from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError except Exception: # pragma: no cover - 运行环境缺少 playwright 时降级 sync_playwright = None class PlaywrightTimeoutError(Exception): pass logger = get_logger() config = get_config() class KDocsUploader: def __init__(self) -> None: self._queue: queue.Queue = queue.Queue(maxsize=int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200"))) self._thread = threading.Thread(target=self._run, name="kdocs-uploader", daemon=True) self._running = False self._last_error: Optional[str] = None self._last_success_at: Optional[float] = None self._login_required = False self._playwright = None self._browser = None self._context = None self._page = None self._last_qr_image: Optional[bytes] = None self._last_login_check: float = 0.0 self._last_login_ok: Optional[bool] = None self._doc_url: Optional[str] = None def start(self) -> None: if self._running: return self._running = True self._thread.start() def stop(self) -> None: if not self._running: return self._running = False self._queue.put({"action": "shutdown"}) def get_status(self) -> Dict[str, Any]: return { "queue_size": self._queue.qsize(), "login_required": self._login_required, "last_error": self._last_error, "last_success_at": self._last_success_at, "last_login_ok": self._last_login_ok, } def enqueue_upload( self, *, user_id: int, account_id: str, unit: str, name: str, image_path: str, ) -> bool: if not self._running: self.start() payload = { "user_id": int(user_id), "account_id": str(account_id), "unit": unit, "name": name, "image_path": image_path, } try: # 入队前设置状态为等待上传 try: account = safe_get_account(user_id, account_id) if account and self._should_mark_upload(account): account.status = "等待上传" self._emit_account_update(user_id, account) except Exception: pass self._queue.put({"action": "upload", "payload": payload}, timeout=1) return True except queue.Full: self._last_error = "上传队列已满" return False def request_qr(self, timeout: int = 60, *, force: bool = False) -> Dict[str, Any]: return self._submit_command("qr", timeout=timeout, payload={"force": force}) def clear_login(self, timeout: int = 20) -> Dict[str, Any]: return self._submit_command("clear_login", timeout=timeout) def refresh_login_status(self, timeout: int = 20) -> Dict[str, Any]: return self._submit_command("status", timeout=timeout) def _submit_command( self, action: str, timeout: int = 30, payload: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: if not self._running: self.start() resp_queue: queue.Queue = queue.Queue(maxsize=1) self._queue.put({"action": action, "response": resp_queue, "payload": payload or {}}) try: return resp_queue.get(timeout=timeout) except queue.Empty: return {"success": False, "error": "操作超时"} def _run(self) -> None: while True: task = self._queue.get() if not task: continue action = task.get("action") if action == "shutdown": break try: if action == "upload": self._handle_upload(task.get("payload") or {}) elif action == "qr": result = self._handle_qr(task.get("payload") or {}) task.get("response").put(result) elif action == "clear_login": result = self._handle_clear_login() task.get("response").put(result) elif action == "status": result = self._handle_status_check() task.get("response").put(result) except Exception as e: logger.warning(f"[KDocs] 处理任务失败: {e}") self._cleanup_browser() def _load_system_config(self) -> Dict[str, Any]: return database.get_system_config() or {} def _ensure_playwright(self, *, use_storage_state: bool = True) -> bool: if sync_playwright is None: self._last_error = "playwright 未安装" return False try: if self._playwright is None: self._playwright = sync_playwright().start() if self._browser is None: headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false" self._browser = self._playwright.chromium.launch(headless=headless) if self._context is None: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") if use_storage_state and os.path.exists(storage_state): self._context = self._browser.new_context(storage_state=storage_state) else: self._context = self._browser.new_context() if self._page is None or self._page.is_closed(): self._page = self._context.new_page() self._page.set_default_timeout(int(getattr(config, "DEFAULT_TIMEOUT", 60000))) return True except Exception as e: self._last_error = f"浏览器启动失败: {e}" self._cleanup_browser() return False def _cleanup_browser(self) -> None: try: if self._page: self._page.close() except Exception: pass self._page = None try: if self._context: self._context.close() except Exception: pass self._context = None try: if self._browser: self._browser.close() except Exception: pass self._browser = None try: if self._playwright: self._playwright.stop() except Exception: pass self._playwright = None def _open_document(self, doc_url: str, *, fast: bool = False) -> bool: try: self._doc_url = doc_url self._ensure_clipboard_permissions(doc_url) if fast: doc_pages = self._find_doc_pages(doc_url) if doc_pages: self._page = doc_pages[0] return True login_pages = [] for page in self._list_pages(): try: url = getattr(page, "url", "") or "" if self._is_login_url(url): login_pages.append(page) except Exception: continue if login_pages: self._page = login_pages[0] return True goto_kwargs = {} if fast: fast_timeout = int(os.environ.get("KDOCS_FAST_GOTO_TIMEOUT_MS", "15000")) goto_kwargs = {"wait_until": "domcontentloaded", "timeout": fast_timeout} self._page.goto(doc_url, **goto_kwargs) time.sleep(0.6) doc_pages = self._find_doc_pages(doc_url) if doc_pages and doc_pages[0] is not self._page: self._page = doc_pages[0] return True except Exception as e: self._last_error = f"打开文档失败: {e}" return False def _ensure_clipboard_permissions(self, doc_url: str) -> None: if not self._context or not doc_url: return try: parsed = urlparse(doc_url) if not parsed.scheme or not parsed.netloc: return host = parsed.netloc origins = {f"{parsed.scheme}://{host}"} if host.startswith("www."): origins.add(f"{parsed.scheme}://{host[4:]}") else: origins.add(f"{parsed.scheme}://www.{host}") for origin in origins: try: self._context.grant_permissions(["clipboard-read", "clipboard-write"], origin=origin) except Exception: continue except Exception: return def _normalize_doc_url(self, url: str) -> str: if not url: return "" return url.split("#", 1)[0].split("?", 1)[0].rstrip("/") def _list_pages(self) -> list: pages = [] if self._context: pages.extend(self._context.pages) if self._page and self._page not in pages: pages.insert(0, self._page) return pages def _is_login_url(self, url: str) -> bool: if not url: return False lower = url.lower() try: host = urlparse(lower).netloc except Exception: host = "" if "account.wps.cn" in host: return True if "passport" in lower: return True if "login" in lower and "kdocs.cn" not in host: return True return False def _find_doc_pages(self, doc_url: Optional[str]) -> list: doc_key = self._normalize_doc_url(doc_url or "") pages = self._list_pages() matches = [] for page in pages: url = getattr(page, "url", "") or "" if not url: continue if self._is_login_url(url): continue norm_url = self._normalize_doc_url(url) if doc_key and doc_key in norm_url: matches.append(page) continue try: host = urlparse(url).netloc.lower() except Exception: host = "" if "kdocs.cn" in host: matches.append(page) return matches def _page_has_login_gate(self, page) -> bool: url = getattr(page, "url", "") or "" if self._is_login_url(url): return True login_texts = [ "登录并加入编辑", "立即登录", "微信登录", "扫码登录", "确认登录", "确认登陆", "账号登录", "登录", ] for text in login_texts: try: if page.get_by_role("button", name=text).is_visible(timeout=800): return True except Exception: pass try: if page.get_by_role("link", name=text).is_visible(timeout=800): return True except Exception: pass try: if page.get_by_text(text, exact=True).is_visible(timeout=800): return True except Exception: pass try: if page.locator("text=登录并加入编辑").first.is_visible(timeout=800): return True except Exception: pass return False def _is_logged_in(self) -> bool: doc_pages = self._find_doc_pages(self._doc_url) if not doc_pages: if self._page and not self._page.is_closed() and not self._page_has_login_gate(self._page): return False return False page = doc_pages[0] if self._page is None or self._page.is_closed() or self._page.url != page.url: self._page = page return not self._page_has_login_gate(page) def _has_saved_login_state(self) -> bool: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") return os.path.exists(storage_state) def _ensure_login_dialog( self, *, timeout_ms: int = 1200, frame_timeout_ms: int = 800, quick: bool = False, ) -> None: login_names = ["登录并加入编辑", "立即登录", "登录"] wechat_names = ["微信登录", "微信扫码登录", "微信扫码", "扫码登录"] pages = self._iter_pages() clicked = False for page in pages: if self._try_click_names( page, login_names, timeout_ms=timeout_ms, frame_timeout_ms=frame_timeout_ms, quick=quick, ): clicked = True break if clicked: time.sleep(1.5) pages = self._iter_pages() for page in pages: if self._try_click_names( page, wechat_names, timeout_ms=timeout_ms, frame_timeout_ms=frame_timeout_ms, quick=quick, ): return self._try_confirm_login(timeout_ms=timeout_ms, frame_timeout_ms=frame_timeout_ms, quick=quick) def _capture_qr_image(self) -> Optional[bytes]: pages = self._iter_pages() for page in pages: for frame in page.frames: target = self._find_qr_element_in_frame(frame) if not target: continue try: return target.screenshot() except Exception: continue for page in pages: dialog_image = self._capture_dialog_image(page) if dialog_image: return dialog_image return None def _iter_pages(self) -> list: pages = [] if self._context: pages.extend(self._context.pages) if self._page and self._page not in pages: pages.insert(0, self._page) def rank(p) -> int: url = (getattr(p, "url", "") or "").lower() keywords = ("login", "account", "passport", "wechat", "qr") return 0 if any(k in url for k in keywords) else 1 pages.sort(key=rank) return pages def _find_qr_element_in_frame(self, frame) -> Optional[Any]: selectors = [ "canvas", "img[alt*='二维码']", "img[src*='qr']", "img[src*='qrcode']", "img[class*='qr']", "canvas[class*='qr']", "svg", "div[role='img']", "div[class*='qr']", "div[id*='qr']", "div[class*='qrcode']", "div[id*='qrcode']", "div[class*='wechat']", "div[class*='weixin']", "div[class*='wx']", "img[src*='wx']", ] best = None best_score = None for selector in selectors: try: locator = frame.locator(selector) count = min(locator.count(), 20) except Exception: continue for i in range(count): el = locator.nth(i) try: if not el.is_visible(timeout=800): continue box = el.bounding_box() if not box: continue width = box.get("width", 0) height = box.get("height", 0) if width < 80 or height < 80 or width > 520 or height > 520: continue aspect_diff = abs(width - height) if aspect_diff > 80: continue score = aspect_diff + abs(width - 260) + abs(height - 260) if best_score is None or score < best_score: best_score = score best = el except Exception: continue if best: return best handle = None try: handle = frame.evaluate_handle( """() => { const patterns = [/qr/i, /qrcode/i, /weixin/i, /wechat/i, /wx/i, /data:image/i]; const elements = Array.from(document.querySelectorAll('*')); for (const el of elements) { const style = window.getComputedStyle(el); const bg = style.backgroundImage || ''; if (!bg || bg === 'none') continue; if (!patterns.some((re) => re.test(bg))) continue; const rect = el.getBoundingClientRect(); if (!rect.width || !rect.height) continue; if (rect.width < 80 || rect.height < 80 || rect.width > 520 || rect.height > 520) continue; const diff = Math.abs(rect.width - rect.height); if (diff > 80) continue; return el; } return null; }""" ) element = handle.as_element() if handle else None if element: return element except Exception: pass finally: try: if handle: handle.dispose() except Exception: pass return best def _try_click_role(self, page, role: str, name: str, timeout: int = 1500) -> bool: try: el = page.get_by_role(role, name=name) if el.is_visible(timeout=timeout): el.click() time.sleep(1) return True except Exception: return False return False def _try_click_names( self, page, names: list, *, timeout_ms: int = 1200, frame_timeout_ms: int = 800, quick: bool = False, ) -> bool: for name in names: if self._try_click_role(page, "button", name, timeout=timeout_ms): return True if not quick: if self._try_click_role(page, "link", name, timeout=timeout_ms): return True try: el = page.get_by_text(name, exact=True) if el.is_visible(timeout=timeout_ms): el.click() time.sleep(1) return True except Exception: pass if not quick: try: el = page.get_by_text(name, exact=False) if el.is_visible(timeout=timeout_ms): el.click() time.sleep(1) return True except Exception: pass try: for frame in page.frames: for name in names: try: el = frame.get_by_role("button", name=name) if el.is_visible(timeout=frame_timeout_ms): el.click() time.sleep(1) return True except Exception: pass try: el = frame.get_by_text(name, exact=True) if el.is_visible(timeout=frame_timeout_ms): el.click() time.sleep(1) return True except Exception: pass if not quick: try: el = frame.get_by_text(name, exact=False) if el.is_visible(timeout=frame_timeout_ms): el.click() time.sleep(1) return True except Exception: pass except Exception: return False return False def _try_confirm_login( self, *, timeout_ms: int = 1200, frame_timeout_ms: int = 800, quick: bool = False, ) -> bool: confirm_names = ["确认登录", "确认登陆"] pages = self._iter_pages() for page in pages: if self._try_click_names( page, confirm_names, timeout_ms=timeout_ms, frame_timeout_ms=frame_timeout_ms, quick=quick, ): return True return False def _capture_dialog_image(self, page) -> Optional[bytes]: selectors = ( "[role='dialog'], .dialog, .modal, .popup, " "div[class*='dialog'], div[class*='modal'], div[class*='popup'], " "div[class*='login'], div[class*='passport'], div[class*='auth']" ) try: dialogs = page.locator(selectors) count = min(dialogs.count(), 6) except Exception: return None best = None best_area = 0 viewport = page.viewport_size or {} vp_width = viewport.get("width", 0) vp_height = viewport.get("height", 0) for i in range(count): el = dialogs.nth(i) try: if not el.is_visible(timeout=800): continue box = el.bounding_box() if not box: continue width = box.get("width", 0) height = box.get("height", 0) if width < 160 or height < 160: continue if vp_width and vp_height: if width > vp_width * 0.92 and height > vp_height * 0.92: continue area = width * height if area > best_area: best_area = area best = el except Exception: continue if not best: return None try: return best.screenshot() except Exception: return None def _is_valid_qr_image(self, data: Optional[bytes]) -> bool: if not data or len(data) < 1024: return False try: from PIL import Image, ImageStat img = Image.open(BytesIO(data)) width, height = img.size if width < 120 or height < 120: return False ratio = width / float(height or 1) if ratio < 0.6 or ratio > 1.4: return False gray = img.convert("L") stat = ImageStat.Stat(gray) if stat.stddev[0] < 5: return False return True except Exception: return len(data) >= 2048 def _save_login_state(self) -> None: try: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") os.makedirs(os.path.dirname(storage_state), exist_ok=True) self._context.storage_state(path=storage_state) except Exception as e: logger.warning(f"[KDocs] 保存登录态失败: {e}") def _handle_qr(self, payload: Dict[str, Any]) -> Dict[str, Any]: cfg = self._load_system_config() doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return {"success": False, "error": "未配置金山文档链接"} force = bool(payload.get("force")) if force: self._handle_clear_login() if not self._ensure_playwright(use_storage_state=not force): return {"success": False, "error": self._last_error or "浏览器不可用"} if not self._open_document(doc_url, fast=True): return {"success": False, "error": self._last_error or "打开文档失败"} if not force and self._has_saved_login_state() and self._is_logged_in(): self._login_required = False self._last_login_ok = True self._save_login_state() return {"success": True, "logged_in": True, "qr_image": ""} fast_login_timeout = int(os.environ.get("KDOCS_FAST_LOGIN_TIMEOUT_MS", "300")) self._ensure_login_dialog( timeout_ms=fast_login_timeout, frame_timeout_ms=fast_login_timeout, quick=True, ) qr_image = None invalid_qr = None for attempt in range(10): if attempt in (3, 7): self._ensure_login_dialog( timeout_ms=fast_login_timeout, frame_timeout_ms=fast_login_timeout, quick=True, ) candidate = self._capture_qr_image() if candidate and self._is_valid_qr_image(candidate): qr_image = candidate break if candidate: invalid_qr = candidate time.sleep(1) if not qr_image: self._last_error = "二维码识别异常" if invalid_qr else "二维码获取失败" try: pages = self._iter_pages() page_urls = [getattr(p, "url", "") for p in pages] logger.warning(f"[KDocs] 二维码未捕获,页面: {page_urls}") ts = int(time.time()) saved = [] for idx, page in enumerate(pages[:3]): try: path = f"data/kdocs_debug_{ts}_{idx}.png" page.screenshot(path=path, full_page=True) saved.append(path) except Exception: continue if saved: logger.warning(f"[KDocs] 已保存调试截图: {saved}") if invalid_qr: try: path = f"data/kdocs_invalid_qr_{ts}.png" with open(path, "wb") as handle: handle.write(invalid_qr) logger.warning(f"[KDocs] 已保存无效二维码截图: {path}") except Exception: pass except Exception: pass return {"success": False, "error": self._last_error} try: ts = int(time.time()) path = f"data/kdocs_last_qr_{ts}.png" with open(path, "wb") as handle: handle.write(qr_image) logger.info(f"[KDocs] 已保存二维码截图: {path} ({len(qr_image)} bytes)") except Exception as e: logger.warning(f"[KDocs] 保存二维码截图失败: {e}") self._last_qr_image = qr_image self._login_required = True return { "success": True, "logged_in": False, "qr_image": base64.b64encode(qr_image).decode("ascii"), } def _handle_clear_login(self) -> Dict[str, Any]: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") try: if os.path.exists(storage_state): os.remove(storage_state) except Exception as e: return {"success": False, "error": f"清除登录态失败: {e}"} self._login_required = False self._last_login_ok = None self._cleanup_browser() return {"success": True} def _handle_status_check(self) -> Dict[str, Any]: cfg = self._load_system_config() doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return {"success": True, "logged_in": False, "error": "未配置文档链接"} if not self._ensure_playwright(): return {"success": False, "logged_in": False, "error": self._last_error or "浏览器不可用"} if not self._open_document(doc_url, fast=True): return {"success": False, "logged_in": False, "error": self._last_error or "打开文档失败"} fast_login_timeout = int(os.environ.get("KDOCS_FAST_LOGIN_TIMEOUT_MS", "300")) self._ensure_login_dialog( timeout_ms=fast_login_timeout, frame_timeout_ms=fast_login_timeout, quick=True, ) self._try_confirm_login( timeout_ms=fast_login_timeout, frame_timeout_ms=fast_login_timeout, quick=True, ) logged_in = self._is_logged_in() self._last_login_ok = logged_in self._login_required = not logged_in if logged_in: self._save_login_state() return {"success": True, "logged_in": logged_in} def _handle_upload(self, payload: Dict[str, Any]) -> None: cfg = self._load_system_config() if int(cfg.get("kdocs_enabled", 0) or 0) != 1: return doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return unit = (payload.get("unit") or "").strip() name = (payload.get("name") or "").strip() image_path = payload.get("image_path") user_id = payload.get("user_id") account_id = payload.get("account_id") if not unit or not name: return if not image_path or not os.path.exists(image_path): return account = None prev_status = None status_tracked = False try: try: account = safe_get_account(user_id, account_id) if account and self._should_mark_upload(account): prev_status = getattr(account, "status", None) account.status = "上传截图" self._emit_account_update(user_id, account) status_tracked = True except Exception: prev_status = None if not self._ensure_playwright(): self._notify_admin(unit, name, image_path, self._last_error or "浏览器不可用") return if not self._open_document(doc_url): self._notify_admin(unit, name, image_path, self._last_error or "打开文档失败") return if not self._is_logged_in(): self._login_required = True self._last_login_ok = False self._notify_admin(unit, name, image_path, "登录已失效,请管理员重新扫码登录") try: log_to_client("表格上传失败: 登录已失效,请管理员重新扫码登录", user_id, account_id) except Exception: pass return self._login_required = False self._last_login_ok = True sheet_name = (cfg.get("kdocs_sheet_name") or "").strip() sheet_index = int(cfg.get("kdocs_sheet_index") or 0) unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper() image_col = (cfg.get("kdocs_image_column") or "D").strip().upper() row_start = int(cfg.get("kdocs_row_start") or 0) row_end = int(cfg.get("kdocs_row_end") or 0) success = False error_msg = "" for attempt in range(2): try: if sheet_name or sheet_index: self._select_sheet(sheet_name, sheet_index) row_num = self._find_person_with_unit(unit, name, unit_col, row_start=row_start, row_end=row_end) if row_num < 0: error_msg = f"未找到人员: {unit}-{name}" break success = self._upload_image_to_cell(row_num, image_path, image_col) if success: break except Exception as e: error_msg = str(e) if success: self._last_success_at = time.time() self._last_error = None try: log_to_client(f"已上传表格截图: {unit}-{name}", user_id, account_id) except Exception: pass return if not error_msg: error_msg = "上传失败" self._last_error = error_msg self._notify_admin(unit, name, image_path, error_msg) try: log_to_client(f"表格上传失败: {error_msg}", user_id, account_id) except Exception: pass finally: if status_tracked: self._restore_account_status(user_id, account, prev_status) def _notify_admin(self, unit: str, name: str, image_path: str, error: str) -> None: cfg = self._load_system_config() if int(cfg.get("kdocs_admin_notify_enabled", 0) or 0) != 1: return to_email = (cfg.get("kdocs_admin_notify_email") or "").strip() if not to_email: return settings = email_service.get_email_settings() if not settings.get("enabled", False): return subject = "金山文档上传失败提醒" body = f"上传失败\n\n人员: {unit}-{name}\n图片: {image_path}\n错误: {error}\n\n请检查登录状态或表格配置。" try: email_service.send_email_async( to_email=to_email, subject=subject, body=body, email_type="kdocs_upload_failed", ) except Exception as e: logger.warning(f"[KDocs] 发送管理员邮件失败: {e}") def _emit_account_update(self, user_id: int, account: Any) -> None: try: socketio = get_socketio() socketio.emit("account_update", account.to_dict(), room=f"user_{user_id}") except Exception: pass def _restore_account_status(self, user_id: int, account: Any, prev_status: Optional[str]) -> None: if not account or not user_id: return if getattr(account, "is_running", False): return current_status = getattr(account, "status", "") # 只处理上传相关的状态 if current_status not in ("上传截图", "等待上传"): return # 上传完成后恢复为未开始,而不是恢复到之前的等待上传状态 account.status = "未开始" self._emit_account_update(user_id, account) def _select_sheet(self, sheet_name: str, sheet_index: int) -> None: if sheet_name: candidates = [ self._page.locator("[role='tab']").filter(has_text=sheet_name), self._page.locator(".sheet-tab").filter(has_text=sheet_name), self._page.locator(".sheet-tab-name").filter(has_text=sheet_name), ] for locator in candidates: try: if locator.count() < 1: continue locator.first.click() time.sleep(0.5) return except Exception: continue if sheet_index > 0: idx = sheet_index - 1 candidates = [ self._page.locator("[role='tab']"), self._page.locator(".sheet-tab"), self._page.locator(".sheet-tab-name"), ] for locator in candidates: try: if locator.count() <= idx: continue locator.nth(idx).click() time.sleep(0.5) return except Exception: continue def _get_current_cell_address(self) -> str: """获取当前选中的单元格地址(如 A1, C66 等)""" import re # 等待一小段时间让名称框稳定 time.sleep(0.1) for attempt in range(3): try: name_box = self._page.locator("input.edit-box").first value = name_box.input_value() # 验证是否为有效的单元格地址格式(如 A1, C66, AA100 等) if value and re.match(r"^[A-Z]+\d+$", value.upper()): return value.upper() except Exception: pass try: name_box = self._page.locator('#root input[type="text"]').first value = name_box.input_value() if value and re.match(r"^[A-Z]+\d+$", value.upper()): return value.upper() except Exception: pass # 等待一下再重试 time.sleep(0.2) # 如果无法获取有效地址,返回空字符串 logger.warning("[KDocs调试] 无法获取有效的单元格地址") return "" def _navigate_to_cell(self, cell_address: str) -> None: try: name_box = self._page.locator("input.edit-box").first name_box.click() name_box.fill(cell_address) name_box.press("Enter") except Exception: name_box = self._page.locator('#root input[type="text"]').first name_box.click() name_box.fill(cell_address) name_box.press("Enter") time.sleep(0.3) def _focus_grid(self) -> None: try: info = self._page.evaluate( """() => { const canvases = Array.from(document.querySelectorAll("canvas")); let best = null; for (const c of canvases) { const rect = c.getBoundingClientRect(); if (!rect.width || !rect.height) continue; if (rect.width < 200 || rect.height < 200) continue; const area = rect.width * rect.height; if (!best || area > best.area) { best = {x: rect.left + rect.width / 2, y: rect.top + rect.height / 2, area}; } } return best; }""" ) if info and info.get("x") and info.get("y"): self._page.mouse.click(info["x"], info["y"]) time.sleep(0.1) except Exception: pass def _read_clipboard_text(self) -> str: try: return self._page.evaluate("() => navigator.clipboard.readText()") or "" except Exception: return "" def _get_cell_value(self, cell_address: str) -> str: self._navigate_to_cell(cell_address) time.sleep(0.3) try: self._page.evaluate("() => navigator.clipboard.writeText('')") except Exception: pass self._focus_grid() # 尝试方法1: 读取金山文档编辑栏/公式栏的内容 try: # 金山文档的编辑栏选择器(可能需要调整) formula_bar_selectors = [ ".formula-bar-input", ".cell-editor-input", "[class*='formulaBar'] input", "[class*='formula'] textarea", ".formula-editor", "#formulaInput", ] for selector in formula_bar_selectors: try: el = self._page.query_selector(selector) if el: value = el.input_value() if hasattr(el, "input_value") else el.inner_text() if value and not value.startswith("=DISPIMG"): logger.info(f"[KDocs调试] 从编辑栏读取到: '{value[:50]}...' (selector={selector})") return value.strip() except Exception: pass except Exception: pass # 尝试方法2: F2进入编辑模式,全选复制 try: self._page.keyboard.press("F2") time.sleep(0.2) self._page.keyboard.press("Control+a") time.sleep(0.1) self._page.keyboard.press("Control+c") time.sleep(0.2) self._page.keyboard.press("Escape") time.sleep(0.1) value = self._read_clipboard_text() if value and not value.startswith("=DISPIMG"): return value.strip() except Exception: pass # 尝试方法3: 直接复制单元格(备选) try: self._page.keyboard.press("Control+c") time.sleep(0.2) value = self._read_clipboard_text() if value: return value.strip() except Exception: pass return "" def _normalize_text(self, value: str) -> str: if value is None: return "" cleaned = str(value) cleaned = cleaned.replace("\u00a0", "").replace("\u3000", "") cleaned = re.sub(r"\s+", "", cleaned) return cleaned.strip() def _unit_matches(self, cell_value: str, expected_unit: str) -> bool: if not cell_value or not expected_unit: return False norm_cell = self._normalize_text(cell_value) norm_expected = self._normalize_text(expected_unit) if not norm_cell or not norm_expected: return False if norm_cell == norm_expected: return True if norm_expected in norm_cell or norm_cell in norm_expected: return True return False def _should_mark_upload(self, account: Any) -> bool: if not account: return False status_text = str(getattr(account, "status", "") or "") if status_text: if "运行" in status_text or "排队" in status_text: return False return True def _search_person(self, name: str) -> None: self._focus_grid() self._page.keyboard.press("Control+f") time.sleep(0.3) search_input = None selectors = [ "input[placeholder*='查找']", "input[placeholder*='搜索']", "input[aria-label*='查找']", "input[aria-label*='搜索']", "input[type='search']", ] try: search_input = self._page.get_by_role("textbox").nth(3) if search_input.is_visible(timeout=800): search_input.fill(name) except Exception: search_input = None if not search_input: for selector in selectors: try: candidate = self._page.locator(selector).first if candidate.is_visible(timeout=800): search_input = candidate search_input.fill(name) break except Exception: continue if not search_input: try: self._page.keyboard.type(name) except Exception: pass time.sleep(0.2) try: find_btn = self._page.get_by_role("button", name="查找").nth(2) find_btn.click() except Exception: try: self._page.get_by_role("button", name="查找").first.click() except Exception: try: self._page.keyboard.press("Enter") except Exception: pass time.sleep(0.3) def _find_next(self) -> None: try: find_btn = self._page.get_by_role("button", name="查找").nth(2) find_btn.click() except Exception: try: self._page.get_by_role("button", name="查找").first.click() except Exception: try: self._page.keyboard.press("Enter") except Exception: pass time.sleep(0.3) def _close_search(self) -> None: self._page.keyboard.press("Escape") time.sleep(0.2) def _extract_row_number(self, cell_address: str) -> int: import re match = re.search(r"(\d+)$", cell_address) if match: return int(match.group(1)) return -1 def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool: """验证县区 - 从目标行开始搜索县区""" logger.info(f"[KDocs调试] 验证县区: 期望行={row_num}, 期望值='{unit}'") # 方法: 先导航到目标行的A列,然后从那里搜索县区 try: # 1. 先导航到目标行的 A 列 start_cell = f"{unit_col}{row_num}" self._navigate_to_cell(start_cell) time.sleep(0.3) logger.info(f"[KDocs调试] 已导航到 {start_cell}") # 2. 从当前位置搜索县区 self._page.keyboard.press("Control+f") time.sleep(0.3) # 找到搜索框并输入 try: search_input = self._page.locator( "input[placeholder*='查找'], input[placeholder*='搜索'], input[type='text']" ).first search_input.fill(unit) time.sleep(0.2) self._page.keyboard.press("Enter") time.sleep(0.5) except Exception as e: logger.warning(f"[KDocs调试] 填写搜索框失败: {e}") self._page.keyboard.press("Escape") return False # 3. 关闭搜索框,检查当前位置 self._page.keyboard.press("Escape") time.sleep(0.3) current_address = self._get_current_cell_address() found_row = self._extract_row_number(current_address) logger.info(f"[KDocs调试] 搜索'{unit}'后: 当前单元格={current_address}, 行号={found_row}") # 4. 检查是否在同一行(允许在目标行或之后的几行内,因为搜索可能从当前位置向下) if found_row == row_num: logger.info(f"[KDocs调试] [OK] 验证成功! 县区'{unit}'在第{row_num}行") return True else: logger.info(f"[KDocs调试] 验证失败: 期望行{row_num}, 实际找到行{found_row}") return False except Exception as e: logger.warning(f"[KDocs调试] 验证异常: {e}") return False def _debug_dump_page_elements(self) -> None: """调试: 输出页面上可能包含单元格值的元素""" logger.info("[KDocs调试] ========== 页面元素分析 ==========") try: # 查找可能的编辑栏元素 selectors_to_check = [ "input", "textarea", "[class*='formula']", "[class*='Formula']", "[class*='editor']", "[class*='Editor']", "[class*='cell']", "[class*='Cell']", "[class*='input']", "[class*='Input']", ] for selector in selectors_to_check: try: elements = self._page.query_selector_all(selector) for i, el in enumerate(elements[:3]): # 只看前3个 try: class_name = el.get_attribute("class") or "" value = "" try: value = el.input_value() except: try: value = el.inner_text() except: pass if value: logger.info( f"[KDocs调试] 元素 {selector}[{i}] class='{class_name[:50]}' value='{value[:30]}'" ) except: pass except: pass except Exception as e: logger.warning(f"[KDocs调试] 页面元素分析失败: {e}") logger.info("[KDocs调试] ====================================") def _debug_dump_table_structure(self, target_row: int = 66) -> None: """调试: 输出表格结构""" self._debug_dump_page_elements() # 先分析页面元素 logger.info("[KDocs调试] ========== 表格结构分析 ==========") cols = ["A", "B", "C", "D", "E"] for row in [1, 2, 3, target_row]: row_data = [] for col in cols: val = self._get_cell_value(f"{col}{row}") # 截断太长的值 if len(val) > 30: val = val[:30] + "..." row_data.append(f"{col}{row}='{val}'") logger.info(f"[KDocs调试] 第{row}行: {' | '.join(row_data)}") logger.info("[KDocs调试] ====================================") def _find_person_with_unit( self, unit: str, name: str, unit_col: str, max_attempts: int = 50, row_start: int = 0, row_end: int = 0 ) -> int: """ 查找人员所在行号。 策略:只搜索姓名,找到姓名列(C列)的匹配项 注意:组合搜索会匹配到图片列的错误位置,已放弃该方案 :param row_start: 有效行范围起始(0表示不限制) :param row_end: 有效行范围结束(0表示不限制) """ logger.info(f"[KDocs调试] 开始搜索人员: name='{name}', unit='{unit}'") if row_start > 0 or row_end > 0: logger.info(f"[KDocs调试] 有效行范围: {row_start}-{row_end}") # 添加人员位置缓存 cache_key = f"{name}_{unit}_{unit_col}" if hasattr(self, "_person_cache") and cache_key in self._person_cache: cached_row = self._person_cache[cache_key] logger.info(f"[KDocs调试] 使用缓存找到人员: name='{name}', row={cached_row}") return cached_row # 只搜索姓名 - 这是目前唯一可靠的方式 logger.info(f"[KDocs调试] 搜索姓名: '{name}'") # 首先尝试二分搜索优化 binary_result = self._binary_search_person(name, unit_col, row_start, row_end) if binary_result > 0: logger.info(f"[KDocs调试] [OK] 二分搜索成功! 找到行号={binary_result}") # 缓存结果 if not hasattr(self, "_person_cache"): self._person_cache = {} self._person_cache[cache_key] = binary_result return binary_result # 如果二分搜索失败,回退到线性搜索 row_num = self._search_and_get_row( name, max_attempts=max_attempts, expected_col="C", row_start=row_start, row_end=row_end ) if row_num > 0: logger.info(f"[KDocs调试] [OK] 线性搜索成功! 找到行号={row_num}") # 缓存结果 if not hasattr(self, "_person_cache"): self._person_cache = {} self._person_cache[cache_key] = row_num return row_num logger.warning(f"[KDocs调试] 搜索失败,未找到人员 '{name}'") return -1 def _binary_search_person(self, name: str, unit_col: str, row_start: int = 0, row_end: int = 0) -> int: """ 二分搜索人员位置 - 基于姓名的快速搜索 """ if row_start <= 0: row_start = 1 # 从第1行开始 if row_end <= 0: row_end = 1000 # 默认搜索范围,最多1000行 logger.info(f"[KDocs调试] 使用二分搜索: name='{name}', rows={row_start}-{row_end}") left, right = row_start, row_end while left <= right: mid = (left + right) // 2 try: # 获取中间行的姓名 cell_value = self._get_cell_value_fast(f"C{mid}") if not cell_value: # 如果单元格为空,向下搜索 left = mid + 1 continue # 比较姓名 if self._name_matches(cell_value, name): logger.info(f"[KDocs调试] 二分搜索找到匹配: row={mid}, name='{cell_value}'") return mid elif self._name_less_than(cell_value, name): left = mid + 1 else: right = mid - 1 except Exception as e: logger.warning(f"[KDocs调试] 二分搜索读取行{mid}失败: {e}") # 跳过这一行,继续搜索 left = mid + 1 continue logger.info(f"[KDocs调试] 二分搜索未找到匹配人员: '{name}'") return -1 def _name_matches(self, cell_value: str, target_name: str) -> bool: """检查单元格中的姓名是否匹配目标姓名""" if not cell_value or not target_name: return False cell_name = str(cell_value).strip() target = str(target_name).strip() # 精确匹配 if cell_name == target: return True # 部分匹配(包含关系) return target in cell_name or cell_name in target def _name_less_than(self, cell_value: str, target_name: str) -> bool: """判断单元格姓名是否小于目标姓名(用于排序)""" if not cell_value or not target_name: return False try: cell_name = str(cell_value).strip() target = str(target_name).strip() return cell_name < target except: return False def _get_cell_value_fast(self, cell_address: str) -> Optional[str]: """快速获取单元格值,减少延迟""" try: # 直接获取单元格值,不等待 cell = self._page.locator(f"[data-cell='{cell_address}']").first if cell.is_visible(): return cell.inner_text().strip() return None except Exception: return None def _search_and_get_row( self, search_text: str, max_attempts: int = 10, expected_col: str = None, row_start: int = 0, row_end: int = 0 ) -> int: """ 执行搜索并获取找到的行号 :param search_text: 要搜索的文本 :param max_attempts: 最大尝试次数 :param expected_col: 期望的列(如 'C'),如果指定则只接受该列的结果 :param row_start: 有效行范围起始(0表示不限制) :param row_end: 有效行范围结束(0表示不限制) """ self._focus_grid() self._search_person(search_text) found_positions = set() # 记录已找到的位置(列+行) for attempt in range(max_attempts): self._close_search() time.sleep(0.3) # 等待名称框更新 current_address = self._get_current_cell_address() if not current_address: logger.warning(f"[KDocs调试] 第{attempt + 1}次: 无法获取单元格地址") # 继续尝试下一个 self._page.keyboard.press("Control+f") time.sleep(0.2) self._find_next() continue row_num = self._extract_row_number(current_address) # 提取列字母(A, B, C, D 等) col_letter = "".join(c for c in current_address if c.isalpha()).upper() logger.info( f"[KDocs调试] 第{attempt + 1}次搜索'{search_text}': 单元格={current_address}, 列={col_letter}, 行号={row_num}" ) if row_num <= 0: logger.warning(f"[KDocs调试] 无法提取行号,搜索可能没有结果") return -1 # 检查是否已经访问过这个位置 position_key = f"{col_letter}{row_num}" if position_key in found_positions: logger.info(f"[KDocs调试] 位置{position_key}已搜索过,循环结束") # 检查是否有任何有效结果 valid_results = [ pos for pos in found_positions if (not expected_col or pos.startswith(expected_col)) and self._extract_row_number(pos) > 2 ] if valid_results: # 返回第一个有效结果的行号 return self._extract_row_number(valid_results[0]) return -1 found_positions.add(position_key) # 跳过标题行和表头行(通常是第1-2行) if row_num <= 2: logger.info(f"[KDocs调试] 跳过标题/表头行: {row_num}") self._page.keyboard.press("Control+f") time.sleep(0.2) self._find_next() continue # 如果指定了期望的列,检查是否匹配 if expected_col and col_letter != expected_col.upper(): logger.info(f"[KDocs调试] 列不匹配: 期望={expected_col}, 实际={col_letter},继续搜索下一个") self._page.keyboard.press("Control+f") time.sleep(0.2) self._find_next() continue # 检查行号是否在有效范围内 if row_start > 0 and row_num < row_start: logger.info(f"[KDocs调试] 行号{row_num}小于起始行{row_start},继续搜索下一个") self._page.keyboard.press("Control+f") time.sleep(0.2) self._find_next() continue if row_end > 0 and row_num > row_end: logger.info(f"[KDocs调试] 行号{row_num}大于结束行{row_end},继续搜索下一个") self._page.keyboard.press("Control+f") time.sleep(0.2) self._find_next() continue # 找到有效的数据行,列匹配且在行范围内 logger.info(f"[KDocs调试] [OK] 找到有效位置: {current_address} (在有效范围内)") return row_num self._close_search() logger.warning(f"[KDocs调试] 达到最大尝试次数{max_attempts},未找到有效结果") return -1 def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool: cell_address = f"{image_col}{row_num}" self._navigate_to_cell(cell_address) time.sleep(0.3) # 清除单元格现有内容 try: # 1. 导航到单元格(名称框输入地址+Enter,会跳转并可能进入编辑模式) self._navigate_to_cell(cell_address) time.sleep(0.3) # 2. 按 Escape 退出可能的编辑模式,回到选中状态 self._page.keyboard.press("Escape") time.sleep(0.3) # 3. 按 Delete 删除选中单元格的内容 self._page.keyboard.press("Delete") time.sleep(0.5) logger.info(f"[KDocs] 已删除 {cell_address} 的内容") except Exception as e: logger.warning(f"[KDocs] 清除单元格内容时出错: {e}") logger.info(f"[KDocs] 准备上传图片到 {cell_address},已清除旧内容") try: insert_btn = self._page.get_by_role("button", name="插入") insert_btn.click() time.sleep(0.3) except Exception as e: raise RuntimeError(f"打开插入菜单失败: {e}") try: image_btn = self._page.get_by_role("button", name="图片") image_btn.click() time.sleep(0.3) cell_image_option = self._page.get_by_role("option", name="单元格图片") cell_image_option.click() time.sleep(0.2) except Exception as e: raise RuntimeError(f"选择单元格图片失败: {e}") try: local_option = self._page.get_by_role("option", name="本地") with self._page.expect_file_chooser() as fc_info: local_option.click() file_chooser = fc_info.value file_chooser.set_files(image_path) except Exception as e: raise RuntimeError(f"上传文件失败: {e}") time.sleep(2) return True _kdocs_uploader: Optional[KDocsUploader] = None def get_kdocs_uploader() -> KDocsUploader: global _kdocs_uploader if _kdocs_uploader is None: _kdocs_uploader = KDocsUploader() _kdocs_uploader.start() return _kdocs_uploader