添加自动更新功能

This commit is contained in:
2025-12-15 14:34:08 +08:00
parent 809c735498
commit 0d1397debe
26 changed files with 1021 additions and 52 deletions

109
services/update_files.py Normal file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Tuple
from app_config import get_config
config = get_config()
def _resolve_path(path: Path) -> Path:
try:
return path.expanduser().resolve()
except Exception:
return path
def get_data_dir() -> Path:
"""返回 data 目录(默认由 DB_FILE 的父目录推导)。"""
db_file = Path(str(config.DB_FILE or "data/app_data.db"))
return _resolve_path(db_file).parent
def get_update_dir() -> Path:
return get_data_dir() / "update"
def get_update_status_path() -> Path:
return get_update_dir() / "status.json"
def get_update_request_path() -> Path:
return get_update_dir() / "request.json"
def get_update_result_path() -> Path:
return get_update_dir() / "result.json"
def get_update_jobs_dir() -> Path:
return get_update_dir() / "jobs"
def get_update_job_log_path(job_id: str) -> Path:
return get_update_jobs_dir() / f"{job_id}.log"
def ensure_update_dirs() -> None:
get_update_jobs_dir().mkdir(parents=True, exist_ok=True)
def load_json_file(path: Path) -> Tuple[dict, str | None]:
try:
with open(path, "r", encoding="utf-8") as f:
return dict(json.load(f) or {}), None
except FileNotFoundError:
return {}, None
except Exception as e:
return {}, f"{type(e).__name__}: {e}"
def write_json_atomic(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(f"{path.suffix}.tmp.{os.getpid()}.{int(time.time() * 1000)}")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
def tail_text_file(path: Path, *, max_bytes: int = 200_000) -> Tuple[str, bool]:
"""读取文件末尾 max_bytes返回(text, truncated)。"""
max_bytes = max(1, int(max_bytes))
try:
with open(path, "rb") as f:
f.seek(0, os.SEEK_END)
size = f.tell()
start = max(0, size - max_bytes)
f.seek(start, os.SEEK_SET)
data = f.read()
text = data.decode("utf-8", errors="replace")
truncated = start > 0
if truncated:
parts = text.splitlines(True)
if len(parts) > 1:
text = "".join(parts[1:])
return text, truncated
except FileNotFoundError:
return "", False
except Exception as e:
return f"[tail_error] {type(e).__name__}: {e}\n", False
def sanitize_job_id(value: object) -> str | None:
import re
text = str(value or "").strip()
if not text:
return None
if not re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9_.-]{0,63}", text):
return None
return text