#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import base64 import os import queue import threading import time from typing import Any, Dict, Optional import database import email_service from app_config import get_config from services.client_log import log_to_client from services.runtime import get_logger try: from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError except Exception: # pragma: no cover - 运行环境缺少 playwright 时降级 sync_playwright = None class PlaywrightTimeoutError(Exception): pass logger = get_logger() config = get_config() class KDocsUploader: def __init__(self) -> None: self._queue: queue.Queue = queue.Queue(maxsize=int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200"))) self._thread = threading.Thread(target=self._run, name="kdocs-uploader", daemon=True) self._running = False self._last_error: Optional[str] = None self._last_success_at: Optional[float] = None self._login_required = False self._playwright = None self._browser = None self._context = None self._page = None self._last_qr_image: Optional[bytes] = None self._last_login_check: float = 0.0 self._last_login_ok: Optional[bool] = None def start(self) -> None: if self._running: return self._running = True self._thread.start() def stop(self) -> None: if not self._running: return self._running = False self._queue.put({"action": "shutdown"}) def get_status(self) -> Dict[str, Any]: return { "queue_size": self._queue.qsize(), "login_required": self._login_required, "last_error": self._last_error, "last_success_at": self._last_success_at, "last_login_ok": self._last_login_ok, } def enqueue_upload( self, *, user_id: int, account_id: str, unit: str, name: str, image_path: str, ) -> bool: if not self._running: self.start() payload = { "user_id": int(user_id), "account_id": str(account_id), "unit": unit, "name": name, "image_path": image_path, } try: self._queue.put({"action": "upload", "payload": payload}, timeout=1) return True except queue.Full: self._last_error = "上传队列已满" return False def request_qr(self, timeout: int = 30, *, force: bool = False) -> Dict[str, Any]: return self._submit_command("qr", timeout=timeout, payload={"force": force}) def clear_login(self, timeout: int = 20) -> Dict[str, Any]: return self._submit_command("clear_login", timeout=timeout) def refresh_login_status(self, timeout: int = 20) -> Dict[str, Any]: return self._submit_command("status", timeout=timeout) def _submit_command( self, action: str, timeout: int = 30, payload: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: if not self._running: self.start() resp_queue: queue.Queue = queue.Queue(maxsize=1) self._queue.put({"action": action, "response": resp_queue, "payload": payload or {}}) try: return resp_queue.get(timeout=timeout) except queue.Empty: return {"success": False, "error": "操作超时"} def _run(self) -> None: while True: task = self._queue.get() if not task: continue action = task.get("action") if action == "shutdown": break try: if action == "upload": self._handle_upload(task.get("payload") or {}) elif action == "qr": result = self._handle_qr(task.get("payload") or {}) task.get("response").put(result) elif action == "clear_login": result = self._handle_clear_login() task.get("response").put(result) elif action == "status": result = self._handle_status_check() task.get("response").put(result) except Exception as e: logger.warning(f"[KDocs] 处理任务失败: {e}") self._cleanup_browser() def _load_system_config(self) -> Dict[str, Any]: return database.get_system_config() or {} def _ensure_playwright(self, *, use_storage_state: bool = True) -> bool: if sync_playwright is None: self._last_error = "playwright 未安装" return False try: if self._playwright is None: self._playwright = sync_playwright().start() if self._browser is None: headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false" self._browser = self._playwright.chromium.launch(headless=headless) if self._context is None: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") if use_storage_state and os.path.exists(storage_state): self._context = self._browser.new_context(storage_state=storage_state) else: self._context = self._browser.new_context() if self._page is None or self._page.is_closed(): self._page = self._context.new_page() self._page.set_default_timeout(int(getattr(config, "DEFAULT_TIMEOUT", 60000))) return True except Exception as e: self._last_error = f"浏览器启动失败: {e}" self._cleanup_browser() return False def _cleanup_browser(self) -> None: try: if self._page: self._page.close() except Exception: pass self._page = None try: if self._context: self._context.close() except Exception: pass self._context = None try: if self._browser: self._browser.close() except Exception: pass self._browser = None try: if self._playwright: self._playwright.stop() except Exception: pass self._playwright = None def _open_document(self, doc_url: str) -> bool: try: self._page.goto(doc_url) time.sleep(1) return True except Exception as e: self._last_error = f"打开文档失败: {e}" return False def _is_logged_in(self) -> bool: try: login_btn = self._page.get_by_role("button", name="登录并加入编辑") if login_btn.is_visible(timeout=1500): return False except Exception: return False try: login_btn = self._page.get_by_role("button", name="立即登录") if login_btn.is_visible(timeout=1200): return False except Exception: pass try: login_btn = self._page.get_by_role("button", name="登录") if login_btn.is_visible(timeout=1200): return False except Exception: pass try: login_link = self._page.get_by_role("link", name="登录") if login_link.is_visible(timeout=1200): return False except Exception: pass try: wechat_btn = self._page.get_by_role("button", name="微信登录") if wechat_btn.is_visible(timeout=1200): return False except Exception: pass return True def _has_saved_login_state(self) -> bool: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") return os.path.exists(storage_state) def _ensure_login_dialog(self) -> None: login_names = ["登录并加入编辑", "立即登录", "登录"] wechat_names = ["微信登录", "微信扫码登录", "微信扫码", "扫码登录"] pages = self._iter_pages() clicked = False for page in pages: for name in login_names: if self._try_click_role(page, "button", name): clicked = True break if clicked: continue for name in login_names: if self._try_click_role(page, "link", name): clicked = True break if clicked: time.sleep(1.5) pages = self._iter_pages() for page in pages: if self._try_click_names(page, wechat_names): return def _capture_qr_image(self) -> Optional[bytes]: pages = self._iter_pages() for page in pages: for frame in page.frames: target = self._find_qr_element_in_frame(frame) if not target: continue try: return target.screenshot() except Exception: continue for page in pages: dialog_image = self._capture_dialog_image(page) if dialog_image: return dialog_image return None def _iter_pages(self) -> list: pages = [] if self._context: pages.extend(self._context.pages) if self._page and self._page not in pages: pages.insert(0, self._page) def rank(p) -> int: url = (getattr(p, "url", "") or "").lower() keywords = ("login", "account", "passport", "wechat", "qr") return 0 if any(k in url for k in keywords) else 1 pages.sort(key=rank) return pages def _find_qr_element_in_frame(self, frame) -> Optional[Any]: selectors = [ "canvas", "img[alt*='二维码']", "img[src*='qr']", "img[src*='qrcode']", "img[class*='qr']", "canvas[class*='qr']", "svg", "div[role='img']", "div[class*='qr']", "div[id*='qr']", "div[class*='qrcode']", "div[id*='qrcode']", "div[class*='wechat']", "div[class*='weixin']", "div[class*='wx']", "img[src*='wx']", ] best = None best_score = None for selector in selectors: try: locator = frame.locator(selector) count = min(locator.count(), 20) except Exception: continue for i in range(count): el = locator.nth(i) try: if not el.is_visible(timeout=800): continue box = el.bounding_box() if not box: continue width = box.get("width", 0) height = box.get("height", 0) if width < 80 or height < 80 or width > 520 or height > 520: continue aspect_diff = abs(width - height) if aspect_diff > 80: continue score = aspect_diff + abs(width - 260) + abs(height - 260) if best_score is None or score < best_score: best_score = score best = el except Exception: continue if best: return best handle = None try: handle = frame.evaluate_handle( """() => { const patterns = [/qr/i, /qrcode/i, /weixin/i, /wechat/i, /wx/i, /data:image/i]; const elements = Array.from(document.querySelectorAll('*')); for (const el of elements) { const style = window.getComputedStyle(el); const bg = style.backgroundImage || ''; if (!bg || bg === 'none') continue; if (!patterns.some((re) => re.test(bg))) continue; const rect = el.getBoundingClientRect(); if (!rect.width || !rect.height) continue; if (rect.width < 80 || rect.height < 80 || rect.width > 520 || rect.height > 520) continue; const diff = Math.abs(rect.width - rect.height); if (diff > 80) continue; return el; } return null; }""" ) element = handle.as_element() if handle else None if element: return element except Exception: pass finally: try: if handle: handle.dispose() except Exception: pass return best def _try_click_role(self, page, role: str, name: str, timeout: int = 1500) -> bool: try: el = page.get_by_role(role, name=name) if el.is_visible(timeout=timeout): el.click() time.sleep(1) return True except Exception: return False return False def _try_click_names(self, page, names: list) -> bool: for name in names: if self._try_click_role(page, "button", name, timeout=1200): return True if self._try_click_role(page, "link", name, timeout=1200): return True try: for frame in page.frames: for name in names: try: el = frame.get_by_role("button", name=name) if el.is_visible(timeout=800): el.click() time.sleep(1) return True except Exception: pass try: el = frame.get_by_text(name, exact=True) if el.is_visible(timeout=800): el.click() time.sleep(1) return True except Exception: pass except Exception: return False return False def _capture_dialog_image(self, page) -> Optional[bytes]: selectors = "[role='dialog'], .dialog, .modal, .popup" try: dialogs = page.locator(selectors) count = min(dialogs.count(), 6) except Exception: return None best = None best_area = 0 for i in range(count): el = dialogs.nth(i) try: if not el.is_visible(timeout=800): continue box = el.bounding_box() if not box: continue area = box.get("width", 0) * box.get("height", 0) if area > best_area: best_area = area best = el except Exception: continue if not best: return None try: return best.screenshot() except Exception: return None def _save_login_state(self) -> None: try: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") os.makedirs(os.path.dirname(storage_state), exist_ok=True) self._context.storage_state(path=storage_state) except Exception as e: logger.warning(f"[KDocs] 保存登录态失败: {e}") def _handle_qr(self, payload: Dict[str, Any]) -> Dict[str, Any]: cfg = self._load_system_config() doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return {"success": False, "error": "未配置金山文档链接"} force = bool(payload.get("force")) if force: self._handle_clear_login() if not self._ensure_playwright(use_storage_state=not force): return {"success": False, "error": self._last_error or "浏览器不可用"} if not self._open_document(doc_url): return {"success": False, "error": self._last_error or "打开文档失败"} if not force and self._has_saved_login_state() and self._is_logged_in(): self._login_required = False self._last_login_ok = True self._save_login_state() return {"success": True, "logged_in": True, "qr_image": ""} self._ensure_login_dialog() qr_image = None for _ in range(10): self._ensure_login_dialog() qr_image = self._capture_qr_image() if qr_image: break time.sleep(1) if not qr_image: self._last_error = "二维码获取失败" try: pages = self._iter_pages() page_urls = [getattr(p, "url", "") for p in pages] logger.warning(f"[KDocs] 二维码未捕获,页面: {page_urls}") ts = int(time.time()) saved = [] for idx, page in enumerate(pages[:3]): try: path = f"data/kdocs_debug_{ts}_{idx}.png" page.screenshot(path=path, full_page=True) saved.append(path) except Exception: continue if saved: logger.warning(f"[KDocs] 已保存调试截图: {saved}") except Exception: pass return {"success": False, "error": self._last_error} self._last_qr_image = qr_image self._login_required = True return { "success": True, "logged_in": False, "qr_image": base64.b64encode(qr_image).decode("ascii"), } def _handle_clear_login(self) -> Dict[str, Any]: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") try: if os.path.exists(storage_state): os.remove(storage_state) except Exception as e: return {"success": False, "error": f"清除登录态失败: {e}"} self._login_required = False self._last_login_ok = None self._cleanup_browser() return {"success": True} def _handle_status_check(self) -> Dict[str, Any]: cfg = self._load_system_config() doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return {"success": True, "logged_in": False, "error": "未配置文档链接"} if not self._ensure_playwright(): return {"success": False, "logged_in": False, "error": self._last_error or "浏览器不可用"} if not self._open_document(doc_url): return {"success": False, "logged_in": False, "error": self._last_error or "打开文档失败"} logged_in = self._is_logged_in() if not self._has_saved_login_state(): logged_in = False self._last_login_ok = logged_in self._login_required = not logged_in if logged_in: self._save_login_state() return {"success": True, "logged_in": logged_in} def _handle_upload(self, payload: Dict[str, Any]) -> None: cfg = self._load_system_config() if int(cfg.get("kdocs_enabled", 0) or 0) != 1: return doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return unit = (payload.get("unit") or "").strip() name = (payload.get("name") or "").strip() image_path = payload.get("image_path") user_id = payload.get("user_id") account_id = payload.get("account_id") if not unit or not name: return if not image_path or not os.path.exists(image_path): return if not self._ensure_playwright(): self._notify_admin(unit, name, image_path, self._last_error or "浏览器不可用") return if not self._open_document(doc_url): self._notify_admin(unit, name, image_path, self._last_error or "打开文档失败") return if not self._is_logged_in(): self._login_required = True self._last_login_ok = False self._notify_admin(unit, name, image_path, "登录已失效,请管理员重新扫码登录") return self._login_required = False self._last_login_ok = True sheet_name = (cfg.get("kdocs_sheet_name") or "").strip() sheet_index = int(cfg.get("kdocs_sheet_index") or 0) unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper() image_col = (cfg.get("kdocs_image_column") or "D").strip().upper() success = False error_msg = "" for attempt in range(2): try: if sheet_name or sheet_index: self._select_sheet(sheet_name, sheet_index) row_num = self._find_person_with_unit(unit, name, unit_col) if row_num < 0: error_msg = f"未找到人员: {unit}-{name}" break success = self._upload_image_to_cell(row_num, image_path, image_col) if success: break except Exception as e: error_msg = str(e) if success: self._last_success_at = time.time() self._last_error = None log_to_client(f"已上传表格截图: {unit}-{name}", user_id, account_id) return if not error_msg: error_msg = "上传失败" self._last_error = error_msg self._notify_admin(unit, name, image_path, error_msg) log_to_client(f"表格上传失败: {error_msg}", user_id, account_id) def _notify_admin(self, unit: str, name: str, image_path: str, error: str) -> None: cfg = self._load_system_config() if int(cfg.get("kdocs_admin_notify_enabled", 0) or 0) != 1: return to_email = (cfg.get("kdocs_admin_notify_email") or "").strip() if not to_email: return settings = email_service.get_email_settings() if not settings.get("enabled", False): return subject = "金山文档上传失败提醒" body = ( f"上传失败\n\n人员: {unit}-{name}\n图片: {image_path}\n错误: {error}\n\n" "请检查登录状态或表格配置。" ) try: email_service.send_email_async( to_email=to_email, subject=subject, body=body, email_type="kdocs_upload_failed", ) except Exception as e: logger.warning(f"[KDocs] 发送管理员邮件失败: {e}") def _select_sheet(self, sheet_name: str, sheet_index: int) -> None: if sheet_name: candidates = [ self._page.locator("[role='tab']").filter(has_text=sheet_name), self._page.locator(".sheet-tab").filter(has_text=sheet_name), self._page.locator(".sheet-tab-name").filter(has_text=sheet_name), ] for locator in candidates: try: if locator.count() < 1: continue locator.first.click() time.sleep(0.5) return except Exception: continue if sheet_index > 0: idx = sheet_index - 1 candidates = [ self._page.locator("[role='tab']"), self._page.locator(".sheet-tab"), self._page.locator(".sheet-tab-name"), ] for locator in candidates: try: if locator.count() <= idx: continue locator.nth(idx).click() time.sleep(0.5) return except Exception: continue def _get_current_cell_address(self) -> str: name_box = self._page.locator('#root input[type="text"]').first return name_box.input_value() def _navigate_to_cell(self, cell_address: str) -> None: name_box = self._page.locator('#root input[type="text"]').first name_box.click() name_box.fill(cell_address) name_box.press("Enter") time.sleep(0.3) def _get_cell_value(self, cell_address: str) -> str: self._navigate_to_cell(cell_address) time.sleep(0.3) try: formula_bar = self._page.locator('input[type="text"]').nth(1) value = formula_bar.input_value() if value: return value.strip() except Exception: pass try: formula_inputs = self._page.locator('textbox') for i in range(formula_inputs.count()): try: value = formula_inputs.nth(i).input_value() if value: import re if not re.match(r"^[A-Z]+\d+$", value.strip()): return value.strip() except Exception: continue except Exception: pass return "" def _search_person(self, name: str) -> None: self._page.keyboard.press("Control+f") time.sleep(0.3) search_input = None try: search_input = self._page.get_by_role("textbox").nth(3) search_input.fill(name) except Exception: try: search_input = self._page.locator("input[placeholder*='查找']").first search_input.fill(name) except Exception: pass time.sleep(0.2) try: find_btn = self._page.get_by_role("button", name="查找").nth(2) find_btn.click() except Exception: try: self._page.get_by_role("button", name="查找").first.click() except Exception: pass time.sleep(0.3) def _find_next(self) -> None: try: find_btn = self._page.get_by_role("button", name="查找").nth(2) find_btn.click() except Exception: try: self._page.get_by_role("button", name="查找").first.click() except Exception: pass time.sleep(0.3) def _close_search(self) -> None: self._page.keyboard.press("Escape") time.sleep(0.2) def _extract_row_number(self, cell_address: str) -> int: import re match = re.search(r"(\\d+)$", cell_address) if match: return int(match.group(1)) return -1 def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool: cell_address = f"{unit_col}{row_num}" cell_value = self._get_cell_value(cell_address) if cell_value: return cell_value == unit return False def _find_person_with_unit(self, unit: str, name: str, unit_col: str, max_attempts: int = 50) -> int: self._search_person(name) found_rows = set() for _ in range(max_attempts): self._close_search() current_address = self._get_current_cell_address() row_num = self._extract_row_number(current_address) if row_num == -1: return -1 if row_num in found_rows: return -1 found_rows.add(row_num) if self._verify_unit_by_navigation(row_num, unit, unit_col): return row_num self._page.keyboard.press("Control+f") time.sleep(0.2) self._find_next() return -1 def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool: cell_address = f"{image_col}{row_num}" self._navigate_to_cell(cell_address) time.sleep(0.3) try: self._page.keyboard.press("Delete") time.sleep(0.1) self._page.keyboard.press("Backspace") except Exception: pass try: insert_btn = self._page.get_by_role("button", name="插入") insert_btn.click() time.sleep(0.3) except Exception as e: raise RuntimeError(f"打开插入菜单失败: {e}") try: image_btn = self._page.get_by_role("button", name="图片") image_btn.click() time.sleep(0.3) cell_image_option = self._page.get_by_role("option", name="单元格图片") cell_image_option.click() time.sleep(0.2) except Exception as e: raise RuntimeError(f"选择单元格图片失败: {e}") try: local_option = self._page.get_by_role("option", name="本地") with self._page.expect_file_chooser() as fc_info: local_option.click() file_chooser = fc_info.value file_chooser.set_files(image_path) except Exception as e: raise RuntimeError(f"上传文件失败: {e}") time.sleep(2) return True _kdocs_uploader: Optional[KDocsUploader] = None def get_kdocs_uploader() -> KDocsUploader: global _kdocs_uploader if _kdocs_uploader is None: _kdocs_uploader = KDocsUploader() _kdocs_uploader.start() return _kdocs_uploader