#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Kingsoft Docs (kdocs) screenshot upload optimizer - single worker thread version.

Callers enqueue upload tasks; one background thread owns the browser and performs
them. A person's sheet row is located through a TTL position cache, then a short
list of commonly used rows, and finally the sheet's find dialog.
"""

import os
import time
import threading
import queue
import re
from typing import Any, Callable, Dict, List, Optional, Tuple
from pathlib import Path

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except ImportError:
    print("错误: 需要安装 playwright")
    print("请运行: pip install playwright")
    sync_playwright = None
    PlaywrightTimeoutError = Exception


class PersonPositionCache:
    """Thread-safe cache mapping (unit, name) to a sheet row, with a time-to-live.

    Entries older than ``cache_ttl`` seconds are treated as missing and evicted on
    lookup. Callers are still expected to re-verify a cached row against the sheet.
    """

    def __init__(self, cache_ttl: int = 1800):  # 30-minute default TTL
        # (unit, name) -> (row, unit, stored_at). A tuple key avoids the collisions a
        # joined "unit-name" string key allows (e.g. "a-b"+"c" vs "a"+"b-c").
        self._cache: Dict[Tuple[str, str], Tuple[int, str, float]] = {}
        self._ttl = cache_ttl
        self._lock = threading.Lock()

    def get_position(self, name: str, unit: str) -> Optional[int]:
        """Return the cached row for ``name`` in ``unit``, or None if absent or expired."""
        key = (unit, name)
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None
            row, _cached_unit, stored_at = entry
            if time.time() - stored_at > self._ttl:
                # Evict so expired entries do not accumulate forever.
                del self._cache[key]
                return None
            return row

    def set_position(self, name: str, unit: str, row: int) -> None:
        """Record (or refresh) the row for ``name`` in ``unit``."""
        with self._lock:
            self._cache[(unit, name)] = (row, unit, time.time())

    def invalidate(self, name: str, unit: str) -> None:
        """Drop the cached row for ``name`` in ``unit``, if any."""
        with self._lock:
            self._cache.pop((unit, name), None)

    def clear(self) -> None:
        """Remove every cached entry."""
        with self._lock:
            self._cache.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Return the entry count and a snapshot keyed by "unit-name" strings."""
        with self._lock:
            return {
                "total_entries": len(self._cache),
                "cache": {f"{unit}-{name}": entry for (unit, name), entry in self._cache.items()},
            }


