Files
zsglpt/routes/api_auth.py

453 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import random
import secrets
import time
import database
import email_service
from app_logger import get_logger
from app_security import get_rate_limit_ip, require_ip_not_locked, validate_email, validate_password, validate_username
from flask import Blueprint, jsonify, redirect, render_template, request, url_for
from flask_login import login_required, login_user, logout_user
from routes.pages import render_app_spa_or_legacy
from services.accounts_service import load_user_accounts
from services.models import User
from services.state import (
check_ip_request_rate,
check_login_captcha_required,
clear_login_failures,
record_failed_captcha,
record_login_failure,
safe_cleanup_expired_captcha,
safe_delete_captcha,
safe_set_captcha,
safe_verify_and_consume_captcha,
)
# Module-level logger, obtained from the project's logging helper under the name "app".
logger = get_logger("app")
# Flask blueprint on which the authentication routes below are registered.
api_auth_bp = Blueprint("api_auth", __name__)
@api_auth_bp.route("/api/register", methods=["POST"])
@require_ip_not_locked
def register():
    """Register a new user account from a JSON request body.

    Reads ``username``, ``password``, ``email``, ``captcha_session`` and
    ``captcha`` from the body, then in order: validates the username and
    password, applies the per-IP "register" rate limit, verifies (and
    consumes) the captcha, validates the e-mail, and enforces the hourly
    registration cap from the system configuration before creating the user.

    Returns:
        A JSON response. On success ``{"success": True, "message": ...}``,
        plus ``"need_verify": True`` when a verification e-mail was
        attempted. On failure ``{"error": ...}`` with status 400 (invalid
        input, bad captcha, or ``create_user`` returned a falsy id) or 429
        (rate limit, captcha lockout, hourly cap).
    """
    data = request.json or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; treat it as empty so the
        # regular validation errors are returned instead of an AttributeError.
        data = {}

    def _text(key):
        # Non-string values (null, numbers, lists, ...) count as missing so
        # that the .strip()/.lower() calls below can never raise.
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    username = _text("username").strip()
    password = _text("password").strip()
    email = _text("email").strip().lower()
    captcha_session = _text("captcha_session")
    captcha_code = _text("captcha").strip()

    if not username or not password:
        return jsonify({"error": "用户名和密码不能为空"}), 400

    is_valid, error_msg = validate_username(username)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    is_valid, error_msg = validate_password(password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "register")
    if not allowed:
        return jsonify({"error": error_msg}), 429

    success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
    if not success:
        # A failed captcha counts against the IP; the helper reports whether
        # this failure triggered a lock.
        if record_failed_captcha(client_ip):
            return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
        return jsonify({"error": message}), 400

    email_settings = email_service.get_email_settings()
    email_verify_enabled = email_settings.get("register_verify_enabled", False) and email_settings.get("enabled", False)
    if email_verify_enabled and not email:
        return jsonify({"error": "启用邮箱验证后,邮箱为必填项"}), 400
    if email:
        is_valid, error_msg = validate_email(email)
        if not is_valid:
            return jsonify({"error": error_msg}), 400

    system_config = database.get_system_config()
    auto_approve_enabled = system_config.get("auto_approve_enabled", 0) == 1
    auto_approve_hourly_limit = system_config.get("auto_approve_hourly_limit", 10)
    auto_approve_vip_days = system_config.get("auto_approve_vip_days", 7)

    # Fall back to the defaults when the stored values are not integers, so
    # the comparisons below cannot raise TypeError.
    hourly_limit = int(auto_approve_hourly_limit) if isinstance(auto_approve_hourly_limit, int) else 10
    vip_days = int(auto_approve_vip_days) if isinstance(auto_approve_vip_days, int) else 7

    if hourly_limit > 0:
        hourly_count = database.get_hourly_registration_count()
        if hourly_count >= hourly_limit:
            return jsonify({"error": f"注册人数过多,请稍后再试(每小时限制{hourly_limit}人)"}), 429

    user_id = database.create_user(username, password, email)
    if not user_id:
        return jsonify({"error": "用户名已存在"}), 400

    grants_vip = auto_approve_enabled and vip_days > 0
    if grants_vip:
        database.set_user_vip(user_id, vip_days)
    # Appended to every success message when VIP days were granted.
    vip_suffix = f",赠送{vip_days}天VIP" if grants_vip else ""

    if email_verify_enabled and email:
        result = email_service.send_register_verification_email(email=email, username=username, user_id=user_id)
        if result["success"]:
            message = "注册成功!验证邮件已发送(可直接登录,建议完成邮箱验证)"
        else:
            # Registration itself succeeded; a mail failure is only reported.
            logger.error(f"注册验证邮件发送失败: {result['error']}")
            message = f"注册成功,但验证邮件发送失败({result['error']})。你仍可直接登录"
        return jsonify({"success": True, "message": message + vip_suffix, "need_verify": True})

    return jsonify({"success": True, "message": "注册成功!可直接登录" + vip_suffix})
