#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import threading import time from datetime import datetime import database import requests from app_logger import get_logger from app_security import is_safe_outbound_url from flask import jsonify, request from routes.admin_api import admin_api_bp from routes.decorators import admin_required from services.scheduler import run_scheduled_task from services.time_utils import BEIJING_TZ, get_beijing_now logger = get_logger("app") _server_cpu_percent_lock = threading.Lock() _server_cpu_percent_last: float | None = None _server_cpu_percent_last_ts = 0.0 def _get_server_cpu_percent() -> float: import psutil global _server_cpu_percent_last, _server_cpu_percent_last_ts now = time.time() with _server_cpu_percent_lock: if _server_cpu_percent_last is not None and (now - _server_cpu_percent_last_ts) < 0.5: return _server_cpu_percent_last try: if _server_cpu_percent_last is None: cpu_percent = float(psutil.cpu_percent(interval=0.1)) else: cpu_percent = float(psutil.cpu_percent(interval=None)) except Exception: cpu_percent = float(_server_cpu_percent_last or 0.0) if cpu_percent < 0: cpu_percent = 0.0 _server_cpu_percent_last = cpu_percent _server_cpu_percent_last_ts = now return cpu_percent @admin_api_bp.route("/kdocs/status", methods=["GET"]) @admin_required def get_kdocs_status_api(): """获取金山文档上传状态""" try: from services.kdocs_uploader import get_kdocs_uploader uploader = get_kdocs_uploader() status = uploader.get_status() live = str(request.args.get("live", "")).lower() in ("1", "true", "yes") # 重启后首次查询时(last_login_ok is None)自动做一次实时状态校验 should_live_check = live or status.get("last_login_ok") is None if should_live_check: live_status = uploader.refresh_login_status() if live_status.get("success"): logged_in = bool(live_status.get("logged_in")) status["logged_in"] = logged_in status["last_login_ok"] = logged_in status["login_required"] = not logged_in if live_status.get("error"): status["last_error"] = live_status.get("error") else: status["logged_in"] = True if status.get("last_login_ok") else False if status.get("last_login_ok") is False else None if status.get("last_login_ok") is True and status.get("last_error") == "操作超时": status["last_error"] = None return jsonify(status) except Exception as e: return jsonify({"error": f"获取状态失败: {e}"}), 500 @admin_api_bp.route("/kdocs/qr", methods=["POST"]) @admin_required def get_kdocs_qr_api(): """获取金山文档登录二维码""" try: from services.kdocs_uploader import get_kdocs_uploader uploader = get_kdocs_uploader() data = request.get_json(silent=True) or {} force = bool(data.get("force")) if not force: force = str(request.args.get("force", "")).lower() in ("1", "true", "yes") result = uploader.request_qr(force=force) if not result.get("success"): return jsonify({"error": result.get("error", "获取二维码失败")}), 400 return jsonify(result) except Exception as e: return jsonify({"error": f"获取二维码失败: {e}"}), 500 @admin_api_bp.route("/kdocs/clear-login", methods=["POST"]) @admin_required def clear_kdocs_login_api(): """清除金山文档登录态""" try: from services.kdocs_uploader import get_kdocs_uploader uploader = get_kdocs_uploader() result = uploader.clear_login() if not result.get("success"): return jsonify({"error": result.get("error", "清除失败")}), 400 return jsonify({"success": True}) except Exception as e: return jsonify({"error": f"清除失败: {e}"}), 500 @admin_api_bp.route("/schedule/execute", methods=["POST"]) @admin_required def execute_schedule_now(): """立即执行定时任务(无视定时时间和星期限制)""" try: threading.Thread(target=run_scheduled_task, args=(True,), daemon=True).start() logger.info("[立即执行定时任务] 管理员手动触发定时任务执行(跳过星期检查)") return jsonify({"message": "定时任务已开始执行,请查看任务列表获取进度"}) except Exception as e: logger.error(f"[立即执行定时任务] 启动失败: {str(e)}") return jsonify({"error": f"启动失败: {str(e)}"}), 500 @admin_api_bp.route("/proxy/config", methods=["GET"]) @admin_required def get_proxy_config_api(): """获取代理配置""" config_data = database.get_system_config() return jsonify( { "proxy_enabled": config_data.get("proxy_enabled", 0), "proxy_api_url": config_data.get("proxy_api_url", ""), "proxy_expire_minutes": config_data.get("proxy_expire_minutes", 3), } ) @admin_api_bp.route("/proxy/config", methods=["POST"]) @admin_required def update_proxy_config_api(): """更新代理配置""" data = request.json or {} proxy_enabled = data.get("proxy_enabled") proxy_api_url = (data.get("proxy_api_url", "") or "").strip() proxy_expire_minutes = data.get("proxy_expire_minutes") if proxy_enabled is not None and proxy_enabled not in [0, 1]: return jsonify({"error": "proxy_enabled必须是0或1"}), 400 if proxy_expire_minutes is not None: if not isinstance(proxy_expire_minutes, int) or proxy_expire_minutes < 1: return jsonify({"error": "代理有效期必须是大于0的整数"}), 400 if database.update_system_config( proxy_enabled=proxy_enabled, proxy_api_url=proxy_api_url, proxy_expire_minutes=proxy_expire_minutes, ): return jsonify({"message": "代理配置已更新"}) return jsonify({"error": "更新失败"}), 400 @admin_api_bp.route("/proxy/test", methods=["POST"]) @admin_required def test_proxy_api(): """测试代理连接""" data = request.json or {} api_url = (data.get("api_url") or "").strip() if not api_url: return jsonify({"error": "请提供API地址"}), 400 if not is_safe_outbound_url(api_url): return jsonify({"error": "API地址不可用或不安全"}), 400 try: response = requests.get(api_url, timeout=10) if response.status_code == 200: ip_port = response.text.strip() if ip_port and ":" in ip_port: return jsonify({"success": True, "proxy": ip_port, "message": f"代理获取成功: {ip_port}"}) return jsonify({"success": False, "message": f"代理格式错误: {ip_port}"}), 400 return jsonify({"success": False, "message": f"HTTP错误: {response.status_code}"}), 400 except Exception as e: return jsonify({"success": False, "message": f"连接失败: {str(e)}"}), 500 @admin_api_bp.route("/server/info", methods=["GET"]) @admin_required def get_server_info_api(): """获取服务器信息""" import psutil cpu_percent = _get_server_cpu_percent() memory = psutil.virtual_memory() memory_total = f"{memory.total / (1024**3):.1f}GB" memory_used = f"{memory.used / (1024**3):.1f}GB" memory_percent = memory.percent disk = psutil.disk_usage("/") disk_total = f"{disk.total / (1024**3):.1f}GB" disk_used = f"{disk.used / (1024**3):.1f}GB" disk_percent = disk.percent boot_time = datetime.fromtimestamp(psutil.boot_time(), tz=BEIJING_TZ) uptime_delta = get_beijing_now() - boot_time days = uptime_delta.days hours = uptime_delta.seconds // 3600 uptime = f"{days}天{hours}小时" return jsonify( { "cpu_percent": cpu_percent, "memory_total": memory_total, "memory_used": memory_used, "memory_percent": memory_percent, "disk_total": disk_total, "disk_used": disk_used, "disk_percent": disk_percent, "uptime": uptime, } )