Files
zsglpt/services/kdocs_uploader.py
2026-01-07 14:17:01 +08:00

919 lines
33 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import base64
import os
import queue
import threading
import time
from io import BytesIO
from typing import Any, Dict, Optional
import database
import email_service
from app_config import get_config
from services.client_log import log_to_client
from services.runtime import get_logger
# Playwright is optional: when the import fails for any reason the module
# still loads, with `sync_playwright` set to None and a stand-in exception
# class so `PlaywrightTimeoutError` remains a valid name.
try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except Exception:  # pragma: no cover - degrade when the runtime lacks playwright
    sync_playwright = None

    class PlaywrightTimeoutError(Exception):
        pass


# Module-level logger and configuration object used by the code below.
logger = get_logger()
config = get_config()
class KDocsUploader:
    """Queue-driven worker that operates a Playwright browser for KDocs uploads.

    Tasks are dicts placed on an internal queue and consumed by a single
    daemon thread whose target is ``_run``.
    """

    def __init__(self) -> None:
        """Create the task queue, the not-yet-started worker thread and state fields."""
        # Queue capacity is read from KDOCS_QUEUE_MAXSIZE (default "200").
        capacity = int(os.environ.get("KDOCS_QUEUE_MAXSIZE", "200"))
        self._queue: queue.Queue = queue.Queue(maxsize=capacity)
        self._thread = threading.Thread(
            target=self._run,
            name="kdocs-uploader",
            daemon=True,
        )
        self._running = False

        # Status fields reported by get_status().
        self._last_error: Optional[str] = None
        self._last_success_at: Optional[float] = None
        self._login_required = False

        # Playwright object chain; all created lazily by _ensure_playwright().
        self._playwright = None
        self._browser = None
        self._context = None
        self._page = None

        # Login bookkeeping.
        self._last_qr_image: Optional[bytes] = None
        self._last_login_check: float = 0.0
        self._last_login_ok: Optional[bool] = None
def start(self) -> None:
    """Start the background worker thread unless it is already running.

    A ``threading.Thread`` object can only be started once.  If the current
    thread object has been started before (for example ``stop()`` was called
    and the uploader is being used again), a fresh thread with the same
    target, name and daemon flag is created first; otherwise the second
    ``Thread.start()`` would raise ``RuntimeError``.
    """
    if self._running:
        return
    if self._thread.ident is not None:
        # `ident` is None only for a thread that was never started.
        self._thread = threading.Thread(target=self._run, name="kdocs-uploader", daemon=True)
    self._running = True
    self._thread.start()
def stop(self) -> None:
    """Clear the running flag and queue a ``shutdown`` task for the worker.

    Does nothing when the uploader is not marked as running.
    """
    if self._running:
        self._running = False
        shutdown_task = {"action": "shutdown"}
        self._queue.put(shutdown_task)
def get_status(self) -> Dict[str, Any]:
    """Return the current queue size together with the tracked status fields.

    Keys: ``queue_size``, ``login_required``, ``last_error``,
    ``last_success_at`` and ``last_login_ok``.
    """
    status: Dict[str, Any] = {"queue_size": self._queue.qsize()}
    # Each remaining key mirrors the private attribute of the same name.
    for key in ("login_required", "last_error", "last_success_at", "last_login_ok"):
        status[key] = getattr(self, "_" + key)
    return status
def enqueue_upload(
    self,
    *,
    user_id: int,
    account_id: str,
    unit: str,
    name: str,
    image_path: str,
) -> bool:
    """Queue an ``upload`` task, starting the worker first if needed.

    ``user_id`` is coerced with ``int`` and ``account_id`` with ``str``; the
    other fields are stored as given.

    Returns:
        True when the task was queued; False when the queue stayed full for
        one second, in which case ``_last_error`` is set.
    """
    if not self._running:
        self.start()
    task = {
        "action": "upload",
        "payload": dict(
            user_id=int(user_id),
            account_id=str(account_id),
            unit=unit,
            name=name,
            image_path=image_path,
        ),
    }
    try:
        self._queue.put(task, timeout=1)
    except queue.Full:
        self._last_error = "上传队列已满"
        return False
    return True
