#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

import db_pool
from db.utils import escape_html, get_cst_now_str


def _normalize_limit(value, default: int, *, minimum: int = 1, maximum: int = 500) -> int:
    """Coerce *value* to an int clamped to ``[minimum, maximum]``.

    Args:
        value: Anything accepted by ``int()``.
        default: Used in place of *value* when the conversion fails. It is
            clamped like any other value.
        minimum: Lower bound applied first.
        maximum: Upper bound applied last, so it wins if ``minimum > maximum``.

    Returns:
        The clamped integer.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # None, non-numeric strings, and float inf/nan all fall back to default.
        parsed = default
    return min(maximum, max(minimum, parsed))


def _normalize_offset(value, default: int = 0) -> int:
    """Coerce *value* to a non-negative int, using *default* if conversion fails."""
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        parsed = default
    return max(0, parsed)


def _safe_text(value) -> str:
    """Return ``str(value)`` passed through ``escape_html``.

    ``None`` and values whose string form is empty yield ``""`` without
    calling ``escape_html``.
    """
    if value is None:
        return ""
    text = str(value)
    return escape_html(text) if text else ""


def _build_feedback_filter_sql(status_filter=None) -> tuple[str, list]:
    """Build the WHERE fragment and bound parameters for feedback queries.

    Args:
        status_filter: When truthy, restricts rows to ``status = ?``.

    Returns:
        ``(where_sql, params)``. ``where_sql`` contains only fixed SQL text;
        the filter value itself travels in ``params``.
    """
    # "1=1" keeps the clause valid when no filter is supplied.
    where_clauses = ["1=1"]
    params = []
    if status_filter:
        where_clauses.append("status = ?")
        params.append(status_filter)
    return " AND ".join(where_clauses), params


def _normalize_feedback_stats_row(row) -> dict:
    """Convert a stats row into a dict of int counters.

    A falsy *row* and missing or ``None`` columns (``SUM`` over zero rows is
    NULL) are all reported as ``0``.

    Returns:
        Dict with keys ``total``, ``pending``, ``replied`` and ``closed``.
    """
    row_dict = dict(row) if row else {}
    return {
        key: int(row_dict.get(key) or 0)
        for key in ("total", "pending", "replied", "closed")
    }


def create_bug_feedback(user_id, username, title, description, contact=""):
    """Insert a new bug feedback record.

    ``username``, ``title``, ``description`` and ``contact`` are stored after
    going through ``_safe_text`` (HTML-escaped); ``created_at`` is set from
    ``get_cst_now_str()``.

    Returns:
        ``cursor.lastrowid`` of the INSERT.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO bug_feedbacks (user_id, username, title, description, contact, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (
                user_id,
                _safe_text(username),
                _safe_text(title),
                _safe_text(description),
                _safe_text(contact),
                get_cst_now_str(),
            ),
        )
        conn.commit()
        return cursor.lastrowid


def get_bug_feedbacks(limit=100, offset=0, status_filter=None) -> list[dict]:
    """Return a page of bug feedback records (admin listing).

    Args:
        limit: Page size; coerced to an int in ``[1, 1000]``, default 100.
        offset: Rows to skip; coerced to an int ``>= 0``.
        status_filter: Optional status value to restrict the listing to.

    Returns:
        A list of row dicts, newest ``created_at`` first.
    """
    safe_limit = _normalize_limit(limit, 100, minimum=1, maximum=1000)
    safe_offset = _normalize_offset(offset, 0)

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        where_sql, params = _build_feedback_filter_sql(status_filter=status_filter)
        # where_sql is assembled from fixed fragments only, so interpolating
        # it is safe; every user-supplied value is bound through params.
        # The id tiebreaker keeps LIMIT/OFFSET pages stable when several rows
        # share a created_at value (assumes id is unique -- it is the lookup
        # key used by get_feedback_by_id).
        sql = f"""
            SELECT * FROM bug_feedbacks
            WHERE {where_sql}
            ORDER BY created_at DESC, id DESC
            LIMIT ? OFFSET ?
        """
        cursor.execute(sql, params + [safe_limit, safe_offset])
        return [dict(row) for row in cursor.fetchall()]


def get_user_feedbacks(user_id, limit=50) -> list[dict]:
    """Return the feedback records submitted by *user_id*.

    Args:
        user_id: Matched against the ``user_id`` column.
        limit: Maximum rows; coerced to an int in ``[1, 1000]``, default 50.

    Returns:
        A list of row dicts, newest ``created_at`` first.
    """
    safe_limit = _normalize_limit(limit, 50, minimum=1, maximum=1000)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # id breaks ties between equal created_at values so the rows selected
        # by LIMIT are deterministic.
        cursor.execute(
            """
            SELECT * FROM bug_feedbacks
            WHERE user_id = ?
            ORDER BY created_at DESC, id DESC
            LIMIT ?
            """,
            (user_id, safe_limit),
        )
        return [dict(row) for row in cursor.fetchall()]


def get_feedback_by_id(feedback_id) -> dict | None:
    """Return the feedback row with the given id as a dict, or ``None`` if absent."""
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM bug_feedbacks WHERE id = ?", (feedback_id,))
        row = cursor.fetchone()
        return dict(row) if row else None


def reply_feedback(feedback_id, admin_reply) -> bool:
    """Store an admin reply and mark the feedback as ``'replied'``.

    The reply text is stored after going through ``_safe_text``
    (HTML-escaped); ``replied_at`` is set from ``get_cst_now_str()``.

    Returns:
        ``True`` if a row was updated, ``False`` otherwise.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE bug_feedbacks
            SET admin_reply = ?, status = 'replied', replied_at = ?
            WHERE id = ?
            """,
            (_safe_text(admin_reply), get_cst_now_str(), feedback_id),
        )
        conn.commit()
        return cursor.rowcount > 0


def close_feedback(feedback_id) -> bool:
    """Set the feedback's status to ``'closed'``.

    Returns:
        ``True`` if a row was updated, ``False`` otherwise.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE bug_feedbacks
            SET status = 'closed'
            WHERE id = ?
            """,
            (feedback_id,),
        )
        conn.commit()
        return cursor.rowcount > 0


def delete_feedback(feedback_id) -> bool:
    """Delete the feedback row with the given id.

    Returns:
        ``True`` if a row was deleted, ``False`` otherwise.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM bug_feedbacks WHERE id = ?", (feedback_id,))
        conn.commit()
        return cursor.rowcount > 0


def get_feedback_stats() -> dict:
    """Return feedback counts overall and per status.

    Returns:
        Dict with int values for ``total``, ``pending``, ``replied`` and
        ``closed``; all zeros when the table is empty.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT
                COUNT(*) as total,
                SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
                SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END) as replied,
                SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed
            FROM bug_feedbacks
            """
        )
        return _normalize_feedback_stats_row(cursor.fetchone())