Python 异步并发从 asyncio 到结构化并发的实战思考一、asyncio 的坑当并发量上来之后Python 的 asyncio 确实是异步编程的事实标准但真把它推向生产环境的极限时你会发现它远没有教程里那么优雅。之前我们维护的一个向量检索服务用 asyncio 并发查询 5 个 Milvus Collection。理论上并发查询的延迟应该只比单次查询高一点点但实测 P99 延迟反而比串行高了 40%。问题出在哪协程泄漏。某个查询没设超时卡在 TCP 等待上。其他协程虽然创建了但都在等事件循环调度而事件循环被那个阻塞的协程占着。更常见的问题是协程孤儿——你asyncio.create_task()创建了一个任务没await它也没存引用。这个任务就在后台默默跑异常被吞掉资源占着你还以为一切正常。asyncio 的原始 API 是面向底层事件循环设计的给你create_task、gather、wait就像给你一堆砖头让你盖房子。你需要的是结构化并发——让协程的生命周期与代码块绑定就像with语句让资源生命周期与代码块绑定一样。二、结构化并发给协程套上“笼子”非结构化 vs 结构化非结构化并发就像 Go 的go func()——启动一个 goroutine它什么时候结束、出不出错、有没有泄漏你全不知道。结构化并发要求简单粗暴所有子任务必须在父任务的作用域内完成。graph TB subgraph 非结构化: 协程散养 A1[主协程] -- B1[子协程1] A1 -- B2[子协程2] B1 -.- C1[??? 何时结束] B2 -.- C2[??? 异常去哪了] end subgraph 结构化: 生命周期受控 A2[主协程] -- B3[子协程1] A2 -- B4[子协程2] B3 -- D1[完成/超时/取消] B4 -- D2[完成/超时/取消] D1 -- A2 D2 -- A2 end核心就三条子任务不逃逸子协程的引用不能泄露到父作用域外。父等子结束父协程必须等所有子协程完事不管成功还是失败才退出。异常不丢失子协程的异常必须传给父协程。Python 的实现路径Python 3.11 引入了TaskGroup算是官方的结构化并发方案。但在生产环境你还需要超时控制、并发限制和优雅取消。sequenceDiagram participant Main as 主协程 participant TG as TaskGroup participant T1 as 任务1 participant T2 as 任务2 Main-TG: async with TaskGroup() TG-T1: create_task(query_1) TG-T2: create_task(query_2) Note over T2: T2 抛出异常 T2--TG: 异常! TG-T1: 取消 (cancel) TG--Main: ExceptionGroup三、生产级工具库实现下面是一个我在生产环境用的结构化并发工具库支持超时、并发限制和异常聚合。 结构化并发工具库 - 生产级 asyncio 并发管理 import asyncio import time from dataclasses import dataclass, field from typing import Any, Callable, Coroutine, TypeVar from contextlib import asynccontextmanager T TypeVar(T) dataclass class TaskResult: 单个任务的执行结果 task_id: str success: bool value: Any None error: Exception | None None duration_ms: float 0.0 dataclass class BatchResult: 批量任务的聚合结果 total: int 0 succeeded: int 0 failed: int 0 cancelled: int 0 results: list[TaskResult] field(default_factorylist) total_duration_ms: float 0.0 property def success_rate(self) - float: return self.succeeded / self.total if self.total 0 else 0.0 class ConcurrencyLimiter: 并发限制器基于 Semaphore防止下游服务被压垮 额外提供了等待队列深度监控 def __init__(self, max_concurrency: int): self._semaphore asyncio.Semaphore(max_concurrency) self._active_count 0 self._waiting_count 0 property def active_count(self) - int: return self._active_count property def waiting_count(self) - int: return self._waiting_count asynccontextmanager async def acquire(self): 上下文管理器方式获取并发槽位 self._waiting_count 1 try: await self._semaphore.acquire() self._waiting_count - 1 self._active_count 1 try: yield finally: self._active_count - 1 self._semaphore.release() except asyncio.CancelledError: self._waiting_count - 1 raise class StructuredConcurrencyRunner: 结构化并发执行器 核心保证所有任务在作用域结束时要么完成、要么取消不存在孤儿任务 def __init__( self, max_concurrency: int 10, task_timeout: float 30.0, retry_on_timeout: bool False, ): self._limiter ConcurrencyLimiter(max_concurrency) self._task_timeout task_timeout self._retry_on_timeout retry_on_timeout async def run_single( self, coro_factory: Callable[[], Coroutine], task_id: str, ) - TaskResult: 执行单个任务带并发限制和超时控制 start time.monotonic() try: async with self._limiter.acquire(): # 超时控制在并发槽位内才开始计时 result await asyncio.wait_for( coro_factory(), timeoutself._task_timeout, ) duration (time.monotonic() - start) * 1000 return TaskResult( task_idtask_id, successTrue, valueresult, duration_msduration, ) except asyncio.TimeoutError: duration (time.monotonic() - start) * 1000 return TaskResult( task_idtask_id, successFalse, errorTimeoutError(f任务超时 ({self._task_timeout}s)), duration_msduration, ) except asyncio.CancelledError: duration (time.monotonic() - start) * 1000 return TaskResult( task_idtask_id, successFalse, errorCancelledError(任务被取消), duration_msduration, ) except Exception as e: duration (time.monotonic() - start) * 1000 return TaskResult( task_idtask_id, successFalse, errore, duration_msduration, ) async def run_batch( self, coro_factories: list[tuple[str, Callable[[], Coroutine]]], fail_fast: bool False, ) - BatchResult: 批量执行任务结构化保证所有任务在返回前都已结束 fail_fastTrue 时任一任务失败立即取消其余任务 batch_start time.monotonic() batch_result BatchResult(totallen(coro_factories)) if not coro_factories: return batch_result pending_tasks: dict[asyncio.Task, str] {} async def _wrapped(task_id: str, factory: Callable[[], Coroutine]): 包装每个任务确保结果被收集 result await self.run_single(factory, task_id) batch_result.results.append(result) if result.success: batch_result.succeeded 1 else: batch_result.failed 1 if fail_fast and not isinstance(result.error, asyncio.CancelledError): # 快速失败取消所有未完成的任务 for task in pending_tasks: task.cancel() return result try: async with asyncio.TaskGroup() as tg: for task_id, factory in coro_factories: task tg.create_task(_wrapped(task_id, factory)) pending_tasks[task] task_id except* Exception as eg: # Python 3.11 ExceptionGroup 语法 for exc in eg.exceptions: if not isinstance(exc, (asyncio.CancelledError,)): batch_result.failed 1 batch_result.total_duration_ms (time.monotonic() - batch_start) * 1000 batch_result.cancelled batch_result.total - batch_result.succeeded - batch_result.failed return batch_result # 使用示例向量检索服务的并发查询 async def mock_milvus_query(collection: str, vector: list[float]) - dict: 模拟 Milvus 查询随机延迟 import random delay random.uniform(0.1, 0.5) await asyncio.sleep(delay) return {collection: collection, matches: [{id: 1, score: 0.95}]} async def main(): 并发查询多个 Collection结构化保证无泄漏 runner StructuredConcurrencyRunner( max_concurrency5, # 最多 5 个并发查询 task_timeout2.0, # 单次查询超时 2 秒 ) collections [articles, papers, docs, wiki, code] query_vector [0.1] * 128 # 构建任务列表每个任务是一个 (id, factory) 元组 tasks [ (fquery_{col}, lambda colcol: mock_milvus_query(col, query_vector)) for col in collections ] result await runner.run_batch(tasks) print(f总数: {result.total}, 成功: {result.succeeded}, f失败: {result.failed}, 取消: {result.cancelled}) print(f成功率: {result.success_rate:.1%}) print(f总耗时: {result.total_duration_ms:.0f}ms) for r in result.results: status ✓ if r.success else ✗ print(f {status} {r.task_id}: {r.duration_ms:.0f}ms) if __name__ __main__: asyncio.run(main())几个关键设计点coro_factory而非coro传入协程工厂函数而非协程对象。因为协程对象一旦创建就开始计时如果你在队列里等了 5 秒才拿到并发槽位超时早就过了。工厂函数确保只在获取槽位后才创建协程。fail_fast模式当任一任务失败时立即取消所有未完成的任务。这在“全有或全无”的业务场景中很有用比如分布式事务中的并行校验。ExceptionGroup处理Python 3.11 的TaskGroup会抛出ExceptionGroup里面可能包含多个异常。用except*语法可以分类处理避免一个异常掩盖其他异常。四、结构化并发的代价不是所有场景都该用TaskGroup 的异常传播问题TaskGroup的设计哲学是“一个失败全部取消”。这在很多场景下过于激进。比如你要并发查询 5 个数据源其中 1 个超时了你可能希望拿到其余 4 个的结果而不是全部取消。应对策略在TaskGroup内部用try/except包裹每个任务把异常转化为TaskResult不让异常传播到TaskGroup层面。上面的代码就是这么做的——_wrapped函数吞掉了异常转为结果对象。并发限制的背压问题ConcurrencyLimiter用 Semaphore 控制并发数但 Semaphore 不提供背压机制——当并发槽位满了新的请求会排队等待而不是被拒绝。如果上游的请求速率远超下游的处理能力等待队列会无限增长最终 OOM。应对策略给 Semaphore 加上等待超时。如果等了 5 秒还拿不到槽位直接返回“服务繁忙”错误而不是让请求无限等待。适用边界CPU 密集型任务asyncio 是单线程的CPU 密集型任务会阻塞事件循环。用ProcessPoolExecutorloop.run_in_executor替代。需要精确顺序的场景结构化并发的子任务完成顺序不确定如果业务要求严格的执行顺序应该用串行或链式异步。长时间运行的后台任务结构化并发要求子任务在父作用域内结束不适合“启动后不管”的后台任务。这种场景用asyncio.create_task 显式生命周期管理更合适。五、总结结构化并发的核心思想是让协程的生命周期与代码作用域绑定消除协程泄漏和孤儿任务的风险。Python 3.11 的TaskGroup提供了官方支持但生产环境还需要并发限制、超时控制和异常聚合等增强能力。协程工厂模式确保超时计时从获取并发槽位开始而非从创建协程开始。结构化并发不适用于 CPU 密集型任务和长时间后台任务在这些场景下应选择多进程或显式生命周期管理。