@api_auth_bp.route("/api/verify-email/<token>")
def verify_email(token):
    """Handle the verification link a user clicks in the registration e-mail.

    Args:
        token: Verification token taken from the URL path.

    Returns:
        The rendered "verify result" page: the success page after the user
        has been approved (and granted VIP days when the configured
        ``auto_approve_vip_days`` is positive), otherwise the failure page.
    """
    result = email_service.verify_email_token(token, email_service.EMAIL_TYPE_REGISTER)

    if not result:
        # Only a prefix of the token is logged.
        logger.warning(f"邮箱验证失败: token={token[:20]}...")
        error_message = "验证链接无效或已过期,请重新注册或申请重发验证邮件"
        spa_initial_state = {
            "page": "verify_result",
            "success": False,
            "title": "验证失败",
            "error_message": error_message,
            "primary_label": "重新注册",
            "primary_url": "/register",
            "secondary_label": "返回登录",
            "secondary_url": "/login",
        }
        return render_app_spa_or_legacy(
            "verify_failed.html",
            legacy_context={"error_message": error_message},
            spa_initial_state=spa_initial_state,
        )

    user_id = result["user_id"]
    email = result["email"]
    database.approve_user(user_id)

    system_config = database.get_system_config()
    auto_approve_vip_days = system_config.get("auto_approve_vip_days", 7)
    # Fall back to the default when the stored value is not an integer, so
    # the comparison below cannot raise TypeError after the user was approved.
    vip_days = int(auto_approve_vip_days) if isinstance(auto_approve_vip_days, int) else 7
    if vip_days > 0:
        database.set_user_vip(user_id, vip_days)

    logger.info(f"用户邮箱验证成功: user_id={user_id}, email={email}")
    spa_initial_state = {
        "page": "verify_result",
        "success": True,
        "title": "验证成功",
        "message": "您的邮箱已验证成功!账号已激活,现在可以登录使用了。",
        "primary_label": "立即登录",
        "primary_url": "/login",
        "redirect_url": "/login",
        "redirect_seconds": 5,
    }
    return render_app_spa_or_legacy("verify_success.html", spa_initial_state=spa_initial_state)
@api_auth_bp.route("/api/resend-verify-email", methods=["POST"])
@require_ip_not_locked
def resend_verify_email():
    """Resend the registration verification e-mail.

    Reads ``email``, ``captcha_session`` and ``captcha`` from the JSON body.
    The e-mail is validated, the per-IP "email" rate limit is applied and the
    captcha is verified (and consumed) before the user is looked up.

    Returns:
        ``{"success": True, "message": ...}`` when the mail was sent;
        otherwise ``{"error": ...}`` with status 400 (invalid input, bad
        captcha, already approved), 404 (unknown e-mail), 429 (rate limit or
        captcha lockout) or 500 (sending failed).
    """
    data = request.json or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        data = {}

    def _text(key):
        # Non-string values count as missing so .strip() cannot raise.
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    email = _text("email").strip().lower()
    captcha_session = _text("captcha_session")
    captcha_code = _text("captcha").strip()

    if not email:
        return jsonify({"error": "请输入邮箱"}), 400

    is_valid, error_msg = validate_email(email)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "email")
    if not allowed:
        return jsonify({"error": error_msg}), 429

    success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
    if not success:
        if record_failed_captcha(client_ip):
            return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
        return jsonify({"error": message}), 400

    user = database.get_user_by_email(email)
    if not user:
        return jsonify({"error": "该邮箱未注册"}), 404
    if user["status"] == "approved":
        return jsonify({"error": "该账号已验证通过,请直接登录"}), 400

    result = email_service.resend_register_verification_email(
        user_id=user["id"],
        email=email,
        username=user["username"],
    )
    if not result["success"]:
        return jsonify({"error": result["error"]}), 500
    return jsonify({"success": True, "message": "验证邮件已重新发送,请查收"})
