Files
zcglxt/backend/app/services/notification_service.py

447 lines
13 KiB
Python

"""
消息通知服务层
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.crud.notification import notification_crud
from app.models.notification import NotificationTemplate
from app.models.user import User
from app.schemas.notification import (
NotificationCreate,
NotificationBatchCreate,
NotificationSendFromTemplate
)
import json
class NotificationService:
"""消息通知服务类"""
async def get_notification(self, db: AsyncSession, notification_id: int) -> Optional[Dict[str, Any]]:
"""
获取消息通知详情
Args:
db: 数据库会话
notification_id: 通知ID
Returns:
通知信息
"""
notification = await notification_crud.get(db, notification_id)
if not notification:
return None
return {
"id": notification.id,
"recipient_id": notification.recipient_id,
"recipient_name": notification.recipient_name,
"title": notification.title,
"content": notification.content,
"notification_type": notification.notification_type,
"priority": notification.priority,
"is_read": notification.is_read,
"read_at": notification.read_at,
"related_entity_type": notification.related_entity_type,
"related_entity_id": notification.related_entity_id,
"action_url": notification.action_url,
"extra_data": notification.extra_data,
"sent_via_email": notification.sent_via_email,
"sent_via_sms": notification.sent_via_sms,
"created_at": notification.created_at,
"expire_at": notification.expire_at,
"type": notification.notification_type,
"link": notification.action_url,
"extra": notification.extra_data,
}
async def get_notifications(
self,
db: AsyncSession,
*,
skip: int = 0,
limit: int = 20,
recipient_id: Optional[int] = None,
notification_type: Optional[str] = None,
priority: Optional[str] = None,
is_read: Optional[bool] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
keyword: Optional[str] = None
) -> Dict[str, Any]:
"""
获取消息通知列表
Args:
db: 数据库会话
skip: 跳过条数
limit: 返回条数
recipient_id: 接收人ID
notification_type: 通知类型
priority: 优先级
is_read: 是否已读
start_time: 开始时间
end_time: 结束时间
keyword: 关键词
Returns:
通知列表和总数
"""
items, total = await notification_crud.get_multi(
db,
skip=skip,
limit=limit,
recipient_id=recipient_id,
notification_type=notification_type,
priority=priority,
is_read=is_read,
start_time=start_time,
end_time=end_time,
keyword=keyword
)
return {
"items": [
{
"id": item.id,
"recipient_id": item.recipient_id,
"recipient_name": item.recipient_name,
"title": item.title,
"content": item.content,
"notification_type": item.notification_type,
"priority": item.priority,
"is_read": item.is_read,
"read_at": item.read_at,
"action_url": item.action_url,
"created_at": item.created_at,
"type": item.notification_type,
"link": item.action_url,
"extra": item.extra_data,
}
for item in items
],
"total": total
}
async def create_notification(
self,
db: AsyncSession,
obj_in: NotificationCreate
) -> Dict[str, Any]:
"""
创建消息通知
Args:
db: 数据库会话
obj_in: 创建数据
Returns:
创建的通知信息
"""
# 获取接收人信息
user_result = await db.execute(
select(User).where(User.id == obj_in.recipient_id)
)
user = user_result.scalar_one_or_none()
if not user:
raise ValueError("接收人不存在")
# 转换为字典
obj_in_data = obj_in.model_dump()
obj_in_data["recipient_name"] = user.real_name
# 处理复杂类型
if obj_in_data.get("extra_data"):
obj_in_data["extra_data"] = json.loads(obj_in.extra_data.model_dump_json()) if isinstance(obj_in.extra_data, dict) else obj_in.extra_data
# 设置邮件和短信发送标记
obj_in_data["sent_via_email"] = obj_in_data.pop("send_email", False)
obj_in_data["sent_via_sms"] = obj_in_data.pop("send_sms", False)
notification = await notification_crud.create(db, obj_in=obj_in_data)
# TODO: 发送邮件和短信
# if notification.sent_via_email:
# await self._send_email(notification)
# if notification.sent_via_sms:
# await self._send_sms(notification)
return {
"id": notification.id,
"recipient_id": notification.recipient_id,
"title": notification.title,
}
async def batch_create_notifications(
self,
db: AsyncSession,
batch_in: NotificationBatchCreate
) -> Dict[str, Any]:
"""
批量创建消息通知
Args:
db: 数据库会话
batch_in: 批量创建数据
Returns:
创建结果
"""
# 获取接收人信息
user_results = await db.execute(
select(User).where(User.id.in_(batch_in.recipient_ids))
)
users = {user.id: user.real_name for user in user_results.scalars()}
# 准备通知数据
notification_data = {
"title": batch_in.title,
"content": batch_in.content,
"notification_type": batch_in.notification_type.value,
"priority": batch_in.priority.value,
"action_url": batch_in.action_url,
"extra_data": json.loads(batch_in.extra_data.model_dump_json()) if batch_in.extra_data else {},
}
# 批量创建
notifications = await notification_crud.batch_create(
db,
recipient_ids=batch_in.recipient_ids,
notification_data=notification_data
)
# 更新接收人姓名
for notification in notifications:
notification.recipient_name = users.get(notification.recipient_id, "")
await db.flush()
return {
"count": len(notifications),
"notification_ids": [n.id for n in notifications]
}
async def mark_as_read(
self,
db: AsyncSession,
notification_id: int
) -> Dict[str, Any]:
"""
标记为已读
Args:
db: 数据库会话
notification_id: 通知ID
Returns:
更新结果
"""
notification = await notification_crud.mark_as_read(
db,
notification_id=notification_id,
read_at=datetime.utcnow()
)
if not notification:
raise ValueError("通知不存在")
return {
"id": notification.id,
"is_read": notification.is_read,
"read_at": notification.read_at
}
async def mark_all_as_read(
self,
db: AsyncSession,
recipient_id: int
) -> Dict[str, Any]:
"""
标记所有未读为已读
Args:
db: 数据库会话
recipient_id: 接收人ID
Returns:
更新结果
"""
count = await notification_crud.mark_all_as_read(
db,
recipient_id=recipient_id,
read_at=datetime.utcnow()
)
return {
"count": count,
"message": f"已标记 {count} 条通知为已读"
}
async def batch_mark_as_read(
self,
db: AsyncSession,
notification_ids: List[int],
recipient_id: Optional[int] = None
) -> Dict[str, Any]:
"""
批量标记为已读
"""
count = await notification_crud.batch_mark_as_read(
db,
notification_ids=notification_ids,
recipient_id=recipient_id
)
return {
"count": count,
"message": f"已标记 {count} 条通知为已读"
}
async def batch_mark_as_unread(
self,
db: AsyncSession,
notification_ids: List[int],
recipient_id: Optional[int] = None
) -> Dict[str, Any]:
"""
批量标记为未读
"""
count = await notification_crud.batch_mark_as_unread(
db,
notification_ids=notification_ids,
recipient_id=recipient_id
)
return {
"count": count,
"message": f"已标记 {count} 条通知为未读"
}
async def delete_notification(self, db: AsyncSession, notification_id: int) -> None:
"""
删除消息通知
Args:
db: 数据库会话
notification_id: 通知ID
"""
await notification_crud.delete(db, notification_id=notification_id)
async def batch_delete_notifications(
self,
db: AsyncSession,
notification_ids: List[int]
) -> Dict[str, Any]:
"""
批量删除通知
Args:
db: 数据库会话
notification_ids: 通知ID列表
Returns:
删除结果
"""
count = await notification_crud.batch_delete(db, notification_ids=notification_ids)
return {
"count": count,
"message": f"已删除 {count} 条通知"
}
async def get_unread_count(self, db: AsyncSession, recipient_id: int) -> Dict[str, Any]:
"""
获取未读通知数量
Args:
db: 数据库会话
recipient_id: 接收人ID
Returns:
未读数量
"""
count = await notification_crud.get_unread_count(db, recipient_id)
return {"unread_count": count}
async def get_statistics(self, db: AsyncSession, recipient_id: int) -> Dict[str, Any]:
"""
获取通知统计信息
Args:
db: 数据库会话
recipient_id: 接收人ID
Returns:
统计信息
"""
return await notification_crud.get_statistics(db, recipient_id)
async def send_from_template(
self,
db: AsyncSession,
template_in: NotificationSendFromTemplate
) -> Dict[str, Any]:
"""
从模板发送通知
Args:
db: 数据库会话
template_in: 模板发送数据
Returns:
发送结果
"""
# 获取模板
result = await db.execute(
select(NotificationTemplate).where(
and_(
NotificationTemplate.template_code == template_in.template_code,
NotificationTemplate.is_active == True
)
)
)
template = result.scalar_one_or_none()
if not template:
raise ValueError(f"通知模板 {template_in.template_code} 不存在或未启用")
# 渲染标题和内容
title = self._render_template(template.title_template, template_in.variables)
content = self._render_template(template.content_template, template_in.variables)
# 创建批量通知数据
batch_data = NotificationBatchCreate(
recipient_ids=template_in.recipient_ids,
title=title,
content=content,
notification_type=template.notification_type,
priority=template.priority,
action_url=template_in.action_url,
extra_data={
"template_code": template.template_code,
"variables": template_in.variables
}
)
return await self.batch_create_notifications(db, batch_data)
def _render_template(self, template: str, variables: Dict[str, Any]) -> str:
"""
渲染模板
Args:
template: 模板字符串
variables: 变量字典
Returns:
渲染后的字符串
"""
try:
return template.format(**variables)
except KeyError as e:
raise ValueError(f"模板变量缺失: {e}")
# 创建全局实例
notification_service = NotificationService()