diff --git a/services/kdocs_uploader.py b/services/kdocs_uploader.py index acd0337..7c541b8 100644 --- a/services/kdocs_uploader.py +++ b/services/kdocs_uploader.py @@ -1,5 +1,9 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +""" +KDocs Uploader with Auto-Recovery Mechanism +自动恢复机制:当检测到上传线程卡住时,自动重启线程 +""" from __future__ import annotations import base64 @@ -31,11 +35,16 @@ except Exception: # pragma: no cover - 运行环境缺少 playwright 时降级 logger = get_logger() config = get_config() +# 看门狗配置 +WATCHDOG_CHECK_INTERVAL = 60 # 每60秒检查一次 +WATCHDOG_TIMEOUT = 300 # 如果5分钟没有活动且队列有任务,认为线程卡住 + class KDocsUploader: def __init__(self) -> None: self._queue: queue.Queue = queue.Queue(maxsize=int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200"))) - self._thread = threading.Thread(target=self._run, name="kdocs-uploader", daemon=True) + self._thread: Optional[threading.Thread] = None + self._thread_id = 0 # 线程ID,用于追踪重启次数 self._running = False self._last_error: Optional[str] = None self._last_success_at: Optional[float] = None @@ -49,17 +58,108 @@ class KDocsUploader: self._last_login_ok: Optional[bool] = None self._doc_url: Optional[str] = None + # 自动恢复机制相关 + self._last_activity: float = time.time() # 最后活动时间 + self._watchdog_thread: Optional[threading.Thread] = None + self._watchdog_running = False + self._restart_count = 0 # 重启次数统计 + self._lock = threading.Lock() # 线程安全锁 + def start(self) -> None: - if self._running: - return - self._running = True - self._thread.start() + with self._lock: + if self._running: + return + self._running = True + self._thread_id += 1 + self._thread = threading.Thread( + target=self._run, + name=f"kdocs-uploader-{self._thread_id}", + daemon=True + ) + self._thread.start() + self._last_activity = time.time() + + # 启动看门狗线程 + if not self._watchdog_running: + self._watchdog_running = True + self._watchdog_thread = threading.Thread( + target=self._watchdog_run, + name="kdocs-watchdog", + daemon=True + ) + self._watchdog_thread.start() + logger.info("[KDocs] 看门狗线程已启动") def stop(self) -> None: - if not self._running: - return - self._running = False - self._queue.put({"action": "shutdown"}) + with self._lock: + if not self._running: + return + self._running = False + self._watchdog_running = False + self._queue.put({"action": "shutdown"}) + + def _watchdog_run(self) -> None: + """看门狗线程:监控上传线程健康状态""" + logger.info("[KDocs] 看门狗开始监控") + while self._watchdog_running: + try: + time.sleep(WATCHDOG_CHECK_INTERVAL) + + if not self._running: + continue + + # 检查线程是否存活 + if self._thread is None or not self._thread.is_alive(): + logger.warning("[KDocs] 检测到上传线程已停止,正在重启...") + self._restart_thread() + continue + + # 检查是否有任务堆积且长时间无活动 + queue_size = self._queue.qsize() + time_since_activity = time.time() - self._last_activity + + if queue_size > 0 and time_since_activity > WATCHDOG_TIMEOUT: + logger.warning( + f"[KDocs] 检测到上传线程可能卡住: " + f"队列={queue_size}, 无活动时间={time_since_activity:.0f}秒" + ) + self._restart_thread() + + except Exception as e: + logger.warning(f"[KDocs] 看门狗检查异常: {e}") + + def _restart_thread(self) -> None: + """重启上传线程""" + with self._lock: + self._restart_count += 1 + logger.warning(f"[KDocs] 正在重启上传线程 (第{self._restart_count}次重启)") + + # 清理浏览器资源 + try: + self._cleanup_browser() + + except Exception as e: + logger.warning(f"[KDocs] 清理浏览器时出错: {e}") + + # 停止旧线程(如果还在运行) + old_running = self._running + self._running = False + + # 等待一小段时间让旧线程有机会退出 + time.sleep(1) + + # 启动新线程 + self._running = True + self._thread_id += 1 + self._thread = threading.Thread( + target=self._run, + name=f"kdocs-uploader-{self._thread_id}", + daemon=True + ) + self._thread.start() + self._last_activity = time.time() + self._last_error = f"线程已自动恢复 (第{self._restart_count}次)" + logger.info(f"[KDocs] 上传线程已重启 (ID={self._thread_id})") def get_status(self) -> Dict[str, Any]: return { @@ -68,6 +168,8 @@ class KDocsUploader: "last_error": self._last_error, "last_success_at": self._last_success_at, "last_login_ok": self._last_login_ok, + "restart_count": self._restart_count, # 新增:重启次数 + "thread_alive": self._thread.is_alive() if self._thread else False, # 新增:线程状态 } def enqueue_upload( @@ -130,28 +232,57 @@ class KDocsUploader: return {"success": False, "error": "操作超时"} def _run(self) -> None: - while True: - task = self._queue.get() - if not task: - continue - action = task.get("action") - if action == "shutdown": - break - try: - if action == "upload": - self._handle_upload(task.get("payload") or {}) - elif action == "qr": - result = self._handle_qr(task.get("payload") or {}) - task.get("response").put(result) - elif action == "clear_login": - result = self._handle_clear_login() - task.get("response").put(result) - elif action == "status": - result = self._handle_status_check() - task.get("response").put(result) - except Exception as e: - logger.warning(f"[KDocs] 处理任务失败: {e}") + thread_id = self._thread_id + logger.info(f"[KDocs] 上传线程启动 (ID={thread_id})") + while self._running: + try: + # 使用超时获取任务,以便定期检查 _running 状态 + try: + task = self._queue.get(timeout=5) + except queue.Empty: + continue + + if not task: + continue + + # 更新最后活动时间 + self._last_activity = time.time() + + action = task.get("action") + if action == "shutdown": + break + + try: + if action == "upload": + self._handle_upload(task.get("payload") or {}) + elif action == "qr": + result = self._handle_qr(task.get("payload") or {}) + task.get("response").put(result) + elif action == "clear_login": + result = self._handle_clear_login() + task.get("response").put(result) + elif action == "status": + result = self._handle_status_check() + task.get("response").put(result) + + # 任务处理完成后更新活动时间 + self._last_activity = time.time() + + except Exception as e: + logger.warning(f"[KDocs] 处理任务失败: {e}") + # 如果有响应队列,返回错误 + if "response" in task and task.get("response"): + try: + task["response"].put({"success": False, "error": str(e)}) + except Exception: + pass + + except Exception as e: + logger.warning(f"[KDocs] 线程主循环异常: {e}") + time.sleep(1) # 避免异常时的紧密循环 + + logger.info(f"[KDocs] 上传线程退出 (ID={thread_id})") self._cleanup_browser() def _load_system_config(self) -> Dict[str, Any]: @@ -180,6 +311,7 @@ class KDocsUploader: except Exception as e: self._last_error = f"浏览器启动失败: {e}" self._cleanup_browser() + return False def _cleanup_browser(self) -> None: @@ -784,6 +916,7 @@ class KDocsUploader: self._login_required = False self._last_login_ok = None self._cleanup_browser() + return {"success": True} def _handle_status_check(self) -> Dict[str, Any]: @@ -1335,7 +1468,7 @@ class KDocsUploader: logger.info("[KDocs调试] ====================================") def _find_person_with_unit( - self, unit: str, name: str, unit_col: str, max_attempts: int = 50, row_start: int = 0, row_end: int = 0 + self, unit: str, name: str, unit_col: str, max_attempts: int = 10, row_start: int = 0, row_end: int = 0 ) -> int: """ 查找人员所在行号。 @@ -1359,17 +1492,17 @@ class KDocsUploader: # 只搜索姓名 - 这是目前唯一可靠的方式 logger.info(f"[KDocs调试] 搜索姓名: '{name}'") - # 首先尝试二分搜索优化 - binary_result = self._binary_search_person(name, unit_col, row_start, row_end) - if binary_result > 0: - logger.info(f"[KDocs调试] [OK] 二分搜索成功! 找到行号={binary_result}") - # 缓存结果 - if not hasattr(self, "_person_cache"): - self._person_cache = {} - self._person_cache[cache_key] = binary_result - return binary_result + # 注意: 二分搜索已禁用 - _get_cell_value_fast() 使用的 DOM 选择器在金山文档中不存在 + # 直接使用线性搜索,这是唯一可靠的方法 + # binary_result = self._binary_search_person(name, unit_col, row_start, row_end) + # if binary_result > 0: + # logger.info(f"[KDocs调试] [OK] 二分搜索成功! 找到行号={binary_result}") + # if not hasattr(self, "_person_cache"): + # self._person_cache = {} + # self._person_cache[cache_key] = binary_result + # return binary_result - # 如果二分搜索失败,回退到线性搜索 + # 使用线性搜索(Ctrl+F 方式) row_num = self._search_and_get_row( name, max_attempts=max_attempts, expected_col="C", row_start=row_start, row_end=row_end ) @@ -1562,8 +1695,7 @@ class KDocsUploader: def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool: cell_address = f"{image_col}{row_num}" - self._navigate_to_cell(cell_address) - time.sleep(0.3) + # 注意: 移除了重复的导航调用,只保留一次导航 # 清除单元格现有内容 try: @@ -1603,7 +1735,8 @@ class KDocsUploader: try: local_option = self._page.get_by_role("option", name="本地") - with self._page.expect_file_chooser() as fc_info: + # 添加超时防止无限阻塞 + with self._page.expect_file_chooser(timeout=15000) as fc_info: local_option.click() file_chooser = fc_info.value file_chooser.set_files(image_path) @@ -1623,3 +1756,4 @@ def get_kdocs_uploader() -> KDocsUploader: _kdocs_uploader = KDocsUploader() _kdocs_uploader.start() return _kdocs_uploader +