#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import random import uuid from typing import Any, Optional from app_logger import get_logger class HoneypotResponder: """蜜罐响应生成器 - 返回假成功响应,欺骗攻击者""" def __init__(self, *, rng: Optional[random.Random] = None) -> None: self._rng = rng or random.SystemRandom() self._logger = get_logger("app") def generate_fake_response(self, endpoint: str, original_data: dict = None) -> dict: """ 根据端点生成假的成功响应 策略: - 邮件发送类: {"success": True, "message": "邮件已发送"} - 注册类: {"success": True, "user_id": fake_uuid} - 登录类: {"success": True} 但不设置session - 通用: {"success": True, "message": "操作成功"} """ endpoint_text = str(endpoint or "").strip() endpoint_lc = endpoint_text.lower() category = self._classify_endpoint(endpoint_lc) response: dict[str, Any] = {"success": True} if category == "email": response["message"] = "邮件已发送" elif category == "register": response["user_id"] = str(uuid.uuid4()) elif category == "login": # 登录类:保持正常成功响应,但不进行任何 session / token 设置(调用方负责不写 session) pass else: response["message"] = "操作成功" response = self._merge_safe_fields(response, original_data) self._logger.warning( "蜜罐响应已生成: endpoint=%s, category=%s, keys=%s", endpoint_text[:256], category, sorted(response.keys()), ) return response def should_use_honeypot(self, risk_score: int) -> bool: """风险分>=80使用蜜罐响应""" score = self._normalize_risk_score(risk_score) use = score >= 80 self._logger.debug("蜜罐判定: risk_score=%s => %s", score, use) return use def delay_response(self, risk_score: int) -> float: """ 根据风险分计算延迟时间 0-20: 0秒 21-50: 随机0.5-1秒 51-80: 随机1-3秒 81-100: 随机3-8秒(蜜罐模式额外延迟消耗攻击者时间) """ score = self._normalize_risk_score(risk_score) delay = 0.0 if score <= 20: delay = 0.0 elif score <= 50: delay = float(self._rng.uniform(0.5, 1.0)) elif score <= 80: delay = float(self._rng.uniform(1.0, 3.0)) else: delay = float(self._rng.uniform(3.0, 8.0)) self._logger.debug("蜜罐延迟计算: risk_score=%s => delay_seconds=%.3f", score, delay) return delay # ==================== Internal ==================== def _normalize_risk_score(self, risk_score: Any) -> int: try: score = int(risk_score) except Exception: score = 0 return max(0, min(100, score)) def _classify_endpoint(self, endpoint_lc: str) -> str: if not endpoint_lc: return "generic" # 先匹配更具体的:注册 / 登录 if any(k in endpoint_lc for k in ["/register", "register", "signup", "sign-up"]): return "register" if any(k in endpoint_lc for k in ["/login", "login", "signin", "sign-in"]): return "login" # 邮件相关:发送验证码 / 重置密码 / 重发验证等 if any(k in endpoint_lc for k in ["email", "mail", "forgot-password", "reset-password", "resend-verify"]): return "email" return "generic" def _merge_safe_fields(self, base: dict, original_data: Optional[dict]) -> dict: if not isinstance(original_data, dict) or not original_data: return base # 避免把攻击者输入或真实业务结果回显得太明显;仅合并少量“形状字段” safe_bool_keys = {"need_verify", "need_captcha"} merged = dict(base) for key in safe_bool_keys: if key in original_data and key not in merged: try: merged[key] = bool(original_data.get(key)) except Exception: continue return merged