Files
zsglpt/routes/api_screenshots.py

309 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os
from datetime import datetime
from typing import Iterator
import database
from app_config import get_config
from app_logger import get_logger
from app_security import is_safe_path
from flask import Blueprint, jsonify, request, send_from_directory
from flask_login import current_user, login_required
from PIL import Image, ImageOps
from services.client_log import log_to_client
from services.time_utils import BEIJING_TZ
# Module-level configuration shared by every screenshot route below.
config = get_config()
SCREENSHOTS_DIR = config.SCREENSHOTS_DIR

# Lower-case file extensions that are treated as screenshots.
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg")

# Generated thumbnails are cached in a hidden sub-directory of SCREENSHOTS_DIR.
_THUMBNAIL_DIR = os.path.join(SCREENSHOTS_DIR, ".thumbs")
_THUMBNAIL_MAX_SIZE = (480, 270)  # bounding box handed to Image.thumbnail()
_THUMBNAIL_QUALITY = 80  # JPEG quality used when saving thumbnails

# Newer Pillow exposes the filter as Image.Resampling.LANCZOS; older
# releases (Pillow<9) only provide Image.LANCZOS.
_RESAMPLE_FILTER = getattr(Image, "Resampling", Image).LANCZOS

api_screenshots_bp = Blueprint("api_screenshots", __name__)
logger = get_logger("app")
def _get_user_prefix(user_id: int) -> str:
return f"u{int(user_id)}"
def _get_username(user_id: int) -> str:
    """Return the username stored for ``user_id``.

    Returns an empty string when the lookup yields nothing or the record
    has no (or an empty) ``username`` value.
    """
    record = database.get_user_by_id(user_id)
    if not record:
        return ""
    return str(record.get("username") or "")
def _list_all_usernames() -> list[str]:
    """Return every non-blank username from ``database.get_all_users()``.

    Each value is converted to ``str`` and stripped of surrounding
    whitespace; rows whose username is missing or blank are dropped.
    """
    stripped_names = (
        str(row.get("username") or "").strip() for row in database.get_all_users()
    )
    return [name for name in stripped_names if name]
def _resolve_user_owned_prefix(
    filename: str,
    *,
    user_id: int,
    username: str,
    all_usernames: list[str] | None = None,
) -> str | None:
    """Return the owner prefix of ``filename`` if it belongs to the given user.

    Args:
        filename: Bare screenshot file name to check.
        user_id: Id of the user requesting access.
        username: That user's username, used for the legacy naming scheme.
        all_usernames: Every known username; fetched via
            ``_list_all_usernames()`` when omitted.

    Returns:
        ``u<user_id>`` for files using the current naming scheme, the
        username for files using the legacy scheme, or ``None`` when the
        file is not an image or is not owned by this user.
    """
    if not filename.lower().endswith(_IMAGE_EXTENSIONS):
        return None

    # Current naming scheme: u{user_id}_...
    id_prefix = _get_user_prefix(user_id)
    if filename.startswith(id_prefix + "_"):
        return id_prefix

    # Legacy naming scheme: {username}_...
    own_name = str(username or "").strip()
    if not own_name:
        return None

    known_names = _list_all_usernames() if all_usernames is None else all_usernames
    candidates = [name for name in known_names if filename.startswith(name + "_")]
    if not candidates:
        return None

    # Only the longest matching username owns the file, so that user "foo"
    # cannot read screenshots that belong to user "foo_bar".
    longest = max(map(len, candidates))
    top_matches = [name for name in candidates if len(name) == longest]
    if len(top_matches) == 1 and top_matches[0] == own_name:
        return top_matches[0]
    return None
def _iter_user_screenshot_entries(user_id: int, username: str, all_usernames: list[str]) -> Iterator[tuple[os.DirEntry, str]]:
    """Yield ``(dir_entry, owner_prefix)`` for each screenshot owned by the user.

    Scans ``SCREENSHOTS_DIR`` (non-recursively), skips anything that is not
    a regular file, and keeps only entries for which
    ``_resolve_user_owned_prefix`` returns a prefix. Yields nothing when the
    directory does not exist.
    """
    if os.path.exists(SCREENSHOTS_DIR):
        with os.scandir(SCREENSHOTS_DIR) as scan:
            for item in scan:
                if item.is_file():
                    owner_prefix = _resolve_user_owned_prefix(
                        item.name,
                        user_id=user_id,
                        username=username,
                        all_usernames=all_usernames,
                    )
                    if owner_prefix:
                        yield item, owner_prefix
