Files
zcglxt/tests/services/test_asset_state_machine.py
Claude e71181f0a3 fix: 修复多个关键问题
- 修复前端路由守卫:未登录时不显示提示,直接跳转登录页
- 修复API拦截器:401错误不显示提示,直接跳转
- 增强验证码显示:图片尺寸从120x40增加到200x80
- 增大验证码字体:从28号增加到48号
- 优化验证码字符:排除易混淆的0和1
- 减少干扰线:从5条减少到3条,添加背景色优化
- 增强登录API日志:添加详细的调试日志
- 增强验证码生成和验证日志
- 优化异常处理和错误追踪

影响文件:
- src/router/index.ts
- src/api/request.ts
- app/services/auth_service.py
- app/api/v1/auth.py
- app/schemas/user.py

测试状态:
- 前端构建通过
- 后端语法检查通过
- 验证码显示效果优化完成

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-25 00:26:21 +08:00

1043 lines
30 KiB
Python

"""
资产状态机测试
测试内容:
- 状态转换规则测试(20+用例)
- 状态转换验证测试(15+用例)
- 状态历史记录测试(10+用例)
- 异常状态转换测试(10+用例)
"""
from datetime import datetime

import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.asset import Asset, AssetStatusHistory
from app.models.user import User
# ==================== 状态转换规则测试 ====================
class TestAssetStateTransition:
    """Status-transition tests that write ``Asset.status`` directly.

    Each test assigns a new value to the ``status`` attribute of an ORM
    object, commits, and (where an assertion exists) reloads the row to
    check the stored value. No service-layer state machine is called from
    this class, so the tests named "forbidden" / "immutable" only record
    the intended rule and do not yet assert it.
    """

    # Asset status values used by the tests in this class.
    STATUS_PENDING = "pending"  # awaiting stock-in
    STATUS_IN_STOCK = "in_stock"  # in stock
    STATUS_IN_USE = "in_use"  # in use
    STATUS_MAINTENANCE = "maintenance"  # under maintenance
    STATUS_SCRAPPED = "scrapped"  # scrapped

    @pytest.mark.asyncio
    async def test_pending_to_in_stock_transition(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Move an asset from pending to in_stock."""
        # The fixture asset is expected to start out as pending.
        assert test_asset.status == self.STATUS_PENDING

        # Stock the asset in.
        test_asset.status = self.STATUS_IN_STOCK
        await db_session.commit()

        # Reload the row and verify the stored status.
        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_STOCK

    @pytest.mark.asyncio
    async def test_in_stock_to_in_use_transition(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Move an asset from in_stock to in_use."""
        test_asset.status = self.STATUS_IN_STOCK
        await db_session.commit()

        # Assign the asset to a user.
        test_asset.status = self.STATUS_IN_USE
        await db_session.commit()

        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_USE

    @pytest.mark.asyncio
    async def test_in_use_to_maintenance_transition(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Move an asset from in_use to maintenance."""
        test_asset.status = self.STATUS_IN_USE
        await db_session.commit()

        # Send the asset for repair.
        test_asset.status = self.STATUS_MAINTENANCE
        await db_session.commit()

        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_MAINTENANCE

    @pytest.mark.asyncio
    async def test_maintenance_to_in_stock_transition(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Move an asset from maintenance back to in_stock."""
        test_asset.status = self.STATUS_MAINTENANCE
        await db_session.commit()

        # Repair finished; return the asset to stock.
        test_asset.status = self.STATUS_IN_STOCK
        await db_session.commit()

        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_STOCK

    @pytest.mark.asyncio
    async def test_maintenance_to_in_use_transition(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Move an asset from maintenance straight to in_use."""
        test_asset.status = self.STATUS_MAINTENANCE
        await db_session.commit()

        # Repair finished; hand the asset out directly.
        test_asset.status = self.STATUS_IN_USE
        await db_session.commit()

        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_USE

    @pytest.mark.asyncio
    async def test_any_status_to_scrapped_transition(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Scrap the asset from each non-scrapped status in turn."""
        for initial_status in [
            self.STATUS_PENDING,
            self.STATUS_IN_STOCK,
            self.STATUS_IN_USE,
            self.STATUS_MAINTENANCE,
        ]:
            # The same fixture asset is reused for every starting status.
            test_asset.status = initial_status
            await db_session.commit()

            # Scrap it.
            test_asset.status = self.STATUS_SCRAPPED
            await db_session.commit()

            await db_session.refresh(test_asset)
            assert test_asset.status == self.STATUS_SCRAPPED

    @pytest.mark.asyncio
    async def test_scrapped_status_immutable(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Attempt to move a scrapped asset back to in_stock.

        TODO: the status should stay scrapped, but that needs business
        logic to block the change; nothing is asserted here yet.
        """
        test_asset.status = self.STATUS_SCRAPPED
        await db_session.commit()

        # Try to restore the asset from the scrapped status.
        test_asset.status = self.STATUS_IN_STOCK
        await db_session.commit()

    @pytest.mark.asyncio
    async def test_in_use_to_in_stock_transition(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Move an asset from in_use back to in_stock (recovery)."""
        test_asset.status = self.STATUS_IN_USE
        await db_session.commit()

        # Recover the asset into the warehouse.
        test_asset.status = self.STATUS_IN_STOCK
        await db_session.commit()

        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_STOCK

    @pytest.mark.asyncio
    async def test_full_lifecycle_transitions(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Walk one asset through a complete lifecycle.

        in_stock -> in_use -> maintenance -> in_stock -> in_use -> scrapped,
        verifying the stored status after every step.
        """
        transitions = [
            self.STATUS_IN_STOCK,
            self.STATUS_IN_USE,
            self.STATUS_MAINTENANCE,
            self.STATUS_IN_STOCK,
            self.STATUS_IN_USE,
            self.STATUS_SCRAPPED,
        ]
        for new_status in transitions:
            test_asset.status = new_status
            await db_session.commit()
            await db_session.refresh(test_asset)
            assert test_asset.status == new_status

    @pytest.mark.asyncio
    async def test_direct_pending_to_in_use_forbidden(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Placeholder for the rule that pending must not jump to in_use.

        TODO: the asset should pass through in_stock first and business
        logic should block the direct change; only the starting status is
        stored here and nothing is asserted yet.
        """
        test_asset.status = self.STATUS_PENDING
        await db_session.commit()

    @pytest.mark.asyncio
    async def test_status_persistence(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Check that a committed status survives a reload."""
        test_asset.status = self.STATUS_IN_STOCK
        await db_session.commit()

        # Reload from the database.
        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_STOCK

    @pytest.mark.asyncio
    async def test_concurrent_status_update(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Apply two competing status writes and check that one is stored.

        The writes are awaited one after the other instead of through
        ``asyncio.gather``: both go through ``db_session``, and a single
        ``AsyncSession`` must not be shared by concurrently running tasks.
        Truly concurrent writers would each need a session of their own.
        """
        candidates = (self.STATUS_IN_STOCK, self.STATUS_IN_USE)
        for status in candidates:
            test_asset.status = status
            await db_session.commit()

        # The stored status must be one of the values that were written.
        await db_session.refresh(test_asset)
        assert test_asset.status in candidates

    @pytest.mark.asyncio
    async def test_status_change_with_attributes_update(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Change the status and another attribute in the same commit."""
        test_asset.status = self.STATUS_IN_STOCK
        test_asset.location = "新仓库位置"
        await db_session.commit()

        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_STOCK
        assert test_asset.location == "新仓库位置"

    @pytest.mark.asyncio
    async def test_multiple_status_changes_in_sequence(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Apply several status changes back to back, checking each one."""
        status_sequence = [
            self.STATUS_IN_STOCK,
            self.STATUS_IN_USE,
            self.STATUS_MAINTENANCE,
            self.STATUS_IN_STOCK,
        ]
        for status in status_sequence:
            test_asset.status = status
            await db_session.commit()
            await db_session.refresh(test_asset)
            assert test_asset.status == status

    @pytest.mark.asyncio
    async def test_status_change_with_organization_change(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_organization,
    ):
        """Change the status and the owning organization together."""
        test_asset.status = self.STATUS_IN_USE
        test_asset.organization_id = test_organization.id
        await db_session.commit()

        await db_session.refresh(test_asset)
        assert test_asset.status == self.STATUS_IN_USE

    @pytest.mark.asyncio
    async def test_bulk_status_update(
        self,
        db_session: AsyncSession,
        test_admin: User,
    ):
        """Create several pending assets and move them all to in_stock."""
        # Create the assets, flushing after each insert.
        assets = []
        for i in range(5):
            asset = Asset(
                asset_code=f"BULK{i}",
                asset_name=f"批量资产{i}",
                device_type_id=1,
                organization_id=1,
                status=self.STATUS_PENDING,
            )
            db_session.add(asset)
            await db_session.flush()
            assets.append(asset)

        # Update every asset, then commit once.
        for asset in assets:
            asset.status = self.STATUS_IN_STOCK
        await db_session.commit()

        # Verify each stored row.
        for asset in assets:
            await db_session.refresh(asset)
            assert asset.status == self.STATUS_IN_STOCK

    @pytest.mark.asyncio
    async def test_status_filter_query(
        self,
        db_session: AsyncSession,
        test_admin: User,
    ):
        """Query assets filtered by status."""
        from sqlalchemy import select

        # Create three assets that are already in stock.
        for i in range(3):
            asset = Asset(
                asset_code=f"FILTER{i}",
                asset_name=f"筛选资产{i}",
                device_type_id=1,
                organization_id=1,
                status=self.STATUS_IN_STOCK,
            )
            db_session.add(asset)
        await db_session.commit()

        # Fetch every in_stock asset; at least the three above must match.
        result = await db_session.execute(
            select(Asset).where(Asset.status == self.STATUS_IN_STOCK)
        )
        assets = result.scalars().all()
        assert len(assets) >= 3
# ==================== 状态转换验证测试 ====================
class TestStateTransitionValidation:
    """Placeholder tests for validating status transitions.

    The tests store statuses on the fixture asset and commit them. Apart
    from one check of the starting status, none of them asserts an
    outcome yet; the comments record the rule each test is meant to cover.
    """

    @staticmethod
    async def _store_status(session, asset, status):
        """Assign ``status`` to ``asset`` and commit ``session``."""
        asset.status = status
        await session.commit()

    @pytest.mark.asyncio
    async def test_validate_transition_pending_to_in_stock(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """pending -> in_stock is a valid transition."""
        assert test_asset.status == "pending"
        await self._store_status(db_session, test_asset, "in_stock")

    @pytest.mark.asyncio
    async def test_validate_transition_in_stock_to_in_use(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """in_stock -> in_use is a valid transition."""
        await self._store_status(db_session, test_asset, "in_stock")
        # Valid transition.
        await self._store_status(db_session, test_asset, "in_use")

    @pytest.mark.asyncio
    async def test_validate_in_use_to_maintenance(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """in_use -> maintenance is a valid transition."""
        await self._store_status(db_session, test_asset, "in_use")
        # Valid transition.
        await self._store_status(db_session, test_asset, "maintenance")

    @pytest.mark.asyncio
    async def test_validate_maintenance_to_in_stock(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """maintenance -> in_stock is a valid transition."""
        await self._store_status(db_session, test_asset, "maintenance")
        # Valid transition.
        await self._store_status(db_session, test_asset, "in_stock")

    @pytest.mark.asyncio
    async def test_validate_any_to_scrapped(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Every non-scrapped status may move to scrapped."""
        for starting_status in ("pending", "in_stock", "in_use", "maintenance"):
            await self._store_status(db_session, test_asset, starting_status)
            # Any status can be scrapped.
            await self._store_status(db_session, test_asset, "scrapped")
            # Reset in memory only; this assignment is not committed here.
            test_asset.status = starting_status

    @pytest.mark.asyncio
    async def test_invalidate_scrapped_to_other(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """scrapped must not move to any other status.

        TODO: business logic should reject scrapped -> pending, in_stock,
        in_use and maintenance; only the scrapped status is stored here.
        """
        await self._store_status(db_session, test_asset, "scrapped")

    @pytest.mark.asyncio
    async def test_validate_self_transition(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Re-assign the asset's current status to itself and commit."""
        # The status is expected to stay unchanged.
        await self._store_status(db_session, test_asset, test_asset.status)

    @pytest.mark.asyncio
    async def test_validate_transition_with_conditions(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Transitions that depend on a precondition.

        TODO: e.g. an asset under maintenance should need a maintenance
        record before it may return to in_stock; without one the
        transition should be blocked.
        """
        await self._store_status(db_session, test_asset, "maintenance")

    @pytest.mark.asyncio
    async def test_validate_bulk_transition(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Bulk transitions.

        TODO: validate every asset before a bulk transition is applied.
        """
        await self._store_status(db_session, test_asset, "in_stock")

    @pytest.mark.asyncio
    async def test_validate_transition_permissions(
        self, db_session: AsyncSession, test_asset: Asset, test_user: User
    ):
        """Permissions required for a transition.

        TODO: some transitions may need specific permissions; an ordinary
        user may not be allowed to scrap an asset.
        """
        await self._store_status(db_session, test_asset, "in_stock")

    @pytest.mark.asyncio
    async def test_validate_transition_business_rules(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Business rules around transitions.

        TODO: e.g. only in-stock assets can be assigned, and only in-use
        assets can be sent to maintenance.
        """
        await self._store_status(db_session, test_asset, "in_use")

    @pytest.mark.asyncio
    async def test_validate_transition_data_integrity(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Data integrity during a transition.

        TODO: related data must stay consistent when the status changes,
        e.g. organization_id must not be empty.
        """
        await self._store_status(db_session, test_asset, "in_stock")

    @pytest.mark.asyncio
    async def test_validate_transition_sequence(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Sequences of transitions that should be accepted."""
        valid_sequences = [
            ["pending", "in_stock", "in_use"],
            ["pending", "in_stock", "in_use", "maintenance", "in_stock"],
            ["in_stock", "scrapped"],
        ]
        for _sequence in valid_sequences:
            # TODO: exercise each valid sequence.
            pass

    @pytest.mark.asyncio
    async def test_validate_transition_with_time_constraints(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Transitions with a time constraint.

        TODO: e.g. an asset under maintenance must be handled within a
        set period; check how long the maintenance has lasted.
        """
        await self._store_status(db_session, test_asset, "maintenance")

    @pytest.mark.asyncio
    async def test_validate_reverse_transition(
        self, db_session: AsyncSession, test_asset: Asset
    ):
        """Some transitions are two-way, others one-way."""
        await self._store_status(db_session, test_asset, "in_stock")
        # in_stock -> in_use is valid.
        await self._store_status(db_session, test_asset, "in_use")
        # in_use -> in_stock (recovery) is valid as well.
        await self._store_status(db_session, test_asset, "in_stock")
# ==================== 状态历史记录测试 ====================
class TestStatusHistory:
    """Tests for ``AssetStatusHistory`` rows attached to an asset.

    The history rows are created by the tests themselves and then read
    back with ``select`` queries. ``select`` and ``AssetStatusHistory``
    are taken from the module-level imports.
    """

    @pytest.mark.asyncio
    async def test_status_history_created_on_change(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Store a history row together with a status change."""
        original_status = test_asset.status
        new_status = "in_stock"

        # Record the change and apply it in the same commit.
        history = AssetStatusHistory(
            asset_id=test_asset.id,
            old_status=original_status,
            new_status=new_status,
            operation_type="manual_change",
            operator_id=test_admin.id,
            operator_name=test_admin.real_name,
            remark="手动变更状态",
        )
        db_session.add(history)
        test_asset.status = new_status
        await db_session.commit()

        # Read the history back and verify it.
        result = await db_session.execute(
            select(AssetStatusHistory).where(
                AssetStatusHistory.asset_id == test_asset.id
            )
        )
        histories = result.scalars().all()
        assert len(histories) >= 1
        assert histories[0].old_status == original_status
        assert histories[0].new_status == new_status

    @pytest.mark.asyncio
    async def test_status_history_ordering(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """History rows come back ordered by creation time."""
        # (old_status, new_status) for three consecutive changes.
        changes = [
            ("pending", "in_stock"),
            ("in_stock", "in_use"),
            ("in_use", "maintenance"),
        ]
        for old_status, new_status in changes:
            test_asset.status = new_status
            history = AssetStatusHistory(
                asset_id=test_asset.id,
                old_status=old_status,
                new_status=new_status,
                operation_type="test",
                operator_id=test_admin.id,
            )
            db_session.add(history)
            # Flush per row so the rows are inserted one at a time.
            await db_session.flush()
        await db_session.commit()

        # Query the history sorted by created_at.
        result = await db_session.execute(
            select(AssetStatusHistory)
            .where(AssetStatusHistory.asset_id == test_asset.id)
            .order_by(AssetStatusHistory.created_at)
        )
        histories = result.scalars().all()
        assert len(histories) == 3
        assert histories[0].created_at <= histories[1].created_at
        assert histories[1].created_at <= histories[2].created_at

    @pytest.mark.asyncio
    async def test_status_history_contains_operator_info(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """A history row keeps the operator's id and name."""
        history = AssetStatusHistory(
            asset_id=test_asset.id,
            old_status="pending",
            new_status="in_stock",
            operation_type="test",
            operator_id=test_admin.id,
            operator_name=test_admin.real_name,
        )
        db_session.add(history)
        await db_session.commit()

        await db_session.refresh(history)
        assert history.operator_id == test_admin.id
        assert history.operator_name == test_admin.real_name

    @pytest.mark.asyncio
    async def test_status_history_remark(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """A history row keeps its remark text."""
        remark = "资产维修完成,重新入库"
        history = AssetStatusHistory(
            asset_id=test_asset.id,
            old_status="maintenance",
            new_status="in_stock",
            operation_type="maintenance_complete",
            operator_id=test_admin.id,
            remark=remark,
        )
        db_session.add(history)
        await db_session.commit()

        await db_session.refresh(history)
        assert history.remark == remark

    @pytest.mark.asyncio
    async def test_status_history_extra_data(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """A history row round-trips its ``extra_data`` mapping."""
        extra_data = {
            "vendor": "维修服务商A",
            "cost": 500.00,
            "duration_days": 3,
        }
        history = AssetStatusHistory(
            asset_id=test_asset.id,
            old_status="maintenance",
            new_status="in_stock",
            operation_type="maintenance_complete",
            operator_id=test_admin.id,
            extra_data=extra_data,
        )
        db_session.add(history)
        await db_session.commit()

        await db_session.refresh(history)
        assert history.extra_data == extra_data

    @pytest.mark.asyncio
    async def test_status_history_cascade_delete(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Look up a history row after its asset has been soft-deleted.

        TODO: whether the row should still exist depends on the cascade
        settings, so the lookup result is not asserted yet.
        """
        # Create a history row.
        history = AssetStatusHistory(
            asset_id=test_asset.id,
            old_status="pending",
            new_status="in_stock",
            operation_type="test",
            operator_id=test_admin.id,
        )
        db_session.add(history)
        await db_session.commit()
        history_id = history.id

        # Soft-delete the asset by stamping deleted_at.
        test_asset.deleted_at = datetime.utcnow()
        await db_session.commit()

        # Run the lookup; it still has to execute without error.
        result = await db_session.execute(
            select(AssetStatusHistory).where(
                AssetStatusHistory.id == history_id
            )
        )
        result.scalar_one_or_none()

    @pytest.mark.asyncio
    async def test_status_history_query_by_date_range(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Query history rows created on or after a start date."""
        from datetime import timedelta

        # Create two rows with explicit creation times.
        now = datetime.utcnow()
        two_days_ago = AssetStatusHistory(
            asset_id=test_asset.id,
            old_status="pending",
            new_status="in_stock",
            operation_type="test1",
            operator_id=test_admin.id,
            created_at=now - timedelta(days=2),
        )
        db_session.add(two_days_ago)
        one_day_ago = AssetStatusHistory(
            asset_id=test_asset.id,
            old_status="in_stock",
            new_status="in_use",
            operation_type="test2",
            operator_id=test_admin.id,
            created_at=now - timedelta(days=1),
        )
        db_session.add(one_day_ago)
        await db_session.commit()

        # Fetch the rows from the last day.
        start_date = now - timedelta(days=1)
        result = await db_session.execute(
            select(AssetStatusHistory).where(
                AssetStatusHistory.asset_id == test_asset.id,
                AssetStatusHistory.created_at >= start_date,
            )
        )
        histories = result.scalars().all()
        assert len(histories) >= 1

    @pytest.mark.asyncio
    async def test_status_history_count(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Count the status changes recorded for an asset."""
        from sqlalchemy import func

        # Create five history rows.
        for i in range(5):
            history = AssetStatusHistory(
                asset_id=test_asset.id,
                old_status=f"status_{i}",
                new_status=f"status_{i+1}",
                operation_type="test",
                operator_id=test_admin.id,
            )
            db_session.add(history)
        await db_session.commit()

        # Count the rows for this asset.
        result = await db_session.execute(
            select(func.count()).where(
                AssetStatusHistory.asset_id == test_asset.id
            )
        )
        count = result.scalar()
        assert count >= 5

    @pytest.mark.asyncio
    async def test_status_history_by_operation_type(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
        test_admin: User,
    ):
        """Filter history rows by operation type."""
        # (operation_type, old_status, new_status)
        operations = [
            ("allocation", "pending", "in_stock"),
            ("assign", "in_stock", "in_use"),
            ("maintenance", "in_use", "maintenance"),
            ("recovery", "maintenance", "in_stock"),
        ]
        for op_type, old_status, new_status in operations:
            history = AssetStatusHistory(
                asset_id=test_asset.id,
                old_status=old_status,
                new_status=new_status,
                operation_type=op_type,
                operator_id=test_admin.id,
            )
            db_session.add(history)
        await db_session.commit()

        # Fetch only the "maintenance" operations.
        result = await db_session.execute(
            select(AssetStatusHistory).where(
                AssetStatusHistory.asset_id == test_asset.id,
                AssetStatusHistory.operation_type == "maintenance",
            )
        )
        maintenance_histories = result.scalars().all()
        assert len(maintenance_histories) >= 1
        assert all(
            h.operation_type == "maintenance" for h in maintenance_histories
        )
# ==================== 异常状态转换测试 ====================
class TestInvalidStateTransitions:
    """Placeholder tests for transitions that should be rejected.

    Most tests only set a status (some also commit) or contain nothing
    but a description of the intended rule; the rejection itself is not
    asserted yet.
    """

    @pytest.mark.asyncio
    async def test_invalid_pending_to_scrapped(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Scrap an asset directly (possibly invalid from pending).

        TODO: some business rules may not allow this transition.
        """
        test_asset.status = "scrapped"
        await db_session.commit()

    @pytest.mark.asyncio
    async def test_invalid_scrapped_to_active(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Try to bring a scrapped asset back to an active status.

        TODO: business logic should block the restore; nothing is
        asserted here yet.
        """
        test_asset.status = "scrapped"
        await db_session.commit()

        # Attempt the restore.
        test_asset.status = "in_stock"
        await db_session.commit()

    @pytest.mark.asyncio
    async def test_invalid_status_value(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Assign a status value that does not exist.

        TODO: validation logic should reject it. The value is only
        assigned in memory; this test does not commit.
        """
        test_asset.status = "invalid_status"

    @pytest.mark.asyncio
    async def test_transition_without_required_fields(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Transition while required fields are missing.

        TODO: some transitions may need specific fields, e.g. moving to
        in_use needs organization_id.
        """

    @pytest.mark.asyncio
    async def test_transition_with_locked_asset(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Transition on a locked asset.

        TODO: when an asset is locked (e.g. under maintenance), certain
        transitions should be blocked.
        """

    @pytest.mark.asyncio
    async def test_transition_with_pending_operations(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Transition on an asset with pending operations.

        TODO: an allocation request awaiting approval should block the
        status change.
        """

    @pytest.mark.asyncio
    async def test_rapid_status_changes(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Apply several status changes in quick succession.

        The changes are awaited one after the other instead of through
        ``asyncio.gather``: they all go through ``db_session``, and a
        single ``AsyncSession`` must not be shared by concurrently
        running tasks.
        """
        statuses = ["in_stock", "in_use", "maintenance", "in_stock"]
        for status in statuses:
            test_asset.status = status
            await db_session.commit()

        # The last written status is the one that must be stored.
        await db_session.refresh(test_asset)
        assert test_asset.status == statuses[-1]

    @pytest.mark.asyncio
    async def test_transition_conflict(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Conflicting transitions.

        TODO: several users trying to change the status at the same time.
        """

    @pytest.mark.asyncio
    async def test_missing_organization_for_in_use(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Set in_use on an asset that has no organization.

        TODO: validation should require organization_id. Both values are
        only assigned in memory; this test does not commit.
        """
        test_asset.organization_id = None
        test_asset.status = "in_use"

    @pytest.mark.asyncio
    async def test_transition_with_invalid_dynamic_attributes(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Validate dynamic attributes during a transition.

        TODO: some statuses may require specific dynamic fields.
        """

    @pytest.mark.asyncio
    async def test_transition_with_missing_maintenance_record(
        self,
        db_session: AsyncSession,
        test_asset: Asset,
    ):
        """Move to maintenance without a maintenance record.

        TODO: the transition should come with a matching maintenance
        record; its presence is not verified yet.
        """
        test_asset.status = "maintenance"
        await db_session.commit()