#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import json import os import time from pathlib import Path from typing import Tuple from app_config import get_config config = get_config() def _resolve_path(path: Path) -> Path: try: return path.expanduser().resolve() except Exception: return path def get_data_dir() -> Path: """返回 data 目录(默认由 DB_FILE 的父目录推导)。""" db_file = Path(str(config.DB_FILE or "data/app_data.db")) return _resolve_path(db_file).parent def get_update_dir() -> Path: return get_data_dir() / "update" def get_update_status_path() -> Path: return get_update_dir() / "status.json" def get_update_request_path() -> Path: return get_update_dir() / "request.json" def get_update_result_path() -> Path: return get_update_dir() / "result.json" def get_update_jobs_dir() -> Path: return get_update_dir() / "jobs" def get_update_job_log_path(job_id: str) -> Path: return get_update_jobs_dir() / f"{job_id}.log" def ensure_update_dirs() -> None: get_update_jobs_dir().mkdir(parents=True, exist_ok=True) def load_json_file(path: Path) -> Tuple[dict, str | None]: try: with open(path, "r", encoding="utf-8") as f: return dict(json.load(f) or {}), None except FileNotFoundError: return {}, None except Exception as e: return {}, f"{type(e).__name__}: {e}" def write_json_atomic(path: Path, data: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(f"{path.suffix}.tmp.{os.getpid()}.{int(time.time() * 1000)}") with open(tmp, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True) f.flush() os.fsync(f.fileno()) os.replace(tmp, path) def tail_text_file(path: Path, *, max_bytes: int = 200_000) -> Tuple[str, bool]: """读取文件末尾 max_bytes,返回(text, truncated)。""" max_bytes = max(1, int(max_bytes)) try: with open(path, "rb") as f: f.seek(0, os.SEEK_END) size = f.tell() start = max(0, size - max_bytes) f.seek(start, os.SEEK_SET) data = f.read() text = data.decode("utf-8", errors="replace") truncated = start > 0 if truncated: parts = text.splitlines(True) if len(parts) > 1: text = "".join(parts[1:]) return text, truncated except FileNotFoundError: return "", False except Exception as e: return f"[tail_error] {type(e).__name__}: {e}\n", False def sanitize_job_id(value: object) -> str | None: import re text = str(value or "").strip() if not text: return None if not re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9_.-]{0,63}", text): return None return text