Phase 1 - 威胁检测引擎: - security/threat_detector.py: JNDI/SQL/XSS/路径遍历/命令注入检测 - security/constants.py: 威胁检测规则和评分常量 - 数据库表: threat_events, ip_risk_scores, user_risk_scores, ip_blacklist Phase 2 - 风险评分与黑名单: - security/risk_scorer.py: IP/用户风险评分引擎,支持分数衰减 - security/blacklist.py: 黑名单管理,自动封禁规则 Phase 3 - 响应策略: - security/honeypot.py: 蜜罐响应生成器 - security/response_handler.py: 渐进式响应策略 Phase 4 - 集成: - security/middleware.py: Flask安全中间件 - routes/admin_api/security.py: 管理后台安全仪表板API - 36个测试用例全部通过 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
38 lines
1.3 KiB
Python
38 lines
1.3 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
|
||
from functools import wraps
|
||
|
||
from flask import jsonify, redirect, request, session, url_for
|
||
|
||
from services.runtime import get_logger
|
||
|
||
|
||
def admin_required(f):
|
||
"""管理员权限装饰器(不改变原有接口/行为)。"""
|
||
|
||
@wraps(f)
|
||
def decorated_function(*args, **kwargs):
|
||
try:
|
||
logger = get_logger()
|
||
except Exception:
|
||
import logging
|
||
|
||
logger = logging.getLogger("app")
|
||
logger.debug(f"[admin_required] 检查会话,admin_id存在: {'admin_id' in session}")
|
||
if "admin_id" not in session:
|
||
logger.warning(f"[admin_required] 拒绝访问 {request.path} - session中无admin_id")
|
||
is_api = (
|
||
request.blueprint in {"admin_api", "admin_security", "admin_security_yuyx"}
|
||
or request.path.startswith("/yuyx/api")
|
||
or request.path.startswith("/api/admin")
|
||
)
|
||
if is_api:
|
||
return jsonify({"error": "需要管理员权限"}), 403
|
||
return redirect(url_for("pages.admin_login_page"))
|
||
logger.info(f"[admin_required] 管理员 {session.get('admin_username')} 访问 {request.path}")
|
||
return f(*args, **kwargs)
|
||
|
||
return decorated_function
|