Files
zsglpt/routes/api_user.py
yuyx 4c492122dd feat: support announcement image upload
# Conflicts:
#	database.py
#	db/migrations.py
#	routes/admin_api/core.py
#	static/admin/.vite/manifest.json
#	static/admin/assets/AnnouncementsPage-Btl9JP7M.js
#	static/admin/assets/EmailPage-CwqlBGU2.js
#	static/admin/assets/FeedbacksPage-B_qDNL3q.js
#	static/admin/assets/LogsPage-DzdymdrQ.js
#	static/admin/assets/ReportPage-Bp26gOA-.js
#	static/admin/assets/SettingsPage-__r25pN8.js
#	static/admin/assets/SystemPage-C1OfxrU-.js
#	static/admin/assets/UsersPage-DhnABKcY.js
#	static/admin/assets/email-By53DCWv.js
#	static/admin/assets/email-ByiJ74rd.js
#	static/admin/assets/email-DkWacopQ.js
#	static/admin/assets/index-D5wU2pVd.js
#	static/admin/assets/tasks-1acmkoIX.js
#	static/admin/assets/update-DdQLVpC3.js
#	static/admin/assets/users-B1w166uc.js
#	static/admin/assets/users-CPJP5r-B.js
#	static/admin/assets/users-CnIyvFWm.js
#	static/admin/index.html
#	static/app/.vite/manifest.json
#	static/app/assets/AccountsPage-C48gJL8c.js
#	static/app/assets/AccountsPage-D387XNsv.js
#	static/app/assets/AccountsPage-DBJCAsJz.js
#	static/app/assets/LoginPage-BgK_Vl6X.js
#	static/app/assets/RegisterPage-CwADxWfe.js
#	static/app/assets/ResetPasswordPage-CVfZX_5z.js
#	static/app/assets/SchedulesPage-CWuZpJ5h.js
#	static/app/assets/SchedulesPage-Dw-mXbG5.js
#	static/app/assets/SchedulesPage-DwzGOBuc.js
#	static/app/assets/ScreenshotsPage-C6vX2U3V.js
#	static/app/assets/ScreenshotsPage-CreOSjVc.js
#	static/app/assets/ScreenshotsPage-DuTeRzLR.js
#	static/app/assets/VerifyResultPage-BzGlCgtE.js
#	static/app/assets/VerifyResultPage-CN_nr4V6.js
#	static/app/assets/VerifyResultPage-CNbQc83z.js
#	static/app/assets/accounts-BFaVMUve.js
#	static/app/assets/accounts-BYq3lLev.js
#	static/app/assets/accounts-Bc9j2moH.js
#	static/app/assets/auth-Dk_ApO4B.js
#	static/app/assets/index-BIng7uZJ.css
#	static/app/assets/index-CDxVo_1Z.js
#	static/app/index.html
2026-01-06 12:15:16 +08:00

307 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import database
import email_service
from app_logger import get_logger
from app_security import get_rate_limit_ip, require_ip_not_locked, validate_email, validate_password
from flask import Blueprint, jsonify, request
from flask_login import current_user, login_required
from routes.pages import render_app_spa_or_legacy
from services.state import check_email_rate_limit, check_ip_request_rate, safe_iter_task_status_items
# Module-level logger obtained from the project's logging helper under the name "app".
logger = get_logger("app")
# Flask blueprint on which every route in this module is registered.
api_user_bp = Blueprint("api_user", __name__)
@api_user_bp.route("/api/announcements/active", methods=["GET"])
@login_required
def get_active_announcement():
    """Return the announcement that should be shown to the current user.

    The JSON response always carries a single ``announcement`` key. Its value
    is ``None`` when the current user's id cannot be converted to ``int`` or
    when the database lookup returns nothing; otherwise it is an object with
    ``id``, ``title``, ``content``, ``image_url`` and ``created_at``.
    """
    try:
        uid = int(current_user.id)
    except Exception:
        return jsonify({"announcement": None})

    record = database.get_active_announcement_for_user(uid)
    if record:
        payload = {
            "id": record.get("id"),
            "title": record.get("title", ""),
            "content": record.get("content", ""),
            # A missing or falsy image URL is normalised to an empty string.
            "image_url": record.get("image_url") or "",
            "created_at": record.get("created_at"),
        }
    else:
        payload = None
    return jsonify({"announcement": payload})
