Files
zsglpt/db/announcements.py

162 lines
5.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import db_pool
from db.utils import get_cst_now_str
def _normalize_limit(value, default: int, *, minimum: int = 1, maximum: int = 500) -> int:
try:
parsed = int(value)
except Exception:
parsed = default
parsed = max(minimum, parsed)
parsed = min(maximum, parsed)
return parsed
def _normalize_offset(value, default: int = 0) -> int:
try:
parsed = int(value)
except Exception:
parsed = default
return max(0, parsed)
def _normalize_announcement_payload(title, content, image_url):
normalized_title = str(title or "").strip()
normalized_content = str(content or "").strip()
normalized_image = str(image_url or "").strip() or None
return normalized_title, normalized_content, normalized_image
def _deactivate_all_active_announcements(cursor, cst_time: str) -> None:
cursor.execute("UPDATE announcements SET is_active = 0, updated_at = ? WHERE is_active = 1", (cst_time,))
def create_announcement(title, content, image_url=None, is_active=True):
    """Create an announcement (enabled by default).

    When the new announcement is enabled, every currently active announcement
    is disabled first.

    Args:
        title: Announcement title; stringified and stripped before use.
        content: Announcement body; stringified and stripped before use.
        image_url: Optional image URL; stored as NULL when blank.
        is_active: Whether the new announcement is enabled on creation.

    Returns:
        The id of the inserted row, or ``None`` when the normalized title or
        content is empty (nothing is written in that case).
    """
    title, content, image_url = _normalize_announcement_payload(title, content, image_url)
    if not (title and content):
        return None

    insert_sql = """
        INSERT INTO announcements (title, content, image_url, is_active, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?)
    """
    with db_pool.get_db() as connection:
        cur = connection.cursor()
        now = get_cst_now_str()
        if is_active:
            _deactivate_all_active_announcements(cur, now)
        active_flag = 1 if is_active else 0
        # The same timestamp is used for created_at and updated_at.
        cur.execute(insert_sql, (title, content, image_url, active_flag, now, now))
        connection.commit()
        return cur.lastrowid
def get_announcement_by_id(announcement_id):
    """Fetch one announcement by id.

    Returns:
        The row converted to a ``dict``, or ``None`` when no row matches.
    """
    query = "SELECT * FROM announcements WHERE id = ?"
    with db_pool.get_db() as connection:
        cur = connection.cursor()
        cur.execute(query, (announcement_id,))
        record = cur.fetchone()
        if not record:
            return None
        return dict(record)
def get_announcements(limit=50, offset=0):
    """List announcements, newest first (for admin use).

    Args:
        limit: Page size; normalized by ``_normalize_limit`` with a default of 50.
        offset: Rows to skip; normalized by ``_normalize_offset`` with a default of 0.

    Returns:
        A list of announcement rows as dicts, ordered by ``created_at`` and then
        ``id``, both descending.
    """
    paging = (_normalize_limit(limit, 50), _normalize_offset(offset, 0))
    query = """
        SELECT * FROM announcements
        ORDER BY created_at DESC, id DESC
        LIMIT ? OFFSET ?
    """
    with db_pool.get_db() as connection:
        cur = connection.cursor()
        cur.execute(query, paging)
        return list(map(dict, cur.fetchall()))
def set_announcement_active(announcement_id, is_active):
    """Enable or disable an announcement.

    Enabling an announcement disables every currently active announcement
    first, then enables the target.

    Args:
        announcement_id: Id of the announcement to update.
        is_active: Truthy to enable the announcement, falsy to disable it.

    Returns:
        ``True`` if a row with ``announcement_id`` was updated. ``False`` if no
        such announcement exists, in which case no row is modified.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cst_time = get_cst_now_str()
        if is_active:
            # Confirm the target exists before touching other rows. Otherwise
            # enabling an unknown id would disable the current announcement and
            # leave nothing active while still reporting failure.
            cursor.execute("SELECT 1 FROM announcements WHERE id = ?", (announcement_id,))
            if cursor.fetchone() is None:
                return False
            _deactivate_all_active_announcements(cursor, cst_time)
        cursor.execute(
            """
            UPDATE announcements
            SET is_active = ?, updated_at = ?
            WHERE id = ?
            """,
            (1 if is_active else 0, cst_time, announcement_id),
        )
        conn.commit()
        return cursor.rowcount > 0
def delete_announcement(announcement_id):
    """Delete an announcement together with its per-user dismissal records.

    Returns:
        ``True`` if an announcement row was deleted, ``False`` otherwise.
    """
    target = (announcement_id,)
    with db_pool.get_db() as connection:
        cur = connection.cursor()
        # Dismissals go first so the announcements DELETE is the last statement
        # executed and the rowcount read below belongs to it.
        for statement in (
            "DELETE FROM announcement_dismissals WHERE announcement_id = ?",
            "DELETE FROM announcements WHERE id = ?",
        ):
            cur.execute(statement, target)
        connection.commit()
        return cur.rowcount > 0
def get_active_announcement_for_user(user_id):
    """Return the active announcement that should be shown to a user.

    Announcements the user has permanently dismissed are excluded. If several
    candidates remain, the most recently created one is returned.

    Returns:
        The announcement row as a ``dict``, or ``None`` when there is nothing
        to show.
    """
    query = """
        SELECT a.*
        FROM announcements a
        LEFT JOIN announcement_dismissals d
        ON d.announcement_id = a.id AND d.user_id = ?
        WHERE a.is_active = 1 AND d.announcement_id IS NULL
        ORDER BY a.created_at DESC, a.id DESC
        LIMIT 1
    """
    with db_pool.get_db() as connection:
        cur = connection.cursor()
        cur.execute(query, (user_id,))
        record = cur.fetchone()
        if not record:
            return None
        return dict(record)
def dismiss_announcement_for_user(user_id, announcement_id):
    """Permanently dismiss an announcement for a user (idempotent).

    Uses ``INSERT OR IGNORE`` so that dismissing the same announcement again
    does not raise a conflict error.

    Returns:
        ``True`` when the statement reports a non-negative rowcount, whether or
        not a new dismissal row was inserted.
    """
    insert_sql = """
        INSERT OR IGNORE INTO announcement_dismissals (user_id, announcement_id, dismissed_at)
        VALUES (?, ?, ?)
    """
    with db_pool.get_db() as connection:
        cur = connection.cursor()
        record = (user_id, announcement_id, get_cst_now_str())
        cur.execute(insert_sql, record)
        connection.commit()
        return cur.rowcount >= 0