def _build_display_name(filename: str, owner_prefix: str) -> str:
prefix = f"{owner_prefix}_"
if filename.startswith(prefix):
return filename[len(prefix) :]
return filename
def _thumbnail_name(filename: str) -> str:
stem, _ = os.path.splitext(filename)
return f"{stem}.thumb.jpg"
def _thumbnail_path(filename: str) -> str:
    """Return the path inside ``_THUMBNAIL_DIR`` where ``filename``'s thumbnail is stored."""
    thumb_name = _thumbnail_name(filename)
    return os.path.join(_THUMBNAIL_DIR, thumb_name)
def _ensure_thumbnail(source_path: str, thumb_path: str) -> bool:
if not os.path.exists(source_path):
return False
source_mtime = os.path.getmtime(source_path)
if os.path.exists(thumb_path) and os.path.getmtime(thumb_path) >= source_mtime:
return True
os.makedirs(_THUMBNAIL_DIR, exist_ok=True)
with Image.open(source_path) as image:
image = ImageOps.exif_transpose(image)
if image.mode != "RGB":
image = image.convert("RGB")
image.thumbnail(_THUMBNAIL_MAX_SIZE, _RESAMPLE_FILTER)
image.save(
thumb_path,
format="JPEG",
quality=_THUMBNAIL_QUALITY,
optimize=True,
progressive=True,
)
os.utime(thumb_path, (source_mtime, source_mtime))
return True
def _remove_thumbnail(filename: str) -> None:
    """Delete the cached thumbnail of ``filename`` if one exists."""
    cached = _thumbnail_path(filename)
    if not os.path.exists(cached):
        return
    os.remove(cached)
def _parse_optional_pagination(default_limit: int = 24, *, max_limit: int = 100) -> tuple[int | None, int | None, bool]:
    """Read optional ``limit``/``offset`` query parameters from the request.

    Args:
        default_limit: Limit used when ``limit`` is absent or not an integer.
        max_limit: Upper bound applied to the limit.

    Returns:
        ``(None, None, False)`` when neither parameter was supplied;
        otherwise ``(limit, offset, True)`` with ``limit`` clamped to
        ``[1, max_limit]`` and ``offset`` clamped to ``>= 0``.
    """
    raw_limit = request.args.get("limit")
    raw_offset = request.args.get("offset")
    if raw_limit is None and raw_offset is None:
        return None, None, False

    def _as_int(raw, fallback):
        # Use the fallback when the parameter is absent or unparsable.
        try:
            return int(fallback if raw is None else raw)
        except (ValueError, TypeError):
            return fallback

    limit = min(_as_int(raw_limit, default_limit), max_limit)
    limit = max(1, limit)
    offset = max(0, _as_int(raw_offset, 0))
    return limit, offset, True
@api_screenshots_bp.route("/api/screenshots", methods=["GET"])
@login_required
def get_screenshots():
    """Return the current user's screenshots, newest first.

    Without ``limit``/``offset`` query parameters the response is a plain
    JSON list. With either parameter it is an object with ``items``,
    ``total``, ``limit`` and ``offset``. Any failure is logged and
    answered with HTTP 500.
    """
    user_id = current_user.id
    username = _get_username(user_id)
    try:
        all_usernames = _list_all_usernames()

        # Collect (mtime, record) pairs so sorting needs no temporary key
        # inside the records themselves.
        dated_records = []
        for entry, owner_prefix in _iter_user_screenshot_entries(user_id, username, all_usernames):
            name = entry.name
            info = entry.stat()
            modified = datetime.fromtimestamp(info.st_mtime, tz=BEIJING_TZ)
            record = {
                "filename": name,
                "display_name": _build_display_name(name, owner_prefix),
                "size": info.st_size,
                "created": modified.strftime("%Y-%m-%d %H:%M:%S"),
            }
            dated_records.append((info.st_mtime, record))

        dated_records.sort(key=lambda pair: pair[0], reverse=True)
        screenshots = [record for _, record in dated_records]

        limit, offset, paged = _parse_optional_pagination(default_limit=24, max_limit=100)
        if not paged:
            return jsonify(screenshots)

        page = screenshots[offset : offset + limit]
        return jsonify({"items": page, "total": len(screenshots), "limit": limit, "offset": offset})
    except Exception as e:
        logger.warning(f"[screenshots] 获取截图列表失败(user_id={user_id}): {e}")
        return jsonify({"error": "获取截图列表失败"}), 500
