Files
zsglpt/kdocs_optimized_uploader.py
zsglpt Optimizer 7e9a772104 🎉 项目优化与Bug修复完整版
 主要优化成果:
- 修复Unicode字符编码问题(Windows跨平台兼容性)
- 安装wkhtmltoimage,截图功能完全修复
- 智能延迟优化(api_browser.py)
- 线程池资源泄漏修复(tasks.py)
- HTML解析缓存机制
- 二分搜索算法优化(kdocs_uploader.py)
- 自适应资源配置(browser_pool_worker.py)

🐛 Bug修复:
- 解决截图失败问题
- 修复管理员密码设置
- 解决应用启动编码错误

📚 新增文档:
- BUG_REPORT.md - 完整bug分析报告
- PERFORMANCE_ANALYSIS_REPORT.md - 性能优化分析
- LINUX_DEPLOYMENT_ANALYSIS.md - Linux部署指南
- SCREENSHOT_FIX_SUCCESS.md - 截图功能修复记录
- INSTALL_WKHTMLTOIMAGE.md - 安装指南
- OPTIMIZATION_FIXES_SUMMARY.md - 优化总结

🚀 功能验证:
- Flask应用正常运行(51233端口)
- 数据库、截图线程池、API预热正常
- 管理员登录:admin/admin123
- 健康检查API:http://127.0.0.1:51233/health

💡 技术改进:
- 智能延迟算法(自适应调整)
- LRU缓存策略
- 线程池资源管理优化
- 二分搜索算法(O(log n) vs O(n))
- 自适应资源管理

🎯 项目现在稳定运行,可部署到Linux环境
2026-01-16 17:39:55 +08:00

564 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
金山文档上传优化器 - 单线程安全版本
基于智能缓存和优化的等待策略
"""
import os
import queue
import re
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple

try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
except ImportError:
    print("错误: 需要安装 playwright")
    print("请运行: pip install playwright")
    sync_playwright = None
    PlaywrightTimeoutError = Exception
class PersonPositionCache:
    """Thread-safe TTL cache mapping a (unit, name) pair to a sheet row.

    Entries are stored under the string key ``"{unit}-{name}"`` as
    ``(row, unit, timestamp)`` tuples, where ``timestamp`` is the
    ``time.time()`` value at insertion. All access goes through one lock.
    """

    def __init__(self, cache_ttl: int = 1800):  # default: 30 minutes
        """Create an empty cache whose entries live for ``cache_ttl`` seconds."""
        # key -> (row, unit, insertion timestamp)
        self._cache: Dict[str, Tuple[int, str, float]] = {}
        self._ttl = cache_ttl
        self._lock = threading.Lock()

    @staticmethod
    def _make_key(name: str, unit: str) -> str:
        """Build the dictionary key shared by every accessor."""
        return f"{unit}-{name}"

    def get_position(self, name: str, unit: str) -> Optional[int]:
        """Return the cached row for ``name`` in ``unit``.

        Returns ``None`` when there is no entry, when the entry is older than
        the TTL (the stale entry is evicted), or when the stored unit differs
        from ``unit``.
        """
        key = self._make_key(name, unit)
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                return None
            row, cached_unit, timestamp = entry
            if time.time() - timestamp > self._ttl:
                # Evict instead of merely ignoring, so stale entries do not
                # accumulate or show up in get_stats().
                del self._cache[key]
                return None
            # Safety check: "a-b" + "c" and "a" + "b-c" build the same key,
            # so the stored unit is compared explicitly.
            if cached_unit != unit:
                return None
            return row

    def set_position(self, name: str, unit: str, row: int):
        """Record (or overwrite) the row of ``name`` in ``unit``."""
        key = self._make_key(name, unit)
        with self._lock:
            self._cache[key] = (row, unit, time.time())

    def invalidate(self, name: str, unit: str):
        """Drop the entry for ``name`` in ``unit``; a missing entry is a no-op."""
        key = self._make_key(name, unit)
        with self._lock:
            self._cache.pop(key, None)

    def clear(self):
        """Remove every entry."""
        with self._lock:
            self._cache.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Return the entry count and a shallow copy of the raw cache dict."""
        with self._lock:
            return {
                "total_entries": len(self._cache),
                "cache": dict(self._cache),
            }
