#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import base64 import os import queue import threading import time from typing import Any, Dict, Optional import database import email_service from app_config import get_config from services.client_log import log_to_client from services.runtime import get_logger try: from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError except Exception: # pragma: no cover - 运行环境缺少 playwright 时降级 sync_playwright = None class PlaywrightTimeoutError(Exception): pass logger = get_logger("app") config = get_config() class KDocsUploader: def __init__(self) -> None: self._queue: queue.Queue = queue.Queue(maxsize=int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200"))) self._thread = threading.Thread(target=self._run, name="kdocs-uploader", daemon=True) self._running = False self._last_error: Optional[str] = None self._last_success_at: Optional[float] = None self._login_required = False self._playwright = None self._browser = None self._context = None self._page = None self._last_qr_image: Optional[bytes] = None self._last_login_check: float = 0.0 self._last_login_ok: Optional[bool] = None def start(self) -> None: if self._running: return self._running = True self._thread.start() def stop(self) -> None: if not self._running: return self._running = False self._queue.put({"action": "shutdown"}) def get_status(self) -> Dict[str, Any]: return { "queue_size": self._queue.qsize(), "login_required": self._login_required, "last_error": self._last_error, "last_success_at": self._last_success_at, "last_login_ok": self._last_login_ok, } def enqueue_upload( self, *, user_id: int, account_id: str, unit: str, name: str, image_path: str, ) -> bool: if not self._running: self.start() payload = { "user_id": int(user_id), "account_id": str(account_id), "unit": unit, "name": name, "image_path": image_path, } try: self._queue.put({"action": "upload", "payload": payload}, timeout=1) return True except queue.Full: self._last_error = "上传队列已满" return False def request_qr(self, timeout: int = 30) -> Dict[str, Any]: return self._submit_command("qr", timeout=timeout) def clear_login(self, timeout: int = 20) -> Dict[str, Any]: return self._submit_command("clear_login", timeout=timeout) def refresh_login_status(self, timeout: int = 20) -> Dict[str, Any]: return self._submit_command("status", timeout=timeout) def _submit_command(self, action: str, timeout: int = 30) -> Dict[str, Any]: if not self._running: self.start() resp_queue: queue.Queue = queue.Queue(maxsize=1) self._queue.put({"action": action, "response": resp_queue}) try: return resp_queue.get(timeout=timeout) except queue.Empty: return {"success": False, "error": "操作超时"} def _run(self) -> None: while True: task = self._queue.get() if not task: continue action = task.get("action") if action == "shutdown": break try: if action == "upload": self._handle_upload(task.get("payload") or {}) elif action == "qr": result = self._handle_qr() task.get("response").put(result) elif action == "clear_login": result = self._handle_clear_login() task.get("response").put(result) elif action == "status": result = self._handle_status_check() task.get("response").put(result) except Exception as e: logger.warning(f"[KDocs] 处理任务失败: {e}") self._cleanup_browser() def _load_system_config(self) -> Dict[str, Any]: return database.get_system_config() or {} def _ensure_playwright(self) -> bool: if sync_playwright is None: self._last_error = "playwright 未安装" return False try: if self._playwright is None: self._playwright = sync_playwright().start() if self._browser is None: headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false" self._browser = self._playwright.chromium.launch(headless=headless) if self._context is None: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") if os.path.exists(storage_state): self._context = self._browser.new_context(storage_state=storage_state) else: self._context = self._browser.new_context() if self._page is None or self._page.is_closed(): self._page = self._context.new_page() self._page.set_default_timeout(int(getattr(config, "DEFAULT_TIMEOUT", 60000))) return True except Exception as e: self._last_error = f"浏览器启动失败: {e}" self._cleanup_browser() return False def _cleanup_browser(self) -> None: try: if self._page: self._page.close() except Exception: pass self._page = None try: if self._context: self._context.close() except Exception: pass self._context = None try: if self._browser: self._browser.close() except Exception: pass self._browser = None try: if self._playwright: self._playwright.stop() except Exception: pass self._playwright = None def _open_document(self, doc_url: str) -> bool: try: self._page.goto(doc_url) time.sleep(1) return True except Exception as e: self._last_error = f"打开文档失败: {e}" return False def _is_logged_in(self) -> bool: try: login_btn = self._page.get_by_role("button", name="登录并加入编辑") if login_btn.is_visible(timeout=1500): return False except Exception: return True return True def _ensure_login_dialog(self) -> None: try: login_btn = self._page.get_by_role("button", name="登录并加入编辑") if login_btn.is_visible(timeout=1500): login_btn.click() time.sleep(1) except Exception: pass try: wechat_btn = self._page.get_by_role("button", name="微信登录") if wechat_btn.is_visible(timeout=3000): wechat_btn.click() time.sleep(1.5) except Exception: pass def _capture_qr_image(self) -> Optional[bytes]: candidates = [ self._page.locator("canvas"), self._page.locator("img[alt*='二维码']"), self._page.locator("img[src*='qr']"), ] for locator in candidates: try: if locator.count() < 1: continue target = locator.first target.wait_for(state="visible", timeout=5000) return target.screenshot() except PlaywrightTimeoutError: continue except Exception: continue try: return self._page.screenshot(full_page=True) except Exception: return None def _save_login_state(self) -> None: try: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") os.makedirs(os.path.dirname(storage_state), exist_ok=True) self._context.storage_state(path=storage_state) except Exception as e: logger.warning(f"[KDocs] 保存登录态失败: {e}") def _handle_qr(self) -> Dict[str, Any]: cfg = self._load_system_config() doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return {"success": False, "error": "未配置金山文档链接"} if not self._ensure_playwright(): return {"success": False, "error": self._last_error or "浏览器不可用"} if not self._open_document(doc_url): return {"success": False, "error": self._last_error or "打开文档失败"} if self._is_logged_in(): self._login_required = False self._last_login_ok = True self._save_login_state() return {"success": True, "logged_in": True, "qr_image": ""} self._ensure_login_dialog() qr_image = self._capture_qr_image() if not qr_image: self._last_error = "二维码获取失败" return {"success": False, "error": self._last_error} self._last_qr_image = qr_image self._login_required = True return { "success": True, "logged_in": False, "qr_image": base64.b64encode(qr_image).decode("ascii"), } def _handle_clear_login(self) -> Dict[str, Any]: storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json") try: if os.path.exists(storage_state): os.remove(storage_state) except Exception as e: return {"success": False, "error": f"清除登录态失败: {e}"} self._login_required = False self._last_login_ok = None self._cleanup_browser() return {"success": True} def _handle_status_check(self) -> Dict[str, Any]: cfg = self._load_system_config() doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return {"success": True, "logged_in": False, "error": "未配置文档链接"} if not self._ensure_playwright(): return {"success": False, "logged_in": False, "error": self._last_error or "浏览器不可用"} if not self._open_document(doc_url): return {"success": False, "logged_in": False, "error": self._last_error or "打开文档失败"} logged_in = self._is_logged_in() self._last_login_ok = logged_in self._login_required = not logged_in if logged_in: self._save_login_state() return {"success": True, "logged_in": logged_in} def _handle_upload(self, payload: Dict[str, Any]) -> None: cfg = self._load_system_config() if int(cfg.get("kdocs_enabled", 0) or 0) != 1: return doc_url = (cfg.get("kdocs_doc_url") or "").strip() if not doc_url: return unit = (payload.get("unit") or "").strip() name = (payload.get("name") or "").strip() image_path = payload.get("image_path") user_id = payload.get("user_id") account_id = payload.get("account_id") if not unit or not name: return if not image_path or not os.path.exists(image_path): return if not self._ensure_playwright(): self._notify_admin(unit, name, image_path, self._last_error or "浏览器不可用") return if not self._open_document(doc_url): self._notify_admin(unit, name, image_path, self._last_error or "打开文档失败") return if not self._is_logged_in(): self._login_required = True self._last_login_ok = False self._notify_admin(unit, name, image_path, "登录已失效,请管理员重新扫码登录") return self._login_required = False self._last_login_ok = True sheet_name = (cfg.get("kdocs_sheet_name") or "").strip() sheet_index = int(cfg.get("kdocs_sheet_index") or 0) unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper() image_col = (cfg.get("kdocs_image_column") or "D").strip().upper() success = False error_msg = "" for attempt in range(2): try: if sheet_name or sheet_index: self._select_sheet(sheet_name, sheet_index) row_num = self._find_person_with_unit(unit, name, unit_col) if row_num < 0: error_msg = f"未找到人员: {unit}-{name}" break success = self._upload_image_to_cell(row_num, image_path, image_col) if success: break except Exception as e: error_msg = str(e) if success: self._last_success_at = time.time() self._last_error = None log_to_client(f"已上传表格截图: {unit}-{name}", user_id, account_id) return if not error_msg: error_msg = "上传失败" self._last_error = error_msg self._notify_admin(unit, name, image_path, error_msg) log_to_client(f"表格上传失败: {error_msg}", user_id, account_id) def _notify_admin(self, unit: str, name: str, image_path: str, error: str) -> None: cfg = self._load_system_config() if int(cfg.get("kdocs_admin_notify_enabled", 0) or 0) != 1: return to_email = (cfg.get("kdocs_admin_notify_email") or "").strip() if not to_email: return settings = email_service.get_email_settings() if not settings.get("enabled", False): return subject = "金山文档上传失败提醒" body = ( f"上传失败\n\n人员: {unit}-{name}\n图片: {image_path}\n错误: {error}\n\n" "请检查登录状态或表格配置。" ) try: email_service.send_email_async( to_email=to_email, subject=subject, body=body, email_type="kdocs_upload_failed", ) except Exception as e: logger.warning(f"[KDocs] 发送管理员邮件失败: {e}") def _select_sheet(self, sheet_name: str, sheet_index: int) -> None: if sheet_name: candidates = [ self._page.locator("[role='tab']").filter(has_text=sheet_name), self._page.locator(".sheet-tab").filter(has_text=sheet_name), self._page.locator(".sheet-tab-name").filter(has_text=sheet_name), ] for locator in candidates: try: if locator.count() < 1: continue locator.first.click() time.sleep(0.5) return except Exception: continue if sheet_index > 0: idx = sheet_index - 1 candidates = [ self._page.locator("[role='tab']"), self._page.locator(".sheet-tab"), self._page.locator(".sheet-tab-name"), ] for locator in candidates: try: if locator.count() <= idx: continue locator.nth(idx).click() time.sleep(0.5) return except Exception: continue def _get_current_cell_address(self) -> str: name_box = self._page.locator('#root input[type="text"]').first return name_box.input_value() def _navigate_to_cell(self, cell_address: str) -> None: name_box = self._page.locator('#root input[type="text"]').first name_box.click() name_box.fill(cell_address) name_box.press("Enter") time.sleep(0.3) def _get_cell_value(self, cell_address: str) -> str: self._navigate_to_cell(cell_address) time.sleep(0.3) try: formula_bar = self._page.locator('input[type="text"]').nth(1) value = formula_bar.input_value() if value: return value.strip() except Exception: pass try: formula_inputs = self._page.locator('textbox') for i in range(formula_inputs.count()): try: value = formula_inputs.nth(i).input_value() if value: import re if not re.match(r"^[A-Z]+\d+$", value.strip()): return value.strip() except Exception: continue except Exception: pass return "" def _search_person(self, name: str) -> None: self._page.keyboard.press("Control+f") time.sleep(0.3) search_input = None try: search_input = self._page.get_by_role("textbox").nth(3) search_input.fill(name) except Exception: try: search_input = self._page.locator("input[placeholder*='查找']").first search_input.fill(name) except Exception: pass time.sleep(0.2) try: find_btn = self._page.get_by_role("button", name="查找").nth(2) find_btn.click() except Exception: try: self._page.get_by_role("button", name="查找").first.click() except Exception: pass time.sleep(0.3) def _find_next(self) -> None: try: find_btn = self._page.get_by_role("button", name="查找").nth(2) find_btn.click() except Exception: try: self._page.get_by_role("button", name="查找").first.click() except Exception: pass time.sleep(0.3) def _close_search(self) -> None: self._page.keyboard.press("Escape") time.sleep(0.2) def _extract_row_number(self, cell_address: str) -> int: import re match = re.search(r"(\\d+)$", cell_address) if match: return int(match.group(1)) return -1 def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool: cell_address = f"{unit_col}{row_num}" cell_value = self._get_cell_value(cell_address) if cell_value: return cell_value == unit return False def _find_person_with_unit(self, unit: str, name: str, unit_col: str, max_attempts: int = 50) -> int: self._search_person(name) found_rows = set() for _ in range(max_attempts): self._close_search() current_address = self._get_current_cell_address() row_num = self._extract_row_number(current_address) if row_num == -1: return -1 if row_num in found_rows: return -1 found_rows.add(row_num) if self._verify_unit_by_navigation(row_num, unit, unit_col): return row_num self._page.keyboard.press("Control+f") time.sleep(0.2) self._find_next() return -1 def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool: cell_address = f"{image_col}{row_num}" self._navigate_to_cell(cell_address) time.sleep(0.3) try: self._page.keyboard.press("Delete") time.sleep(0.1) self._page.keyboard.press("Backspace") except Exception: pass try: insert_btn = self._page.get_by_role("button", name="插入") insert_btn.click() time.sleep(0.3) except Exception as e: raise RuntimeError(f"打开插入菜单失败: {e}") try: image_btn = self._page.get_by_role("button", name="图片") image_btn.click() time.sleep(0.3) cell_image_option = self._page.get_by_role("option", name="单元格图片") cell_image_option.click() time.sleep(0.2) except Exception as e: raise RuntimeError(f"选择单元格图片失败: {e}") try: local_option = self._page.get_by_role("option", name="本地") with self._page.expect_file_chooser() as fc_info: local_option.click() file_chooser = fc_info.value file_chooser.set_files(image_path) except Exception as e: raise RuntimeError(f"上传文件失败: {e}") time.sleep(2) return True _kdocs_uploader: Optional[KDocsUploader] = None def get_kdocs_uploader() -> KDocsUploader: global _kdocs_uploader if _kdocs_uploader is None: _kdocs_uploader = KDocsUploader() _kdocs_uploader.start() return _kdocs_uploader