Files
zsglpt/services/maintenance.py
yuyx b0fe325154 feat: KDocs 上传增强 + 离线监控 + Bug修复
KDocs 上传功能增强:
- 搜索优化:只用姓名搜索 + C列验证,避免匹配到错误单元格
- 有效行范围:支持配置起始行/结束行,限制上传区域
- 图片覆盖:支持覆盖单元格已有图片(Escape + Delete)
- 配置持久化:kdocs_row_start/row_end 保存到数据库(v18迁移)

二次登录功能:
- 登录后立即再次登录,让"上次登录时间"显示为刚刚

KDocs 离线监控:
- 每5分钟检测金山文档登录状态
- 离线时发送邮件通知管理员(每次掉线只通知一次)
- 恢复在线后重置通知状态

Bug 修复:
- 任务日志搜索账号关键词报错500:添加异常处理

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 23:40:46 +08:00

213 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import threading
import time
from datetime import datetime
from app_config import get_config
from app_logger import get_logger
from services.state import (
cleanup_expired_ip_rate_limits,
safe_cleanup_expired_batches,
safe_cleanup_expired_captcha,
safe_cleanup_expired_pending_random,
safe_get_user_accounts_last_access_items,
safe_has_user,
safe_iter_task_status_items,
safe_remove_task_status,
safe_remove_user_accounts,
)
logger = get_logger("app")
config = get_config()
USER_ACCOUNTS_EXPIRE_SECONDS = int(getattr(config, "USER_ACCOUNTS_EXPIRE_SECONDS", 3600))
BATCH_TASK_EXPIRE_SECONDS = int(getattr(config, "BATCH_TASK_EXPIRE_SECONDS", 21600))
PENDING_RANDOM_EXPIRE_SECONDS = int(getattr(config, "PENDING_RANDOM_EXPIRE_SECONDS", 7200))
# 金山文档离线通知状态:每次掉线只通知一次,恢复在线后重置
_kdocs_offline_notified: bool = False
def cleanup_expired_data() -> None:
"""定期清理过期数据,防止内存泄漏(逻辑保持不变)。"""
current_time = time.time()
deleted_captchas = safe_cleanup_expired_captcha(current_time)
if deleted_captchas:
logger.debug(f"已清理 {deleted_captchas} 个过期验证码")
deleted_ips = cleanup_expired_ip_rate_limits(current_time)
if deleted_ips:
logger.debug(f"已清理 {deleted_ips} 个过期IP限流记录")
expired_users = []
last_access_items = safe_get_user_accounts_last_access_items()
if last_access_items:
task_items = safe_iter_task_status_items()
active_user_ids = {int(info.get("user_id")) for _, info in task_items if info.get("user_id")}
for user_id, last_access in last_access_items:
if (current_time - float(last_access)) <= USER_ACCOUNTS_EXPIRE_SECONDS:
continue
if int(user_id) in active_user_ids:
continue
if safe_has_user(user_id):
expired_users.append(int(user_id))
for user_id in expired_users:
safe_remove_user_accounts(user_id)
if expired_users:
logger.debug(f"已清理 {len(expired_users)} 个过期用户账号缓存")
completed_tasks = []
for account_id, status_data in safe_iter_task_status_items():
if status_data.get("status") in ["已完成", "失败", "已停止"]:
start_time = float(status_data.get("start_time", 0) or 0)
if (current_time - start_time) > 600: # 10分钟
completed_tasks.append(account_id)
for account_id in completed_tasks:
safe_remove_task_status(account_id)
if completed_tasks:
logger.debug(f"已清理 {len(completed_tasks)} 个已完成任务状态")
try:
import os
while True:
try:
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0:
break
logger.debug(f"已回收僵尸进程: PID={pid}")
except ChildProcessError:
break
except Exception:
pass
deleted_batches = safe_cleanup_expired_batches(BATCH_TASK_EXPIRE_SECONDS, current_time)
if deleted_batches:
logger.debug(f"已清理 {deleted_batches} 个过期批次任务缓存")
deleted_random = safe_cleanup_expired_pending_random(PENDING_RANDOM_EXPIRE_SECONDS, current_time)
if deleted_random:
logger.debug(f"已清理 {deleted_random} 个过期随机延迟任务")
def check_kdocs_online_status() -> None:
"""检测金山文档登录状态,如果离线则发送邮件通知管理员(每次掉线只通知一次)"""
global _kdocs_offline_notified
try:
import database
from services.kdocs_uploader import get_kdocs_uploader
# 获取系统配置
cfg = database.get_system_config()
if not cfg:
return
# 检查是否启用了金山文档功能
kdocs_enabled = int(cfg.get("kdocs_enabled") or 0)
if not kdocs_enabled:
return
# 检查是否启用了管理员通知
admin_notify_enabled = int(cfg.get("kdocs_admin_notify_enabled") or 0)
admin_notify_email = (cfg.get("kdocs_admin_notify_email") or "").strip()
if not admin_notify_enabled or not admin_notify_email:
return
# 获取金山文档状态
kdocs = get_kdocs_uploader()
status = kdocs.get_status()
login_required = status.get("login_required", False)
last_login_ok = status.get("last_login_ok")
# 如果需要登录或最后登录状态不是成功
is_offline = login_required or (last_login_ok is False)
if is_offline:
# 已经通知过了,不再重复通知
if _kdocs_offline_notified:
logger.debug("[KDocs监控] 金山文档离线,已通知过,跳过重复通知")
return
# 发送邮件通知
try:
import email_service
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
subject = "【金山文档离线告警】需要重新登录"
body = f"""
您好,
系统检测到金山文档上传功能已离线,需要重新扫码登录。
检测时间:{now_str}
状态详情:
- 需要登录:{login_required}
- 上次登录状态:{last_login_ok}
请尽快登录后台,在"系统配置""金山文档上传"中点击"获取登录二维码"重新登录。
---
此邮件由系统自动发送,请勿直接回复。
"""
email_service.send_email_async(
to_email=admin_notify_email,
subject=subject,
body=body,
email_type="kdocs_offline_alert",
)
_kdocs_offline_notified = True # 标记为已通知
logger.warning(f"[KDocs监控] 金山文档离线,已发送通知邮件到 {admin_notify_email}")
except Exception as e:
logger.error(f"[KDocs监控] 发送离线通知邮件失败: {e}")
else:
# 恢复在线,重置通知状态
if _kdocs_offline_notified:
logger.info("[KDocs监控] 金山文档已恢复在线,重置通知状态")
_kdocs_offline_notified = False
logger.debug("[KDocs监控] 金山文档状态正常")
except Exception as e:
logger.error(f"[KDocs监控] 检测失败: {e}")
def start_cleanup_scheduler() -> None:
"""启动定期清理调度器"""
def cleanup_loop():
while True:
try:
time.sleep(300) # 每5分钟执行一次清理
cleanup_expired_data()
except Exception as e:
logger.error(f"清理任务执行失败: {e}")
cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="cleanup-scheduler")
cleanup_thread.start()
logger.info("内存清理调度器已启动")
def start_kdocs_monitor() -> None:
"""启动金山文档状态监控"""
def monitor_loop():
# 启动后等待 60 秒再开始检测(给系统初始化的时间)
time.sleep(60)
while True:
try:
check_kdocs_online_status()
time.sleep(300) # 每5分钟检测一次
except Exception as e:
logger.error(f"[KDocs监控] 监控任务执行失败: {e}")
time.sleep(60)
monitor_thread = threading.Thread(target=monitor_loop, daemon=True, name="kdocs-monitor")
monitor_thread.start()
logger.info("[KDocs监控] 金山文档状态监控已启动每5分钟检测一次")