@api_user_bp.route("/api/announcements/<int:announcement_id>/dismiss", methods=["POST"])
@login_required
def dismiss_announcement(announcement_id):
    """Record that the current user dismissed the given announcement.

    Responds with 401 when the current user's id cannot be converted to
    ``int``, with 404 when no announcement with ``announcement_id`` is found,
    and with ``{"success": true}`` once the dismissal has been stored.
    """
    try:
        uid = int(current_user.id)
    except Exception:
        return jsonify({"error": "请先登录"}), 401

    if not database.get_announcement_by_id(announcement_id):
        return jsonify({"error": "公告不存在"}), 404

    database.dismiss_announcement_for_user(uid, announcement_id)
    return jsonify({"success": True})
@api_user_bp.route("/api/feedback", methods=["POST"])
@login_required
def submit_feedback():
    """Create a bug-feedback entry for the current user.

    Expects a JSON object with ``title``, ``description`` and an optional
    ``contact``. Responds with 400 when title or description is empty, when
    the title exceeds 100 characters, or when the description exceeds 2000
    characters. On success returns a message and the new feedback id.
    """
    data = request.get_json()
    # A JSON ``null`` body or a non-object body must not crash on ``.get``;
    # treat it as an empty payload so the normal validation answers with 400.
    if not isinstance(data, dict):
        data = {}

    # ``or ""`` guards against explicit nulls; ``str`` guards against
    # non-string JSON values that have no ``.strip``.
    title = str(data.get("title") or "").strip()
    description = str(data.get("description") or "").strip()
    contact = str(data.get("contact") or "").strip()

    if not title or not description:
        return jsonify({"error": "标题和描述不能为空"}), 400
    if len(title) > 100:
        return jsonify({"error": "标题不能超过100个字符"}), 400
    if len(description) > 2000:
        return jsonify({"error": "描述不能超过2000个字符"}), 400

    user_info = database.get_user_by_id(current_user.id)
    # Fall back to a synthetic name when the user row cannot be loaded.
    username = user_info["username"] if user_info else f"用户{current_user.id}"

    feedback_id = database.create_bug_feedback(
        user_id=current_user.id,
        username=username,
        title=title,
        description=description,
        contact=contact,
    )
    return jsonify({"message": "反馈提交成功", "id": feedback_id})
@api_user_bp.route("/api/feedback", methods=["GET"])
@login_required
def get_my_feedbacks():
    """Return, as JSON, whatever the database reports as the current user's feedbacks."""
    return jsonify(database.get_user_feedbacks(current_user.id))
@api_user_bp.route("/api/user/vip", methods=["GET"])
@login_required
def get_current_user_vip():
    """Return the current user's VIP info with a ``username`` field added.

    The username is taken from the user's database row, or ``"Unknown"`` when
    that row cannot be loaded.
    """
    vip = database.get_user_vip_info(current_user.id)
    account = database.get_user_by_id(current_user.id)
    if account:
        display_name = account["username"]
    else:
        display_name = "Unknown"
    vip["username"] = display_name
    return jsonify(vip)