class OptimizedKdocsUploader:
    """Queue-backed kdocs uploader whose browser objects are used only by one worker thread.

    ``upload_screenshot`` only enqueues a task; ``_run`` (on the worker thread) opens
    the document, finds the person's row and inserts the image into column D.
    """

    # Rows probed before falling back to the find dialog (adjust to the real sheet layout).
    _COMMON_ROWS: Tuple[int, ...] = (66, 67, 68, 70, 75, 80, 85, 90, 95, 100)
    # Playwright storage-state file loaded into the browser context when it exists
    # (presumably a saved login session - confirm with whoever writes it).
    _STORAGE_STATE_PATH = "data/kdocs_login_state.json"
    # Shape of a cell address as shown in the name box, e.g. "C66".
    _CELL_ADDRESS_RE = re.compile(r"^[A-Z]+\d+$")

    def __init__(self, cache_ttl: int = 1800):
        """Create a stopped uploader; ``cache_ttl`` is the position-cache TTL in seconds."""
        self._queue = queue.Queue(maxsize=200)
        self._state_lock = threading.Lock()  # serializes start()/stop() transitions
        self._thread = self._make_thread()
        self._running = False
        self._last_error: Optional[str] = None
        self._last_success_at: Optional[float] = None

        # Position cache and lazily created browser objects (worker thread only).
        self._cache = PersonPositionCache(cache_ttl=cache_ttl)
        self._playwright = None
        self._browser = None
        self._context = None
        self._page = None

        # Tunables, overridable through environment variables.
        self._config = {
            'fast_timeout_ms': int(os.environ.get('KDOCS_FAST_GOTO_TIMEOUT_MS', '10000')),  # page.goto timeout
            'fast_login_timeout_ms': int(os.environ.get('KDOCS_FAST_LOGIN_TIMEOUT_MS', '300')),  # currently unused
            'navigation_wait': float(os.environ.get('KDOCS_NAVIGATION_WAIT', '0.2')),  # seconds after navigating
            'click_wait': float(os.environ.get('KDOCS_CLICK_WAIT', '0.3')),  # seconds after menu clicks
            'upload_wait': float(os.environ.get('KDOCS_UPLOAD_WAIT', '0.8')),  # seconds after choosing the file
            'search_attempts': int(os.environ.get('KDOCS_SEARCH_ATTEMPTS', '10')),  # max find-dialog hits to walk
        }
        self.log_callback: Optional[Callable[[str], None]] = None

    def _make_thread(self) -> threading.Thread:
        """Create a new daemon worker thread (a Thread object can only be started once)."""
        return threading.Thread(target=self._run, name="kdocs-uploader-optimized", daemon=True)

    def set_log_callback(self, callback: Callable[[str], None]) -> None:
        """Register a callable that receives every formatted log line."""
        self.log_callback = callback

    def _log(self, message: str, level: str = 'INFO') -> None:
        """Send ``[level] message`` to the callback (if set) and always print it."""
        line = f"[{level}] {message}"
        if self.log_callback:
            self.log_callback(line)
        print(line)

    def start(self) -> None:
        """Start the worker thread; a no-op if already running.

        Works after ``stop()``: if the previous worker is still draining its queue
        it is joined first, so two workers never compete for tasks or for the
        shutdown marker.
        """
        with self._state_lock:
            if self._running:
                return
            if self._thread.is_alive():
                self._thread.join()
            self._thread = self._make_thread()
            self._running = True
            self._thread.start()
        self._log("优化上传器已启动", 'SUCCESS')

    def stop(self) -> None:
        """Ask the worker to exit after the tasks already queued; does not wait for it."""
        with self._state_lock:
            if not self._running:
                return
            self._running = False
            self._queue.put({"action": "shutdown"})
        self._log("优化上传器已停止", 'INFO')

    def upload_screenshot(
        self,
        user_id: int,
        account_id: str,
        unit: str,
        name: str,
        image_path: str,
    ) -> bool:
        """Queue an upload of ``image_path`` for ``name`` in ``unit``.

        Starts the worker if needed. Returns True when the task was queued and
        False when the queue stayed full for one second. It does not report
        whether the upload itself succeeds.
        """
        if not self._running:
            self.start()

        payload = {
            "user_id": user_id,
            "account_id": account_id,
            "unit": unit,
            "name": name,
            "image_path": image_path,
        }
        try:
            self._queue.put({"action": "upload", "payload": payload}, timeout=1)
            return True
        except queue.Full:
            self._last_error = "上传队列已满"
            self._log(self._last_error, 'ERROR')
            return False

    def _run(self) -> None:
        """Worker loop: process queued tasks until a shutdown marker, then free the browser."""
        try:
            while True:
                task = self._queue.get()
                if not task:
                    continue
                action = task.get("action")
                if action == "shutdown":
                    break
                try:
                    if action == "upload":
                        self._handle_upload(task.get("payload") or {})
                except Exception as e:
                    self._log(f"处理任务失败: {str(e)}", 'ERROR')
        finally:
            self._cleanup_browser()

    def _ensure_browser(self) -> bool:
        """Create any missing Playwright/browser/context/page object; return True if usable."""
        if sync_playwright is None:
            self._last_error = "playwright 未安装"
            return False
        try:
            if self._playwright is None:
                self._playwright = sync_playwright().start()
            if self._browser is None:
                # Headed unless KDOCS_HEADLESS is set to something other than "false".
                headless = os.environ.get("KDOCS_HEADLESS", "false").lower() != "false"
                self._browser = self._playwright.chromium.launch(headless=headless)
            if self._context is None:
                if os.path.exists(self._STORAGE_STATE_PATH):
                    self._context = self._browser.new_context(storage_state=self._STORAGE_STATE_PATH)
                else:
                    self._context = self._browser.new_context()
            if self._page is None or self._page.is_closed():
                self._page = self._context.new_page()
                self._page.set_default_timeout(30000)
            return True
        except Exception as e:
            self._last_error = f"浏览器启动失败: {e}"
            self._log(self._last_error, 'ERROR')
            self._cleanup_browser()
            return False

    def _cleanup_browser(self) -> None:
        """Close page, context and browser, stop Playwright, and reset all four to None.

        Best effort: a failure closing one object does not prevent the rest.
        """
        for attr, closer in (
            ("_page", "close"),
            ("_context", "close"),
            ("_browser", "close"),
            ("_playwright", "stop"),
        ):
            obj = getattr(self, attr)
            if obj:
                try:
                    getattr(obj, closer)()
                except Exception:
                    pass  # the object may already be closed or its process gone
            setattr(self, attr, None)

    def _handle_upload(self, payload: Dict[str, Any]) -> None:
        """Perform one upload task: validate, open the document, locate the row, insert the image.

        ``payload`` needs "unit", "name" and "image_path". Failures are logged and
        stored in ``_last_error``; nothing is raised.
        """
        unit = str(payload.get("unit") or "").strip()
        name = str(payload.get("name") or "").strip()
        image_path = payload.get("image_path")

        if not unit or not name:
            self._log("跳过上传:县区或姓名为空", 'WARNING')
            return
        if not image_path or not os.path.exists(image_path):
            self._log(f"跳过上传:图片文件不存在 ({image_path})", 'WARNING')
            return
        # Check the URL before launching a browser that could not be used anyway.
        doc_url = os.environ.get("KDOCS_DOC_URL")
        if not doc_url:
            self._log("跳过上传:未配置金山文档URL", 'ERROR')
            return

        try:
            if not self._ensure_browser():
                self._log("跳过上传:浏览器不可用", 'ERROR')
                return

            self._log(f"打开文档: {doc_url}", 'INFO')
            self._page.goto(doc_url, wait_until='domcontentloaded', timeout=self._config['fast_timeout_ms'])
            time.sleep(self._config['navigation_wait'])

            if self._try_cached_upload(name, unit, image_path):
                return

            self._log(f"开始搜索: {unit}-{name}", 'INFO')
            row_num = self._find_person_fast(name, unit)
            if row_num <= 0:
                self._last_error = f"未找到人员: {unit}-{name}"
                self._log(f"✗ 未找到人员: {unit}-{name}", 'ERROR')
                return

            self._cache.set_position(name, unit, row_num)
            self._log(f"搜索成功,找到第{row_num}行", 'SUCCESS')
            if self._upload_image_to_cell(row_num, image_path):
                self._record_success(unit, name)
            else:
                self._last_error = f"上传失败: {unit}-{name}"
                self._log(f"✗ 上传失败: {unit}-{name}", 'ERROR')
        except Exception as e:
            self._log(f"上传过程出错: {str(e)}", 'ERROR')
            self._last_error = str(e)

    def _try_cached_upload(self, name: str, unit: str, image_path: str) -> bool:
        """Upload via the cached row if it still verifies; return True only on a completed upload.

        A cached row that no longer holds this person is invalidated.
        """
        cached_row = self._cache.get_position(name, unit)
        if cached_row is None:
            return False
        self._log(f"使用缓存定位: {name} 在第{cached_row}行", 'INFO')
        if not self._verify_position(cached_row, name, unit):
            self._cache.invalidate(name, unit)
            self._log("缓存验证失败,将重新搜索", 'WARNING')
            return False
        self._log("缓存验证成功", 'SUCCESS')
        if self._upload_image_to_cell(cached_row, image_path):
            self._record_success(unit, name)
            return True
        self._log("缓存位置上传失败,将重新搜索", 'WARNING')
        return False

    def _record_success(self, unit: str, name: str) -> None:
        """Record a successful upload timestamp, clear the last error and log it."""
        self._last_success_at = time.time()
        self._last_error = None
        self._log(f"[OK] 上传成功: {unit}-{name}", 'SUCCESS')

    def _verify_position(self, row: int, name: str, unit: str) -> bool:
        """Return True if column C of ``row`` holds ``name`` and column A holds ``unit``."""
        try:
            if self._read_cell_value(f"C{row}") != name:
                return False
            return self._read_cell_value(f"A{row}") == unit
        except Exception as e:
            self._log(f"验证位置失败: {str(e)}", 'WARNING')
            return False

    def _goto_cell(self, cell_address: str) -> None:
        """Select ``cell_address`` by typing it into the name box and pressing Enter."""
        name_box = self._page.locator("input.edit-box").first
        name_box.click()
        name_box.fill(cell_address)
        name_box.press("Enter")
        time.sleep(self._config['navigation_wait'])

    def _read_cell_value(self, cell_address: str) -> str:
        """Return the stripped content of ``cell_address`` as shown in the formula bar.

        Returns "" when the bar is hidden, on any error, or for cell-image
        (``=DISPIMG``) formulas. The name box is deliberately not used: after
        navigation it shows the address itself, not the cell content.
        """
        try:
            self._goto_cell(cell_address)
            formula_bar = self._page.locator("[class*='formula'] textarea").first
            if not formula_bar.is_visible():
                return ""
            value = (formula_bar.input_value() or "").strip()
        except Exception:
            return ""
        if value.startswith("=DISPIMG"):
            return ""
        return value

    def _find_person_fast(self, name: str, unit: str) -> int:
        """Find the row of ``name``/``unit``: probe _COMMON_ROWS first, then use the find dialog.

        Returns the row number, or -1 if not found.
        """
        self._log("快速定位模式:检查常见行号", 'INFO')
        for row in self._COMMON_ROWS:
            if self._verify_position(row, name, unit):
                self._log(f"快速命中:第{row}行", 'SUCCESS')
                return row

        self._log("使用搜索模式", 'INFO')
        return self._search_person_optimized(name, unit)

    def _search_person_optimized(self, name: str, unit: str) -> int:
        """Search ``name`` with the find dialog and return the first hit that verifies, else -1.

        Presses Enter in the find box up to ``search_attempts`` times and collects
        the row of each hit, stopping when a row repeats (the search wrapped). The
        cell selected after the dialog closes is also considered. Each candidate is
        checked with _verify_position, so a same-named person in another unit is
        not picked.
        """
        max_attempts = max(1, self._config['search_attempts'])
        candidates: List[int] = []
        seen = set()

        def consider(row_num: int) -> bool:
            # Returns False once a row repeats; rows 1-2 are skipped as before
            # (presumably header rows - confirm against the sheet).
            if row_num in seen:
                return False
            seen.add(row_num)
            if row_num > 2:
                candidates.append(row_num)
            return True

        try:
            self._focus_grid()
            keyboard = self._page.keyboard
            keyboard.press("Control+f")
            time.sleep(0.2)
            keyboard.type(name)
            time.sleep(0.1)
            try:
                for _ in range(max_attempts):
                    keyboard.press("Enter")
                    time.sleep(self._config['click_wait'])
                    if not consider(self._extract_row_number(self._get_current_cell_address())):
                        break
            finally:
                # Close the find dialog even if a step above failed.
                keyboard.press("Escape")
                time.sleep(0.2)
            consider(self._extract_row_number(self._get_current_cell_address()))

            for row_num in candidates:
                if self._verify_position(row_num, name, unit):
                    return row_num
            return -1
        except Exception as e:
            self._log(f"搜索出错: {str(e)}", 'ERROR')
            return -1

    def _focus_grid(self) -> None:
        """Click the centre of the first <canvas> on the page to give the grid keyboard focus."""
        try:
            canvases = self._page.locator("canvas").all()
            if canvases:
                box = canvases[0].bounding_box()
                if box:
                    x = box['x'] + box['width'] / 2
                    y = box['y'] + box['height'] / 2
                    self._page.mouse.click(x, y)
            time.sleep(self._config['navigation_wait'])
        except Exception as e:
            self._log(f"聚焦网格失败: {str(e)}", 'WARNING')

    def _get_current_cell_address(self) -> str:
        """Return the upper-cased address shown in the name box, or "" if it is not an address."""
        try:
            value = self._page.locator("input.edit-box").first.input_value()
        except Exception:
            return ""
        if value and self._CELL_ADDRESS_RE.match(value.upper()):
            return value.upper()
        return ""

    def _extract_row_number(self, cell_address: str) -> int:
        """Return the trailing row number of ``cell_address`` (e.g. "D12" -> 12), or -1."""
        match = re.search(r"(\d+)$", cell_address)
        if match:
            return int(match.group(1))
        return -1

    def _upload_image_to_cell(self, row_num: int, image_path: str) -> bool:
        """Clear cell D``row_num`` and insert ``image_path`` into it as a cell image.

        Returns True when the file was handed to the file chooser and the
        configured upload wait elapsed. Completion is not otherwise confirmed.
        Returns False on any error.
        """
        try:
            cell_address = f"D{row_num}"
            self._log(f"导航到单元格: {cell_address}", 'INFO')
            self._goto_cell(cell_address)

            # Clear only this cell.
            keyboard = self._page.keyboard
            keyboard.press("Escape")
            time.sleep(0.1)
            keyboard.press("Delete")
            time.sleep(self._config['click_wait'])

            self._log("打开插入菜单", 'INFO')
            self._page.locator("text=插入").first.click()
            time.sleep(self._config['click_wait'])

            self._log("选择图片", 'INFO')
            self._page.locator("text=图片").first.click()
            time.sleep(self._config['click_wait'])

            self._log(f"上传图片: {image_path}", 'INFO')
            # The click that opens the native file dialog must happen inside
            # expect_file_chooser(); otherwise the event fires before anyone listens.
            with self._page.expect_file_chooser() as fc_info:
                self._page.locator("text=单元格图片").first.click()
            fc_info.value.set_files(image_path)

            time.sleep(self._config['upload_wait'])
            self._log("图片上传完成", 'SUCCESS')
            return True
        except Exception as e:
            self._log(f"上传图片失败: {str(e)}", 'ERROR')
            return False

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return the position cache's statistics (see PersonPositionCache.get_stats)."""
        return self._cache.get_stats()


# ==================== 使用示例 ====================

def main():
    """Demo: start the uploader, queue one screenshot, print cache stats, then stop."""
    uploader = OptimizedKdocsUploader(cache_ttl=1800)  # 30-minute cache

    def log_func(message: str):
        print(f"[LOG] {message}")

    uploader.set_log_callback(log_func)
    uploader.start()

    test_payload = {
        "user_id": 1,
        "account_id": "test001",
        "unit": "海淀区",
        "name": "张三",
        "image_path": "test_screenshot.jpg"
    }

    print("正在上传截图...")
    success = uploader.upload_screenshot(**test_payload)
    if success:
        print("[OK] 上传任务已提交")
    else:
        print("✗ 上传任务提交失败")

    stats = uploader.get_cache_stats()
    print(f"缓存统计: {stats}")

    # Give the worker a moment before asking it to shut down.
    time.sleep(2)
    uploader.stop()
    print("上传器已停止")
if __name__ == "__main__":
    # Run the demo only when this file is executed directly, not when imported.
    main()