大模型推理服务性能压测教程
本教程面向内部技术人员指导如何使用llm_benchmark.py对 Chat 类大模型进行系统性性能压测并基于测试结果填写评估报告模板。1. 环境准备1.1 压测客户端操作系统Linux推荐 openEuler / UbuntuPython 版本≥ 3.8依赖库bashpip install aiohttp --break-system-packages # 若系统保护可加 --user网络与目标模型服务内网互通避免公网延迟干扰。1.2 获取脚本将llm_benchmark.py拷贝到工作目录赋予执行权限可选bashchmod x llm_benchmark.py1.3 确认服务地址记录待测模型的URL和MODEL_NAME例如URLhttp://1xx.xx.xx.6x:233xx/v1/chat/completionsModelQwQ-32B注意接口必须兼容 OpenAI Chat Completion 流式格式支持stream和stream_options。2. 脚本参数速查bashpython llm_benchmark.py --help参数必填说明示例--url✅服务端 API 地址http://127.0.0.1:8000/v1/chat/completions--model✅模型名称须服务端识别Qwen2.5-14B--prompt❌测试用提示词默认见脚本你好--max-tokens❌最大输出 token 数默认 5122048--timeout❌单请求超时秒数默认 120180--num-requests❌每个并发档位的请求总数默认 20建议≥50以保证统计稳定性50--concurrency❌固定单一并发数与--concurrency-sweep二选一10--concurrency-sweep❌并发扫描档位逗号分隔推荐1,5,10,20,501,5,10,20,50--api-key❌若服务需要认证则填写Bearer xxx3. 测试场景设计T1~T4根据业务需求通常执行4 组场景从极短输出到长文生成全面覆盖。场景编号场景名称--max-tokens推荐--prompt核心目的T1TTFT 基线16你好排除生成耗时获取纯调度/prefill 延迟基线T2实时对话64用一句话总结人工智能在建筑领域最核心的应用。模拟实时交互体验响应速度T3基准场景 ⭐512请详细介绍一下人工智能在建筑行业的应用场景包括但不限于智能监控、人脸识别等方面尽量展开说明。横向对比主场景最贴近业务T4文书生成2048与 T3相同 prompt隔离输出长度变量测试长输出稳定性关键T3 和 T4 使用相同 prompt仅--max-tokens不同才能对比输出长度对 TPS 的影响。4. 执行压测完整步骤4.1 单模型单场景测试以T3 基准场景为例执行一次并发扫描bashpython llm_benchmark.py \ --url http://1xx.xx.xx.6x:233xx/v1/chat/completions \ --model QwQ-32B \ --prompt 请详细介绍一下人工智能在建筑行业的应用场景包括但不限于智能监控、人脸识别等方面尽量展开说明。 \ --max-tokens 512 \ --timeout 120 \ --num-requests 50 \ --concurrency-sweep 1,5,10,20,504.2 完整四场景自动化脚本推荐为每个模型建立 Shell 脚本顺序执行 T1~T4并在每次测试间sleep 5秒让服务恢复。bash#!/bin/bash MODEL_URLhttp://1xx.xx.xx.6x:233xx/v1/chat/completions MODEL_NAMEQwQ-32B # T1 python llm_benchmark.py --url $MODEL_URL --model $MODEL_NAME \ --prompt 你好 --max-tokens 16 --timeout 120 --num-requests 50 \ --concurrency-sweep 1,5,10,20,50 sleep 5 # T2 python llm_benchmark.py --url $MODEL_URL --model $MODEL_NAME \ --prompt 用一句话总结人工智能在建筑领域最核心的应用。 --max-tokens 64 \ --timeout 120 --num-requests 50 --concurrency-sweep 1,5,10,20,50 sleep 5 # T3 python llm_benchmark.py --url $MODEL_URL --model $MODEL_NAME \ --prompt 请详细介绍一下人工智能在建筑行业的应用场景包括但不限于智能监控、人脸识别等方面尽量展开说明。 \ --max-tokens 512 --timeout 120 --num-requests 50 --concurrency-sweep 1,5,10,20,50 sleep 5 # T4 python llm_benchmark.py --url $MODEL_URL --model $MODEL_NAME \ --prompt 请详细介绍一下人工智能在建筑行业的应用场景包括但不限于智能监控、人脸识别等方面尽量展开说明。 \ --max-tokens 2048 --timeout 120 --num-requests 50 --concurrency-sweep 1,5,10,20,50注意若 T4 高并发下大量超时脚本可能卡住可提前终止该档位CtrlC后继续下一档。4.3 输出保存将终端输出重定向至日志文件方便后续提取数据bashbash run_test.sh 21 | tee model_name_T3.log5. 结果解读与关键指标5.1 脚本输出示例text并发数: 50 | 请求数: 50 (成功 50 / 失败 0) 墙钟总耗时: 52.34s | QPS: 0.96 系统总Token吞吐 (system TPS): 1003.44 tokens/s -- 核心并发能力指标 单请求平均生成速率: 39.87 tokens/s TTFT avg/p50/p90/p99: 0.083s / 0.083s / 0.091s / 0.097s 总延迟 avg/p50/p90/p99: 12.84s / 12.58s / 13.51s / 14.05s5.2 指标定义与用途指标含义业务意义TTFT P50/P90/P99首 Token 延迟分布用户感知的“响应速度”P90 反映绝大多数体验system_tps所有请求累计 token 数 ÷ 墙钟总耗时服务端总吞吐能力横向对比核心单请求平均生成速率token数 ÷ 该请求总耗时纯生成阶段模型本身生成速度不含排队总延迟 P50/P90/P99端到端完整响应耗时用户等待完整结果的时间吞吐拐点system_tps 不再随并发增加而增长的点容量规划上限超过则排队加剧5.3 关键判断规则TTFT 劣化拐点观察 TTFT P90 随并发变化若某并发下突增如从 0.5s 升至 3s则说明服务开始严重排队。吞吐拐点在汇总对比表中当 system_tps 增幅显著减小 5%或开始下降时当前并发即为饱和点。稳定性失败率 0% 说明服务不堪重负需降低并发或优化。6. 填写报告模板6.1 数据提取从脚本输出填入表格每个模型的每一场景T1~T4均需填写类似下表的行数据直接取自脚本输出的对应并发行。并发数成功/总请求失败率系统TPSQPSTTFT AVG (s)TTFT P50 (s)TTFT P90 (s)TTFT P99 (s)延迟 AVG (s)延迟 P50 (s)延迟 P90 (s)延迟 P99 (s)150/500%39.870.080.0830.0830.0910.09712.8412.5813.5114.05.......................................注意失败率 (总请求 - 成功数) / 总请求 × 100%。6.2 填写单请求基准T3 并发1从 T3 并发1 的行中提取平均 TTFT单请求 TPS脚本输出中有单请求平均生成速率平均总延迟平均输出 Token 数/请求 单请求 TPS × 平均总延迟可近似估算finish_reason需查看服务端返回若输出 token 数远小于 max-tokens 且finish_reason为stop说明模型提前结束TPS 可能虚高。6.3 填写性能曲线描述基于 T3 数据观察各并发下的 system_tps 和 TTFT P90给出吞吐拐点例如并发数达到 50 时system_tps1093继续增加可能持平未饱和。TTFT 拐点例如并发数超过 20 后TTFT P90 由 0.21s 增至 0.46s但仍低于 0.5s可接受。输出长度影响对比 T3 和 T4 并发1 的单请求 TPS计算变化百分比公式(T4 - T3) / T3 * 100%。稳定性指出开始出现失败的并发阈值。6.4 横向对比与问题汇总在报告第四章横向对比分析中填入各模型在 T3 下的峰值 system_tps、TTFT 基线等。问题清单按实际观察填写严重程度自行判定。7. 常见问题 FAQQ1脚本报错ModuleNotFoundError: No module named aiohttpA安装依赖pip install aiohttp若系统限制可加--user或--break-system-packagesPython 3.11。Q2为什么system_tps很高但单请求平均生成速率很低A高并发下多个请求并行system_tps 是累计值单请求生成速率受 GPU 时间片影响会下降。正常现象只要 system_tps 增长说明并发扩展有效。Q3输出 token 数远小于--max-tokens怎么回事A模型主动停止finish_reasonstop可能因为 prompt 较短或模型认为已回答完整。这不是错误但 TPS 会偏高因为生成长度短。若要测试最大长度能力可调整 prompt 或增加--max-tokens但需注意服务超时。Q4高并发下大量超时脚本卡住怎么办A可提前设置--timeout较小如 60s让失败快速返回或直接 CtrlC 终止当前档位记录失败率后继续下一档。在报告中注明“并发50 时因超时主动终止”。Q5如何确保分位数 P90/P99 稳定A--num-requests至少 30建议 50 以上。脚本会给出警告若样本数 10此时分位数不可靠需增加请求数重测。Q6不同模型的--concurrency-sweep需要一致吗A必须一致否则横向对比失去意义。若某模型在 50 并发下全失败仍执行 50 档并记录失败率但可不填性能数据。Q7能否测试 Embedding 或 Reranker 服务A本脚本仅针对 Chat 类流式输出。Embedding/Reranker 有独立的压测脚本embedding_benchmark_v2.py、reranker_benchmark_v2.py用法类似但指标为 sentences/s 或 pairs/s。 附录llm_benchmark.py源码#!/usr/bin/env python3 # -*- coding: utf-8 -*- 大模型接口压测脚本 测试指标: TTFT(首token延迟)、单请求TPS、系统总TPS、并发能力、P50/P90/P99延迟 依赖: pip install aiohttp --break-system-packages 使用示例: 单次测试: python llm_benchmark.py --url http://localhost:8000/v1/chat/completions \ --model qwen2.5-72b --concurrency 1 --num-requests 5 并发扫描测试(自动测试多个并发档位): python llm_benchmark.py --url http://localhost:8000/v1/chat/completions \ --model qwen2.5-72b --concurrency-sweep 1,5,10,20,50 --num-requests 50 import argparse import asyncio import json import math import statistics import time import aiohttp DEFAULT_PROMPT 请详细介绍一下人工智能在建筑行业的应用场景包括但不限于智能监控、人脸识别等方面尽量展开说明。 class RequestResult: def __init__(self): self.success False self.error None self.ttft None self.total_time None self.token_count 0 self.start_time None self.end_time None def percentile(sorted_data: list, p: float): 计算百分位数。 使用 ceil 插值index ceil(n * p) - 1与 numpy percentile methodlower 行为一致。 要求 sorted_data 已排序p 取值范围 (0, 1]。 n len(sorted_data) if n 0: return None idx int(math.ceil(n * p)) - 1 idx max(0, min(idx, n - 1)) # 边界保护防止越界 return sorted_data[idx] async def send_streaming_request(session, url, model, prompt, max_tokens, headers, timeout): 发送一条流式请求统计 TTFT 和 token 生成速率 result RequestResult() # Fix: 加入 stream_options 请求服务端返回精确的 usage token 数 payload { model: model, messages: [{role: user, content: prompt}], stream: True, max_tokens: max_tokens, temperature: 0.7, stream_options: {include_usage: True}, } result.start_time time.perf_counter() first_token_time None token_count 0 usage_completion_tokens None try: async with session.post( url, jsonpayload, headersheaders, timeoutaiohttp.ClientTimeout(totaltimeout) ) as resp: if resp.status ! 200: result.error fHTTP {resp.status}: {await resp.text()} return result async for line in resp.content: line line.decode(utf-8, errorsignore).strip() if not line or not line.startswith(data:): continue data_str line[len(data:):].strip() if data_str [DONE]: break try: chunk json.loads(data_str) except json.JSONDecodeError: continue choices chunk.get(choices, []) has_content False if choices: delta choices[0].get(delta, {}) if delta.get(content): has_content True if has_content: if first_token_time is None: first_token_time time.perf_counter() token_count 1 # chunk 数近似有 usage 时会被覆盖 if chunk.get(usage): usage_completion_tokens chunk[usage].get(completion_tokens) result.end_time time.perf_counter() result.total_time result.end_time - result.start_time result.ttft (first_token_time - result.start_time) if first_token_time else None # 优先使用服务端返回的精确 token 数fallback 到 chunk 计数近似值 result.token_count usage_completion_tokens if usage_completion_tokens else token_count result.success True except asyncio.TimeoutError: result.error Timeout result.end_time time.perf_counter() except Exception as e: result.error str(e) result.end_time time.perf_counter() return result async def run_concurrency_level(url, model, prompt, max_tokens, timeout, concurrency, num_requests, api_keyNone): 在给定并发数下跑 num_requests 个请求返回汇总统计 # Fix: headers 在函数内部构建彻底避免多档位间共享同一对象的隐患 headers {Content-Type: application/json} if api_key: headers[Authorization] fBearer {api_key} connector aiohttp.TCPConnector(limit0) sem asyncio.Semaphore(concurrency) async def bound_request(session): async with sem: return await send_streaming_request(session, url, model, prompt, max_tokens, headers, timeout) async with aiohttp.ClientSession(connectorconnector) as session: wall_start time.perf_counter() tasks [bound_request(session) for _ in range(num_requests)] results await asyncio.gather(*tasks) wall_end time.perf_counter() wall_time wall_end - wall_start success_results [r for r in results if r.success] failed [r for r in results if not r.success] if not success_results: print(f [并发{concurrency}] 全部失败示例错误: {failed[0].error if failed else unknown}) return None ttfts sorted([r.ttft for r in success_results if r.ttft is not None]) total_times sorted([r.total_time for r in success_results]) token_counts [r.token_count for r in success_results] total_tokens sum(token_counts) # Fix: 样本数 10 时给出警告P90/P99 数值仅供参考 if len(ttfts) 10: print(f ⚠ 警告: 成功样本数{len(ttfts)} 10P90/P99 分位数统计意义有限建议增大 --num-requests 到 20 以上) # token 计数来源提示 has_usage any(r.token_count 0 for r in success_results) if not has_usage: print( ⚠ 警告: 服务端未返回 usage.completion_tokensTPS 使用 chunk 数近似数值偏低仅供横向对比参考) per_req_tps [ r.token_count / r.total_time for r in success_results if r.total_time and r.total_time 0 and r.token_count 0 ] summary { concurrency: concurrency, num_requests: num_requests, num_success: len(success_results), num_failed: len(failed), wall_time_s: wall_time, qps: len(success_results) / wall_time if wall_time 0 else 0, total_tokens: total_tokens, system_tps: total_tokens / wall_time if wall_time 0 else 0, avg_per_request_tps: statistics.mean(per_req_tps) if per_req_tps else None, # TTFT 分位数 —— Fix: 使用 ceil 插值不再偏移到最大值 ttft_avg: statistics.mean(ttfts) if ttfts else None, ttft_p50: percentile(ttfts, 0.50), ttft_p90: percentile(ttfts, 0.90), ttft_p99: percentile(ttfts, 0.99), # 端到端延迟分位数 —— Fix: 同上 latency_avg: statistics.mean(total_times), latency_p50: percentile(total_times, 0.50), latency_p90: percentile(total_times, 0.90), latency_p99: percentile(total_times, 0.99), } return summary def print_summary(summary): if summary is None: return print(f\n 并发数: {summary[concurrency]} | 请求数: {summary[num_requests]} f(成功 {summary[num_success]} / 失败 {summary[num_failed]})) print(f 墙钟总耗时: {summary[wall_time_s]:.2f}s | QPS: {summary[qps]:.2f}) print(f 系统总Token吞吐 (system TPS): {summary[system_tps]:.2f} tokens/s -- 核心并发能力指标) if summary[avg_per_request_tps]: print(f 单请求平均生成速率: {summary[avg_per_request_tps]:.2f} tokens/s) if summary[ttft_avg] is not None: p99_str f{summary[ttft_p99]:.3f}s if summary[ttft_p99] is not None else N/A print(f TTFT avg/p50/p90/p99: {summary[ttft_avg]:.3f}s / {summary[ttft_p50]:.3f}s f / {summary[ttft_p90]:.3f}s / {p99_str}) p99_lat f{summary[latency_p99]:.2f}s if summary[latency_p99] is not None else N/A print(f 总延迟 avg/p50/p90/p99: {summary[latency_avg]:.2f}s / {summary[latency_p50]:.2f}s f / {summary[latency_p90]:.2f}s / {p99_lat}) async def main(): parser argparse.ArgumentParser(descriptionLLM 接口压测脚本) parser.add_argument(--url, requiredTrue, help接口地址如 http://localhost:8000/v1/chat/completions) parser.add_argument(--model, requiredTrue, help模型名称) parser.add_argument(--api-key, defaultNone, helpAPI Key如需要) parser.add_argument(--prompt, defaultDEFAULT_PROMPT, help测试用的prompt) parser.add_argument(--max-tokens, typeint, default512, help单次生成的最大token数) parser.add_argument(--timeout, typeint, default120, help单请求超时时间(秒)) parser.add_argument(--num-requests, typeint, default20, help每个并发档位测试的总请求数) parser.add_argument(--concurrency, typeint, defaultNone, help单一并发数测试) parser.add_argument(--concurrency-sweep, typestr, defaultNone, help并发扫描逗号分隔如 1,5,10,20,50) args parser.parse_args() if args.concurrency_sweep: levels [int(x.strip()) for x in args.concurrency_sweep.split(,)] elif args.concurrency: levels [args.concurrency] else: levels [1, 5, 10, 20] print(f目标接口: {args.url}) print(f模型: {args.model}) print(f测试并发档位: {levels}) print(f每档位请求数: {args.num_requests}) print( * 70) all_summaries [] for level in levels: print(f\n 开始测试并发数 {level} ...) summary await run_concurrency_level( args.url, args.model, args.prompt, args.max_tokens, args.timeout, level, args.num_requests, args.api_key ) print_summary(summary) if summary: all_summaries.append(summary) await asyncio.sleep(1) if len(all_summaries) 1: print(\n * 70) print(汇总对比表 (寻找系统吞吐的拐点/饱和点):) print(f{并发:6} {成功率:8} {系统TPS:10} {QPS:8} {TTFT_p50(s):12} {TTFT_p90(s):12} {延迟_p50(s):12} {延迟_p90(s):12}) for s in all_summaries: success_rate f{s[num_success]}/{s[num_requests]} ttft_p50 f{s[ttft_p50]:.3f} if s[ttft_p50] is not None else N/A ttft_p90 f{s[ttft_p90]:.3f} if s[ttft_p90] is not None else N/A lat_p90 f{s[latency_p90]:.2f} if s[latency_p90] is not None else N/A print(f{s[concurrency]:6} {success_rate:8} {s[system_tps]:10.2f} f{s[qps]:8.2f} {ttft_p50:12} {ttft_p90:12} {s[latency_p50]:12.2f} {lat_p90:12}) print(\n判断依据: 当并发数上升但 system_tps 不再增长甚至下降时说明已达到服务端处理能力上限吞吐拐点。) if __name__ __main__: asyncio.run(main())