@api_user_bp.route("/api/user/password", methods=["POST"])
@login_required
def change_user_password():
    """Change the current user's own password.

    Expects a JSON object with ``current_password`` and ``new_password``.
    Responds with 400 when either field is missing, when the new password
    fails ``validate_password``, or when the current password does not verify;
    404 when the user row cannot be loaded; 500 when the update fails.
    """
    data = request.get_json()
    # A JSON ``null`` body or a non-object body must not crash on ``.get``;
    # treat it as empty so the request is rejected with the usual 400.
    if not isinstance(data, dict):
        data = {}

    current_password = data.get("current_password")
    new_password = data.get("new_password")
    if not current_password or not new_password:
        return jsonify({"error": "请填写完整信息"}), 400

    is_valid, error_msg = validate_password(new_password)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    user = database.get_user_by_id(current_user.id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404

    # The current password is verified by username, so a row without a
    # username is treated the same as a wrong password.
    username = user.get("username", "")
    if not username or not database.verify_user(username, current_password):
        return jsonify({"error": "当前密码错误"}), 400

    # The new password is stored through the database layer's
    # admin_reset_user_password helper.
    if database.admin_reset_user_password(current_user.id, new_password):
        return jsonify({"success": True})
    return jsonify({"error": "密码更新失败"}), 500
@api_user_bp.route("/api/user/email", methods=["GET"])
@login_required
def get_user_email():
    """Return the current user's ``email`` and ``email_verified`` values.

    Missing values default to ``""`` and ``False`` respectively. Responds with
    404 when the user row cannot be loaded.
    """
    account = database.get_user_by_id(current_user.id)
    if account:
        return jsonify({"email": account.get("email", ""), "email_verified": account.get("email_verified", False)})
    return jsonify({"error": "用户不存在"}), 404
@api_user_bp.route("/api/user/bind-email", methods=["POST"])
@login_required
@require_ip_not_locked
def bind_user_email():
    """Send a verification mail for binding an email address to the current user.

    Expects a JSON object with ``email``. The address is trimmed and
    lower-cased, validated with ``validate_email``, and then passed through
    the IP and email rate-limit checks (429 on rejection). Responds with 400
    when the address is empty or invalid, the mail feature is disabled, the
    address belongs to another user, or it is already bound and verified for
    this user; 404 when the user row cannot be loaded; 500 when sending fails.
    """
    data = request.get_json()
    # A non-object body must not crash on ``.get``; treat it as empty.
    if not isinstance(data, dict):
        data = {}

    # ``or ""`` handles an explicit null; ``str`` handles non-string values,
    # which then fail the validation below instead of raising.
    email = str(data.get("email") or "").strip().lower()
    if not email:
        return jsonify({"error": "请输入有效的邮箱地址"}), 400

    is_valid, error_msg = validate_email(email)
    if not is_valid:
        return jsonify({"error": error_msg}), 400

    client_ip = get_rate_limit_ip()
    allowed, error_msg = check_ip_request_rate(client_ip, "email")
    if not allowed:
        return jsonify({"error": error_msg}), 429

    allowed, error_msg = check_email_rate_limit(email, "bind_email")
    if not allowed:
        return jsonify({"error": error_msg}), 429

    settings = email_service.get_email_settings()
    if not settings.get("enabled", False):
        return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400

    # Refuse an address that is already attached to a different account.
    existing_user = database.get_user_by_email(email)
    if existing_user and existing_user["id"] != current_user.id:
        return jsonify({"error": "该邮箱已被其他用户绑定"}), 400

    user = database.get_user_by_id(current_user.id)
    if not user:
        return jsonify({"error": "用户不存在"}), 404

    # Nothing to do when this exact address is already bound and verified.
    if user.get("email") == email and user.get("email_verified"):
        return jsonify({"error": "该邮箱已绑定并验证"}), 400

    result = email_service.send_bind_email_verification(user_id=current_user.id, email=email, username=user["username"])
    if result["success"]:
        return jsonify({"success": True, "message": "验证邮件已发送,请查收"})
    return jsonify({"error": result["error"]}), 500
@api_user_bp.route("/api/verify-bind-email/<token>")
def verify_bind_email(token):
    """Handle the link from a bind-email verification mail and render a result page.

    The token is checked through ``email_service.verify_bind_email_token``.
    A falsy result renders the "invalid link" failure page. Otherwise the
    email is stored as verified for the user named in the token, and either
    the success page or the "binding failed" page is rendered depending on
    whether that update succeeds. This route has no ``login_required``
    decorator.
    """

    def render_failure(title, error_message):
        # Both failure outcomes share the same template and state layout.
        state = {
            "page": "verify_result",
            "success": False,
            "title": title,
            "error_message": error_message,
            "primary_label": "返回登录",
            "primary_url": "/login",
        }
        return render_app_spa_or_legacy(
            "verify_failed.html",
            legacy_context={"error_message": error_message},
            spa_initial_state=state,
        )

    token_data = email_service.verify_bind_email_token(token)
    if not token_data:
        return render_failure("链接无效", "验证链接已过期或无效,请重新发送验证邮件")

    user_id = token_data["user_id"]
    email = token_data["email"]
    if not database.update_user_email(user_id, email, verified=True):
        return render_failure("绑定失败", "邮箱绑定失败,请重试")

    success_state = {
        "page": "verify_result",
        "success": True,
        "title": "邮箱绑定成功",
        "message": f"邮箱 {email} 已成功绑定到您的账号!",
        "primary_label": "返回登录",
        "primary_url": "/login",
        "redirect_url": "/login",
        "redirect_seconds": 5,
    }
    return render_app_spa_or_legacy("verify_success.html", spa_initial_state=success_state)
@api_user_bp.route("/api/user/unbind-email", methods=["POST"])
@login_required
def unbind_user_email():
    """Remove the email bound to the current user.

    Responds with 404 when the user row cannot be loaded, 400 when no email is
    currently bound, and 500 when the database update reports failure.
    """
    account = database.get_user_by_id(current_user.id)
    if not account:
        return jsonify({"error": "用户不存在"}), 404
    if not account.get("email"):
        return jsonify({"error": "当前未绑定邮箱"}), 400

    # Clearing the address also resets the verified flag.
    unbound = database.update_user_email(current_user.id, None, verified=False)
    if not unbound:
        return jsonify({"error": "解绑失败"}), 500
    return jsonify({"success": True, "message": "邮箱已解绑"})
@api_user_bp.route("/api/user/email-notify", methods=["GET"])
@login_required
def get_user_email_notify():
    """Return the current user's email-notification preference as ``{"enabled": ...}``."""
    return jsonify({"enabled": database.get_user_email_notify(current_user.id)})
@api_user_bp.route("/api/user/email-notify", methods=["POST"])
@login_required
def update_user_email_notify():
    """Update the current user's email-notification preference.

    Reads ``enabled`` from the JSON body, defaulting to ``True`` when the key
    is absent. Responds with 500 when the database update reports failure.
    """
    data = request.get_json()
    # A JSON ``null`` body or a non-object body must not crash on ``.get``;
    # it is handled like an empty object, so the default applies.
    if not isinstance(data, dict):
        data = {}

    enabled = data.get("enabled", True)
    if database.update_user_email_notify(current_user.id, enabled):
        return jsonify({"success": True})
    return jsonify({"error": "更新失败"}), 500
@api_user_bp.route("/api/run_stats", methods=["GET"])
@login_required
def get_run_stats():
    """Return run statistics for the current user.

    Combines the counters returned by ``database.get_user_run_stats`` with a
    count of task-status entries that belong to this user and whose status is
    the "running" marker string. Missing counters default to 0.
    """
    uid = current_user.id
    stats = database.get_user_run_stats(uid)

    # Count the entries owned by this user that are marked as running.
    running_now = sum(
        1
        for _, info in safe_iter_task_status_items()
        if info.get("user_id") == uid and info.get("status") == "运行中"
    )

    summary = {
        "today_completed": stats.get("completed", 0),
        "current_running": running_now,
        "today_failed": stats.get("failed", 0),
        "today_items": stats.get("total_items", 0),
        "today_attachments": stats.get("total_attachments", 0),
    }
    return jsonify(summary)