多 Agent 协作的状态地狱:AI Agent 编排框架中的上下文管理与故障恢复
多 Agent 协作的状态地狱AI Agent 编排框架中的上下文管理与故障恢复一、三个 Agent 丢了一个包多步编排中的状态丢失问题构建一个需求分析 - 代码生成 - 测试验证的三 Agent 编排流水线看起来逻辑清晰。实际运行时却发现代码生成 Agent 偶尔丢失需求分析 Agent 传递的关键约束测试验证 Agent 有时拿不到代码生成 Agent 的完整输出。更严重的是当测试 Agent 发现代码错误需要回退到代码生成 Agent 重新生成时整个流水线的上下文状态已经丢失只能从头开始。这不是 Agent 能力不足而是编排框架缺少可靠的状态管理机制。多 Agent 编排的核心挑战不是让每个 Agent 做好本职工作而是在 Agent 之间可靠地传递和维护上下文状态。本文从状态管理模型、故障恢复策略、上下文压缩三个维度拆解 AI Agent 编排框架的工程实践。二、多 Agent 编排的状态流转与故障模式stateDiagram-v2 [*] -- INIT: 创建编排实例 INIT -- ANALYZING: 启动需求分析 Agent ANALYZING -- CODE_GEN: 分析完成传递需求文档 ANALYZING -- ANALYZING_FAILED: 分析超时/格式错误 ANALYZING_FAILED -- ANALYZING: 重试保留已有上下文 CODE_GEN -- TESTING: 代码生成完成传递代码 需求 CODE_GEN -- CODE_GEN_FAILED: 生成超时/语法错误 CODE_GEN_FAILED -- CODE_GEN: 重试保留需求文档 TESTING -- COMPLETED: 测试通过 TESTING -- CODE_GEN: 测试失败回退到代码生成 TESTING -- TESTING_FAILED: 测试框架异常 COMPLETED -- [*] TESTING_FAILED -- [*]: 超过最大重试次数 note right of CODE_GEN: 关键回退时必须保留br/需求分析阶段的上下文 note right of TESTING: 关键测试失败时br/必须传递错误详情给代码生成三种故障模式上下文丢失Agent 之间传递的消息被截断或遗漏下游 Agent 缺少关键信息。状态不一致回退到上游 Agent 时上游 Agent 的内部状态已丢失无法从断点继续。上下文膨胀每一步都把前序所有 Agent 的完整输出拼接到 Prompt 中导致 Token 数爆炸超出模型上下文窗口。三、生产级 Agent 编排框架实现3.1 持久化状态管理器 Agent 编排状态管理器——确保上下文在 Agent 间可靠传递 设计意图Agent 之间的消息传递不能依赖内存必须持久化以支持故障恢复 import json from dataclasses import dataclass, field, asdict from datetime import datetime from enum import Enum from typing import Any, Optional from pathlib import Path class AgentStep(Enum): ANALYZE analyze CODE_GEN code_gen TEST test class StepStatus(Enum): PENDING pending RUNNING running COMPLETED completed FAILED failed dataclass class StepContext: 单个步骤的上下文数据 step: AgentStep status: StepStatus input_data: dict[str, Any] # 该步骤的输入 output_data: Optional[dict] None # 该步骤的输出 error: Optional[str] None started_at: Optional[str] None completed_at: Optional[str] None retry_count: int 0 dataclass class OrchestrationState: 编排实例的全局状态——所有 Agent 共享 session_id: str current_step: AgentStep steps: dict[AgentStep, StepContext] field(default_factorydict) max_retries: int 3 created_at: str field(default_factorylambda: datetime.now().isoformat()) def get_context_for_step(self, step: AgentStep) - dict[str, Any]: 获取指定步骤所需的上下文——只包含该步骤需要的信息 设计意图不是把所有前序步骤的输出都塞进 Prompt 而是只传递当前步骤真正需要的字段控制 Token 消耗 context {} if step AgentStep.CODE_GEN: # 代码生成只需要需求文档不需要分析过程的中间产物 analyze_ctx self.steps.get(AgentStep.ANALYZE) if analyze_ctx and analyze_ctx.output_data: context[requirements] analyze_ctx.output_data.get(requirements_doc) context[constraints] analyze_ctx.output_data.get(constraints) elif step AgentStep.TEST: # 测试需要代码和需求文档用于验证代码是否满足需求 code_ctx self.steps.get(AgentStep.CODE_GEN) analyze_ctx self.steps.get(AgentStep.ANALYZE) if code_ctx and code_ctx.output_data: context[code] code_ctx.output_data.get(generated_code) context[test_hints] code_ctx.output_data.get(test_hints) if analyze_ctx and analyze_ctx.output_data: context[acceptance_criteria] analyze_ctx.output_data.get( acceptance_criteria ) return context class StateManager: 持久化状态管理器——将编排状态写入磁盘支持故障恢复 def __init__(self, state_dir: Path): self.state_dir state_dir self.state_dir.mkdir(parentsTrue, exist_okTrue) def save(self, state: OrchestrationState) - None: 持久化当前状态——每次步骤变更后必须调用 state_file self.state_dir / f{state.session_id}.json # 使用原子写入避免写入中断导致状态文件损坏 temp_file state_file.with_suffix(.tmp) with open(temp_file, w) as f: json.dump(asdict(state), f, ensure_asciiFalse, indent2) temp_file.rename(state_file) # 原子操作 def load(self, session_id: str) - Optional[OrchestrationState]: 从磁盘恢复状态——故障重启后调用 state_file self.state_dir / f{session_id}.json if not state_file.exists(): return None with open(state_file) as f: data json.load(f) # 反序列化为 OrchestrationState 对象 steps {} for step_name, ctx_data in data[steps].items(): steps[AgentStep(step_name)] StepContext(**ctx_data) return OrchestrationState( session_iddata[session_id], current_stepAgentStep(data[current_step]), stepssteps, max_retriesdata[max_retries], created_atdata[created_at], )3.2 故障恢复与回退策略 编排执行器——处理 Agent 执行失败时的回退和重试 设计意图Agent 调用 LLM 天然存在不确定性必须从架构层面处理失败 import logging logger logging.getLogger(__name__) class OrchestrationExecutor: def __init__(self, state_manager: StateManager): self.state_manager state_manager self.agents { AgentStep.ANALYZE: AnalyzeAgent(), AgentStep.CODE_GEN: CodeGenAgent(), AgentStep.TEST: TestAgent(), } def execute(self, state: OrchestrationState) - OrchestrationState: 执行编排流水线支持故障恢复 while state.current_step ! AgentStep.TEST or \ state.steps.get(AgentStep.TEST, StepContext( stepAgentStep.TEST, statusStepStatus.PENDING, input_data{} )).status ! StepStatus.COMPLETED: step state.current_step step_ctx state.steps.get(step) or StepContext( stepstep, statusStepStatus.PENDING, input_data{} ) # 获取当前步骤所需的上下文 context state.get_context_for_step(step) try: step_ctx.status StepStatus.RUNNING step_ctx.started_at datetime.now().isoformat() self.state_manager.save(state) # 执行 Agent agent self.agents[step] result agent.execute(context) step_ctx.output_data result step_ctx.status StepStatus.COMPLETED step_ctx.completed_at datetime.now().isoformat() state.steps[step] step_ctx # 推进到下一步 state.current_step self._next_step(step) except AgentExecutionError as e: step_ctx.status StepStatus.FAILED step_ctx.error str(e) step_ctx.retry_count 1 state.steps[step] step_ctx if step_ctx.retry_count state.max_retries: logger.error( f步骤 {step.value} 超过最大重试次数 f({state.max_retries})编排终止 ) break # 测试失败时回退到代码生成而非重试测试 if step AgentStep.TEST: logger.info(测试失败回退到代码生成阶段) state.current_step AgentStep.CODE_GEN # 将测试错误信息注入代码生成的上下文 code_ctx state.steps[AgentStep.CODE_GEN] code_ctx.input_data[test_failure] str(e) code_ctx.status StepStatus.PENDING # 重置状态 else: logger.info(f步骤 {step.value} 失败重试中 f({step_ctx.retry_count}/{state.max_retries})) finally: self.state_manager.save(state) return state def _next_step(self, current: AgentStep) - AgentStep: step_order [AgentStep.ANALYZE, AgentStep.CODE_GEN, AgentStep.TEST] idx step_order.index(current) if idx 1 len(step_order): return step_order[idx 1] return current3.3 上下文压缩控制 Token 消耗 上下文压缩器——在 Agent 间传递时压缩上下文控制 Token 消耗 设计意图原始输出可能包含大量冗余信息压缩后只保留下游 Agent 需要的关键字段 from typing import Any class ContextCompressor: # 每个步骤的输出模板——定义必须保留的字段 OUTPUT_SCHEMAS { AgentStep.ANALYZE: { requirements_doc: str, # 需求文档摘要非原文 constraints: list[str], # 约束条件列表 acceptance_criteria: list[str], # 验收标准 }, AgentStep.CODE_GEN: { generated_code: str, # 生成的代码 test_hints: list[str], # 测试建议 assumptions: list[str], # 代码中的假设需测试验证 }, } def compress(self, step: AgentStep, raw_output: dict[str, Any]) - dict[str, Any]: 压缩 Agent 输出——只保留下游需要的字段 原始输出可能包含思维过程、中间结果等冗余信息 压缩后只保留结构化的关键输出 schema self.OUTPUT_SCHEMAS.get(step, {}) compressed {} for field_name, field_type in schema.items(): value raw_output.get(field_name) if value is None: continue # 对字符串类型做长度截断防止单个字段过长 if field_type str and isinstance(value, str): max_len 2000 # 单字段最大 2000 字符 if len(value) max_len: value value[:max_len] \n...[已截断] compressed[field_name] value return compressed四、Agent 编排框架的架构权衡4.1 持久化存储的延迟代价每次步骤变更都写入磁盘增加了约 5-10ms 的延迟。对于短链路编排3-5 步总延迟增加可忽略。但对于高频编排每秒 100 实例磁盘 I/O 可能成为瓶颈。解决方案使用 Redis 替代文件系统将延迟降到 1ms 以下。4.2 上下文压缩的信息损失压缩会丢弃 Agent 的思维过程和中间推理步骤。当下游 Agent 需要理解为什么得出这个结论时压缩后的上下文可能不够。解决方案在压缩时保留reasoning_summary字段用 2-3 句话概括推理逻辑。4.3 回退策略的循环风险测试失败回退到代码生成代码生成后再次测试失败形成循环。必须设置最大回退次数如 3 次超过后终止编排并输出诊断报告。4.4 适用场景与禁用场景场景是否适用原因代码生成流水线适用步骤间有明确依赖回退逻辑清晰数据分析流水线适用多步骤处理中间结果需要持久化实时对话 Agent不适用单步交互无需复杂状态管理简单工具调用链不适用开销大于收益直接串联调用即可五、总结AI Agent 编排的核心工程挑战是状态管理而非 Agent 本身的能力持久化状态Agent 之间的上下文不能只存在内存中必须持久化以支持故障恢复。使用原子写入避免状态文件损坏。按需传递上下文不是把所有前序输出都塞进 Prompt而是只传递当前步骤需要的字段。用输出模板约束每个步骤的产出结构。故障恢复与回退Agent 调用 LLM 天然存在不确定性必须从架构层面处理失败。测试失败时回退到代码生成而非盲目重试。上下文压缩对长输出做截断和摘要控制 Token 消耗避免超出模型上下文窗口。落地路线先实现持久化状态管理器确保上下文不丢失再实现故障恢复和回退策略最后加入上下文压缩。每一步用端到端测试验证随机注入 Agent 失败确认状态能正确恢复。