feat: KDocs 上传增强 + 离线监控 + Bug修复

KDocs 上传功能增强:
- 搜索优化:只用姓名搜索 + C列验证,避免匹配到错误单元格
- 有效行范围:支持配置起始行/结束行,限制上传区域
- 图片覆盖:支持覆盖单元格已有图片(Escape + Delete)
- 配置持久化:kdocs_row_start/row_end 保存到数据库(v18迁移)

二次登录功能:
- 登录后立即再次登录,让"上次登录时间"显示为刚刚

KDocs 离线监控:
- 每5分钟检测金山文档登录状态
- 离线时发送邮件通知管理员(每次掉线只通知一次)
- 恢复在线后重置通知状态

Bug 修复:
- 任务日志搜索账号关键词报错500:添加异常处理

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-07 23:40:46 +08:00
parent 13544867aa
commit b0fe325154
31 changed files with 604 additions and 141 deletions

View File

@@ -860,6 +860,8 @@ class KDocsUploader:
sheet_index = int(cfg.get("kdocs_sheet_index") or 0)
unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper()
image_col = (cfg.get("kdocs_image_column") or "D").strip().upper()
row_start = int(cfg.get("kdocs_row_start") or 0)
row_end = int(cfg.get("kdocs_row_end") or 0)
success = False
error_msg = ""
@@ -867,7 +869,7 @@ class KDocsUploader:
try:
if sheet_name or sheet_index:
self._select_sheet(sheet_name, sheet_index)
row_num = self._find_person_with_unit(unit, name, unit_col)
row_num = self._find_person_with_unit(unit, name, unit_col, row_start=row_start, row_end=row_end)
if row_num < 0:
error_msg = f"未找到人员: {unit}-{name}"
break
@@ -975,12 +977,35 @@ class KDocsUploader:
continue
def _get_current_cell_address(self) -> str:
try:
name_box = self._page.locator("input.edit-box").first
return name_box.input_value()
except Exception:
name_box = self._page.locator('#root input[type="text"]').first
return name_box.input_value()
"""获取当前选中的单元格地址(如 A1, C66 等)"""
import re
# 等待一小段时间让名称框稳定
time.sleep(0.1)
for attempt in range(3):
try:
name_box = self._page.locator("input.edit-box").first
value = name_box.input_value()
# 验证是否为有效的单元格地址格式(如 A1, C66, AA100 等)
if value and re.match(r"^[A-Z]+\d+$", value.upper()):
return value.upper()
except Exception:
pass
try:
name_box = self._page.locator('#root input[type="text"]').first
value = name_box.input_value()
if value and re.match(r"^[A-Z]+\d+$", value.upper()):
return value.upper()
except Exception:
pass
# 等待一下再重试
time.sleep(0.2)
# 如果无法获取有效地址,返回空字符串
logger.warning("[KDocs调试] 无法获取有效的单元格地址")
return ""
def _navigate_to_cell(self, cell_address: str) -> None:
try:
@@ -1033,6 +1058,48 @@ class KDocsUploader:
except Exception:
pass
self._focus_grid()
# 尝试方法1: 读取金山文档编辑栏/公式栏的内容
try:
# 金山文档的编辑栏选择器(可能需要调整)
formula_bar_selectors = [
".formula-bar-input",
".cell-editor-input",
"[class*='formulaBar'] input",
"[class*='formula'] textarea",
".formula-editor",
"#formulaInput",
]
for selector in formula_bar_selectors:
try:
el = self._page.query_selector(selector)
if el:
value = el.input_value() if hasattr(el, 'input_value') else el.inner_text()
if value and not value.startswith("=DISPIMG"):
logger.info(f"[KDocs调试] 从编辑栏读取到: '{value[:50]}...' (selector={selector})")
return value.strip()
except Exception:
pass
except Exception:
pass
# 尝试方法2: F2进入编辑模式全选复制
try:
self._page.keyboard.press("F2")
time.sleep(0.2)
self._page.keyboard.press("Control+a")
time.sleep(0.1)
self._page.keyboard.press("Control+c")
time.sleep(0.2)
self._page.keyboard.press("Escape")
time.sleep(0.1)
value = self._read_clipboard_text()
if value and not value.startswith("=DISPIMG"):
return value.strip()
except Exception:
pass
# 尝试方法3: 直接复制单元格(备选)
try:
self._page.keyboard.press("Control+c")
time.sleep(0.2)
@@ -1141,38 +1208,225 @@ class KDocsUploader:
def _extract_row_number(self, cell_address: str) -> int:
import re
match = re.search(r"(\\d+)$", cell_address)
match = re.search(r"(\d+)$", cell_address)
if match:
return int(match.group(1))
return -1
def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool:
cell_address = f"{unit_col}{row_num}"
cell_value = self._get_cell_value(cell_address)
if cell_value:
return self._unit_matches(cell_value, unit)
return False
"""验证县区 - 从目标行开始搜索县区"""
logger.info(f"[KDocs调试] 验证县区: 期望行={row_num}, 期望值='{unit}'")
def _find_person_with_unit(self, unit: str, name: str, unit_col: str, max_attempts: int = 50) -> int:
self._search_person(name)
found_rows = set()
for _ in range(max_attempts):
self._close_search()
current_address = self._get_current_cell_address()
row_num = self._extract_row_number(current_address)
if row_num == -1:
return -1
if row_num in found_rows:
return -1
found_rows.add(row_num)
if self._verify_unit_by_navigation(row_num, unit, unit_col):
return row_num
# 方法: 先导航到目标行的A列然后从那里搜索县区
try:
# 1. 先导航到目标行的 A 列
start_cell = f"{unit_col}{row_num}"
self._navigate_to_cell(start_cell)
time.sleep(0.3)
logger.info(f"[KDocs调试] 已导航到 {start_cell}")
# 2. 从当前位置搜索县区
self._page.keyboard.press("Control+f")
time.sleep(0.2)
self._find_next()
time.sleep(0.3)
# 找到搜索框并输入
try:
search_input = self._page.locator("input[placeholder*='查找'], input[placeholder*='搜索'], input[type='text']").first
search_input.fill(unit)
time.sleep(0.2)
self._page.keyboard.press("Enter")
time.sleep(0.5)
except Exception as e:
logger.warning(f"[KDocs调试] 填写搜索框失败: {e}")
self._page.keyboard.press("Escape")
return False
# 3. 关闭搜索框,检查当前位置
self._page.keyboard.press("Escape")
time.sleep(0.3)
current_address = self._get_current_cell_address()
found_row = self._extract_row_number(current_address)
logger.info(f"[KDocs调试] 搜索'{unit}'后: 当前单元格={current_address}, 行号={found_row}")
# 4. 检查是否在同一行(允许在目标行或之后的几行内,因为搜索可能从当前位置向下)
if found_row == row_num:
logger.info(f"[KDocs调试] ✓ 验证成功! 县区'{unit}'在第{row_num}")
return True
else:
logger.info(f"[KDocs调试] 验证失败: 期望行{row_num}, 实际找到行{found_row}")
return False
except Exception as e:
logger.warning(f"[KDocs调试] 验证异常: {e}")
return False
def _debug_dump_page_elements(self) -> None:
"""调试: 输出页面上可能包含单元格值的元素"""
logger.info("[KDocs调试] ========== 页面元素分析 ==========")
try:
# 查找可能的编辑栏元素
selectors_to_check = [
"input", "textarea",
"[class*='formula']", "[class*='Formula']",
"[class*='editor']", "[class*='Editor']",
"[class*='cell']", "[class*='Cell']",
"[class*='input']", "[class*='Input']",
]
for selector in selectors_to_check:
try:
elements = self._page.query_selector_all(selector)
for i, el in enumerate(elements[:3]): # 只看前3个
try:
class_name = el.get_attribute("class") or ""
value = ""
try:
value = el.input_value()
except:
try:
value = el.inner_text()
except:
pass
if value:
logger.info(f"[KDocs调试] 元素 {selector}[{i}] class='{class_name[:50]}' value='{value[:30]}'")
except:
pass
except:
pass
except Exception as e:
logger.warning(f"[KDocs调试] 页面元素分析失败: {e}")
logger.info("[KDocs调试] ====================================")
def _debug_dump_table_structure(self, target_row: int = 66) -> None:
"""调试: 输出表格结构"""
self._debug_dump_page_elements() # 先分析页面元素
logger.info("[KDocs调试] ========== 表格结构分析 ==========")
cols = ['A', 'B', 'C', 'D', 'E']
for row in [1, 2, 3, target_row]:
row_data = []
for col in cols:
val = self._get_cell_value(f"{col}{row}")
# 截断太长的值
if len(val) > 30:
val = val[:30] + "..."
row_data.append(f"{col}{row}='{val}'")
logger.info(f"[KDocs调试] 第{row}行: {' | '.join(row_data)}")
logger.info("[KDocs调试] ====================================")
def _find_person_with_unit(self, unit: str, name: str, unit_col: str, max_attempts: int = 50,
row_start: int = 0, row_end: int = 0) -> int:
"""
查找人员所在行号。
策略只搜索姓名找到姓名列C列的匹配项
注意:组合搜索会匹配到图片列的错误位置,已放弃该方案
:param row_start: 有效行范围起始0表示不限制
:param row_end: 有效行范围结束0表示不限制
"""
logger.info(f"[KDocs调试] 开始搜索人员: name='{name}', unit='{unit}'")
if row_start > 0 or row_end > 0:
logger.info(f"[KDocs调试] 有效行范围: {row_start}-{row_end}")
# 只搜索姓名 - 这是目前唯一可靠的方式
logger.info(f"[KDocs调试] 搜索姓名: '{name}'")
row_num = self._search_and_get_row(name, max_attempts=max_attempts, expected_col='C',
row_start=row_start, row_end=row_end)
if row_num > 0:
logger.info(f"[KDocs调试] ✓ 姓名搜索成功! 找到行号={row_num}")
return row_num
logger.warning(f"[KDocs调试] 搜索失败,未找到人员 '{name}'")
return -1
def _search_and_get_row(self, search_text: str, max_attempts: int = 10, expected_col: str = None,
row_start: int = 0, row_end: int = 0) -> int:
"""
执行搜索并获取找到的行号
:param search_text: 要搜索的文本
:param max_attempts: 最大尝试次数
:param expected_col: 期望的列(如 'C'),如果指定则只接受该列的结果
:param row_start: 有效行范围起始0表示不限制
:param row_end: 有效行范围结束0表示不限制
"""
self._focus_grid()
self._search_person(search_text)
found_positions = set() # 记录已找到的位置(列+行)
for attempt in range(max_attempts):
self._close_search()
time.sleep(0.3) # 等待名称框更新
current_address = self._get_current_cell_address()
if not current_address:
logger.warning(f"[KDocs调试] 第{attempt+1}次: 无法获取单元格地址")
# 继续尝试下一个
self._page.keyboard.press("Control+f")
time.sleep(0.2)
self._find_next()
continue
row_num = self._extract_row_number(current_address)
# 提取列字母A, B, C, D 等)
col_letter = ''.join(c for c in current_address if c.isalpha()).upper()
logger.info(f"[KDocs调试] 第{attempt+1}次搜索'{search_text}': 单元格={current_address}, 列={col_letter}, 行号={row_num}")
if row_num <= 0:
logger.warning(f"[KDocs调试] 无法提取行号,搜索可能没有结果")
return -1
# 检查是否已经访问过这个位置
position_key = f"{col_letter}{row_num}"
if position_key in found_positions:
logger.info(f"[KDocs调试] 位置{position_key}已搜索过,循环结束")
# 检查是否有任何有效结果
valid_results = [pos for pos in found_positions
if (not expected_col or pos.startswith(expected_col))
and self._extract_row_number(pos) > 2]
if valid_results:
# 返回第一个有效结果的行号
return self._extract_row_number(valid_results[0])
return -1
found_positions.add(position_key)
# 跳过标题行和表头行通常是第1-2行
if row_num <= 2:
logger.info(f"[KDocs调试] 跳过标题/表头行: {row_num}")
self._page.keyboard.press("Control+f")
time.sleep(0.2)
self._find_next()
continue
# 如果指定了期望的列,检查是否匹配
if expected_col and col_letter != expected_col.upper():
logger.info(f"[KDocs调试] 列不匹配: 期望={expected_col}, 实际={col_letter},继续搜索下一个")
self._page.keyboard.press("Control+f")
time.sleep(0.2)
self._find_next()
continue
# 检查行号是否在有效范围内
if row_start > 0 and row_num < row_start:
logger.info(f"[KDocs调试] 行号{row_num}小于起始行{row_start},继续搜索下一个")
self._page.keyboard.press("Control+f")
time.sleep(0.2)
self._find_next()
continue
if row_end > 0 and row_num > row_end:
logger.info(f"[KDocs调试] 行号{row_num}大于结束行{row_end},继续搜索下一个")
self._page.keyboard.press("Control+f")
time.sleep(0.2)
self._find_next()
continue
# 找到有效的数据行,列匹配且在行范围内
logger.info(f"[KDocs调试] ✓ 找到有效位置: {current_address} (在有效范围内)")
return row_num
self._close_search()
logger.warning(f"[KDocs调试] 达到最大尝试次数{max_attempts},未找到有效结果")
return -1
def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
@@ -1180,12 +1434,24 @@ class KDocsUploader:
self._navigate_to_cell(cell_address)
time.sleep(0.3)
# 清除单元格现有内容
try:
# 1. 导航到单元格(名称框输入地址+Enter会跳转并可能进入编辑模式
self._navigate_to_cell(cell_address)
time.sleep(0.3)
# 2. 按 Escape 退出可能的编辑模式,回到选中状态
self._page.keyboard.press("Escape")
time.sleep(0.3)
# 3. 按 Delete 删除选中单元格的内容
self._page.keyboard.press("Delete")
time.sleep(0.1)
self._page.keyboard.press("Backspace")
except Exception:
pass
time.sleep(0.5)
logger.info(f"[KDocs] 已删除 {cell_address} 的内容")
except Exception as e:
logger.warning(f"[KDocs] 清除单元格内容时出错: {e}")
logger.info(f"[KDocs] 准备上传图片到 {cell_address},已清除旧内容")
try:
insert_btn = self._page.get_by_role("button", name="插入")