Files
zsglpt/routes/admin_api/operations_api.py

232 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import threading
import time
from datetime import datetime
import database
import requests
from app_logger import get_logger
from app_security import is_safe_outbound_url
from flask import jsonify, request
from routes.admin_api import admin_api_bp
from routes.decorators import admin_required
from services.scheduler import run_scheduled_task
from services.time_utils import BEIJING_TZ, get_beijing_now
logger = get_logger("app")
_server_cpu_percent_lock = threading.Lock()
_server_cpu_percent_last: float | None = None
_server_cpu_percent_last_ts = 0.0
def _get_server_cpu_percent() -> float:
import psutil
global _server_cpu_percent_last, _server_cpu_percent_last_ts
now = time.time()
with _server_cpu_percent_lock:
if _server_cpu_percent_last is not None and (now - _server_cpu_percent_last_ts) < 0.5:
return _server_cpu_percent_last
try:
if _server_cpu_percent_last is None:
cpu_percent = float(psutil.cpu_percent(interval=0.1))
else:
cpu_percent = float(psutil.cpu_percent(interval=None))
except Exception:
cpu_percent = float(_server_cpu_percent_last or 0.0)
if cpu_percent < 0:
cpu_percent = 0.0
_server_cpu_percent_last = cpu_percent
_server_cpu_percent_last_ts = now
return cpu_percent
@admin_api_bp.route("/kdocs/status", methods=["GET"])
@admin_required
def get_kdocs_status_api():
"""获取金山文档上传状态"""
try:
from services.kdocs_uploader import get_kdocs_uploader
uploader = get_kdocs_uploader()
status = uploader.get_status()
live = str(request.args.get("live", "")).lower() in ("1", "true", "yes")
# 重启后首次查询时last_login_ok is None自动做一次实时状态校验
should_live_check = live or status.get("last_login_ok") is None
if should_live_check:
live_status = uploader.refresh_login_status()
if live_status.get("success"):
logged_in = bool(live_status.get("logged_in"))
status["logged_in"] = logged_in
status["last_login_ok"] = logged_in
status["login_required"] = not logged_in
if live_status.get("error"):
status["last_error"] = live_status.get("error")
else:
status["logged_in"] = True if status.get("last_login_ok") else False if status.get("last_login_ok") is False else None
if status.get("last_login_ok") is True and status.get("last_error") == "操作超时":
status["last_error"] = None
return jsonify(status)
except Exception as e:
return jsonify({"error": f"获取状态失败: {e}"}), 500
@admin_api_bp.route("/kdocs/qr", methods=["POST"])
@admin_required
def get_kdocs_qr_api():
"""获取金山文档登录二维码"""
try:
from services.kdocs_uploader import get_kdocs_uploader
uploader = get_kdocs_uploader()
data = request.get_json(silent=True) or {}
force = bool(data.get("force"))
if not force:
force = str(request.args.get("force", "")).lower() in ("1", "true", "yes")
result = uploader.request_qr(force=force)
if not result.get("success"):
return jsonify({"error": result.get("error", "获取二维码失败")}), 400
return jsonify(result)
except Exception as e:
return jsonify({"error": f"获取二维码失败: {e}"}), 500
@admin_api_bp.route("/kdocs/clear-login", methods=["POST"])
@admin_required
def clear_kdocs_login_api():
"""清除金山文档登录态"""
try:
from services.kdocs_uploader import get_kdocs_uploader
uploader = get_kdocs_uploader()
result = uploader.clear_login()
if not result.get("success"):
return jsonify({"error": result.get("error", "清除失败")}), 400
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": f"清除失败: {e}"}), 500
@admin_api_bp.route("/schedule/execute", methods=["POST"])
@admin_required
def execute_schedule_now():
"""立即执行定时任务(无视定时时间和星期限制)"""
try:
threading.Thread(target=run_scheduled_task, args=(True,), daemon=True).start()
logger.info("[立即执行定时任务] 管理员手动触发定时任务执行(跳过星期检查)")
return jsonify({"message": "定时任务已开始执行,请查看任务列表获取进度"})
except Exception as e:
logger.error(f"[立即执行定时任务] 启动失败: {str(e)}")
return jsonify({"error": f"启动失败: {str(e)}"}), 500
@admin_api_bp.route("/proxy/config", methods=["GET"])
@admin_required
def get_proxy_config_api():
"""获取代理配置"""
config_data = database.get_system_config()
return jsonify(
{
"proxy_enabled": config_data.get("proxy_enabled", 0),
"proxy_api_url": config_data.get("proxy_api_url", ""),
"proxy_expire_minutes": config_data.get("proxy_expire_minutes", 3),
}
)
@admin_api_bp.route("/proxy/config", methods=["POST"])
@admin_required
def update_proxy_config_api():
"""更新代理配置"""
data = request.json or {}
proxy_enabled = data.get("proxy_enabled")
proxy_api_url = (data.get("proxy_api_url", "") or "").strip()
proxy_expire_minutes = data.get("proxy_expire_minutes")
if proxy_enabled is not None and proxy_enabled not in [0, 1]:
return jsonify({"error": "proxy_enabled必须是0或1"}), 400
if proxy_expire_minutes is not None:
if not isinstance(proxy_expire_minutes, int) or proxy_expire_minutes < 1:
return jsonify({"error": "代理有效期必须是大于0的整数"}), 400
if database.update_system_config(
proxy_enabled=proxy_enabled,
proxy_api_url=proxy_api_url,
proxy_expire_minutes=proxy_expire_minutes,
):
return jsonify({"message": "代理配置已更新"})
return jsonify({"error": "更新失败"}), 400
@admin_api_bp.route("/proxy/test", methods=["POST"])
@admin_required
def test_proxy_api():
"""测试代理连接"""
data = request.json or {}
api_url = (data.get("api_url") or "").strip()
if not api_url:
return jsonify({"error": "请提供API地址"}), 400
if not is_safe_outbound_url(api_url):
return jsonify({"error": "API地址不可用或不安全"}), 400
try:
response = requests.get(api_url, timeout=10)
if response.status_code == 200:
ip_port = response.text.strip()
if ip_port and ":" in ip_port:
return jsonify({"success": True, "proxy": ip_port, "message": f"代理获取成功: {ip_port}"})
return jsonify({"success": False, "message": f"代理格式错误: {ip_port}"}), 400
return jsonify({"success": False, "message": f"HTTP错误: {response.status_code}"}), 400
except Exception as e:
return jsonify({"success": False, "message": f"连接失败: {str(e)}"}), 500
@admin_api_bp.route("/server/info", methods=["GET"])
@admin_required
def get_server_info_api():
"""获取服务器信息"""
import psutil
cpu_percent = _get_server_cpu_percent()
memory = psutil.virtual_memory()
memory_total = f"{memory.total / (1024**3):.1f}GB"
memory_used = f"{memory.used / (1024**3):.1f}GB"
memory_percent = memory.percent
disk = psutil.disk_usage("/")
disk_total = f"{disk.total / (1024**3):.1f}GB"
disk_used = f"{disk.used / (1024**3):.1f}GB"
disk_percent = disk.percent
boot_time = datetime.fromtimestamp(psutil.boot_time(), tz=BEIJING_TZ)
uptime_delta = get_beijing_now() - boot_time
days = uptime_delta.days
hours = uptime_delta.seconds // 3600
uptime = f"{days}{hours}小时"
return jsonify(
{
"cpu_percent": cpu_percent,
"memory_total": memory_total,
"memory_used": memory_used,
"memory_percent": memory_percent,
"disk_total": disk_total,
"disk_used": disk_used,
"disk_percent": disk_percent,
"uptime": uptime,
}
)