security: harden proxy IP trust, token flow, health and sessions

This commit is contained in:
2026-02-09 09:14:47 +08:00
parent f645a0f8ea
commit ebfac7266b
7 changed files with 199 additions and 79 deletions

View File

@@ -453,30 +453,87 @@ def get_client_ip(trust_proxy=False):
return request.remote_addr
def _load_trusted_proxy_networks():
"""加载可信代理 CIDR 列表。"""
default_cidrs = "127.0.0.1/32,::1/128,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,fc00::/7"
raw = str(os.environ.get("TRUSTED_PROXY_CIDRS", default_cidrs) or "").strip()
if not raw:
return []
networks = []
for segment in raw.split(","):
cidr_text = str(segment or "").strip()
if not cidr_text:
continue
try:
networks.append(ipaddress.ip_network(cidr_text, strict=False))
except ValueError:
continue
return networks
_TRUSTED_PROXY_NETWORKS = _load_trusted_proxy_networks()
def _parse_ip_address(candidate: str):
try:
return ipaddress.ip_address(str(candidate or "").strip())
except ValueError:
return None
def _is_trusted_proxy_ip(ip_obj) -> bool:
if ip_obj is None:
return False
for network in _TRUSTED_PROXY_NETWORKS:
try:
if ip_obj.version != network.version:
continue
if ip_obj in network:
return True
except Exception:
continue
return False
def _extract_real_ip_from_forwarded_chain() -> str | None:
"""基于 X-Forwarded-For 链反向提取最靠近应用侧的“非代理”来源 IP。"""
forwarded = str(request.headers.get("X-Forwarded-For", "") or "")
candidates = []
for segment in forwarded.split(","):
ip_text = str(segment or "").strip()
ip_obj = _parse_ip_address(ip_text)
if ip_obj is None:
continue
candidates.append((str(ip_obj), ip_obj))
# 若存在 X-Forwarded-For按“从右到左”剥离可信代理。
if candidates:
for ip_text, ip_obj in reversed(candidates):
if _is_trusted_proxy_ip(ip_obj):
continue
return ip_text
return candidates[0][0]
real_ip_text = str(request.headers.get("X-Real-IP", "") or "").strip()
real_ip_obj = _parse_ip_address(real_ip_text)
if real_ip_obj is None:
return None
return str(real_ip_obj)
def get_rate_limit_ip() -> str:
"""在可信代理场景下取真实IP用于限流/风控。"""
remote_addr = request.remote_addr or ""
try:
remote_ip = ipaddress.ip_address(remote_addr)
except ValueError:
remote_ip = None
remote_ip = _parse_ip_address(remote_addr)
if remote_ip is None:
return remote_addr
if remote_ip and (remote_ip.is_private or remote_ip.is_loopback or remote_ip.is_link_local):
forwarded = request.headers.get("X-Forwarded-For", "")
if forwarded:
candidate = forwarded.split(",")[0].strip()
try:
ipaddress.ip_address(candidate)
return candidate
except ValueError:
pass
real_ip = request.headers.get("X-Real-IP", "").strip()
if real_ip:
try:
ipaddress.ip_address(real_ip)
return real_ip
except ValueError:
pass
# 仅当请求来自可信代理时才信任转发头。
if _is_trusted_proxy_ip(remote_ip):
forwarded_real_ip = _extract_real_ip_from_forwarded_chain()
if forwarded_real_ip:
return forwarded_real_ip
return remote_addr