Files
zsglpt-pc/core/kdocs_uploader.py
237899745 262797e78b fix: 修复金山文档登录逻辑问题
- 分离检查和操作方法,避免副作用
- 精简登录状态检查流程
- 优化二维码检测选择器
- 添加扫码后自动确认登录
- 修复邀请对话框处理逻辑

[2026-01-30 17:30:09]
2026-01-30 17:30:09 +08:00

634 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
金山文档上传模块 - 精简版
使用Playwright自动化上传截图到金山文档表格
移除了队列、并发控制,改为单任务顺序执行
修复登录逻辑问题
"""
import base64
import os
import re
import time
from io import BytesIO
from typing import Any, Dict, Optional, Callable
from urllib.parse import urlparse
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except ImportError:
sync_playwright = None
class PlaywrightTimeoutError(Exception):
pass
class KDocsUploader:
"""金山文档上传器"""
def __init__(self, log_callback: Optional[Callable] = None):
self._playwright = None
self._browser = None
self._context = None
self._page = None
self._doc_url: Optional[str] = None
self._last_error: Optional[str] = None
self._logged_in = False
self._log_callback = log_callback
def log(self, msg: str):
"""记录日志"""
if self._log_callback:
self._log_callback(msg)
def _find_visible_element(self, text: str, use_role: bool = False, role: str = "button"):
"""找到包含指定文本的可见元素"""
if not self._page:
return None
try:
if use_role:
els = self._page.get_by_role(role, name=text)
else:
els = self._page.locator(f"text={text}")
count = els.count()
for i in range(count):
el = els.nth(i)
try:
if el.is_visible(timeout=500):
box = el.bounding_box()
if box and box.get('width', 0) > 0 and box.get('height', 0) > 0:
return el
except Exception:
pass
except Exception:
pass
return None
def _ensure_playwright(self, use_storage_state: bool = True) -> bool:
"""确保Playwright已启动"""
if sync_playwright is None:
self._last_error = "playwright 未安装"
return False
try:
from config import KDOCS_LOGIN_STATE_FILE
if self._playwright is None:
self._playwright = sync_playwright().start()
if self._browser is None:
headless = os.environ.get("KDOCS_HEADLESS", "true").lower() != "false"
chrome_args = [
"--disable-blink-features=AutomationControlled",
"--disable-features=DialMediaRouteProvider",
"--allow-running-insecure-content",
]
try:
self._browser = self._playwright.chromium.launch(
headless=headless,
channel='chrome',
args=chrome_args
)
except Exception:
self._browser = self._playwright.chromium.launch(headless=headless, args=chrome_args)
if self._context is None:
storage_state = str(KDOCS_LOGIN_STATE_FILE)
context_options = {
"permissions": ["clipboard-read", "clipboard-write"],
"ignore_https_errors": True,
}
if use_storage_state and os.path.exists(storage_state):
context_options["storage_state"] = storage_state
self._context = self._browser.new_context(**context_options)
if self._page is None or self._page.is_closed():
self._page = self._context.new_page()
self._page.set_default_timeout(60000)
return True
except Exception as e:
self._last_error = f"浏览器启动失败: {e}"
self._cleanup_browser()
return False
def _cleanup_browser(self):
"""清理浏览器资源"""
for attr in ['_page', '_context', '_browser', '_playwright']:
obj = getattr(self, attr, None)
if obj:
try:
if hasattr(obj, 'close'):
obj.close()
elif hasattr(obj, 'stop'):
obj.stop()
except Exception:
pass
setattr(self, attr, None)
def _open_document(self, doc_url: str) -> bool:
"""打开金山文档"""
try:
self._doc_url = doc_url
self._page.goto(doc_url, wait_until="domcontentloaded", timeout=30000)
time.sleep(3)
return True
except Exception as e:
self._last_error = f"打开文档失败: {e}"
return False
def _is_login_url(self, url: str) -> bool:
"""检查是否是登录页面"""
if not url:
return False
lower = url.lower()
return "account.wps.cn" in lower or "passport" in lower
def _check_login_by_url(self) -> bool:
"""通过URL判断是否已登录纯检查无副作用"""
if not self._page or self._page.is_closed():
return False
url = self._page.url
# 如果URL是文档页面说明已登录
if "kdocs.cn/l/" in url and "account.wps.cn" not in url:
return True
return False
def _check_needs_login(self) -> bool:
"""检查是否需要登录(纯检查,无副作用)"""
if not self._page or self._page.is_closed():
return True
url = self._page.url
# 在登录页面,需要登录
if self._is_login_url(url):
return True
# 如果已经是文档页面,不需要登录
if "kdocs.cn/l/" in url and "account.wps.cn" not in url:
return False
# 检查是否有登录按钮(排除邀请对话框)
login_buttons = ["立即登录", "去登录"]
for text in login_buttons:
try:
btn = self._page.get_by_role("button", name=text)
if btn.count() > 0 and btn.first.is_visible(timeout=500):
return True
except Exception:
pass
return False
def _is_logged_in(self) -> bool:
"""检查是否已登录"""
# 优先用URL判断
if self._check_login_by_url():
return True
# URL判断不了再用按钮检查
return not self._check_needs_login()
def _save_login_state(self):
"""保存登录状态"""
try:
from config import KDOCS_LOGIN_STATE_FILE
storage_state = str(KDOCS_LOGIN_STATE_FILE)
KDOCS_LOGIN_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
self._context.storage_state(path=storage_state)
except Exception:
pass
def _ensure_login_dialog(self):
"""确保打开登录对话框并进入扫码页面"""
buttons_priority = [
"登录并加入编辑",
"立即登录",
"去登录",
]
max_clicks = 8
for _ in range(max_clicks):
clicked = False
current_url = self._page.url
# 如果已经在登录页面account.wps.cn说明需要扫码
if self._is_login_url(current_url):
# 检查是否已经显示了二维码
if self._is_qr_page():
return
# 等待二维码加载
time.sleep(2)
continue
# 如果已经是文档页面
if "kdocs.cn/l/" in current_url and "account.wps.cn" not in current_url:
# 检查是否需要点击"登录并加入编辑"(邀请对话框)
invite_btn = self._find_visible_element("登录并加入编辑", use_role=True)
if invite_btn:
invite_btn.click(force=True)
time.sleep(2)
continue # 等待页面跳转到登录页
else:
# 没有登录按钮,说明已登录完成
return
# 检查是否已经到达登录二维码页面
qr_page_indicators = ["微信扫码登录", "微信快捷登录"]
for indicator in qr_page_indicators:
if self._find_visible_element(indicator):
return
# 按优先级点击登录按钮
for btn_name in buttons_priority:
el = self._find_visible_element(btn_name, use_role=True)
if el:
el.click(force=True)
time.sleep(2)
clicked = True
break
if not clicked:
for btn_name in buttons_priority:
el = self._find_visible_element(btn_name)
if el:
el.click(force=True)
time.sleep(2)
clicked = True
break
if not clicked:
time.sleep(1)
def _click_confirm_after_scan(self):
"""扫码后点击确认登录(如果有的话)"""
confirm_names = ["确认登录", "确定登录", "登录", "确定", "确认", "同意并登录"]
for name in confirm_names:
try:
confirm_btn = self._page.get_by_role("button", name=name)
if confirm_btn.count() > 0 and confirm_btn.first.is_visible(timeout=500):
confirm_btn.first.click()
time.sleep(2)
return True
except Exception:
pass
return False
def _is_qr_page(self) -> bool:
"""检查是否在二维码页面"""
qr_indicators = ["微信扫码登录", "微信快捷登录"]
for indicator in qr_indicators:
if self._find_visible_element(indicator):
return True
return False
def _capture_qr_image(self) -> Optional[bytes]:
"""捕获登录二维码图片"""
# 精准的二维码选择器
selectors = [
"canvas", # Canvas 绘制二维码
"img[src*='qrcode']", # 带 qrcode 的图片
"img[src*='wxqr']", # 微信二维码
"[class*='qrcode'] img", # qrcode 容器内的图片
"[class*='qr-code'] img",
"[class*='scan-code'] img",
]
for selector in selectors:
result = self._try_capture_qr_with_selector(self._page, selector)
if result:
return result
# 尝试在iframe中查找
try:
frames = self._page.frames
for frame in frames:
if frame == self._page.main_frame:
continue
for selector in selectors[:3]:
result = self._try_capture_qr_with_selector(frame, selector)
if result:
return result
except Exception:
pass
return None
def _try_capture_qr_with_selector(self, page_or_frame, selector: str) -> Optional[bytes]:
"""尝试用指定选择器捕获二维码"""
try:
locator = page_or_frame.locator(selector)
count = locator.count()
for i in range(min(count, 5)):
el = locator.nth(i)
try:
if not el.is_visible(timeout=300):
continue
box = el.bounding_box()
if not box:
continue
w, h = box.get("width", 0), box.get("height", 0)
# 二维码通常是正方形大小在100-400之间
if 80 <= w <= 400 and 80 <= h <= 400 and abs(w - h) < 50:
screenshot = el.screenshot()
if screenshot and len(screenshot) > 500:
return screenshot
except Exception:
continue
except Exception:
pass
return None
def request_qr(self, force: bool = False) -> Dict[str, Any]:
"""请求登录二维码"""
from config import get_config, KDOCS_LOGIN_STATE_FILE
config = get_config()
doc_url = config.kdocs.doc_url.strip()
if not doc_url:
return {"success": False, "error": "未配置金山文档链接"}
if force:
try:
if KDOCS_LOGIN_STATE_FILE.exists():
KDOCS_LOGIN_STATE_FILE.unlink()
except Exception:
pass
self._cleanup_browser()
if not self._ensure_playwright(use_storage_state=not force):
return {"success": False, "error": self._last_error or "浏览器不可用"}
if not self._open_document(doc_url):
return {"success": False, "error": self._last_error or "打开文档失败"}
# 检查是否已登录
if not force and self._is_logged_in():
self._logged_in = True
self._save_login_state()
return {"success": True, "logged_in": True, "qr_image": ""}
# 需要登录,获取二维码
self._ensure_login_dialog()
time.sleep(2)
# 等待二维码出现
qr_image = None
for _ in range(10):
if self._is_qr_page():
qr_image = self._capture_qr_image()
if qr_image and len(qr_image) > 1024:
break
time.sleep(1)
if not qr_image:
return {"success": False, "error": "二维码获取失败,请检查网络"}
return {
"success": True,
"logged_in": False,
"qr_image": base64.b64encode(qr_image).decode("ascii"),
}
def check_login_status(self) -> Dict[str, Any]:
"""检查登录状态"""
if not self._page or self._page.is_closed():
return {"success": False, "logged_in": False, "error": "页面未打开"}
try:
# 扫码后尝试点击确认按钮
self._click_confirm_after_scan()
# 统一用 URL 判断是否已登录
logged_in = self._check_login_by_url()
# 如果URL判断不确定再检查按钮
if not logged_in:
logged_in = not self._check_needs_login()
self._logged_in = logged_in
if logged_in:
self._save_login_state()
return {"success": True, "logged_in": logged_in}
except Exception as e:
return {"success": False, "logged_in": False, "error": str(e)}
def _navigate_to_cell(self, cell_address: str):
"""导航到指定单元格"""
try:
name_box = self._page.locator("input.edit-box").first
name_box.click()
name_box.fill(cell_address)
name_box.press("Enter")
except Exception:
name_box = self._page.locator('#root input[type="text"]').first
name_box.click()
name_box.fill(cell_address)
name_box.press("Enter")
time.sleep(0.3)
def _get_current_cell_address(self) -> str:
"""获取当前单元格地址"""
try:
name_box = self._page.locator("input.edit-box").first
value = name_box.input_value()
if value and re.match(r"^[A-Z]+\d+$", value.upper()):
return value.upper()
except Exception:
pass
return ""
def _search_and_get_row(self, search_text: str, expected_col: str = None,
row_start: int = 0, row_end: int = 0) -> int:
"""搜索并获取行号"""
self._page.keyboard.press("Control+f")
time.sleep(0.3)
try:
search_input = self._page.get_by_role("textbox").nth(3)
if search_input.is_visible(timeout=500):
search_input.fill(search_text)
except Exception:
pass
time.sleep(0.2)
try:
find_btn = self._page.get_by_role("button", name="查找").first
find_btn.click()
except Exception:
self._page.keyboard.press("Enter")
time.sleep(0.3)
self._page.keyboard.press("Escape")
time.sleep(0.3)
address = self._get_current_cell_address()
if not address:
return -1
match = re.search(r"(\d+)$", address)
if not match:
return -1
row_num = int(match.group(1))
col_letter = "".join(c for c in address if c.isalpha()).upper()
if expected_col and col_letter != expected_col.upper():
return -1
if row_start > 0 and row_num < row_start:
return -1
if row_end > 0 and row_num > row_end:
return -1
return row_num
def _upload_image_to_cell(self, row_num: int, image_path: str, image_col: str) -> bool:
"""上传图片到单元格"""
cell_address = f"{image_col}{row_num}"
self._navigate_to_cell(cell_address)
time.sleep(0.3)
try:
self._page.keyboard.press("Escape")
time.sleep(0.2)
self._page.keyboard.press("Delete")
time.sleep(0.3)
except Exception:
pass
try:
insert_btn = self._page.get_by_role("button", name="插入")
insert_btn.click()
time.sleep(0.3)
image_btn = self._page.get_by_role("button", name="图片")
image_btn.click()
time.sleep(0.3)
cell_image_option = self._page.get_by_role("option", name="单元格图片")
cell_image_option.click()
time.sleep(0.2)
local_option = self._page.get_by_role("option", name="本地")
with self._page.expect_file_chooser() as fc_info:
local_option.click()
file_chooser = fc_info.value
file_chooser.set_files(image_path)
time.sleep(2)
return True
except Exception as e:
self._last_error = f"上传图片失败: {e}"
return False
def upload_image(
self,
image_path: str,
unit: str,
name: str,
) -> Dict[str, Any]:
"""上传截图到金山文档"""
from config import get_config
config = get_config()
kdocs_config = config.kdocs
if not kdocs_config.enabled:
return {"success": False, "error": "金山文档上传未启用"}
doc_url = kdocs_config.doc_url.strip()
if not doc_url:
return {"success": False, "error": "未配置金山文档链接"}
if not unit or not name:
return {"success": False, "error": "缺少县区或姓名"}
if not image_path or not os.path.exists(image_path):
return {"success": False, "error": "图片文件不存在"}
if not self._ensure_playwright():
return {"success": False, "error": self._last_error or "浏览器不可用"}
if not self._open_document(doc_url):
return {"success": False, "error": self._last_error or "打开文档失败"}
if not self._is_logged_in():
return {"success": False, "error": "未登录,请先扫码登录"}
try:
# 选择工作表
if kdocs_config.sheet_name:
try:
tab = self._page.locator("[role='tab']").filter(has_text=kdocs_config.sheet_name)
if tab.count() > 0:
tab.first.click()
time.sleep(0.5)
except Exception:
pass
row_num = self._search_and_get_row(
name,
expected_col=kdocs_config.name_column,
row_start=kdocs_config.row_start,
row_end=kdocs_config.row_end,
)
if row_num < 0:
return {"success": False, "error": f"未找到人员: {name}"}
if self._upload_image_to_cell(row_num, image_path, kdocs_config.image_column):
return {"success": True}
else:
return {"success": False, "error": self._last_error or "上传失败"}
except Exception as e:
return {"success": False, "error": str(e)}
def clear_login(self):
"""清除登录状态"""
from config import KDOCS_LOGIN_STATE_FILE
try:
if KDOCS_LOGIN_STATE_FILE.exists():
KDOCS_LOGIN_STATE_FILE.unlink()
except Exception:
pass
self._logged_in = False
self._cleanup_browser()
def close(self):
"""关闭上传器"""
self._cleanup_browser()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
# 全局实例
_uploader: Optional[KDocsUploader] = None
def get_kdocs_uploader() -> KDocsUploader:
"""获取金山文档上传器实例"""
global _uploader
if _uploader is None:
_uploader = KDocsUploader()
return _uploader