""" 性能测试 - Locust文件 测试内容: - 并发用户测试 - 接口响应时间 - 吞吐量测试 - 负载测试 - 压力测试 """ from locust import HttpUser, task, between, events from locust.runners import MasterRunner import time import random # 测试数据 TEST_USERS = [ {"username": "admin", "password": "Admin123"}, {"username": "user1", "password": "Test123"}, {"username": "user2", "password": "Test123"}, ] ASSET_NAMES = ["联想台式机", "戴尔笔记本", "惠普打印机", "苹果显示器", "罗技鼠标"] DEVICE_TYPES = [1, 2, 3, 4, 5] ORGANIZATIONS = [1, 2, 3, 4, 5] class AssetManagementUser(HttpUser): """ 资产管理系统用户模拟 模拟真实用户的行为模式 """ # 等待时间: 用户操作之间间隔1-3秒 wait_time = between(1, 3) def on_start(self): """用户登录时执行""" self.login() self.token = None self.headers = {} def login(self): """登录获取token""" user = random.choice(TEST_USERS) # 先获取验证码 captcha_resp = self.client.get("/api/v1/auth/captcha") if captcha_resp.status_code == 200: captcha_data = captcha_resp.json() captcha_key = captcha_data["data"]["captcha_key"] # 登录 login_resp = self.client.post( "/api/v1/auth/login", json={ "username": user["username"], "password": user["password"], "captcha": "1234", # 测试环境固定验证码 "captcha_key": captcha_key } ) if login_resp.status_code == 200: self.token = login_resp.json()["data"]["access_token"] self.headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } @task(10) def view_asset_list(self): """查看资产列表 (高频操作)""" self.client.get( "/api/v1/assets", headers=self.headers, params={ "page": random.randint(1, 5), "page_size": 20 } ) @task(5) def search_assets(self): """搜索资产 (中频操作)""" keywords = ["联想", "戴尔", "台式机", "笔记本", "打印机"] keyword = random.choice(keywords) self.client.get( "/api/v1/assets", headers=self.headers, params={"keyword": keyword} ) @task(3) def view_asset_detail(self): """查看资产详情 (中频操作)""" asset_id = random.randint(1, 100) self.client.get( f"/api/v1/assets/{asset_id}", headers=self.headers ) @task(2) def view_statistics(self): """查看统计数据 (低频操作)""" self.client.get( "/api/v1/statistics/overview", headers=self.headers ) @task(1) def create_asset(self): """创建资产 (低频操作)""" asset_data = { "asset_name": f"{random.choice(ASSET_NAMES)}-{int(time.time())}", "device_type_id": random.choice(DEVICE_TYPES), "organization_id": random.choice(ORGANIZATIONS), "model": f"测试型号-{int(time.time())}", "serial_number": f"SN-{int(time.time())}", "location": f"测试位置-{random.randint(1, 10)}" } self.client.post( "/api/v1/assets", headers=self.headers, json=asset_data ) @task(1) def filter_assets(self): """筛选资产 (低频操作)""" statuses = ["in_stock", "in_use", "maintenance", "scrapped"] status = random.choice(statuses) self.client.get( "/api/v1/assets", headers=self.headers, params={"status": status} ) class AssetManagementUserRead(AssetManagementUser): """ 只读用户 只执行查询操作,不执行写操作 """ @task(10) def view_asset_list(self): """查看资产列表""" self.client.get( "/api/v1/assets", headers=self.headers, params={"page": random.randint(1, 10), "page_size": 20} ) @task(5) def view_asset_detail(self): """查看资产详情""" asset_id = random.randint(1, 100) self.client.get( f"/api/v1/assets/{asset_id}", headers=self.headers ) @task(3) def search_assets(self): """搜索资产""" keywords = ["联想", "戴尔", "惠普"] self.client.get( "/api/v1/assets", headers=self.headers, params={"keyword": random.choice(keywords)} ) @task(2) def view_statistics(self): """查看统计数据""" self.client.get( "/api/v1/statistics/overview", headers=self.headers ) # 自定义事件处理器 @events.request.add_listener def on_request(request_type, name, response_time, response_length, **kwargs): """ 请求事件监听器 记录慢请求 """ if response_time > 1000: # 响应时间超过1秒 print(f"慢请求警告: {name} 耗时 {response_time}ms") @events.test_stop.add_listener def on_test_stop(environment, **kwargs): """ 测试结束事件 输出测试统计 """ if not isinstance(environment.runner, MasterRunner): print("\n" + "="*50) print("性能测试完成") print("="*50) stats = environment.stats print(f"\n总请求数: {stats.total.num_requests}") print(f"失败请求数: {stats.total.num_failures}") print(f"平均响应时间: {stats.total.avg_response_time}ms") print(f"中位数响应时间: {stats.total.median_response_time}ms") print(f"95%请求响应时间: {stats.total.get_response_time_percentile(0.95)}ms") print(f"99%请求响应时间: {stats.total.get_response_time_percentile(0.99)}ms") print(f"请求/秒 (RPS): {stats.total.total_rps}") print(f"失败率: {stats.total.fail_ratio * 100:.2f}%") # 性能指标评估 print("\n性能评估:") avg_response = stats.total.avg_response_time if avg_response < 200: print("✓ 响应时间: 优秀 (< 200ms)") elif avg_response < 500: print("✓ 响应时间: 良好 (< 500ms)") elif avg_response < 1000: print("⚠ 响应时间: 一般 (< 1000ms)") else: print("✗ 响应时间: 差 (> 1000ms)") rps = stats.total.total_rps if rps > 100: print("✓ 吞吐量: 优秀 (> 100 RPS)") elif rps > 50: print("✓ 吞吐量: 良好 (> 50 RPS)") elif rps > 20: print("⚠ 吞吐量: 一般 (> 20 RPS)") else: print("✗ 吞吐量: 差 (< 20 RPS)") fail_ratio = stats.total.fail_ratio * 100 if fail_ratio < 1: print("✓ 失败率: 优秀 (< 1%)") elif fail_ratio < 5: print("✓ 失败率: 良好 (< 5%)") else: print("✗ 失败率: 差 (> 5%)") print("="*50 + "\n") # 性能测试目标 PERFORMANCE_TARGETS = { "avg_response_time": 500, # 平均响应时间 < 500ms "p95_response_time": 1000, # 95%响应时间 < 1000ms "rps": 50, # 吞吐量 > 50 RPS "fail_ratio": 0.01 # 失败率 < 1% } class PerformanceTestRunner: """ 性能测试运行器 提供不同场景的性能测试 """ def __init__(self): self.scenarios = { "smoke": self.smoke_test, "normal": self.normal_load_test, "stress": self.stress_test, "spike": self.spike_test, "endurance": self.endurance_test } def smoke_test(self): """ 冒烟测试 少量用户,验证系统基本功能 """ return { "num_users": 10, "spawn_rate": 2, "run_time": "1m" } def normal_load_test(self): """ 正常负载测试 模拟日常使用情况 """ return { "num_users": 50, "spawn_rate": 5, "run_time": "5m" } def stress_test(self): """ 压力测试 逐步增加用户直到系统达到极限 """ return { "num_users": 200, "spawn_rate": 10, "run_time": "10m" } def spike_test(self): """ 尖峰测试 突然大量用户访问 """ return { "num_users": 500, "spawn_rate": 50, "run_time": "2m" } def endurance_test(self): """ 耐力测试 长时间稳定负载 """ return { "num_users": 100, "spawn_rate": 10, "run_time": "30m" } # 使用说明 """ 运行性能测试: 1. 冒烟测试 (10用户, 1分钟): locust -f locustfile.py --headless -u 10 -r 2 -t 1m 2. 正常负载测试 (50用户, 5分钟): locust -f locustfile.py --headless -u 50 -r 5 -t 5m 3. 压力测试 (200用户, 10分钟): locust -f locustfile.py --headless -u 200 -r 10 -t 10m 4. 尖峰测试 (500用户, 2分钟): locust -f locustfile.py --headless -u 500 -r 50 -t 2m 5. Web界面模式: locust -f locustfile.py --host=http://localhost:8000 然后访问 http://localhost:8089 6. 分布式测试 (Master): locust -f locustfile.py --master --expect-workers=4 7. 分布式测试 (Worker): locust -f locustfile.py --worker --master-host= """