606 lines
21 KiB
Python
606 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import os
|
|
import queue
|
|
import threading
|
|
import time
|
|
from typing import Any, Dict, Optional
|
|
|
|
import database
|
|
import email_service
|
|
from app_config import get_config
|
|
from services.client_log import log_to_client
|
|
from services.runtime import get_logger
|
|
|
|
try:
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
|
|
except Exception: # pragma: no cover - 运行环境缺少 playwright 时降级
|
|
sync_playwright = None
|
|
|
|
class PlaywrightTimeoutError(Exception):
|
|
pass
|
|
|
|
|
|
logger = get_logger()
|
|
config = get_config()
|
|
|
|
|
|
class KDocsUploader:
|
|
def __init__(self) -> None:
|
|
self._queue: queue.Queue = queue.Queue(maxsize=int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200")))
|
|
self._thread = threading.Thread(target=self._run, name="kdocs-uploader", daemon=True)
|
|
self._running = False
|
|
self._last_error: Optional[str] = None
|
|
self._last_success_at: Optional[float] = None
|
|
self._login_required = False
|
|
self._playwright = None
|
|
self._browser = None
|
|
self._context = None
|
|
self._page = None
|
|
self._last_qr_image: Optional[bytes] = None
|
|
self._last_login_check: float = 0.0
|
|
self._last_login_ok: Optional[bool] = None
|
|
|
|
def start(self) -> None:
|
|
if self._running:
|
|
return
|
|
self._running = True
|
|
self._thread.start()
|
|
|
|
def stop(self) -> None:
|
|
if not self._running:
|
|
return
|
|
self._running = False
|
|
self._queue.put({"action": "shutdown"})
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
return {
|
|
"queue_size": self._queue.qsize(),
|
|
"login_required": self._login_required,
|
|
"last_error": self._last_error,
|
|
"last_success_at": self._last_success_at,
|
|
"last_login_ok": self._last_login_ok,
|
|
}
|
|
|
|
def enqueue_upload(
|
|
self,
|
|
*,
|
|
user_id: int,
|
|
account_id: str,
|
|
unit: str,
|
|
name: str,
|
|
image_path: str,
|
|
) -> bool:
|
|
if not self._running:
|
|
self.start()
|
|
|
|
payload = {
|
|
"user_id": int(user_id),
|
|
"account_id": str(account_id),
|
|
"unit": unit,
|
|
"name": name,
|
|
"image_path": image_path,
|
|
}
|
|
try:
|
|
self._queue.put({"action": "upload", "payload": payload}, timeout=1)
|
|
return True
|
|
except queue.Full:
|
|
self._last_error = "上传队列已满"
|
|
return False
|
|
|
|
def request_qr(self, timeout: int = 30) -> Dict[str, Any]:
|
|
return self._submit_command("qr", timeout=timeout)
|
|
|
|
def clear_login(self, timeout: int = 20) -> Dict[str, Any]:
|
|
return self._submit_command("clear_login", timeout=timeout)
|
|
|
|
def refresh_login_status(self, timeout: int = 20) -> Dict[str, Any]:
|
|
return self._submit_command("status", timeout=timeout)
|
|
|
|
def _submit_command(self, action: str, timeout: int = 30) -> Dict[str, Any]:
|
|
if not self._running:
|
|
self.start()
|
|
resp_queue: queue.Queue = queue.Queue(maxsize=1)
|
|
self._queue.put({"action": action, "response": resp_queue})
|
|
try:
|
|
return resp_queue.get(timeout=timeout)
|
|
except queue.Empty:
|
|
return {"success": False, "error": "操作超时"}
|
|
|
|
def _run(self) -> None:
|
|
while True:
|
|
task = self._queue.get()
|
|
if not task:
|
|
continue
|
|
action = task.get("action")
|
|
if action == "shutdown":
|
|
break
|
|
try:
|
|
if action == "upload":
|
|
self._handle_upload(task.get("payload") or {})
|
|
elif action == "qr":
|
|
result = self._handle_qr()
|
|
task.get("response").put(result)
|
|
elif action == "clear_login":
|
|
result = self._handle_clear_login()
|
|
task.get("response").put(result)
|
|
elif action == "status":
|
|
result = self._handle_status_check()
|
|
task.get("response").put(result)
|
|
except Exception as e:
|
|
logger.warning(f"[KDocs] 处理任务失败: {e}")
|
|
|
|
self._cleanup_browser()
|
|
|
|
def _load_system_config(self) -> Dict[str, Any]:
|
|
return database.get_system_config() or {}
|
|
|
|
def _ensure_playwright(self) -> bool:
|
|
if sync_playwright is None:
|
|
self._last_error = "playwright 未安装"
|
|
return False
|
|
try:
|
|
if self._playwright is None:
|
|
self._playwright = sync_playwright().start()
|
|
if self._browser is None:
|
|
headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false"
|
|
self._browser = self._playwright.chromium.launch(headless=headless)
|
|
if self._context is None:
|
|
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
|
if os.path.exists(storage_state):
|
|
self._context = self._browser.new_context(storage_state=storage_state)
|
|
else:
|
|
self._context = self._browser.new_context()
|
|
if self._page is None or self._page.is_closed():
|
|
self._page = self._context.new_page()
|
|
self._page.set_default_timeout(int(getattr(config, "DEFAULT_TIMEOUT", 60000)))
|
|
return True
|
|
except Exception as e:
|
|
self._last_error = f"浏览器启动失败: {e}"
|
|
self._cleanup_browser()
|
|
return False
|
|
|
|
def _cleanup_browser(self) -> None:
|
|
try:
|
|
if self._page:
|
|
self._page.close()
|
|
except Exception:
|
|
pass
|
|
self._page = None
|
|
try:
|
|
if self._context:
|
|
self._context.close()
|
|
except Exception:
|
|
pass
|
|
self._context = None
|
|
try:
|
|
if self._browser:
|
|
self._browser.close()
|
|
except Exception:
|
|
pass
|
|
self._browser = None
|
|
try:
|
|
if self._playwright:
|
|
self._playwright.stop()
|
|
except Exception:
|
|
pass
|
|
self._playwright = None
|
|
|
|
def _open_document(self, doc_url: str) -> bool:
|
|
try:
|
|
self._page.goto(doc_url)
|
|
time.sleep(1)
|
|
return True
|
|
except Exception as e:
|
|
self._last_error = f"打开文档失败: {e}"
|
|
return False
|
|
|
|
def _is_logged_in(self) -> bool:
|
|
try:
|
|
login_btn = self._page.get_by_role("button", name="登录并加入编辑")
|
|
if login_btn.is_visible(timeout=1500):
|
|
return False
|
|
except Exception:
|
|
return True
|
|
return True
|
|
|
|
def _ensure_login_dialog(self) -> None:
|
|
try:
|
|
login_btn = self._page.get_by_role("button", name="登录并加入编辑")
|
|
if login_btn.is_visible(timeout=1500):
|
|
login_btn.click()
|
|
time.sleep(1)
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
wechat_btn = self._page.get_by_role("button", name="微信登录")
|
|
if wechat_btn.is_visible(timeout=3000):
|
|
wechat_btn.click()
|
|
time.sleep(1.5)
|
|
except Exception:
|
|
pass
|
|
|
|
def _capture_qr_image(self) -> Optional[bytes]:
|
|
candidates = [
|
|
self._page.locator("canvas"),
|
|
self._page.locator("img[alt*='二维码']"),
|
|
self._page.locator("img[src*='qr']"),
|
|
]
|
|
for locator in candidates:
|
|
try:
|
|
if locator.count() < 1:
|
|
continue
|
|
target = locator.first
|
|
target.wait_for(state="visible", timeout=5000)
|
|
return target.screenshot()
|
|
except PlaywrightTimeoutError:
|
|
continue
|
|
except Exception:
|
|
continue
|
|
try:
|
|
return self._page.screenshot(full_page=True)
|
|
except Exception:
|
|
return None
|
|
|
|
def _save_login_state(self) -> None:
|
|
try:
|
|
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
|
os.makedirs(os.path.dirname(storage_state), exist_ok=True)
|
|
self._context.storage_state(path=storage_state)
|
|
except Exception as e:
|
|
logger.warning(f"[KDocs] 保存登录态失败: {e}")
|
|
|
|
def _handle_qr(self) -> Dict[str, Any]:
|
|
cfg = self._load_system_config()
|
|
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
|
|
if not doc_url:
|
|
return {"success": False, "error": "未配置金山文档链接"}
|
|
if not self._ensure_playwright():
|
|
return {"success": False, "error": self._last_error or "浏览器不可用"}
|
|
if not self._open_document(doc_url):
|
|
return {"success": False, "error": self._last_error or "打开文档失败"}
|
|
|
|
if self._is_logged_in():
|
|
self._login_required = False
|
|
self._last_login_ok = True
|
|
self._save_login_state()
|
|
return {"success": True, "logged_in": True, "qr_image": ""}
|
|
|
|
self._ensure_login_dialog()
|
|
qr_image = self._capture_qr_image()
|
|
if not qr_image:
|
|
self._last_error = "二维码获取失败"
|
|
return {"success": False, "error": self._last_error}
|
|
|
|
self._last_qr_image = qr_image
|
|
self._login_required = True
|
|
return {
|
|
"success": True,
|
|
"logged_in": False,
|
|
"qr_image": base64.b64encode(qr_image).decode("ascii"),
|
|
}
|
|
|
|
def _handle_clear_login(self) -> Dict[str, Any]:
|
|
storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
|
|
try:
|
|
if os.path.exists(storage_state):
|
|
os.remove(storage_state)
|
|
except Exception as e:
|
|
return {"success": False, "error": f"清除登录态失败: {e}"}
|
|
|
|
self._login_required = False
|
|
self._last_login_ok = None
|
|
self._cleanup_browser()
|
|
return {"success": True}
|
|
|
|
def _handle_status_check(self) -> Dict[str, Any]:
|
|
cfg = self._load_system_config()
|
|
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
|
|
if not doc_url:
|
|
return {"success": True, "logged_in": False, "error": "未配置文档链接"}
|
|
if not self._ensure_playwright():
|
|
return {"success": False, "logged_in": False, "error": self._last_error or "浏览器不可用"}
|
|
if not self._open_document(doc_url):
|
|
return {"success": False, "logged_in": False, "error": self._last_error or "打开文档失败"}
|
|
logged_in = self._is_logged_in()
|
|
self._last_login_ok = logged_in
|
|
self._login_required = not logged_in
|
|
if logged_in:
|
|
self._save_login_state()
|
|
return {"success": True, "logged_in": logged_in}
|
|
|
|
def _handle_upload(self, payload: Dict[str, Any]) -> None:
|
|
cfg = self._load_system_config()
|
|
if int(cfg.get("kdocs_enabled", 0) or 0) != 1:
|
|
return
|
|
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
|
|
if not doc_url:
|
|
return
|
|
|
|
unit = (payload.get("unit") or "").strip()
|
|
name = (payload.get("name") or "").strip()
|
|
image_path = payload.get("image_path")
|
|
user_id = payload.get("user_id")
|
|
account_id = payload.get("account_id")
|
|
|
|
if not unit or not name:
|
|
return
|
|
if not image_path or not os.path.exists(image_path):
|
|
return
|
|
|
|
if not self._ensure_playwright():
|
|
self._notify_admin(unit, name, image_path, self._last_error or "浏览器不可用")
|
|
return
|
|
|
|
if not self._open_document(doc_url):
|
|
self._notify_admin(unit, name, image_path, self._last_error or "打开文档失败")
|
|
return
|
|
|
|
if not self._is_logged_in():
|
|
self._login_required = True
|
|
self._last_login_ok = False
|
|
self._notify_admin(unit, name, image_path, "登录已失效,请管理员重新扫码登录")
|
|
return
|
|
self._login_required = False
|
|
self._last_login_ok = True
|
|
|
|
sheet_name = (cfg.get("kdocs_sheet_name") or "").strip()
|
|
sheet_index = int(cfg.get("kdocs_sheet_index") or 0)
|
|
unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper()
|
|
image_col = (cfg.get("kdocs_image_column") or "D").strip().upper()
|
|
|
|
success = False
|
|
error_msg = ""
|
|
for attempt in range(2):
|
|
try:
|
|
if sheet_name or sheet_index:
|
|
self._select_sheet(sheet_name, sheet_index)
|
|
row_num = self._find_person_with_unit(unit, name, unit_col)
|
|
if row_num < 0:
|
|
error_msg = f"未找到人员: {unit}-{name}"
|
|
break
|
|
success = self._upload_image_to_cell(row_num, image_path, image_col)
|
|
if success:
|
|
break
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
|
|
if success:
|
|
self._last_success_at = time.time()
|
|
self._last_error = None
|
|
log_to_client(f"已上传表格截图: {unit}-{name}", user_id, account_id)
|
|
return
|
|
|
|
if not error_msg:
|
|
error_msg = "上传失败"
|
|
self._last_error = error_msg
|
|
self._notify_admin(unit, name, image_path, error_msg)
|
|
log_to_client(f"表格上传失败: {error_msg}", user_id, account_id)
|
|
|
|
def _notify_admin(self, unit: str, name: str, image_path: str, error: str) -> None:
|
|
cfg = self._load_system_config()
|
|
if int(cfg.get("kdocs_admin_notify_enabled", 0) or 0) != 1:
|
|
return
|
|
to_email = (cfg.get("kdocs_admin_notify_email") or "").strip()
|
|
if not to_email:
|
|
return
|
|
settings = email_service.get_email_settings()
|
|
if not settings.get("enabled", False):
|
|
return
|
|
subject = "金山文档上传失败提醒"
|
|
body = (
|
|
f"上传失败\n\n人员: {unit}-{name}\n图片: {image_path}\n错误: {error}\n\n"
|
|
"请检查登录状态或表格配置。"
|
|
)
|
|
try:
|
|
email_service.send_email_async(
|
|
to_email=to_email,
|
|
subject=subject,
|
|
body=body,
|
|
email_type="kdocs_upload_failed",
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"[KDocs] 发送管理员邮件失败: {e}")
|
|
|
|
def _select_sheet(self, sheet_name: str, sheet_index: int) -> None:
|
|
if sheet_name:
|
|
candidates = [
|
|
self._page.locator("[role='tab']").filter(has_text=sheet_name),
|
|
self._page.locator(".sheet-tab").filter(has_text=sheet_name),
|
|
self._page.locator(".sheet-tab-name").filter(has_text=sheet_name),
|
|
]
|
|
for locator in candidates:
|
|
try:
|
|
if locator.count() < 1:
|
|
continue
|
|
locator.first.click()
|
|
time.sleep(0.5)
|
|
return
|
|
except Exception:
|
|
continue
|
|
|
|
if sheet_index > 0:
|
|
idx = sheet_index - 1
|
|
candidates = [
|
|
self._page.locator("[role='tab']"),
|
|
self._page.locator(".sheet-tab"),
|
|
self._page.locator(".sheet-tab-name"),
|
|
]
|
|
for locator in candidates:
|
|
try:
|
|
if locator.count() <= idx:
|
|
continue
|
|
locator.nth(idx).click()
|
|
time.sleep(0.5)
|
|
return
|
|
except Exception:
|
|
continue
|
|
|
|
def _get_current_cell_address(self) -> str:
|
|
name_box = self._page.locator('#root input[type="text"]').first
|
|
return name_box.input_value()
|
|
|
|
def _navigate_to_cell(self, cell_address: str) -> None:
|
|
name_box = self._page.locator('#root input[type="text"]').first
|
|
name_box.click()
|
|
name_box.fill(cell_address)
|
|
name_box.press("Enter")
|
|
time.sleep(0.3)
|
|
|
|
def _get_cell_value(self, cell_address: str) -> str:
|
|
self._navigate_to_cell(cell_address)
|
|
time.sleep(0.3)
|
|
try:
|
|
formula_bar = self._page.locator('input[type="text"]').nth(1)
|
|
value = formula_bar.input_value()
|
|
if value:
|
|
return value.strip()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
formula_inputs = self._page.locator('textbox')
|
|
for i in range(formula_inputs.count()):
|
|
try:
|
|
value = formula_inputs.nth(i).input_value()
|
|
if value:
|
|
import re
|
|
|
|
if not re.match(r"^[A-Z]+\d+$", value.strip()):
|
|
return value.strip()
|
|
except Exception:
|
|
continue
|
|
except Exception:
|
|
pass
|
|
return ""
|
|
|
|
def _search_person(self, name: str) -> None:
|
|
self._page.keyboard.press("Control+f")
|
|
time.sleep(0.3)
|
|
search_input = None
|
|
try:
|
|
search_input = self._page.get_by_role("textbox").nth(3)
|
|
search_input.fill(name)
|
|
except Exception:
|
|
try:
|
|
search_input = self._page.locator("input[placeholder*='查找']").first
|
|
search_input.fill(name)
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.2)
|
|
try:
|
|
find_btn = self._page.get_by_role("button", name="查找").nth(2)
|
|
find_btn.click()
|
|
except Exception:
|
|
try:
|
|
self._page.get_by_role("button", name="查找").first.click()
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.3)
|
|
|
|
def _find_next(self) -> None:
|
|
try:
|
|
find_btn = self._page.get_by_role("button", name="查找").nth(2)
|
|
find_btn.click()
|
|
except Exception:
|
|
try:
|
|
self._page.get_by_role("button", name="查找").first.click()
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.3)
|
|
|
|
def _close_search(self) -> None:
|
|
self._page.keyboard.press("Escape")
|
|
time.sleep(0.2)
|
|
|
|
def _extract_row_number(self, cell_address: str) -> int:
|
|
import re
|
|
|
|
match = re.search(r"(\\d+)$", cell_address)
|
|
if match:
|
|
return int(match.group(1))
|
|
return -1
|
|
|
|
def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool:
|
|
cell_address = f"{unit_col}{row_num}"
|
|
cell_value = self._get_cell_value(cell_address)
|
|
if cell_value:
|
|
return cell_value == unit
|
|
return False
|
|
|
|
def _find_person_with_unit(self, unit: str, name: str, unit_col: str, max_attempts: int = 50) -> int:
|
|
self._search_person(name)
|
|
found_rows = set()
|
|
for _ in range(max_attempts):
|
|
self._close_search()
|
|
current_address = self._get_current_cell_address()
|
|
row_num = self._extract_row_number(current_address)
|
|
if row_num == -1:
|
|
return -1
|
|
if row_num in found_rows:
|
|
return -1
|
|
found_rows.add(row_num)
|
|
|
|
if self._verify_unit_by_navigation(row_num, unit, unit_col):
|
|
return row_num
|
|
|
|
self._page.keyboard.press("Control+f")
|
|
time.sleep(0.2)
|
|
self._find_next()
|
|
return -1
|
|
|
|
def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
|
|
cell_address = f"{image_col}{row_num}"
|
|
self._navigate_to_cell(cell_address)
|
|
time.sleep(0.3)
|
|
|
|
try:
|
|
self._page.keyboard.press("Delete")
|
|
time.sleep(0.1)
|
|
self._page.keyboard.press("Backspace")
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
insert_btn = self._page.get_by_role("button", name="插入")
|
|
insert_btn.click()
|
|
time.sleep(0.3)
|
|
except Exception as e:
|
|
raise RuntimeError(f"打开插入菜单失败: {e}")
|
|
|
|
try:
|
|
image_btn = self._page.get_by_role("button", name="图片")
|
|
image_btn.click()
|
|
time.sleep(0.3)
|
|
cell_image_option = self._page.get_by_role("option", name="单元格图片")
|
|
cell_image_option.click()
|
|
time.sleep(0.2)
|
|
except Exception as e:
|
|
raise RuntimeError(f"选择单元格图片失败: {e}")
|
|
|
|
try:
|
|
local_option = self._page.get_by_role("option", name="本地")
|
|
with self._page.expect_file_chooser() as fc_info:
|
|
local_option.click()
|
|
file_chooser = fc_info.value
|
|
file_chooser.set_files(image_path)
|
|
except Exception as e:
|
|
raise RuntimeError(f"上传文件失败: {e}")
|
|
|
|
time.sleep(2)
|
|
return True
|
|
|
|
|
|
_kdocs_uploader: Optional[KDocsUploader] = None
|
|
|
|
|
|
def get_kdocs_uploader() -> KDocsUploader:
|
|
global _kdocs_uploader
|
|
if _kdocs_uploader is None:
|
|
_kdocs_uploader = KDocsUploader()
|
|
_kdocs_uploader.start()
|
|
return _kdocs_uploader
|