Files
zsglpt/security/honeypot.py
Yu Yon 53c78e8e3c feat: 添加安全模块 + Dockerfile添加curl支持健康检查
主要更新:
- 新增 security/ 安全模块 (风险评估、威胁检测、蜜罐等)
- Dockerfile 添加 curl 以支持 Docker 健康检查
- 前端页面更新 (管理后台、用户端)
- 数据库迁移和 schema 更新
- 新增 kdocs 上传服务
- 添加安全相关测试用例

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-08 17:48:33 +08:00

127 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import random
import uuid
from typing import Any, Optional
from app_logger import get_logger
class HoneypotResponder:
"""蜜罐响应生成器 - 返回假成功响应,欺骗攻击者"""
def __init__(self, *, rng: Optional[random.Random] = None) -> None:
self._rng = rng or random.SystemRandom()
self._logger = get_logger("app")
def generate_fake_response(self, endpoint: str, original_data: dict = None) -> dict:
"""
根据端点生成假的成功响应
策略:
- 邮件发送类: {"success": True, "message": "邮件已发送"}
- 注册类: {"success": True, "user_id": fake_uuid}
- 登录类: {"success": True} 但不设置session
- 通用: {"success": True, "message": "操作成功"}
"""
endpoint_text = str(endpoint or "").strip()
endpoint_lc = endpoint_text.lower()
category = self._classify_endpoint(endpoint_lc)
response: dict[str, Any] = {"success": True}
if category == "email":
response["message"] = "邮件已发送"
elif category == "register":
response["user_id"] = str(uuid.uuid4())
elif category == "login":
# 登录类:保持正常成功响应,但不进行任何 session / token 设置(调用方负责不写 session
pass
else:
response["message"] = "操作成功"
response = self._merge_safe_fields(response, original_data)
self._logger.warning(
"蜜罐响应已生成: endpoint=%s, category=%s, keys=%s",
endpoint_text[:256],
category,
sorted(response.keys()),
)
return response
def should_use_honeypot(self, risk_score: int) -> bool:
"""风险分>=80使用蜜罐响应"""
score = self._normalize_risk_score(risk_score)
use = score >= 80
self._logger.debug("蜜罐判定: risk_score=%s => %s", score, use)
return use
def delay_response(self, risk_score: int) -> float:
"""
根据风险分计算延迟时间
0-20: 0秒
21-50: 随机0.5-1秒
51-80: 随机1-3秒
81-100: 随机3-8秒蜜罐模式额外延迟消耗攻击者时间
"""
score = self._normalize_risk_score(risk_score)
delay = 0.0
if score <= 20:
delay = 0.0
elif score <= 50:
delay = float(self._rng.uniform(0.5, 1.0))
elif score <= 80:
delay = float(self._rng.uniform(1.0, 3.0))
else:
delay = float(self._rng.uniform(3.0, 8.0))
self._logger.debug("蜜罐延迟计算: risk_score=%s => delay_seconds=%.3f", score, delay)
return delay
# ==================== Internal ====================
def _normalize_risk_score(self, risk_score: Any) -> int:
try:
score = int(risk_score)
except Exception:
score = 0
return max(0, min(100, score))
def _classify_endpoint(self, endpoint_lc: str) -> str:
if not endpoint_lc:
return "generic"
# 先匹配更具体的:注册 / 登录
if any(k in endpoint_lc for k in ["/register", "register", "signup", "sign-up"]):
return "register"
if any(k in endpoint_lc for k in ["/login", "login", "signin", "sign-in"]):
return "login"
# 邮件相关:发送验证码 / 重置密码 / 重发验证等
if any(k in endpoint_lc for k in ["email", "mail", "forgot-password", "reset-password", "resend-verify"]):
return "email"
return "generic"
def _merge_safe_fields(self, base: dict, original_data: Optional[dict]) -> dict:
if not isinstance(original_data, dict) or not original_data:
return base
# 避免把攻击者输入或真实业务结果回显得太明显;仅合并少量“形状字段”
safe_bool_keys = {"need_verify", "need_captcha"}
merged = dict(base)
for key in safe_bool_keys:
if key in original_data and key not in merged:
try:
merged[key] = bool(original_data.get(key))
except Exception:
continue
return merged