""" 资产调拨和回收API快速测试脚本 用于验证所有端点是否可访问 """ import requests import json BASE_URL = "http://localhost:8000" TOKEN = "YOUR_TOKEN_HERE" # 需要替换为实际的token headers = { "Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json" } def test_transfer_apis(): """测试调拨API""" print("\n" + "="*60) print("测试资产调拨API") print("="*60) # 1. 获取调拨单列表 print("\n1. GET /api/v1/transfers") response = requests.get(f"{BASE_URL}/api/v1/transfers", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 获取调拨单列表成功") else: print(f" ✗ 失败: {response.text}") # 2. 获取调拨单统计 print("\n2. GET /api/v1/transfers/statistics") response = requests.get(f"{BASE_URL}/api/v1/transfers/statistics", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: data = response.json() print(f" ✓ 获取统计成功: {data}") else: print(f" ✗ 失败: {response.text}") # 3. 创建调拨单 print("\n3. POST /api/v1/transfers") create_data = { "source_org_id": 1, "target_org_id": 2, "transfer_type": "external", "title": "测试调拨单", "asset_ids": [1, 2, 3], "remark": "测试备注" } response = requests.post( f"{BASE_URL}/api/v1/transfers", headers=headers, json=create_data ) print(f" 状态码: {response.status_code}") if response.status_code == 201: data = response.json() print(f" ✓ 创建调拨单成功: {data['order_code']}") order_id = data['id'] else: print(f" ✗ 失败: {response.text}") order_id = None if order_id: # 4. 获取调拨单详情 print(f"\n4. GET /api/v1/transfers/{order_id}") response = requests.get(f"{BASE_URL}/api/v1/transfers/{order_id}", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 获取调拨单详情成功") else: print(f" ✗ 失败: {response.text}") # 5. 获取调拨单明细 print(f"\n5. GET /api/v1/transfers/{order_id}/items") response = requests.get(f"{BASE_URL}/api/v1/transfers/{order_id}/items", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 获取调拨单明细成功") else: print(f" ✗ 失败: {response.text}") # 6. 更新调拨单 print(f"\n6. PUT /api/v1/transfers/{order_id}") update_data = { "title": "更新后的标题", "remark": "更新后的备注" } response = requests.put( f"{BASE_URL}/api/v1/transfers/{order_id}", headers=headers, json=update_data ) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 更新调拨单成功") else: print(f" ✗ 失败: {response.text}") # 7. 审批调拨单 print(f"\n7. POST /api/v1/transfers/{order_id}/approve") response = requests.post( f"{BASE_URL}/api/v1/transfers/{order_id}/approve?approval_status=approved&approval_remark=测试通过", headers=headers ) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 审批调拨单成功") else: print(f" ✗ 失败: {response.text}") # 8. 开始调拨 print(f"\n8. POST /api/v1/transfers/{order_id}/start") response = requests.post(f"{BASE_URL}/api/v1/transfers/{order_id}/start", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 开始调拨成功") else: print(f" ✗ 失败: {response.text}") # 9. 完成调拨 print(f"\n9. POST /api/v1/transfers/{order_id}/complete") response = requests.post(f"{BASE_URL}/api/v1/transfers/{order_id}/complete", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 完成调拨成功") else: print(f" ✗ 失败: {response.text}") # 10. 取消调拨单(测试用) # print(f"\n10. POST /api/v1/transfers/{order_id}/cancel") # response = requests.post(f"{BASE_URL}/api/v1/transfers/{order_id}/cancel", headers=headers) # print(f" 状态码: {response.status_code}") # if response.status_code == 204: # print(f" ✓ 取消调拨单成功") # else: # print(f" ✗ 失败: {response.text}") # 11. 删除调拨单(测试用) # print(f"\n11. DELETE /api/v1/transfers/{order_id}") # response = requests.delete(f"{BASE_URL}/api/v1/transfers/{order_id}", headers=headers) # print(f" 状态码: {response.status_code}") # if response.status_code == 204: # print(f" ✓ 删除调拨单成功") # else: # print(f" ✗ 失败: {response.text}") def test_recovery_apis(): """测试回收API""" print("\n" + "="*60) print("测试资产回收API") print("="*60) # 1. 获取回收单列表 print("\n1. GET /api/v1/recoveries") response = requests.get(f"{BASE_URL}/api/v1/recoveries", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 获取回收单列表成功") else: print(f" ✗ 失败: {response.text}") # 2. 获取回收单统计 print("\n2. GET /api/v1/recoveries/statistics") response = requests.get(f"{BASE_URL}/api/v1/recoveries/statistics", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: data = response.json() print(f" ✓ 获取统计成功: {data}") else: print(f" ✗ 失败: {response.text}") # 3. 创建回收单 print("\n3. POST /api/v1/recoveries") create_data = { "recovery_type": "user", "title": "测试回收单", "asset_ids": [1, 2, 3], "remark": "测试备注" } response = requests.post( f"{BASE_URL}/api/v1/recoveries", headers=headers, json=create_data ) print(f" 状态码: {response.status_code}") if response.status_code == 201: data = response.json() print(f" ✓ 创建回收单成功: {data['order_code']}") order_id = data['id'] else: print(f" ✗ 失败: {response.text}") order_id = None if order_id: # 4. 获取回收单详情 print(f"\n4. GET /api/v1/recoveries/{order_id}") response = requests.get(f"{BASE_URL}/api/v1/recoveries/{order_id}", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 获取回收单详情成功") else: print(f" ✗ 失败: {response.text}") # 5. 获取回收单明细 print(f"\n5. GET /api/v1/recoveries/{order_id}/items") response = requests.get(f"{BASE_URL}/api/v1/recoveries/{order_id}/items", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 获取回收单明细成功") else: print(f" ✗ 失败: {response.text}") # 6. 更新回收单 print(f"\n6. PUT /api/v1/recoveries/{order_id}") update_data = { "title": "更新后的标题", "remark": "更新后的备注" } response = requests.put( f"{BASE_URL}/api/v1/recoveries/{order_id}", headers=headers, json=update_data ) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 更新回收单成功") else: print(f" ✗ 失败: {response.text}") # 7. 审批回收单 print(f"\n7. POST /api/v1/recoveries/{order_id}/approve") response = requests.post( f"{BASE_URL}/api/v1/recoveries/{order_id}/approve?approval_status=approved&approval_remark=测试通过", headers=headers ) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 审批回收单成功") else: print(f" ✗ 失败: {response.text}") # 8. 开始回收 print(f"\n8. POST /api/v1/recoveries/{order_id}/start") response = requests.post(f"{BASE_URL}/api/v1/recoveries/{order_id}/start", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 开始回收成功") else: print(f" ✗ 失败: {response.text}") # 9. 完成回收 print(f"\n9. POST /api/v1/recoveries/{order_id}/complete") response = requests.post(f"{BASE_URL}/api/v1/recoveries/{order_id}/complete", headers=headers) print(f" 状态码: {response.status_code}") if response.status_code == 200: print(f" ✓ 完成回收成功") else: print(f" ✗ 失败: {response.text}") # 10. 取消回收单(测试用) # print(f"\n10. POST /api/v1/recoveries/{order_id}/cancel") # response = requests.post(f"{BASE_URL}/api/v1/recoveries/{order_id}/cancel", headers=headers) # print(f" 状态码: {response.status_code}") # if response.status_code == 204: # print(f" ✓ 取消回收单成功") # else: # print(f" ✗ 失败: {response.text}") # 11. 删除回收单(测试用) # print(f"\n11. DELETE /api/v1/recoveries/{order_id}") # response = requests.delete(f"{BASE_URL}/api/v1/recoveries/{order_id}", headers=headers) # print(f" 状态码: {response.status_code}") # if response.status_code == 204: # print(f" ✓ 删除回收单成功") # else: # print(f" ✗ 失败: {response.text}") if __name__ == "__main__": print("\n" + "="*60) print("资产调拨和回收API测试脚本") print("="*60) print(f"\n基础URL: {BASE_URL}") print(f"Token: {TOKEN[:20]}..." if TOKEN else "Token: 未设置") if TOKEN == "YOUR_TOKEN_HERE": print("\n⚠️ 警告: 请先设置有效的TOKEN") print("使用方法:") print("1. 登录获取token: POST /api/v1/auth/login") print("2. 修改脚本中的TOKEN变量") exit(1) try: test_transfer_apis() test_recovery_apis() print("\n" + "="*60) print("测试完成") print("="*60 + "\n") except requests.exceptions.ConnectionError: print("\n✗ 无法连接到服务器,请确保API服务正在运行") print(f" 启动命令: uvicorn app.main:app --reload") except Exception as e: print(f"\n✗ 测试出错: {str(e)}")