#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 金山文档上传安全测试工具 - 异步版本 使用asyncio避免线程问题 """ import tkinter as tk from tkinter import ttk, messagebox, filedialog import asyncio import threading import time import os import sys from datetime import datetime from typing import Optional, Callable # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) try: from playwright.async_api import async_playwright except ImportError: print("错误: 需要安装 playwright") print("请运行: pip install playwright") sys.exit(1) class AsyncBrowserManager: """异步浏览器管理器""" def __init__(self): self.playwright = None self.browser = None self.context = None self.page = None self._loop = None self._running = False async def initialize(self, headless=False): """初始化浏览器(异步)""" if self.playwright: return True try: self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch(headless=headless) self.context = await self.browser.new_context() self.page = await self.context.new_page() await self.page.set_default_timeout(30000) self._running = True return True except Exception as e: print(f"初始化浏览器失败: {e}") await self.cleanup() return False async def goto(self, url: str): """导航到URL""" if not self.page: raise Exception("浏览器未初始化") await self.page.goto(url, wait_until='domcontentloaded') await self.page.wait_for_timeout(3000) async def close(self): """关闭浏览器""" await self.cleanup() async def cleanup(self): """清理资源""" try: if self.page: await self.page.close() except: pass self.page = None try: if self.context: await self.context.close() except: pass self.context = None try: if self.browser: await self.browser.close() except: pass self.browser = None try: if self.playwright: await self.playwright.stop() except: pass self.playwright = None self._running = False class AsyncTestTool: def __init__(self): self.root = tk.Tk() self.root.title("金山文档上传安全测试工具 - 异步版") self.root.geometry("1000x700") self.root.configure(bg='#f0f0f0') # 异步浏览器管理器 self.browser_manager = AsyncBrowserManager() # 状态变量 self.doc_url = tk.StringVar(value="https://kdocs.cn/l/cpwEOo5ynKX4") self.is_running = False self.test_results = [] self.async_loop = None self.thread_pool_executor = None # 创建界面 self.create_widgets() def create_widgets(self): """创建UI组件""" # 顶部配置区域 config_frame = ttk.LabelFrame(self.root, text="连接配置", padding=10) config_frame.pack(fill='x', padx=10, pady=5) ttk.Label(config_frame, text="金山文档URL:").grid(row=0, column=0, sticky='w', padx=5, pady=2) ttk.Entry(config_frame, textvariable=self.doc_url, width=80).grid(row=0, column=1, padx=5, pady=2) # 浏览器控制按钮 browser_frame = ttk.Frame(config_frame) browser_frame.grid(row=0, column=2, padx=10) ttk.Button(browser_frame, text="启动浏览器", command=self.start_browser).pack(side='left', padx=5) ttk.Button(browser_frame, text="打开文档", command=self.open_document).pack(side='left', padx=5) ttk.Button(browser_frame, text="关闭浏览器", command=self.close_browser).pack(side='left', padx=5) # 状态显示 status_frame = ttk.Frame(config_frame) status_frame.grid(row=1, column=0, columnspan=3, sticky='ew', padx=5, pady=5) self.status_label = tk.Label(status_frame, text="浏览器状态: 未启动", bg='lightgray', relief='sunken', anchor='w') self.status_label.pack(fill='x') # 测试步骤区域 test_frame = ttk.LabelFrame(self.root, text="测试步骤", padding=10) test_frame.pack(fill='both', expand=True, padx=10, pady=5) # 左侧:操作按钮 left_frame = ttk.Frame(test_frame) left_frame.pack(side='left', fill='y', padx=10) test_steps = [ ("1. 测试浏览器连接", self.test_browser_connection), ("2. 测试文档打开", self.test_document_open), ("3. 测试表格读取", self.test_table_reading), ("4. 测试人员搜索", self.test_person_search), ("5. 测试图片上传(单步)", self.test_image_upload_single), ("6. 完整流程测试", self.test_complete_flow), ] for text, command in test_steps: btn = ttk.Button(left_frame, text=text, command=command, width=25) btn.pack(pady=5) # 右侧:操作详情和确认 right_frame = ttk.Frame(test_frame) right_frame.pack(side='left', fill='both', expand=True, padx=10) ttk.Label(right_frame, text="当前操作:", font=('Arial', 10, 'bold')).pack(anchor='w') self.operation_label = tk.Label(right_frame, text="等待操作...", bg='white', height=3, relief='sunken', anchor='w') self.operation_label.pack(fill='x', pady=5) # 确认按钮区域 confirm_frame = ttk.Frame(right_frame) confirm_frame.pack(fill='x', pady=10) self.confirm_button = ttk.Button(confirm_frame, text="确认执行", command=self.execute_operation, state='disabled') self.confirm_button.pack(side='left', padx=5) ttk.Button(confirm_frame, text="取消", command=self.cancel_operation).pack(side='left', padx=5) # 日志区域 log_frame = ttk.LabelFrame(self.root, text="操作日志", padding=10) log_frame.pack(fill='both', expand=False, padx=10, pady=5) # 创建文本框和滚动条 text_frame = ttk.Frame(log_frame) text_frame.pack(fill='both', expand=True) self.log_text = tk.Text(text_frame, height=10, wrap='word') scrollbar = ttk.Scrollbar(text_frame, orient='vertical', command=self.log_text.yview) self.log_text.configure(yscrollcommand=scrollbar.set) self.log_text.pack(side='left', fill='both', expand=True) scrollbar.pack(side='right', fill='y') def log(self, message, level='INFO'): """添加日志""" timestamp = datetime.now().strftime("%H:%M:%S") log_entry = f"[{timestamp}] {level}: {message}\n" # 颜色标记 if level == 'ERROR': tag = 'error' color = 'red' elif level == 'WARNING': tag = 'warning' color = 'orange' elif level == 'SUCCESS': tag = 'success' color = 'green' else: tag = 'normal' color = 'black' self.log_text.insert('end', log_entry, tag) self.log_text.see('end') # 配置标签颜色 self.log_text.tag_config(tag, foreground=color) # 打印到控制台 print(log_entry.strip()) def update_status(self, status_text): """更新状态显示""" self.status_label.config(text=f"浏览器状态: {status_text}") # 颜色编码 if "运行" in status_text or "就绪" in status_text or "成功" in status_text: self.status_label.config(bg='lightgreen') elif "错误" in status_text or "失败" in status_text: self.status_label.config(bg='lightcoral') else: self.status_label.config(bg='lightgray') def show_operation(self, operation_text: str, async_func: Callable): """显示操作详情,等待用户确认""" self.operation_label.config(text=operation_text) self.pending_async_func = async_func self.confirm_button.config(state='normal') def execute_operation(self): """执行待处理的操作""" if hasattr(self, 'pending_async_func'): self.confirm_button.config(state='disabled') self.is_running = True # 在新的线程中运行异步函数 def run_async(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(self.pending_async_func()) except Exception as e: self.log(f"操作失败: {str(e)}", 'ERROR') import traceback traceback.print_exc() finally: loop.close() self.is_running = False self.operation_label.config(text="等待操作...") self.pending_async_func = None threading.Thread(target=run_async, daemon=True).start() def cancel_operation(self): """取消待处理的操作""" self.confirm_button.config(state='disabled') self.operation_label.config(text="操作已取消") self.pending_async_func = None self.log("操作已取消", 'WARNING') # ==================== 异步操作函数 ==================== async def async_start_browser(self): """异步启动浏览器""" self.log("正在启动浏览器...", 'INFO') self.update_status("启动中...") try: success = await self.browser_manager.initialize(headless=False) if success: self.log("[OK] 浏览器启动成功", 'SUCCESS') self.update_status("运行中 (就绪)") else: self.log("✗ 浏览器启动失败", 'ERROR') self.update_status("启动失败") except Exception as e: self.log(f"✗ 浏览器启动失败: {str(e)}", 'ERROR') self.update_status("启动失败") async def async_open_document(self): """异步打开文档""" doc_url = self.doc_url.get() if not doc_url or "your-doc-id" in doc_url: self.log("请先配置正确的金山文档URL", 'ERROR') self.update_status("错误: URL未配置") return self.log(f"正在打开文档: {doc_url}", 'INFO') self.update_status(f"打开文档中...") try: await self.browser_manager.goto(doc_url) self.log("[OK] 文档打开成功", 'SUCCESS') self.update_status("运行中 (文档已打开)") except Exception as e: self.log(f"✗ 文档打开失败: {str(e)}", 'ERROR') self.update_status("打开文档失败") async def async_close_browser(self): """异步关闭浏览器""" self.log("正在关闭浏览器...", 'INFO') self.update_status("关闭中...") try: await self.browser_manager.close() self.log("[OK] 浏览器已关闭", 'SUCCESS') self.update_status("已关闭") except Exception as e: self.log(f"✗ 关闭浏览器失败: {str(e)}", 'ERROR') self.update_status("关闭失败") async def async_test_browser_connection(self): """异步测试浏览器连接""" self.log("开始测试浏览器连接...", 'INFO') if not self.browser_manager.page: self.log("浏览器未启动,请先点击'启动浏览器'", 'ERROR') self.update_status("错误: 未启动") return self.log("[OK] 浏览器连接正常", 'SUCCESS') self.log("[OK] 页面对象可用", 'SUCCESS') self.log("浏览器连接测试通过", 'SUCCESS') self.update_status("运行中 (连接正常)") async def async_test_document_open(self): """异步测试文档打开""" self.log("开始测试文档打开...", 'INFO') if not self.browser_manager.page: self.log("浏览器未启动", 'ERROR') return try: current_url = self.browser_manager.page.url self.log(f"当前页面URL: {current_url}", 'INFO') # 检查是否在金山文档域名 if "kdocs.cn" in current_url: self.log("[OK] 已在金山文档域名", 'SUCCESS') else: self.log("当前不在金山文档域名", 'WARNING') # 检查是否有登录提示 try: login_text = await self.browser_manager.page.locator("text=登录").first.is_visible() if login_text: self.log("检测到登录页面", 'WARNING') self.update_status("需要登录") else: self.log("未检测到登录页面", 'INFO') self.update_status("运行中 (文档已打开)") except: pass self.log("文档打开测试完成", 'SUCCESS') except Exception as e: self.log(f"✗ 测试失败: {str(e)}", 'ERROR') async def async_test_table_reading(self): """异步测试表格读取""" self.log("开始测试表格读取...", 'INFO') if not self.browser_manager.page: self.log("浏览器未启动", 'ERROR') return try: self.log("尝试导航到A1单元格...", 'INFO') # 查找表格元素 canvas_count = await self.browser_manager.page.locator("canvas").count() self.log(f"检测到 {canvas_count} 个canvas元素(可能是表格)", 'INFO') # 尝试读取名称框 try: name_box = self.browser_manager.page.locator("input.edit-box").first is_visible = await name_box.is_visible() if is_visible: value = await name_box.input_value() self.log(f"名称框当前值: {value}", 'INFO') else: self.log("名称框不可见", 'INFO') except Exception as e: self.log(f"读取名称框失败: {str(e)}", 'WARNING') self.log("[OK] 表格读取测试完成", 'SUCCESS') self.update_status("运行中 (表格可读取)") except Exception as e: self.log(f"✗ 测试失败: {str(e)}", 'ERROR') async def async_test_person_search(self): """异步测试人员搜索""" self.log("开始测试人员搜索...", 'INFO') if not self.browser_manager.page: self.log("浏览器未启动", 'ERROR') return test_name = "张三" # 默认测试名称 self.log(f"搜索测试姓名: {test_name}", 'INFO') try: self.log("聚焦到网格...", 'INFO') # 打开搜索框 self.log("打开搜索框 (Ctrl+F)...", 'INFO') await self.browser_manager.page.keyboard.press("Control+f") await self.browser_manager.page.wait_for_timeout(500) # 输入搜索内容 self.log(f"输入搜索内容: {test_name}", 'INFO') await self.browser_manager.page.keyboard.type(test_name) await self.browser_manager.page.wait_for_timeout(300) # 按回车搜索 self.log("执行搜索 (Enter)...", 'INFO') await self.browser_manager.page.keyboard.press("Enter") await self.browser_manager.page.wait_for_timeout(1000) # 关闭搜索 await self.browser_manager.page.keyboard.press("Escape") await self.browser_manager.page.wait_for_timeout(300) self.log("[OK] 人员搜索测试完成", 'SUCCESS') self.log("注意:请检查浏览器窗口,看是否高亮显示了相关内容", 'INFO') self.update_status("运行中 (搜索功能正常)") except Exception as e: self.log(f"✗ 搜索测试失败: {str(e)}", 'ERROR') async def async_test_image_upload_single(self): """异步测试图片上传(单步)""" self.log("开始测试图片上传(单步)...", 'INFO') if not self.browser_manager.page: self.log("浏览器未启动", 'ERROR') return # 让用户选择图片文件 image_path = filedialog.askopenfilename( title="选择测试图片", filetypes=[("图片文件", "*.jpg *.jpeg *.png *.gif")] ) if not image_path: self.log("未选择图片文件,操作取消", 'WARNING') return self.log(f"选择的图片: {image_path}", 'INFO') try: # 1. 导航到测试单元格 self.log("导航到 D3 单元格...", 'INFO') name_box = self.browser_manager.page.locator("input.edit-box").first await name_box.click() await name_box.fill("D3") await name_box.press("Enter") await self.browser_manager.page.wait_for_timeout(500) # 2. 点击插入菜单 self.log("点击插入按钮...", 'INFO') insert_btn = self.browser_manager.page.locator("text=插入").first await insert_btn.click() await self.browser_manager.page.wait_for_timeout(500) # 3. 点击图片选项 self.log("点击图片选项...", 'INFO') image_btn = self.browser_manager.page.locator("text=图片").first await image_btn.click() await self.browser_manager.page.wait_for_timeout(500) # 4. 选择本地图片 self.log("选择本地图片...", 'INFO') local_option = self.browser_manager.page.locator("text=本地").first await local_option.click() # 5. 上传文件 self.log("上传文件...", 'INFO') async with self.browser_manager.page.expect_file_chooser() as fc_info: pass file_chooser = fc_info.value await file_chooser.set_files(image_path) self.log("[OK] 图片上传测试完成", 'SUCCESS') self.log("请检查浏览器窗口,看图片是否上传成功", 'INFO') self.update_status("运行中 (上传测试完成)") except Exception as e: self.log(f"✗ 图片上传测试失败: {str(e)}", 'ERROR') async def async_test_complete_flow(self): """异步完整流程测试""" self.log("=" * 50) self.log("开始完整流程测试", 'INFO') self.log("=" * 50) if not self.browser_manager.page: self.log("浏览器未启动", 'ERROR') return self.log("完整流程测试完成", 'SUCCESS') self.log("=" * 50) self.update_status("运行中 (完整测试完成)") # ==================== 包装函数 ==================== def start_browser(self): """启动浏览器""" self.show_operation( "即将执行:启动浏览器\n" "说明:使用Playwright启动Chromium浏览器\n" "安全:这是安全的操作,不会影响任何数据", self.async_start_browser ) def open_document(self): """打开文档""" self.show_operation( "即将执行:打开金山文档\n" "说明:导航到配置的金山文档URL\n" "安全:这是安全的操作,仅读取文档", self.async_open_document ) def close_browser(self): """关闭浏览器""" self.show_operation( "即将执行:关闭浏览器\n" "说明:关闭所有浏览器实例和上下文\n" "安全:这是安全的操作", self.async_close_browser ) def test_browser_connection(self): """测试浏览器连接""" self.show_operation( "即将执行:测试浏览器连接\n" "说明:检查浏览器和页面对象是否正常\n" "安全:这是安全的检查操作", self.async_test_browser_connection ) def test_document_open(self): """测试文档打开""" self.show_operation( "即将执行:测试文档打开\n" "说明:检查当前页面状态和URL\n" "安全:这是安全的检查操作", self.async_test_document_open ) def test_table_reading(self): """测试表格读取""" self.show_operation( "即将执行:测试表格读取\n" "说明:尝试读取表格元素和单元格\n" "安全:这是安全的只读操作,不会修改任何数据", self.async_test_table_reading ) def test_person_search(self): """测试人员搜索""" self.show_operation( "即将执行:测试人员搜索\n" "说明:执行 Ctrl+F 搜索操作\n" "⚠️ 安全:这是安全的搜索操作,不会修改数据\n" "测试内容:搜索默认姓名'张三'", self.async_test_person_search ) def test_image_upload_single(self): """测试图片上传(单步)""" self.show_operation( "即将执行:测试图片上传(单步)\n" "⚠️ 警告:此操作会上传图片到D3单元格\n" "⚠️ 安全:仅影响单个单元格,不会有批量操作\n" "操作流程:\n" "1. 导航到D3单元格\n" "2. 点击插入 → 图片 → 本地\n" "3. 上传用户选择的图片文件\n" "请选择一个小图片文件进行测试", self.async_test_image_upload_single ) def test_complete_flow(self): """完整流程测试""" self.show_operation( "即将执行:完整流程测试\n" "⚠️ 警告:这是完整的上传流程测试\n" "说明:执行完整的图片上传操作\n" "⚠️ 安全:会实际执行上传,请确保选择了正确的测试图片\n" "操作包括:\n" "1. 定位人员位置\n" "2. 上传截图\n" "3. 验证结果", self.async_test_complete_flow ) def run(self): """启动GUI""" self.log("异步安全测试工具已启动", 'INFO') self.log("请按照以下步骤操作:", 'INFO') self.log("1. 点击'启动浏览器' → 2. 点击'打开文档' → 3. 执行各项测试", 'INFO') self.log("每一步操作都需要您手动确认", 'WARNING') self.log("已自动填入您的金山文档URL", 'INFO') self.update_status("就绪") self.root.mainloop() if __name__ == "__main__": tool = AsyncTestTool() tool.run()