#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations from datetime import datetime import pytz import db_pool from db.utils import escape_html def create_bug_feedback(user_id, username, title, description, contact=""): """创建Bug反馈(带XSS防护)""" with db_pool.get_db() as conn: cursor = conn.cursor() cst_tz = pytz.timezone("Asia/Shanghai") cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S") safe_title = escape_html(title) if title else "" safe_description = escape_html(description) if description else "" safe_contact = escape_html(contact) if contact else "" safe_username = escape_html(username) if username else "" cursor.execute( """ INSERT INTO bug_feedbacks (user_id, username, title, description, contact, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (user_id, safe_username, safe_title, safe_description, safe_contact, cst_time), ) conn.commit() return cursor.lastrowid def get_bug_feedbacks(limit=100, offset=0, status_filter=None): """获取Bug反馈列表(管理员用)""" with db_pool.get_db() as conn: cursor = conn.cursor() sql = "SELECT * FROM bug_feedbacks WHERE 1=1" params = [] if status_filter: sql += " AND status = ?" params.append(status_filter) sql += " ORDER BY created_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(sql, params) return [dict(row) for row in cursor.fetchall()] def get_user_feedbacks(user_id, limit=50): """获取用户自己的反馈列表""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT * FROM bug_feedbacks WHERE user_id = ? ORDER BY created_at DESC LIMIT ? """, (user_id, limit), ) return [dict(row) for row in cursor.fetchall()] def get_feedback_by_id(feedback_id): """根据ID获取反馈详情""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM bug_feedbacks WHERE id = ?", (feedback_id,)) row = cursor.fetchone() return dict(row) if row else None def reply_feedback(feedback_id, admin_reply): """管理员回复反馈(带XSS防护)""" with db_pool.get_db() as conn: cursor = conn.cursor() cst_tz = pytz.timezone("Asia/Shanghai") cst_time = datetime.now(cst_tz).strftime("%Y-%m-%d %H:%M:%S") safe_reply = escape_html(admin_reply) if admin_reply else "" cursor.execute( """ UPDATE bug_feedbacks SET admin_reply = ?, status = 'replied', replied_at = ? WHERE id = ? """, (safe_reply, cst_time, feedback_id), ) conn.commit() return cursor.rowcount > 0 def close_feedback(feedback_id): """关闭反馈""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ UPDATE bug_feedbacks SET status = 'closed' WHERE id = ? """, (feedback_id,), ) conn.commit() return cursor.rowcount > 0 def delete_feedback(feedback_id): """删除反馈""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM bug_feedbacks WHERE id = ?", (feedback_id,)) conn.commit() return cursor.rowcount > 0 def get_feedback_stats(): """获取反馈统计""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute( """ SELECT COUNT(*) as total, SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END) as replied, SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed FROM bug_feedbacks """ ) row = cursor.fetchone() return dict(row) if row else {"total": 0, "pending": 0, "replied": 0, "closed": 0}