#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import database import email_service from app_logger import get_logger from app_security import get_rate_limit_ip, require_ip_not_locked, validate_email, validate_password from flask import Blueprint, jsonify, request from flask_login import current_user, login_required from routes.pages import render_app_spa_or_legacy from services.state import check_email_rate_limit, check_ip_request_rate, safe_iter_task_status_items logger = get_logger("app") api_user_bp = Blueprint("api_user", __name__) @api_user_bp.route("/api/announcements/active", methods=["GET"]) @login_required def get_active_announcement(): """获取当前用户应展示的公告(若无则返回announcement=null)""" try: user_id = int(current_user.id) except Exception: return jsonify({"announcement": None}) announcement = database.get_active_announcement_for_user(user_id) if not announcement: return jsonify({"announcement": None}) return jsonify( { "announcement": { "id": announcement.get("id"), "title": announcement.get("title", ""), "content": announcement.get("content", ""), "image_url": announcement.get("image_url") or "", "created_at": announcement.get("created_at"), } } ) @api_user_bp.route("/api/announcements//dismiss", methods=["POST"]) @login_required def dismiss_announcement(announcement_id): """用户永久关闭某条公告(本次公告不再弹窗)""" try: user_id = int(current_user.id) except Exception: return jsonify({"error": "请先登录"}), 401 announcement = database.get_announcement_by_id(announcement_id) if not announcement: return jsonify({"error": "公告不存在"}), 404 database.dismiss_announcement_for_user(user_id, announcement_id) return jsonify({"success": True}) @api_user_bp.route("/api/feedback", methods=["POST"]) @login_required def submit_feedback(): """用户提交Bug反馈""" data = request.get_json() title = data.get("title", "").strip() description = data.get("description", "").strip() contact = data.get("contact", "").strip() if not title or not description: return jsonify({"error": "标题和描述不能为空"}), 400 if len(title) > 100: return jsonify({"error": "标题不能超过100个字符"}), 400 if len(description) > 2000: return jsonify({"error": "描述不能超过2000个字符"}), 400 user_info = database.get_user_by_id(current_user.id) username = user_info["username"] if user_info else f"用户{current_user.id}" feedback_id = database.create_bug_feedback( user_id=current_user.id, username=username, title=title, description=description, contact=contact, ) return jsonify({"message": "反馈提交成功", "id": feedback_id}) @api_user_bp.route("/api/feedback", methods=["GET"]) @login_required def get_my_feedbacks(): """获取当前用户的反馈列表""" feedbacks = database.get_user_feedbacks(current_user.id) return jsonify(feedbacks) @api_user_bp.route("/api/user/vip", methods=["GET"]) @login_required def get_current_user_vip(): """获取当前用户VIP信息""" vip_info = database.get_user_vip_info(current_user.id) user_info = database.get_user_by_id(current_user.id) vip_info["username"] = user_info["username"] if user_info else "Unknown" return jsonify(vip_info) @api_user_bp.route("/api/user/password", methods=["POST"]) @login_required def change_user_password(): """用户修改自己的密码""" data = request.get_json() current_password = data.get("current_password") new_password = data.get("new_password") if not current_password or not new_password: return jsonify({"error": "请填写完整信息"}), 400 is_valid, error_msg = validate_password(new_password) if not is_valid: return jsonify({"error": error_msg}), 400 user = database.get_user_by_id(current_user.id) if not user: return jsonify({"error": "用户不存在"}), 404 username = user.get("username", "") if not username or not database.verify_user(username, current_password): return jsonify({"error": "当前密码错误"}), 400 if database.admin_reset_user_password(current_user.id, new_password): return jsonify({"success": True}) return jsonify({"error": "密码更新失败"}), 500 @api_user_bp.route("/api/user/email", methods=["GET"]) @login_required def get_user_email(): """获取当前用户的邮箱信息""" user = database.get_user_by_id(current_user.id) if not user: return jsonify({"error": "用户不存在"}), 404 return jsonify({"email": user.get("email", ""), "email_verified": user.get("email_verified", False)}) @api_user_bp.route("/api/user/kdocs", methods=["GET"]) @login_required def get_user_kdocs_settings(): """获取当前用户的金山文档设置""" settings = database.get_user_kdocs_settings(current_user.id) if not settings: return jsonify({"kdocs_unit": "", "kdocs_auto_upload": 0}) return jsonify(settings) @api_user_bp.route("/api/user/kdocs", methods=["POST"]) @login_required def update_user_kdocs_settings(): """更新当前用户的金山文档设置""" data = request.get_json() or {} kdocs_unit = data.get("kdocs_unit") kdocs_auto_upload = data.get("kdocs_auto_upload") if kdocs_unit is not None: kdocs_unit = str(kdocs_unit or "").strip() if len(kdocs_unit) > 50: return jsonify({"error": "县区长度不能超过50"}), 400 if kdocs_auto_upload is not None: if isinstance(kdocs_auto_upload, bool): kdocs_auto_upload = 1 if kdocs_auto_upload else 0 try: kdocs_auto_upload = int(kdocs_auto_upload) except Exception: return jsonify({"error": "自动上传开关必须是0或1"}), 400 if kdocs_auto_upload not in (0, 1): return jsonify({"error": "自动上传开关必须是0或1"}), 400 if not database.update_user_kdocs_settings( current_user.id, kdocs_unit=kdocs_unit, kdocs_auto_upload=kdocs_auto_upload, ): return jsonify({"error": "更新失败"}), 400 settings = database.get_user_kdocs_settings(current_user.id) or {"kdocs_unit": "", "kdocs_auto_upload": 0} return jsonify({"success": True, "settings": settings}) @api_user_bp.route("/api/user/bind-email", methods=["POST"]) @login_required @require_ip_not_locked def bind_user_email(): """发送邮箱绑定验证邮件""" data = request.get_json() or {} email = data.get("email", "").strip().lower() if not email: return jsonify({"error": "请输入有效的邮箱地址"}), 400 is_valid, error_msg = validate_email(email) if not is_valid: return jsonify({"error": error_msg}), 400 client_ip = get_rate_limit_ip() allowed, error_msg = check_ip_request_rate(client_ip, "email") if not allowed: return jsonify({"error": error_msg}), 429 allowed, error_msg = check_email_rate_limit(email, "bind_email") if not allowed: return jsonify({"error": error_msg}), 429 settings = email_service.get_email_settings() if not settings.get("enabled", False): return jsonify({"error": "邮件功能未启用,请联系管理员"}), 400 existing_user = database.get_user_by_email(email) if existing_user and existing_user["id"] != current_user.id: return jsonify({"error": "该邮箱已被其他用户绑定"}), 400 user = database.get_user_by_id(current_user.id) if not user: return jsonify({"error": "用户不存在"}), 404 if user.get("email") == email and user.get("email_verified"): return jsonify({"error": "该邮箱已绑定并验证"}), 400 result = email_service.send_bind_email_verification(user_id=current_user.id, email=email, username=user["username"]) if result["success"]: return jsonify({"success": True, "message": "验证邮件已发送,请查收"}) return jsonify({"error": result["error"]}), 500 @api_user_bp.route("/api/verify-bind-email/") def verify_bind_email(token): """验证邮箱绑定Token""" result = email_service.verify_bind_email_token(token) if result: user_id = result["user_id"] email = result["email"] if database.update_user_email(user_id, email, verified=True): spa_initial_state = { "page": "verify_result", "success": True, "title": "邮箱绑定成功", "message": f"邮箱 {email} 已成功绑定到您的账号!", "primary_label": "返回登录", "primary_url": "/login", "redirect_url": "/login", "redirect_seconds": 5, } return render_app_spa_or_legacy("verify_success.html", spa_initial_state=spa_initial_state) error_message = "邮箱绑定失败,请重试" spa_initial_state = { "page": "verify_result", "success": False, "title": "绑定失败", "error_message": error_message, "primary_label": "返回登录", "primary_url": "/login", } return render_app_spa_or_legacy( "verify_failed.html", legacy_context={"error_message": error_message}, spa_initial_state=spa_initial_state, ) error_message = "验证链接已过期或无效,请重新发送验证邮件" spa_initial_state = { "page": "verify_result", "success": False, "title": "链接无效", "error_message": error_message, "primary_label": "返回登录", "primary_url": "/login", } return render_app_spa_or_legacy( "verify_failed.html", legacy_context={"error_message": error_message}, spa_initial_state=spa_initial_state, ) @api_user_bp.route("/api/user/unbind-email", methods=["POST"]) @login_required def unbind_user_email(): """解绑用户邮箱""" user = database.get_user_by_id(current_user.id) if not user: return jsonify({"error": "用户不存在"}), 404 if not user.get("email"): return jsonify({"error": "当前未绑定邮箱"}), 400 if database.update_user_email(current_user.id, None, verified=False): return jsonify({"success": True, "message": "邮箱已解绑"}) return jsonify({"error": "解绑失败"}), 500 @api_user_bp.route("/api/user/email-notify", methods=["GET"]) @login_required def get_user_email_notify(): """获取用户邮件通知偏好""" enabled = database.get_user_email_notify(current_user.id) return jsonify({"enabled": enabled}) @api_user_bp.route("/api/user/email-notify", methods=["POST"]) @login_required def update_user_email_notify(): """更新用户邮件通知偏好""" data = request.get_json() enabled = data.get("enabled", True) if database.update_user_email_notify(current_user.id, enabled): return jsonify({"success": True}) return jsonify({"error": "更新失败"}), 500 @api_user_bp.route("/api/run_stats", methods=["GET"]) @login_required def get_run_stats(): """获取当前用户的运行统计""" user_id = current_user.id stats = database.get_user_run_stats(user_id) current_running = 0 for _, info in safe_iter_task_status_items(): if info.get("user_id") == user_id and info.get("status") == "运行中": current_running += 1 return jsonify( { "today_completed": stats.get("completed", 0), "current_running": current_running, "today_failed": stats.get("failed", 0), "today_items": stats.get("total_items", 0), "today_attachments": stats.get("total_attachments", 0), } ) @api_user_bp.route("/api/kdocs/status", methods=["GET"]) @login_required def get_kdocs_status_for_user(): """获取金山文档在线状态(用户端简化版)""" try: # 检查系统是否启用了金山文档功能 cfg = database.get_system_config() or {} kdocs_enabled = int(cfg.get("kdocs_enabled") or 0) if not kdocs_enabled: return jsonify({"enabled": False, "online": False, "message": "未启用"}) # 获取金山文档状态 from services.kdocs_uploader import get_kdocs_uploader kdocs = get_kdocs_uploader() status = kdocs.get_status() login_required_flag = status.get("login_required", False) last_login_ok = status.get("last_login_ok") # 判断是否在线 is_online = not login_required_flag and last_login_ok is True return jsonify({ "enabled": True, "online": is_online, "message": "就绪" if is_online else "离线" }) except Exception as e: logger.error(f"获取金山文档状态失败: {e}") return jsonify({"enabled": False, "online": False, "message": "获取失败"})