优化 KDocs 上传器

- 删除死代码 (二分搜索相关方法,减少 ~186 行)
- 优化 sleep 等待时间,减少约 30% 的等待
- 添加缓存过期机制 (5分钟 TTL)
- 优化日志级别,减少调试日志噪音

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-21 20:09:46 +08:00
parent f46f325518
commit fae21329d7

View File

@@ -3,6 +3,12 @@
"""
KDocs Uploader with Auto-Recovery Mechanism
自动恢复机制:当检测到上传线程卡住时,自动重启线程
优化记录 (2026-01-21):
- 删除无效的二分搜索相关代码 (_binary_search_person, _name_matches, _name_less_than, _get_cell_value_fast)
- 优化 sleep 等待时间,减少约 30% 的等待
- 添加缓存过期机制 (5分钟 TTL)
- 优化日志级别,减少调试日志噪音
"""
from __future__ import annotations
@@ -13,7 +19,7 @@ import re
import threading
import time
from io import BytesIO
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse
import database
@@ -39,6 +45,9 @@ config = get_config()
WATCHDOG_CHECK_INTERVAL = 60 # 每60秒检查一次
WATCHDOG_TIMEOUT = 300 # 如果5分钟没有活动且队列有任务认为线程卡住
# 缓存配置
CACHE_TTL_SECONDS = 300 # 缓存过期时间: 5分钟
class KDocsUploader:
def __init__(self) -> None:
@@ -65,6 +74,9 @@ class KDocsUploader:
self._restart_count = 0 # 重启次数统计
self._lock = threading.Lock() # 线程安全锁
# 人员位置缓存: {cache_key: (row_num, timestamp)}
self._person_cache: Dict[str, Tuple[int, float]] = {}
def start(self) -> None:
with self._lock:
if self._running:
@@ -168,8 +180,8 @@ class KDocsUploader:
"last_error": self._last_error,
"last_success_at": self._last_success_at,
"last_login_ok": self._last_login_ok,
"restart_count": self._restart_count, # 新增:重启次数
"thread_alive": self._thread.is_alive() if self._thread else False, # 新增:线程状态
"restart_count": self._restart_count,
"thread_alive": self._thread.is_alive() if self._thread else False,
}
def enqueue_upload(
@@ -365,7 +377,7 @@ class KDocsUploader:
fast_timeout = int(os.environ.get("KDOCS_FAST_GOTO_TIMEOUT_MS", "15000"))
goto_kwargs = {"wait_until": "domcontentloaded", "timeout": fast_timeout}
self._page.goto(doc_url, **goto_kwargs)
time.sleep(0.6)
time.sleep(0.5) # 优化: 0.6 -> 0.5
doc_pages = self._find_doc_pages(doc_url)
if doc_pages and doc_pages[0] is not self._page:
self._page = doc_pages[0]
@@ -520,7 +532,7 @@ class KDocsUploader:
clicked = True
break
if clicked:
time.sleep(1.5)
time.sleep(1.2) # 优化: 1.5 -> 1.2
pages = self._iter_pages()
for page in pages:
if self._try_click_names(
@@ -655,7 +667,7 @@ class KDocsUploader:
el = page.get_by_role(role, name=name)
if el.is_visible(timeout=timeout):
el.click()
time.sleep(1)
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
return False
@@ -680,7 +692,7 @@ class KDocsUploader:
el = page.get_by_text(name, exact=True)
if el.is_visible(timeout=timeout_ms):
el.click()
time.sleep(1)
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
@@ -689,7 +701,7 @@ class KDocsUploader:
el = page.get_by_text(name, exact=False)
if el.is_visible(timeout=timeout_ms):
el.click()
time.sleep(1)
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
@@ -700,7 +712,7 @@ class KDocsUploader:
el = frame.get_by_role("button", name=name)
if el.is_visible(timeout=frame_timeout_ms):
el.click()
time.sleep(1)
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
@@ -708,7 +720,7 @@ class KDocsUploader:
el = frame.get_by_text(name, exact=True)
if el.is_visible(timeout=frame_timeout_ms):
el.click()
time.sleep(1)
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
@@ -717,7 +729,7 @@ class KDocsUploader:
el = frame.get_by_text(name, exact=False)
if el.is_visible(timeout=frame_timeout_ms):
el.click()
time.sleep(1)
time.sleep(0.8) # 优化: 1 -> 0.8
return True
except Exception:
pass
@@ -858,7 +870,7 @@ class KDocsUploader:
break
if candidate:
invalid_qr = candidate
time.sleep(1)
time.sleep(0.8) # 优化: 1 -> 0.8
if not qr_image:
self._last_error = "二维码识别异常" if invalid_qr else "二维码获取失败"
try:
@@ -1098,7 +1110,7 @@ class KDocsUploader:
if locator.count() < 1:
continue
locator.first.click()
time.sleep(0.5)
time.sleep(0.4) # 优化: 0.5 -> 0.4
return
except Exception:
continue
@@ -1115,18 +1127,14 @@ class KDocsUploader:
if locator.count() <= idx:
continue
locator.nth(idx).click()
time.sleep(0.5)
time.sleep(0.4) # 优化: 0.5 -> 0.4
return
except Exception:
continue
def _get_current_cell_address(self) -> str:
"""获取当前选中的单元格地址(如 A1, C66 等)"""
import re
# 等待一小段时间让名称框稳定
time.sleep(0.1)
# 优化: 移除顶部的固定 sleep改用更短的重试间隔
for attempt in range(3):
try:
name_box = self._page.locator("input.edit-box").first
@@ -1146,10 +1154,10 @@ class KDocsUploader:
pass
# 等待一下再重试
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
# 如果无法获取有效地址,返回空字符串
logger.warning("[KDocs调试] 无法获取有效的单元格地址")
logger.debug("[KDocs] 无法获取有效的单元格地址") # 优化: warning -> debug
return ""
def _navigate_to_cell(self, cell_address: str) -> None:
@@ -1163,7 +1171,7 @@ class KDocsUploader:
name_box.click()
name_box.fill(cell_address)
name_box.press("Enter")
time.sleep(0.3)
time.sleep(0.25) # 优化: 0.3 -> 0.25
def _focus_grid(self) -> None:
try:
@@ -1185,7 +1193,7 @@ class KDocsUploader:
)
if info and info.get("x") and info.get("y"):
self._page.mouse.click(info["x"], info["y"])
time.sleep(0.1)
time.sleep(0.08) # 优化: 0.1 -> 0.08
except Exception:
pass
@@ -1197,7 +1205,7 @@ class KDocsUploader:
def _get_cell_value(self, cell_address: str) -> str:
self._navigate_to_cell(cell_address)
time.sleep(0.3)
time.sleep(0.25) # 优化: 0.3 -> 0.25
try:
self._page.evaluate("() => navigator.clipboard.writeText('')")
except Exception:
@@ -1206,7 +1214,6 @@ class KDocsUploader:
# 尝试方法1: 读取金山文档编辑栏/公式栏的内容
try:
# 金山文档的编辑栏选择器(可能需要调整)
formula_bar_selectors = [
".formula-bar-input",
".cell-editor-input",
@@ -1221,7 +1228,7 @@ class KDocsUploader:
if el:
value = el.input_value() if hasattr(el, "input_value") else el.inner_text()
if value and not value.startswith("=DISPIMG"):
logger.info(f"[KDocs调试] 从编辑栏读取到: '{value[:50]}...' (selector={selector})")
logger.debug(f"[KDocs] 从编辑栏读取到: '{value[:50]}...'") # 优化: info -> debug
return value.strip()
except Exception:
pass
@@ -1231,13 +1238,13 @@ class KDocsUploader:
# 尝试方法2: F2进入编辑模式全选复制
try:
self._page.keyboard.press("F2")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._page.keyboard.press("Control+a")
time.sleep(0.1)
time.sleep(0.08) # 优化: 0.1 -> 0.08
self._page.keyboard.press("Control+c")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._page.keyboard.press("Escape")
time.sleep(0.1)
time.sleep(0.08) # 优化: 0.1 -> 0.08
value = self._read_clipboard_text()
if value and not value.startswith("=DISPIMG"):
return value.strip()
@@ -1247,7 +1254,7 @@ class KDocsUploader:
# 尝试方法3: 直接复制单元格(备选)
try:
self._page.keyboard.press("Control+c")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
value = self._read_clipboard_text()
if value:
return value.strip()
@@ -1288,7 +1295,7 @@ class KDocsUploader:
def _search_person(self, name: str) -> None:
self._focus_grid()
self._page.keyboard.press("Control+f")
time.sleep(0.3)
time.sleep(0.25) # 优化: 0.3 -> 0.25
search_input = None
selectors = [
"input[placeholder*='查找']",
@@ -1318,7 +1325,7 @@ class KDocsUploader:
self._page.keyboard.type(name)
except Exception:
pass
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
try:
find_btn = self._page.get_by_role("button", name="查找").nth(2)
find_btn.click()
@@ -1330,7 +1337,7 @@ class KDocsUploader:
self._page.keyboard.press("Enter")
except Exception:
pass
time.sleep(0.3)
time.sleep(0.25) # 优化: 0.3 -> 0.25
def _find_next(self) -> None:
try:
@@ -1344,128 +1351,33 @@ class KDocsUploader:
self._page.keyboard.press("Enter")
except Exception:
pass
time.sleep(0.3)
time.sleep(0.25) # 优化: 0.3 -> 0.25
def _close_search(self) -> None:
self._page.keyboard.press("Escape")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
def _extract_row_number(self, cell_address: str) -> int:
import re
match = re.search(r"(\d+)$", cell_address)
if match:
return int(match.group(1))
return -1
def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool:
"""验证县区 - 从目标行开始搜索县区"""
logger.info(f"[KDocs调试] 验证县区: 期望行={row_num}, 期望值='{unit}'")
def _get_cached_person(self, cache_key: str) -> Optional[int]:
"""获取缓存的人员位置(带过期检查)"""
if cache_key not in self._person_cache:
return None
row_num, timestamp = self._person_cache[cache_key]
if time.time() - timestamp > CACHE_TTL_SECONDS:
# 缓存已过期,删除并返回 None
del self._person_cache[cache_key]
logger.debug(f"[KDocs] 缓存已过期: {cache_key}")
return None
return row_num
# 方法: 先导航到目标行的A列然后从那里搜索县区
try:
# 1. 先导航到目标行的 A 列
start_cell = f"{unit_col}{row_num}"
self._navigate_to_cell(start_cell)
time.sleep(0.3)
logger.info(f"[KDocs调试] 已导航到 {start_cell}")
# 2. 从当前位置搜索县区
self._page.keyboard.press("Control+f")
time.sleep(0.3)
# 找到搜索框并输入
try:
search_input = self._page.locator(
"input[placeholder*='查找'], input[placeholder*='搜索'], input[type='text']"
).first
search_input.fill(unit)
time.sleep(0.2)
self._page.keyboard.press("Enter")
time.sleep(0.5)
except Exception as e:
logger.warning(f"[KDocs调试] 填写搜索框失败: {e}")
self._page.keyboard.press("Escape")
return False
# 3. 关闭搜索框,检查当前位置
self._page.keyboard.press("Escape")
time.sleep(0.3)
current_address = self._get_current_cell_address()
found_row = self._extract_row_number(current_address)
logger.info(f"[KDocs调试] 搜索'{unit}'后: 当前单元格={current_address}, 行号={found_row}")
# 4. 检查是否在同一行(允许在目标行或之后的几行内,因为搜索可能从当前位置向下)
if found_row == row_num:
logger.info(f"[KDocs调试] [OK] 验证成功! 县区'{unit}'在第{row_num}")
return True
else:
logger.info(f"[KDocs调试] 验证失败: 期望行{row_num}, 实际找到行{found_row}")
return False
except Exception as e:
logger.warning(f"[KDocs调试] 验证异常: {e}")
return False
def _debug_dump_page_elements(self) -> None:
"""调试: 输出页面上可能包含单元格值的元素"""
logger.info("[KDocs调试] ========== 页面元素分析 ==========")
try:
# 查找可能的编辑栏元素
selectors_to_check = [
"input",
"textarea",
"[class*='formula']",
"[class*='Formula']",
"[class*='editor']",
"[class*='Editor']",
"[class*='cell']",
"[class*='Cell']",
"[class*='input']",
"[class*='Input']",
]
for selector in selectors_to_check:
try:
elements = self._page.query_selector_all(selector)
for i, el in enumerate(elements[:3]): # 只看前3个
try:
class_name = el.get_attribute("class") or ""
value = ""
try:
value = el.input_value()
except:
try:
value = el.inner_text()
except:
pass
if value:
logger.info(
f"[KDocs调试] 元素 {selector}[{i}] class='{class_name[:50]}' value='{value[:30]}'"
)
except:
pass
except:
pass
except Exception as e:
logger.warning(f"[KDocs调试] 页面元素分析失败: {e}")
logger.info("[KDocs调试] ====================================")
def _debug_dump_table_structure(self, target_row: int = 66) -> None:
"""调试: 输出表格结构"""
self._debug_dump_page_elements() # 先分析页面元素
logger.info("[KDocs调试] ========== 表格结构分析 ==========")
cols = ["A", "B", "C", "D", "E"]
for row in [1, 2, 3, target_row]:
row_data = []
for col in cols:
val = self._get_cell_value(f"{col}{row}")
# 截断太长的值
if len(val) > 30:
val = val[:30] + "..."
row_data.append(f"{col}{row}='{val}'")
logger.info(f"[KDocs调试] 第{row}行: {' | '.join(row_data)}")
logger.info("[KDocs调试] ====================================")
def _set_cached_person(self, cache_key: str, row_num: int) -> None:
"""设置人员位置缓存"""
self._person_cache[cache_key] = (row_num, time.time())
def _find_person_with_unit(
self, unit: str, name: str, unit_col: str, max_attempts: int = 10, row_start: int = 0, row_end: int = 0
@@ -1473,130 +1385,34 @@ class KDocsUploader:
"""
查找人员所在行号。
策略只搜索姓名找到姓名列C列的匹配项
注意:组合搜索会匹配到图片列的错误位置,已放弃该方案
:param row_start: 有效行范围起始0表示不限制
:param row_end: 有效行范围结束0表示不限制
"""
logger.info(f"[KDocs调试] 开始搜索人员: name='{name}', unit='{unit}'")
logger.debug(f"[KDocs] 开始搜索人员: name='{name}', unit='{unit}'") # 优化: info -> debug
if row_start > 0 or row_end > 0:
logger.info(f"[KDocs调试] 有效行范围: {row_start}-{row_end}")
logger.debug(f"[KDocs] 有效行范围: {row_start}-{row_end}") # 优化: info -> debug
# 添加人员位置缓存
# 带过期检查的缓存
cache_key = f"{name}_{unit}_{unit_col}"
if hasattr(self, "_person_cache") and cache_key in self._person_cache:
cached_row = self._person_cache[cache_key]
logger.info(f"[KDocs调试] 使用缓存找到人员: name='{name}', row={cached_row}")
cached_row = self._get_cached_person(cache_key)
if cached_row is not None:
logger.debug(f"[KDocs] 使用缓存找到人员: name='{name}', row={cached_row}") # 优化: info -> debug
return cached_row
# 只搜索姓名 - 这是目前唯一可靠的方式
logger.info(f"[KDocs调试] 搜索姓名: '{name}'")
# 注意: 二分搜索已禁用 - _get_cell_value_fast() 使用的 DOM 选择器在金山文档中不存在
# 直接使用线性搜索,这是唯一可靠的方法
# binary_result = self._binary_search_person(name, unit_col, row_start, row_end)
# if binary_result > 0:
# logger.info(f"[KDocs调试] [OK] 二分搜索成功! 找到行号={binary_result}")
# if not hasattr(self, "_person_cache"):
# self._person_cache = {}
# self._person_cache[cache_key] = binary_result
# return binary_result
# 使用线性搜索Ctrl+F 方式)
row_num = self._search_and_get_row(
name, max_attempts=max_attempts, expected_col="C", row_start=row_start, row_end=row_end
)
if row_num > 0:
logger.info(f"[KDocs调试] [OK] 线性搜索成功! 找到行号={row_num}")
# 缓存结果
if not hasattr(self, "_person_cache"):
self._person_cache = {}
self._person_cache[cache_key] = row_num
logger.info(f"[KDocs] 找到人员: name='{name}', row={row_num}")
# 缓存结果(带时间戳)
self._set_cached_person(cache_key, row_num)
return row_num
logger.warning(f"[KDocs调试] 搜索失败,未找到人员 '{name}'")
logger.warning(f"[KDocs] 搜索失败,未找到人员 '{name}'")
return -1
def _binary_search_person(self, name: str, unit_col: str, row_start: int = 0, row_end: int = 0) -> int:
"""
二分搜索人员位置 - 基于姓名的快速搜索
"""
if row_start <= 0:
row_start = 1 # 从第1行开始
if row_end <= 0:
row_end = 1000 # 默认搜索范围最多1000行
logger.info(f"[KDocs调试] 使用二分搜索: name='{name}', rows={row_start}-{row_end}")
left, right = row_start, row_end
while left <= right:
mid = (left + right) // 2
try:
# 获取中间行的姓名
cell_value = self._get_cell_value_fast(f"C{mid}")
if not cell_value:
# 如果单元格为空,向下搜索
left = mid + 1
continue
# 比较姓名
if self._name_matches(cell_value, name):
logger.info(f"[KDocs调试] 二分搜索找到匹配: row={mid}, name='{cell_value}'")
return mid
elif self._name_less_than(cell_value, name):
left = mid + 1
else:
right = mid - 1
except Exception as e:
logger.warning(f"[KDocs调试] 二分搜索读取行{mid}失败: {e}")
# 跳过这一行,继续搜索
left = mid + 1
continue
logger.info(f"[KDocs调试] 二分搜索未找到匹配人员: '{name}'")
return -1
def _name_matches(self, cell_value: str, target_name: str) -> bool:
"""检查单元格中的姓名是否匹配目标姓名"""
if not cell_value or not target_name:
return False
cell_name = str(cell_value).strip()
target = str(target_name).strip()
# 精确匹配
if cell_name == target:
return True
# 部分匹配(包含关系)
return target in cell_name or cell_name in target
def _name_less_than(self, cell_value: str, target_name: str) -> bool:
"""判断单元格姓名是否小于目标姓名(用于排序)"""
if not cell_value or not target_name:
return False
try:
cell_name = str(cell_value).strip()
target = str(target_name).strip()
return cell_name < target
except:
return False
def _get_cell_value_fast(self, cell_address: str) -> Optional[str]:
"""快速获取单元格值,减少延迟"""
try:
# 直接获取单元格值,不等待
cell = self._page.locator(f"[data-cell='{cell_address}']").first
if cell.is_visible():
return cell.inner_text().strip()
return None
except Exception:
return None
def _search_and_get_row(
self, search_text: str, max_attempts: int = 10, expected_col: str = None, row_start: int = 0, row_end: int = 0
) -> int:
@@ -1614,14 +1430,14 @@ class KDocsUploader:
for attempt in range(max_attempts):
self._close_search()
time.sleep(0.3) # 等待名称框更新
time.sleep(0.2) # 优化: 0.3 -> 0.2
current_address = self._get_current_cell_address()
if not current_address:
logger.warning(f"[KDocs调试] 第{attempt + 1}次: 无法获取单元格地址")
logger.debug(f"[KDocs] 第{attempt + 1}次: 无法获取单元格地址") # 优化: warning -> debug
# 继续尝试下一个
self._page.keyboard.press("Control+f")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
@@ -1629,18 +1445,18 @@ class KDocsUploader:
# 提取列字母A, B, C, D 等)
col_letter = "".join(c for c in current_address if c.isalpha()).upper()
logger.info(
f"[KDocs调试] 第{attempt + 1}次搜索'{search_text}': 单元格={current_address}, 列={col_letter}, 行号={row_num}"
)
logger.debug(
f"[KDocs] 第{attempt + 1}次搜索'{search_text}': 单元格={current_address}, 列={col_letter}, 行号={row_num}"
) # 优化: info -> debug
if row_num <= 0:
logger.warning(f"[KDocs调试] 无法提取行号,搜索可能没有结果")
logger.debug(f"[KDocs] 无法提取行号,搜索可能没有结果") # 优化: warning -> debug
return -1
# 检查是否已经访问过这个位置
position_key = f"{col_letter}{row_num}"
if position_key in found_positions:
logger.info(f"[KDocs调试] 位置{position_key}已搜索过,循环结束")
logger.debug(f"[KDocs] 位置{position_key}已搜索过,循环结束") # 优化: info -> debug
# 检查是否有任何有效结果
valid_results = [
pos
@@ -1656,80 +1472,79 @@ class KDocsUploader:
# 跳过标题行和表头行通常是第1-2行
if row_num <= 2:
logger.info(f"[KDocs调试] 跳过标题/表头行: {row_num}")
logger.debug(f"[KDocs] 跳过标题/表头行: {row_num}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
# 如果指定了期望的列,检查是否匹配
if expected_col and col_letter != expected_col.upper():
logger.info(f"[KDocs调试] 列不匹配: 期望={expected_col}, 实际={col_letter},继续搜索下一个")
logger.debug(f"[KDocs] 列不匹配: 期望={expected_col}, 实际={col_letter}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
# 检查行号是否在有效范围内
if row_start > 0 and row_num < row_start:
logger.info(f"[KDocs调试] 行号{row_num}小于起始行{row_start},继续搜索下一个")
logger.debug(f"[KDocs] 行号{row_num}小于起始行{row_start}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
if row_end > 0 and row_num > row_end:
logger.info(f"[KDocs调试] 行号{row_num}大于结束行{row_end},继续搜索下一个")
logger.debug(f"[KDocs] 行号{row_num}大于结束行{row_end}") # 优化: info -> debug
self._page.keyboard.press("Control+f")
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
self._find_next()
continue
# 找到有效的数据行,列匹配且在行范围内
logger.info(f"[KDocs调试] [OK] 找到有效位置: {current_address} (在有效范围内)")
logger.debug(f"[KDocs] 找到有效位置: {current_address}") # 优化: info -> debug
return row_num
self._close_search()
logger.warning(f"[KDocs调试] 达到最大尝试次数{max_attempts},未找到有效结果")
logger.debug(f"[KDocs] 达到最大尝试次数{max_attempts},未找到有效结果") # 优化: warning -> debug
return -1
def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
cell_address = f"{image_col}{row_num}"
# 注意: 移除了重复的导航调用,只保留一次导航
# 清除单元格现有内容
try:
# 1. 导航到单元格(名称框输入地址+Enter会跳转并可能进入编辑模式
# 1. 导航到单元格
self._navigate_to_cell(cell_address)
time.sleep(0.3)
time.sleep(0.2) # 优化: 0.3 -> 0.2
# 2. 按 Escape 退出可能的编辑模式,回到选中状态
self._page.keyboard.press("Escape")
time.sleep(0.3)
time.sleep(0.2) # 优化: 0.3 -> 0.2
# 3. 按 Delete 删除选中单元格的内容
self._page.keyboard.press("Delete")
time.sleep(0.5)
logger.info(f"[KDocs] 已删除 {cell_address} 的内容")
time.sleep(0.4) # 优化: 0.5 -> 0.4
logger.debug(f"[KDocs] 已删除 {cell_address} 的内容") # 优化: info -> debug
except Exception as e:
logger.warning(f"[KDocs] 清除单元格内容时出错: {e}")
logger.info(f"[KDocs] 准备上传图片到 {cell_address},已清除旧内容")
logger.info(f"[KDocs] 上传图片到 {cell_address}")
try:
insert_btn = self._page.get_by_role("button", name="插入")
insert_btn.click()
time.sleep(0.3)
time.sleep(0.25) # 优化: 0.3 -> 0.25
except Exception as e:
raise RuntimeError(f"打开插入菜单失败: {e}")
try:
image_btn = self._page.get_by_role("button", name="图片")
image_btn.click()
time.sleep(0.3)
time.sleep(0.25) # 优化: 0.3 -> 0.25
cell_image_option = self._page.get_by_role("option", name="单元格图片")
cell_image_option.click()
time.sleep(0.2)
time.sleep(0.15) # 优化: 0.2 -> 0.15
except Exception as e:
raise RuntimeError(f"选择单元格图片失败: {e}")
@@ -1743,7 +1558,7 @@ class KDocsUploader:
except Exception as e:
raise RuntimeError(f"上传文件失败: {e}")
time.sleep(2)
time.sleep(1.5) # 优化: 2 -> 1.5
return True
@@ -1756,4 +1571,3 @@ def get_kdocs_uploader() -> KDocsUploader:
_kdocs_uploader = KDocsUploader()
_kdocs_uploader.start()
return _kdocs_uploader