#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Data-access helpers for the ``announcements`` and ``announcement_dismissals`` tables."""
from __future__ import annotations

import db_pool
from db.utils import get_cst_now_str


def _normalize_limit(value, default: int, *, minimum: int = 1, maximum: int = 500) -> int:
    """Coerce *value* to an int page size clamped to ``[minimum, maximum]``.

    Falls back to *default* when *value* cannot be converted to ``int``; the
    fallback is clamped as well.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, NaN and infinities all land here.
        parsed = default
    # Lower bound is applied first, then the upper bound.
    return min(maximum, max(minimum, parsed))


def _normalize_offset(value, default: int = 0) -> int:
    """Coerce *value* to a non-negative int offset, using *default* if it is not convertible."""
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        parsed = default
    return max(0, parsed)


def _normalize_announcement_payload(title, content, image_url):
    """Return ``(title, content, image_url)`` as stripped strings.

    Falsy inputs become ``""`` for title/content; an empty image URL becomes
    ``None``.
    """
    normalized_title = str(title or "").strip()
    normalized_content = str(content or "").strip()
    normalized_image = str(image_url or "").strip() or None
    return normalized_title, normalized_content, normalized_image


def _deactivate_all_active_announcements(cursor, cst_time: str) -> None:
    """Mark every currently active announcement inactive, stamping ``updated_at`` with *cst_time*.

    Does not commit; the caller owns the transaction.
    """
    cursor.execute(
        "UPDATE announcements SET is_active = 0, updated_at = ? WHERE is_active = 1",
        (cst_time,),
    )


def create_announcement(title, content, image_url=None, is_active=True):
    """Create an announcement (active by default).

    When the new announcement is active, all other active announcements are
    deactivated first.

    Returns:
        The cursor's ``lastrowid`` for the inserted row, or ``None`` when the
        title or content is empty after stripping (nothing is written).
    """
    title, content, image_url = _normalize_announcement_payload(title, content, image_url)
    if not title or not content:
        return None

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cst_time = get_cst_now_str()

        if is_active:
            _deactivate_all_active_announcements(cursor, cst_time)

        cursor.execute(
            """
            INSERT INTO announcements (title, content, image_url, is_active, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (title, content, image_url, 1 if is_active else 0, cst_time, cst_time),
        )
        conn.commit()
        return cursor.lastrowid


def get_announcement_by_id(announcement_id) -> dict | None:
    """Fetch one announcement by id as a dict, or ``None`` if no row matches."""
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM announcements WHERE id = ?", (announcement_id,))
        row = cursor.fetchone()
        return dict(row) if row else None


def get_announcements(limit=50, offset=0) -> list[dict]:
    """List announcements, newest first (for admin use).

    *limit* is clamped to 1..500 (default 50) and *offset* to >= 0 before
    being passed to the query.
    """
    safe_limit = _normalize_limit(limit, 50)
    safe_offset = _normalize_offset(offset, 0)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT * FROM announcements
            ORDER BY created_at DESC, id DESC
            LIMIT ? OFFSET ?
            """,
            (safe_limit, safe_offset),
        )
        return [dict(row) for row in cursor.fetchall()]


def set_announcement_active(announcement_id, is_active) -> bool:
    """Activate or deactivate an announcement.

    Activating an announcement deactivates all other active announcements.

    Returns:
        ``True`` if the target row was updated, ``False`` if no announcement
        with *announcement_id* exists (in which case nothing is modified).
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cst_time = get_cst_now_str()

        if is_active:
            # Verify the target exists before touching other rows; otherwise a
            # bad id would deactivate the current announcement and activate
            # nothing in its place.
            cursor.execute("SELECT 1 FROM announcements WHERE id = ?", (announcement_id,))
            if cursor.fetchone() is None:
                return False
            _deactivate_all_active_announcements(cursor, cst_time)

        cursor.execute(
            """
            UPDATE announcements
            SET is_active = ?, updated_at = ?
            WHERE id = ?
            """,
            (1 if is_active else 0, cst_time, announcement_id),
        )
        conn.commit()
        return cursor.rowcount > 0


def delete_announcement(announcement_id) -> bool:
    """Delete an announcement together with its per-user dismissal records.

    Returns:
        ``True`` if an announcement row was deleted.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "DELETE FROM announcement_dismissals WHERE announcement_id = ?",
            (announcement_id,),
        )
        # Executed last so that cursor.rowcount below reflects this statement,
        # not the dismissal cleanup.
        cursor.execute("DELETE FROM announcements WHERE id = ?", (announcement_id,))
        conn.commit()
        return cursor.rowcount > 0


def get_active_announcement_for_user(user_id) -> dict | None:
    """Return the newest active announcement the user has not dismissed.

    Returns:
        The announcement row as a dict, or ``None`` when there is no active
        announcement or the user has dismissed it.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # The LEFT JOIN plus "d.announcement_id IS NULL" keeps only
        # announcements with no dismissal row for this user.
        cursor.execute(
            """
            SELECT a.*
            FROM announcements a
            LEFT JOIN announcement_dismissals d
                ON d.announcement_id = a.id AND d.user_id = ?
            WHERE a.is_active = 1 AND d.announcement_id IS NULL
            ORDER BY a.created_at DESC, a.id DESC
            LIMIT 1
            """,
            (user_id,),
        )
        row = cursor.fetchone()
        return dict(row) if row else None


def dismiss_announcement_for_user(user_id, announcement_id) -> bool:
    """Record that the user permanently dismissed an announcement (idempotent).

    ``INSERT OR IGNORE`` is used so that repeating the call is not an error.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT OR IGNORE INTO announcement_dismissals (user_id, announcement_id, dismissed_at)
            VALUES (?, ?, ?)
            """,
            (user_id, announcement_id, get_cst_now_str()),
        )
        conn.commit()
        # ">= 0" on purpose: an ignored duplicate insert (presumably rowcount 0)
        # is reported as success just like a fresh insert.
        return cursor.rowcount >= 0