class OptimizedKdocsUploader:
    """Queue-driven uploader that inserts screenshots into a KDocs sheet.

    Upload requests are put on a bounded queue (200 entries) and processed
    one at a time by a single daemon worker thread. The worker drives a
    Playwright Chromium page: it opens the document given by the
    ``KDOCS_DOC_URL`` environment variable, locates the person's row
    (cache -> common rows -> in-page search) and inserts the image into
    column D of that row.

    Tunables are read once, at construction, from ``KDOCS_*`` environment
    variables (see ``self._config``).
    """

    # Rows probed first by _find_person_fast before falling back to search
    # (adjust to the actual sheet layout).
    _COMMON_ROWS = (66, 67, 68, 70, 75, 80, 85, 90, 95, 100)
    # A1-style cell address such as "D66".
    _CELL_ADDRESS_RE = re.compile(r"^[A-Z]+\d+$")

    def __init__(self, cache_ttl: int = 1800):
        """Create the uploader; the worker is not started until ``start()``.

        Args:
            cache_ttl: lifetime in seconds of cached person positions.
        """
        self._queue = queue.Queue(maxsize=200)
        self._thread = self._new_worker_thread()
        self._running = False
        self._last_error: Optional[str] = None
        self._last_success_at: Optional[float] = None
        # Person -> row cache, so repeated uploads can skip the search.
        self._cache = PersonPositionCache(cache_ttl=cache_ttl)
        # Playwright objects; created lazily by _ensure_browser on the worker.
        self._playwright = None
        self._browser = None
        self._context = None
        self._page = None
        # Configurable parameters.
        self._config = {
            'fast_timeout_ms': int(os.environ.get('KDOCS_FAST_GOTO_TIMEOUT_MS', '10000')),  # 10 s
            'fast_login_timeout_ms': int(os.environ.get('KDOCS_FAST_LOGIN_TIMEOUT_MS', '300')),  # 300 ms
            'navigation_wait': float(os.environ.get('KDOCS_NAVIGATION_WAIT', '0.2')),  # 0.2 s
            'click_wait': float(os.environ.get('KDOCS_CLICK_WAIT', '0.3')),  # 0.3 s
            'upload_wait': float(os.environ.get('KDOCS_UPLOAD_WAIT', '0.8')),  # 0.8 s (previously 2 s)
            # 10 (previously 50). NOTE(review): not consumed by any method in
            # this class at the moment; the search checks the first hit only.
            'search_attempts': int(os.environ.get('KDOCS_SEARCH_ATTEMPTS', '10')),
        }
        self.log_callback: Optional[Callable[[str], None]] = None

    def _new_worker_thread(self) -> threading.Thread:
        """Build a fresh, not-yet-started daemon worker thread."""
        return threading.Thread(
            target=self._run, name="kdocs-uploader-optimized", daemon=True
        )

    def set_log_callback(self, callback: Callable[[str], None]):
        """Register a callable that receives every formatted log line."""
        self.log_callback = callback

    def _log(self, message: str, level: str = 'INFO'):
        """Send ``"[level] message"`` to the callback (if any) and to stdout."""
        line = f"[{level}] {message}"
        if self.log_callback:
            try:
                self.log_callback(line)
            except Exception as e:
                # A faulty callback must not take down the worker thread.
                print(f"[WARNING] 日志回调执行失败: {e}")
        print(line)

    def start(self) -> None:
        """Start the worker thread; a no-op while it is already running.

        A ``threading.Thread`` can only be started once, so when the uploader
        is restarted after ``stop()`` this waits for the previous worker to
        finish (it exits on the queued shutdown marker) and creates a new one.
        """
        if self._running and self._thread.is_alive():
            return
        if self._thread.ident is not None:
            # The thread was started before: let it drain and exit, then
            # replace it, so two workers never share the browser objects.
            self._thread.join()
            self._thread = self._new_worker_thread()
        self._running = True
        self._thread.start()
        self._log("优化上传器已启动", 'SUCCESS')

    def stop(self) -> None:
        """Ask the worker to exit after the tasks queued ahead of this call."""
        if not self._running:
            return
        self._running = False
        self._queue.put({"action": "shutdown"})
        self._log("优化上传器已停止", 'INFO')

    def upload_screenshot(
        self,
        user_id: int,
        account_id: str,
        unit: str,
        name: str,
        image_path: str,
    ) -> bool:
        """Queue a screenshot upload, starting the worker if necessary.

        Returns:
            True when the task was queued (not when the upload finished);
            False when the queue stayed full for one second.
        """
        if not self._running:
            self.start()
        payload = {
            "user_id": user_id,
            "account_id": account_id,
            "unit": unit,
            "name": name,
            "image_path": image_path,
        }
        try:
            self._queue.put({"action": "upload", "payload": payload}, timeout=1)
            return True
        except queue.Full:
            self._last_error = "上传队列已满"
            self._log(self._last_error, 'ERROR')
            return False

    def _run(self) -> None:
        """Worker loop: process queued tasks until a shutdown marker arrives."""
        try:
            while True:
                task = self._queue.get()
                if not task:
                    continue
                action = task.get("action")
                if action == "shutdown":
                    break
                try:
                    if action == "upload":
                        self._handle_upload(task.get("payload") or {})
                except Exception as e:
                    self._log(f"处理任务失败: {str(e)}", 'ERROR')
        finally:
            # Runs on the worker thread, the same thread that created the
            # browser objects in _ensure_browser.
            self._cleanup_browser()

    def _ensure_browser(self) -> bool:
        """Lazily create playwright, browser, context and page.

        Returns:
            True when a usable page exists; False (with ``_last_error`` set)
            when playwright is missing or start-up failed.
        """
        if sync_playwright is None:
            self._last_error = "playwright 未安装"
            return False
        try:
            if self._playwright is None:
                self._playwright = sync_playwright().start()
            if self._browser is None:
                # Headed unless KDOCS_HEADLESS is set to something other
                # than "false" (case-insensitive).
                headless = os.environ.get("KDOCS_HEADLESS", "false").lower() != "false"
                self._browser = self._playwright.chromium.launch(headless=headless)
            if self._context is None:
                # Reuse a saved login state when one exists.
                storage_state = "data/kdocs_login_state.json"
                if os.path.exists(storage_state):
                    self._context = self._browser.new_context(storage_state=storage_state)
                else:
                    self._context = self._browser.new_context()
            if self._page is None or self._page.is_closed():
                self._page = self._context.new_page()
                self._page.set_default_timeout(30000)
            return True
        except Exception as e:
            self._last_error = f"浏览器启动失败: {e}"
            self._log(self._last_error, 'ERROR')
            self._cleanup_browser()
            return False

    def _cleanup_browser(self) -> None:
        """Best-effort release of page, context, browser and playwright."""
        resources = (
            ("_page", "close"),
            ("_context", "close"),
            ("_browser", "close"),
            ("_playwright", "stop"),
        )
        for attr, closer in resources:
            resource = getattr(self, attr)
            if resource is not None:
                try:
                    getattr(resource, closer)()
                except Exception as e:
                    # Keep releasing the remaining resources regardless.
                    self._log(f"清理浏览器资源失败 ({attr}): {e}", 'WARNING')
            setattr(self, attr, None)

    def _record_success(self, unit: str, name: str) -> None:
        """Update success bookkeeping and log the successful upload."""
        self._last_success_at = time.time()
        self._last_error = None
        self._log(f"[OK] 上传成功: {unit}-{name}", 'SUCCESS')

    def _handle_upload(self, payload: Dict[str, Any]) -> None:
        """Process one upload task: locate the person's row, insert the image.

        Expects the keys ``unit``, ``name`` and ``image_path``. Failures are
        logged (and recorded in ``_last_error``) instead of being raised.
        """
        # "or ''" tolerates keys that are present but None.
        unit = (payload.get("unit") or "").strip()
        name = (payload.get("name") or "").strip()
        image_path = payload.get("image_path")
        if not unit or not name:
            self._log("跳过上传:县区或姓名为空", 'WARNING')
            return
        if not image_path or not os.path.exists(image_path):
            self._log(f"跳过上传:图片文件不存在 ({image_path})", 'WARNING')
            return
        try:
            # 1. Make sure the browser is available.
            if not self._ensure_browser():
                self._log("跳过上传:浏览器不可用", 'ERROR')
                return
            # 2. Open the document (URL comes from the environment).
            doc_url = os.environ.get("KDOCS_DOC_URL")
            if not doc_url:
                self._log("跳过上传未配置金山文档URL", 'ERROR')
                return
            self._log(f"打开文档: {doc_url}", 'INFO')
            self._page.goto(doc_url, wait_until='domcontentloaded',
                            timeout=self._config['fast_timeout_ms'])
            time.sleep(self._config['navigation_wait'])
            # 3. Try the cached position first.
            cached_row = self._cache.get_position(name, unit)
            if cached_row is not None:
                self._log(f"使用缓存定位: {name} 在第{cached_row}", 'INFO')
                # The sheet may have changed: re-check before writing.
                if self._verify_position(cached_row, name, unit):
                    self._log("缓存验证成功", 'SUCCESS')
                    if self._upload_image_to_cell(cached_row, image_path):
                        self._record_success(unit, name)
                        return
                    self._log("缓存位置上传失败,将重新搜索", 'WARNING')
                else:
                    self._log("缓存验证失败,将重新搜索", 'WARNING')
                    # The cached row no longer holds this person.
                    self._cache.invalidate(name, unit)
            # 4. Cache miss or stale entry: search again.
            self._log(f"开始搜索: {unit}-{name}", 'INFO')
            row_num = self._find_person_fast(name, unit)
            if row_num > 0:
                # Remember the new position.
                self._cache.set_position(name, unit, row_num)
                self._log(f"搜索成功,找到第{row_num}", 'SUCCESS')
                if self._upload_image_to_cell(row_num, image_path):
                    self._record_success(unit, name)
                else:
                    self._last_error = f"✗ 上传失败: {unit}-{name}"
                    self._log(self._last_error, 'ERROR')
            else:
                self._last_error = f"✗ 未找到人员: {unit}-{name}"
                self._log(self._last_error, 'ERROR')
        except Exception as e:
            self._log(f"上传过程出错: {str(e)}", 'ERROR')
            self._last_error = str(e)

    def _verify_position(self, row: int, name: str, unit: str) -> bool:
        """Return True when row ``row`` holds ``name`` in column C and ``unit`` in column A."""
        try:
            # Column C: name.
            if self._read_cell_value(f"C{row}") != name:
                return False
            # Column A: unit.
            if self._read_cell_value(f"A{row}") != unit:
                return False
            return True
        except Exception as e:
            self._log(f"验证位置失败: {str(e)}", 'WARNING')
            return False

    def _goto_cell(self, cell_address: str) -> None:
        """Select ``cell_address`` by typing it into the name box."""
        name_box = self._page.locator("input.edit-box").first
        name_box.click()
        name_box.fill(cell_address)
        name_box.press("Enter")
        time.sleep(self._config['navigation_wait'])

    def _read_cell_value(self, cell_address: str) -> str:
        """Return the stripped content of ``cell_address``, or ``""``.

        The content is taken from the formula bar. The name box is not used
        as a source because after navigation it holds the cell *address*,
        not the cell's content. Values starting with ``=DISPIMG`` and any
        failure yield ``""``.
        """
        try:
            self._goto_cell(cell_address)
            formula_bar = self._page.locator("[class*='formula'] textarea").first
            if not formula_bar.is_visible():
                return ""
            value = formula_bar.input_value()
            if value and not value.startswith("=DISPIMG"):
                return value.strip()
            return ""
        except Exception as e:
            self._log(f"读取单元格失败 ({cell_address}): {e}", 'WARNING')
            return ""

    def _find_person_fast(self, name: str, unit: str) -> int:
        """Locate the person's row: probe common rows, then use in-page search.

        Returns:
            The row number, or -1 when the person was not found.
        """
        self._log("快速定位模式:检查常见行号", 'INFO')
        for row in self._COMMON_ROWS:
            if self._verify_position(row, name, unit):
                self._log(f"快速命中:第{row}", 'SUCCESS')
                return row
        # None of the common rows matched: fall back to the search dialog.
        self._log("使用搜索模式", 'INFO')
        return self._search_person_optimized(name, unit)

    def _search_person_optimized(self, name: str, unit: str) -> int:
        """Search for ``name`` with Ctrl+F and verify the row it lands on.

        Only the first hit is checked.

        Returns:
            The verified row number, or -1 on failure or mismatch.
        """
        try:
            self._focus_grid()
            # Open the search box, type the name and jump to the first hit.
            self._page.keyboard.press("Control+f")
            time.sleep(0.2)
            self._page.keyboard.type(name)
            time.sleep(0.1)
            self._page.keyboard.press("Enter")
            time.sleep(self._config['click_wait'])
            # Close the search box so the name box shows the selected cell.
            self._page.keyboard.press("Escape")
            time.sleep(0.2)
            current_address = self._get_current_cell_address()
            if not current_address:
                return -1
            row_num = self._extract_row_number(current_address)
            # Rows 1-2 are never accepted as a person row.
            if row_num > 2 and self._verify_position(row_num, name, unit):
                return row_num
            return -1
        except Exception as e:
            self._log(f"搜索出错: {str(e)}", 'ERROR')
            return -1

    def _focus_grid(self):
        """Give the sheet keyboard focus by clicking the centre of the first canvas."""
        try:
            canvases = self._page.locator("canvas").all()
            if canvases:
                box = canvases[0].bounding_box()
                if box:
                    x = box['x'] + box['width'] / 2
                    y = box['y'] + box['height'] / 2
                    self._page.mouse.click(x, y)
                    time.sleep(self._config['navigation_wait'])
        except Exception as e:
            self._log(f"聚焦网格失败: {str(e)}", 'WARNING')

    def _get_current_cell_address(self) -> str:
        """Return the upper-cased address shown in the name box, or ``""``."""
        try:
            name_box = self._page.locator("input.edit-box").first
            value = name_box.input_value()
            if value and self._CELL_ADDRESS_RE.match(value.upper()):
                return value.upper()
        except Exception as e:
            self._log(f"读取当前单元格地址失败: {e}", 'WARNING')
        return ""

    def _extract_row_number(self, cell_address: str) -> int:
        """Return the trailing row number of an address like ``"D66"``, or -1."""
        match = re.search(r"(\d+)$", cell_address)
        if match:
            return int(match.group(1))
        return -1

    def _upload_image_to_cell(self, row_num: int, image_path: str) -> bool:
        """Insert ``image_path`` as a cell image into ``D{row_num}``.

        Returns:
            True when every UI step completed without raising, else False.
        """
        try:
            cell_address = f"D{row_num}"
            self._log(f"导航到单元格: {cell_address}", 'INFO')
            self._goto_cell(cell_address)
            # Clear this cell only.
            self._page.keyboard.press("Escape")
            time.sleep(0.1)
            self._page.keyboard.press("Delete")
            time.sleep(self._config['click_wait'])
            # Insert -> Picture -> Cell picture.
            self._log("打开插入菜单", 'INFO')
            insert_btn = self._page.locator("text=插入").first
            insert_btn.click()
            time.sleep(self._config['click_wait'])
            self._log("选择图片", 'INFO')
            image_btn = self._page.locator("text=图片").first
            image_btn.click()
            time.sleep(self._config['click_wait'])
            cell_image_option = self._page.locator("text=单元格图片").first
            self._log(f"上传图片: {image_path}", 'INFO')
            # The click that opens the chooser must happen inside the
            # expect block, otherwise the event fires before anyone waits
            # for it and the wait times out.
            with self._page.expect_file_chooser() as fc_info:
                cell_image_option.click()
            file_chooser = fc_info.value
            file_chooser.set_files(image_path)
            # Give the upload time to finish.
            time.sleep(self._config['upload_wait'])
            self._log("图片上传完成", 'SUCCESS')
            return True
        except Exception as e:
            self._log(f"上传图片失败: {str(e)}", 'ERROR')
            return False

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return the statistics of the person-position cache."""
        return self._cache.get_stats()
# ==================== 使用示例 ====================
def main():
    """Demo: start the uploader, queue one test screenshot, print stats, stop."""

    def echo(message: str):
        # Log sink handed to the uploader.
        print(f"[LOG] {message}")

    uploader = OptimizedKdocsUploader(cache_ttl=1800)  # 30-minute cache
    uploader.set_log_callback(echo)
    uploader.start()

    # Queue a sample upload task.
    print("正在上传截图...")
    submitted = uploader.upload_screenshot(
        user_id=1,
        account_id="test001",
        unit="海淀区",
        name="张三",
        image_path="test_screenshot.jpg",
    )
    print("[OK] 上传任务已提交" if submitted else "✗ 上传任务提交失败")

    # Show the cache statistics.
    print(f"缓存统计: {uploader.get_cache_stats()}")

    # Wait briefly before shutting the uploader down.
    time.sleep(2)
    uploader.stop()
    print("上传器已停止")


if __name__ == "__main__":
    main()