#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import random import time from dataclasses import dataclass from enum import Enum from typing import Any, Optional from app_logger import get_logger class ResponseAction(Enum): ALLOW = "allow" # 正常放行 ENHANCE_CAPTCHA = "enhance_captcha" # 增强验证码 DELAY = "delay" # 静默延迟 HONEYPOT = "honeypot" # 蜜罐响应 BLOCK = "block" # 直接拒绝 @dataclass class ResponseStrategy: action: ResponseAction delay_seconds: float = 0 captcha_level: int = 1 # 1=普通4位, 2=6位, 3=滑块 message: str | None = None class ResponseHandler: """响应策略处理器""" def __init__(self, *, rng: Optional[random.Random] = None) -> None: self._rng = rng or random.SystemRandom() self._logger = get_logger("app") def get_strategy(self, risk_score: int, is_banned: bool = False) -> ResponseStrategy: """ 根据风险分获取响应策略 0-20分: ALLOW, 无延迟, 普通验证码 21-40分: ALLOW, 无延迟, 6位验证码 41-60分: DELAY, 1-2秒延迟 61-80分: DELAY, 2-5秒延迟 81-100分: HONEYPOT, 3-8秒延迟 已封禁: BLOCK """ score = self._normalize_risk_score(risk_score) if is_banned: strategy = ResponseStrategy(action=ResponseAction.BLOCK, message="访问被拒绝") self._logger.warning("响应策略: BLOCK (banned=%s, risk_score=%s)", is_banned, score) return strategy if score <= 20: strategy = ResponseStrategy(action=ResponseAction.ALLOW, delay_seconds=0, captcha_level=1) elif score <= 40: strategy = ResponseStrategy(action=ResponseAction.ALLOW, delay_seconds=0, captcha_level=2) elif score <= 60: strategy = ResponseStrategy(action=ResponseAction.DELAY, delay_seconds=float(self._rng.uniform(1.0, 2.0))) elif score <= 80: strategy = ResponseStrategy(action=ResponseAction.DELAY, delay_seconds=float(self._rng.uniform(2.0, 5.0))) else: strategy = ResponseStrategy(action=ResponseAction.HONEYPOT, delay_seconds=float(self._rng.uniform(3.0, 8.0))) strategy.captcha_level = self._normalize_captcha_level(strategy.captcha_level) self._logger.info( "响应策略: action=%s risk_score=%s delay=%.3f captcha_level=%s", strategy.action.value, score, float(strategy.delay_seconds or 0), int(strategy.captcha_level), ) return strategy def apply_delay(self, strategy: ResponseStrategy): """应用延迟(使用time.sleep)""" if strategy is None: return delay = 0.0 try: delay = float(getattr(strategy, "delay_seconds", 0) or 0) except Exception: delay = 0.0 if delay <= 0: return self._logger.debug("应用延迟: action=%s delay=%.3f", getattr(strategy.action, "value", strategy.action), delay) time.sleep(delay) def get_captcha_requirement(self, strategy: ResponseStrategy) -> dict: """返回验证码要求 {"required": True, "level": 2}""" level = 1 try: level = int(getattr(strategy, "captcha_level", 1) or 1) except Exception: level = 1 level = self._normalize_captcha_level(level) required = True try: required = getattr(strategy, "action", None) != ResponseAction.BLOCK except Exception: required = True payload = {"required": bool(required), "level": level} self._logger.debug("验证码要求: %s", payload) return payload # ==================== Internal ==================== def _normalize_risk_score(self, risk_score: Any) -> int: try: score = int(risk_score) except Exception: score = 0 return max(0, min(100, score)) def _normalize_captcha_level(self, level: Any) -> int: try: i = int(level) except Exception: i = 1 if i <= 1: return 1 if i == 2: return 2 return 3