Files
zsglpt/routes/api_auth.py

421 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import random
import secrets
import time
import database
import email_service
from app_logger import get_logger
from app_security import get_client_ip, require_ip_not_locked, validate_password
from flask import Blueprint, jsonify, redirect, render_template, request, url_for
from flask_login import login_required, login_user, logout_user
from routes.pages import render_app_spa_or_legacy
from services.accounts_service import load_user_accounts
from services.models import User
from services.state import (
check_ip_rate_limit,
record_failed_captcha,
safe_cleanup_expired_captcha,
safe_delete_captcha,
safe_set_captcha,
safe_verify_and_consume_captcha,
)
# Module-level logger obtained from the project's logging helper under the name "app".
logger = get_logger("app")
# Blueprint under which every authentication route in this module is registered.
api_auth_bp = Blueprint("api_auth", __name__)
@api_auth_bp.route("/api/register", methods=["POST"])
@require_ip_not_locked
def register():
    """Register a new user account.

    Expects a JSON object with ``username``, ``password``, an optional
    ``email`` and the captcha pair ``captcha_session`` / ``captcha``.

    Steps, in order: required-field check, per-IP rate limit, captcha
    verification, email requirements, hourly registration cap, user creation,
    and finally one of: send a verification email, auto-approve the account,
    or leave it pending administrator review.

    Returns:
        A JSON response. Failures carry an ``error`` key with status 400
        (bad input, wrong captcha, user could not be created) or 429 (rate
        limit, IP lock, hourly cap). Success carries ``success`` and
        ``message`` and, for the email flow, ``need_verify``.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # payload so the caller gets a clean 400 instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce every field to ``str`` so null / numeric JSON values cannot break ``.strip()``.
    username = str(data.get("username") or "").strip()
    password = str(data.get("password") or "").strip()
    email = str(data.get("email") or "").strip()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()

    # NOTE(review): unlike the password-reset handlers, no validate_password()
    # check is applied here — confirm whether that is intentional.
    if not username or not password:
        return jsonify({"error": "用户名和密码不能为空"}), 400

    client_ip = get_client_ip()
    allowed, error_msg = check_ip_rate_limit(client_ip)
    if not allowed:
        return jsonify({"error": error_msg}), 429

    success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
    if not success:
        # Failed captcha attempts are recorded per IP; a truthy result means the IP is now locked.
        is_locked = record_failed_captcha(client_ip)
        if is_locked:
            return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
        return jsonify({"error": message}), 400

    email_settings = email_service.get_email_settings()
    # Verification is only active when both the feature flag and the mail service are enabled.
    email_verify_enabled = email_settings.get("register_verify_enabled", False) and email_settings.get("enabled", False)
    if email_verify_enabled and not email:
        return jsonify({"error": "启用邮箱验证后,邮箱为必填项"}), 400
    if email and "@" not in email:
        return jsonify({"error": "邮箱格式不正确"}), 400

    system_config = database.get_system_config()
    auto_approve_enabled = system_config.get("auto_approve_enabled", 0) == 1
    auto_approve_hourly_limit = system_config.get("auto_approve_hourly_limit", 10)
    auto_approve_vip_days = system_config.get("auto_approve_vip_days", 7)

    # The hourly cap only applies to flows that can activate an account without an admin.
    if auto_approve_enabled or email_verify_enabled:
        hourly_count = database.get_hourly_registration_count()
        if hourly_count >= auto_approve_hourly_limit:
            return jsonify({"error": f"注册人数过多,请稍后再试(每小时限制{auto_approve_hourly_limit}人)"}), 429

    user_id = database.create_user(username, password, email)
    if not user_id:
        return jsonify({"error": "用户名已存在"}), 400

    if email_verify_enabled and email:
        result = email_service.send_register_verification_email(email=email, username=username, user_id=user_id)
        if result["success"]:
            return jsonify(
                {
                    "success": True,
                    "message": "注册成功!验证邮件已发送,请查收邮箱并点击链接完成验证",
                    "need_verify": True,
                }
            )
        # The account exists even though the mail failed; the user can request a resend later.
        logger.error(f"注册验证邮件发送失败: {result['error']}")
        return jsonify(
            {
                "success": True,
                "message": f"注册成功,但验证邮件发送失败({result['error']})。请稍后在登录页面重新发送验证邮件",
                "need_verify": True,
            }
        )

    if auto_approve_enabled:
        database.approve_user(user_id)
        if auto_approve_vip_days > 0:
            database.set_user_vip(user_id, auto_approve_vip_days)
            return jsonify({"success": True, "message": f"注册成功!已自动审核通过,赠送{auto_approve_vip_days}天VIP"})
        return jsonify({"success": True, "message": "注册成功!已自动审核通过"})

    return jsonify({"success": True, "message": "注册成功,请等待管理员审核"})
@api_auth_bp.route("/api/verify-email/<token>")
def verify_email(token):
    """Handle the verification link a user opens from the registration email.

    A valid token approves the account, grants the configured VIP days when
    that value is positive, and renders the success page. Any other token
    renders the failure page.
    """
    verification = email_service.verify_email_token(token, email_service.EMAIL_TYPE_REGISTER)

    if not verification:
        # Only a prefix of the token is written to the log.
        logger.warning(f"邮箱验证失败: token={token[:20]}...")
        error_message = "验证链接无效或已过期,请重新注册或申请重发验证邮件"
        failure_state = {
            "page": "verify_result",
            "success": False,
            "title": "验证失败",
            "error_message": error_message,
            "primary_label": "重新注册",
            "primary_url": "/register",
            "secondary_label": "返回登录",
            "secondary_url": "/login",
        }
        return render_app_spa_or_legacy(
            "verify_failed.html",
            legacy_context={"error_message": error_message},
            spa_initial_state=failure_state,
        )

    user_id = verification["user_id"]
    email = verification["email"]
    database.approve_user(user_id)

    vip_days = database.get_system_config().get("auto_approve_vip_days", 7)
    if vip_days > 0:
        database.set_user_vip(user_id, vip_days)

    logger.info(f"用户邮箱验证成功: user_id={user_id}, email={email}")
    success_state = {
        "page": "verify_result",
        "success": True,
        "title": "验证成功",
        "message": "您的邮箱已验证成功!账号已激活,现在可以登录使用了。",
        "primary_label": "立即登录",
        "primary_url": "/login",
        "redirect_url": "/login",
        "redirect_seconds": 5,
    }
    return render_app_spa_or_legacy("verify_success.html", spa_initial_state=success_state)
@api_auth_bp.route("/api/resend-verify-email", methods=["POST"])
@require_ip_not_locked
def resend_verify_email():
    """Resend the registration verification email.

    Expects a JSON object with ``email``, ``captcha_session`` and ``captcha``.

    Returns:
        A JSON response: 400 for a missing email, wrong captcha or an
        already-approved account; 429 for rate limit / IP lock; 404 when no
        user has that email; 500 when the mail service reports a failure;
        otherwise ``success`` with a confirmation message.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # payload so the caller gets a clean 400 instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce fields to ``str`` so null / numeric JSON values cannot break ``.strip()``.
    email = str(data.get("email") or "").strip()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()

    if not email:
        return jsonify({"error": "请输入邮箱"}), 400

    client_ip = get_client_ip()
    allowed, error_msg = check_ip_rate_limit(client_ip)
    if not allowed:
        return jsonify({"error": error_msg}), 429

    success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
    if not success:
        # Failed captcha attempts are recorded per IP; a truthy result means the IP is now locked.
        is_locked = record_failed_captcha(client_ip)
        if is_locked:
            return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
        return jsonify({"error": message}), 400

    user = database.get_user_by_email(email)
    if not user:
        return jsonify({"error": "该邮箱未注册"}), 404
    if user["status"] == "approved":
        return jsonify({"error": "该账号已验证通过,请直接登录"}), 400

    result = email_service.resend_register_verification_email(user_id=user["id"], email=email, username=user["username"])
    if result["success"]:
        return jsonify({"success": True, "message": "验证邮件已重新发送,请查收"})
    return jsonify({"error": result["error"]}), 500
