Files
xb/app/api_get_routes.py

200 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import mimetypes
from http import HTTPStatus
from pathlib import Path
from typing import Any
from urllib.parse import parse_qs
_PROGRESS_PREFIX = "/api/progress/"
_DOWNLOAD_PREFIX = "/api/download/"


def handle_get_routes(handler: Any, parsed: Any, ctx: dict[str, Any]) -> bool:
    """Dispatch a GET request to the matching ``/api/...`` route.

    Args:
        handler: Request handler object. Responses are written through its
            ``_send_json(payload[, status])`` and
            ``_send_file(path, content_type, filename=..., max_kbps=...,
            chunk_size=...)`` methods.
        parsed: Parsed request URL; only ``.path`` and ``.query`` are read.
        ctx: Mapping that supplies the helper callables, locks and shared
            state used by the routes (``load_config``, ``load_history``,
            ``_HISTORY_LOCK``, ``DOWNLOAD_CACHE``, ...). Only the entries
            needed by the matched route are looked up.

    Returns:
        ``True`` when a route matched and a response was sent, ``False``
        when the path is not handled here.
    """
    path = parsed.path
    if path == "/api/config":
        _serve_config(handler, ctx)
    elif path == "/api/log/today":
        _serve_today_log(handler, ctx)
    elif path == "/api/issues":
        _serve_issues(handler, parsed, ctx)
    elif path.startswith(_PROGRESS_PREFIX):
        _serve_progress(handler, path[len(_PROGRESS_PREFIX):].strip(), ctx)
    elif path == "/api/history":
        _serve_history(handler, ctx)
    elif path == "/api/history/view":
        _serve_history_view(handler, parsed, ctx)
    elif path.startswith(_DOWNLOAD_PREFIX):
        _serve_download(handler, parsed, path[len(_DOWNLOAD_PREFIX):].strip(), ctx)
    else:
        return False
    return True


def _send_error(handler: Any, message: str, status: HTTPStatus) -> None:
    """Send the ``{"ok": False, "error": ...}`` JSON envelope with *status*."""
    handler._send_json({"ok": False, "error": message}, status)


def _parse_limit(params: dict[str, Any], default: int, ceiling: int = 2000) -> int:
    """Read ``limit`` from parsed query params, clamped to ``1..ceiling``.

    A missing or non-integer value falls back to *default*.
    """
    try:
        limit = int(params.get("limit", [str(default)])[0])
    except ValueError:
        limit = default
    return max(1, min(ceiling, limit))


def _serve_config(handler: Any, ctx: dict[str, Any]) -> None:
    """Answer ``/api/config`` with the resolved configuration summary."""
    load_config = ctx["load_config"]
    resolve_template_path = ctx["resolve_template_path"]
    resolve_output_dir = ctx["resolve_output_dir"]
    resolve_history_path = ctx["resolve_history_path"]
    today_log_path = ctx["today_log_path"]
    list_issue_marks = ctx["list_issue_marks"]
    count_log_lines = ctx["count_log_lines"]
    load_history = ctx["load_history"]
    resolve_image_delivery_options = ctx["resolve_image_delivery_options"]
    issue_marks_path = ctx["ISSUE_MARKS_PATH"]
    history_lock = ctx["_HISTORY_LOCK"]

    # Everything that can fail is gathered inside one ``try`` so that a
    # history/log/issue failure is reported as a JSON 500, the same way the
    # /api/history routes report it, instead of escaping the handler.
    try:
        config = load_config()
        template = resolve_template_path(config)
        output = resolve_output_dir(config)
        history_path = resolve_history_path(config)
        log_path = today_log_path()
        with history_lock:
            history = load_history(history_path)
        _, active_issue_count = list_issue_marks(status="active", limit=1)
        payload = {
            "ok": True,
            "config": {
                "template_file": str(template),
                "output_dir": str(output),
                "trigger_keyword": config.get("relay_handling", {}).get("trigger_keyword", "#接龙"),
                "allowed_branches": config.get("branches", {}).get("allowed", []),
                "type_matching": config.get("type_matching", {}),
                "status_fallback": config.get("status_extraction", {}).get("fallback", "成功营销"),
                "image_delivery": resolve_image_delivery_options(config),
            },
            "resolved_history_file": str(history_path),
            "history_count": len(history),
            "review_log_file": str(log_path),
            "review_log_count": count_log_lines(log_path),
            "issue_marks_file": str(issue_marks_path),
            "active_issue_count": active_issue_count,
            "runtime_note": (
                "当前版本支持多条解析、保险年限选择、生成PPT并按页导出PNG、逐图下载。"
            ),
        }
    except Exception as exc:  # route boundary: report instead of crashing
        _send_error(handler, str(exc), HTTPStatus.INTERNAL_SERVER_ERROR)
        return
    handler._send_json(payload)


def _serve_today_log(handler: Any, ctx: dict[str, Any]) -> None:
    """Answer ``/api/log/today`` with the JSON-object lines of today's log.

    ``count`` is the number of valid entries in the file; ``logs`` carries at
    most the last 500 of them.
    """
    today_log_path = ctx["today_log_path"]
    log_lock = ctx["_LOG_LOCK"]

    path = today_log_path()
    logs: list[dict[str, Any]] = []
    try:
        # Open directly rather than checking ``exists()`` first, so a file
        # removed between the check and the open cannot raise.
        with log_lock, path.open("r", encoding="utf-8") as f:
            for line in f:
                text = line.strip()
                if not text:
                    continue
                try:
                    entry = json.loads(text)
                except ValueError:  # json.JSONDecodeError: skip malformed line
                    continue
                if isinstance(entry, dict):
                    logs.append(entry)
    except FileNotFoundError:
        pass  # no log file yet: respond with an empty list
    except (OSError, UnicodeDecodeError) as exc:
        _send_error(handler, str(exc), HTTPStatus.INTERNAL_SERVER_ERROR)
        return
    handler._send_json({"ok": True, "log_file": str(path), "count": len(logs), "logs": logs[-500:]})


