- 分离检查和操作方法,避免副作用 - 精简登录状态检查流程 - 优化二维码检测选择器 - 添加扫码后自动确认登录 - 修复邀请对话框处理逻辑 [2026-01-30 17:30:09]
634 lines
21 KiB
Python
634 lines
21 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
金山文档上传模块 - 精简版
|
||
使用Playwright自动化上传截图到金山文档表格
|
||
移除了队列、并发控制,改为单任务顺序执行
|
||
修复登录逻辑问题
|
||
"""
|
||
|
||
import base64
|
||
import os
|
||
import re
|
||
import time
|
||
from io import BytesIO
|
||
from typing import Any, Dict, Optional, Callable
|
||
from urllib.parse import urlparse
|
||
|
||
# Playwright is an optional dependency: when it is missing the module still
# imports, ``sync_playwright`` is None, and a stand-in timeout exception is
# defined so that ``except PlaywrightTimeoutError`` clauses remain valid.
try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except ImportError:
    sync_playwright = None

    class PlaywrightTimeoutError(Exception):
        """Stand-in for Playwright's TimeoutError when Playwright is absent."""
|
||
|
||
|
||
class KDocsUploader:
    """Upload screenshots into a Kingsoft Docs spreadsheet through Playwright.

    The uploader owns a single Playwright browser/context/page and drives the
    KDocs web UI: it opens the configured document, handles QR-code login, and
    inserts an image into a cell located by searching for a person's name.

    Public methods (``request_qr``, ``check_login_status``, ``upload_image``)
    report their outcome as a dict with a ``"success"`` key, plus ``"error"``
    on failure.
    """

    # Visible texts that indicate the WeChat QR-code login panel is showing.
    _QR_INDICATORS = ("微信扫码登录", "微信快捷登录")

    # Login entry buttons, in the order they should be tried.
    _LOGIN_BUTTONS_PRIORITY = ("登录并加入编辑", "立即登录", "去登录")

    def __init__(self, log_callback: Optional[Callable] = None):
        """Create an idle uploader; the browser is started lazily.

        Args:
            log_callback: Optional callable receiving log messages (one string
                argument). When omitted, log messages are dropped.
        """
        self._playwright = None
        self._browser = None
        self._context = None
        self._page = None
        self._doc_url: Optional[str] = None
        self._last_error: Optional[str] = None
        self._logged_in = False
        self._log_callback = log_callback

    def log(self, msg: str):
        """Forward ``msg`` to the log callback, if one was provided."""
        if self._log_callback:
            self._log_callback(msg)

    def _find_visible_element(self, text: str, use_role: bool = False, role: str = "button"):
        """Return the first visible, non-empty element matching ``text``.

        Args:
            text: Text (or accessible name when ``use_role`` is true) to find.
            use_role: Look up by ARIA role + name instead of a text selector.
            role: ARIA role used when ``use_role`` is true.

        Returns:
            A locator for the first match that is visible and has a bounding
            box with positive width and height, or ``None`` when there is no
            page, no such element, or the lookup fails.
        """
        if not self._page:
            return None

        try:
            if use_role:
                candidates = self._page.get_by_role(role, name=text)
            else:
                candidates = self._page.locator(f"text={text}")

            for index in range(candidates.count()):
                candidate = candidates.nth(index)
                try:
                    if not candidate.is_visible(timeout=500):
                        continue
                    box = candidate.bounding_box()
                    if box and box.get('width', 0) > 0 and box.get('height', 0) > 0:
                        return candidate
                except Exception:
                    # An element may detach mid-check; try the next one.
                    pass
        except Exception:
            # Best-effort lookup: any failure means "not found".
            pass
        return None

    def _ensure_playwright(self, use_storage_state: bool = True) -> bool:
        """Start Playwright, browser, context and page as needed.

        Already-created objects are reused, so ``use_storage_state`` only has
        an effect when a new browser context is created by this call.

        Args:
            use_storage_state: Load the saved login state file into a newly
                created context when that file exists.

        Returns:
            True when a usable page is available. On failure ``_last_error``
            is set, all browser resources are released, and False is returned.
        """
        if sync_playwright is None:
            self._last_error = "playwright 未安装"
            return False

        try:
            from config import KDOCS_LOGIN_STATE_FILE

            if self._playwright is None:
                self._playwright = sync_playwright().start()

            if self._browser is None:
                # Headless unless KDOCS_HEADLESS is explicitly "false".
                headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false"
                chrome_args = [
                    "--disable-blink-features=AutomationControlled",
                    "--disable-features=DialMediaRouteProvider",
                    "--allow-running-insecure-content",
                ]
                try:
                    self._browser = self._playwright.chromium.launch(
                        headless=headless,
                        channel='chrome',
                        args=chrome_args,
                    )
                except Exception:
                    # The 'chrome' channel could not be launched; fall back to
                    # Playwright's default Chromium.
                    self._browser = self._playwright.chromium.launch(headless=headless, args=chrome_args)

            if self._context is None:
                storage_state = str(KDOCS_LOGIN_STATE_FILE)
                context_options = {
                    "permissions": ["clipboard-read", "clipboard-write"],
                    "ignore_https_errors": True,
                }
                if use_storage_state and os.path.exists(storage_state):
                    context_options["storage_state"] = storage_state
                self._context = self._browser.new_context(**context_options)

            if self._page is None or self._page.is_closed():
                self._page = self._context.new_page()
                self._page.set_default_timeout(60000)

            return True
        except Exception as e:
            self._last_error = f"浏览器启动失败: {e}"
            self._cleanup_browser()
            return False

    def _cleanup_browser(self):
        """Close page, context, browser and Playwright, ignoring errors.

        Objects are released innermost-first and every attribute is reset to
        ``None`` even if closing it raised.
        """
        for attr in ('_page', '_context', '_browser', '_playwright'):
            obj = getattr(self, attr, None)
            if obj:
                try:
                    # Page/context/browser expose close(); the Playwright
                    # driver object is shut down with stop().
                    if hasattr(obj, 'close'):
                        obj.close()
                    elif hasattr(obj, 'stop'):
                        obj.stop()
                except Exception:
                    pass
            setattr(self, attr, None)

    def _open_document(self, doc_url: str) -> bool:
        """Navigate the page to ``doc_url``.

        Returns:
            True after the navigation and a fixed 3 second settle delay;
            False (with ``_last_error`` set) if anything raised.
        """
        try:
            self._doc_url = doc_url
            self._page.goto(doc_url, wait_until="domcontentloaded", timeout=30000)
            time.sleep(3)
            return True
        except Exception as e:
            self._last_error = f"打开文档失败: {e}"
            return False

    def _is_login_url(self, url: str) -> bool:
        """Return True when ``url`` looks like a login page.

        A URL counts as a login page when, case-insensitively, it contains
        ``account.wps.cn`` or ``passport``. An empty URL is not a login page.
        """
        if not url:
            return False
        lower = url.lower()
        return "account.wps.cn" in lower or "passport" in lower

    def _is_document_url(self, url: str) -> bool:
        """Return True when ``url`` is a document page and not a login page.

        A login URL may carry the document link as a redirect parameter, so
        the login check takes precedence over the ``kdocs.cn/l/`` match.
        """
        if not url:
            return False
        return "kdocs.cn/l/" in url and not self._is_login_url(url)

    def _check_login_by_url(self) -> bool:
        """Decide from the page URL alone whether we are logged in.

        Pure check with no side effects. Returns False when there is no open
        page or the URL is not a document URL.

        NOTE(review): being on the document URL is treated as "logged in";
        whether an anonymous visitor can also reach that URL is not visible
        from this file - confirm.
        """
        if not self._page or self._page.is_closed():
            return False
        return self._is_document_url(self._page.url)

    def _check_needs_login(self) -> bool:
        """Return True when the current page requires a login.

        Pure check with no side effects. A missing/closed page or a login URL
        needs login; a document URL does not; otherwise a visible
        "立即登录"/"去登录" button decides (the invite-dialog button is
        deliberately not part of that list).
        """
        if not self._page or self._page.is_closed():
            return True

        url = self._page.url
        if self._is_login_url(url):
            return True
        if self._is_document_url(url):
            return False

        for text in ("立即登录", "去登录"):
            try:
                btn = self._page.get_by_role("button", name=text)
                if btn.count() > 0 and btn.first.is_visible(timeout=500):
                    return True
            except Exception:
                pass
        return False

    def _is_logged_in(self) -> bool:
        """Return True when the session appears to be logged in.

        The URL check is tried first; when it is inconclusive the login-button
        check is used.
        """
        if self._check_login_by_url():
            return True
        return not self._check_needs_login()

    def _save_login_state(self):
        """Persist the context's storage state to the configured file.

        Best effort: a failure is reported through ``log`` and otherwise
        ignored, so a save problem never fails the calling operation.
        """
        try:
            from config import KDOCS_LOGIN_STATE_FILE
            KDOCS_LOGIN_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
            self._context.storage_state(path=str(KDOCS_LOGIN_STATE_FILE))
        except Exception as e:
            self.log(f"保存登录状态失败: {e}")

    def _ensure_login_dialog(self):
        """Click through login entry points until the QR-code page is shown.

        Makes at most 8 attempts. Returns as soon as the QR page is detected,
        or when the document page shows no invite button (treated as already
        logged in). May raise if a click or page access fails.
        """
        max_clicks = 8
        for _ in range(max_clicks):
            current_url = self._page.url

            # Already on the login site: just wait for the QR code to render.
            if self._is_login_url(current_url):
                if self._is_qr_page():
                    return
                time.sleep(2)
                continue

            # On the document page: only the invite dialog can require login.
            if self._is_document_url(current_url):
                invite_btn = self._find_visible_element("登录并加入编辑", use_role=True)
                if not invite_btn:
                    # No login button at all, so login is considered complete.
                    return
                invite_btn.click(force=True)
                time.sleep(2)
                continue  # wait for the redirect to the login page

            if self._is_qr_page():
                return

            # Try role-based lookup for every button first, then fall back to
            # plain text lookup, always honouring the priority order.
            target = None
            for use_role in (True, False):
                for btn_name in self._LOGIN_BUTTONS_PRIORITY:
                    target = self._find_visible_element(btn_name, use_role=use_role)
                    if target:
                        break
                if target:
                    break

            if target:
                target.click(force=True)
                time.sleep(2)
            else:
                time.sleep(1)

    def _click_confirm_after_scan(self):
        """Click a post-scan confirmation button if one is visible.

        Names are matched exactly: Playwright's default accessible-name match
        is a substring match, which would let "登录" hit unrelated buttons
        such as "立即登录" or "登录并加入编辑".

        Returns:
            True if a button was clicked, otherwise False.
        """
        confirm_names = ["确认登录", "确定登录", "登录", "确定", "确认", "同意并登录"]

        for name in confirm_names:
            try:
                confirm_btn = self._page.get_by_role("button", name=name, exact=True)
                if confirm_btn.count() > 0 and confirm_btn.first.is_visible(timeout=500):
                    confirm_btn.first.click()
                    time.sleep(2)
                    return True
            except Exception:
                pass
        return False

    def _is_qr_page(self) -> bool:
        """Return True when a QR-login indicator text is visible."""
        return any(
            self._find_visible_element(indicator) for indicator in self._QR_INDICATORS
        )

    def _capture_qr_image(self) -> Optional[bytes]:
        """Screenshot the login QR code.

        Searches the main page with all selectors, then every child frame
        with the first three selectors only.

        Returns:
            Screenshot bytes of the first plausible QR element, or ``None``.
        """
        selectors = [
            "canvas",                    # QR code drawn on a canvas
            "img[src*='qrcode']",        # image whose URL mentions qrcode
            "img[src*='wxqr']",          # WeChat QR image
            "[class*='qrcode'] img",     # image inside a qrcode container
            "[class*='qr-code'] img",
            "[class*='scan-code'] img",
        ]

        for selector in selectors:
            result = self._try_capture_qr_with_selector(self._page, selector)
            if result:
                return result

        # The login form may be embedded in an iframe.
        try:
            for frame in self._page.frames:
                if frame == self._page.main_frame:
                    continue
                for selector in selectors[:3]:
                    result = self._try_capture_qr_with_selector(frame, selector)
                    if result:
                        return result
        except Exception:
            pass

        return None

    def _try_capture_qr_with_selector(self, page_or_frame, selector: str) -> Optional[bytes]:
        """Screenshot the first QR-like element matching ``selector``.

        Only the first five matches are inspected. An element qualifies when
        it is visible, roughly square (sides differ by less than 50 px), has
        both sides within 80-400 px, and its screenshot exceeds 500 bytes.

        Returns:
            The screenshot bytes, or ``None`` if nothing qualifies or the
            lookup fails.
        """
        try:
            locator = page_or_frame.locator(selector)
            for index in range(min(locator.count(), 5)):
                el = locator.nth(index)
                try:
                    if not el.is_visible(timeout=300):
                        continue
                    box = el.bounding_box()
                    if not box:
                        continue
                    w, h = box.get("width", 0), box.get("height", 0)
                    if 80 <= w <= 400 and 80 <= h <= 400 and abs(w - h) < 50:
                        screenshot = el.screenshot()
                        if screenshot and len(screenshot) > 500:
                            return screenshot
                except Exception:
                    continue
        except Exception:
            pass
        return None

    def request_qr(self, force: bool = False) -> Dict[str, Any]:
        """Open the document and return a login QR code if login is needed.

        Args:
            force: Delete the saved login state, restart the browser without
                it, and always go through the QR flow.

        Returns:
            ``{"success": True, "logged_in": True, "qr_image": ""}`` when
            already logged in (and ``force`` is false);
            ``{"success": True, "logged_in": False, "qr_image": <base64>}``
            when a QR code was captured; otherwise
            ``{"success": False, "error": <message>}``.
        """
        from config import get_config, KDOCS_LOGIN_STATE_FILE

        config = get_config()
        doc_url = config.kdocs.doc_url.strip()
        if not doc_url:
            return {"success": False, "error": "未配置金山文档链接"}

        if force:
            try:
                if KDOCS_LOGIN_STATE_FILE.exists():
                    KDOCS_LOGIN_STATE_FILE.unlink()
            except Exception:
                pass
            self._cleanup_browser()

        if not self._ensure_playwright(use_storage_state=not force):
            return {"success": False, "error": self._last_error or "浏览器不可用"}

        if not self._open_document(doc_url):
            return {"success": False, "error": self._last_error or "打开文档失败"}

        try:
            if not force and self._is_logged_in():
                self._logged_in = True
                self._save_login_state()
                return {"success": True, "logged_in": True, "qr_image": ""}

            # Login required: bring up the QR page.
            self._ensure_login_dialog()
            time.sleep(2)

            # Poll up to 10 times for a capture larger than 1 KiB; if none
            # gets that large, the result of the last attempt is used.
            qr_image = None
            for _ in range(10):
                if self._is_qr_page():
                    qr_image = self._capture_qr_image()
                    if qr_image and len(qr_image) > 1024:
                        break
                time.sleep(1)
        except Exception as e:
            # Page interaction failed (e.g. a click raised); report it the
            # same way the other public methods do instead of propagating.
            self._last_error = f"二维码获取失败: {e}"
            return {"success": False, "error": self._last_error}

        if not qr_image:
            return {"success": False, "error": "二维码获取失败,请检查网络"}

        return {
            "success": True,
            "logged_in": False,
            "qr_image": base64.b64encode(qr_image).decode("ascii"),
        }

    def check_login_status(self) -> Dict[str, Any]:
        """Poll whether the QR login has completed.

        Clicks a post-scan confirmation button when one is visible, then
        evaluates the login state and saves it on success.

        Returns:
            ``{"success": True, "logged_in": <bool>}``, or
            ``{"success": False, "logged_in": False, "error": <message>}``
            when no page is open or the check raised.
        """
        if not self._page or self._page.is_closed():
            return {"success": False, "logged_in": False, "error": "页面未打开"}

        try:
            self._click_confirm_after_scan()

            logged_in = self._is_logged_in()
            self._logged_in = logged_in
            if logged_in:
                self._save_login_state()

            return {"success": True, "logged_in": logged_in}
        except Exception as e:
            return {"success": False, "logged_in": False, "error": str(e)}

    def _navigate_to_cell(self, cell_address: str):
        """Jump to ``cell_address`` by typing it into the name box.

        Tries ``input.edit-box`` first and falls back to the first text input
        under ``#root``; a failure of the fallback propagates to the caller.
        """
        try:
            name_box = self._page.locator("input.edit-box").first
            name_box.click()
            name_box.fill(cell_address)
            name_box.press("Enter")
        except Exception:
            name_box = self._page.locator('#root input[type="text"]').first
            name_box.click()
            name_box.fill(cell_address)
            name_box.press("Enter")
        time.sleep(0.3)

    def _get_current_cell_address(self) -> str:
        """Return the selected cell address (e.g. ``"B12"``) in upper case.

        Returns an empty string when the name box cannot be read or its value
        is not letters followed by digits.
        """
        try:
            name_box = self._page.locator("input.edit-box").first
            value = name_box.input_value()
            if value and re.match(r"^[A-Z]+\d+$", value.upper()):
                return value.upper()
        except Exception:
            pass
        return ""

    def _search_and_get_row(self, search_text: str, expected_col: Optional[str] = None,
                            row_start: int = 0, row_end: int = 0) -> int:
        """Find ``search_text`` with the sheet's search box and return its row.

        Args:
            search_text: Text to search for.
            expected_col: If given, the hit must be in this column letter.
            row_start: If positive, the hit's row must be >= this value.
            row_end: If positive, the hit's row must be <= this value.

        Returns:
            The 1-based row number of the selected cell after the search, or
            -1 when the address is unreadable or violates a constraint.

        NOTE(review): the row is read from whichever cell is selected after
        the search; if the search finds nothing, a previously selected cell
        could presumably be reported - confirm against the KDocs UI.
        """
        self._page.keyboard.press("Control+f")
        time.sleep(0.3)

        try:
            # NOTE(review): the search field is addressed as the 4th textbox
            # on the page - this index depends on the current KDocs layout.
            search_input = self._page.get_by_role("textbox").nth(3)
            if search_input.is_visible(timeout=500):
                search_input.fill(search_text)
        except Exception:
            pass

        time.sleep(0.2)

        try:
            find_btn = self._page.get_by_role("button", name="查找").first
            find_btn.click()
        except Exception:
            self._page.keyboard.press("Enter")

        time.sleep(0.3)
        self._page.keyboard.press("Escape")
        time.sleep(0.3)

        address = self._get_current_cell_address()
        if not address:
            return -1

        match = re.search(r"(\d+)$", address)
        if not match:
            return -1

        row_num = int(match.group(1))
        col_letter = "".join(c for c in address if c.isalpha()).upper()

        if expected_col and col_letter != expected_col.upper():
            return -1
        if row_start > 0 and row_num < row_start:
            return -1
        if row_end > 0 and row_num > row_end:
            return -1

        return row_num

    def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
        """Insert ``image_path`` as a cell image at ``image_col`` + ``row_num``.

        The target cell is cleared first, then the image is inserted through
        the Insert > Image > cell image > local file menu path.

        Returns:
            True after the file was handed to the file chooser; False (with
            ``_last_error`` set) if the menu interaction failed. Navigation
            errors from ``_navigate_to_cell`` propagate to the caller.
        """
        cell_address = f"{image_col}{row_num}"
        self._navigate_to_cell(cell_address)
        time.sleep(0.3)

        # Leave any edit mode and clear existing cell content (best effort).
        try:
            self._page.keyboard.press("Escape")
            time.sleep(0.2)
            self._page.keyboard.press("Delete")
            time.sleep(0.3)
        except Exception:
            pass

        try:
            insert_btn = self._page.get_by_role("button", name="插入")
            insert_btn.click()
            time.sleep(0.3)

            image_btn = self._page.get_by_role("button", name="图片")
            image_btn.click()
            time.sleep(0.3)

            cell_image_option = self._page.get_by_role("option", name="单元格图片")
            cell_image_option.click()
            time.sleep(0.2)

            local_option = self._page.get_by_role("option", name="本地")
            with self._page.expect_file_chooser() as fc_info:
                local_option.click()
            file_chooser = fc_info.value
            file_chooser.set_files(image_path)

            time.sleep(2)
            return True
        except Exception as e:
            self._last_error = f"上传图片失败: {e}"
            return False

    def upload_image(
        self,
        image_path: str,
        unit: str,
        name: str,
    ) -> Dict[str, Any]:
        """Upload a screenshot into the row matching ``name``.

        Args:
            image_path: Path of an existing image file.
            unit: Unit/district of the person. It must be non-empty but is
                not otherwise used when locating the row.
            name: Person name searched for in the sheet.

        Returns:
            ``{"success": True}`` on success, otherwise
            ``{"success": False, "error": <message>}``.
        """
        from config import get_config

        config = get_config()
        kdocs_config = config.kdocs

        if not kdocs_config.enabled:
            return {"success": False, "error": "金山文档上传未启用"}

        doc_url = kdocs_config.doc_url.strip()
        if not doc_url:
            return {"success": False, "error": "未配置金山文档链接"}

        if not unit or not name:
            return {"success": False, "error": "缺少县区或姓名"}

        if not image_path or not os.path.exists(image_path):
            return {"success": False, "error": "图片文件不存在"}

        if not self._ensure_playwright():
            return {"success": False, "error": self._last_error or "浏览器不可用"}

        if not self._open_document(doc_url):
            return {"success": False, "error": self._last_error or "打开文档失败"}

        if not self._is_logged_in():
            return {"success": False, "error": "未登录,请先扫码登录"}

        try:
            # Switch to the configured worksheet tab when one is set
            # (best effort: a missing tab is ignored).
            if kdocs_config.sheet_name:
                try:
                    tab = self._page.locator("[role='tab']").filter(has_text=kdocs_config.sheet_name)
                    if tab.count() > 0:
                        tab.first.click()
                        time.sleep(0.5)
                except Exception:
                    pass

            row_num = self._search_and_get_row(
                name,
                expected_col=kdocs_config.name_column,
                row_start=kdocs_config.row_start,
                row_end=kdocs_config.row_end,
            )
            if row_num < 0:
                return {"success": False, "error": f"未找到人员: {name}"}

            if self._upload_image_to_cell(row_num, image_path, kdocs_config.image_column):
                return {"success": True}
            return {"success": False, "error": self._last_error or "上传失败"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def clear_login(self):
        """Delete the saved login state and shut the browser down."""
        from config import KDOCS_LOGIN_STATE_FILE

        try:
            if KDOCS_LOGIN_STATE_FILE.exists():
                KDOCS_LOGIN_STATE_FILE.unlink()
        except Exception:
            pass

        self._logged_in = False
        self._cleanup_browser()

    def close(self):
        """Release all browser resources."""
        self._cleanup_browser()

    def __enter__(self):
        """Return the uploader itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the uploader; exceptions from the block are not suppressed."""
        self.close()
        return False
|
||
|
||
|
||
# Module-level singleton, created lazily by get_kdocs_uploader().
_uploader: Optional[KDocsUploader] = None


def get_kdocs_uploader() -> KDocsUploader:
    """Return the shared KDocsUploader, creating it on first use."""
    global _uploader
    if _uploader is not None:
        return _uploader
    _uploader = KDocsUploader()
    return _uploader
|