""" 操作日志管理API路由 """ from typing import Optional, List, Dict, Any from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.ext.asyncio import AsyncSession from app.core.deps import get_db, get_current_user from app.schemas.operation_log import ( OperationLogCreate, OperationLogResponse, OperationLogQueryParams, OperationLogStatistics, OperationLogExport ) from app.services.operation_log_service import operation_log_service router = APIRouter() @router.get("/", response_model=Dict[str, Any]) async def get_operation_logs( skip: int = Query(0, ge=0, description="跳过条数"), limit: int = Query(20, ge=1, le=100, description="返回条数"), operator_id: Optional[int] = Query(None, description="操作人ID"), operator_name: Optional[str] = Query(None, description="操作人姓名"), module: Optional[str] = Query(None, description="模块名称"), operation_type: Optional[str] = Query(None, description="操作类型"), result: Optional[str] = Query(None, description="操作结果"), start_time: Optional[datetime] = Query(None, description="开始时间"), end_time: Optional[datetime] = Query(None, description="结束时间"), keyword: Optional[str] = Query(None, description="关键词"), db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """ 获取操作日志列表 - **skip**: 跳过条数 - **limit**: 返回条数(最大100) - **operator_id**: 操作人ID筛选 - **operator_name**: 操作人姓名筛选 - **module**: 模块名称筛选 - **operation_type**: 操作类型筛选 - **result**: 操作结果筛选 - **start_time**: 开始时间筛选 - **end_time**: 结束时间筛选 - **keyword**: 关键词搜索 """ return await operation_log_service.get_logs( db, skip=skip, limit=limit, operator_id=operator_id, operator_name=operator_name, module=module, operation_type=operation_type, result=result, start_time=start_time, end_time=end_time, keyword=keyword ) @router.get("/statistics", response_model=Dict[str, Any]) async def get_operation_statistics( start_time: Optional[datetime] = Query(None, description="开始时间"), end_time: Optional[datetime] = Query(None, description="结束时间"), db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """ 获取操作日志统计信息 - **start_time**: 开始时间 - **end_time**: 结束时间 返回操作总数、成功数、失败数、今日操作数、模块分布、操作类型分布等统计信息 """ return await operation_log_service.get_statistics( db, start_time=start_time, end_time=end_time ) @router.get("/top-operators", response_model=List[Dict[str, Any]]) async def get_top_operators( limit: int = Query(10, ge=1, le=50, description="返回条数"), start_time: Optional[datetime] = Query(None, description="开始时间"), end_time: Optional[datetime] = Query(None, description="结束时间"), db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """ 获取操作排行榜 - **limit**: 返回条数 - **start_time**: 开始时间 - **end_time**: 结束时间 返回操作次数最多的用户列表 """ return await operation_log_service.get_operator_top( db, limit=limit, start_time=start_time, end_time=end_time ) @router.get("/{log_id}", response_model=Dict[str, Any]) async def get_operation_log( log_id: int, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """ 获取操作日志详情 - **log_id**: 日志ID """ log = await operation_log_service.get_log(db, log_id) if not log: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="操作日志不存在" ) return log @router.post("/", response_model=Dict[str, Any], status_code=status.HTTP_201_CREATED) async def create_operation_log( obj_in: OperationLogCreate, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """ 创建操作日志(通常由系统自动记录) - **operator_id**: 操作人ID - **operator_name**: 操作人姓名 - **operator_ip**: 操作人IP - **module**: 模块名称 - **operation_type**: 操作类型 - **method**: 请求方法 - **url**: 请求URL - **params**: 请求参数 - **result**: 操作结果 - **error_msg**: 错误信息 - **duration**: 执行时长(毫秒) - **user_agent**: 用户代理 - **extra_data**: 额外数据 """ return await operation_log_service.create_log(db, obj_in=obj_in) @router.post("/export", response_model=List[Dict[str, Any]]) async def export_operation_logs( export_config: OperationLogExport, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """ 导出操作日志 - **start_time**: 开始时间 - **end_time**: 结束时间 - **operator_id**: 操作人ID - **module**: 模块名称 - **operation_type**: 操作类型 返回可导出的日志列表 """ return await operation_log_service.export_logs( db, start_time=export_config.start_time, end_time=export_config.end_time, operator_id=export_config.operator_id, module=export_config.module, operation_type=export_config.operation_type ) @router.delete("/old-logs", response_model=Dict[str, Any]) async def delete_old_logs( days: int = Query(90, ge=1, le=365, description="保留天数"), db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """ 删除旧操作日志 - **days**: 保留天数(默认90天) 删除指定天数之前的操作日志 """ # 检查权限 if not current_user.is_superuser: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="只有超级管理员可以删除日志" ) return await operation_log_service.delete_old_logs(db, days=days)