def _serve_issues(handler: Any, parsed: Any, ctx: dict[str, Any]) -> None:
    """Answer ``/api/issues`` using the ``status`` and ``limit`` query params."""
    list_issue_marks = ctx["list_issue_marks"]
    try:
        params = parse_qs(parsed.query)
        status = str(params.get("status", ["active"])[0]).strip().lower()
        limit = _parse_limit(params, 500)
        items, total = list_issue_marks(status=status, limit=limit)
    except Exception as exc:  # route boundary: report instead of crashing
        _send_error(handler, str(exc), HTTPStatus.INTERNAL_SERVER_ERROR)
        return
    handler._send_json(
        {
            "ok": True,
            # The echoed status is normalised; the raw value was already
            # passed to list_issue_marks above.
            "status": status if status in {"active", "resolved", "all"} else "active",
            "count": total,
            "limit": limit,
            "items": items,
        }
    )


def _serve_progress(handler: Any, token: str, ctx: dict[str, Any]) -> None:
    """Answer ``/api/progress/<token>`` with the stored progress item."""
    get_generation_progress = ctx["get_generation_progress"]
    if not token:
        _send_error(handler, "progress token missing", HTTPStatus.BAD_REQUEST)
        return
    item = get_generation_progress(token)
    if item is None:
        _send_error(handler, "progress token not found", HTTPStatus.NOT_FOUND)
        return
    handler._send_json({"ok": True, "progress": item})


def _serve_history(handler: Any, ctx: dict[str, Any]) -> None:
    """Answer ``/api/history`` with the full loaded history."""
    load_config = ctx["load_config"]
    resolve_history_path = ctx["resolve_history_path"]
    load_history = ctx["load_history"]
    history_lock = ctx["_HISTORY_LOCK"]
    try:
        history_path = resolve_history_path(load_config())
        with history_lock:
            history = load_history(history_path)
    except Exception as exc:  # route boundary: report instead of crashing
        _send_error(handler, str(exc), HTTPStatus.INTERNAL_SERVER_ERROR)
        return
    handler._send_json({"ok": True, "count": len(history), "history": history})


def _serve_history_view(handler: Any, parsed: Any, ctx: dict[str, Any]) -> None:
    """Answer ``/api/history/view`` with view items built from the history."""
    load_config = ctx["load_config"]
    resolve_history_path = ctx["resolve_history_path"]
    load_history = ctx["load_history"]
    build_history_view_items = ctx["build_history_view_items"]
    history_lock = ctx["_HISTORY_LOCK"]
    try:
        config = load_config()
        history_path = resolve_history_path(config)
        limit = _parse_limit(parse_qs(parsed.query), 300)
        with history_lock:
            history = load_history(history_path)
        items = build_history_view_items(history, config, limit=limit)
    except Exception as exc:  # route boundary: report instead of crashing
        _send_error(handler, str(exc), HTTPStatus.INTERNAL_SERVER_ERROR)
        return
    handler._send_json({"ok": True, "count": len(history), "limit": limit, "items": items})


def _serve_download(handler: Any, parsed: Any, token: str, ctx: dict[str, Any]) -> None:
    """Answer ``/api/download/<token>`` by sending the cached file.

    ``?inline=1|true|yes`` suppresses the download filename passed to
    ``_send_file``.
    """
    load_config = ctx["load_config"]
    resolve_image_delivery_options = ctx["resolve_image_delivery_options"]
    download_lock = ctx["_DOWNLOAD_LOCK"]
    download_cache = ctx["DOWNLOAD_CACHE"]

    if not token:
        _send_error(handler, "download token missing", HTTPStatus.BAD_REQUEST)
        return
    with download_lock:
        meta = download_cache.get(token)
    if not meta:
        _send_error(handler, "download expired or invalid", HTTPStatus.NOT_FOUND)
        return

    file_path = Path(str(meta.get("file_path", "")))
    # is_file() rather than exists(): an empty/missing "file_path" becomes
    # Path("."), which exists but is a directory and must not be served.
    if not file_path.is_file():
        _send_error(handler, "file missing", HTTPStatus.NOT_FOUND)
        return

    content_type = str(meta.get("content_type", "")).strip()
    if not content_type:
        content_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
    filename = str(meta.get("filename", file_path.name))

    params = parse_qs(parsed.query)
    inline = str(params.get("inline", ["0"])[0]).lower() in {"1", "true", "yes"}

    rate_opts = {"max_kbps": 300, "chunk_size": 16 * 1024}
    try:
        cfg_opts = resolve_image_delivery_options(load_config())
        rate_opts["max_kbps"] = int(cfg_opts.get("max_kbps", 300))
        rate_opts["chunk_size"] = int(cfg_opts.get("chunk_size", 16 * 1024))
    except Exception:
        # Deliberate best-effort: an unreadable or malformed config must not
        # block the download, so the defaults set above stay in effect.
        pass

    handler._send_file(
        file_path,
        content_type,
        filename=None if inline else filename,
        max_kbps=rate_opts["max_kbps"],
        chunk_size=rate_opts["chunk_size"],
    )