@api_screenshots_bp.route("/screenshots/<filename>")
@login_required
def serve_screenshot(filename):
    """Serve the original screenshot file to its owner.

    Answers HTTP 403 when the file does not belong to the current user or
    when the path check against ``SCREENSHOTS_DIR`` fails.
    """
    user_id = current_user.id
    owner_prefix = _resolve_user_owned_prefix(
        filename,
        user_id=user_id,
        username=_get_username(user_id),
    )
    if not owner_prefix:
        return jsonify({"error": "无权访问"}), 403

    path_is_safe = is_safe_path(SCREENSHOTS_DIR, filename)
    if not path_is_safe:
        return jsonify({"error": "非法路径"}), 403

    return send_from_directory(SCREENSHOTS_DIR, filename)
@api_screenshots_bp.route("/screenshots/thumb/<filename>")
@login_required
def serve_screenshot_thumbnail(filename):
    """Serve a thumbnail of a screenshot, falling back to the original image.

    Answers HTTP 403 when the file does not belong to the current user or
    fails the path check, and HTTP 404 when the source file is missing.
    If the thumbnail cannot be produced or served, the failure is logged
    and the original image is sent instead.
    """
    user_id = current_user.id
    username = _get_username(user_id)
    if not _resolve_user_owned_prefix(filename, user_id=user_id, username=username):
        return jsonify({"error": "无权访问"}), 403
    if not is_safe_path(SCREENSHOTS_DIR, filename):
        return jsonify({"error": "非法路径"}), 403

    source_path = os.path.join(SCREENSHOTS_DIR, filename)
    if not os.path.exists(source_path):
        return jsonify({"error": "文件不存在"}), 404

    thumb_path = _thumbnail_path(filename)
    try:
        if _ensure_thumbnail(source_path, thumb_path) and os.path.exists(thumb_path):
            return send_from_directory(_THUMBNAIL_DIR, os.path.basename(thumb_path), max_age=86400, conditional=True)
    except Exception as e:
        # Thumbnails are best-effort: record why generation failed instead
        # of hiding it, then fall through to the original image.
        logger.warning(f"[screenshots] 生成缩略图失败(user_id={user_id}, filename={filename}): {e}")

    return send_from_directory(SCREENSHOTS_DIR, filename, max_age=3600, conditional=True)
@api_screenshots_bp.route("/api/screenshots/<filename>", methods=["DELETE"])
@login_required
def delete_screenshot(filename):
    """Delete one screenshot (and its cached thumbnail) owned by the current user.

    Answers HTTP 403 for files the user does not own or that fail the path
    check, HTTP 404 when the file does not exist, and HTTP 500 (after
    logging) when the deletion raises.
    """
    user_id = current_user.id
    owner_prefix = _resolve_user_owned_prefix(
        filename,
        user_id=user_id,
        username=_get_username(user_id),
    )
    if not owner_prefix:
        return jsonify({"error": "无权删除"}), 403
    if not is_safe_path(SCREENSHOTS_DIR, filename):
        return jsonify({"error": "非法路径"}), 403

    try:
        target = os.path.join(SCREENSHOTS_DIR, filename)
        if not os.path.exists(target):
            return jsonify({"error": "文件不存在"}), 404

        os.remove(target)
        _remove_thumbnail(filename)
        log_to_client(f"删除截图: {filename}", user_id)
        return jsonify({"success": True})
    except Exception as e:
        logger.warning(f"[screenshots] 删除截图失败(user_id={user_id}, filename={filename}): {e}")
        return jsonify({"error": "删除截图失败"}), 500
@api_screenshots_bp.route("/api/screenshots/clear", methods=["POST"])
@login_required
def clear_all_screenshots():
    """Delete every screenshot (and cached thumbnail) owned by the current user.

    Responds with the number of deleted files; any failure is logged and
    answered with HTTP 500.
    """
    user_id = current_user.id
    username = _get_username(user_id)
    try:
        known_names = _list_all_usernames()
        removed = 0
        for owned_entry, _prefix in _iter_user_screenshot_entries(user_id, username, known_names):
            os.remove(owned_entry.path)
            _remove_thumbnail(owned_entry.name)
            removed = removed + 1
        deleted_count = removed
        log_to_client(f"清理了 {deleted_count} 个截图文件", user_id)
        return jsonify({"success": True, "deleted": deleted_count})
    except Exception as e:
        logger.warning(f"[screenshots] 清空截图失败(user_id={user_id}): {e}")
        return jsonify({"error": "清空截图失败"}), 500