1. 从脚本到场景Locust性能测试的核心设计思路上一期我们聊了Locust的基础搭建和环境配置算是把“厨房”和“灶台”都准备好了。今天这期咱们直接上手“炒菜”聊聊怎么用Locust写出真正能模拟真实用户行为、压出有效数据的性能测试脚本。很多朋友觉得性能测试脚本就是写个循环发请求但Locust的强大之处在于它能让你用Python代码像编排话剧一样精细地定义每个虚拟用户蝗虫的行为逻辑。这不仅仅是发请求更是模拟用户思考时间、操作路径、数据依赖和业务比例。一个设计良好的Locust脚本其价值远超一个简单的HTTP请求发送器。它能帮你发现接口间的依赖导致的连锁故障能模拟突发流量对系统的冲击更能通过自定义指标捕捉到那些标准响应时间之外的问题比如特定业务逻辑的缓慢或者缓存失效后的雪崩效应。所以别再把Locust脚本当成一个简单的requests库封装它是一套完整的用户行为模拟与负载生成方案。1.1 理解TaskSet用户行为的“剧本”在Locust里虚拟用户User做什么是由TaskSet类来定义的。你可以把它理解为一个“任务集”也就是用户的一套行为剧本。一个用户类继承自HttpUser可以设置一个tasks属性指向一个或多个TaskSet。最基本的用法是在用户类里直接以内联方式定义任务函数并用task装饰器指定权重。from locust import HttpUser, task, between class QuickstartUser(HttpUser): wait_time between(1, 5) # 用户执行完一个任务后等待1-5秒 task(3) # 权重为3执行频率更高 def view_items(self): self.client.get(/api/items) self.client.get(/api/items/details) task(1) # 权重为1 def add_to_cart(self): self.client.post(/api/cart/add, json{item_id: 1})这段代码定义了两个任务view_items浏览商品和add_to_cart加入购物车。因为权重是3:1所以平均来看虚拟用户执行3次浏览才会执行1次加购操作。wait_time控制着思考时间这非常关键能避免产生不切实际的、机枪扫射式的请求压力让负载更贴近真人操作。但真实场景往往更复杂。用户可能先进入首页然后在商品列表页浏览再进入某个商品详情页最后完成下单。这种有层级、有顺序的行为就需要嵌套的TaskSet来描述了。from locust import HttpUser, task, TaskSet, between class BrowseProducts(TaskSet): # 进入这个TaskSet后默认执行on_start只一次 def on_start(self): self.client.get(/home) task(10) def list_products(self): self.client.get(/api/products) task(5) def view_product_detail(self): product_id self.user.product_ids.pop() # 从用户实例中获取数据 self.client.get(f/api/products/{product_id}) task(1) def stop(self): self.interrupt() # 关键中断当前TaskSet返回父级 class ApiUser(HttpUser): wait_time between(2, 5) # 主任务集指向BrowseProducts tasks [BrowseProducts] def on_start(self): # 初始化一些用户数据例如预加载一批商品ID self.product_ids [i for i in range(100, 200)]这里ApiUser用户的主要行为剧本是BrowseProducts这个TaskSet。在BrowseProducts里我们定义了on_start方法类似setup用户进入这个任务集时会先执行一次。任务之间有层级关系stop任务中的self.interrupt()是跳出当前嵌套TaskSet的关键。如果不调用interrupt()用户将永远在这个TaskSet内循环无法执行其他同级或父级任务。注意TaskSet的嵌套可以很深但设计时要符合真实的用户操作流。过度复杂的嵌套会让脚本难以维护和理解。一个基本原则是一个TaskSet应该对应一个相对独立的功能模块或用户操作阶段。1.2 参数化与数据驱动让每个“蝗虫”独一无二上面的例子中view_product_detail任务使用了一个写死的product_id。但在实际压测中成千上万的虚拟用户如果都请求同一个商品ID不仅不符合真实场景热门商品也不会被所有人同时点同一个还会导致缓存命中率畸高测试结果失真。因此参数化是性能测试脚本的灵魂。1. 队列Queue数据共享这是处理类似“用户注册-登录”这种需要唯一数据场景的经典模式。主线程准备数据虚拟用户从中取用。import queue from locust import HttpUser, task, events # 在测试开始前准备测试数据 test_user_queue queue.Queue() events.test_start.add_listener def on_test_start(environment, **kwargs): 测试开始时初始化用户数据队列 for i in range(10000): test_user_queue.put_nowait({ username: ftest_user_{i}, email: fuser_{i}test.com, password: 123456 }) class RegisterUser(HttpUser): host http://your-api.com task def register_and_login(self): # 如果队列为空则此虚拟用户停止执行新任务 if test_user_queue.empty(): print(Test data exhausted, stopping...) self.stop(forceTrue) # 强制停止该用户 return user_data test_user_queue.get_nowait() # 注册 with self.client.post(/api/register, jsonuser_data, catch_responseTrue) as resp: if resp.status_code 200: resp.success() # 注册成功后用同一套信息登录 login_resp self.client.post(/api/login, json{ username: user_data[username], password: user_data[password] }) # 可以在这里将登录成功的token保存到用户实例中供后续任务使用 self.token login_resp.json().get(token) else: resp.failure(fRegister failed: {resp.text}) # 注意这里没有把数据放回队列实现一次性消耗2. 循环数据与CSV读取对于商品列表、城市列表等可重复使用的静态数据可以使用列表循环或读取CSV文件。import csv from itertools import cycle class DataDrivenUser(HttpUser): # 在类级别加载CSV数据并转换为循环迭代器 with open(product_ids.csv, r) as f: reader csv.DictReader(f) product_cycle cycle([row[id] for row in reader]) wait_time between(1, 3) task def get_product(self): product_id next(self.product_cycle) # 循环获取下一个ID with self.client.get(f/api/product/{product_id}, name/api/product/[id], catch_responseTrue) as resp: # 使用name参数对同一模式的URL进行聚合统计 if resp.status_code 200 and error not in resp.text: resp.success() else: resp.failure(fUnexpected response: {resp.text})这里有两个关键技巧itertools.cycle: 创建一个无限循环的迭代器确保数据用完后从头开始适合只读场景。name参数在client.get/post等方法中设置name可以将动态URL如/api/product/123和/api/product/456在Locust的统计报表中归类为/api/product/[id]使结果更加清晰否则每个不同的URL都会被单独统计图表会变得杂乱无章。实操心得数据驱动测试最容易踩的坑是数据竞争和数据耗尽。对于需要唯一性的数据如用户名务必使用线程安全的队列queue.Queue并在test_start事件中初始化。对于可重复使用的数据使用cycle或每次从文件中随机选取。同时一定要为脚本设计优雅的数据耗尽处理逻辑比如让虚拟用户安静地停止而不是抛出异常导致测试错误率飙升。2. 断言、捕获响应与自定义指标发送请求只是第一步如何判断请求是否成功、如何从响应中提取关键信息用于后续请求、如何定义业务层面的成功标准才是性能测试能否发现深层问题的关键。2.1 精细化断言与响应捕获Locust的client请求方法默认会基于HTTP状态码2xx为成功其他为失败判断成功与否。但这远远不够。一个接口可能返回200状态码但响应体里却是{code: 500, msg: 内部错误}。因此我们必须使用catch_responseTrue参数和上下文管理器来手动判断。task def create_order(self): # 假设上一个登录任务已经将token保存在self.token中 headers {Authorization: fBearer {self.token}} payload {product_id: 101, quantity: 1} with self.client.post(/api/order, jsonpayload, headersheaders, catch_responseTrue) as response: # 手动检查响应 if response.status_code ! 200: response.failure(fHTTP Status Error: {response.status_code}) return # 直接返回不再进行后续检查 try: resp_json response.json() # 业务逻辑断言code字段为0才代表成功 if resp_json.get(code) 0: response.success() # 成功时提取订单号可能用于后续查询任务 self.last_order_id resp_json.get(data, {}).get(order_id) else: response.failure(fBusiness Logic Error: {resp_json.get(msg)}) except JSONDecodeError: response.failure(Response is not valid JSON)这段代码展示了三层校验HTTP状态码、响应体JSON格式、业务自定义状态码。只有全部通过这个请求才会被记为成功。response.failure()传入的字符串会显示在Locust的失败统计中帮助你快速定位问题类型。2.2 自定义指标洞察业务性能Locust默认只统计请求的响应时间、RPS每秒请求数和失败率。但很多时候我们更关心业务指标例如“下单接口在95分位的响应时间是否超过2秒”、“查询商品详情的成功率是否低于99.9%”。Locust提供了强大的events钩子和stats对象允许我们记录自定义指标。假设我们想单独监控“支付”这个关键业务的耗时from locust import events from locust.runners import MasterRunner, WorkerRunner import time # 定义一个自定义的“支付耗时”统计条目 PAYMENT_TIME_STAT payment_processing_time events.init.add_listener def on_locust_init(environment, **kwargs): 在Locust初始化时注册自定义统计字段 # 避免在Worker节点重复注册如果使用分布式模式 if isinstance(environment.runner, MasterRunner) or not isinstance(environment.runner, WorkerRunner): environment.stats.custom_stats[PAYMENT_TIME_STAT] {} events.request.add_listener def on_request(request_type, name, response_time, response_length, exception, context, **kwargs): 监听所有请求事件如果是支付请求则记录自定义指标 if name /api/payment: # 通过name匹配我们关心的请求 # 获取或初始化该指标的统计对象 stats context.env.stats.custom_stats.get(PAYMENT_TIME_STAT) if stats: # 这里简单地将响应时间记录到自定义统计中。 # 更复杂的做法是使用PercentileStat类来计算分位数。 # 但自定义stats主要便于在Web UI中区分查看。 # 更常见的做法是直接使用response事件并打上标签。 pass # 更实用的方法在任务中直接记录一个带标签的响应事件 class PaymentUser(HttpUser): task def pay_order(self): start_time time.perf_counter() # ... 执行支付请求 ... with self.client.post(/api/payment, catch_responseTrue) as resp: # ... 响应处理 ... pass end_time time.perf_counter() processing_time int((end_time - start_time) * 1000) # 毫秒 # 触发一个自定义事件用于记录业务处理时间 events.request.fire( request_typePOST, name/api/payment, response_timeprocessing_time, response_length0, exceptionNone, contextself.environment, # 可以添加额外的元数据 meta{business_step: payment} )虽然自定义统计需要一些代码量但它能让你从“系统视角”切换到“业务视角”。你可以在Locust的Web UI中看到这个自定义的payment_processing_time指标并设置其阈值告警。注意事项自定义指标会带来额外的性能开销在超高并发如数万用户时需谨慎使用。通常对于核心的、数量不多的关键业务路径进行监控即可。另外自定义指标的数据不会自动聚合到Locust的最终报告中如果需要你可能需要编写额外的监听器来收集和计算。3. 复杂场景编排权重、序列与条件执行真实的用户行为不是随机的也不是固定顺序的它往往带有权重、依赖和条件逻辑。Locust的task装饰器、TaskSet嵌套以及Python本身的控制流可以组合出非常复杂的场景。3.1 动态权重与任务序列task装饰器的权重可以是静态整数也可以是一个返回整数的可调用对象从而实现动态权重。class SmartUser(HttpUser): wait_time between(0.5, 2) def get_browse_weight(self): 根据时间或其它条件动态调整浏览任务的权重 # 例如模拟白天浏览多晚上下单多 # 这里只是一个示例返回固定值 return 5 task(lambda self: self.get_browse_weight()) # 动态权重 def browse(self): self.client.get(/api/browse) task(1) def search(self): self.client.get(/api/search?qlocust) task def sequential_actions(self): 一个简单的顺序执行任务示例 # 1. 先获取列表 list_resp self.client.get(/api/items) # 简单解析获取第一个item的id (这里假设响应是JSON列表) if list_resp.status_code 200: try: items list_resp.json() if items: first_item_id items[0][id] # 2. 再查看第一个商品的详情 self.client.get(f/api/item/{first_item_id}, name/api/item/[id]) # 3. 最后模拟一个加入收藏的动作需要登录态 if hasattr(self, token): self.client.post(/api/favorite, json{item_id: first_item_id}) except (JSONDecodeError, KeyError, IndexError): passsequential_actions任务展示了一个简单的顺序操作。对于更复杂的、需要严格步骤的业务流如登录 - 添加商品 - 填写地址 - 支付更好的做法是将其封装在一个独立的TaskSet中并在该TaskSet的on_start或第一个任务里按顺序执行然后通过interrupt()跳出避免它被随机执行打断。3.2 条件执行与状态保持虚拟用户需要有自己的“记忆”。例如只有登录成功的用户才能执行下单操作。class StatefulUser(HttpUser): wait_time between(1, 3) def on_start(self): 用户启动时尝试登录并保存状态 self.logged_in False self.token None resp self.client.post(/api/login, json{user: test, pwd: test}) if resp.status_code 200: self.logged_in True self.token resp.json().get(token) print(fUser logged in, token: {self.token[:10]}...) else: print(Login failed, user will only perform public tasks.) task(10) def public_page(self): self.client.get(/) # 公开页面任何用户都能访问 task(5) def view_profile(self): 查看个人资料需要登录态 if not self.logged_in: # 如果未登录则跳过此任务或者执行一个降级操作 self.client.get(/login) # 例如跳转到登录页 return headers {Authorization: fBearer {self.token}} self.client.get(/api/profile, headersheaders)通过self.logged_in这样的实例变量我们为每个虚拟用户维护了独立的状态。这使得模拟“部分用户登录部分用户未登录”的混合场景成为可能测试系统对不同身份用户的处理能力和权限校验性能。4. 分布式执行与测试数据管理当单台机器无法模拟足够多的并发用户或者想从不同网络区域发起请求时就需要用到Locust的分布式模式。它采用一个Master节点和多个Worker节点的架构。Master负责分发任务、收集统计信息和提供Web UIWorker负责真正执行测试脚本生成负载。启动命令示例# 在Master节点假设IP为192.168.1.100 locust -f my_locustfile.py --master --hosthttp://target-system.com # 在Worker节点可以有多台 locust -f my_locustfile.py --worker --master-host192.168.1.100分布式下的数据陷阱分布式模式下最大的挑战是测试数据。如果你像之前那样在脚本的全局作用域或test_start监听器仅在Master进程运行中初始化一个队列Worker进程是无法共享这个队列的。每个Worker进程都会有自己独立的数据副本导致数据重复或竞争。解决方案使用中央数据服务或文件共享。预分片数据文件在测试前将测试数据如用户名列表分割成多个CSV文件每个Worker使用不同的文件。可以通过环境变量或命令行参数为Worker指定数据文件路径。使用外部存储对于需要严格唯一性的场景可以使用一个简单的中央服务如启动一个Flask小应用提供唯一ID或者利用数据库如Redis的原子操作INCR来分配ID。但要注意这个中央服务本身不能成为性能瓶颈。# 思路示例使用Redis分配唯一用户ID (需要安装redis-py) import redis from locust import events import os redis_client None events.test_start.add_listener def on_test_start(environment, **kwargs): global redis_client # 只在Master节点或独立进程初始化一次Redis连接不每个Worker都需要自己的连接。 # 更好的做法是在User类的on_start或__init__中连接。 pass class DistributedUser(HttpUser): abstract True # 作为基类 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 每个User实例化时连接Redis连接池更佳 self.redis_conn redis.Redis(hostyour-redis-host, port6379, decode_responsesTrue) def get_unique_user_id(self): 从Redis获取一个全局唯一的自增ID try: # 使用Redis的原子自增操作 uid self.redis_conn.incr(locust:global_user_counter) return fload_user_{uid} except redis.ConnectionError: # 降级方案使用本地随机ID import uuid return ffallback_user_{uuid.uuid4().hex[:8]}踩坑实录在分布式压测中我们曾因为所有Worker共用一个数据库连接池导致数据库连接数爆满压测机自身成为瓶颈。后来改为每个Worker进程独立创建连接池并在test_stop事件中统一清理问题才得以解决。另外确保你的测试脚本和依赖库在所有Worker节点上版本一致否则可能出现难以排查的诡异问题。5. 结果分析与性能瓶颈定位Locust运行结束后我们可以在Web UI上看到丰富的图表也可以在命令行使用--headless模式运行并生成HTML报告。但看数据只是第一步如何从数据中读出系统的“健康状况”和“瓶颈点”才是核心。1. 关键指标解读总RPS (Total Requests per Second)系统整体吞吐量。在并发用户数增加时观察RPS的增长曲线。如果用户数增加RPS却不再增长甚至下降说明系统已达到瓶颈。响应时间百分比50%, 95%, 99%重点关注**95分位P95和99分位P99**响应时间。即使平均响应时间很好如果P95或P99很高也意味着有相当一部分用户体验很差。例如平均响应时间200ms但P99响应时间2s说明1%的请求慢得不可接受。失败率任何非零的失败率都需要警惕。结合失败信息定位是参数错误、依赖服务超时、还是系统容量不足。用户数/并发数曲线观察响应时间、RPS随并发用户数增加的变化趋势。理想的曲线是在达到系统最佳负载点前RPS线性增长响应时间平稳缓慢上升过了最佳负载点后RPS增长停滞响应时间急剧上升。2. 定位瓶颈的“三板斧”对比法在固定并发用户数下对比不同接口或不同场景的响应时间。如果某个接口明显慢于其他同级接口它就是可疑点。递增法逐步增加并发用户数观察各项指标的变化。当响应时间突然陡增或失败率开始出现时当时的并发数就是系统在当前场景下的一个临界点。记录下这个点对应的系统资源CPU、内存、IO、数据库连接数等使用情况。剖切法利用Locust的自定义标签或请求分组name参数将一个大接口如“下单”拆解成多个子步骤“校验库存”、“计算优惠”、“创建订单”、“支付”分别统计其耗时。这样就能快速定位到是哪个子环节拖慢了整个流程。3. 结合系统监控Locust测的是“外部表现”要定位根本原因必须结合服务器的内部监控。应用服务器查看CPU使用率、内存使用、GC垃圾回收频率和耗时、线程池状态。数据库监控慢查询日志、连接数、锁等待、CPU和IO使用率。很多时候性能瓶颈的第一个信号就来自数据库。中间件/缓存检查Redis/Memcached的命中率、网络延迟。缓存失效可能导致数据库瞬间压力倍增。网络检查带宽使用率、TCP重传率、连接数。一个典型的分析流程是从Locust报表中发现P95响应时间超标 - 查看对应时间段的服务器监控发现数据库CPU持续100% - 检查数据库慢日志找到一条没有索引的全表扫描查询 - 优化SQL或添加索引 - 重新测试验证。实操心得不要只运行一次测试就下结论。性能测试结果具有波动性。一个可靠的做法是在相同的测试场景和环境下至少运行3次取其中位数或相对稳定的结果。同时每次代码发布或配置变更后都应运行基准测试Baseline Test与历史数据进行对比防止性能退化。性能测试的最终目的不是“压垮系统”而是通过科学的数据为系统的稳定、高效运行提供决策依据和信心保障。