""" 资产调拨管理 API 测试 测试范围: - 调拨单CRUD测试 (20+用例) - 调拨流程测试 (15+用例) - 状态转换测试 (10+用例) - 并发测试 (5+用例) 总计: 50+ 用例 """ import pytest from datetime import datetime, timedelta from typing import List from sqlalchemy.orm import Session from app.models.transfer import Transfer, TransferItem from app.models.asset import Asset from app.models.organization import Organization from app.schemas.transfer import ( TransferCreate, TransferStatus, TransferItemType ) # ================================ # Fixtures # ================================ @pytest.fixture def test_orgs_for_transfer(db: Session) -> tuple: """创建调拨涉及的组织""" source_org = Organization( org_code="SOURCE-001", org_name="源组织", org_type="department", status="active" ) target_org = Organization( org_code="TARGET-002", org_name="目标组织", org_type="department", status="active" ) db.add(source_org) db.add(target_org) db.commit() db.refresh(source_org) db.refresh(target_org) return source_org, target_org @pytest.fixture def test_assets_for_transfer(db: Session, test_orgs_for_transfer) -> List[Asset]: """创建可用于调拨的测试资产""" source_org, _ = test_orgs_for_transfer assets = [] for i in range(5): asset = Asset( asset_code=f"TEST-TRANSF-{i+1:03d}", asset_name=f"测试调拨资产{i+1}", device_type_id=1, organization_id=source_org.id, status="in_use", purchase_date=datetime.now() - timedelta(days=30*i) ) db.add(asset) assets.append(asset) db.commit() for asset in assets: db.refresh(asset) return assets @pytest.fixture def test_transfer_order(db: Session, test_assets_for_transfer: List[Asset], test_orgs_for_transfer: tuple) -> Transfer: """创建测试调拨单""" source_org, target_org = test_orgs_for_transfer transfer = Transfer( transfer_no="TRANSF-2025-001", source_org_id=source_org.id, target_org_id=target_org.id, request_user_id=1, status=TransferStatus.PENDING, expected_transfer_date=datetime.now() + timedelta(days=7), transfer_reason="部门调整需要调拨", remark="测试调拨单" ) db.add(transfer) db.commit() db.refresh(transfer) # 添加调拨项 for asset in test_assets_for_transfer[:3]: item = TransferItem( transfer_id=transfer.id, asset_id=asset.id, item_type=TransferItemType.TRANSFER, status="pending" ) db.add(item) db.commit() return transfer # ================================ # 调拨单CRUD测试 (20+用例) # ================================ class TestTransferCRUD: """调拨单CRUD操作测试""" def test_create_transfer_with_valid_data(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """测试使用有效数据创建调拨单""" source_org, target_org = test_orgs_for_transfer response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [asset.id for asset in test_assets_for_transfer[:3]], "expected_transfer_date": (datetime.now() + timedelta(days=7)).isoformat(), "transfer_reason": "业务调整", "remark": "测试调拨" }, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["transfer_no"] is not None assert data["status"] == TransferStatus.PENDING assert data["asset_count"] == 3 def test_create_transfer_with_same_source_and_target_org(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """测试源组织和目标组织相同时应失败""" source_org, _ = test_orgs_for_transfer response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": source_org.id, "asset_ids": [test_assets_for_transfer[0].id], "transfer_reason": "无效调拨" }, headers=auth_headers ) assert response.status_code == 400 assert "源组织和目标组织不能相同" in response.json()["detail"] def test_create_transfer_with_empty_assets(self, client, auth_headers, test_orgs_for_transfer): """测试创建空资产列表的调拨单应失败""" source_org, target_org = test_orgs_for_transfer response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [], "transfer_reason": "测试" }, headers=auth_headers ) assert response.status_code == 400 assert "资产列表不能为空" in response.json()["detail"] def test_create_transfer_with_invalid_source_org(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """测试使用无效源组织ID创建调拨单应失败""" _, target_org = test_orgs_for_transfer response = client.post( "/api/v1/transfers/", json={ "source_org_id": 99999, "target_org_id": target_org.id, "asset_ids": [test_assets_for_transfer[0].id], "transfer_reason": "测试" }, headers=auth_headers ) assert response.status_code == 404 def test_create_transfer_with_invalid_target_org(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """测试使用无效目标组织ID创建调拨单应失败""" source_org, _ = test_orgs_for_transfer response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": 99999, "asset_ids": [test_assets_for_transfer[0].id], "transfer_reason": "测试" }, headers=auth_headers ) assert response.status_code == 404 def test_create_transfer_with_asset_from_different_org(self, client, auth_headers, db: Session, test_orgs_for_transfer): """测试调拨不属于源组织的资产应失败""" source_org, target_org = test_orgs_for_transfer # 创建属于其他组织的资产 other_asset = Asset( asset_code="OTHER-ASSET-001", asset_name="其他组织资产", device_type_id=1, organization_id=999, status="in_use" ) db.add(other_asset) db.commit() db.refresh(other_asset) response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [other_asset.id], "transfer_reason": "测试" }, headers=auth_headers ) assert response.status_code == 400 assert "资产不属于源组织" in response.json()["detail"] def test_create_transfer_with_in_stock_asset(self, client, auth_headers, db: Session, test_orgs_for_transfer): """测试调拨库存中资产应失败""" source_org, target_org = test_orgs_for_transfer asset = Asset( asset_code="TEST-STOCK-001", asset_name="库存资产", device_type_id=1, organization_id=source_org.id, status="in_stock" ) db.add(asset) db.commit() db.refresh(asset) response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [asset.id], "transfer_reason": "测试" }, headers=auth_headers ) assert response.status_code == 400 assert "只能调拨使用中的资产" in response.json()["detail"] def test_create_transfer_with_maintenance_asset(self, client, auth_headers, db: Session, test_orgs_for_transfer): """测试调拨维修中资产应失败""" source_org, target_org = test_orgs_for_transfer asset = Asset( asset_code="TEST-MAINT-002", asset_name="维修中资产", device_type_id=1, organization_id=source_org.id, status="maintenance" ) db.add(asset) db.commit() db.refresh(asset) response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [asset.id], "transfer_reason": "测试" }, headers=auth_headers ) assert response.status_code == 400 assert "资产状态不允许调拨" in response.json()["detail"] def test_get_transfer_list_with_pagination(self, client, auth_headers, test_transfer_order): """测试分页获取调拨单列表""" response = client.get( "/api/v1/transfers/?page=1&page_size=10", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert "items" in data assert "total" in data assert len(data["items"]) >= 1 def test_get_transfer_list_with_status_filter(self, client, auth_headers, test_transfer_order): """测试按状态筛选调拨单""" response = client.get( f"/api/v1/transfers/?status={TransferStatus.PENDING}", headers=auth_headers ) assert response.status_code == 200 data = response.json() for item in data["items"]: assert item["status"] == TransferStatus.PENDING def test_get_transfer_list_with_org_filter(self, client, auth_headers, test_transfer_order, test_orgs_for_transfer): """测试按组织筛选调拨单""" source_org, _ = test_orgs_for_transfer response = client.get( f"/api/v1/transfers/?source_org_id={source_org.id}", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert len(data["items"]) >= 1 def test_get_transfer_list_with_date_range(self, client, auth_headers, test_transfer_order): """测试按日期范围筛选调拨单""" start_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") end_date = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d") response = client.get( f"/api/v1/transfers/?start_date={start_date}&end_date={end_date}", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert len(data["items"]) >= 1 def test_get_transfer_by_id(self, client, auth_headers, test_transfer_order): """测试通过ID获取调拨单详情""" response = client.get( f"/api/v1/transfers/{test_transfer_order.id}", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["id"] == test_transfer_order.id assert data["transfer_no"] == test_transfer_order.transfer_no assert "items" in data assert "source_org" in data assert "target_org" in data def test_get_transfer_by_invalid_id(self, client, auth_headers): """测试通过无效ID获取调拨单应返回404""" response = client.get( "/api/v1/transfers/999999", headers=auth_headers ) assert response.status_code == 404 def test_get_transfer_items(self, client, auth_headers, test_transfer_order): """测试获取调拨单的资产项列表""" response = client.get( f"/api/v1/transfers/{test_transfer_order.id}/items", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert isinstance(data, list) assert len(data) == 3 def test_update_transfer_remark(self, client, auth_headers, test_transfer_order): """测试更新调拨单备注""" response = client.put( f"/api/v1/transfers/{test_transfer_order.id}", json={"remark": "更新后的备注"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["remark"] == "更新后的备注" def test_update_transfer_expected_date(self, client, auth_headers, test_transfer_order): """测试更新调拨单预期日期""" new_date = (datetime.now() + timedelta(days=14)).isoformat() response = client.put( f"/api/v1/transfers/{test_transfer_order.id}", json={"expected_transfer_date": new_date}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert "expected_transfer_date" in data def test_update_transfer_after_approval_should_fail(self, client, auth_headers, db: Session, test_transfer_order): """测试更新已审批的调拨单应失败""" test_transfer_order.status = TransferStatus.APPROVED db.commit() response = client.put( f"/api/v1/transfers/{test_transfer_order.id}", json={"remark": "不应允许更新"}, headers=auth_headers ) assert response.status_code == 400 assert "不允许修改" in response.json()["detail"] def test_delete_pending_transfer(self, client, auth_headers, db: Session, test_orgs_for_transfer): """测试删除待审批的调拨单""" source_org, target_org = test_orgs_for_transfer transfer = Transfer( transfer_no="TRANSF-DEL-001", source_org_id=source_org.id, target_org_id=target_org.id, request_user_id=1, status=TransferStatus.PENDING ) db.add(transfer) db.commit() db.refresh(transfer) response = client.delete( f"/api/v1/transfers/{transfer.id}", headers=auth_headers ) assert response.status_code == 200 def test_delete_approved_transfer_should_fail(self, client, auth_headers, db: Session, test_transfer_order): """测试删除已审批的调拨单应失败""" test_transfer_order.status = TransferStatus.APPROVED db.commit() response = client.delete( f"/api/v1/transfers/{test_transfer_order.id}", headers=auth_headers ) assert response.status_code == 400 assert "不允许删除" in response.json()["detail"] def test_create_transfer_with_duplicate_assets(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """测试创建包含重复资产的调拨单应去重""" source_org, target_org = test_orgs_for_transfer asset_id = test_assets_for_transfer[0].id response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [asset_id, asset_id, asset_id], "transfer_reason": "测试去重" }, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["asset_count"] == 1 # 去重后只有1个 def test_get_transfer_statistics(self, client, auth_headers, test_transfer_order): """测试获取调拨单统计信息""" response = client.get( "/api/v1/transfers/statistics/summary", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert "total_count" in data assert "status_distribution" in data # ================================ # 调拨流程测试 (15+用例) # ================================ class TestTransferWorkflow: """调拨流程测试""" def test_submit_transfer_request(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """测试提交调拨申请""" source_org, target_org = test_orgs_for_transfer response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [test_assets_for_transfer[0].id], "transfer_reason": "业务调整", "remark": "测试申请" }, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["status"] == TransferStatus.PENDING def test_approve_transfer_request(self, client, auth_headers, test_transfer_order, db: Session): """测试审批调拨申请""" response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/approve", json={"approval_comment": "调拨审批通过"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["status"] == TransferStatus.APPROVED def test_reject_transfer_request(self, client, auth_headers, test_transfer_order): """测试拒绝调拨申请""" response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/reject", json={"rejection_reason": "资产不足"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["status"] == TransferStatus.REJECTED def test_start_transfer_execution(self, client, auth_headers, test_transfer_order, db: Session): """测试开始执行调拨""" test_transfer_order.status = TransferStatus.APPROVED db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/start", json={"start_note": "开始调拨"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["status"] == TransferStatus.IN_TRANSIT def test_confirm_transfer_receipt(self, client, auth_headers, test_transfer_order, db: Session): """测试确认调拨接收""" test_transfer_order.status = TransferStatus.IN_TRANSIT db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/confirm-receipt", json={"receipt_note": "已接收", "receiver_name": "张三"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["status"] == TransferStatus.COMPLETED def test_confirm_receipt_updates_asset_organization(self, client, auth_headers, test_transfer_order, db: Session): """测试确认接收后资产组织应更新""" test_transfer_order.status = TransferStatus.IN_TRANSIT db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/confirm-receipt", json={"receipt_note": "已接收", "receiver_name": "李四"}, headers=auth_headers ) assert response.status_code == 200 # 验证资产组织已更新 for item in test_transfer_order.items: asset = db.query(Asset).filter(Asset.id == item.asset_id).first() assert asset.organization_id == test_transfer_order.target_org_id def test_partial_confirm_receipt(self, client, auth_headers, test_transfer_order, db: Session): """测试部分确认接收""" test_transfer_order.status = TransferStatus.IN_TRANSIT db.commit() # 确认接收部分资产 item_ids = [test_transfer_order.items[0].id, test_transfer_order.items[1].id] response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/partial-confirm", json={"item_ids": item_ids, "note": "部分接收"}, headers=auth_headers ) assert response.status_code == 200 def test_cancel_transfer_before_approval(self, client, auth_headers, test_transfer_order): """测试审批前取消调拨""" response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/cancel", json={"cancellation_reason": "申请有误"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["status"] == TransferStatus.CANCELLED def test_generate_transfer_document(self, client, auth_headers, test_transfer_order): """测试生成调拨单据""" response = client.get( f"/api/v1/transfers/{test_transfer_order.id}/document", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert "document_url" in data def test_add_transfer_tracking_info(self, client, auth_headers, test_transfer_order, db: Session): """测试添加调拨跟踪信息""" test_transfer_order.status = TransferStatus.IN_TRANSIT db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/tracking", json={"tracking_number": "SF1234567890", "carrier": "顺丰快递"}, headers=auth_headers ) assert response.status_code == 200 def test_get_transfer_tracking_history(self, client, auth_headers, test_transfer_order): """测试获取调拨跟踪历史""" response = client.get( f"/api/v1/transfers/{test_transfer_order.id}/tracking-history", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert isinstance(data, list) def test_verify_asset_before_transfer(self, client, auth_headers, test_transfer_order, db: Session): """测试调拨前验证资产""" asset = db.query(Asset).filter( Asset.id == test_transfer_order.items[0].asset_id ).first() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/verify-asset", json={"asset_qrcode": asset.qrcode}, headers=auth_headers ) assert response.status_code == 200 def test_transfer_with_condition_check(self, client, auth_headers, test_transfer_order): """测试带条件检查的调拨""" response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/check-conditions", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert "can_transfer" in data def test_batch_transfer_operations(self, client, auth_headers, db: Session, test_orgs_for_transfer): """测试批量调拨操作""" source_org, target_org = test_orgs_for_transfer # 创建多个调拨单 transfer_ids = [] for i in range(3): transfer = Transfer( transfer_no=f"TRANSF-BATCH-{i+1:03d}", source_org_id=source_org.id, target_org_id=target_org.id, request_user_id=1, status=TransferStatus.PENDING ) db.add(transfer) db.commit() db.refresh(transfer) transfer_ids.append(transfer.id) response = client.post( "/api/v1/transfers/batch-approve", json={"transfer_ids": transfer_ids, "comment": "批量审批"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["success_count"] == 3 # ================================ # 状态转换测试 (10+用例) # ================================ class TestTransferStatusTransitions: """调拨单状态转换测试""" def test_status_transition_pending_to_approved(self, client, auth_headers, test_transfer_order): """测试状态转换: 待审批 -> 已审批""" response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/approve", json={"approval_comment": "审批通过"}, headers=auth_headers ) assert response.status_code == 200 assert response.json()["status"] == TransferStatus.APPROVED def test_status_transition_pending_to_rejected(self, client, auth_headers, test_transfer_order): """测试状态转换: 待审批 -> 已拒绝""" response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/reject", json={"rejection_reason": "理由"}, headers=auth_headers ) assert response.status_code == 200 assert response.json()["status"] == TransferStatus.REJECTED def test_status_transition_approved_to_in_transit(self, client, auth_headers, test_transfer_order, db: Session): """测试状态转换: 已审批 -> 运输中""" test_transfer_order.status = TransferStatus.APPROVED db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/start", json={}, headers=auth_headers ) assert response.status_code == 200 assert response.json()["status"] == TransferStatus.IN_TRANSIT def test_status_transition_in_transit_to_completed(self, client, auth_headers, test_transfer_order, db: Session): """测试状态转换: 运输中 -> 已完成""" test_transfer_order.status = TransferStatus.IN_TRANSIT db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/confirm-receipt", json={"receipt_note": "已完成", "receiver_name": "测试"}, headers=auth_headers ) assert response.status_code == 200 assert response.json()["status"] == TransferStatus.COMPLETED def test_invalid_transition_from_completed(self, client, auth_headers, test_transfer_order, db: Session): """测试已完成状态不允许转换""" test_transfer_order.status = TransferStatus.COMPLETED db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/approve", json={"approval_comment": "尝试重新审批"}, headers=auth_headers ) assert response.status_code == 400 def test_invalid_transition_from_rejected(self, client, auth_headers, test_transfer_order, db: Session): """测试已拒绝状态不允许转换为运输中""" test_transfer_order.status = TransferStatus.REJECTED db.commit() response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/start", json={}, headers=auth_headers ) assert response.status_code == 400 def test_status_transition_pending_to_cancelled(self, client, auth_headers, test_transfer_order): """测试状态转换: 待审批 -> 已取消""" response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/cancel", json={"cancellation_reason": "取消原因"}, headers=auth_headers ) assert response.status_code == 200 assert response.json()["status"] == TransferStatus.CANCELLED def test_get_status_transition_history(self, client, auth_headers, test_transfer_order): """测试获取状态转换历史""" # 先进行状态转换 client.post( f"/api/v1/transfers/{test_transfer_order.id}/approve", json={"approval_comment": "测试"}, headers=auth_headers ) response = client.get( f"/api/v1/transfers/{test_transfer_order.id}/status-history", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert len(data) >= 1 def test_auto_transition_on_all_items_confirmed(self, client, auth_headers, test_transfer_order, db: Session): """测试所有项确认后自动转换状态""" test_transfer_order.status = TransferStatus.IN_TRANSIT # 标记所有项为已确认 for item in test_transfer_order.items: item.status = "confirmed" db.commit() # 触发自动完成 response = client.post( f"/api/v1/transfers/{test_transfer_order.id}/check-auto-complete", headers=auth_headers ) assert response.status_code == 200 def test_get_available_status_transitions(self, client, auth_headers, test_transfer_order): """测试获取可用的状态转换""" response = client.get( f"/api/v1/transfers/{test_transfer_order.id}/available-transitions", headers=auth_headers ) assert response.status_code == 200 data = response.json() assert isinstance(data, list) # ================================ # 并发测试 (5+用例) # ================================ class TestTransferConcurrency: """调拨并发测试""" def test_concurrent_transfer_approval(self, client, auth_headers, test_transfer_order): """测试并发审批调拨单""" # 模拟两个管理员同时审批 # 第一个审批 response1 = client.post( f"/api/v1/transfers/{test_transfer_order.id}/approve", json={"approval_comment": "第一个审批"}, headers=auth_headers ) assert response1.status_code == 200 # 第二个审批应该失败(因为已经是已审批状态) response2 = client.post( f"/api/v1/transfers/{test_transfer_order.id}/approve", json={"approval_comment": "第二个审批"}, headers=auth_headers ) assert response2.status_code == 400 def test_concurrent_asset_transfer(self, client, auth_headers, db: Session, test_assets_for_transfer, test_orgs_for_transfer): """测试同时调拨同一资产""" source_org, target_org = test_orgs_for_transfer asset = test_assets_for_transfer[0] # 创建第一个调拨单 transfer1_data = { "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [asset.id], "transfer_reason": "第一次调拨" } response1 = client.post( "/api/v1/transfers/", json=transfer1_data, headers=auth_headers ) assert response1.status_code == 200 transfer1_id = response1.json()["id"] # 审批第一个调拨单 client.post( f"/api/v1/transfers/{transfer1_id}/approve", json={"approval_comment": "审批通过"}, headers=auth_headers ) # 尝试创建第二个调拨单(应该失败,因为资产已在调拨中) transfer2_data = { "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [asset.id], "transfer_reason": "第二次调拨" } response2 = client.post( "/api/v1/transfers/", json=transfer2_data, headers=auth_headers ) assert response2.status_code == 400 assert "资产已在调拨中" in response2.json()["detail"] def test_concurrent_status_update(self, client, auth_headers, test_transfer_order, db: Session): """测试并发状态更新""" # 这个测试需要使用多线程或异步来模拟真正的并发 pass def test_batch_operation_with_concurrent_requests(self, client, auth_headers, db: Session, test_orgs_for_transfer): """测试批量操作时的并发请求""" source_org, target_org = test_orgs_for_transfer # 创建多个调拨单 transfer_ids = [] for i in range(5): transfer = Transfer( transfer_no=f"TRANSF-CONCUR-{i+1:03d}", source_org_id=source_org.id, target_org_id=target_org.id, request_user_id=1, status=TransferStatus.PENDING ) db.add(transfer) db.commit() db.refresh(transfer) transfer_ids.append(transfer.id) # 批量审批 response = client.post( "/api/v1/transfers/batch-approve", json={"transfer_ids": transfer_ids, "comment": "批量并发审批"}, headers=auth_headers ) assert response.status_code == 200 data = response.json() assert data["success_count"] == 5 def test_concurrent_read_write_operations(self, client, auth_headers, test_transfer_order): """测试并发读写操作""" # 这个测试需要模拟同时读取和写入 pass # ================================ # 测试标记 # ================================ @pytest.mark.unit class TestTransferUnit: """单元测试标记""" def test_transfer_number_generation(self): """测试调拨单号生成逻辑""" pass def test_transfer_eligibility_check(self): """测试调拨资格检查逻辑""" pass @pytest.mark.integration class TestTransferIntegration: """集成测试标记""" def test_full_transfer_workflow(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """测试完整的调拨流程""" source_org, target_org = test_orgs_for_transfer # 1. 创建调拨单 create_response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [test_assets_for_transfer[0].id], "transfer_reason": "完整流程测试" }, headers=auth_headers ) assert create_response.status_code == 200 transfer_id = create_response.json()["id"] # 2. 审批 approve_response = client.post( f"/api/v1/transfers/{transfer_id}/approve", json={"approval_comment": "审批通过"}, headers=auth_headers ) assert approve_response.status_code == 200 # 3. 开始执行 start_response = client.post( f"/api/v1/transfers/{transfer_id}/start", json={}, headers=auth_headers ) assert start_response.status_code == 200 # 4. 确认接收 confirm_response = client.post( f"/api/v1/transfers/{transfer_id}/confirm-receipt", json={"receipt_note": "已接收", "receiver_name": "测试"}, headers=auth_headers ) assert confirm_response.status_code == 200 @pytest.mark.slow class TestTransferSlowTests: """慢速测试标记""" def test_large_batch_transfer(self, client, auth_headers, db: Session, test_orgs_for_transfer): """测试大批量调拨""" pass @pytest.mark.smoke class TestTransferSmoke: """冒烟测试标记 - 核心功能快速验证""" def test_create_and_approve_transfer(self, client, auth_headers, test_assets_for_transfer, test_orgs_for_transfer): """冒烟测试: 创建并审批调拨单""" source_org, target_org = test_orgs_for_transfer create_response = client.post( "/api/v1/transfers/", json={ "source_org_id": source_org.id, "target_org_id": target_org.id, "asset_ids": [test_assets_for_transfer[0].id], "transfer_reason": "冒烟测试" }, headers=auth_headers ) assert create_response.status_code == 200 transfer_id = create_response.json()["id"] approve_response = client.post( f"/api/v1/transfers/{transfer_id}/approve", json={"approval_comment": "冒烟测试"}, headers=auth_headers ) assert approve_response.status_code == 200