def request_qr(self, timeout: int = 60, *, force: bool = False) -> Dict[str, Any]:
    """Send a ``qr`` command carrying the ``force`` flag and return the reply dict."""
    qr_payload = {"force": force}
    return self._submit_command("qr", timeout=timeout, payload=qr_payload)
def clear_login(self, timeout: int = 20) -> Dict[str, Any]:
    """Send a ``clear_login`` command to the worker and return its reply dict."""
    reply = self._submit_command("clear_login", timeout=timeout)
    return reply
def refresh_login_status(self, timeout: int = 20) -> Dict[str, Any]:
    """Send a ``status`` command to the worker and return its reply dict."""
    reply = self._submit_command("status", timeout=timeout)
    return reply
def _submit_command(
self,
action: str,
timeout: int = 30,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
if not self._running:
self.start()
resp_queue: queue.Queue = queue.Queue(maxsize=1)
self._queue.put({"action": action, "response": resp_queue, "payload": payload or {}})
try:
return resp_queue.get(timeout=timeout)
except queue.Empty:
return {"success": False, "error": "操作超时"}
def _run(self) -> None:
while True:
task = self._queue.get()
if not task:
continue
action = task.get("action")
if action == "shutdown":
break
try:
if action == "upload":
self._handle_upload(task.get("payload") or {})
elif action == "qr":
result = self._handle_qr(task.get("payload") or {})
task.get("response").put(result)
elif action == "clear_login":
result = self._handle_clear_login()
task.get("response").put(result)
elif action == "status":
result = self._handle_status_check()
task.get("response").put(result)
except Exception as e:
logger.warning(f"[KDocs] 处理任务失败: {e}")
self._cleanup_browser()
def _load_system_config(self) -> Dict[str, Any]:
    """Return ``database.get_system_config()``, or an empty dict when it is falsy."""
    loaded = database.get_system_config()
    return loaded if loaded else {}
def _ensure_playwright(self, *, use_storage_state: bool = True) -> bool:
    """Lazily create the Playwright driver, browser, context and page.

    Only the pieces that are currently missing are created.  A new context is
    seeded from the saved login-state file when ``use_storage_state`` is true
    and that file exists.

    Returns:
        True when a usable page is available.  False when playwright is not
        installed or any step raised; ``_last_error`` is set and, for a raised
        error, the browser objects are cleaned up.
    """
    if sync_playwright is None:
        self._last_error = "playwright 未安装"
        return False
    try:
        if self._playwright is None:
            self._playwright = sync_playwright().start()

        if self._browser is None:
            # Headless unless KDOCS_HEADLESS is explicitly "false" (any case).
            headless_flag = os.environ.get("KDOCS_HEADLESS", "true").lower()
            self._browser = self._playwright.chromium.launch(headless=(headless_flag != "false"))

        if self._context is None:
            state_path = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
            context_kwargs = {}
            if use_storage_state and os.path.exists(state_path):
                context_kwargs["storage_state"] = state_path
            self._context = self._browser.new_context(**context_kwargs)

        page_missing = self._page is None or self._page.is_closed()
        if page_missing:
            self._page = self._context.new_page()
            default_timeout = int(getattr(config, "DEFAULT_TIMEOUT", 60000))
            self._page.set_default_timeout(default_timeout)
    except Exception as e:
        self._last_error = f"浏览器启动失败: {e}"
        self._cleanup_browser()
        return False
    return True
def _cleanup_browser(self) -> None:
try:
if self._page:
self._page.close()
except Exception:
pass
self._page = None
try:
if self._context:
self._context.close()
except Exception:
pass
self._context = None
try:
if self._browser:
self._browser.close()
except Exception:
pass
self._browser = None
try:
if self._playwright:
self._playwright.stop()
except Exception:
pass
self._playwright = None
def _open_document(self, doc_url: str) -> bool:
try:
self._page.goto(doc_url)
time.sleep(1)
return True
except Exception as e:
self._last_error = f"打开文档失败: {e}"
return False
def _is_logged_in(self) -> bool:
try:
login_btn = self._page.get_by_role("button", name="登录并加入编辑")
if login_btn.is_visible(timeout=1500):
return False
except Exception:
return False
try:
login_btn = self._page.get_by_role("button", name="立即登录")
if login_btn.is_visible(timeout=1200):
return False
except Exception:
pass
try:
login_btn = self._page.get_by_role("button", name="登录")
if login_btn.is_visible(timeout=1200):
return False
except Exception:
pass
try:
login_link = self._page.get_by_role("link", name="登录")
if login_link.is_visible(timeout=1200):
return False
except Exception:
pass
try:
wechat_btn = self._page.get_by_role("button", name="微信登录")
if wechat_btn.is_visible(timeout=1200):
return False
except Exception:
pass
return True
def _has_saved_login_state(self) -> bool:
    """Return whether the configured login-state file exists on disk."""
    state_file = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
    exists = os.path.exists(state_file)
    return exists
def _ensure_login_dialog(self) -> None:
login_names = ["登录并加入编辑", "立即登录", "登录"]
wechat_names = ["微信登录", "微信扫码登录", "微信扫码", "扫码登录"]
pages = self._iter_pages()
clicked = False
for page in pages:
if self._try_click_names(page, login_names):
clicked = True
break
if clicked:
time.sleep(1.5)
pages = self._iter_pages()
for page in pages:
if self._try_click_names(page, wechat_names):
return
self._try_confirm_login()
def _capture_qr_image(self) -> Optional[bytes]:
pages = self._iter_pages()
for page in pages:
for frame in page.frames:
target = self._find_qr_element_in_frame(frame)
if not target:
continue
try:
return target.screenshot()
except Exception:
continue
for page in pages:
dialog_image = self._capture_dialog_image(page)
if dialog_image:
return dialog_image
return None
def _iter_pages(self) -> list:
pages = []
if self._context:
pages.extend(self._context.pages)
if self._page and self._page not in pages:
pages.insert(0, self._page)
def rank(p) -> int:
url = (getattr(p, "url", "") or "").lower()
keywords = ("login", "account", "passport", "wechat", "qr")
return 0 if any(k in url for k in keywords) else 1
pages.sort(key=rank)
return pages
def _find_qr_element_in_frame(self, frame) -> Optional[Any]:
selectors = [
"canvas",
"img[alt*='二维码']",
"img[src*='qr']",
"img[src*='qrcode']",
"img[class*='qr']",
"canvas[class*='qr']",
"svg",
"div[role='img']",
"div[class*='qr']",
"div[id*='qr']",
"div[class*='qrcode']",
"div[id*='qrcode']",
"div[class*='wechat']",
"div[class*='weixin']",
"div[class*='wx']",
"img[src*='wx']",
]
best = None
best_score = None
for selector in selectors:
try:
locator = frame.locator(selector)
count = min(locator.count(), 20)
except Exception:
continue
for i in range(count):
el = locator.nth(i)
try:
if not el.is_visible(timeout=800):
continue
box = el.bounding_box()
if not box:
continue
width = box.get("width", 0)
height = box.get("height", 0)
if width < 80 or height < 80 or width > 520 or height > 520:
continue
aspect_diff = abs(width - height)
if aspect_diff > 80:
continue
score = aspect_diff + abs(width - 260) + abs(height - 260)
if best_score is None or score < best_score:
best_score = score
best = el
except Exception:
continue
if best:
return best
handle = None
try:
handle = frame.evaluate_handle(
"""() => {
const patterns = [/qr/i, /qrcode/i, /weixin/i, /wechat/i, /wx/i, /data:image/i];
const elements = Array.from(document.querySelectorAll('*'));
for (const el of elements) {
const style = window.getComputedStyle(el);
const bg = style.backgroundImage || '';
if (!bg || bg === 'none') continue;
if (!patterns.some((re) => re.test(bg))) continue;
const rect = el.getBoundingClientRect();
if (!rect.width || !rect.height) continue;
if (rect.width < 80 || rect.height < 80 || rect.width > 520 || rect.height > 520) continue;
const diff = Math.abs(rect.width - rect.height);
if (diff > 80) continue;
return el;
}
return null;
}"""
)
element = handle.as_element() if handle else None
if element:
return element
except Exception:
pass
finally:
try:
if handle:
handle.dispose()
except Exception:
pass
return best
def _try_click_role(self, page, role: str, name: str, timeout: int = 1500) -> bool:
try:
el = page.get_by_role(role, name=name)
if el.is_visible(timeout=timeout):
el.click()
time.sleep(1)
return True
except Exception:
return False
return False
def _try_click_names(self, page, names: list) -> bool:
for name in names:
if self._try_click_role(page, "button", name, timeout=1200):
return True
if self._try_click_role(page, "link", name, timeout=1200):
return True
try:
el = page.get_by_text(name, exact=True)
if el.is_visible(timeout=1200):
el.click()
time.sleep(1)
return True
except Exception:
pass
try:
el = page.get_by_text(name, exact=False)
if el.is_visible(timeout=1200):
el.click()
time.sleep(1)
return True
except Exception:
pass
try:
for frame in page.frames:
for name in names:
try:
el = frame.get_by_role("button", name=name)
if el.is_visible(timeout=800):
el.click()
time.sleep(1)
return True
except Exception:
pass
try:
el = frame.get_by_text(name, exact=True)
if el.is_visible(timeout=800):
el.click()
time.sleep(1)
return True
except Exception:
pass
try:
el = frame.get_by_text(name, exact=False)
if el.is_visible(timeout=800):
el.click()
time.sleep(1)
return True
except Exception:
pass
except Exception:
return False
return False
def _try_confirm_login(self) -> bool:
confirm_names = ["确认登录", "确认登陆"]
pages = self._iter_pages()
for page in pages:
if self._try_click_names(page, confirm_names):
return True
return False
def _capture_dialog_image(self, page) -> Optional[bytes]:
selectors = (
"[role='dialog'], .dialog, .modal, .popup, "
"div[class*='dialog'], div[class*='modal'], div[class*='popup'], "
"div[class*='login'], div[class*='passport'], div[class*='auth']"
)
try:
dialogs = page.locator(selectors)
count = min(dialogs.count(), 6)
except Exception:
return None
best = None
best_area = 0
viewport = page.viewport_size or {}
vp_width = viewport.get("width", 0)
vp_height = viewport.get("height", 0)
for i in range(count):
el = dialogs.nth(i)
try:
if not el.is_visible(timeout=800):
continue
box = el.bounding_box()
if not box:
continue
width = box.get("width", 0)
height = box.get("height", 0)
if width < 160 or height < 160:
continue
if vp_width and vp_height:
if width > vp_width * 0.92 and height > vp_height * 0.92:
continue
area = width * height
if area > best_area:
best_area = area
best = el
except Exception:
continue
if not best:
return None
try:
return best.screenshot()
except Exception:
return None
def _is_valid_qr_image(self, data: Optional[bytes]) -> bool:
if not data or len(data) < 1024:
return False
try:
from PIL import Image, ImageStat
img = Image.open(BytesIO(data))
width, height = img.size
if width < 120 or height < 120:
return False
ratio = width / float(height or 1)
if ratio < 0.6 or ratio > 1.4:
return False
gray = img.convert("L")
stat = ImageStat.Stat(gray)
if stat.stddev[0] < 5:
return False
return True
except Exception:
return len(data) >= 2048
def _save_login_state(self) -> None:
    """Persist the browser context's storage state to the configured file.

    The parent directory is created when the path has one.  A bare file name
    has an empty ``dirname``; calling ``os.makedirs("")`` raises, which used
    to abort the save, so that case is now skipped.  Any failure is logged
    as a warning and otherwise ignored.
    """
    try:
        storage_state = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
        parent_dir = os.path.dirname(storage_state)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._context.storage_state(path=storage_state)
    except Exception as e:
        logger.warning(f"[KDocs] 保存登录态失败: {e}")
def _handle_qr(self, payload: Dict[str, Any]) -> Dict[str, Any]:
cfg = self._load_system_config()
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
if not doc_url:
return {"success": False, "error": "未配置金山文档链接"}
force = bool(payload.get("force"))
if force:
self._handle_clear_login()
if not self._ensure_playwright(use_storage_state=not force):
return {"success": False, "error": self._last_error or "浏览器不可用"}
if not self._open_document(doc_url):
return {"success": False, "error": self._last_error or "打开文档失败"}
if not force and self._has_saved_login_state() and self._is_logged_in():
self._login_required = False
self._last_login_ok = True
self._save_login_state()
return {"success": True, "logged_in": True, "qr_image": ""}
self._ensure_login_dialog()
qr_image = None
invalid_qr = None
for _ in range(10):
self._ensure_login_dialog()
candidate = self._capture_qr_image()
if candidate and self._is_valid_qr_image(candidate):
qr_image = candidate
break
if candidate:
invalid_qr = candidate
time.sleep(1)
if not qr_image:
self._last_error = "二维码识别异常" if invalid_qr else "二维码获取失败"
try:
pages = self._iter_pages()
page_urls = [getattr(p, "url", "") for p in pages]
logger.warning(f"[KDocs] 二维码未捕获,页面: {page_urls}")
ts = int(time.time())
saved = []
for idx, page in enumerate(pages[:3]):
try:
path = f"data/kdocs_debug_{ts}_{idx}.png"
page.screenshot(path=path, full_page=True)
saved.append(path)
except Exception:
continue
if saved:
logger.warning(f"[KDocs] 已保存调试截图: {saved}")
if invalid_qr:
try:
path = f"data/kdocs_invalid_qr_{ts}.png"
with open(path, "wb") as handle:
handle.write(invalid_qr)
logger.warning(f"[KDocs] 已保存无效二维码截图: {path}")
except Exception:
pass
except Exception:
pass
return {"success": False, "error": self._last_error}
try:
ts = int(time.time())
path = f"data/kdocs_last_qr_{ts}.png"
with open(path, "wb") as handle:
handle.write(qr_image)
logger.info(f"[KDocs] 已保存二维码截图: {path} ({len(qr_image)} bytes)")
except Exception as e:
logger.warning(f"[KDocs] 保存二维码截图失败: {e}")
self._last_qr_image = qr_image
self._login_required = True
return {
"success": True,
"logged_in": False,
"qr_image": base64.b64encode(qr_image).decode("ascii"),
}
def _handle_clear_login(self) -> Dict[str, Any]:
    """Delete the saved login-state file and reset the browser.

    Returns ``{"success": True}`` after the login flags are reset and the
    browser is cleaned up, or a failure dict when removing the file raised
    (in which case nothing else is touched).
    """
    state_file = getattr(config, "KDOCS_LOGIN_STATE_FILE", "data/kdocs_login_state.json")
    try:
        state_present = os.path.exists(state_file)
        if state_present:
            os.remove(state_file)
    except Exception as e:
        return {"success": False, "error": f"清除登录态失败: {e}"}
    else:
        self._login_required = False
        self._last_login_ok = None
        self._cleanup_browser()
        return {"success": True}
def _handle_status_check(self) -> Dict[str, Any]:
cfg = self._load_system_config()
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
if not doc_url:
return {"success": True, "logged_in": False, "error": "未配置文档链接"}
if not self._ensure_playwright():
return {"success": False, "logged_in": False, "error": self._last_error or "浏览器不可用"}
if not self._open_document(doc_url):
return {"success": False, "logged_in": False, "error": self._last_error or "打开文档失败"}
self._ensure_login_dialog()
self._try_confirm_login()
logged_in = self._is_logged_in()
self._last_login_ok = logged_in
self._login_required = not logged_in
if logged_in:
self._save_login_state()
return {"success": True, "logged_in": logged_in}
def _handle_upload(self, payload: Dict[str, Any]) -> None:
cfg = self._load_system_config()
if int(cfg.get("kdocs_enabled", 0) or 0) != 1:
return
doc_url = (cfg.get("kdocs_doc_url") or "").strip()
if not doc_url:
return
unit = (payload.get("unit") or "").strip()
name = (payload.get("name") or "").strip()
image_path = payload.get("image_path")
user_id = payload.get("user_id")
account_id = payload.get("account_id")
if not unit or not name:
return
if not image_path or not os.path.exists(image_path):
return
if not self._ensure_playwright():
self._notify_admin(unit, name, image_path, self._last_error or "浏览器不可用")
return
if not self._open_document(doc_url):
self._notify_admin(unit, name, image_path, self._last_error or "打开文档失败")
return
if not self._is_logged_in():
self._login_required = True
self._last_login_ok = False
self._notify_admin(unit, name, image_path, "登录已失效,请管理员重新扫码登录")
return
self._login_required = False
self._last_login_ok = True
sheet_name = (cfg.get("kdocs_sheet_name") or "").strip()
sheet_index = int(cfg.get("kdocs_sheet_index") or 0)
unit_col = (cfg.get("kdocs_unit_column") or "A").strip().upper()
image_col = (cfg.get("kdocs_image_column") or "D").strip().upper()
success = False
error_msg = ""
for attempt in range(2):
try:
if sheet_name or sheet_index:
self._select_sheet(sheet_name, sheet_index)
row_num = self._find_person_with_unit(unit, name, unit_col)
if row_num < 0:
error_msg = f"未找到人员: {unit}-{name}"
break
success = self._upload_image_to_cell(row_num, image_path, image_col)
if success:
break
except Exception as e:
error_msg = str(e)
if success:
self._last_success_at = time.time()
self._last_error = None
log_to_client(f"已上传表格截图: {unit}-{name}", user_id, account_id)
return
if not error_msg:
error_msg = "上传失败"
self._last_error = error_msg
self._notify_admin(unit, name, image_path, error_msg)
log_to_client(f"表格上传失败: {error_msg}", user_id, account_id)
def _notify_admin(self, unit: str, name: str, image_path: str, error: str) -> None:
cfg = self._load_system_config()
if int(cfg.get("kdocs_admin_notify_enabled", 0) or 0) != 1:
return
to_email = (cfg.get("kdocs_admin_notify_email") or "").strip()
if not to_email:
return
settings = email_service.get_email_settings()
if not settings.get("enabled", False):
return
subject = "金山文档上传失败提醒"
body = (
f"上传失败\n\n人员: {unit}-{name}\n图片: {image_path}\n错误: {error}\n\n"
"请检查登录状态或表格配置。"
)
try:
email_service.send_email_async(
to_email=to_email,
subject=subject,
body=body,
email_type="kdocs_upload_failed",
)
except Exception as e:
logger.warning(f"[KDocs] 发送管理员邮件失败: {e}")
def _select_sheet(self, sheet_name: str, sheet_index: int) -> None:
if sheet_name:
candidates = [
self._page.locator("[role='tab']").filter(has_text=sheet_name),
self._page.locator(".sheet-tab").filter(has_text=sheet_name),
self._page.locator(".sheet-tab-name").filter(has_text=sheet_name),
]
for locator in candidates:
try:
if locator.count() < 1:
continue
locator.first.click()
time.sleep(0.5)
return
except Exception:
continue
if sheet_index > 0:
idx = sheet_index - 1
candidates = [
self._page.locator("[role='tab']"),
self._page.locator(".sheet-tab"),
self._page.locator(".sheet-tab-name"),
]
for locator in candidates:
try:
if locator.count() <= idx:
continue
locator.nth(idx).click()
time.sleep(0.5)
return
except Exception:
continue
def _get_current_cell_address(self) -> str:
name_box = self._page.locator('#root input[type="text"]').first
return name_box.input_value()
def _navigate_to_cell(self, cell_address: str) -> None:
name_box = self._page.locator('#root input[type="text"]').first
name_box.click()
name_box.fill(cell_address)
name_box.press("Enter")
time.sleep(0.3)
def _get_cell_value(self, cell_address: str) -> str:
self._navigate_to_cell(cell_address)
time.sleep(0.3)
try:
formula_bar = self._page.locator('input[type="text"]').nth(1)
value = formula_bar.input_value()
if value:
return value.strip()
except Exception:
pass
try:
formula_inputs = self._page.locator('textbox')
for i in range(formula_inputs.count()):
try:
value = formula_inputs.nth(i).input_value()
if value:
import re
if not re.match(r"^[A-Z]+\d+$", value.strip()):
return value.strip()
except Exception:
continue
except Exception:
pass
return ""
def _search_person(self, name: str) -> None:
    """Open the find dialog with Ctrl+F, enter ``name`` and trigger a search.

    The search box is the fourth textbox on the page, falling back to the
    first input whose placeholder contains "查找".  The search is triggered
    with the third "查找" button, falling back to the first.  All lookups are
    best-effort; failures are ignored.
    """
    self._page.keyboard.press("Control+f")
    time.sleep(0.3)

    try:
        self._page.get_by_role("textbox").nth(3).fill(name)
    except Exception:
        try:
            self._page.locator("input[placeholder*='查找']").first.fill(name)
        except Exception:
            pass
    time.sleep(0.2)

    try:
        self._page.get_by_role("button", name="查找").nth(2).click()
    except Exception:
        try:
            self._page.get_by_role("button", name="查找").first.click()
        except Exception:
            pass
    time.sleep(0.3)
def _find_next(self) -> None:
try:
find_btn = self._page.get_by_role("button", name="查找").nth(2)
find_btn.click()
except Exception:
try:
self._page.get_by_role("button", name="查找").first.click()
except Exception:
pass
time.sleep(0.3)
def _close_search(self) -> None:
self._page.keyboard.press("Escape")
time.sleep(0.2)
def _extract_row_number(self, cell_address: str) -> int:
import re
match = re.search(r"(\\d+)$", cell_address)
if match:
return int(match.group(1))
return -1
def _verify_unit_by_navigation(self, row_num: int, unit: str, unit_col: str) -> bool:
cell_address = f"{unit_col}{row_num}"
cell_value = self._get_cell_value(cell_address)
if cell_value:
return cell_value == unit
return False
def _find_person_with_unit(self, unit: str, name: str, unit_col: str, max_attempts: int = 50) -> int:
self._search_person(name)
found_rows = set()
for _ in range(max_attempts):
self._close_search()
current_address = self._get_current_cell_address()
row_num = self._extract_row_number(current_address)
if row_num == -1:
return -1
if row_num in found_rows:
return -1
found_rows.add(row_num)
if self._verify_unit_by_navigation(row_num, unit, unit_col):
return row_num
self._page.keyboard.press("Control+f")
time.sleep(0.2)
self._find_next()
return -1
def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
cell_address = f"{image_col}{row_num}"
self._navigate_to_cell(cell_address)
time.sleep(0.3)
try:
self._page.keyboard.press("Delete")
time.sleep(0.1)
self._page.keyboard.press("Backspace")
except Exception:
pass
try:
insert_btn = self._page.get_by_role("button", name="插入")
insert_btn.click()
time.sleep(0.3)
except Exception as e:
raise RuntimeError(f"打开插入菜单失败: {e}")
try:
image_btn = self._page.get_by_role("button", name="图片")
image_btn.click()
time.sleep(0.3)
cell_image_option = self._page.get_by_role("option", name="单元格图片")
cell_image_option.click()
time.sleep(0.2)
except Exception as e:
raise RuntimeError(f"选择单元格图片失败: {e}")
try:
local_option = self._page.get_by_role("option", name="本地")
with self._page.expect_file_chooser() as fc_info:
local_option.click()
file_chooser = fc_info.value
file_chooser.set_files(image_path)
except Exception as e:
raise RuntimeError(f"上传文件失败: {e}")
time.sleep(2)
return True
# Process-wide uploader instance, created on first use by get_kdocs_uploader().
_kdocs_uploader: Optional[KDocsUploader] = None


def get_kdocs_uploader() -> KDocsUploader:
    """Return the shared ``KDocsUploader``, creating and starting it on first call."""
    global _kdocs_uploader
    if _kdocs_uploader is not None:
        return _kdocs_uploader
    _kdocs_uploader = KDocsUploader()
    _kdocs_uploader.start()
    return _kdocs_uploader