Files
zsglpt/services/kdocs_uploader.py
237899745 fae21329d7 优化 KDocs 上传器
- 删除死代码 (二分搜索相关方法,减少 ~186 行)
- 优化 sleep 等待时间,减少约 30% 的等待
- 添加缓存过期机制 (5分钟 TTL)
- 优化日志级别,减少调试日志噪音

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 20:09:46 +08:00

1574 lines
59 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
KDocs Uploader with Auto-Recovery Mechanism
自动恢复机制:当检测到上传线程卡住时,自动重启线程
优化记录 (2026-01-21):
- 删除无效的二分搜索相关代码 (_binary_search_person, _name_matches, _name_less_than, _get_cell_value_fast)
- 优化 sleep 等待时间,减少约 30% 的等待
- 添加缓存过期机制 (5分钟 TTL)
- 优化日志级别,减少调试日志噪音
"""
from __future__ import annotations
import base64
import os
import queue
import re
import threading
import time
from io import BytesIO
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse
import database
import email_service
from app_config import get_config
from services.client_log import log_to_client
from services.runtime import get_logger, get_socketio
from services.state import safe_get_account
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except Exception: # pragma: no cover - 运行环境缺少 playwright 时降级
sync_playwright = None
class PlaywrightTimeoutError(Exception):
pass
logger = get_logger()
config = get_config()
# 看门狗配置
WATCHDOG_CHECK_INTERVAL = 60 # 每60秒检查一次
WATCHDOG_TIMEOUT = 300 # 如果5分钟没有活动且队列有任务认为线程卡住
# 缓存配置
CACHE_TTL_SECONDS = 300 # 缓存过期时间: 5分钟
class KDocsUploader:
def __init__(self) -> None:
self._queue: queue.Queue = queue.Queue(maxsize=int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200")))
self._thread: Optional[threading.Thread] = None
self._thread_id = 0 # 线程ID用于追踪重启次数
self._running = False
self._last_error: Optional[str] = None
self._last_success_at: Optional[float] = None
self._login_required = False
self._playwright = None
self._browser = None
self._context = None
self._page = None
self._last_qr_image: Optional[bytes] = None
self._last_login_check: float = 0.0
self._last_login_ok: Optional[bool] = None
self._doc_url: Optional[str] = None
# 自动恢复机制相关
self._last_activity: float = time.time() # 最后活动时间
self._watchdog_thread: Optional[threading.Thread] = None
self._watchdog_running = False
self._restart_count = 0 # 重启次数统计
self._lock = threading.Lock() # 线程安全锁
# 人员位置缓存: {cache_key: (row_num, timestamp)}
self._person_cache: Dict[str, Tuple[int, float]] = {}
def start(self) -> None:
with self._lock:
if self._running:
return
self._running = True
self._thread_id += 1
self._thread = threading.Thread(
target=self._run,
name=f"kdocs-uploader-{self._thread_id}",
daemon=True
)
self._thread.start()
self._last_activity = time.time()
# 启动看门狗线程
if not self._watchdog_running:
self._watchdog_running = True
self._watchdog_thread = threading.Thread(
target=self._watchdog_run,
name="kdocs-watchdog",
daemon=True
)
self._watchdog_thread.start()
logger.info("[KDocs] 看门狗线程已启动")
def stop(self) -> None:
with self._lock:
if not self._running:
return
self._running = False
self._watchdog_running = False
self._queue.put({"action": "shutdown"})
def _watchdog_run(self) -> None:
"""看门狗线程:监控上传线程健康状态"""
logger.info("[KDocs] 看门狗开始监控")
while self._watchdog_running:
try:
time.sleep(WATCHDOG_CHECK_INTERVAL)
if not self._running:
continue
# 检查线程是否存活
if self._thread is None or not self._thread.is_alive():
logger.warning("[KDocs] 检测到上传线程已停止,正在重启...")
self._restart_thread()
continue
# 检查是否有任务堆积且长时间无活动
queue_size = self._queue.qsize()
time_since_activity = time.time() - self._last_activity
if queue_size > 0 and time_since_activity > WATCHDOG_TIMEOUT:
logger.warning(
f"[KDocs] 检测到上传线程可能卡住: "
f"队列={queue_size}, 无活动时间={time_since_activity:.0f}"
)
self._restart_thread()
except Exception as e:
logger.warning(f"[KDocs] 看门狗检查异常: {e}")
def _restart_thread(self) -> None:
"""重启上传线程"""
with self._lock:
self._restart_count += 1
logger.warning(f"[KDocs] 正在重启上传线程 (第{self._restart_count}次重启)")
# 清理浏览器资源
try:
self._cleanup_browser()
except Exception as e:
logger.warning(f"[KDocs] 清理浏览器时出错: {e}")
# 停止旧线程(如果还在运行)
old_running = self._running
self._running = False
# 等待一小段时间让旧线程有机会退出
time.sleep(1)
# 启动新线程
self._running = True
self._thread_id += 1
self._thread = threading.Thread(
target=self._run,
name=f"kdocs-uploader-{self._thread_id}",
daemon=True
)
self._thread.start()
self._last_activity = time.time()
self._last_error = f"线程已自动恢复 (第{self._restart_count}次)"
logger.info(f"[KDocs] 上传线程已重启 (ID={self._thread_id})")
def get_status(self) -> Dict[str, Any]:
return {
"queue_size": self._queue.qsize(),
"login_required": self._login_required,
"last_error": self._last_error,
"last_success_at": self._last_success_at,
"last_login_ok": self._last_login_ok,
"restart_count": self._restart_count,
"thread_alive": self._thread.is_alive() if self._thread else False,
}
def enqueue_upload(
self,
*,
user_id: int,
account_id: str,
unit: str,
name: str,
image_path: str,
) -> bool:
if not self._running:
self.start()
payload = {
"user_id": int(user_id),
"account_id": str(account_id),
"unit": unit,
"name": name,
"image_path": image_path,
}
try:
# 入队前设置状态为等待上传
try:
account = safe_get_account(user_id, account_id)
if account and self._should_mark_upload(account):
account.status = "等待上传"
self._emit_account_update(user_id, account)
except Exception:
pass
self._queue.put({"action": "upload", "payload": payload}, timeout=1)
return True
except queue.Full:
self._last_error = "上传队列已满"
return False
def request_qr(self, timeout: int = 60, *, force: bool = False) -> Dict[str, Any]:
return self._submit_command("qr", timeout=timeout, payload={"force": force})
def clear_login(self, timeout: int = 20) -> Dict[str, Any]:
return self._submit_command("clear_login", timeout=timeout)
def refresh_login_status(self, timeout: int = 20) -> Dict[str, Any]:
return self._submit_command("status", timeout=timeout)
def _submit_command(
self,
action: str,
timeout: int = 30,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
if not self._running:
self.start()
resp_queue: queue.Queue = queue.Queue(maxsize=1)
self._queue.put({"action": action, "response": resp_queue, "payload": payload or {}})
try:
return resp_queue.get(timeout=timeout)
except queue.Empty:
return {"success": False, "error": "操作超时"}
def _run(self) -> None:
thread_id = self._thread_id
logger.info(f"[KDocs] 上传线程启动 (ID={thread_id})")
while self._running:
try:
# 使用超时获取任务,以便定期检查 _running 状态
try:
task = self._queue.get(timeout=5)
except queue.Empty:
continue
if not task:
continue
# 更新最后活动时间
self._last_activity = time.time()
action = task.get("action")
if action == "shutdown":
break
try:
if action == "upload":
self._handle_upload(task.get("payload") or {})
elif action == "qr":
result = self._handle_qr(task.get("payload") or {})
task.get("response").put(result)
elif action == "clear_login":
result = self._handle_clear_login()
task.get("response").put(result)
elif action == "status":
result = self._handle_status_check()
task.get("response").put(result)
# 任务处理完成后更新活动时间
self._last_activity = time.time()
except Exception as e:
logger.warning(f"[KDocs] 处理任务失败: {e}")
# 如果有响应队列,返回错误
if "response" in task and task.get("response"):
try:
task["response"].put({"success": False, "error": str(e)})
except Exception:
pass
except Exception as e:
logger.warning(f"[KDocs] 线程主循环异常: {e}")
time.sleep(1) # 避免异常时的紧密循环
logger.info(f"[KDocs] 上传线程退出 (ID={thread_id})")
self._cleanup_browser()
def _load_system_config(self) -> Dict[str, Any]:
return database.get_system_config() or {}
def _ensure_playwright(self, *, use_storage_state: bool = True) -> bool:
if sync_playwright is None:
self._last_error = "playwright 未安装"
return False
try:
if self._playwright is None:
self._playwright = sync_playwright().start()
if self._browser is None:
headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false"
self._browser = self._playwright.chromium.launch(headless=headless)
if self._context is None:
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
if use_storage_state and os.path.exists(storage_state):
self._context = self._browser.new_context(storage_state=storage_state)
else:
self._context = self._browser.new_context()
if self._page is None or self._page.is_closed():
self._page = self._context.new_page()
self._page.set_default_timeout(int(getattr(config, "DEFAULT_TIMEOUT", 60000)))
return True
except Exception as e:
self._last_error = f"浏览器启动失败: {e}"
self._cleanup_browser()
return False
def _cleanup_browser(self) -> None:
try:
if self._page:
self._page.close()
except Exception:
pass
self._page = None
try:
if self._context:
self._context.close()
except Exception:
pass
self._context = None
try:
if self._browser:
self._browser.close()
except Exception:
pass
self._browser = None
try:
if self._playwright:
self._playwright.stop()
except Exception:
pass
self._playwright = None
def _open_document(self, doc_url: str, *, fast: bool = False) -> bool:
try:
self._doc_url = doc_url
self._ensure_clipboard_permissions(doc_url)
if fast:
doc_pages = self._find_doc_pages(doc_url)
if doc_pages:
self._page = doc_pages[0]
return True
login_pages = []
for page in self._list_pages():
try:
url = getattr(page, "url", "") or ""
if self._is_login_url(url):
login_pages.append(page)
except Exception:
continue
if login_pages:
self._page = login_pages[0]
return True
goto_kwargs = {}
if fast:
fast_timeout = int(os.environ.get("KDOCS_FAST_GOTO_TIMEOUT_MS", "15000"))
goto_kwargs = {"wait_until": "domcontentloaded", "timeout": fast_timeout}
self._page.goto(doc_url, **goto_kwargs)
time.sleep(0.5) # 优化: 0.6 -> 0.5
doc_pages = self._find_doc_pages(doc_url)
if doc_pages and doc_pages[0] is not self._page:
self._page = doc_pages[0]
return True
except Exception as e:
self._last_error = f"打开文档失败: {e}"
return False
def _ensure_clipboard_permissions(self, doc_url: str) -> None:
if not self._context or not doc_url:
return
try:
parsed = urlparse(doc_url)
if not parsed.scheme or not parsed.netloc:
return
host = parsed.netloc
origins = {f"{parsed.scheme}://{host}"}
if host.startswith("www."):
origins.add(f"{parsed.scheme}://{host[4:]}")
else:
origins.add(f"{parsed.scheme}://www.{host}")
for origin in origins:
try:
self._context.grant_permissions(["clipboard-read", "clipboard-write"], origin=origin)
except Exception:
continue
except Exception:
return
def _normalize_doc_url(self, url: str) -> str:
if not url:
return ""
return url.split("#", 1)[0].split("?", 1)[0].rstrip("/")
def _list_pages(self) -> list:
pages = []
if self._context:
pages.extend(self._context.pages)
if self._page and self._page not in pages:
pages.insert(0, self._page)
return pages
def _is_login_url(self, url: str) -> bool:
if not url:
return False
lower = url.lower()
try:
host = urlparse(lower).netloc
except Exception:
host = ""
if "account.wps.cn" in host:
return True
if "passport" in lower:
return True
if "login" in lower and "kdocs.cn" not in host:
return True
return False
def _find_doc_pages(self, doc_url: Optional[str]) -> list:
doc_key = self._normalize_doc_url(doc_url or "")
pages = self._list_pages()
matches = []
for page in pages:
url = getattr(page, "url", "") or ""
if not url:
continue
if self._is_login_url(url):
continue
norm_url = self._normalize_doc_url(url)
if doc_key and doc_key in norm_url:
matches.append(page)
continue
try:
host = urlparse(url).netloc.lower()
except Exception:
host = ""
if "kdocs.cn" in host:
matches.append(page)
return matches
def _page_has_login_gate(self, page) -> bool:
url = getattr(page, "url", "") or ""
if self._is_login_url(url):
return True
login_texts = [
"登录并加入编辑",
"立即登录",
"微信登录",
"扫码登录",
"确认登录",
"确认登陆",
"账号登录",
"登录",
]
for text in login_texts:
try:
if page.get_by_role("button", name=text).is_visible(timeout=800):
return True
except Exception:
pass
try:
if page.get_by_role("link", name=text).is_visible(timeout=800):
return True
except Exception:
pass
try:
if page.get_by_text(text, exact=True).is_visible(timeout=800):
return True
except Exception:
pass
try:
if page.locator("text=登录并加入编辑").first.is_visible(timeout=800):
return True
except Exception:
pass
return False
def _is_logged_in(self) -> bool:
doc_pages = self._find_doc_pages(self._doc_url)
if not doc_pages:
if self._page and not self._page.is_closed() and not self._page_has_login_gate(self._page):
return False
return False
page = doc_pages[0]
if self._page is None or self._page.is_closed() or self._page.url != page.url:
self._page = page
return not self._page_has_login_gate(page)
def _has_saved_login_state(self) -> bool:
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
return os.path.exists(storage_state)
def _ensure_login_dialog(
self,
*,
timeout_ms: int = 1200,
frame_timeout_ms: int = 800,
quick: bool = False,
) -> None:
login_names = ["登录并加入编辑", "立即登录", "登录"]
wechat_names = ["微信登录", "微信扫码登录", "微信扫码", "扫码登录"]
pages = self._iter_pages()
clicked = False
for page in pages:
if self._try_click_names(
page,
login_names,
timeout_ms=timeout_ms,
frame_timeout_ms=frame_timeout_ms,
quick=quick,
):
clicked = True
break
if clicked:
time.sleep(1.2) # 优化: 1.5 -> 1.2
pages = self._iter_pages()
for page in pages:
if self._try_click_names(
page,
wechat_names,
timeout_ms=timeout_ms,
frame_timeout_ms=frame_timeout_ms,
quick=quick,
):
return
self._try_confirm_login(timeout_ms=timeout_ms, frame_timeout_ms=frame_timeout_ms, quick=quick)
def _capture_qr_image(self) -> Optional[bytes]:
pages = self._iter_pages()
for page in pages:
for frame in page.frames:
target = self._find_qr_element_in_frame(frame)
if not target:
continue
try:
return target.screenshot()
except Exception:
continue
for page in pages:
dialog_image = self._capture_dialog_image(page)
if dialog_image:
return dialog_image
return None
def _iter_pages(self) -> list:
pages = []
if self._context:
pages.extend(self._context.pages)
if self._page and self._page not in pages:
pages.insert(0, self._page)
def rank(p) -> int:
url = (getattr(p, "url", "") or "").lower()
keywords = ("login", "account", "passport", "wechat", "qr")
return 0 if any(k in url for k in keywords) else 1
pages.sort(key=rank)
return pages
def _find_qr_element_in_frame(self, frame) -> Optional[Any]:
selectors = [
"canvas",
"img[alt*='二维码']",
"img[src*='qr']",
"img[src*='qrcode']",
"img[class*='qr']",
"canvas[class*='qr']",
"svg",
"div[role='img']",
"div[class*='qr']",
"div[id*='qr']",
"div[class*='qrcode']",
"div[id*='qrcode']",
"div[class*='wechat']",
"div[class*='weixin']",
"div[class*='wx']",
"img[src*='wx']",
]
best = None
best_score = None
for selector in selectors:
try:
locator = frame.locator(selector)
count = min(locator.count(), 20)
except Exception:
continue
for i in range(count):
el = locator.nth(i)
try:
if not el.is_visible(timeout=800):
continue
box = el.bounding_box()
if not box:
continue
width = box.get("width", 0)
height = box.get("height", 0)
if width < 80 or height < 80 or width > 520 or height > 520:
continue
aspect_diff = abs(width - height)
if aspect_diff > 80:
continue
score = aspect_diff + abs(width - 260) + abs(height - 260)
if best_score is None or score < best_score:
best_score = score
best = el
except Exception:
continue
if best:
return best
handle = None
try:
handle = frame.evaluate_handle(
"""() => {
const patterns = [/qr/i, /qrcode/i, /weixin/i, /wechat/i, /wx/i, /data:image/i];
const elements = Array.from(document.querySelectorAll('*'));
for (const el of elements) {
const style = window.getComputedStyle(el);
const bg = style.backgroundImage || '';
if (!bg || bg === 'none') continue;
if (!patterns.some((re) => re.test(bg))) continue;
const rect = el.getBoundingClientRect();
if (!rect.width || !rect.height) continue;
if (rect.width < 80 || rect.height < 80 || rect.width > 520 || rect.height > 520) continue;
const diff = Math.abs(rect.width - rect.height);
if (diff > 80) continue;
return el;
}
return null;
}"""
)
element = handle.as_element() if handle else None
if element:
return element
except Exception:
pass
finally:
try:
if handle:
handle.dispose()
except Exception:
pass
return best
def _try_click_role(self, page, role: str, name: str, timeout: int = 1500) -> bool:
try:
el = page.get_by_role(role, name=name)
if el.is_visible(timeout=timeout):
el.click()
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
return False
return False
def _try_click_names(
self,
page,
names: list,
*,
timeout_ms: int = 1200,
frame_timeout_ms: int = 800,
quick: bool = False,
) -> bool:
for name in names:
if self._try_click_role(page, "button", name, timeout=timeout_ms):
return True
if not quick:
if self._try_click_role(page, "link", name, timeout=timeout_ms):
return True
try:
el = page.get_by_text(name, exact=True)
if el.is_visible(timeout=timeout_ms):
el.click()
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
if not quick:
try:
el = page.get_by_text(name, exact=False)
if el.is_visible(timeout=timeout_ms):
el.click()
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
try:
for frame in page.frames:
for name in names:
try:
el = frame.get_by_role("button", name=name)
if el.is_visible(timeout=frame_timeout_ms):
el.click()
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
try:
el = frame.get_by_text(name, exact=True)
if el.is_visible(timeout=frame_timeout_ms):
el.click()
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
if not quick:
try:
el = frame.get_by_text(name, exact=False)
if el.is_visible(timeout=frame_timeout_ms):
el.click()
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
except Exception:
return False
return False
def _try_confirm_login(
self,
*,
timeout_ms: int = 1200,
frame_timeout_ms: int = 800,
quick: bool = False,
) -> bool:
confirm_names = ["确认登录", "确认登陆"]
pages = self._iter_pages()
for page in pages:
if self._try_click_names(
page,
confirm_names,
timeout_ms=timeout_ms,
frame_timeout_ms=frame_timeout_ms,
quick=quick,
):
return True
return False
def _capture_dialog_image(self, page) -> Optional[bytes]:
selectors = (
"[role='dialog'], .dialog, .modal, .popup, "
"div[class*='dialog'], div[class*='modal'], div[class*='popup'], "
"div[class*='login'], div[class*='passport'], div[class*='auth']"
)
try:
dialogs = page.locator(selectors)
count = min(dialogs.count(), 6)
except Exception:
return None
best = None
best_area = 0
viewport = page.viewport_size or {}
vp_width = viewport.get("width", 0)
vp_height = viewport.get("height", 0)
for i in range(count):
el = dialogs.nth(i)
try:
if not el.is_visible(timeout=800):
continue
box = el.bounding_box()
if not box:
continue
width = box.get("width", 0)
height = box.get("height", 0)
if width < 160 or height < 160:
continue
if vp_width and vp_height:
if width > vp_width * 0.92 and height > vp_height * 0.92:
continue
area = width * height
if area > best_area:
best_area = area
best = el
except Exception:
continue
if not best:
return None
try:
return best.screenshot()
except Exception:
return None
def _is_valid_qr_image(self, data: Optional[bytes]) -> bool:
if not data or len(data) < 1024:
return False
try:
from PIL import Image, ImageStat
img = Image.open(BytesIO(data))
width, height = img.size
if width < 120 or height < 120:
return False
ratio = width / float(height or 1)
if ratio < 0.6 or ratio > 1.4:
return False
gray = img.convert("L")
stat = ImageStat.Stat(gray)
if stat.stddev[0] < 5:
return False
return True
except Exception:
return len(data) >= 2048
def _save_login_state(self) -> None:
try:
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
os.makedirs(os.path.dirname(storage_state), exist_ok=True)
self._context.storage_state(path=storage_state)
except Exception as e:
logger.warning(f"[KDocs] 保存登录态失败: {e}")
def _handle_qr(self, payload: Dict[str, Any]) -> Dict[str, Any]:
cfg = self._load_system_config()
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
if not doc_url:
return {"success": False, "error": "未配置金山文档链接"}
force = bool(payload.get("force"))
if force:
self._handle_clear_login()
if not self._ensure_playwright(use_storage_state=not force):
return {"success": False, "error": self._last_error or "浏览器不可用"}
if not self._open_document(doc_url, fast=True):
return {"success": False, "error": self._last_error or "打开文档失败"}
if not force and self._has_saved_login_state() and self._is_logged_in():
self._login_required = False
self._last_login_ok = True
self._save_login_state()
return {"success": True, "logged_in": True, "qr_image": ""}
fast_login_timeout = int(os.environ.get("KDOCS_FAST_LOGIN_TIMEOUT_MS", "300"))
self._ensure_login_dialog(
timeout_ms=fast_login_timeout,
frame_timeout_ms=fast_login_timeout,
quick=True,
)
qr_image = None
invalid_qr = None
for attempt in range(10):
if attempt in (3, 7):
self._ensure_login_dialog(
timeout_ms=fast_login_timeout,
frame_timeout_ms=fast_login_timeout,
quick=True,
)
candidate = self._capture_qr_image()
if candidate and self._is_valid_qr_image(candidate):
qr_image = candidate
break
if candidate:
invalid_qr = candidate
time.sleep(0.8) # 优化: 1 -> 0.8
if not qr_image:
self._last_error = "二维码识别异常" if invalid_qr else "二维码获取失败"
try:
pages = self._iter_pages()
page_urls = [getattr(p, "url", "") for p in pages]
logger.warning(f"[KDocs] 二维码未捕获,页面: {page_urls}")
ts = int(time.time())
saved = []
for idx, page in enumerate(pages[:3]):
try:
path = f"data/kdocs_debug_{ts}_{idx}.png"
page.screenshot(path=path, full_page=True)
saved.append(path)
except Exception:
continue
if saved:
logger.warning(f"[KDocs] 已保存调试截图: {saved}")
if invalid_qr:
try:
path = f"data/kdocs_invalid_qr_{ts}.png"
with open(path, "wb") as handle:
handle.write(invalid_qr)
logger.warning(f"[KDocs] 已保存无效二维码截图: {path}")
except Exception:
pass
except Exception:
pass
return {"success": False, "error": self._last_error}
try:
ts = int(time.time())
path = f"data/kdocs_last_qr_{ts}.png"
with open(path, "wb") as handle:
handle.write(qr_image)
logger.info(f"[KDocs] 已保存二维码截图: {path} ({len(qr_image)} bytes)")
except Exception as e:
logger.warning(f"[KDocs] 保存二维码截图失败: {e}")
self._last_qr_image = qr_image
self._login_required = True
return {
"success": True,
"logged_in": False,
"qr_image": base64.b64encode(qr_image).decode("ascii"),
}
def _handle_clear_login(self) -> Dict[str, Any]:
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
try:
if os.path.exists(storage_state):
os.remove(storage_state)
except Exception as e:
return {"success": False, "error": f"清除登录态失败: {e}"}
self._login_required = False
self._last_login_ok = None
self._cleanup_browser()
return {"success": True}
def _handle_status_check(self) -> Dict[str, Any]:
cfg = self._load_system_config()
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
if not doc_url:
return {"success": True, "logged_in": False, "error": "未配置文档链接"}
if not self._ensure_playwright():
return {"success": False, "logged_in": False, "error": self._last_error or "浏览器不可用"}
if not self._open_document(doc_url, fast=True):
return {"success": False, "logged_in": False, "error": self._last_error or "打开文档失败"}
fast_login_timeout = int(os.environ.get("KDOCS_FAST_LOGIN_TIMEOUT_MS", "300"))
self._ensure_login_dialog(
timeout_ms=fast_login_timeout,
frame_timeout_ms=fast_login_timeout,
quick=True,
)
self._try_confirm_login(
timeout_ms=fast_login_timeout,
frame_timeout_ms=fast_login_timeout,
quick=True,
)
logged_in = self._is_logged_in()
self._last_login_ok = logged_in
self._login_required = not logged_in
if logged_in:
self._save_login_state()
return {"success": True, "logged_in": logged_in}
def _handle_upload(self, payload: Dict[str, Any]) -> None:
cfg = self._load_system_config()
if int(cfg.get("kdocs_enabled", 0) or 0) != 1:
return
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
if not doc_url:
return
unit = (payload.get("unit") or "").strip()
name = (payload.get("name") or "").strip()
image_path = payload.get("image_path")
user_id = payload.get("user_id")
account_id = payload.get("account_id")
if not unit or not name:
return
if not image_path or not os.path.exists(image_path):
return
account = None
prev_status = None
status_tracked = False
try:
try:
account = safe_get_account(user_id, account_id)
if account and self._should_mark_upload(account):
prev_status = getattr(account, "status", None)
account.status = "上传截图"
self._emit_account_update(user_id, account)
status_tracked = True
except Exception:
prev_status = None
if not self._ensure_playwright():
self._notify_admin(unit, name, image_path, self._last_error or "浏览器不可用")
return
if not self._open_document(doc_url):
self._notify_admin(unit, name, image_path, self._last_error or "打开文档失败")
return
if not self._is_logged_in():
self._login_required = True
self._last_login_ok = False
self._notify_admin(unit, name, image_path, "登录已失效,请管理员重新扫码登录")
try:
log_to_client("表格上传失败: 登录已失效,请管理员重新扫码登录", user_id, account_id)
except Exception:
pass
return
self._login_required = False
self._last_login_ok = True
sheet_name = (cfg.get("kdocs_sheet_name") or "").strip()
sheet_index = int(cfg.get("kdocs_sheet_index") or 0)
unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper()
image_col = (cfg.get("kdocs_image_column") or "D").strip().upper()
row_start = int(cfg.get("kdocs_row_start") or 0)
row_end = int(cfg.get("kdocs_row_end") or 0)
success = False
error_msg = ""
for attempt in range(2):
try:
if sheet_name or sheet_index:
self._select_sheet(sheet_name, sheet_index)
row_num = self._find_person_with_unit(unit, name, unit_col, row_start=row_start, row_end=row_end)
if row_num < 0:
error_msg = f"未找到人员: {unit}-{name}"
break
success = self._upload_image_to_cell(row_num, image_path, image_col)
if success:
break
except Exception as e:
error_msg = str(e)
if success:
self._last_success_at = time.time()
self._last_error = None
try:
log_to_client(f"已上传表格截图: {unit}-{name}", user_id, account_id)
except Exception:
pass
return
if not error_msg:
error_msg = "上传失败"
self._last_error = error_msg
self._notify_admin(unit, name, image_path, error_msg)
try:
log_to_client(f"表格上传失败: {error_msg}", user_id, account_id)
except Exception:
pass
finally:
if status_tracked:
self._restore_account_status(user_id, account, prev_status)
def _notify_admin(self, unit: str, name: str, image_path: str, error: str) -> None:
cfg = self._load_system_config()
if int(cfg.get("kdocs_admin_notify_enabled", 0) or 0) != 1:
return
to_email = (cfg.get("kdocs_admin_notify_email") or "").strip()
if not to_email:
return
settings = email_service.get_email_settings()
if not settings.get("enabled", False):
return
subject = "金山文档上传失败提醒"
body = f"上传失败\n\n人员: {unit}-{name}\n图片: {image_path}\n错误: {error}\n\n请检查登录状态或表格配置。"
try:
email_service.send_email_async(
to_email=to_email,
subject=subject,
body=body,
email_type="kdocs_upload_failed",
)
except Exception as e:
logger.warning(f"[KDocs] 发送管理员邮件失败: {e}")
def _emit_account_update(self, user_id: int, account: Any) -> None:
try:
socketio = get_socketio()
socketio.emit("account_update", account.to_dict(), room=f"user_{user_id}")
except Exception:
pass
def _restore_account_status(self, user_id: int, account: Any, prev_status: Optional[str]) -> None:
if not account or not user_id:
return
if getattr(account, "is_running", False):
return
current_status = getattr(account, "status", "")
# 只处理上传相关的状态
if current_status not in ("上传截图", "等待上传"):
return
# 上传完成后恢复为未开始,而不是恢复到之前的等待上传状态
account.status = "未开始"
self._emit_account_update(user_id, account)
def _select_sheet(self, sheet_name: str, sheet_index: int) -> None:
if sheet_name:
candidates = [
self._page.locator("[role='tab']").filter(has_text=sheet_name),
self._page.locator(".sheet-tab").filter(has_text=sheet_name),
self._page.locator(".sheet-tab-name").filter(has_text=sheet_name),
]
for locator in candidates:
try:
if locator.count() < 1:
continue
locator.first.click()
time.sleep(0.4) # 优化: 0.5 -> 0.4
return
except Exception:
continue
if sheet_index > 0:
idx = sheet_index - 1
candidates = [
self._page.locator("[role='tab']"),
self._page.locator(".sheet-tab"),
self._page.locator(".sheet-tab-name"),
]
for locator in candidates:
try:
if locator.count() <= idx:
continue
locator.nth(idx).click()
time.sleep(0.4) # 优化: 0.5 -> 0.4
return
except Exception:
continue
def _get_current_cell_address(self) -> str:
"""获取当前选中的单元格地址(如 A1, C66 等)"""
# 优化: 移除顶部的固定 sleep改用更短的重试间隔
for attempt in range(3):
try:
name_box = self._page.locator("input.edit-box").first
value = name_box.input_value()
# 验证是否为有效的单元格地址格式(如 A1, C66, AA100 等)
if value and re.match(r"^[A-Z]+\d+$", value.upper()):
return value.upper()
except Exception:
pass
try:
name_box = self._page.locator('#root input[type="text"]').first
value = name_box.input_value()
if value and re.match(r"^[A-Z]+\d+$", value.upper()):
return value.upper()
except Exception:
pass
# 等待一下再重试
time.sleep(0.15) # 优化: 0.2 -> 0.15
# 如果无法获取有效地址,返回空字符串
logger.debug("[KDocs] 无法获取有效的单元格地址") # 优化: warning -> debug
return ""
def _navigate_to_cell(self, cell_address: str) -> None:
try:
name_box = self._page.locator("input.edit-box").first
name_box.click()
name_box.fill(cell_address)
name_box.press("Enter")
except Exception:
name_box = self._page.locator('#root input[type="text"]').first
name_box.click()
name_box.fill(cell_address)
name_box.press("Enter")
time.sleep(0.25) # 优化: 0.3 -> 0.25
def _focus_grid(self) -> None:
try:
info = self._page.evaluate(
"""() => {
const canvases = Array.from(document.querySelectorAll("canvas"));
let best = null;
for (const c of canvases) {
const rect = c.getBoundingClientRect();
if (!rect.width || !rect.height) continue;
if (rect.width < 200 || rect.height < 200) continue;
const area = rect.width * rect.height;
if (!best || area > best.area) {
best = {x: rect.left + rect.width / 2, y: rect.top + rect.height / 2, area};
}
}
return best;
}"""
)
if info and info.get("x") and info.get("y"):
self._page.mouse.click(info["x"], info["y"])
time.sleep(0.08) # 优化: 0.1 -> 0.08
except Exception:
pass
def _read_clipboard_text(self) -> str:
try:
return self._page.evaluate("() => navigator.clipboard.readText()") or ""
except Exception:
return ""
def _get_cell_value(self, cell_address: str) -> str:
self._navigate_to_cell(cell_address)
time.sleep(0.25) # 优化: 0.3 -> 0.25
try:
self._page.evaluate("() => navigator.clipboard.writeText('')")
except Exception:
pass
self._focus_grid()
# 尝试方法1: 读取金山文档编辑栏/公式栏的内容
try:
formula_bar_selectors = [
".formula-bar-input",
".cell-editor-input",
"[class*='formulaBar'] input",
"[class*='formula'] textarea",
".formula-editor",
"#formulaInput",
]
for selector in formula_bar_selectors:
try:
el = self._page.query_selector(selector)
if el:
value = el.input_value() if hasattr(el, "input_value") else el.inner_text()
if value and not value.startswith("=DISPIMG"):
logger.debug(f"[KDocs] 从编辑栏读取到: '{value[:50]}...'") # 优化: info -> debug
return value.strip()
except Exception:
pass
except Exception:
pass
# 尝试方法2: F2进入编辑模式全选复制
try:
self._page.keyboard.press("F2")
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._page.keyboard.press("Control+a")
time.sleep(0.08) # 优化: 0.1 -> 0.08
self._page.keyboard.press("Control+c")
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._page.keyboard.press("Escape")
time.sleep(0.08) # 优化: 0.1 -> 0.08
value = self._read_clipboard_text()
if value and not value.startswith("=DISPIMG"):
return value.strip()
except Exception:
pass
# 尝试方法3: 直接复制单元格(备选)
try:
self._page.keyboard.press("Control+c")
time.sleep(0.15) # 优化: 0.2 -> 0.15
value = self._read_clipboard_text()
if value:
return value.strip()
except Exception:
pass
return ""
def _normalize_text(self, value: str) -> str:
if value is None:
return ""
cleaned = str(value)
cleaned = cleaned.replace("\u00a0", "").replace("\u3000", "")
cleaned = re.sub(r"\s+", "", cleaned)
return cleaned.strip()
def _unit_matches(self, cell_value: str, expected_unit: str) -> bool:
if not cell_value or not expected_unit:
return False
norm_cell = self._normalize_text(cell_value)
norm_expected = self._normalize_text(expected_unit)
if not norm_cell or not norm_expected:
return False
if norm_cell == norm_expected:
return True
if norm_expected in norm_cell or norm_cell in norm_expected:
return True
return False
def _should_mark_upload(self, account: Any) -> bool:
if not account:
return False
status_text = str(getattr(account, "status", "") or "")
if status_text:
if "运行" in status_text or "排队" in status_text:
return False
return True
def _search_person(self, name: str) -> None:
self._focus_grid()
self._page.keyboard.press("Control+f")
time.sleep(0.25) # 优化: 0.3 -> 0.25
search_input = None
selectors = [
"input[placeholder*='查找']",
"input[placeholder*='搜索']",
"input[aria-label*='查找']",
"input[aria-label*='搜索']",
"input[type='search']",
]
try:
search_input = self._page.get_by_role("textbox").nth(3)
if search_input.is_visible(timeout=800):
search_input.fill(name)
except Exception:
search_input = None
if not search_input:
for selector in selectors:
try:
candidate = self._page.locator(selector).first
if candidate.is_visible(timeout=800):
search_input = candidate
search_input.fill(name)
break
except Exception:
continue
if not search_input:
try:
self._page.keyboard.type(name)
except Exception:
pass
time.sleep(0.15) # 优化: 0.2 -> 0.15
try:
find_btn = self._page.get_by_role("button", name="查找").nth(2)
find_btn.click()
except Exception:
try:
self._page.get_by_role("button", name="查找").first.click()
except Exception:
try:
self._page.keyboard.press("Enter")
except Exception:
pass
time.sleep(0.25) # 优化: 0.3 -> 0.25
def _find_next(self) -> None:
try:
find_btn = self._page.get_by_role("button", name="查找").nth(2)
find_btn.click()
except Exception:
try:
self._page.get_by_role("button", name="查找").first.click()
except Exception:
try:
self._page.keyboard.press("Enter")
except Exception:
pass
time.sleep(0.25) # 优化: 0.3 -> 0.25
def _close_search(self) -> None:
self._page.keyboard.press("Escape")
time.sleep(0.15) # 优化: 0.2 -> 0.15
def _extract_row_number(self, cell_address: str) -> int:
match = re.search(r"(\d+)$", cell_address)
if match:
return int(match.group(1))
return -1
def _get_cached_person(self, cache_key: str) -> Optional[int]:
"""获取缓存的人员位置(带过期检查)"""
if cache_key not in self._person_cache:
return None
row_num, timestamp = self._person_cache[cache_key]
if time.time() - timestamp > CACHE_TTL_SECONDS:
# 缓存已过期,删除并返回 None
del self._person_cache[cache_key]
logger.debug(f"[KDocs] 缓存已过期: {cache_key}")
return None
return row_num
def _set_cached_person(self, cache_key: str, row_num: int) -> None:
"""设置人员位置缓存"""
self._person_cache[cache_key] = (row_num, time.time())
def _find_person_with_unit(
self, unit: str, name: str, unit_col: str, max_attempts: int = 10, row_start: int = 0, row_end: int = 0
) -> int:
"""
查找人员所在行号。
策略只搜索姓名找到姓名列C列的匹配项
:param row_start: 有效行范围起始0表示不限制
:param row_end: 有效行范围结束0表示不限制
"""
logger.debug(f"[KDocs] 开始搜索人员: name='{name}', unit='{unit}'") # 优化: info -> debug
if row_start > 0 or row_end > 0:
logger.debug(f"[KDocs] 有效行范围: {row_start}-{row_end}") # 优化: info -> debug
# 带过期检查的缓存
cache_key = f"{name}_{unit}_{unit_col}"
cached_row = self._get_cached_person(cache_key)
if cached_row is not None:
logger.debug(f"[KDocs] 使用缓存找到人员: name='{name}', row={cached_row}") # 优化: info -> debug
return cached_row
# 使用线性搜索Ctrl+F 方式)
row_num = self._search_and_get_row(
name, max_attempts=max_attempts, expected_col="C", row_start=row_start, row_end=row_end
)
if row_num > 0:
logger.info(f"[KDocs] 找到人员: name='{name}', row={row_num}")
# 缓存结果(带时间戳)
self._set_cached_person(cache_key, row_num)
return row_num
logger.warning(f"[KDocs] 搜索失败,未找到人员 '{name}'")
return -1
def _search_and_get_row(
self, search_text: str, max_attempts: int = 10, expected_col: str = None, row_start: int = 0, row_end: int = 0
) -> int:
"""
执行搜索并获取找到的行号
:param search_text: 要搜索的文本
:param max_attempts: 最大尝试次数
:param expected_col: 期望的列(如 'C'),如果指定则只接受该列的结果
:param row_start: 有效行范围起始0表示不限制
:param row_end: 有效行范围结束0表示不限制
"""
self._focus_grid()
self._search_person(search_text)
found_positions = set() # 记录已找到的位置(列+行)
for attempt in range(max_attempts):
self._close_search()
time.sleep(0.2) # 优化: 0.3 -> 0.2
current_address = self._get_current_cell_address()
if not current_address:
logger.debug(f"[KDocs] 第{attempt + 1}次: 无法获取单元格地址") # 优化: warning -> debug
# 继续尝试下一个
self._page.keyboard.press("Control+f")
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
row_num = self._extract_row_number(current_address)
# 提取列字母A, B, C, D 等)
col_letter = "".join(c for c in current_address if c.isalpha()).upper()
logger.debug(
f"[KDocs] 第{attempt + 1}次搜索'{search_text}': 单元格={current_address}, 列={col_letter}, 行号={row_num}"
) # 优化: info -> debug
if row_num <= 0:
logger.debug(f"[KDocs] 无法提取行号,搜索可能没有结果") # 优化: warning -> debug
return -1
# 检查是否已经访问过这个位置
position_key = f"{col_letter}{row_num}"
if position_key in found_positions:
logger.debug(f"[KDocs] 位置{position_key}已搜索过,循环结束") # 优化: info -> debug
# 检查是否有任何有效结果
valid_results = [
pos
for pos in found_positions
if (not expected_col or pos.startswith(expected_col)) and self._extract_row_number(pos) > 2
]
if valid_results:
# 返回第一个有效结果的行号
return self._extract_row_number(valid_results[0])
return -1
found_positions.add(position_key)
# 跳过标题行和表头行通常是第1-2行
if row_num <= 2:
logger.debug(f"[KDocs] 跳过标题/表头行: {row_num}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
# 如果指定了期望的列,检查是否匹配
if expected_col and col_letter != expected_col.upper():
logger.debug(f"[KDocs] 列不匹配: 期望={expected_col}, 实际={col_letter}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
# 检查行号是否在有效范围内
if row_start > 0 and row_num < row_start:
logger.debug(f"[KDocs] 行号{row_num}小于起始行{row_start}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
if row_end > 0 and row_num > row_end:
logger.debug(f"[KDocs] 行号{row_num}大于结束行{row_end}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
# 找到有效的数据行,列匹配且在行范围内
logger.debug(f"[KDocs] 找到有效位置: {current_address}") # 优化: info -> debug
return row_num
self._close_search()
logger.debug(f"[KDocs] 达到最大尝试次数{max_attempts},未找到有效结果") # 优化: warning -> debug
return -1
def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
cell_address = f"{image_col}{row_num}"
# 清除单元格现有内容
try:
# 1. 导航到单元格
self._navigate_to_cell(cell_address)
time.sleep(0.2) # 优化: 0.3 -> 0.2
# 2. 按 Escape 退出可能的编辑模式,回到选中状态
self._page.keyboard.press("Escape")
time.sleep(0.2) # 优化: 0.3 -> 0.2
# 3. 按 Delete 删除选中单元格的内容
self._page.keyboard.press("Delete")
time.sleep(0.4) # 优化: 0.5 -> 0.4
logger.debug(f"[KDocs] 已删除 {cell_address} 的内容") # 优化: info -> debug
except Exception as e:
logger.warning(f"[KDocs] 清除单元格内容时出错: {e}")
logger.info(f"[KDocs] 上传图片到 {cell_address}")
try:
insert_btn = self._page.get_by_role("button", name="插入")
insert_btn.click()
time.sleep(0.25) # 优化: 0.3 -> 0.25
except Exception as e:
raise RuntimeError(f"打开插入菜单失败: {e}")
try:
image_btn = self._page.get_by_role("button", name="图片")
image_btn.click()
time.sleep(0.25) # 优化: 0.3 -> 0.25
cell_image_option = self._page.get_by_role("option", name="单元格图片")
cell_image_option.click()
time.sleep(0.15) # 优化: 0.2 -> 0.15
except Exception as e:
raise RuntimeError(f"选择单元格图片失败: {e}")
try:
local_option = self._page.get_by_role("option", name="本地")
# 添加超时防止无限阻塞
with self._page.expect_file_chooser(timeout=15000) as fc_info:
local_option.click()
file_chooser = fc_info.value
file_chooser.set_files(image_path)
except Exception as e:
raise RuntimeError(f"上传文件失败: {e}")
time.sleep(1.5) # 优化: 2 -> 1.5
return True
_kdocs_uploader: Optional[KDocsUploader] = None
def get_kdocs_uploader() -> KDocsUploader:
global _kdocs_uploader
if _kdocs_uploader is None:
_kdocs_uploader = KDocsUploader()
_kdocs_uploader.start()
return _kdocs_uploader