@api_auth_bp.route("/api/email/verify-status")
def get_email_verify_status():
    """Report whether e-mail and registration e-mail verification are enabled.

    Returns:
        JSON with ``email_enabled`` and ``register_verify_enabled``. If the
        settings cannot be read, both flags are reported as ``False`` and the
        error is logged instead of being raised to the client.
    """
    try:
        settings = email_service.get_email_settings()
        return jsonify(
            {
                "email_enabled": settings.get("enabled", False),
                "register_verify_enabled": settings.get("register_verify_enabled", False)
                and settings.get("enabled", False),
            }
        )
    except Exception as exc:
        # Best-effort endpoint: keep answering with the "disabled" state, but
        # leave a trace so a broken settings lookup does not go unnoticed.
        logger.error(f"获取邮箱验证状态失败: {exc}")
        return jsonify({"email_enabled": False, "register_verify_enabled": False})
@api_auth_bp.route("/api/forgot-password", methods=["POST"])
@require_ip_not_locked
def forgot_password():
    """Send a password-reset e-mail for the given address.

    Reads ``email``, ``captcha_session`` and ``captcha`` from the JSON body.
    After validation, rate limiting and captcha verification, a reset mail is
    sent only when a user with that e-mail exists and has status
    ``"approved"``. The success response is the same whether or not a mail
    was sent.

    Returns:
        ``{"success": True, "message": ...}`` on the normal path; otherwise
        ``{"error": ...}`` with status 400 (invalid input, bad captcha, mail
        feature disabled) or 429 (rate limit or captcha lockout).
    """
    data = request.json or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        data = {}

    def _text(key):
        # Non-string values count as missing so .strip() cannot raise.
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    email = _text("email").strip().lower()
    captcha_session = _text("captcha_session")
    captcha_code = _text("captcha").strip()

    if not email:
        return jsonify({"error": "请输入邮箱"}), 400

    is_valid, error_msg = validate_email(email)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "email")
    if not allowed:
        return jsonify({"error": error_msg}), 429

    success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
    if not success:
        if record_failed_captcha(client_ip):
            return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
        return jsonify({"error": message}), 400

    email_settings = email_service.get_email_settings()
    if not email_settings.get("enabled", False):
        return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400

    user = database.get_user_by_email(email)
    if user and user.get("status") == "approved":
        result = email_service.send_password_reset_email(email=email, username=user["username"], user_id=user["id"])
        if not result["success"]:
            # Logged only; the client still receives the generic message.
            logger.error(f"密码重置邮件发送失败: {result['error']}")

    return jsonify({"success": True, "message": "如果该邮箱已注册,您将收到密码重置邮件"})
@api_auth_bp.route("/reset-password/<token>")
def reset_password_page(token):
    """Render the password-reset page for a token taken from the URL.

    Args:
        token: Password-reset token from the URL path.

    Returns:
        The rendered reset page; its context carries the token, whether the
        token verified, and an error message when it did not.
    """
    token_ok = bool(email_service.verify_password_reset_token(token))
    if token_ok:
        failure_text = ""
    else:
        failure_text = "重置链接无效或已过期,请重新申请密码重置"

    # Both renderers receive the same three values; the SPA state also names the page.
    shared = {"token": token, "valid": token_ok, "error_message": failure_text}
    return render_app_spa_or_legacy(
        "reset_password.html",
        legacy_context=dict(shared),
        spa_initial_state={"page": "reset_password", **shared},
    )
