179 lines
5.0 KiB
Python
179 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from __future__ import annotations
|
||
|
||
import db_pool
|
||
from db.utils import escape_html, get_cst_now_str
|
||
|
||
|
||
def _normalize_limit(value, default: int, *, minimum: int = 1, maximum: int = 500) -> int:
|
||
try:
|
||
parsed = int(value)
|
||
except Exception:
|
||
parsed = default
|
||
parsed = max(minimum, parsed)
|
||
parsed = min(maximum, parsed)
|
||
return parsed
|
||
|
||
|
||
def _normalize_offset(value, default: int = 0) -> int:
|
||
try:
|
||
parsed = int(value)
|
||
except Exception:
|
||
parsed = default
|
||
return max(0, parsed)
|
||
|
||
|
||
def _safe_text(value) -> str:
|
||
if value is None:
|
||
return ""
|
||
text = str(value)
|
||
return escape_html(text) if text else ""
|
||
|
||
|
||
def _build_feedback_filter_sql(status_filter=None) -> tuple[str, list]:
|
||
where_clauses = ["1=1"]
|
||
params = []
|
||
|
||
if status_filter:
|
||
where_clauses.append("status = ?")
|
||
params.append(status_filter)
|
||
|
||
return " AND ".join(where_clauses), params
|
||
|
||
|
||
def _normalize_feedback_stats_row(row) -> dict:
|
||
row_dict = dict(row) if row else {}
|
||
return {
|
||
"total": int(row_dict.get("total") or 0),
|
||
"pending": int(row_dict.get("pending") or 0),
|
||
"replied": int(row_dict.get("replied") or 0),
|
||
"closed": int(row_dict.get("closed") or 0),
|
||
}
|
||
|
||
|
||
def create_bug_feedback(user_id, username, title, description, contact=""):
|
||
"""创建Bug反馈(带XSS防护)"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"""
|
||
INSERT INTO bug_feedbacks (user_id, username, title, description, contact, created_at)
|
||
VALUES (?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
user_id,
|
||
_safe_text(username),
|
||
_safe_text(title),
|
||
_safe_text(description),
|
||
_safe_text(contact),
|
||
get_cst_now_str(),
|
||
),
|
||
)
|
||
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
|
||
|
||
def get_bug_feedbacks(limit=100, offset=0, status_filter=None):
|
||
"""获取Bug反馈列表(管理员用)"""
|
||
safe_limit = _normalize_limit(limit, 100, minimum=1, maximum=1000)
|
||
safe_offset = _normalize_offset(offset, 0)
|
||
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
where_sql, params = _build_feedback_filter_sql(status_filter=status_filter)
|
||
sql = f"""
|
||
SELECT * FROM bug_feedbacks
|
||
WHERE {where_sql}
|
||
ORDER BY created_at DESC
|
||
LIMIT ? OFFSET ?
|
||
"""
|
||
cursor.execute(sql, params + [safe_limit, safe_offset])
|
||
return [dict(row) for row in cursor.fetchall()]
|
||
|
||
|
||
def get_user_feedbacks(user_id, limit=50):
|
||
"""获取用户自己的反馈列表"""
|
||
safe_limit = _normalize_limit(limit, 50, minimum=1, maximum=1000)
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"""
|
||
SELECT * FROM bug_feedbacks
|
||
WHERE user_id = ?
|
||
ORDER BY created_at DESC
|
||
LIMIT ?
|
||
""",
|
||
(user_id, safe_limit),
|
||
)
|
||
return [dict(row) for row in cursor.fetchall()]
|
||
|
||
|
||
def get_feedback_by_id(feedback_id):
|
||
"""根据ID获取反馈详情"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM bug_feedbacks WHERE id = ?", (feedback_id,))
|
||
row = cursor.fetchone()
|
||
return dict(row) if row else None
|
||
|
||
|
||
def reply_feedback(feedback_id, admin_reply):
|
||
"""管理员回复反馈(带XSS防护)"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"""
|
||
UPDATE bug_feedbacks
|
||
SET admin_reply = ?, status = 'replied', replied_at = ?
|
||
WHERE id = ?
|
||
""",
|
||
(_safe_text(admin_reply), get_cst_now_str(), feedback_id),
|
||
)
|
||
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
|
||
def close_feedback(feedback_id):
|
||
"""关闭反馈"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"""
|
||
UPDATE bug_feedbacks
|
||
SET status = 'closed'
|
||
WHERE id = ?
|
||
""",
|
||
(feedback_id,),
|
||
)
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
|
||
def delete_feedback(feedback_id):
|
||
"""删除反馈"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM bug_feedbacks WHERE id = ?", (feedback_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
|
||
def get_feedback_stats():
|
||
"""获取反馈统计"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"""
|
||
SELECT
|
||
COUNT(*) as total,
|
||
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
|
||
SUM(CASE WHEN status = 'replied' THEN 1 ELSE 0 END) as replied,
|
||
SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed
|
||
FROM bug_feedbacks
|
||
"""
|
||
)
|
||
return _normalize_feedback_stats_row(cursor.fetchone())
|