@api_auth_bp.route("/api/email/verify-status")
def get_email_verify_status():
    """Report whether email and registration email verification are enabled.

    Returns:
        A JSON object with ``email_enabled`` and ``register_verify_enabled``.
        If reading the settings fails, both are reported as ``False`` and the
        error is logged.
    """
    try:
        settings = email_service.get_email_settings()
        return jsonify(
            {
                "email_enabled": settings.get("enabled", False),
                # Registration verification requires the mail service itself to be enabled.
                "register_verify_enabled": settings.get("register_verify_enabled", False) and settings.get("enabled", False),
            }
        )
    except Exception as e:
        # Best-effort endpoint: still answer with the disabled state, but keep
        # a trace of the failure instead of swallowing it silently.
        logger.error(f"获取邮箱验证状态失败: {e}")
        return jsonify({"email_enabled": False, "register_verify_enabled": False})
@api_auth_bp.route("/api/forgot-password", methods=["POST"])
@require_ip_not_locked
def forgot_password():
    """Send a password-reset email for the given address.

    Expects a JSON object with ``email``, ``captcha_session`` and ``captcha``.

    Returns:
        A JSON response: 400 for a missing email, wrong captcha or disabled
        mail service; 429 for rate limit / IP lock. Otherwise the same
        success message is returned whether or not a matching approved user
        exists, so the response does not reveal which addresses are registered.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # payload so the caller gets a clean 400 instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce fields to ``str`` so null / numeric JSON values cannot break ``.strip()``.
    email = str(data.get("email") or "").strip()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()

    if not email:
        return jsonify({"error": "请输入邮箱"}), 400

    client_ip = get_client_ip()
    allowed, error_msg = check_ip_rate_limit(client_ip)
    if not allowed:
        return jsonify({"error": error_msg}), 429

    success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
    if not success:
        # Failed captcha attempts are recorded per IP; a truthy result means the IP is now locked.
        is_locked = record_failed_captcha(client_ip)
        if is_locked:
            return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
        return jsonify({"error": message}), 400

    email_settings = email_service.get_email_settings()
    if not email_settings.get("enabled", False):
        return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400

    user = database.get_user_by_email(email)
    # Only approved accounts receive a reset mail; send failures are logged, not surfaced.
    if user and user.get("status") == "approved":
        result = email_service.send_password_reset_email(email=email, username=user["username"], user_id=user["id"])
        if not result["success"]:
            logger.error(f"密码重置邮件发送失败: {result['error']}")

    return jsonify({"success": True, "message": "如果该邮箱已注册,您将收到密码重置邮件"})
@api_auth_bp.route("/reset-password/<token>")
def reset_password_page(token):
    """Render the password-reset page for the token taken from the URL.

    The token is checked with the email service; the page receives the token,
    a ``valid`` flag and an error message that is empty when the token is valid.
    """
    token_info = email_service.verify_password_reset_token(token)
    valid = bool(token_info)

    if valid:
        error_message = ""
    else:
        error_message = "重置链接无效或已过期,请重新申请密码重置"

    # The legacy template and the SPA get the same data; the SPA state also names the page.
    shared = {"token": token, "valid": valid, "error_message": error_message}
    return render_app_spa_or_legacy(
        "reset_password.html",
        legacy_context=dict(shared),
        spa_initial_state={"page": "reset_password", **shared},
    )
@api_auth_bp.route("/api/reset-password-confirm", methods=["POST"])
def reset_password_confirm():
    """Complete a password reset using a token from the reset email.

    Expects a JSON object with ``token`` and ``new_password``.

    Returns:
        A JSON response: 400 for missing parameters, a password rejected by
        ``validate_password`` or an invalid token; 500 when the database
        update fails; otherwise ``success`` with a confirmation message.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # payload so the caller gets a clean 400 instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce fields to ``str`` so null / numeric JSON values cannot break ``.strip()``.
    token = str(data.get("token") or "").strip()
    new_password = str(data.get("new_password") or "").strip()

    if not token or not new_password:
        return jsonify({"error": "参数不完整"}), 400

    # Validate the password before touching the token.
    is_valid, error_msg = validate_password(new_password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    result = email_service.confirm_password_reset(token)
    if not result:
        return jsonify({"error": "重置链接无效或已过期"}), 400

    user_id = result["user_id"]
    if database.admin_reset_user_password(user_id, new_password):
        logger.info(f"用户密码重置成功: user_id={user_id}")
        return jsonify({"success": True, "message": "密码重置成功"})
    return jsonify({"error": "密码重置失败"}), 500
@api_auth_bp.route("/api/reset_password_request", methods=["POST"])
def request_password_reset():
    """Submit a password-reset request that an administrator must approve.

    Expects a JSON object with ``username``, ``new_password`` and an optional
    ``email``. When an email is supplied it must match the one stored for the
    user, otherwise no request is created.

    Returns:
        A JSON response: 400 for missing fields or a password rejected by
        ``validate_password``. Otherwise the same message is returned whether
        or not a request was actually created, so the response does not
        reveal which usernames exist.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # payload so the caller gets a clean 400 instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce fields to ``str`` so null / numeric JSON values cannot break ``.strip()``.
    username = str(data.get("username") or "").strip()
    email = str(data.get("email") or "").strip()
    new_password = str(data.get("new_password") or "").strip()

    if not username or not new_password:
        return jsonify({"error": "用户名和新密码不能为空"}), 400

    is_valid, error_msg = validate_password(new_password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    # NOTE(review): this endpoint has no captcha or rate limiting, unlike the
    # email-based flows in this module — confirm whether that is intentional.
    user = database.get_user_by_username(username)
    # Create the request only when no email was given or the given one matches.
    if user and (not email or user.get("email") == email):
        database.create_password_reset_request(user["id"], new_password)

    return jsonify({"message": "如果账号存在,密码重置申请已提交,请等待管理员审核"})
@api_auth_bp.route("/api/generate_captcha", methods=["POST"])
def generate_captcha():
    """Generate a 4-digit numeric captcha and return it as a PNG data URI.

    The code is stored under a fresh UUID session id with an expiry 300
    seconds in the future and a zeroed failure counter, then drawn onto a
    160x60 image with random lines, dots and per-character jitter.

    Returns:
        A JSON object with ``session_id`` and ``captcha_image`` (a
        ``data:image/png;base64,...`` URI). If Pillow cannot be imported the
        stored captcha is deleted and a 503 error response is returned.
    """
    import base64
    import io
    import uuid

    session_id = str(uuid.uuid4())
    # The code itself comes from ``secrets``; ``random`` below only drives visual noise.
    code = "".join(str(secrets.randbelow(10)) for _ in range(4))
    safe_set_captcha(session_id, {"code": code, "expire_time": time.time() + 300, "failed_attempts": 0})
    safe_cleanup_expired_captcha()

    try:
        from PIL import Image, ImageDraw, ImageFont

        width, height = 160, 60
        noise_lines = 6
        noise_points = 80
        font_size = 42

        image = Image.new("RGB", (width, height), color=(255, 255, 255))
        draw = ImageDraw.Draw(image)

        # Interference lines with random endpoints and colours.
        for _ in range(noise_lines):
            x1 = random.randint(0, width)
            y1 = random.randint(0, height)
            x2 = random.randint(0, width)
            y2 = random.randint(0, height)
            draw.line(
                [(x1, y1), (x2, y2)],
                fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)),
                width=1,
            )

        # Interference dots.
        for _ in range(noise_points):
            x = random.randint(0, width)
            y = random.randint(0, height)
            draw.point((x, y), fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)))

        # Use the first loadable bold TrueType font; otherwise Pillow's built-in default.
        font = None
        font_paths = [
            "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
            "/usr/share/fonts/truetype/freefont/FreeSansBold.ttf",
        ]
        for font_path in font_paths:
            try:
                font = ImageFont.truetype(font_path, font_size)
                break
            except Exception:
                continue
        if font is None:
            font = ImageFont.load_default()

        # Draw each digit with a small random offset and a random dark colour.
        for i, char in enumerate(code):
            x = 12 + i * 35 + random.randint(-3, 3)
            y = random.randint(5, 12)
            color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
            draw.text((x, y), char, font=font, fill=color)

        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        img_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return jsonify({"session_id": session_id, "captcha_image": f"data:image/png;base64,{img_base64}"})
    except ImportError as e:
        logger.error(f"PIL库未安装验证码功能不可用: {e}")
        # Without an image the stored code can never be shown, so drop it.
        safe_delete_captcha(session_id)
        return jsonify({"error": "验证码服务暂不可用请联系管理员安装PIL库"}), 503
@api_auth_bp.route("/api/login", methods=["POST"])
@require_ip_not_locked
def login():
    """Authenticate a user and start a login session.

    Expects a JSON object with ``username`` and ``password``; when
    ``need_captcha`` is truthy, ``captcha_session`` and ``captcha`` are
    verified first.

    Returns:
        A JSON response: 400 for a wrong captcha, 429 when captcha failures
        have locked the IP, 401 for bad credentials (with
        ``need_captcha: True``) or an unapproved account, otherwise
        ``{"success": True}``.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # payload so the caller gets a clean 401 instead of an unhandled exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce fields to ``str`` so null / numeric JSON values cannot break ``.strip()``.
    username = str(data.get("username") or "").strip()
    password = str(data.get("password") or "").strip()
    captcha_session = str(data.get("captcha_session") or "")
    captcha_code = str(data.get("captcha") or "").strip()

    # NOTE(review): ``need_captcha`` is supplied by the client, so a caller can
    # skip the captcha simply by omitting it. Enforcing it properly needs
    # server-side failed-login state that is not visible in this module.
    need_captcha = data.get("need_captcha", False)
    if need_captcha:
        success, message = safe_verify_and_consume_captcha(captcha_session, captcha_code)
        if not success:
            # Record the failure like the other captcha-protected endpoints do,
            # so repeated wrong captchas here also count toward the IP lock.
            if record_failed_captcha(get_client_ip()):
                return jsonify({"error": "验证码错误次数过多,IP已被锁定1小时"}), 429
            return jsonify({"error": message}), 400

    user = database.verify_user(username, password)
    if not user:
        return jsonify({"error": "用户名或密码错误", "need_captcha": True}), 401
    if user["status"] != "approved":
        return jsonify({"error": "账号未审核,请等待管理员审核", "need_captcha": False}), 401

    user_obj = User(user["id"])
    login_user(user_obj)
    load_user_accounts(user["id"])
    return jsonify({"success": True})
@api_auth_bp.route("/api/logout", methods=["POST"])
@login_required
def logout():
    """End the current user's login session and acknowledge with a success flag."""
    logout_user()
    payload = {"success": True}
    return jsonify(payload)