Files
zcglxt/backend_new/app/crud/operation_log.py

312 lines
8.9 KiB
Python

"""
操作日志CRUD操作
"""
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy import select, and_, or_, func, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.operation_log import OperationLog
class OperationLogCRUD:
    """Query, create, aggregate and purge ``OperationLog`` rows.

    None of the methods commit; transaction control is left to the caller
    that owns the ``AsyncSession``.
    """

    @staticmethod
    def _time_conditions(
        start_time: Optional[datetime],
        end_time: Optional[datetime],
    ) -> List[Any]:
        """Build inclusive ``created_at`` range conditions for the given bounds."""
        conditions: List[Any] = []
        if start_time is not None:
            conditions.append(OperationLog.created_at >= start_time)
        if end_time is not None:
            conditions.append(OperationLog.created_at <= end_time)
        return conditions

    @staticmethod
    def _filtered(query: Any, conditions: List[Any]) -> Any:
        """Return *query* restricted by all *conditions* (ANDed); unchanged if empty."""
        # Test the plain Python list, never a SQL clause object: truth-testing
        # a SQLAlchemy expression raises TypeError for most operators.
        if conditions:
            return query.where(and_(*conditions))
        return query

    async def get(self, db: AsyncSession, log_id: int) -> Optional[OperationLog]:
        """Fetch a single operation log by primary key.

        Args:
            db: Database session.
            log_id: Log ID.

        Returns:
            The matching ``OperationLog``, or ``None`` if no row has that ID.
        """
        result = await db.execute(
            select(OperationLog).where(OperationLog.id == log_id)
        )
        return result.scalar_one_or_none()

    async def get_multi(
        self,
        db: AsyncSession,
        *,
        skip: int = 0,
        limit: int = 100,
        operator_id: Optional[int] = None,
        operator_name: Optional[str] = None,
        module: Optional[str] = None,
        operation_type: Optional[str] = None,
        result: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        keyword: Optional[str] = None
    ) -> tuple[List[OperationLog], int]:
        """Return one page of operation logs plus the total match count.

        Every filter is optional; a falsy value (``None``, ``0``, ``""``)
        means "do not filter on this field". All supplied filters are ANDed.

        Args:
            db: Database session.
            skip: Number of rows to skip (offset).
            limit: Maximum number of rows to return.
            operator_id: Exact match on the operator ID.
            operator_name: Case-insensitive substring match on the operator name.
            module: Exact match on the module name.
            operation_type: Exact match on the operation type.
            result: Exact match on the operation result.
            start_time: Inclusive lower bound on ``created_at``.
            end_time: Inclusive upper bound on ``created_at``.
            keyword: Case-insensitive substring searched in ``url``,
                ``params`` and ``error_msg`` (a match in any one suffices).

        Returns:
            ``(items, total)`` where ``items`` is the requested page ordered
            newest first and ``total`` is the number of rows matching the
            filters regardless of paging.
        """
        conditions: List[Any] = []
        if operator_id:
            conditions.append(OperationLog.operator_id == operator_id)
        if operator_name:
            conditions.append(OperationLog.operator_name.ilike(f"%{operator_name}%"))
        if module:
            conditions.append(OperationLog.module == module)
        if operation_type:
            conditions.append(OperationLog.operation_type == operation_type)
        if result:
            conditions.append(OperationLog.result == result)
        conditions.extend(self._time_conditions(start_time, end_time))
        if keyword:
            pattern = f"%{keyword}%"
            conditions.append(
                or_(
                    OperationLog.url.ilike(pattern),
                    OperationLog.params.ilike(pattern),
                    OperationLog.error_msg.ilike(pattern),
                )
            )

        # Total number of matching rows (ignores skip/limit).
        count_query = self._filtered(select(func.count(OperationLog.id)), conditions)
        total = (await db.execute(count_query)).scalar() or 0

        # Page of rows. The id tie-breaker keeps paging deterministic when
        # several rows share the same created_at value.
        page_query = (
            self._filtered(select(OperationLog), conditions)
            .order_by(desc(OperationLog.created_at), desc(OperationLog.id))
            .offset(skip)
            .limit(limit)
        )
        # Named distinctly so the ``result`` filter parameter is not shadowed.
        page_result = await db.execute(page_query)
        return list(page_result.scalars().all()), total

    async def create(
        self,
        db: AsyncSession,
        *,
        obj_in: Dict[str, Any]
    ) -> OperationLog:
        """Insert a new operation log.

        The row is flushed and refreshed so database-generated values are
        populated on the returned object; the transaction is not committed.

        Args:
            db: Database session.
            obj_in: Column values, passed as keyword arguments to
                ``OperationLog``.

        Returns:
            The newly created ``OperationLog``.
        """
        db_obj = OperationLog(**obj_in)
        db.add(db_obj)
        await db.flush()
        await db.refresh(db_obj)
        return db_obj

    async def get_statistics(
        self,
        db: AsyncSession,
        *,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Aggregate operation-log statistics, optionally within a time range.

        Args:
            db: Database session.
            start_time: Inclusive lower bound on ``created_at``.
            end_time: Inclusive upper bound on ``created_at``.

        Returns:
            A dict with keys ``total_count``, ``success_count``,
            ``failed_count``, ``today_count``, ``module_distribution`` and
            ``operation_distribution``. ``today_count`` is NOT restricted by
            ``start_time``/``end_time``.
        """
        conditions = self._time_conditions(start_time, end_time)

        # Total rows in range.
        total_query = self._filtered(select(func.count(OperationLog.id)), conditions)
        total_count = (await db.execute(total_query)).scalar() or 0

        # Rows in range whose result is exactly "success".
        success_query = self._filtered(
            select(func.count(OperationLog.id)).where(OperationLog.result == "success"),
            conditions,
        )
        success_count = (await db.execute(success_query)).scalar() or 0

        # Everything that is not "success" is counted as failed.
        failed_count = total_count - success_count

        # Rows created since midnight of the current UTC date.
        # NOTE(review): assumes created_at is stored as naive UTC - confirm.
        today = datetime.utcnow().date()
        today_start = datetime.combine(today, datetime.min.time())
        today_query = select(func.count(OperationLog.id)).where(
            OperationLog.created_at >= today_start
        )
        today_count = (await db.execute(today_query)).scalar() or 0

        # Row count per module.
        module_query = self._filtered(
            select(
                OperationLog.module,
                func.count(OperationLog.id).label('count')
            ).group_by(OperationLog.module),
            conditions,
        )
        module_result = await db.execute(module_query)
        module_distribution = [
            {"module": row[0], "count": row[1]}
            for row in module_result
        ]

        # Row count per operation type.
        operation_query = self._filtered(
            select(
                OperationLog.operation_type,
                func.count(OperationLog.id).label('count')
            ).group_by(OperationLog.operation_type),
            conditions,
        )
        operation_result = await db.execute(operation_query)
        operation_distribution = [
            {"operation_type": row[0], "count": row[1]}
            for row in operation_result
        ]

        return {
            "total_count": total_count,
            "success_count": success_count,
            "failed_count": failed_count,
            "today_count": today_count,
            "module_distribution": module_distribution,
            "operation_distribution": operation_distribution,
        }

    async def delete_old_logs(
        self,
        db: AsyncSession,
        *,
        days: int = 90
    ) -> int:
        """Delete logs created more than ``days`` days ago.

        The deletion is issued as a single criteria-based DELETE statement and
        is not committed here.

        Args:
            db: Database session.
            days: Retention period in days; rows older than this are removed.

        Returns:
            Number of rows deleted, as reported by the database driver
            (``0`` if nothing matched or the driver reports no count).
        """
        # Local import: ``delete`` is not part of the file-level import list.
        from sqlalchemy import delete

        cutoff_date = datetime.utcnow() - timedelta(days=days)

        # Delete by criteria instead of first loading every id and sending
        # them back in an IN (...) list, which can exceed driver parameter
        # limits on large tables. "fetch" keeps objects already loaded in this
        # session in sync with the rows actually removed.
        delete_stmt = (
            delete(OperationLog)
            .where(OperationLog.created_at < cutoff_date)
            .execution_options(synchronize_session="fetch")
        )
        delete_result = await db.execute(delete_stmt)

        # Some drivers report -1 / None when the row count is unavailable.
        deleted = delete_result.rowcount
        return deleted if deleted and deleted > 0 else 0

    async def get_operator_top(
        self,
        db: AsyncSession,
        *,
        limit: int = 10,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None
    ) -> List[Dict[str, Any]]:
        """Rank operators by number of logged operations, highest first.

        Args:
            db: Database session.
            limit: Maximum number of operators to return.
            start_time: Inclusive lower bound on ``created_at``.
            end_time: Inclusive upper bound on ``created_at``.

        Returns:
            A list of dicts with keys ``operator_id``, ``operator_name`` and
            ``count``, ordered by ``count`` descending.
        """
        conditions = self._time_conditions(start_time, end_time)

        query = self._filtered(
            select(
                OperationLog.operator_id,
                OperationLog.operator_name,
                func.count(OperationLog.id).label('count')
            ),
            conditions,
        ).group_by(
            OperationLog.operator_id,
            OperationLog.operator_name
        ).order_by(
            desc('count')
        ).limit(limit)

        result = await db.execute(query)
        return [
            {
                "operator_id": row[0],
                "operator_name": row[1],
                "count": row[2]
            }
            for row in result
        ]
# Module-level instance created once at import time.
operation_log_crud = OperationLogCRUD()