Files
zsglpt/routes/admin_api/email_api.py

215 lines
7.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import email_service
from app_logger import get_logger
from app_security import validate_email
from flask import jsonify, request
from routes.admin_api import admin_api_bp
from routes.decorators import admin_required
logger = get_logger("app")
@admin_api_bp.route("/email/settings", methods=["GET"])
@admin_required
def get_email_settings_api():
"""获取全局邮件设置"""
try:
settings = email_service.get_email_settings()
return jsonify(settings)
except Exception as e:
logger.error(f"获取邮件设置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/email/settings", methods=["POST"])
@admin_required
def update_email_settings_api():
"""更新全局邮件设置"""
try:
data = request.json or {}
enabled = data.get("enabled", False)
failover_enabled = data.get("failover_enabled", True)
register_verify_enabled = data.get("register_verify_enabled")
login_alert_enabled = data.get("login_alert_enabled")
base_url = data.get("base_url")
task_notify_enabled = data.get("task_notify_enabled")
email_service.update_email_settings(
enabled=enabled,
failover_enabled=failover_enabled,
register_verify_enabled=register_verify_enabled,
login_alert_enabled=login_alert_enabled,
base_url=base_url,
task_notify_enabled=task_notify_enabled,
)
return jsonify({"success": True})
except Exception as e:
logger.error(f"更新邮件设置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/smtp/configs", methods=["GET"])
@admin_required
def get_smtp_configs_api():
"""获取所有SMTP配置列表"""
try:
configs = email_service.get_smtp_configs(include_password=False)
return jsonify(configs)
except Exception as e:
logger.error(f"获取SMTP配置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/smtp/configs", methods=["POST"])
@admin_required
def create_smtp_config_api():
"""创建SMTP配置"""
try:
data = request.json or {}
if not data.get("host"):
return jsonify({"error": "SMTP服务器地址不能为空"}), 400
if not data.get("username"):
return jsonify({"error": "SMTP用户名不能为空"}), 400
config_id = email_service.create_smtp_config(data)
return jsonify({"success": True, "id": config_id})
except Exception as e:
logger.error(f"创建SMTP配置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/smtp/configs/<int:config_id>", methods=["GET"])
@admin_required
def get_smtp_config_api(config_id):
"""获取单个SMTP配置详情"""
try:
config_data = email_service.get_smtp_config(config_id, include_password=False)
if not config_data:
return jsonify({"error": "配置不存在"}), 404
return jsonify(config_data)
except Exception as e:
logger.error(f"获取SMTP配置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/smtp/configs/<int:config_id>", methods=["PUT"])
@admin_required
def update_smtp_config_api(config_id):
"""更新SMTP配置"""
try:
data = request.json or {}
if email_service.update_smtp_config(config_id, data):
return jsonify({"success": True})
return jsonify({"error": "更新失败"}), 400
except Exception as e:
logger.error(f"更新SMTP配置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/smtp/configs/<int:config_id>", methods=["DELETE"])
@admin_required
def delete_smtp_config_api(config_id):
"""删除SMTP配置"""
try:
if email_service.delete_smtp_config(config_id):
return jsonify({"success": True})
return jsonify({"error": "删除失败"}), 400
except Exception as e:
logger.error(f"删除SMTP配置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/smtp/configs/<int:config_id>/test", methods=["POST"])
@admin_required
def test_smtp_config_api(config_id):
"""测试SMTP配置"""
try:
data = request.json or {}
test_email = str(data.get("email", "") or "").strip()
if not test_email:
return jsonify({"error": "请提供测试邮箱"}), 400
is_valid, error_msg = validate_email(test_email)
if not is_valid:
return jsonify({"error": error_msg}), 400
result = email_service.test_smtp_config(config_id, test_email)
return jsonify(result)
except Exception as e:
logger.error(f"测试SMTP配置失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@admin_api_bp.route("/smtp/configs/<int:config_id>/primary", methods=["POST"])
@admin_required
def set_primary_smtp_config_api(config_id):
"""设置主SMTP配置"""
try:
if email_service.set_primary_smtp_config(config_id):
return jsonify({"success": True})
return jsonify({"error": "设置失败"}), 400
except Exception as e:
logger.error(f"设置主SMTP配置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/smtp/configs/primary/clear", methods=["POST"])
@admin_required
def clear_primary_smtp_config_api():
"""取消主SMTP配置"""
try:
email_service.clear_primary_smtp_config()
return jsonify({"success": True})
except Exception as e:
logger.error(f"取消主SMTP配置失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/email/stats", methods=["GET"])
@admin_required
def get_email_stats_api():
"""获取邮件发送统计"""
try:
stats = email_service.get_email_stats()
return jsonify(stats)
except Exception as e:
logger.error(f"获取邮件统计失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/email/logs", methods=["GET"])
@admin_required
def get_email_logs_api():
"""获取邮件发送日志"""
try:
page = request.args.get("page", 1, type=int)
page_size = request.args.get("page_size", 20, type=int)
email_type = request.args.get("type", None)
status = request.args.get("status", None)
page_size = min(max(page_size, 10), 100)
result = email_service.get_email_logs(page, page_size, email_type, status)
return jsonify(result)
except Exception as e:
logger.error(f"获取邮件日志失败: {e}")
return jsonify({"error": str(e)}), 500
@admin_api_bp.route("/email/logs/cleanup", methods=["POST"])
@admin_required
def cleanup_email_logs_api():
"""清理过期邮件日志"""
try:
data = request.json or {}
days = data.get("days", 30)
days = min(max(days, 7), 365)
deleted = email_service.cleanup_email_logs(days)
return jsonify({"success": True, "deleted": deleted})
except Exception as e:
logger.error(f"清理邮件日志失败: {e}")
return jsonify({"error": str(e)}), 500