- 修复前端路由守卫:未登录时不显示提示,直接跳转登录页 - 修复API拦截器:401错误不显示提示,直接跳转 - 增强验证码显示:图片尺寸从120x40增加到200x80 - 增大验证码字体:从28号增加到48号 - 优化验证码字符:排除易混淆的0和1 - 减少干扰线:从5条减少到3条,添加背景色优化 - 增强登录API日志:添加详细的调试日志 - 增强验证码生成和验证日志 - 优化异常处理和错误追踪 影响文件: - src/router/index.ts - src/api/request.ts - app/services/auth_service.py - app/api/v1/auth.py - app/schemas/user.py 测试状态: - 前端构建通过 - 后端语法检查通过 - 验证码显示效果优化完成 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
312 lines
8.9 KiB
Python
312 lines
8.9 KiB
Python
"""
|
|
操作日志CRUD操作
|
|
"""
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy import select, and_, or_, func, desc
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.models.operation_log import OperationLog
|
|
|
|
|
|
class OperationLogCRUD:
|
|
"""操作日志CRUD类"""
|
|
|
|
async def get(self, db: AsyncSession, log_id: int) -> Optional[OperationLog]:
|
|
"""
|
|
根据ID获取操作日志
|
|
|
|
Args:
|
|
db: 数据库会话
|
|
log_id: 日志ID
|
|
|
|
Returns:
|
|
OperationLog对象或None
|
|
"""
|
|
result = await db.execute(
|
|
select(OperationLog).where(OperationLog.id == log_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
async def get_multi(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
operator_id: Optional[int] = None,
|
|
operator_name: Optional[str] = None,
|
|
module: Optional[str] = None,
|
|
operation_type: Optional[str] = None,
|
|
result: Optional[str] = None,
|
|
start_time: Optional[datetime] = None,
|
|
end_time: Optional[datetime] = None,
|
|
keyword: Optional[str] = None
|
|
) -> tuple[List[OperationLog], int]:
|
|
"""
|
|
获取操作日志列表
|
|
|
|
Args:
|
|
db: 数据库会话
|
|
skip: 跳过条数
|
|
limit: 返回条数
|
|
operator_id: 操作人ID
|
|
operator_name: 操作人姓名
|
|
module: 模块名称
|
|
operation_type: 操作类型
|
|
result: 操作结果
|
|
start_time: 开始时间
|
|
end_time: 结束时间
|
|
keyword: 关键词
|
|
|
|
Returns:
|
|
(日志列表, 总数)
|
|
"""
|
|
# 构建查询条件
|
|
conditions = []
|
|
|
|
if operator_id:
|
|
conditions.append(OperationLog.operator_id == operator_id)
|
|
|
|
if operator_name:
|
|
conditions.append(OperationLog.operator_name.ilike(f"%{operator_name}%"))
|
|
|
|
if module:
|
|
conditions.append(OperationLog.module == module)
|
|
|
|
if operation_type:
|
|
conditions.append(OperationLog.operation_type == operation_type)
|
|
|
|
if result:
|
|
conditions.append(OperationLog.result == result)
|
|
|
|
if start_time:
|
|
conditions.append(OperationLog.created_at >= start_time)
|
|
|
|
if end_time:
|
|
conditions.append(OperationLog.created_at <= end_time)
|
|
|
|
if keyword:
|
|
conditions.append(
|
|
or_(
|
|
OperationLog.url.ilike(f"%{keyword}%"),
|
|
OperationLog.params.ilike(f"%{keyword}%"),
|
|
OperationLog.error_msg.ilike(f"%{keyword}%")
|
|
)
|
|
)
|
|
|
|
# 查询总数
|
|
count_query = select(func.count(OperationLog.id))
|
|
if conditions:
|
|
count_query = count_query.where(and_(*conditions))
|
|
total_result = await db.execute(count_query)
|
|
total = total_result.scalar() or 0
|
|
|
|
# 查询数据
|
|
query = select(OperationLog)
|
|
if conditions:
|
|
query = query.where(and_(*conditions))
|
|
query = query.order_by(desc(OperationLog.created_at))
|
|
query = query.offset(skip).limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
items = result.scalars().all()
|
|
|
|
return list(items), total
|
|
|
|
async def create(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
obj_in: Dict[str, Any]
|
|
) -> OperationLog:
|
|
"""
|
|
创建操作日志
|
|
|
|
Args:
|
|
db: 数据库会话
|
|
obj_in: 创建数据
|
|
|
|
Returns:
|
|
OperationLog对象
|
|
"""
|
|
db_obj = OperationLog(**obj_in)
|
|
db.add(db_obj)
|
|
await db.flush()
|
|
await db.refresh(db_obj)
|
|
return db_obj
|
|
|
|
async def get_statistics(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
start_time: Optional[datetime] = None,
|
|
end_time: Optional[datetime] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
获取操作日志统计信息
|
|
|
|
Args:
|
|
db: 数据库会话
|
|
start_time: 开始时间
|
|
end_time: 结束时间
|
|
|
|
Returns:
|
|
统计信息
|
|
"""
|
|
# 构建时间条件
|
|
conditions = []
|
|
if start_time:
|
|
conditions.append(OperationLog.created_at >= start_time)
|
|
if end_time:
|
|
conditions.append(OperationLog.created_at <= end_time)
|
|
|
|
where_clause = and_(*conditions) if conditions else None
|
|
|
|
# 总数
|
|
total_query = select(func.count(OperationLog.id))
|
|
if where_clause:
|
|
total_query = total_query.where(where_clause)
|
|
total_result = await db.execute(total_query)
|
|
total_count = total_result.scalar() or 0
|
|
|
|
# 成功数
|
|
success_query = select(func.count(OperationLog.id)).where(OperationLog.result == "success")
|
|
if where_clause:
|
|
success_query = success_query.where(where_clause)
|
|
success_result = await db.execute(success_query)
|
|
success_count = success_result.scalar() or 0
|
|
|
|
# 失败数
|
|
failed_count = total_count - success_count
|
|
|
|
# 今日操作数
|
|
today = datetime.utcnow().date()
|
|
today_start = datetime.combine(today, datetime.min.time())
|
|
today_query = select(func.count(OperationLog.id)).where(OperationLog.created_at >= today_start)
|
|
today_result = await db.execute(today_query)
|
|
today_count = today_result.scalar() or 0
|
|
|
|
# 模块分布
|
|
module_query = select(
|
|
OperationLog.module,
|
|
func.count(OperationLog.id).label('count')
|
|
).group_by(OperationLog.module)
|
|
if where_clause:
|
|
module_query = module_query.where(where_clause)
|
|
module_result = await db.execute(module_query)
|
|
module_distribution = [
|
|
{"module": row[0], "count": row[1]}
|
|
for row in module_result
|
|
]
|
|
|
|
# 操作类型分布
|
|
operation_query = select(
|
|
OperationLog.operation_type,
|
|
func.count(OperationLog.id).label('count')
|
|
).group_by(OperationLog.operation_type)
|
|
if where_clause:
|
|
operation_query = operation_query.where(where_clause)
|
|
operation_result = await db.execute(operation_query)
|
|
operation_distribution = [
|
|
{"operation_type": row[0], "count": row[1]}
|
|
for row in operation_result
|
|
]
|
|
|
|
return {
|
|
"total_count": total_count,
|
|
"success_count": success_count,
|
|
"failed_count": failed_count,
|
|
"today_count": today_count,
|
|
"module_distribution": module_distribution,
|
|
"operation_distribution": operation_distribution,
|
|
}
|
|
|
|
async def delete_old_logs(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
days: int = 90
|
|
) -> int:
|
|
"""
|
|
删除旧日志
|
|
|
|
Args:
|
|
db: 数据库会话
|
|
days: 保留天数
|
|
|
|
Returns:
|
|
删除的日志数量
|
|
"""
|
|
cutoff_date = datetime.utcnow() - timedelta(days=days)
|
|
|
|
# 查询要删除的日志
|
|
result = await db.execute(
|
|
select(OperationLog.id).where(OperationLog.created_at < cutoff_date)
|
|
)
|
|
ids_to_delete = [row[0] for row in result]
|
|
|
|
if not ids_to_delete:
|
|
return 0
|
|
|
|
# 批量删除
|
|
from sqlalchemy import delete
|
|
delete_stmt = delete(OperationLog).where(OperationLog.id.in_(ids_to_delete))
|
|
await db.execute(delete_stmt)
|
|
|
|
return len(ids_to_delete)
|
|
|
|
async def get_operator_top(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
limit: int = 10,
|
|
start_time: Optional[datetime] = None,
|
|
end_time: Optional[datetime] = None
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
获取操作排行榜
|
|
|
|
Args:
|
|
db: 数据库会话
|
|
limit: 返回条数
|
|
start_time: 开始时间
|
|
end_time: 结束时间
|
|
|
|
Returns:
|
|
操作排行列表
|
|
"""
|
|
# 构建时间条件
|
|
conditions = []
|
|
if start_time:
|
|
conditions.append(OperationLog.created_at >= start_time)
|
|
if end_time:
|
|
conditions.append(OperationLog.created_at <= end_time)
|
|
|
|
query = select(
|
|
OperationLog.operator_id,
|
|
OperationLog.operator_name,
|
|
func.count(OperationLog.id).label('count')
|
|
).group_by(
|
|
OperationLog.operator_id,
|
|
OperationLog.operator_name
|
|
).order_by(
|
|
desc('count')
|
|
).limit(limit)
|
|
|
|
if conditions:
|
|
query = query.where(and_(*conditions))
|
|
|
|
result = await db.execute(query)
|
|
return [
|
|
{
|
|
"operator_id": row[0],
|
|
"operator_name": row[1],
|
|
"count": row[2]
|
|
}
|
|
for row in result
|
|
]
|
|
|
|
|
|
# 创建全局实例
|
|
operation_log_crud = OperationLogCRUD()
|