From ebfac7266bb5c6ad5a3c6c9ffc4bc883e0302fd5 Mon Sep 17 00:00:00 2001 From: yuyx <237899745@qq.com> Date: Mon, 9 Feb 2026 09:14:47 +0800 Subject: [PATCH] security: harden proxy IP trust, token flow, health and sessions --- README.md | 5 +- app_security.py | 97 ++++++++++++++++++++++++++++-------- email_service.py | 103 ++++++++++++++++++++------------------- routes/admin_api/core.py | 14 ++++-- routes/api_auth.py | 33 +++++++++++-- routes/api_user.py | 5 +- routes/health.py | 21 +++++++- 7 files changed, 199 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 5e8c341..320ddea 100644 --- a/README.md +++ b/README.md @@ -201,9 +201,12 @@ cd /www/wwwroot/zsglpt ```bash mkdir -p data logs 截图 -chmod 777 data logs 截图 +chown -R 1000:1000 data logs 截图 +chmod 750 data logs 截图 ``` +> 说明:避免使用 `chmod 777`。如容器内运行用户不是 `1000:1000`,请改为实际 UID/GID。 + ### 步骤5: 构建并启动Docker容器 ```bash diff --git a/app_security.py b/app_security.py index 281b0fd..4bce639 100755 --- a/app_security.py +++ b/app_security.py @@ -453,30 +453,87 @@ def get_client_ip(trust_proxy=False): return request.remote_addr +def _load_trusted_proxy_networks(): + """加载可信代理 CIDR 列表。""" + default_cidrs = "127.0.0.1/32,::1/128,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,fc00::/7" + raw = str(os.environ.get("TRUSTED_PROXY_CIDRS", default_cidrs) or "").strip() + if not raw: + return [] + + networks = [] + for segment in raw.split(","): + cidr_text = str(segment or "").strip() + if not cidr_text: + continue + try: + networks.append(ipaddress.ip_network(cidr_text, strict=False)) + except ValueError: + continue + return networks + + +_TRUSTED_PROXY_NETWORKS = _load_trusted_proxy_networks() + + +def _parse_ip_address(candidate: str): + try: + return ipaddress.ip_address(str(candidate or "").strip()) + except ValueError: + return None + + +def _is_trusted_proxy_ip(ip_obj) -> bool: + if ip_obj is None: + return False + for network in _TRUSTED_PROXY_NETWORKS: + try: + if ip_obj.version != network.version: + continue + if ip_obj in network: + return True + except Exception: + continue + return False + + +def _extract_real_ip_from_forwarded_chain() -> str | None: + """基于 X-Forwarded-For 链反向提取最靠近应用侧的“非代理”来源 IP。""" + forwarded = str(request.headers.get("X-Forwarded-For", "") or "") + candidates = [] + for segment in forwarded.split(","): + ip_text = str(segment or "").strip() + ip_obj = _parse_ip_address(ip_text) + if ip_obj is None: + continue + candidates.append((str(ip_obj), ip_obj)) + + # 若存在 X-Forwarded-For,按“从右到左”剥离可信代理。 + if candidates: + for ip_text, ip_obj in reversed(candidates): + if _is_trusted_proxy_ip(ip_obj): + continue + return ip_text + return candidates[0][0] + + real_ip_text = str(request.headers.get("X-Real-IP", "") or "").strip() + real_ip_obj = _parse_ip_address(real_ip_text) + if real_ip_obj is None: + return None + return str(real_ip_obj) + + def get_rate_limit_ip() -> str: """在可信代理场景下取真实IP,用于限流/风控。""" remote_addr = request.remote_addr or "" - try: - remote_ip = ipaddress.ip_address(remote_addr) - except ValueError: - remote_ip = None + remote_ip = _parse_ip_address(remote_addr) + if remote_ip is None: + return remote_addr - if remote_ip and (remote_ip.is_private or remote_ip.is_loopback or remote_ip.is_link_local): - forwarded = request.headers.get("X-Forwarded-For", "") - if forwarded: - candidate = forwarded.split(",")[0].strip() - try: - ipaddress.ip_address(candidate) - return candidate - except ValueError: - pass - real_ip = request.headers.get("X-Real-IP", "").strip() - if real_ip: - try: - ipaddress.ip_address(real_ip) - return real_ip - except ValueError: - pass + # 仅当请求来自可信代理时才信任转发头。 + if _is_trusted_proxy_ip(remote_ip): + forwarded_real_ip = _extract_real_ip_from_forwarded_chain() + if forwarded_real_ip: + return forwarded_real_ip return remote_addr diff --git a/email_service.py b/email_service.py index 32fcdb8..ca490ce 100644 --- a/email_service.py +++ b/email_service.py @@ -1370,20 +1370,18 @@ def generate_email_token(email: str, token_type: str, user_id: int = None) -> st return token -def verify_email_token(token: str, token_type: str) -> Optional[Dict[str, Any]]: - """ - 验证Token - - Returns: - 成功返回 {'user_id': int, 'email': str},失败返回 None - """ +def _get_email_token_payload(token: str, token_type: str) -> Optional[Dict[str, Any]]: + """获取并校验邮件Token(不消费)。""" with db_pool.get_db() as conn: cursor = conn.cursor() - cursor.execute(""" + cursor.execute( + """ SELECT id, user_id, email, expires_at, used FROM email_tokens WHERE token = ? AND token_type = ? - """, (token, token_type)) + """, + (token, token_type), + ) row = cursor.fetchone() if not row: @@ -1391,21 +1389,55 @@ def verify_email_token(token: str, token_type: str) -> Optional[Dict[str, Any]]: token_id, user_id, email, expires_at, used = row - # 检查是否已使用 if used: return None - # 检查是否过期 if parse_datetime(expires_at) < datetime.now(BEIJING_TZ): return None - # 标记为已使用 - cursor.execute(""" - UPDATE email_tokens SET used = 1 WHERE id = ? - """, (token_id,)) - conn.commit() + return {'token_id': token_id, 'user_id': user_id, 'email': email} - return {'user_id': user_id, 'email': email} + +def consume_email_token(token_id: int) -> bool: + """将邮件Token标记为已使用。""" + with db_pool.get_db() as conn: + cursor = conn.cursor() + cursor.execute( + """ + UPDATE email_tokens + SET used = 1 + WHERE id = ? AND used = 0 + """, + (int(token_id),), + ) + conn.commit() + return cursor.rowcount > 0 + + +def verify_email_token(token: str, token_type: str, *, consume: bool = True) -> Optional[Dict[str, Any]]: + """ + 验证Token + + Args: + token: token字符串 + token_type: token类型 + consume: 是否在验证成功后立刻消费(默认True) + + Returns: + consume=True: {'user_id': int, 'email': str} + consume=False: {'token_id': int, 'user_id': int, 'email': str} + 失败返回 None + """ + payload = _get_email_token_payload(token, token_type) + if not payload: + return None + + if consume: + if not consume_email_token(payload['token_id']): + return None + return {'user_id': payload['user_id'], 'email': payload['email']} + + return payload def check_rate_limit(email: str, token_type: str) -> bool: @@ -1600,29 +1632,7 @@ def verify_password_reset_token(token: str) -> Optional[Dict[str, Any]]: Returns: 成功返回 {'user_id': int, 'email': str},失败返回 None """ - with db_pool.get_db() as conn: - cursor = conn.cursor() - cursor.execute(""" - SELECT id, user_id, email, expires_at, used - FROM email_tokens - WHERE token = ? AND token_type = ? - """, (token, EMAIL_TYPE_RESET)) - - row = cursor.fetchone() - if not row: - return None - - token_id, user_id, email, expires_at, used = row - - # 检查是否已使用 - if used: - return None - - # 检查是否过期 - if parse_datetime(expires_at) < datetime.now(BEIJING_TZ): - return None - - return {'user_id': user_id, 'email': email, 'token_id': token_id} + return verify_email_token(token, EMAIL_TYPE_RESET, consume=False) def confirm_password_reset(token: str) -> Optional[Dict[str, Any]]: @@ -1636,13 +1646,8 @@ def confirm_password_reset(token: str) -> Optional[Dict[str, Any]]: if not result: return None - # 标记为已使用 - with db_pool.get_db() as conn: - cursor = conn.cursor() - cursor.execute(""" - UPDATE email_tokens SET used = 1 WHERE id = ? - """, (result['token_id'],)) - conn.commit() + if not consume_email_token(result['token_id']): + return None return {'user_id': result['user_id'], 'email': result['email']} @@ -1706,14 +1711,14 @@ def send_bind_email_verification( ) -def verify_bind_email_token(token: str) -> Optional[Dict[str, Any]]: +def verify_bind_email_token(token: str, *, consume: bool = True) -> Optional[Dict[str, Any]]: """ 验证邮箱绑定Token Returns: 成功返回 {'user_id': int, 'email': str},失败返回 None """ - return verify_email_token(token, EMAIL_TYPE_BIND) + return verify_email_token(token, EMAIL_TYPE_BIND, consume=consume) def send_security_alert_email( diff --git a/routes/admin_api/core.py b/routes/admin_api/core.py index 3449169..526f111 100644 --- a/routes/admin_api/core.py +++ b/routes/admin_api/core.py @@ -3,6 +3,8 @@ from __future__ import annotations import os +import stat +import tempfile import time import database @@ -153,6 +155,9 @@ def admin_logout(): session.pop("admin_id", None) session.pop("admin_username", None) session.pop("admin_reauth_until", None) + session.pop("_user_id", None) + session.pop("_fresh", None) + session.pop("_id", None) return jsonify({"success": True}) @@ -200,11 +205,14 @@ time.sleep(3) os._exit(0) """ - with open("/tmp/restart_container.py", "w") as f: - f.write(restart_script) + with tempfile.NamedTemporaryFile("w", suffix=".py", prefix="restart_container_", delete=False) as temp_file: + temp_file.write(restart_script) + script_path = temp_file.name + + os.chmod(script_path, stat.S_IRUSR | stat.S_IWUSR) subprocess.Popen( - ["python3", "/tmp/restart_container.py"], + ["python3", script_path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, diff --git a/routes/api_auth.py b/routes/api_auth.py index 2e65443..a37a17a 100644 --- a/routes/api_auth.py +++ b/routes/api_auth.py @@ -15,7 +15,7 @@ import email_service from app_config import get_config from app_logger import get_logger from app_security import get_rate_limit_ip, require_ip_not_locked, validate_email, validate_password, validate_username -from flask import Blueprint, jsonify, request +from flask import Blueprint, jsonify, request, session from flask_login import login_required, login_user, logout_user from routes.pages import render_app_spa_or_legacy from services.accounts_service import load_user_accounts @@ -279,19 +279,38 @@ def register(): @api_auth_bp.route("/api/verify-email/") def verify_email(token): """验证邮箱 - 用户点击邮件中的链接""" - result = email_service.verify_email_token(token, email_service.EMAIL_TYPE_REGISTER) + result = email_service.verify_email_token(token, email_service.EMAIL_TYPE_REGISTER, consume=False) if result: + token_id = result["token_id"] user_id = result["user_id"] email = result["email"] - database.approve_user(user_id) + if not database.approve_user(user_id): + logger.error(f"用户邮箱验证失败: 用户审核更新失败 user_id={user_id}, email={email}") + error_message = "验证处理失败,请稍后重试" + spa_initial_state = { + "page": "verify_result", + "success": False, + "title": "验证失败", + "error_message": error_message, + "primary_label": "返回登录", + "primary_url": "/login", + } + return render_app_spa_or_legacy( + "verify_failed.html", + legacy_context={"error_message": error_message}, + spa_initial_state=spa_initial_state, + ) system_config = database.get_system_config() auto_approve_vip_days = system_config.get("auto_approve_vip_days", 7) if auto_approve_vip_days > 0: database.set_user_vip(user_id, auto_approve_vip_days) + if not email_service.consume_email_token(token_id): + logger.warning(f"用户邮箱验证后Token消费失败: token_id={token_id}, user_id={user_id}") + logger.info(f"用户邮箱验证成功: user_id={user_id}, email={email}") spa_initial_state = { "page": "verify_result", @@ -499,6 +518,11 @@ def reset_password_confirm(): @api_auth_bp.route("/api/generate_captcha", methods=["POST"]) def generate_captcha(): """生成4位数字验证码图片""" + client_ip = get_rate_limit_ip() + allowed, error_msg = check_ip_request_rate(client_ip, "login") + if not allowed: + return jsonify({"error": error_msg}), 429 + session_id = str(uuid.uuid4()) code = "".join(str(secrets.randbelow(10)) for _ in range(4)) @@ -578,4 +602,7 @@ def login(): def logout(): """用户登出""" logout_user() + session.pop("admin_id", None) + session.pop("admin_username", None) + session.pop("admin_reauth_until", None) return jsonify({"success": True}) diff --git a/routes/api_user.py b/routes/api_user.py index 8ccaece..c0b4831 100644 --- a/routes/api_user.py +++ b/routes/api_user.py @@ -321,13 +321,16 @@ def bind_user_email(): @api_user_bp.route("/api/verify-bind-email/") def verify_bind_email(token): """验证邮箱绑定Token""" - result = email_service.verify_bind_email_token(token) + result = email_service.verify_bind_email_token(token, consume=False) if result: + token_id = result["token_id"] user_id = result["user_id"] email = result["email"] if database.update_user_email(user_id, email, verified=True): + if not email_service.consume_email_token(token_id): + logger.warning(f"邮箱绑定成功但Token消费失败: token_id={token_id}, user_id={user_id}") return _render_verify_bind_success(email) return _render_verify_bind_failed(title="绑定失败", error_message="邮箱绑定失败,请重试") diff --git a/routes/health.py b/routes/health.py index 5dfe5e9..820cbe4 100644 --- a/routes/health.py +++ b/routes/health.py @@ -14,6 +14,18 @@ from services.time_utils import get_beijing_now health_bp = Blueprint("health", __name__) _PROCESS_START_TS = time.time() +_INCLUDE_HEALTH_METRICS = str(os.environ.get("HEALTH_INCLUDE_METRICS", "0")).strip().lower() in { + "1", + "true", + "yes", + "on", +} +_EXPOSE_HEALTH_ERRORS = str(os.environ.get("HEALTH_EXPOSE_ERRORS", "0")).strip().lower() in { + "1", + "true", + "yes", + "on", +} def _build_runtime_metrics() -> dict: @@ -75,13 +87,18 @@ def health_check(): database.get_system_config() except Exception as e: db_ok = False - db_error = f"{type(e).__name__}: {e}" + if _EXPOSE_HEALTH_ERRORS: + db_error = f"{type(e).__name__}: {e}" + else: + db_error = "db_unavailable" payload = { "ok": db_ok, "time": get_beijing_now().strftime("%Y-%m-%d %H:%M:%S"), "db_ok": db_ok, "db_error": db_error, - "metrics": _build_runtime_metrics(), } + if _INCLUDE_HEALTH_METRICS: + payload["metrics"] = _build_runtime_metrics() + return jsonify(payload), (200 if db_ok else 500)