#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import os import threading import time from datetime import datetime from app_config import get_config from app_logger import get_logger from services.state import ( cleanup_expired_ip_rate_limits, cleanup_expired_ip_request_rates, cleanup_expired_login_security_state, safe_cleanup_expired_batches, safe_cleanup_expired_captcha, safe_cleanup_expired_pending_random, safe_get_user_accounts_last_access_items, safe_has_user, safe_iter_task_status_items, safe_remove_task_status, safe_remove_user_accounts, ) logger = get_logger("app") config = get_config() USER_ACCOUNTS_EXPIRE_SECONDS = int(getattr(config, "USER_ACCOUNTS_EXPIRE_SECONDS", 3600)) BATCH_TASK_EXPIRE_SECONDS = int(getattr(config, "BATCH_TASK_EXPIRE_SECONDS", 21600)) PENDING_RANDOM_EXPIRE_SECONDS = int(getattr(config, "PENDING_RANDOM_EXPIRE_SECONDS", 7200)) # 金山文档离线通知状态:每次掉线只通知一次,恢复在线后重置 _kdocs_offline_notified: bool = False def _to_int(value, default: int = 0) -> int: try: return int(value) except Exception: return int(default) def _collect_active_user_ids() -> set[int]: active_user_ids: set[int] = set() for _, info in safe_iter_task_status_items(): user_id = info.get("user_id") if isinstance(info, dict) else None if user_id is None: continue try: active_user_ids.add(int(user_id)) except Exception: continue return active_user_ids def _find_expired_user_cache_ids(current_time: float, active_user_ids: set[int]) -> list[int]: expired_users = [] for user_id, last_access in (safe_get_user_accounts_last_access_items() or []): try: user_id_int = int(user_id) last_access_ts = float(last_access) except Exception: continue if (current_time - last_access_ts) <= USER_ACCOUNTS_EXPIRE_SECONDS: continue if user_id_int in active_user_ids: continue if safe_has_user(user_id_int): expired_users.append(user_id_int) return expired_users def _find_completed_task_status_ids(current_time: float) -> list[str]: completed_task_ids = [] for account_id, status_data in safe_iter_task_status_items(): status = status_data.get("status") if isinstance(status_data, dict) else None if status not in ["已完成", "失败", "已停止"]: continue start_time = float(status_data.get("start_time", 0) or 0) if (current_time - start_time) > 600: # 10分钟 completed_task_ids.append(account_id) return completed_task_ids def _reap_zombie_processes() -> None: while True: try: pid, _ = os.waitpid(-1, os.WNOHANG) if pid == 0: break logger.debug(f"已回收僵尸进程: PID={pid}") except ChildProcessError: break except Exception: break def cleanup_expired_data() -> None: """定期清理过期数据,防止内存泄漏(逻辑保持不变)。""" current_time = time.time() deleted_captchas = safe_cleanup_expired_captcha(current_time) if deleted_captchas: logger.debug(f"已清理 {deleted_captchas} 个过期验证码") deleted_ips = cleanup_expired_ip_rate_limits(current_time) if deleted_ips: logger.debug(f"已清理 {deleted_ips} 个过期IP限流记录") deleted_ip_requests = cleanup_expired_ip_request_rates(current_time) if deleted_ip_requests: logger.debug(f"已清理 {deleted_ip_requests} 个过期IP请求频率记录") login_cleanup_stats = cleanup_expired_login_security_state(current_time) login_cleanup_total = sum(int(v or 0) for v in login_cleanup_stats.values()) if login_cleanup_total: logger.debug( "已清理登录风控缓存: " f"失败计数={login_cleanup_stats.get('failures', 0)}, " f"限流桶={login_cleanup_stats.get('rate_limits', 0)}, " f"扫描状态={login_cleanup_stats.get('scan_states', 0)}, " f"短时锁={login_cleanup_stats.get('ip_user_locks', 0)}, " f"告警状态={login_cleanup_stats.get('alerts', 0)}" ) active_user_ids = _collect_active_user_ids() expired_users = _find_expired_user_cache_ids(current_time, active_user_ids) for user_id in expired_users: safe_remove_user_accounts(user_id) if expired_users: logger.debug(f"已清理 {len(expired_users)} 个过期用户账号缓存") completed_task_ids = _find_completed_task_status_ids(current_time) for account_id in completed_task_ids: safe_remove_task_status(account_id) if completed_task_ids: logger.debug(f"已清理 {len(completed_task_ids)} 个已完成任务状态") _reap_zombie_processes() deleted_batches = safe_cleanup_expired_batches(BATCH_TASK_EXPIRE_SECONDS, current_time) if deleted_batches: logger.debug(f"已清理 {deleted_batches} 个过期批次任务缓存") deleted_random = safe_cleanup_expired_pending_random(PENDING_RANDOM_EXPIRE_SECONDS, current_time) if deleted_random: logger.debug(f"已清理 {deleted_random} 个过期随机延迟任务") def _load_kdocs_monitor_config(): import database cfg = database.get_system_config() if not cfg: return None kdocs_enabled = _to_int(cfg.get("kdocs_enabled"), 0) if not kdocs_enabled: return None admin_notify_enabled = _to_int(cfg.get("kdocs_admin_notify_enabled"), 0) admin_notify_email = str(cfg.get("kdocs_admin_notify_email") or "").strip() if (not admin_notify_enabled) or (not admin_notify_email): return None return admin_notify_email def _is_kdocs_offline(status: dict) -> tuple[bool, bool, bool | None]: login_required = bool(status.get("login_required", False)) last_login_ok = status.get("last_login_ok") is_offline = login_required or (last_login_ok is False) return is_offline, login_required, last_login_ok def _send_kdocs_offline_alert(admin_notify_email: str, *, login_required: bool, last_login_ok) -> bool: try: import email_service now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") subject = "【金山文档离线告警】需要重新登录" body = f""" 您好, 系统检测到金山文档上传功能已离线,需要重新扫码登录。 检测时间:{now_str} 状态详情: - 需要登录:{login_required} - 上次登录状态:{last_login_ok} 请尽快登录后台,在"系统配置"→"金山文档上传"中点击"获取登录二维码"重新登录。 --- 此邮件由系统自动发送,请勿直接回复。 """ email_service.send_email_async( to_email=admin_notify_email, subject=subject, body=body, email_type="kdocs_offline_alert", ) logger.warning(f"[KDocs监控] 金山文档离线,已发送通知邮件到 {admin_notify_email}") return True except Exception as e: logger.error(f"[KDocs监控] 发送离线通知邮件失败: {e}") return False def check_kdocs_online_status() -> None: """检测金山文档登录状态,如果离线则发送邮件通知管理员(每次掉线只通知一次)""" global _kdocs_offline_notified try: admin_notify_email = _load_kdocs_monitor_config() if not admin_notify_email: return from services.kdocs_uploader import get_kdocs_uploader kdocs = get_kdocs_uploader() status = kdocs.get_status() or {} is_offline, login_required, last_login_ok = _is_kdocs_offline(status) if is_offline: if _kdocs_offline_notified: logger.debug("[KDocs监控] 金山文档离线,已通知过,跳过重复通知") return if _send_kdocs_offline_alert( admin_notify_email, login_required=login_required, last_login_ok=last_login_ok, ): _kdocs_offline_notified = True return if _kdocs_offline_notified: logger.info("[KDocs监控] 金山文档已恢复在线,重置通知状态") _kdocs_offline_notified = False logger.debug("[KDocs监控] 金山文档状态正常") except Exception as e: logger.error(f"[KDocs监控] 检测失败: {e}") def _start_daemon_loop(name: str, *, startup_delay: float, interval_seconds: float, job, error_tag: str): def loop(): if startup_delay > 0: time.sleep(startup_delay) while True: try: job() time.sleep(interval_seconds) except Exception as e: logger.error(f"{error_tag}: {e}") time.sleep(min(60.0, max(1.0, interval_seconds / 5.0))) thread = threading.Thread(target=loop, daemon=True, name=name) thread.start() return thread def start_cleanup_scheduler() -> None: """启动定期清理调度器""" _start_daemon_loop( "cleanup-scheduler", startup_delay=300, interval_seconds=300, job=cleanup_expired_data, error_tag="清理任务执行失败", ) logger.info("内存清理调度器已启动") def start_kdocs_monitor() -> None: """启动金山文档状态监控""" _start_daemon_loop( "kdocs-monitor", startup_delay=60, interval_seconds=300, job=check_kdocs_online_status, error_tag="[KDocs监控] 监控任务执行失败", ) logger.info("[KDocs监控] 金山文档状态监控已启动(每5分钟检测一次)")