#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import threading import time from datetime import datetime from app_config import get_config from app_logger import get_logger from services.state import ( cleanup_expired_ip_rate_limits, safe_cleanup_expired_batches, safe_cleanup_expired_captcha, safe_cleanup_expired_pending_random, safe_get_user_accounts_last_access_items, safe_has_user, safe_iter_task_status_items, safe_remove_task_status, safe_remove_user_accounts, ) logger = get_logger("app") config = get_config() USER_ACCOUNTS_EXPIRE_SECONDS = int(getattr(config, "USER_ACCOUNTS_EXPIRE_SECONDS", 3600)) BATCH_TASK_EXPIRE_SECONDS = int(getattr(config, "BATCH_TASK_EXPIRE_SECONDS", 21600)) PENDING_RANDOM_EXPIRE_SECONDS = int(getattr(config, "PENDING_RANDOM_EXPIRE_SECONDS", 7200)) # 金山文档离线通知状态:每次掉线只通知一次,恢复在线后重置 _kdocs_offline_notified: bool = False def cleanup_expired_data() -> None: """定期清理过期数据,防止内存泄漏(逻辑保持不变)。""" current_time = time.time() deleted_captchas = safe_cleanup_expired_captcha(current_time) if deleted_captchas: logger.debug(f"已清理 {deleted_captchas} 个过期验证码") deleted_ips = cleanup_expired_ip_rate_limits(current_time) if deleted_ips: logger.debug(f"已清理 {deleted_ips} 个过期IP限流记录") expired_users = [] last_access_items = safe_get_user_accounts_last_access_items() if last_access_items: task_items = safe_iter_task_status_items() active_user_ids = {int(info.get("user_id")) for _, info in task_items if info.get("user_id")} for user_id, last_access in last_access_items: if (current_time - float(last_access)) <= USER_ACCOUNTS_EXPIRE_SECONDS: continue if int(user_id) in active_user_ids: continue if safe_has_user(user_id): expired_users.append(int(user_id)) for user_id in expired_users: safe_remove_user_accounts(user_id) if expired_users: logger.debug(f"已清理 {len(expired_users)} 个过期用户账号缓存") completed_tasks = [] for account_id, status_data in safe_iter_task_status_items(): if status_data.get("status") in ["已完成", "失败", "已停止"]: start_time = float(status_data.get("start_time", 0) or 0) if (current_time - start_time) > 600: # 10分钟 completed_tasks.append(account_id) for account_id in completed_tasks: safe_remove_task_status(account_id) if completed_tasks: logger.debug(f"已清理 {len(completed_tasks)} 个已完成任务状态") try: import os while True: try: pid, status = os.waitpid(-1, os.WNOHANG) if pid == 0: break logger.debug(f"已回收僵尸进程: PID={pid}") except ChildProcessError: break except Exception: pass deleted_batches = safe_cleanup_expired_batches(BATCH_TASK_EXPIRE_SECONDS, current_time) if deleted_batches: logger.debug(f"已清理 {deleted_batches} 个过期批次任务缓存") deleted_random = safe_cleanup_expired_pending_random(PENDING_RANDOM_EXPIRE_SECONDS, current_time) if deleted_random: logger.debug(f"已清理 {deleted_random} 个过期随机延迟任务") def check_kdocs_online_status() -> None: """检测金山文档登录状态,如果离线则发送邮件通知管理员(每次掉线只通知一次)""" global _kdocs_offline_notified try: import database from services.kdocs_uploader import get_kdocs_uploader # 获取系统配置 cfg = database.get_system_config() if not cfg: return # 检查是否启用了金山文档功能 kdocs_enabled = int(cfg.get("kdocs_enabled") or 0) if not kdocs_enabled: return # 检查是否启用了管理员通知 admin_notify_enabled = int(cfg.get("kdocs_admin_notify_enabled") or 0) admin_notify_email = (cfg.get("kdocs_admin_notify_email") or "").strip() if not admin_notify_enabled or not admin_notify_email: return # 获取金山文档状态 kdocs = get_kdocs_uploader() status = kdocs.get_status() login_required = status.get("login_required", False) last_login_ok = status.get("last_login_ok") # 如果需要登录或最后登录状态不是成功 is_offline = login_required or (last_login_ok is False) if is_offline: # 已经通知过了,不再重复通知 if _kdocs_offline_notified: logger.debug("[KDocs监控] 金山文档离线,已通知过,跳过重复通知") return # 发送邮件通知 try: import email_service now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") subject = "【金山文档离线告警】需要重新登录" body = f""" 您好, 系统检测到金山文档上传功能已离线,需要重新扫码登录。 检测时间:{now_str} 状态详情: - 需要登录:{login_required} - 上次登录状态:{last_login_ok} 请尽快登录后台,在"系统配置"→"金山文档上传"中点击"获取登录二维码"重新登录。 --- 此邮件由系统自动发送,请勿直接回复。 """ email_service.send_email_async( to_email=admin_notify_email, subject=subject, body=body, email_type="kdocs_offline_alert", ) _kdocs_offline_notified = True # 标记为已通知 logger.warning(f"[KDocs监控] 金山文档离线,已发送通知邮件到 {admin_notify_email}") except Exception as e: logger.error(f"[KDocs监控] 发送离线通知邮件失败: {e}") else: # 恢复在线,重置通知状态 if _kdocs_offline_notified: logger.info("[KDocs监控] 金山文档已恢复在线,重置通知状态") _kdocs_offline_notified = False logger.debug("[KDocs监控] 金山文档状态正常") except Exception as e: logger.error(f"[KDocs监控] 检测失败: {e}") def start_cleanup_scheduler() -> None: """启动定期清理调度器""" def cleanup_loop(): while True: try: time.sleep(300) # 每5分钟执行一次清理 cleanup_expired_data() except Exception as e: logger.error(f"清理任务执行失败: {e}") cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="cleanup-scheduler") cleanup_thread.start() logger.info("内存清理调度器已启动") def start_kdocs_monitor() -> None: """启动金山文档状态监控""" def monitor_loop(): # 启动后等待 60 秒再开始检测(给系统初始化的时间) time.sleep(60) while True: try: check_kdocs_online_status() time.sleep(300) # 每5分钟检测一次 except Exception as e: logger.error(f"[KDocs监控] 监控任务执行失败: {e}") time.sleep(60) monitor_thread = threading.Thread(target=monitor_loop, daemon=True, name="kdocs-monitor") monitor_thread.start() logger.info("[KDocs监控] 金山文档状态监控已启动(每5分钟检测一次)")