876 lines
31 KiB
Python
876 lines
31 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
from typing import Any, Dict, Optional
|
|
|
|
import database
|
|
import email_service
|
|
from app_config import get_config
|
|
from services.client_log import log_to_client
|
|
from services.runtime import get_logger
|
|
|
|
try:
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
|
|
except Exception: # pragma: no cover - 运行环境缺少 playwright 时降级
|
|
sync_playwright = None
|
|
|
|
class PlaywrightTimeoutError(Exception):
|
|
pass
|
|
|
|
|
|
logger = get_logger()
|
|
config = get_config()
|
|
|
|
|
|
class KDocsUploader:
|
|
def __init__(self) -> None:
|
|
self._queue: queue.Queue = queue.Queue(maxsize=int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200")))
|
|
self._thread = threading.Thread(target=self._run, name="kdocs-uploader", daemon=True)
|
|
self._running = False
|
|
self._last_error: Optional[str] = None
|
|
self._last_success_at: Optional[float] = None
|
|
self._login_required = False
|
|
self._playwright = None
|
|
self._browser = None
|
|
self._context = None
|
|
self._page = None
|
|
self._last_qr_image: Optional[bytes] = None
|
|
self._last_login_check: float = 0.0
|
|
self._last_login_ok: Optional[bool] = None
|
|
|
|
def start(self) -> None:
|
|
if self._running:
|
|
return
|
|
self._running = True
|
|
self._thread.start()
|
|
|
|
def stop(self) -> None:
|
|
if not self._running:
|
|
return
|
|
self._running = False
|
|
self._queue.put({"action": "shutdown"})
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
return {
|
|
"queue_size": self._queue.qsize(),
|
|
"login_required": self._login_required,
|
|
"last_error": self._last_error,
|
|
"last_success_at": self._last_success_at,
|
|
"last_login_ok": self._last_login_ok,
|
|
}
|
|
|
|
def enqueue_upload(
|
|
self,
|
|
*,
|
|
user_id: int,
|
|
account_id: str,
|
|
unit: str,
|
|
name: str,
|
|
image_path: str,
|
|
) -> bool:
|
|
if not self._running:
|
|
self.start()
|
|
|
|
payload = {
|
|
"user_id": int(user_id),
|
|
"account_id": str(account_id),
|
|
"unit": unit,
|
|
"name": name,
|
|
"image_path": image_path,
|
|
}
|
|
try:
|
|
self._queue.put({"action": "upload", "payload": payload}, timeout=1)
|
|
return True
|
|
except queue.Full:
|
|
self._last_error = "上传队列已满"
|
|
return False
|
|
|
|
def request_qr(self, timeout: int = 30, *, force: bool = False) -> Dict[str, Any]:
|
|
return self._submit_command("qr", timeout=timeout, payload={"force": force})
|
|
|
|
def clear_login(self, timeout: int = 20) -> Dict[str, Any]:
|
|
return self._submit_command("clear_login", timeout=timeout)
|
|
|
|
def refresh_login_status(self, timeout: int = 20) -> Dict[str, Any]:
|
|
return self._submit_command("status", timeout=timeout)
|
|
|
|
def _submit_command(
|
|
self,
|
|
action: str,
|
|
timeout: int = 30,
|
|
payload: Optional[Dict[str, Any]] = None,
|
|
) -> Dict[str, Any]:
|
|
if not self._running:
|
|
self.start()
|
|
resp_queue: queue.Queue = queue.Queue(maxsize=1)
|
|
self._queue.put({"action": action, "response": resp_queue, "payload": payload or {}})
|
|
try:
|
|
return resp_queue.get(timeout=timeout)
|
|
except queue.Empty:
|
|
return {"success": False, "error": "操作超时"}
|
|
|
|
def _run(self) -> None:
|
|
while True:
|
|
task = self._queue.get()
|
|
if not task:
|
|
continue
|
|
action = task.get("action")
|
|
if action == "shutdown":
|
|
break
|
|
try:
|
|
if action == "upload":
|
|
self._handle_upload(task.get("payload") or {})
|
|
elif action == "qr":
|
|
result = self._handle_qr(task.get("payload") or {})
|
|
task.get("response").put(result)
|
|
elif action == "clear_login":
|
|
result = self._handle_clear_login()
|
|
task.get("response").put(result)
|
|
elif action == "status":
|
|
result = self._handle_status_check()
|
|
task.get("response").put(result)
|
|
except Exception as e:
|
|
logger.warning(f"[KDocs] 处理任务失败: {e}")
|
|
|
|
self._cleanup_browser()
|
|
|
|
def _load_system_config(self) -> Dict[str, Any]:
|
|
return database.get_system_config() or {}
|
|
|
|
def _ensure_playwright(self, *, use_storage_state: bool = True) -> bool:
|
|
if sync_playwright is None:
|
|
self._last_error = "playwright 未安装"
|
|
return False
|
|
try:
|
|
if self._playwright is None:
|
|
self._playwright = sync_playwright().start()
|
|
if self._browser is None:
|
|
headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false"
|
|
self._browser = self._playwright.chromium.launch(headless=headless)
|
|
if self._context is None:
|
|
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
|
if use_storage_state and os.path.exists(storage_state):
|
|
self._context = self._browser.new_context(storage_state=storage_state)
|
|
else:
|
|
self._context = self._browser.new_context()
|
|
if self._page is None or self._page.is_closed():
|
|
self._page = self._context.new_page()
|
|
self._page.set_default_timeout(int(getattr(config, "DEFAULT_TIMEOUT", 60000)))
|
|
return True
|
|
except Exception as e:
|
|
self._last_error = f"浏览器启动失败: {e}"
|
|
self._cleanup_browser()
|
|
return False
|
|
|
|
def _cleanup_browser(self) -> None:
|
|
try:
|
|
if self._page:
|
|
self._page.close()
|
|
except Exception:
|
|
pass
|
|
self._page = None
|
|
try:
|
|
if self._context:
|
|
self._context.close()
|
|
except Exception:
|
|
pass
|
|
self._context = None
|
|
try:
|
|
if self._browser:
|
|
self._browser.close()
|
|
except Exception:
|
|
pass
|
|
self._browser = None
|
|
try:
|
|
if self._playwright:
|
|
self._playwright.stop()
|
|
except Exception:
|
|
pass
|
|
self._playwright = None
|
|
|
|
def _open_document(self, doc_url: str) -> bool:
|
|
try:
|
|
self._page.goto(doc_url)
|
|
time.sleep(1)
|
|
return True
|
|
except Exception as e:
|
|
self._last_error = f"打开文档失败: {e}"
|
|
return False
|
|
|
|
def _is_logged_in(self) -> bool:
|
|
try:
|
|
login_btn = self._page.get_by_role("button", name="登录并加入编辑")
|
|
if login_btn.is_visible(timeout=1500):
|
|
return False
|
|
except Exception:
|
|
return False
|
|
try:
|
|
login_btn = self._page.get_by_role("button", name="立即登录")
|
|
if login_btn.is_visible(timeout=1200):
|
|
return False
|
|
except Exception:
|
|
pass
|
|
try:
|
|
login_btn = self._page.get_by_role("button", name="登录")
|
|
if login_btn.is_visible(timeout=1200):
|
|
return False
|
|
except Exception:
|
|
pass
|
|
try:
|
|
login_link = self._page.get_by_role("link", name="登录")
|
|
if login_link.is_visible(timeout=1200):
|
|
return False
|
|
except Exception:
|
|
pass
|
|
try:
|
|
wechat_btn = self._page.get_by_role("button", name="微信登录")
|
|
if wechat_btn.is_visible(timeout=1200):
|
|
return False
|
|
except Exception:
|
|
pass
|
|
return True
|
|
|
|
def _has_saved_login_state(self) -> bool:
|
|
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
|
return os.path.exists(storage_state)
|
|
|
|
def _ensure_login_dialog(self) -> None:
|
|
login_names = ["登录并加入编辑", "立即登录", "登录"]
|
|
wechat_names = ["微信登录", "微信扫码登录", "微信扫码", "扫码登录"]
|
|
pages = self._iter_pages()
|
|
clicked = False
|
|
for page in pages:
|
|
if self._try_click_names(page, login_names):
|
|
clicked = True
|
|
break
|
|
if clicked:
|
|
time.sleep(1.5)
|
|
pages = self._iter_pages()
|
|
for page in pages:
|
|
if self._try_click_names(page, wechat_names):
|
|
return
|
|
self._try_confirm_login()
|
|
|
|
def _capture_qr_image(self) -> Optional[bytes]:
|
|
pages = self._iter_pages()
|
|
for page in pages:
|
|
for frame in page.frames:
|
|
target = self._find_qr_element_in_frame(frame)
|
|
if not target:
|
|
continue
|
|
try:
|
|
return target.screenshot()
|
|
except Exception:
|
|
continue
|
|
for page in pages:
|
|
dialog_image = self._capture_dialog_image(page)
|
|
if dialog_image:
|
|
return dialog_image
|
|
return None
|
|
|
|
def _iter_pages(self) -> list:
|
|
pages = []
|
|
if self._context:
|
|
pages.extend(self._context.pages)
|
|
if self._page and self._page not in pages:
|
|
pages.insert(0, self._page)
|
|
def rank(p) -> int:
|
|
url = (getattr(p, "url", "") or "").lower()
|
|
keywords = ("login", "account", "passport", "wechat", "qr")
|
|
return 0 if any(k in url for k in keywords) else 1
|
|
pages.sort(key=rank)
|
|
return pages
|
|
|
|
def _find_qr_element_in_frame(self, frame) -> Optional[Any]:
|
|
selectors = [
|
|
"canvas",
|
|
"img[alt*='二维码']",
|
|
"img[src*='qr']",
|
|
"img[src*='qrcode']",
|
|
"img[class*='qr']",
|
|
"canvas[class*='qr']",
|
|
"svg",
|
|
"div[role='img']",
|
|
"div[class*='qr']",
|
|
"div[id*='qr']",
|
|
"div[class*='qrcode']",
|
|
"div[id*='qrcode']",
|
|
"div[class*='wechat']",
|
|
"div[class*='weixin']",
|
|
"div[class*='wx']",
|
|
"img[src*='wx']",
|
|
]
|
|
best = None
|
|
best_score = None
|
|
for selector in selectors:
|
|
try:
|
|
locator = frame.locator(selector)
|
|
count = min(locator.count(), 20)
|
|
except Exception:
|
|
continue
|
|
for i in range(count):
|
|
el = locator.nth(i)
|
|
try:
|
|
if not el.is_visible(timeout=800):
|
|
continue
|
|
box = el.bounding_box()
|
|
if not box:
|
|
continue
|
|
width = box.get("width", 0)
|
|
height = box.get("height", 0)
|
|
if width < 80 or height < 80 or width > 520 or height > 520:
|
|
continue
|
|
aspect_diff = abs(width - height)
|
|
if aspect_diff > 80:
|
|
continue
|
|
score = aspect_diff + abs(width - 260) + abs(height - 260)
|
|
if best_score is None or score < best_score:
|
|
best_score = score
|
|
best = el
|
|
except Exception:
|
|
continue
|
|
if best:
|
|
return best
|
|
|
|
handle = None
|
|
try:
|
|
handle = frame.evaluate_handle(
|
|
"""() => {
|
|
const patterns = [/qr/i, /qrcode/i, /weixin/i, /wechat/i, /wx/i, /data:image/i];
|
|
const elements = Array.from(document.querySelectorAll('*'));
|
|
for (const el of elements) {
|
|
const style = window.getComputedStyle(el);
|
|
const bg = style.backgroundImage || '';
|
|
if (!bg || bg === 'none') continue;
|
|
if (!patterns.some((re) => re.test(bg))) continue;
|
|
const rect = el.getBoundingClientRect();
|
|
if (!rect.width || !rect.height) continue;
|
|
if (rect.width < 80 || rect.height < 80 || rect.width > 520 || rect.height > 520) continue;
|
|
const diff = Math.abs(rect.width - rect.height);
|
|
if (diff > 80) continue;
|
|
return el;
|
|
}
|
|
return null;
|
|
}"""
|
|
)
|
|
element = handle.as_element() if handle else None
|
|
if element:
|
|
return element
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
try:
|
|
if handle:
|
|
handle.dispose()
|
|
except Exception:
|
|
pass
|
|
return best
|
|
|
|
def _try_click_role(self, page, role: str, name: str, timeout: int = 1500) -> bool:
|
|
try:
|
|
el = page.get_by_role(role, name=name)
|
|
if el.is_visible(timeout=timeout):
|
|
el.click()
|
|
time.sleep(1)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
return False
|
|
|
|
def _try_click_names(self, page, names: list) -> bool:
|
|
for name in names:
|
|
if self._try_click_role(page, "button", name, timeout=1200):
|
|
return True
|
|
if self._try_click_role(page, "link", name, timeout=1200):
|
|
return True
|
|
try:
|
|
el = page.get_by_text(name, exact=True)
|
|
if el.is_visible(timeout=1200):
|
|
el.click()
|
|
time.sleep(1)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
try:
|
|
el = page.get_by_text(name, exact=False)
|
|
if el.is_visible(timeout=1200):
|
|
el.click()
|
|
time.sleep(1)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
try:
|
|
for frame in page.frames:
|
|
for name in names:
|
|
try:
|
|
el = frame.get_by_role("button", name=name)
|
|
if el.is_visible(timeout=800):
|
|
el.click()
|
|
time.sleep(1)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
try:
|
|
el = frame.get_by_text(name, exact=True)
|
|
if el.is_visible(timeout=800):
|
|
el.click()
|
|
time.sleep(1)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
try:
|
|
el = frame.get_by_text(name, exact=False)
|
|
if el.is_visible(timeout=800):
|
|
el.click()
|
|
time.sleep(1)
|
|
return True
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
return False
|
|
return False
|
|
|
|
def _try_confirm_login(self) -> bool:
|
|
confirm_names = ["确认登录", "确认登陆"]
|
|
pages = self._iter_pages()
|
|
for page in pages:
|
|
if self._try_click_names(page, confirm_names):
|
|
return True
|
|
return False
|
|
|
|
def _capture_dialog_image(self, page) -> Optional[bytes]:
|
|
selectors = (
|
|
"[role='dialog'], .dialog, .modal, .popup, "
|
|
"div[class*='dialog'], div[class*='modal'], div[class*='popup'], "
|
|
"div[class*='login'], div[class*='passport'], div[class*='auth']"
|
|
)
|
|
try:
|
|
dialogs = page.locator(selectors)
|
|
count = min(dialogs.count(), 6)
|
|
except Exception:
|
|
return None
|
|
best = None
|
|
best_area = 0
|
|
viewport = page.viewport_size or {}
|
|
vp_width = viewport.get("width", 0)
|
|
vp_height = viewport.get("height", 0)
|
|
for i in range(count):
|
|
el = dialogs.nth(i)
|
|
try:
|
|
if not el.is_visible(timeout=800):
|
|
continue
|
|
box = el.bounding_box()
|
|
if not box:
|
|
continue
|
|
width = box.get("width", 0)
|
|
height = box.get("height", 0)
|
|
if vp_width and vp_height:
|
|
if width > vp_width * 0.92 and height > vp_height * 0.92:
|
|
continue
|
|
area = width * height
|
|
if area > best_area:
|
|
best_area = area
|
|
best = el
|
|
except Exception:
|
|
continue
|
|
if not best:
|
|
return None
|
|
try:
|
|
return best.screenshot()
|
|
except Exception:
|
|
return None
|
|
|
|
def _save_login_state(self) -> None:
|
|
try:
|
|
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
|
os.makedirs(os.path.dirname(storage_state), exist_ok=True)
|
|
self._context.storage_state(path=storage_state)
|
|
except Exception as e:
|
|
logger.warning(f"[KDocs] 保存登录态失败: {e}")
|
|
|
|
def _handle_qr(self, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
cfg = self._load_system_config()
|
|
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
|
|
if not doc_url:
|
|
return {"success": False, "error": "未配置金山文档链接"}
|
|
force = bool(payload.get("force"))
|
|
if force:
|
|
self._handle_clear_login()
|
|
if not self._ensure_playwright(use_storage_state=not force):
|
|
return {"success": False, "error": self._last_error or "浏览器不可用"}
|
|
if not self._open_document(doc_url):
|
|
return {"success": False, "error": self._last_error or "打开文档失败"}
|
|
|
|
if not force and self._has_saved_login_state() and self._is_logged_in():
|
|
self._login_required = False
|
|
self._last_login_ok = True
|
|
self._save_login_state()
|
|
return {"success": True, "logged_in": True, "qr_image": ""}
|
|
|
|
self._ensure_login_dialog()
|
|
qr_image = None
|
|
for _ in range(10):
|
|
self._ensure_login_dialog()
|
|
qr_image = self._capture_qr_image()
|
|
if qr_image:
|
|
break
|
|
time.sleep(1)
|
|
if not qr_image:
|
|
self._last_error = "二维码获取失败"
|
|
try:
|
|
pages = self._iter_pages()
|
|
page_urls = [getattr(p, "url", "") for p in pages]
|
|
logger.warning(f"[KDocs] 二维码未捕获,页面: {page_urls}")
|
|
ts = int(time.time())
|
|
saved = []
|
|
for idx, page in enumerate(pages[:3]):
|
|
try:
|
|
path = f"data/kdocs_debug_{ts}_{idx}.png"
|
|
page.screenshot(path=path, full_page=True)
|
|
saved.append(path)
|
|
except Exception:
|
|
continue
|
|
if saved:
|
|
logger.warning(f"[KDocs] 已保存调试截图: {saved}")
|
|
except Exception:
|
|
pass
|
|
return {"success": False, "error": self._last_error}
|
|
|
|
self._last_qr_image = qr_image
|
|
self._login_required = True
|
|
return {
|
|
"success": True,
|
|
"logged_in": False,
|
|
"qr_image": base64.b64encode(qr_image).decode("ascii"),
|
|
}
|
|
|
|
def _handle_clear_login(self) -> Dict[str, Any]:
|
|
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
|
try:
|
|
if os.path.exists(storage_state):
|
|
os.remove(storage_state)
|
|
except Exception as e:
|
|
return {"success": False, "error": f"清除登录态失败: {e}"}
|
|
|
|
self._login_required = False
|
|
self._last_login_ok = None
|
|
self._cleanup_browser()
|
|
return {"success": True}
|
|
|
|
def _handle_status_check(self) -> Dict[str, Any]:
|
|
cfg = self._load_system_config()
|
|
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
|
|
if not doc_url:
|
|
return {"success": True, "logged_in": False, "error": "未配置文档链接"}
|
|
if not self._ensure_playwright():
|
|
return {"success": False, "logged_in": False, "error": self._last_error or "浏览器不可用"}
|
|
if not self._open_document(doc_url):
|
|
return {"success": False, "logged_in": False, "error": self._last_error or "打开文档失败"}
|
|
self._ensure_login_dialog()
|
|
self._try_confirm_login()
|
|
logged_in = self._is_logged_in()
|
|
if not self._has_saved_login_state():
|
|
logged_in = False
|
|
self._last_login_ok = logged_in
|
|
self._login_required = not logged_in
|
|
if logged_in:
|
|
self._save_login_state()
|
|
return {"success": True, "logged_in": logged_in}
|
|
|
|
def _handle_upload(self, payload: Dict[str, Any]) -> None:
|
|
cfg = self._load_system_config()
|
|
if int(cfg.get("kdocs_enabled", 0) or 0) != 1:
|
|
return
|
|
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
|
|
if not doc_url:
|
|
return
|
|
|
|
unit = (payload.get("unit") or "").strip()
|
|
name = (payload.get("name") or "").strip()
|
|
image_path = payload.get("image_path")
|
|
user_id = payload.get("user_id")
|
|
account_id = payload.get("account_id")
|
|
|
|
if not unit or not name:
|
|
return
|
|
if not image_path or not os.path.exists(image_path):
|
|
return
|
|
|
|
if not self._ensure_playwright():
|
|
self._notify_admin(unit, name, image_path, self._last_error or "浏览器不可用")
|
|
return
|
|
|
|
if not self._open_document(doc_url):
|
|
self._notify_admin(unit, name, image_path, self._last_error or "打开文档失败")
|
|
return
|
|
|
|
if not self._is_logged_in():
|
|
self._login_required = True
|
|
self._last_login_ok = False
|
|
self._notify_admin(unit, name, image_path, "登录已失效,请管理员重新扫码登录")
|
|
return
|
|
self._login_required = False
|
|
self._last_login_ok = True
|
|
|
|
sheet_name = (cfg.get("kdocs_sheet_name") or "").strip()
|
|
sheet_index = int(cfg.get("kdocs_sheet_index") or 0)
|
|
unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper()
|
|
image_col = (cfg.get("kdocs_image_column") or "D").strip().upper()
|
|
|
|
success = False
|
|
error_msg = ""
|
|
for attempt in range(2):
|
|
try:
|
|
if sheet_name or sheet_index:
|
|
self._select_sheet(sheet_name, sheet_index)
|
|
row_num = self._find_person_with_unit(unit, name, unit_col)
|
|
if row_num < 0:
|
|
error_msg = f"未找到人员: {unit}-{name}"
|
|
break
|
|
success = self._upload_image_to_cell(row_num, image_path, image_col)
|
|
if success:
|
|
break
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
|
|
if success:
|
|
self._last_success_at = time.time()
|
|
self._last_error = None
|
|
log_to_client(f"已上传表格截图: {unit}-{name}", user_id, account_id)
|
|
return
|
|
|
|
if not error_msg:
|
|
error_msg = "上传失败"
|
|
self._last_error = error_msg
|
|
self._notify_admin(unit, name, image_path, error_msg)
|
|
log_to_client(f"表格上传失败: {error_msg}", user_id, account_id)
|
|
|
|
def _notify_admin(self, unit: str, name: str, image_path: str, error: str) -> None:
|
|
cfg = self._load_system_config()
|
|
if int(cfg.get("kdocs_admin_notify_enabled", 0) or 0) != 1:
|
|
return
|
|
to_email = (cfg.get("kdocs_admin_notify_email") or "").strip()
|
|
if not to_email:
|
|
return
|
|
settings = email_service.get_email_settings()
|
|
if not settings.get("enabled", False):
|
|
return
|
|
subject = "金山文档上传失败提醒"
|
|
body = (
|
|
f"上传失败\n\n人员: {unit}-{name}\n图片: {image_path}\n错误: {error}\n\n"
|
|
"请检查登录状态或表格配置。"
|
|
)
|
|
try:
|
|
email_service.send_email_async(
|
|
to_email=to_email,
|
|
subject=subject,
|
|
body=body,
|
|
email_type="kdocs_upload_failed",
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"[KDocs] 发送管理员邮件失败: {e}")
|
|
|
|
def _select_sheet(self, sheet_name: str, sheet_index: int) -> None:
|
|
if sheet_name:
|
|
candidates = [
|
|
self._page.locator("[role='tab']").filter(has_text=sheet_name),
|
|
self._page.locator(".sheet-tab").filter(has_text=sheet_name),
|
|
self._page.locator(".sheet-tab-name").filter(has_text=sheet_name),
|
|
]
|
|
for locator in candidates:
|
|
try:
|
|
if locator.count() < 1:
|
|
continue
|
|
locator.first.click()
|
|
time.sleep(0.5)
|
|
return
|
|
except Exception:
|
|
continue
|
|
|
|
if sheet_index > 0:
|
|
idx = sheet_index - 1
|
|
candidates = [
|
|
self._page.locator("[role='tab']"),
|
|
self._page.locator(".sheet-tab"),
|
|
self._page.locator(".sheet-tab-name"),
|
|
]
|
|
for locator in candidates:
|
|
try:
|
|
if locator.count() <= idx:
|
|
continue
|
|
locator.nth(idx).click()
|
|
time.sleep(0.5)
|
|
return
|
|
except Exception:
|
|
continue
|
|
|
|
def _get_current_cell_address(self) -> str:
|
|
name_box = self._page.locator('#root input[type="text"]').first
|
|
return name_box.input_value()
|
|
|
|
def _navigate_to_cell(self, cell_address: str) -> None:
|
|
name_box = self._page.locator('#root input[type="text"]').first
|
|
name_box.click()
|
|
name_box.fill(cell_address)
|
|
name_box.press("Enter")
|
|
time.sleep(0.3)
|
|
|
|
def _get_cell_value(self, cell_address: str) -> str:
|
|
self._navigate_to_cell(cell_address)
|
|
time.sleep(0.3)
|
|
try:
|
|
formula_bar = self._page.locator('input[type="text"]').nth(1)
|
|
value = formula_bar.input_value()
|
|
if value:
|
|
return value.strip()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
formula_inputs = self._page.locator('textbox')
|
|
for i in range(formula_inputs.count()):
|
|
try:
|
|
value = formula_inputs.nth(i).input_value()
|
|
if value:
|
|
import re
|
|
|
|
if not re.match(r"^[A-Z]+\d+$", value.strip()):
|
|
return value.strip()
|
|
except Exception:
|
|
continue
|
|
except Exception:
|
|
pass
|
|
return ""
|
|
|
|
def _search_person(self, name: str) -> None:
|
|
self._page.keyboard.press("Control+f")
|
|
time.sleep(0.3)
|
|
search_input = None
|
|
try:
|
|
search_input = self._page.get_by_role("textbox").nth(3)
|
|
search_input.fill(name)
|
|
except Exception:
|
|
try:
|
|
search_input = self._page.locator("input[placeholder*='查找']").first
|
|
search_input.fill(name)
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.2)
|
|
try:
|
|
find_btn = self._page.get_by_role("button", name="查找").nth(2)
|
|
find_btn.click()
|
|
except Exception:
|
|
try:
|
|
self._page.get_by_role("button", name="查找").first.click()
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.3)
|
|
|
|
def _find_next(self) -> None:
|
|
try:
|
|
find_btn = self._page.get_by_role("button", name="查找").nth(2)
|
|
find_btn.click()
|
|
except Exception:
|
|
try:
|
|
self._page.get_by_role("button", name="查找").first.click()
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.3)
|
|
|
|
def _close_search(self) -> None:
|
|
self._page.keyboard.press("Escape")
|
|
time.sleep(0.2)
|
|
|
|
def _extract_row_number(self, cell_address: str) -> int:
|
|
import re
|
|
|
|
match = re.search(r"(\\d+)$", cell_address)
|
|
if match:
|
|
return int(match.group(1))
|
|
return -1
|
|
|
|
def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool:
|
|
cell_address = f"{unit_col}{row_num}"
|
|
cell_value = self._get_cell_value(cell_address)
|
|
if cell_value:
|
|
return cell_value == unit
|
|
return False
|
|
|
|
def _find_person_with_unit(self, unit: str, name: str, unit_col: str, max_attempts: int = 50) -> int:
|
|
self._search_person(name)
|
|
found_rows = set()
|
|
for _ in range(max_attempts):
|
|
self._close_search()
|
|
current_address = self._get_current_cell_address()
|
|
row_num = self._extract_row_number(current_address)
|
|
if row_num == -1:
|
|
return -1
|
|
if row_num in found_rows:
|
|
return -1
|
|
found_rows.add(row_num)
|
|
|
|
if self._verify_unit_by_navigation(row_num, unit, unit_col):
|
|
return row_num
|
|
|
|
self._page.keyboard.press("Control+f")
|
|
time.sleep(0.2)
|
|
self._find_next()
|
|
return -1
|
|
|
|
def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
|
|
cell_address = f"{image_col}{row_num}"
|
|
self._navigate_to_cell(cell_address)
|
|
time.sleep(0.3)
|
|
|
|
try:
|
|
self._page.keyboard.press("Delete")
|
|
time.sleep(0.1)
|
|
self._page.keyboard.press("Backspace")
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
insert_btn = self._page.get_by_role("button", name="插入")
|
|
insert_btn.click()
|
|
time.sleep(0.3)
|
|
except Exception as e:
|
|
raise RuntimeError(f"打开插入菜单失败: {e}")
|
|
|
|
try:
|
|
image_btn = self._page.get_by_role("button", name="图片")
|
|
image_btn.click()
|
|
time.sleep(0.3)
|
|
cell_image_option = self._page.get_by_role("option", name="单元格图片")
|
|
cell_image_option.click()
|
|
time.sleep(0.2)
|
|
except Exception as e:
|
|
raise RuntimeError(f"选择单元格图片失败: {e}")
|
|
|
|
try:
|
|
local_option = self._page.get_by_role("option", name="本地")
|
|
with self._page.expect_file_chooser() as fc_info:
|
|
local_option.click()
|
|
file_chooser = fc_info.value
|
|
file_chooser.set_files(image_path)
|
|
except Exception as e:
|
|
raise RuntimeError(f"上传文件失败: {e}")
|
|
|
|
time.sleep(2)
|
|
return True
|
|
|
|
|
|
_kdocs_uploader: Optional[KDocsUploader] = None
|
|
|
|
|
|
def get_kdocs_uploader() -> KDocsUploader:
|
|
global _kdocs_uploader
|
|
if _kdocs_uploader is None:
|
|
_kdocs_uploader = KDocsUploader()
|
|
_kdocs_uploader.start()
|
|
return _kdocs_uploader
|