Files
zsglpt/security/response_handler.py
yuyx 46253337eb feat: 实现完整安全防护系统
Phase 1 - 威胁检测引擎:
- security/threat_detector.py: JNDI/SQL/XSS/路径遍历/命令注入检测
- security/constants.py: 威胁检测规则和评分常量
- 数据库表: threat_events, ip_risk_scores, user_risk_scores, ip_blacklist

Phase 2 - 风险评分与黑名单:
- security/risk_scorer.py: IP/用户风险评分引擎,支持分数衰减
- security/blacklist.py: 黑名单管理,自动封禁规则

Phase 3 - 响应策略:
- security/honeypot.py: 蜜罐响应生成器
- security/response_handler.py: 渐进式响应策略

Phase 4 - 集成:
- security/middleware.py: Flask安全中间件
- routes/admin_api/security.py: 管理后台安全仪表板API
- 36个测试用例全部通过

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-27 01:28:38 +08:00

132 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
from app_logger import get_logger
class ResponseAction(Enum):
ALLOW = "allow" # 正常放行
ENHANCE_CAPTCHA = "enhance_captcha" # 增强验证码
DELAY = "delay" # 静默延迟
HONEYPOT = "honeypot" # 蜜罐响应
BLOCK = "block" # 直接拒绝
@dataclass
class ResponseStrategy:
action: ResponseAction
delay_seconds: float = 0
captcha_level: int = 1 # 1=普通4位, 2=6位, 3=滑块
message: str | None = None
class ResponseHandler:
"""响应策略处理器"""
def __init__(self, *, rng: Optional[random.Random] = None) -> None:
self._rng = rng or random.SystemRandom()
self._logger = get_logger("app")
def get_strategy(self, risk_score: int, is_banned: bool = False) -> ResponseStrategy:
"""
根据风险分获取响应策略
0-20分: ALLOW, 无延迟, 普通验证码
21-40分: ALLOW, 无延迟, 6位验证码
41-60分: DELAY, 1-2秒延迟
61-80分: DELAY, 2-5秒延迟
81-100分: HONEYPOT, 3-8秒延迟
已封禁: BLOCK
"""
score = self._normalize_risk_score(risk_score)
if is_banned:
strategy = ResponseStrategy(action=ResponseAction.BLOCK, message="访问被拒绝")
self._logger.warning("响应策略: BLOCK (banned=%s, risk_score=%s)", is_banned, score)
return strategy
if score <= 20:
strategy = ResponseStrategy(action=ResponseAction.ALLOW, delay_seconds=0, captcha_level=1)
elif score <= 40:
strategy = ResponseStrategy(action=ResponseAction.ALLOW, delay_seconds=0, captcha_level=2)
elif score <= 60:
strategy = ResponseStrategy(action=ResponseAction.DELAY, delay_seconds=float(self._rng.uniform(1.0, 2.0)))
elif score <= 80:
strategy = ResponseStrategy(action=ResponseAction.DELAY, delay_seconds=float(self._rng.uniform(2.0, 5.0)))
else:
strategy = ResponseStrategy(action=ResponseAction.HONEYPOT, delay_seconds=float(self._rng.uniform(3.0, 8.0)))
strategy.captcha_level = self._normalize_captcha_level(strategy.captcha_level)
self._logger.info(
"响应策略: action=%s risk_score=%s delay=%.3f captcha_level=%s",
strategy.action.value,
score,
float(strategy.delay_seconds or 0),
int(strategy.captcha_level),
)
return strategy
def apply_delay(self, strategy: ResponseStrategy):
"""应用延迟使用time.sleep"""
if strategy is None:
return
delay = 0.0
try:
delay = float(getattr(strategy, "delay_seconds", 0) or 0)
except Exception:
delay = 0.0
if delay <= 0:
return
self._logger.debug("应用延迟: action=%s delay=%.3f", getattr(strategy.action, "value", strategy.action), delay)
time.sleep(delay)
def get_captcha_requirement(self, strategy: ResponseStrategy) -> dict:
"""返回验证码要求 {"required": True, "level": 2}"""
level = 1
try:
level = int(getattr(strategy, "captcha_level", 1) or 1)
except Exception:
level = 1
level = self._normalize_captcha_level(level)
required = True
try:
required = getattr(strategy, "action", None) != ResponseAction.BLOCK
except Exception:
required = True
payload = {"required": bool(required), "level": level}
self._logger.debug("验证码要求: %s", payload)
return payload
# ==================== Internal ====================
def _normalize_risk_score(self, risk_score: Any) -> int:
try:
score = int(risk_score)
except Exception:
score = 0
return max(0, min(100, score))
def _normalize_captcha_level(self, level: Any) -> int:
try:
i = int(level)
except Exception:
i = 1
if i <= 1:
return 1
if i == 2:
return 2
return 3