Phase 1 - 威胁检测引擎: - security/threat_detector.py: JNDI/SQL/XSS/路径遍历/命令注入检测 - security/constants.py: 威胁检测规则和评分常量 - 数据库表: threat_events, ip_risk_scores, user_risk_scores, ip_blacklist Phase 2 - 风险评分与黑名单: - security/risk_scorer.py: IP/用户风险评分引擎,支持分数衰减 - security/blacklist.py: 黑名单管理,自动封禁规则 Phase 3 - 响应策略: - security/honeypot.py: 蜜罐响应生成器 - security/response_handler.py: 渐进式响应策略 Phase 4 - 集成: - security/middleware.py: Flask安全中间件 - routes/admin_api/security.py: 管理后台安全仪表板API - 36个测试用例全部通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
132 lines
4.2 KiB
Python
132 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
|
||
import random
|
||
import time
|
||
from dataclasses import dataclass
|
||
from enum import Enum
|
||
from typing import Any, Optional
|
||
|
||
from app_logger import get_logger
|
||
|
||
|
||
class ResponseAction(Enum):
    """Actions the security layer can take in response to a request."""

    # Let the request through normally.
    ALLOW = "allow"
    # Require a stronger CAPTCHA.
    ENHANCE_CAPTCHA = "enhance_captcha"
    # Silently delay the response.
    DELAY = "delay"
    # Answer with a honeypot response.
    HONEYPOT = "honeypot"
    # Reject the request outright.
    BLOCK = "block"
|
||
|
||
|
||
@dataclass
class ResponseStrategy:
    """Response decision for a request: the action plus its parameters."""

    # Action to take for the request.
    action: ResponseAction
    # Delay in seconds; the default of 0 means no delay.
    delay_seconds: float = 0
    # CAPTCHA difficulty: 1 = plain 4-digit, 2 = 6-digit, 3 = slider.
    captcha_level: int = 1
    # Optional message attached to the strategy; None when unset.
    message: str | None = None