@api_auth_bp.route("/api/reset-password-confirm", methods=["POST"])
def reset_password_confirm():
    """Complete a password reset using a token and a new password.

    Reads ``token`` and ``new_password`` from the JSON body. The new password
    is validated before the token is passed to
    ``email_service.confirm_password_reset``.

    Returns:
        ``{"success": True, "message": ...}`` when the password was changed;
        otherwise ``{"error": ...}`` with status 400 (missing/invalid input or
        rejected token) or 500 (the password update reported failure).
    """
    data = request.json or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        data = {}

    def _text(key):
        # Non-string values count as missing so .strip() cannot raise.
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    token = _text("token").strip()
    new_password = _text("new_password").strip()

    if not token or not new_password:
        return jsonify({"error": "参数不完整"}), 400

    is_valid, error_msg = validate_password(new_password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    result = email_service.confirm_password_reset(token)
    if not result:
        return jsonify({"error": "重置链接无效或已过期"}), 400

    user_id = result["user_id"]
    if not database.admin_reset_user_password(user_id, new_password):
        return jsonify({"error": "密码重置失败"}), 500

    logger.info(f"用户密码重置成功: user_id={user_id}")
    return jsonify({"success": True, "message": "密码重置成功"})
@api_auth_bp.route("/api/reset_password_request", methods=["POST"])
def request_password_reset():
    """Submit a password-reset request that awaits administrator review.

    Reads ``username``, ``email`` (optional) and ``new_password`` from the
    JSON body. A reset request is stored only when the user exists and,
    if an e-mail was supplied, it equals the e-mail stored for that user.
    The response message is the same whether or not a request was stored.

    Returns:
        ``{"message": ...}`` on the normal path, or ``{"error": ...}`` with
        status 400 when required fields are missing or validation fails.
    """
    data = request.json or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        data = {}

    def _text(key):
        # Non-string values count as missing so .strip() cannot raise.
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    username = _text("username").strip()
    email = _text("email").strip().lower()
    new_password = _text("new_password").strip()

    if not username or not new_password:
        return jsonify({"error": "用户名和新密码不能为空"}), 400

    is_valid, error_msg = validate_password(new_password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    if email:
        is_valid, error_msg = validate_email(email)
        if not is_valid:
            return jsonify({"error": error_msg}), 400

    user = database.get_user_by_username(username)
    # A supplied e-mail must match the stored one; an omitted e-mail is not checked.
    if user and (not email or user.get("email") == email):
        database.create_password_reset_request(user["id"], new_password)

    return jsonify({"message": "如果账号存在,密码重置申请已提交,请等待管理员审核"})
@api_auth_bp.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
    """Generate a 4-digit numeric captcha and return it as a PNG data URI.

    The code is drawn with ``secrets`` and stored under a fresh UUID session
    id with an expiry of ``time.time() + 300``; expired captchas are cleaned
    up on every call. The image is 160x60 with random noise lines and dots.

    Returns:
        ``{"session_id": ..., "captcha_image": "data:image/png;base64,..."}``
        on success, or ``{"error": ...}`` with status 503 when an
        ``ImportError`` is raised (Pillow unavailable); in that case the
        stored captcha is deleted again.
    """
    import base64
    import uuid
    from io import BytesIO

    session_id = str(uuid.uuid4())
    code = "".join(str(secrets.randbelow(10)) for _ in range(4))
    safe_set_captcha(session_id, {"code": code, "expire_time": time.time() + 300, "failed_attempts": 0})
    safe_cleanup_expired_captcha()

    def _random_rgb(upper):
        # `random` is used only for visual noise; the code itself comes from `secrets`.
        return (random.randint(0, upper), random.randint(0, upper), random.randint(0, upper))

    try:
        from PIL import Image, ImageDraw, ImageFont

        width, height = 160, 60
        image = Image.new("RGB", (width, height), color=(255, 255, 255))
        draw = ImageDraw.Draw(image)

        # Interference lines.
        for _ in range(6):
            x1 = random.randint(0, width)
            y1 = random.randint(0, height)
            x2 = random.randint(0, width)
            y2 = random.randint(0, height)
            draw.line([(x1, y1), (x2, y2)], fill=_random_rgb(200), width=1)

        # Noise dots.
        for _ in range(80):
            x = random.randint(0, width)
            y = random.randint(0, height)
            draw.point((x, y), fill=_random_rgb(200))

        # Use the first TrueType font that loads; otherwise Pillow's default font.
        font = None
        font_paths = [
            "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
            "/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
        ]
        for font_path in font_paths:
            try:
                font = ImageFont.truetype(font_path, 42)
                break
            except Exception:
                continue
        if font is None:
            font = ImageFont.load_default()

        # Draw each digit with a small random offset and a dark random colour.
        for i, char in enumerate(code):
            x = 12 + i * 35 + random.randint(-3, 3)
            y = random.randint(5, 12)
            draw.text((x, y), char, font=font, fill=_random_rgb(150))

        buffer = BytesIO()
        image.save(buffer, format="PNG")
        img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return jsonify({"session_id": session_id, "captcha_image": f"data:image/png;base64,{img_base64}"})
    except ImportError as exc:
        logger.error(f"PIL库未安装验证码功能不可用: {exc}")
        # The stored code can never be shown to the user, so remove it.
        safe_delete_captcha(session_id)
        return jsonify({"error": "验证码服务暂不可用请联系管理员安装PIL库"}), 503
@api_auth_bp.route("/api/login", methods=["POST"])
@require_ip_not_locked
def login():
    """Authenticate a user and start a login session.

    Reads ``username``, ``password``, ``captcha_session``, ``captcha`` and
    ``need_captcha`` from the JSON body. A captcha is required when
    ``check_login_captcha_required`` says so for the client IP or when the
    client itself sets ``need_captcha``. Failed captcha checks and failed
    credential checks are both recorded as login failures for the IP.

    Returns:
        ``{"success": True}`` after the session is established; otherwise
        ``{"error": ..., "need_captcha": ...}`` with status 400 (captcha
        missing or wrong), 401 (bad credentials or account not approved) or
        429 (rate limit).
    """
    data = request.json or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; treat it as empty.
        data = {}

    def _text(key):
        # Non-string values count as missing so .strip() cannot raise.
        value = data.get(key, "")
        return value if isinstance(value, str) else ""

    username = _text("username").strip()
    password = _text("password").strip()
    captcha_session = _text("captcha_session")
    captcha_code = _text("captcha").strip()
    need_captcha = data.get("need_captcha", False)

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "login")
    if not allowed:
        return jsonify({"error": error_msg, "need_captcha": True}), 429

    captcha_required = check_login_captcha_required(client_ip) or bool(need_captcha)
    if captcha_required:
        if not captcha_session or not captcha_code:
            return jsonify({"error": "请填写验证码", "need_captcha": True}), 400
        success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
        if not success:
            record_login_failure(client_ip)
            return jsonify({"error": message, "need_captcha": True}), 400

    user = database.verify_user(username, password)
    if not user:
        record_login_failure(client_ip)
        # Re-evaluated after recording, so the flag reflects this failure too.
        return jsonify({"error": "用户名或密码错误", "need_captcha": check_login_captcha_required(client_ip)}), 401

    if user["status"] != "approved":
        return jsonify({"error": "账号未审核,请等待管理员审核", "need_captcha": False}), 401

    clear_login_failures(client_ip)
    login_user(User(user["id"]))
    load_user_accounts(user["id"])
    return jsonify({"success": True})
@api_auth_bp.route("/api/logout", methods=["POST"])
@login_required
def logout():
    """Log the current user out and acknowledge with a JSON success flag."""
    logout_user()
    acknowledgement = {"success": True}
    return jsonify(acknowledgement)