Files
zcglxt/backend_new/app/api/v1/notifications.py

310 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
消息通知管理API路由
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import and_
from app.core.deps import get_db, get_current_user
from app.schemas.notification import (
NotificationCreate,
NotificationUpdate,
NotificationResponse,
NotificationQueryParams,
NotificationBatchCreate,
NotificationBatchUpdate,
NotificationStatistics,
NotificationSendFromTemplate
)
from app.services.notification_service import notification_service
# Router object that the notification endpoints in this module register on
# through the @router.<method>(...) decorators.
router = APIRouter()
@router.get("/", response_model=Dict[str, Any])
async def get_notifications(
    skip: int = Query(0, ge=0, description="跳过条数"),
    limit: int = Query(20, ge=1, le=100, description="返回条数"),
    notification_type: Optional[str] = Query(None, description="通知类型"),
    priority: Optional[str] = Query(None, description="优先级"),
    is_read: Optional[bool] = Query(None, description="是否已读"),
    start_time: Optional[datetime] = Query(None, description="开始时间"),
    end_time: Optional[datetime] = Query(None, description="结束时间"),
    keyword: Optional[str] = Query(None, description="关键词"),
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    List notifications.

    - **skip**: number of records to skip (>= 0)
    - **limit**: number of records to return (1-100)
    - **notification_type**: filter by notification type
    - **priority**: filter by priority
    - **is_read**: filter by read state
    - **start_time**: filter by start time
    - **end_time**: filter by end time
    - **keyword**: keyword search

    Note: regular users only see their own notifications; superusers are
    not restricted to a recipient.
    """
    # Superusers pass recipient_id=None (presumably "no recipient filter" in
    # the service layer); everyone else is pinned to their own user id.
    if current_user.is_superuser:
        recipient_filter = None
    else:
        recipient_filter = current_user.id

    filters = {
        "skip": skip,
        "limit": limit,
        "recipient_id": recipient_filter,
        "notification_type": notification_type,
        "priority": priority,
        "is_read": is_read,
        "start_time": start_time,
        "end_time": end_time,
        "keyword": keyword,
    }
    return await notification_service.get_notifications(db, **filters)
@router.get("/unread-count", response_model=Dict[str, Any])
async def get_unread_count(
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Get the number of unread notifications for the current user.

    The result of the notification service is returned unchanged.
    """
    user_id = current_user.id
    unread = await notification_service.get_unread_count(db, user_id)
    return unread
@router.get("/statistics", response_model=Dict[str, Any])
async def get_notification_statistics(
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Get notification statistics for the current user.

    Documented to include the total, unread, read, high-priority and urgent
    counts plus the distribution by type; the exact payload is whatever
    the notification service's get_statistics returns.
    """
    user_id = current_user.id
    stats = await notification_service.get_statistics(db, user_id)
    return stats
@router.get("/{notification_id}", response_model=Dict[str, Any])
async def get_notification(
    notification_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Get the details of a single notification.

    - **notification_id**: notification ID

    Note: users may only view their own notifications; superusers may view
    any notification. Responds with 404 when the notification is not found
    and 403 when the caller is neither the recipient nor a superuser.
    """
    record = await notification_service.get_notification(db, notification_id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="通知不存在",
        )

    # Permission check: superusers short-circuit before the recipient lookup.
    if current_user.is_superuser or record["recipient_id"] == current_user.id:
        return record

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="无权查看此通知",
    )
@router.post("/", response_model=Dict[str, Any], status_code=status.HTTP_201_CREATED)
async def create_notification(
    obj_in: NotificationCreate,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Create a notification.

    Request body fields (see the NotificationCreate schema):

    - **recipient_id**: recipient user ID
    - **title**: notification title
    - **content**: notification content
    - **notification_type**: notification type
    - **priority**: priority (low/normal/high/urgent)
    - **related_entity_type**: related entity type
    - **related_entity_id**: related entity ID
    - **action_url**: action link
    - **extra_data**: extra data
    - **send_email**: whether to send an email
    - **send_sms**: whether to send an SMS
    - **expire_at**: expiry time

    A ValueError raised by the service is reported as HTTP 400 with the
    error text as the detail.
    """
    # current_user is not read here; declaring the dependency makes FastAPI
    # resolve get_current_user for this route (presumably enforcing login).
    try:
        return await notification_service.create_notification(db, obj_in=obj_in)
    except ValueError as exc:
        # Chain explicitly so the original error stays attached as __cause__.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
@router.post("/batch", response_model=Dict[str, Any])
async def batch_create_notifications(
    batch_in: NotificationBatchCreate,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Create the same notification for several recipients.

    Request body fields (see the NotificationBatchCreate schema):

    - **recipient_ids**: list of recipient user IDs
    - **title**: notification title
    - **content**: notification content
    - **notification_type**: notification type
    - **priority**: priority
    - **action_url**: action link
    - **extra_data**: extra data
    """
    # The service result is handed back to the client unchanged.
    outcome = await notification_service.batch_create_notifications(
        db,
        batch_in=batch_in,
    )
    return outcome
@router.post("/from-template", response_model=Dict[str, Any])
async def send_from_template(
    template_in: NotificationSendFromTemplate,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Send notifications generated from a template.

    Request body fields (see the NotificationSendFromTemplate schema):

    - **template_code**: template code
    - **recipient_ids**: list of recipient user IDs
    - **variables**: template variables
    - **related_entity_type**: related entity type
    - **related_entity_id**: related entity ID
    - **action_url**: action link

    A ValueError raised by the service is reported as HTTP 400 with the
    error text as the detail.
    """
    try:
        return await notification_service.send_from_template(db, template_in=template_in)
    except ValueError as exc:
        # Chain explicitly so the original error stays attached as __cause__.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
@router.put("/{notification_id}/read", response_model=Dict[str, Any])
async def mark_notification_as_read(
    notification_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Mark a notification as read.

    - **notification_id**: notification ID

    Responds with 404 when the notification is not found, 403 when the
    caller is neither the recipient nor a superuser, and 400 when the
    service raises a ValueError.
    """
    try:
        notification = await notification_service.get_notification(db, notification_id)
        if not notification:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="通知不存在",
            )

        # Permission check: only the recipient or a superuser may proceed.
        if not current_user.is_superuser and notification["recipient_id"] != current_user.id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="无权操作此通知",
            )

        return await notification_service.mark_as_read(db, notification_id)
    except ValueError as exc:
        # The 404/403 HTTPExceptions raised above are not caught by this
        # handler (it only names ValueError), so they propagate as-is.
        # Chain explicitly so the original error stays attached as __cause__.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
@router.put("/read-all", response_model=Dict[str, Any])
async def mark_all_as_read(
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Mark all unread notifications as read.

    Applies to the current user's notifications; the service result is
    returned unchanged.
    """
    user_id = current_user.id
    outcome = await notification_service.mark_all_as_read(db, user_id)
    return outcome
@router.delete("/{notification_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_notification(
    notification_id: int,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Delete a notification.

    - **notification_id**: notification ID

    Responds with 404 when the notification is not found and 403 when the
    caller is neither the recipient nor a superuser.
    """
    record = await notification_service.get_notification(db, notification_id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="通知不存在",
        )

    # Permission check: superusers short-circuit before the recipient lookup.
    may_delete = current_user.is_superuser or record["recipient_id"] == current_user.id
    if not may_delete:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="无权删除此通知",
        )

    await notification_service.delete_notification(db, notification_id)
    return None
@router.post("/batch-delete", response_model=Dict[str, Any])
async def batch_delete_notifications(
    notification_ids: List[int],
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    Delete several notifications at once.

    - **notification_ids**: list of notification IDs

    Superusers delete exactly the IDs they send. For other users the list
    is first reduced to the notifications they are the recipient of; IDs
    that do not exist or belong to someone else are dropped silently. If
    nothing remains after that filtering, the request fails with HTTP 400.
    """
    if not current_user.is_superuser:
        # Verify ownership one ID at a time, the same way the single-item
        # endpoints do. Listing an unfiltered page of notifications and
        # intersecting it with the request would miss any owned notification
        # that happens to fall outside that page.
        owned_ids = []
        # dict.fromkeys drops duplicate IDs while keeping the request order.
        for candidate_id in dict.fromkeys(notification_ids):
            record = await notification_service.get_notification(db, candidate_id)
            if record and record["recipient_id"] == current_user.id:
                owned_ids.append(candidate_id)

        if not owned_ids:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="没有有效的通知ID",
            )
        notification_ids = owned_ids

    return await notification_service.batch_delete_notifications(db, notification_ids)