#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import threading import time from app_config import get_config from app_logger import get_logger from services.state import ( cleanup_expired_ip_rate_limits, safe_cleanup_expired_batches, safe_cleanup_expired_captcha, safe_cleanup_expired_pending_random, safe_get_user_accounts_last_access_items, safe_has_user, safe_iter_task_status_items, safe_remove_task_status, safe_remove_user_accounts, ) logger = get_logger("app") config = get_config() USER_ACCOUNTS_EXPIRE_SECONDS = int(getattr(config, "USER_ACCOUNTS_EXPIRE_SECONDS", 3600)) BATCH_TASK_EXPIRE_SECONDS = int(getattr(config, "BATCH_TASK_EXPIRE_SECONDS", 21600)) PENDING_RANDOM_EXPIRE_SECONDS = int(getattr(config, "PENDING_RANDOM_EXPIRE_SECONDS", 7200)) def cleanup_expired_data() -> None: """定期清理过期数据,防止内存泄漏(逻辑保持不变)。""" current_time = time.time() deleted_captchas = safe_cleanup_expired_captcha(current_time) if deleted_captchas: logger.debug(f"已清理 {deleted_captchas} 个过期验证码") deleted_ips = cleanup_expired_ip_rate_limits(current_time) if deleted_ips: logger.debug(f"已清理 {deleted_ips} 个过期IP限流记录") expired_users = [] last_access_items = safe_get_user_accounts_last_access_items() if last_access_items: task_items = safe_iter_task_status_items() active_user_ids = {int(info.get("user_id")) for _, info in task_items if info.get("user_id")} for user_id, last_access in last_access_items: if (current_time - float(last_access)) <= USER_ACCOUNTS_EXPIRE_SECONDS: continue if int(user_id) in active_user_ids: continue if safe_has_user(user_id): expired_users.append(int(user_id)) for user_id in expired_users: safe_remove_user_accounts(user_id) if expired_users: logger.debug(f"已清理 {len(expired_users)} 个过期用户账号缓存") completed_tasks = [] for account_id, status_data in safe_iter_task_status_items(): if status_data.get("status") in ["已完成", "失败", "已停止"]: start_time = float(status_data.get("start_time", 0) or 0) if (current_time - start_time) > 600: # 10分钟 completed_tasks.append(account_id) for account_id in completed_tasks: safe_remove_task_status(account_id) if completed_tasks: logger.debug(f"已清理 {len(completed_tasks)} 个已完成任务状态") try: import os while True: try: pid, status = os.waitpid(-1, os.WNOHANG) if pid == 0: break logger.debug(f"已回收僵尸进程: PID={pid}") except ChildProcessError: break except Exception: pass deleted_batches = safe_cleanup_expired_batches(BATCH_TASK_EXPIRE_SECONDS, current_time) if deleted_batches: logger.debug(f"已清理 {deleted_batches} 个过期批次任务缓存") deleted_random = safe_cleanup_expired_pending_random(PENDING_RANDOM_EXPIRE_SECONDS, current_time) if deleted_random: logger.debug(f"已清理 {deleted_random} 个过期随机延迟任务") def start_cleanup_scheduler() -> None: """启动定期清理调度器""" def cleanup_loop(): while True: try: time.sleep(300) # 每5分钟执行一次清理 cleanup_expired_data() except Exception as e: logger.error(f"清理任务执行失败: {e}") cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="cleanup-scheduler") cleanup_thread.start() logger.info("内存清理调度器已启动")