Prompt 工程进阶:从单次调用到 Agent 工作流的结构化编排
Prompt 工程进阶从单次调用到 Agent 工作流的结构化编排一、当 Prompt 不再够用——从指令到工作流单次 Prompt 调用解决不了的问题正在变得越来越多。用户说“帮我整理这周的会议纪要提取待办事项分配给对应负责人并同步到项目管理工具”——这不是一个 Prompt 能搞定的事而是一个多步骤、有依赖、需纠错的工作流。生产环境里Prompt 工程主要卡在三个地方。第一复杂任务单 Prompt 容易崩溃把多步任务塞进一个 Prompt模型要么漏步骤要么在步骤间产生幻觉。一个“分析报告生成”任务单 Prompt 的步骤完成率只有 63%拆成多步工作流后能提升到 94%。第二上下文窗口是硬约束长 Prompt 一旦超过模型上限要么截断丢失信息要么被迫换更贵的模型。第三错误传播不可控单 Prompt 里某一步出错后续所有步骤都基于错误前提继续没法中途修正。某内容团队的自动化工作流实践显示把“选题调研→大纲生成→初稿撰写→审校修订”拆成 4 步 Agent 工作流后产出质量评分从 3.2 提升到 4.55 分制。而且每步都能独立重试整体 Token 消耗反而下降了 28%因为每步只需要传递必要上下文不用全量信息。二、Agent 工作流的编排模型从 ReAct 到 DAG 调度Agent 工作流的核心问题是怎么编排多步任务让它既灵活又可控。flowchart TB subgraph 任务解析层 P1[用户输入] P2[任务分解器br/Task Decomposer] P3[依赖图构建br/DAG Builder] end subgraph 执行编排层 E1{调度器br/Scheduler} E2[并行执行组 A] E3[并行执行组 B] E4[串行执行步骤] end subgraph 单步执行引擎 S1[Prompt 模板渲染br/变量注入 约束注入] S2[LLM 调用br/带重试与超时] S3[输出解析与验证br/JSON Schema 校验] S4{校验通过?} S5[自修复重试br/将错误信息回注 Prompt] end subgraph 上下文管理 C1[步骤间上下文传递br/只传必要信息] C2[全局状态存储br/共享变量池] C3[上下文压缩br/摘要 关键信息提取] end subgraph 监控与恢复 M1[步骤级日志] M2[断点续跑br/从失败步骤恢复] M3[人工介入点br/关键决策暂停] end P1 -- P2 -- P3 -- E1 E1 -- E2 E1 -- E3 E1 -- E4 E2 -- S1 E3 -- S1 E4 -- S1 S1 -- S2 -- S3 -- S4 S4 --|否| S5 -- S1 S4 --|是| C1 C1 -- C2 C2 -- C3 C2 -- M1 M1 -- M2 M1 -- M3这套架构有三个关键点。一是 DAG 调度任务分解成有向无环图没依赖的步骤并行执行有依赖的串行执行最大化吞吐。二是自修复循环每步输出经过 JSON Schema 校验不通过时把错误信息回注 Prompt 重试而不是直接失败。三是上下文压缩步骤间只传递必要信息通过摘要和关键信息提取控制上下文膨胀。三、生产级 Agent 工作流引擎的代码实现下面这段代码实现了一个完整的 Agent 工作流引擎包含任务分解、DAG 调度、自修复执行和上下文管理。import asyncio import json import re import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Optional import httpx # 工作流基础模型 class StepStatus(Enum): 步骤状态 PENDING pending RUNNING running SUCCESS success FAILED failed SKIPPED skipped dataclass class WorkflowStep: 工作流步骤定义 step_id: str name: str prompt_template: str # 支持 {variable} 占位符 dependencies: list[str] field(default_factorylist) # 依赖的步骤 ID output_schema: dict field(default_factorydict) # JSON Schema 校验 max_retries: int 2 timeout_seconds: int 60 # 运行时状态 status: StepStatus StepStatus.PENDING output: Any None error: str attempts: int 0 latency_ms: float 0.0 dataclass class WorkflowContext: 工作流上下文步骤间共享的状态 variables: dict field(default_factorydict) step_outputs: dict[str, Any] field(default_factorydict) def set(self, key: str, value: Any) - None: self.variables[key] value def get(self, key: str, default: Any None) - Any: return self.variables.get(key, default) def save_step_output(self, step_id: str, output: Any) - None: self.step_outputs[step_id] output def get_step_output(self, step_id: str) - Any: return self.step_outputs.get(step_id) def get_step_summary(self, step_ids: list[str], max_chars: int 500) - str: 获取指定步骤的压缩摘要控制上下文长度 parts [] total_chars 0 for sid in step_ids: output self.step_outputs.get(sid) if output is None: continue text json.dumps(output, ensure_asciiFalse) if not isinstance(output, str) else output if total_chars len(text) max_chars: # 截断并标注 remaining max_chars - total_chars text text[:remaining] ...[已截断] parts.append(f[{sid}的输出] {text}) total_chars len(text) return \n.join(parts) # DAG 调度器 class DAGScheduler: DAG 调度器分析步骤依赖生成执行计划 staticmethod def build_execution_plan(steps: list[WorkflowStep]) - list[list[str]]: 构建执行计划每层包含可并行执行的步骤 ID # 拓扑排序 分层 step_map {s.step_id: s for s in steps} in_degree: dict[str, int] {s.step_id: 0 for s in steps} dependents: dict[str, list[str]] {s.step_id: [] for s in steps} for step in steps: for dep in step.dependencies: if dep not in step_map: raise ValueError(f步骤 {step.step_id} 依赖不存在的步骤 {dep}) dependents[dep].append(step.step_id) in_degree[step.step_id] 1 # Kahn 算法分层 layers: list[list[str]] [] queue [sid for sid, deg in in_degree.items() if deg 0] while queue: # 当前层所有入度为 0 的步骤可并行 layers.append(sorted(queue)) next_queue [] for sid in queue: for dependent in dependents[sid]: in_degree[dependent] - 1 if in_degree[dependent] 0: next_queue.append(dependent) queue next_queue # 检测循环依赖 total_scheduled sum(len(layer) for layer in layers) if total_scheduled ! len(steps): raise ValueError(检测到循环依赖无法构建执行计划) return layers # Prompt 模板渲染器 class PromptRenderer: Prompt 模板渲染变量注入 约束注入 staticmethod def render(template: str, context: WorkflowContext, step: WorkflowStep) - str: 渲染 Prompt 模板注入变量和输出约束 # 替换 {variable} 占位符 rendered template for key, value in context.variables.items(): placeholder { key } if placeholder in rendered: rendered rendered.replace(placeholder, str(value)) # 替换 {step:step_id} 占位符引用其他步骤的输出 step_ref_pattern r\{step:([\w_])\} for match in re.finditer(step_ref_pattern, rendered): ref_step_id match.group(1) output context.get_step_output(ref_step_id) if output is not None: text json.dumps(output, ensure_asciiFalse) if not isinstance(output, str) else output rendered rendered.replace(match.group(0), text) # 注入输出格式约束 if step.output_schema: schema_str json.dumps(step.output_schema, ensure_asciiFalse, indent2) format_constraint ( f\n\n请严格按照以下 JSON Schema 格式输出 f不要输出任何其他内容\njson\n{schema_str}\n ) rendered format_constraint return rendered # 输出解析与校验器 class OutputValidator: 输出解析与校验JSON 提取 Schema 校验 staticmethod def parse_and_validate(raw_output: str, schema: dict) - tuple[Any, str]: 解析 LLM 输出并校验返回 (解析结果, 错误信息) # 尝试提取 JSON json_str OutputValidator._extract_json(raw_output) if json_str is None: return None, 无法从输出中提取 JSON请确保输出为合法 JSON 格式 try: parsed json.loads(json_str) except json.JSONDecodeError as e: return None, fJSON 解析失败{str(e)} # 简化的 Schema 校验检查必需字段 if schema: required schema.get(required, []) if isinstance(parsed, dict): missing [f for f in required if f not in parsed] if missing: return None, f缺少必需字段{, .join(missing)} return parsed, staticmethod def _extract_json(text: str) - Optional[str]: 从文本中提取 JSON 字符串 # 尝试提取 json ... 代码块 pattern rjson\s*([\s\S]*?)\s* match re.search(pattern, text) if match: return match.group(1) # 尝试提取 { ... } 或 [ ... ] for start_char, end_char in [({, }), ([, ])]: start text.find(start_char) if start ! -1: depth 0 for i in range(start, len(text)): if text[i] start_char: depth 1 elif text[i] end_char: depth - 1 if depth 0: return text[start : i 1] return None # Agent 工作流引擎 class AgentWorkflowEngine: Agent 工作流引擎串联 DAG 调度、执行、校验、自修复 def __init__(self, llm_client, checkpoint_callback: Optional[Callable] None): self._llm llm_client self._scheduler DAGScheduler() self._renderer PromptRenderer() self._validator OutputValidator() self._checkpoint checkpoint_callback # 断点回调 async def execute( self, steps: list[WorkflowStep], initial_context: WorkflowContext, ) - dict: 执行完整工作流 start_time time.monotonic() # 构建执行计划 execution_plan self._scheduler.build_execution_plan(steps) step_map {s.step_id: s for s in steps} # 逐层执行 for layer_idx, layer in enumerate(execution_plan): # 同层步骤并行执行 tasks [ self._execute_step(step_map[sid], initial_context) for sid in layer ] results await asyncio.gather(*tasks, return_exceptionsTrue) # 处理结果 for sid, result in zip(layer, results): step step_map[sid] if isinstance(result, Exception): step.status StepStatus.FAILED step.error str(result) elif result.get(success): step.status StepStatus.SUCCESS step.output result[output] initial_context.save_step_output(sid, result[output]) else: step.status StepStatus.FAILED step.error result.get(error, 未知错误) # 检查是否有失败步骤决定是否继续 failed_in_layer [ sid for sid in layer if step_map[sid].status StepStatus.FAILED ] if failed_in_layer: # 标记依赖失败步骤的后续步骤为 SKIPPED self._mark_dependents_skipped( failed_in_layer, step_map, execution_plan[layer_idx 1 :] ) # 断点回调 if self._checkpoint: self._checkpoint(initial_context, step_map) total_latency (time.monotonic() - start_time) * 1000 return { status: completed if all( s.status in (StepStatus.SUCCESS, StepStatus.SKIPPED) for s in steps ) else partial_failure, steps: [ { step_id: s.step_id, name: s.name, status: s.status.value, attempts: s.attempts, latency_ms: round(s.latency_ms, 2), error: s.error, } for s in steps ], outputs: initial_context.step_outputs, total_latency_ms: round(total_latency, 2), } async def _execute_step( self, step: WorkflowStep, context: WorkflowContext ) - dict: 执行单个步骤带自修复重试 step.status StepStatus.RUNNING step_start time.monotonic() for attempt in range(step.max_retries 1): step.attempts attempt 1 # 渲染 Prompt prompt self._renderer.render(step.prompt_template, context, step) # 调用 LLM带超时 try: raw_output await asyncio.wait_for( self._llm.chat(prompt), timeoutstep.timeout_seconds, ) except asyncio.TimeoutError: if attempt step.max_retries: continue step.latency_ms (time.monotonic() - step_start) * 1000 return {success: False, error: f步骤超时{step.timeout_seconds}s} except Exception as e: if attempt step.max_retries: continue step.latency_ms (time.monotonic() - step_start) * 1000 return {success: False, error: fLLM 调用失败{str(e)}} # 校验输出 if step.output_schema: parsed, error self._validator.parse_and_validate( raw_output, step.output_schema ) if error: if attempt step.max_retries: # 自修复将错误信息回注 Prompt step.prompt_template ( f\n\n[上一次尝试的错误{error}] f\n请修正上述问题后重新输出。 ) continue step.latency_ms (time.monotonic() - step_start) * 1000 return {success: False, error: error} else: parsed raw_output step.latency_ms (time.monotonic() - step_start) * 1000 return {success: True, output: parsed} step.latency_ms (time.monotonic() - step_start) * 1000 return {success: False, error: 超过最大重试次数} staticmethod def _mark_dependents_skipped( failed_ids: list[str], step_map: dict[str, WorkflowStep], remaining_layers: list[list[str]], ) - None: 将依赖失败步骤的后续步骤标记为 SKIPPED for layer in remaining_layers: for sid in layer: step step_map[sid] if any(dep in failed_ids for dep in step.dependencies): step.status StepStatus.SKIPPED step.error 上游步骤失败 # 使用示例会议纪要整理工作流 async def demo_meeting_workflow(): 演示会议纪要整理 Agent 工作流 # 模拟 LLM 客户端 class MockLLM: async def chat(self, prompt: str) - str: # 模拟不同步骤的输出 if 提取关键议题 in prompt: return json\n{topics: [Q3 目标回顾, 新功能排期, 资源分配], decisions: [Q3 目标下调 10%]}\n elif 待办事项 in prompt: return json\n{action_items: [{task: 更新 Q3 目标文档, assignee: 张三, deadline: 2025-07-01}, {task: 新功能技术方案评审, assignee: 李四, deadline: 2025-07-05}]}\n elif 格式化 in prompt: return json\n{summary: 本次会议回顾了 Q3 目标并决定下调 10%讨论了新功能排期和资源分配问题。, formatted_output: 会议纪要已生成}\n return {result: ok} engine AgentWorkflowEngine(llm_clientMockLLM()) # 定义工作流步骤 steps [ WorkflowStep( step_idextract_topics, name提取关键议题, prompt_template从以下会议记录中提取关键议题和决策\n{meeting_notes}, output_schema{ type: object, required: [topics, decisions], }, max_retries2, ), WorkflowStep( step_idextract_actions, name提取待办事项, prompt_template( 基于会议记录和已提取的议题提取待办事项\n 会议记录{meeting_notes}\n 关键议题{step:extract_topics} ), output_schema{ type: object, required: [action_items], }, dependencies[extract_topics], max_retries2, ), WorkflowStep( step_idformat_output, name格式化输出, prompt_template( 将以下信息整理为结构化会议纪要\n 议题{step:extract_topics}\n 待办{step:extract_actions} ), output_schema{ type: object, required: [summary, formatted_output], }, dependencies[extract_topics, extract_actions], max_retries2, ), ] # 初始化上下文 context WorkflowContext() context.set(meeting_notes, 张三Q3 目标完成度只有 70%建议下调 10%...) # 执行工作流 result await engine.execute(steps, context) print(json.dumps(result, ensure_asciiFalse, indent2)) if __name__ __main__: asyncio.run(demo_meeting_workflow())核心设计要点如下。DAGScheduler用 Kahn 算法对步骤做拓扑排序和分层同层步骤可并行执行。PromptRenderer支持变量占位符{variable}和步骤引用{step:step_id}自动注入输出格式约束。OutputValidator从 LLM 输出中提取 JSON 并校验必需字段校验失败时将错误信息回注 Prompt 实现自修复。AgentWorkflowEngine逐层调度执行失败步骤的下游自动标记为 SKIPPED断点回调支持从失败步骤恢复。四、Agent 工作流的架构成本与适用边界Agent 工作流显著提升了复杂任务的处理质量但也引入了不可忽视的架构复杂度。延迟的线性叠加。串行步骤的延迟是累加的4 步工作流如果每步平均 3 秒总延迟至少 12 秒。虽然 DAG 调度允许并行但多数实际工作流中步骤间存在强依赖并行度有限。流式输出可以缓解用户感知延迟但无法缩短实际执行时间。Token 消耗的放大效应。每步都需要独立的系统提示和上下文注入4 步工作流的 Token 消耗通常是单次调用的 2-3 倍。自修复重试进一步放大消耗一次校验失败就多一轮完整调用。错误处理的复杂度。步骤失败后的处理策略重试、跳过、降级、人工介入需要针对每个步骤单独配置配置不当可能导致工作流卡死或产出不完整。_mark_dependents_skipped是简化处理生产环境需要更细粒度的降级策略。适用边界。Agent 工作流适合多步骤、有依赖、需校验的复杂任务如内容生产流水线、数据分析管道、自动化测试流程。不适合简单的单轮问答或实时交互场景这些场景中工作流的编排开销远大于收益。禁用场景。当任务步骤间的依赖关系不确定或动态变化时静态 DAG 无法处理需要更灵活的动态规划策略。当步骤数量超过 20 时DAG 的可维护性急剧下降应考虑子工作流拆分。五、总结Agent 工作流的核心价值是将复杂任务从单次 Prompt 调用拆解为多步骤、可校验、可自修复的结构化编排。DAG 调度实现步骤并行Prompt 模板渲染实现变量注入与约束注入输出校验与自修复循环保障每步质量上下文压缩控制 Token 膨胀。架构代价集中在延迟叠加、Token 放大和错误处理复杂度三个方面。选择 Agent 工作流需确认任务具备多步骤、有依赖、需校验三个特征且延迟预算允许秒级响应。结构化编排不是过度工程而是复杂任务走向生产可用的必经之路。