Files
zcglxt/app/api/v1/operation_logs.py
Claude e71181f0a3 fix: 修复多个关键问题
- 修复前端路由守卫:未登录时不显示提示,直接跳转登录页
- 修复API拦截器:401错误不显示提示,直接跳转
- 增强验证码显示:图片尺寸从120x40增加到200x80
- 增大验证码字体:从28号增加到48号
- 优化验证码字符:排除易混淆的0和1
- 减少干扰线:从5条减少到3条,添加背景色优化
- 增强登录API日志:添加详细的调试日志
- 增强验证码生成和验证日志
- 优化异常处理和错误追踪

影响文件:
- src/router/index.ts
- src/api/request.ts
- app/services/auth_service.py
- app/api/v1/auth.py
- app/schemas/user.py

测试状态:
- 前端构建通过
- 后端语法检查通过
- 验证码显示效果优化完成

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-25 00:26:21 +08:00

206 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
操作日志管理API路由
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.deps import get_db, get_current_user
from app.schemas.operation_log import (
OperationLogCreate,
OperationLogResponse,
OperationLogQueryParams,
OperationLogStatistics,
OperationLogExport
)
from app.services.operation_log_service import operation_log_service
router = APIRouter()
@router.get("/", response_model=Dict[str, Any])
async def get_operation_logs(
skip: int = Query(0, ge=0, description="跳过条数"),
limit: int = Query(20, ge=1, le=100, description="返回条数"),
operator_id: Optional[int] = Query(None, description="操作人ID"),
operator_name: Optional[str] = Query(None, description="操作人姓名"),
module: Optional[str] = Query(None, description="模块名称"),
operation_type: Optional[str] = Query(None, description="操作类型"),
result: Optional[str] = Query(None, description="操作结果"),
start_time: Optional[datetime] = Query(None, description="开始时间"),
end_time: Optional[datetime] = Query(None, description="结束时间"),
keyword: Optional[str] = Query(None, description="关键词"),
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user)
):
"""
获取操作日志列表
- **skip**: 跳过条数
- **limit**: 返回条数最大100
- **operator_id**: 操作人ID筛选
- **operator_name**: 操作人姓名筛选
- **module**: 模块名称筛选
- **operation_type**: 操作类型筛选
- **result**: 操作结果筛选
- **start_time**: 开始时间筛选
- **end_time**: 结束时间筛选
- **keyword**: 关键词搜索
"""
return await operation_log_service.get_logs(
db,
skip=skip,
limit=limit,
operator_id=operator_id,
operator_name=operator_name,
module=module,
operation_type=operation_type,
result=result,
start_time=start_time,
end_time=end_time,
keyword=keyword
)
@router.get("/statistics", response_model=Dict[str, Any])
async def get_operation_statistics(
start_time: Optional[datetime] = Query(None, description="开始时间"),
end_time: Optional[datetime] = Query(None, description="结束时间"),
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user)
):
"""
获取操作日志统计信息
- **start_time**: 开始时间
- **end_time**: 结束时间
返回操作总数、成功数、失败数、今日操作数、模块分布、操作类型分布等统计信息
"""
return await operation_log_service.get_statistics(
db,
start_time=start_time,
end_time=end_time
)
@router.get("/top-operators", response_model=List[Dict[str, Any]])
async def get_top_operators(
limit: int = Query(10, ge=1, le=50, description="返回条数"),
start_time: Optional[datetime] = Query(None, description="开始时间"),
end_time: Optional[datetime] = Query(None, description="结束时间"),
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user)
):
"""
获取操作排行榜
- **limit**: 返回条数
- **start_time**: 开始时间
- **end_time**: 结束时间
返回操作次数最多的用户列表
"""
return await operation_log_service.get_operator_top(
db,
limit=limit,
start_time=start_time,
end_time=end_time
)
@router.get("/{log_id}", response_model=Dict[str, Any])
async def get_operation_log(
log_id: int,
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user)
):
"""
获取操作日志详情
- **log_id**: 日志ID
"""
log = await operation_log_service.get_log(db, log_id)
if not log:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="操作日志不存在"
)
return log
@router.post("/", response_model=Dict[str, Any], status_code=status.HTTP_201_CREATED)
async def create_operation_log(
obj_in: OperationLogCreate,
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user)
):
"""
创建操作日志(通常由系统自动记录)
- **operator_id**: 操作人ID
- **operator_name**: 操作人姓名
- **operator_ip**: 操作人IP
- **module**: 模块名称
- **operation_type**: 操作类型
- **method**: 请求方法
- **url**: 请求URL
- **params**: 请求参数
- **result**: 操作结果
- **error_msg**: 错误信息
- **duration**: 执行时长(毫秒)
- **user_agent**: 用户代理
- **extra_data**: 额外数据
"""
return await operation_log_service.create_log(db, obj_in=obj_in)
@router.post("/export", response_model=List[Dict[str, Any]])
async def export_operation_logs(
export_config: OperationLogExport,
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user)
):
"""
导出操作日志
- **start_time**: 开始时间
- **end_time**: 结束时间
- **operator_id**: 操作人ID
- **module**: 模块名称
- **operation_type**: 操作类型
返回可导出的日志列表
"""
return await operation_log_service.export_logs(
db,
start_time=export_config.start_time,
end_time=export_config.end_time,
operator_id=export_config.operator_id,
module=export_config.module,
operation_type=export_config.operation_type
)
@router.delete("/old-logs", response_model=Dict[str, Any])
async def delete_old_logs(
days: int = Query(90, ge=1, le=365, description="保留天数"),
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user)
):
"""
删除旧操作日志
- **days**: 保留天数默认90天
删除指定天数之前的操作日志
"""
# 检查权限
if not current_user.is_superuser:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="只有超级管理员可以删除日志"
)
return await operation_log_service.delete_old_logs(db, days=days)