refactor: optimize structure, stability and runtime performance

This commit is contained in:
2026-02-07 00:35:11 +08:00
parent fae21329d7
commit bf29ac1924
44 changed files with 6894 additions and 4792 deletions

View File

@@ -2,16 +2,20 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import base64
import random
import secrets
import threading
import time
import uuid
from io import BytesIO
import database
import email_service
from app_config import get_config
from app_logger import get_logger
from app_security import get_rate_limit_ip, require_ip_not_locked, validate_email, validate_password, validate_username
from flask import Blueprint, jsonify, redirect, render_template, request, url_for
from flask import Blueprint, jsonify, request
from flask_login import login_required, login_user, logout_user
from routes.pages import render_app_spa_or_legacy
from services.accounts_service import load_user_accounts
@@ -39,12 +43,162 @@ config = get_config()
api_auth_bp = Blueprint("api_auth", __name__)
_CAPTCHA_FONT_PATHS = [
"/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
]
_CAPTCHA_FONT = None
_CAPTCHA_FONT_LOCK = threading.Lock()
def _get_json_payload() -> dict:
data = request.get_json(silent=True)
return data if isinstance(data, dict) else {}
def _load_captcha_font(image_font_module):
global _CAPTCHA_FONT
if _CAPTCHA_FONT is not None:
return _CAPTCHA_FONT
with _CAPTCHA_FONT_LOCK:
if _CAPTCHA_FONT is not None:
return _CAPTCHA_FONT
for font_path in _CAPTCHA_FONT_PATHS:
try:
_CAPTCHA_FONT = image_font_module.truetype(font_path, 42)
break
except Exception:
continue
if _CAPTCHA_FONT is None:
_CAPTCHA_FONT = image_font_module.load_default()
return _CAPTCHA_FONT
def _generate_captcha_image_data_uri(code: str) -> str:
from PIL import Image, ImageDraw, ImageFont
width, height = 160, 60
image = Image.new("RGB", (width, height), color=(255, 255, 255))
draw = ImageDraw.Draw(image)
for _ in range(6):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(0, width)
y2 = random.randint(0, height)
draw.line(
[(x1, y1), (x2, y2)],
fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)),
width=1,
)
for _ in range(80):
x = random.randint(0, width)
y = random.randint(0, height)
draw.point((x, y), fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)))
font = _load_captcha_font(ImageFont)
for i, char in enumerate(code):
x = 12 + i * 35 + random.randint(-3, 3)
y = random.randint(5, 12)
color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
draw.text((x, y), char, font=font, fill=color)
buffer = BytesIO()
image.save(buffer, format="PNG")
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
return f"data:image/png;base64,{img_base64}"
def _with_vip_suffix(message: str, auto_approve_enabled: bool, auto_approve_vip_days: int) -> str:
if auto_approve_enabled and auto_approve_vip_days > 0:
return f"{message},赠送{auto_approve_vip_days}天VIP"
return message
def _verify_common_captcha(client_ip: str, captcha_session: str, captcha_code: str):
success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
if success:
return True, None
is_locked = record_failed_captcha(client_ip)
if is_locked:
return False, (jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429)
return False, (jsonify({"error": message}), 400)
def _verify_login_captcha_if_needed(
*,
captcha_required: bool,
captcha_session: str,
captcha_code: str,
client_ip: str,
username_key: str,
):
if not captcha_required:
return True, None
if not captcha_session or not captcha_code:
return False, (jsonify({"error": "请填写验证码", "need_captcha": True}), 400)
success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
if success:
return True, None
record_login_failure(client_ip, username_key)
return False, (jsonify({"error": message, "need_captcha": True}), 400)
def _send_password_reset_email_if_possible(email: str, username: str, user_id: int) -> None:
result = email_service.send_password_reset_email(email=email, username=username, user_id=user_id)
if not result["success"]:
logger.error(f"密码重置邮件发送失败: {result['error']}")
def _send_login_security_alert_if_needed(user: dict, username: str, client_ip: str) -> None:
try:
user_agent = request.headers.get("User-Agent", "")
context = database.record_login_context(user["id"], client_ip, user_agent)
if not context or (not context.get("new_ip") and not context.get("new_device")):
return
if not config.LOGIN_ALERT_ENABLED:
return
if not should_send_login_alert(user["id"], client_ip):
return
if not email_service.get_email_settings().get("login_alert_enabled", True):
return
user_info = database.get_user_by_id(user["id"]) or {}
if (not user_info.get("email")) or (not user_info.get("email_verified")):
return
if not database.get_user_email_notify(user["id"]):
return
email_service.send_security_alert_email(
email=user_info.get("email"),
username=user_info.get("username") or username,
ip_address=client_ip,
user_agent=user_agent,
new_ip=context.get("new_ip", False),
new_device=context.get("new_device", False),
user_id=user["id"],
)
except Exception:
pass
@api_auth_bp.route("/api/register", methods=["POST"])
@require_ip_not_locked
def register():
"""用户注册"""
data = request.json or {}
data = _get_json_payload()
username = data.get("username", "").strip()
password = data.get("password", "").strip()
email = data.get("email", "").strip().lower()
@@ -67,12 +221,9 @@ def register():
if not allowed:
return jsonify({"error": error_msg}), 429
success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
if not success:
is_locked = record_failed_captcha(client_ip)
if is_locked:
return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
return jsonify({"error": message}), 400
captcha_ok, captcha_error_response = _verify_common_captcha(client_ip, captcha_session, captcha_code)
if not captcha_ok:
return captcha_error_response
email_settings = email_service.get_email_settings()
email_verify_enabled = email_settings.get("register_verify_enabled", False) and email_settings.get("enabled", False)
@@ -105,20 +256,22 @@ def register():
if email_verify_enabled and email:
result = email_service.send_register_verification_email(email=email, username=username, user_id=user_id)
if result["success"]:
message = "注册成功!验证邮件已发送(可直接登录,建议完成邮箱验证)"
if auto_approve_enabled and auto_approve_vip_days > 0:
message += f",赠送{auto_approve_vip_days}天VIP"
message = _with_vip_suffix(
"注册成功!验证邮件已发送(可直接登录,建议完成邮箱验证)",
auto_approve_enabled,
auto_approve_vip_days,
)
return jsonify({"success": True, "message": message, "need_verify": True})
logger.error(f"注册验证邮件发送失败: {result['error']}")
message = f"注册成功,但验证邮件发送失败({result['error']})。你仍可直接登录"
if auto_approve_enabled and auto_approve_vip_days > 0:
message += f",赠送{auto_approve_vip_days}天VIP"
message = _with_vip_suffix(
f"注册成功,但验证邮件发送失败({result['error']})。你仍可直接登录",
auto_approve_enabled,
auto_approve_vip_days,
)
return jsonify({"success": True, "message": message, "need_verify": True})
message = "注册成功!可直接登录"
if auto_approve_enabled and auto_approve_vip_days > 0:
message += f",赠送{auto_approve_vip_days}天VIP"
message = _with_vip_suffix("注册成功!可直接登录", auto_approve_enabled, auto_approve_vip_days)
return jsonify({"success": True, "message": message})
return jsonify({"error": "用户名已存在"}), 400
@@ -175,7 +328,7 @@ def verify_email(token):
@require_ip_not_locked
def resend_verify_email():
"""重发验证邮件"""
data = request.json or {}
data = _get_json_payload()
email = data.get("email", "").strip().lower()
captcha_session = data.get("captcha_session", "")
captcha_code = data.get("captcha", "").strip()
@@ -195,12 +348,9 @@ def resend_verify_email():
if not allowed:
return jsonify({"error": error_msg}), 429
success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
if not success:
is_locked = record_failed_captcha(client_ip)
if is_locked:
return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
return jsonify({"error": message}), 400
captcha_ok, captcha_error_response = _verify_common_captcha(client_ip, captcha_session, captcha_code)
if not captcha_ok:
return captcha_error_response
user = database.get_user_by_email(email)
if not user:
@@ -235,7 +385,7 @@ def get_email_verify_status():
@require_ip_not_locked
def forgot_password():
"""发送密码重置邮件"""
data = request.json or {}
data = _get_json_payload()
email = data.get("email", "").strip().lower()
username = data.get("username", "").strip()
captcha_session = data.get("captcha_session", "")
@@ -263,12 +413,9 @@ def forgot_password():
if not allowed:
return jsonify({"error": error_msg}), 429
success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
if not success:
is_locked = record_failed_captcha(client_ip)
if is_locked:
return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
return jsonify({"error": message}), 400
captcha_ok, captcha_error_response = _verify_common_captcha(client_ip, captcha_session, captcha_code)
if not captcha_ok:
return captcha_error_response
email_settings = email_service.get_email_settings()
if not email_settings.get("enabled", False):
@@ -293,20 +440,16 @@ def forgot_password():
if not allowed:
return jsonify({"error": error_msg}), 429
result = email_service.send_password_reset_email(
_send_password_reset_email_if_possible(
email=bound_email,
username=user["username"],
user_id=user["id"],
)
if not result["success"]:
logger.error(f"密码重置邮件发送失败: {result['error']}")
return jsonify({"success": True, "message": "如果该账号已绑定邮箱,您将收到密码重置邮件"})
user = database.get_user_by_email(email)
if user and user.get("status") == "approved":
result = email_service.send_password_reset_email(email=email, username=user["username"], user_id=user["id"])
if not result["success"]:
logger.error(f"密码重置邮件发送失败: {result['error']}")
_send_password_reset_email_if_possible(email=email, username=user["username"], user_id=user["id"])
return jsonify({"success": True, "message": "如果该邮箱已注册,您将收到密码重置邮件"})
@@ -331,7 +474,7 @@ def reset_password_page(token):
@api_auth_bp.route("/api/reset-password-confirm", methods=["POST"])
def reset_password_confirm():
"""确认密码重置"""
data = request.json or {}
data = _get_json_payload()
token = data.get("token", "").strip()
new_password = data.get("new_password", "").strip()
@@ -356,67 +499,15 @@ def reset_password_confirm():
@api_auth_bp.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
"""生成4位数字验证码图片"""
import base64
import uuid
from io import BytesIO
session_id = str(uuid.uuid4())
code = "".join([str(secrets.randbelow(10)) for _ in range(4)])
code = "".join(str(secrets.randbelow(10)) for _ in range(4))
safe_set_captcha(session_id, {"code": code, "expire_time": time.time() + 300, "failed_attempts": 0})
safe_cleanup_expired_captcha()
try:
from PIL import Image, ImageDraw, ImageFont
import io
width, height = 160, 60
image = Image.new("RGB", (width, height), color=(255, 255, 255))
draw = ImageDraw.Draw(image)
for _ in range(6):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(0, width)
y2 = random.randint(0, height)
draw.line(
[(x1, y1), (x2, y2)],
fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)),
width=1,
)
for _ in range(80):
x = random.randint(0, width)
y = random.randint(0, height)
draw.point((x, y), fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)))
font = None
font_paths = [
"/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
]
for font_path in font_paths:
try:
font = ImageFont.truetype(font_path, 42)
break
except Exception:
continue
if font is None:
font = ImageFont.load_default()
for i, char in enumerate(code):
x = 12 + i * 35 + random.randint(-3, 3)
y = random.randint(5, 12)
color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
draw.text((x, y), char, font=font, fill=color)
buffer = io.BytesIO()
image.save(buffer, format="PNG")
img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
return jsonify({"session_id": session_id, "captcha_image": f"data:image/png;base64,{img_base64}"})
captcha_image = _generate_captcha_image_data_uri(code)
return jsonify({"session_id": session_id, "captcha_image": captcha_image})
except ImportError as e:
logger.error(f"PIL库未安装验证码功能不可用: {e}")
safe_delete_captcha(session_id)
@@ -427,7 +518,7 @@ def generate_captcha():
@require_ip_not_locked
def login():
"""用户登录"""
data = request.json or {}
data = _get_json_payload()
username = data.get("username", "").strip()
password = data.get("password", "").strip()
captcha_session = data.get("captcha_session", "")
@@ -452,13 +543,15 @@ def login():
return jsonify({"error": error_msg, "need_captcha": True}), 429
captcha_required = check_login_captcha_required(client_ip, username_key) or scan_locked or bool(need_captcha)
if captcha_required:
if not captcha_session or not captcha_code:
return jsonify({"error": "请填写验证码", "need_captcha": True}), 400
success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
if not success:
record_login_failure(client_ip, username_key)
return jsonify({"error": message, "need_captcha": True}), 400
captcha_ok, captcha_error_response = _verify_login_captcha_if_needed(
captcha_required=captcha_required,
captcha_session=captcha_session,
captcha_code=captcha_code,
client_ip=client_ip,
username_key=username_key,
)
if not captcha_ok:
return captcha_error_response
user = database.verify_user(username, password)
if not user:
@@ -476,29 +569,7 @@ def login():
login_user(user_obj)
load_user_accounts(user["id"])
try:
user_agent = request.headers.get("User-Agent", "")
context = database.record_login_context(user["id"], client_ip, user_agent)
if context and (context.get("new_ip") or context.get("new_device")):
if (
config.LOGIN_ALERT_ENABLED
and should_send_login_alert(user["id"], client_ip)
and email_service.get_email_settings().get("login_alert_enabled", True)
):
user_info = database.get_user_by_id(user["id"]) or {}
if user_info.get("email") and user_info.get("email_verified"):
if database.get_user_email_notify(user["id"]):
email_service.send_security_alert_email(
email=user_info.get("email"),
username=user_info.get("username") or username,
ip_address=client_ip,
user_agent=user_agent,
new_ip=context.get("new_ip", False),
new_device=context.get("new_device", False),
user_id=user["id"],
)
except Exception:
pass
_send_login_security_alert_if_needed(user=user, username=username, client_ip=client_ip)
return jsonify({"success": True})