|
||
|
||
|
||
class ResponseHandler:
    """Choose and apply a progressive response strategy from a risk score."""

    def __init__(self, *, rng: Optional[random.Random] = None) -> None:
        """Create a handler.

        Args:
            rng: Random source used to pick delay durations. When omitted, a
                ``random.SystemRandom`` instance is used.
        """
        # Explicit None check so a caller-supplied generator is never
        # replaced just because it happens to evaluate as falsy.
        self._rng = random.SystemRandom() if rng is None else rng
        self._logger = get_logger("app")

    def get_strategy(self, risk_score: int, is_banned: bool = False) -> ResponseStrategy:
        """Return the response strategy for a risk score.

        The score is first clamped to 0-100, then mapped as follows:

            banned:  BLOCK (checked before any score band)
            0-20:    ALLOW, no delay, CAPTCHA level 1
            21-40:   ALLOW, no delay, CAPTCHA level 2
            41-60:   DELAY, 1-2 second delay
            61-80:   DELAY, 2-5 second delay
            81-100:  HONEYPOT, 3-8 second delay

        Args:
            risk_score: Risk score; values that cannot be converted to an
                integer are treated as 0.
            is_banned: Whether the requester is currently banned.

        Returns:
            The strategy to apply to the request.
        """
        score = self._normalize_risk_score(risk_score)

        # A ban overrides every score band.
        if is_banned:
            strategy = ResponseStrategy(action=ResponseAction.BLOCK, message="访问被拒绝")
            self._logger.warning("响应策略: BLOCK (banned=%s, risk_score=%s)", is_banned, score)
            return strategy

        if score <= 20:
            strategy = ResponseStrategy(action=ResponseAction.ALLOW, delay_seconds=0, captcha_level=1)
        elif score <= 40:
            strategy = ResponseStrategy(action=ResponseAction.ALLOW, delay_seconds=0, captcha_level=2)
        elif score <= 60:
            strategy = ResponseStrategy(
                action=ResponseAction.DELAY,
                delay_seconds=float(self._rng.uniform(1.0, 2.0)),
            )
        elif score <= 80:
            strategy = ResponseStrategy(
                action=ResponseAction.DELAY,
                delay_seconds=float(self._rng.uniform(2.0, 5.0)),
            )
        else:
            strategy = ResponseStrategy(
                action=ResponseAction.HONEYPOT,
                delay_seconds=float(self._rng.uniform(3.0, 8.0)),
            )

        # Keep the level inside the supported 1-3 range.
        strategy.captcha_level = self._normalize_captcha_level(strategy.captcha_level)

        self._logger.info(
            "响应策略: action=%s risk_score=%s delay=%.3f captcha_level=%s",
            strategy.action.value,
            score,
            float(strategy.delay_seconds or 0),
            int(strategy.captcha_level),
        )
        return strategy

    def apply_delay(self, strategy: ResponseStrategy) -> None:
        """Block the calling thread for the strategy's delay via ``time.sleep``.

        Does nothing when ``strategy`` is None or when its delay is missing,
        non-numeric, zero, negative, NaN or infinite.

        Args:
            strategy: Strategy whose ``delay_seconds`` should be applied.
        """
        if strategy is None:
            return

        try:
            delay = float(getattr(strategy, "delay_seconds", 0) or 0)
        except (TypeError, ValueError, OverflowError):
            delay = 0.0

        # NaN fails every comparison, so a plain `delay <= 0` guard would let
        # it through to time.sleep(), which raises ValueError for NaN and
        # OverflowError for infinity. Only finite positive delays are applied.
        if not (0 < delay < float("inf")):
            if delay != delay or delay == float("inf"):
                self._logger.warning("忽略无效延迟: delay=%r", delay)
            return

        # Read `action` defensively, like `delay_seconds` above, so a
        # duck-typed strategy without that attribute cannot break logging.
        action = getattr(strategy, "action", None)
        self._logger.debug("应用延迟: action=%s delay=%.3f", getattr(action, "value", action), delay)
        time.sleep(delay)

    def get_captcha_requirement(self, strategy: ResponseStrategy) -> dict[str, Any]:
        """Return the CAPTCHA requirement for a strategy.

        Args:
            strategy: Strategy to inspect. Missing or invalid attributes fall
                back to level 1 and ``required=True``.

        Returns:
            A dict such as ``{"required": True, "level": 2}``. ``required`` is
            False only when the strategy's action is BLOCK; ``level`` is
            always 1, 2 or 3.
        """
        level = self._normalize_captcha_level(getattr(strategy, "captcha_level", 1) or 1)
        required = getattr(strategy, "action", None) is not ResponseAction.BLOCK

        payload = {"required": required, "level": level}
        self._logger.debug("验证码要求: %s", payload)
        return payload

    # ==================== Internal ====================

    def _normalize_risk_score(self, risk_score: Any) -> int:
        """Convert ``risk_score`` to an integer clamped to the range 0-100.

        Values that cannot be converted (None, non-numeric strings, NaN) are
        treated as 0. Infinite values are clamped like any other out-of-range
        number, so positive infinity yields 100 rather than 0.
        """
        try:
            score = int(risk_score)
        except OverflowError:
            # int() raises OverflowError for infinities; clamp by sign so an
            # unbounded-high score is not mistaken for "no risk".
            score = 100 if risk_score > 0 else 0
        except (TypeError, ValueError):
            score = 0
        return max(0, min(100, score))

    def _normalize_captcha_level(self, level: Any) -> int:
        """Convert ``level`` to a CAPTCHA level of 1, 2 or 3.

        Unconvertible values become 1; values below 1 become 1 and values
        above 3 become 3.
        """
        try:
            value = int(level)
        except (TypeError, ValueError, OverflowError):
            value = 1
        return max(1, min(3, value))
|
||
|