从提示词到智能体Prompt Engineering 驱动 Agent 工作流的工程化构建一、Prompt 的天花板为什么单次提示无法解决复杂任务大模型的能力边界更多取决于 Prompt 设计而非模型本身。一个精心设计的单次提示能让 GPT-4 写出好文章但无法完成多步骤工作流——比如调研竞品、分析差异、生成报告、发送邮件。问题在于单次 Prompt 缺乏状态。模型生成回答时无法回溯中间结果不能根据上一步输出调整策略遇到错误时也不会自动重试。对于需要多步推理、工具调用和条件分支的任务单次 Prompt 的能力明显受限。Agent 工作流把复杂任务拆成子步骤每个步骤由独立 Prompt 驱动通过状态传递和条件判断连接。这就像把全能员工变成协作团队——每人专注一件事通过流程编排达成目标。但工程化 Agent 工作流并不简单。Prompt 依赖如何管理工具结果怎么可靠注入下一步条件分支如何避免死循环这些都需要系统方案。二、Agent 工作流架构从 Prompt 编排到执行引擎的全链路设计生产级 Agent 工作流系统包含四个核心层任务规划层、Prompt 编排层、工具执行层和状态管理层。flowchart TB A[用户目标输入] -- B[任务规划器] B -- C[子任务拆解] C -- D[执行计划 DAG] D -- E[步骤1: Prompt 模板渲染] E -- F[步骤1: 大模型推理] F -- G[步骤1: 输出解析] G -- H{步骤1: 条件判断} H --|需要工具调用| I[工具执行器] I -- J[工具结果注入] J -- K[步骤2: Prompt 模板渲染] H --|直接通过| K K -- L[步骤2: 大模型推理] L -- M[步骤2: 输出解析] M -- N{步骤2: 条件判断} N --|需要重试| F N --|任务完成| O[结果聚合] O -- P[最终输出] subgraph 状态管理层 Q[全局上下文存储] R[步骤执行日志] S[工具调用记录] end E -.- Q F -.- R I -.- S J -.- Q subgraph Prompt 编排层 E K end subgraph 工具执行层 I J end任务规划器是 Agent 的大脑。它接收用户目标拆解为有序子任务列表生成执行计划 DAG有向无环图。DAG 每个节点是子任务边表示依赖关系。规划器本身也是 Prompt 驱动模块——系统提示要求模型输出结构化 JSON 计划而非自由文本。Prompt 编排层负责将每个子任务渲染为具体 Prompt。关键设计是模板 上下文注入每个子任务有预定义 Prompt 模板模板变量在运行时从全局上下文动态填充。这样步骤 2 的 Prompt 能引用步骤 1 的输出实现跨步骤信息传递。工具执行层处理模型生成的工具调用请求。当模型判断需要调用外部工具如搜索引擎、数据库查询、API 调用时输出结构化工具调用指令工具执行器解析指令、执行调用、将结果注入回全局上下文。这个循环模型推理 → 工具调用 → 结果注入 → 再次推理是 Agent 的核心执行模式。状态管理层维护全局上下文、执行日志和工具调用记录。它是所有步骤共享的工作记忆确保信息在步骤间可靠传递。同时执行日志支持断点续跑——某个步骤失败时可从上一步成功状态恢复而非从头开始。三、生产级 Agent 工作流引擎核心代码实现以下代码基于 Python 构建展示从任务规划到执行引擎的完整链路import json from enum import Enum from dataclasses import dataclass, field from typing import Any, Optional, Callable # ---- 数据结构定义 ---- class StepStatus(Enum): PENDING pending RUNNING running SUCCESS success FAILED failed SKIPPED skipped class ToolCallStatus(Enum): PENDING pending EXECUTED executed FAILED failed dataclass class ToolCall: 工具调用记录 tool_name: str arguments: dict result: Any None status: ToolCallStatus ToolCallStatus.PENDING dataclass class StepResult: 单个步骤的执行结果 step_id: str output: str tool_calls: list[ToolCall] field(default_factorylist) status: StepStatus StepStatus.SUCCESS metadata: dict field(default_factorydict) dataclass class ExecutionContext: 全局执行上下文所有步骤共享的工作记忆 variables: dict field(default_factorydict) # 模板变量 step_results: dict[str, StepResult] field(default_factorydict) # 步骤结果 execution_log: list[dict] field(default_factorylist) # 执行日志 def set_variable(self, key: str, value: Any): 设置模板变量供后续步骤的 Prompt 引用 self.variables[key] value def get_variable(self, key: str, default: Any None) - Any: return self.variables.get(key, default) def log(self, step_id: str, event: str, detail: str ): 记录执行日志支持断点续跑和调试 self.execution_log.append({ step_id: step_id, event: event, detail: detail, }) # ---- Prompt 模板引擎 ---- class PromptTemplate: Prompt 模板支持变量注入和上下文引用 def __init__(self, system: str, user_template: str): self.system system self.user_template user_template def render(self, context: ExecutionContext) - tuple[str, str]: 将模板中的变量占位符替换为上下文中的实际值 rendered_user self.user_template for key, value in context.variables.items(): placeholder f{{{{{key}}}}} rendered_user rendered_user.replace(placeholder, str(value)) return self.system, rendered_user # ---- 任务规划器 ---- class TaskPlanner: 将用户目标拆解为结构化的执行计划 PLANNER_SYSTEM_PROMPT ( 你是一个任务规划器。将用户的目标拆解为有序的子任务列表。 每个子任务包含step_id、description、depends_on依赖的步骤ID列表、 tool_required是否需要工具调用。 输出格式为 JSON 数组不要输出其他内容。 ) def plan(self, goal: str) - list[dict]: 生成执行计划生产环境调用大模型 # 简化实现返回预定义的计划结构 # 实际场景中调用大模型生成动态计划 user_prompt f目标{goal}\n\n请生成执行计划。 plan_json call_llm_for_planning( self.PLANNER_SYSTEM_PROMPT, user_prompt ) return plan_json # ---- 工具注册表 ---- class ToolRegistry: 工具注册与执行Agent 调用外部能力的统一入口 def __init__(self): self._tools: dict[str, Callable] {} def register(self, name: str, func: Callable, description: str ): 注册工具函数 self._tools[name] { func: func, description: description, } def execute(self, name: str, arguments: dict) - Any: 执行工具调用包含异常处理 if name not in self._tools: raise ValueError(f未注册的工具{name}) tool self._tools[name] try: result tool[func](**arguments) return result except Exception as e: # 工具调用失败时返回错误信息而非抛出异常 # 让 Agent 有机会根据错误信息调整策略 return f工具调用失败{name}错误{str(e)} def get_tool_descriptions(self) - str: 生成工具描述文本供 Prompt 注入 descriptions [] for name, tool in self._tools.items(): descriptions.append(f- {name}: {tool[description]}) return \n.join(descriptions) # ---- Agent 执行引擎 ---- class AgentExecutor: Agent 工作流执行引擎编排步骤、管理状态、处理工具调用 def __init__( self, tool_registry: ToolRegistry, max_retries: int 2, ): self.tool_registry tool_registry self.max_retries max_retries def execute_plan( self, plan: list[dict], step_templates: dict[str, PromptTemplate], context: ExecutionContext, ) - dict: 执行完整的 Agent 工作流计划 completed_steps: set[str] set() # 按依赖关系排序执行拓扑排序的简化实现 remaining list(plan) while remaining: # 找出所有依赖已满足的步骤 ready_steps [ s for s in remaining if all( dep in completed_steps for dep in s.get(depends_on, []) ) ] if not ready_steps: # 死锁检测存在无法满足依赖的步骤 context.log(system, deadlock, 存在无法满足的依赖关系) break for step in ready_steps: step_id step[step_id] result self._execute_step( step_id, step, step_templates, context ) context.step_results[step_id] result if result.status StepStatus.SUCCESS: completed_steps.add(step_id) # 将步骤输出注入到上下文变量中 context.set_variable( fstep_{step_id}_output, result.output ) elif result.status StepStatus.FAILED: # 步骤失败跳过依赖此步骤的后续步骤 context.log(step_id, step_failed, result.output) remaining.remove(step) # 聚合所有成功步骤的输出 final_output self._aggregate_results(context) return final_output def _execute_step( self, step_id: str, step: dict, templates: dict[str, PromptTemplate], context: ExecutionContext, ) - StepResult: 执行单个步骤支持重试和工具调用循环 context.log(step_id, step_started, step[description]) template templates.get(step_id) if not template: return StepResult( step_idstep_id, output, statusStepStatus.FAILED, metadata{error: f未找到步骤模板{step_id}}, ) retries 0 while retries self.max_retries: try: # 渲染 Prompt system, user template.render(context) # 注入工具描述如果步骤需要工具 if step.get(tool_required): tool_desc self.tool_registry.get_tool_descriptions() user f可用工具\n{tool_desc}\n\n{user} # 调用大模型 model_output call_llm_for_step(system, user) # 解析工具调用如果模型请求调用工具 tool_calls self._parse_tool_calls(model_output) if tool_calls: for tc in tool_calls: tc.result self.tool_registry.execute( tc.tool_name, tc.arguments ) tc.status ( ToolCallStatus.EXECUTED if not isinstance(tc.result, str) or 失败 not in tc.result else ToolCallStatus.FAILED ) # 将工具结果注入上下文 context.set_variable( ftool_{tc.tool_name}_result, tc.result ) # 工具调用后再次调用模型生成最终输出 tool_results_str json.dumps( [{tool: tc.tool_name, result: tc.result} for tc in tool_calls], ensure_asciiFalse, ) context.set_variable(tool_results, tool_results_str) system2, user2 template.render(context) model_output call_llm_for_step(system2, user2) context.log(step_id, step_completed, model_output[:200]) return StepResult( step_idstep_id, outputmodel_output, tool_callstool_calls, statusStepStatus.SUCCESS, ) except Exception as e: retries 1 context.log( step_id, step_retry, f第 {retries} 次重试错误{str(e)} ) if retries self.max_retries: return StepResult( step_idstep_id, outputf步骤执行失败{str(e)}, statusStepStatus.FAILED, ) return StepResult( step_idstep_id, output, statusStepStatus.FAILED ) def _parse_tool_calls(self, model_output: str) - list[ToolCall]: 解析模型输出中的工具调用指令 # 生产环境使用结构化输出如 function calling # 此处为简化实现解析 JSON 格式的工具调用 tool_calls [] try: # 尝试从输出中提取 JSON 工具调用 if tool_call in model_output: parts model_output.split(tool_call)[1:] for part in parts: json_str part.split()[0].strip() call_data json.loads(json_str) tool_calls.append(ToolCall( tool_namecall_data[tool], argumentscall_data.get(arguments, {}), )) except (json.JSONDecodeError, KeyError, IndexError): pass return tool_calls def _aggregate_results(self, context: ExecutionContext) - dict: 聚合所有步骤的执行结果 successful { sid: r.output for sid, r in context.step_results.items() if r.status StepStatus.SUCCESS } failed { sid: r.output for sid, r in context.step_results.items() if r.status StepStatus.FAILED } return { results: successful, failed_steps: failed, total_steps: len(context.step_results), success_rate: ( len(successful) / len(context.step_results) if context.step_results else 0 ), execution_log: context.execution_log, } # ---- 辅助函数 ---- def call_llm_for_planning(system: str, user: str) - list[dict]: 调用大模型生成执行计划——生产环境对接实际 API # 简化实现 return [ {step_id: research, description: 调研相关信息, depends_on: [], tool_required: True}, {step_id: analyze, description: 分析调研结果, depends_on: [research], tool_required: False}, {step_id: generate, description: 生成最终输出, depends_on: [analyze], tool_required: False}, ] def call_llm_for_step(system: str, user: str) - str: 调用大模型执行单步推理——生产环境对接实际 API return f[步骤输出] System长度: {len(system)}, User长度: {len(user)}这段代码的设计核心是模板驱动 上下文传递 工具循环。PromptTemplate将每个步骤的 Prompt 参数化运行时从ExecutionContext中动态填充变量实现步骤间的信息传递。ToolRegistry提供统一的工具注册和执行入口工具调用失败时返回错误信息而非抛出异常让 Agent 有机会根据错误调整策略。AgentExecutor的_execute_step方法实现了模型推理 → 工具调用 → 结果注入 → 再次推理的核心循环并支持最大重试次数限制。四、Agent 工作流的工程代价与适用边界Agent 工作流在解决复杂任务的同时引入了显著的工程复杂度和运行时开销。执行延迟随步骤增加而累积。每个步骤都涉及一次大模型推理加上可能的工具调用和网络往返。一个 5 步骤的 Agent 工作流端到端延迟可能达到 15 ~ 30 秒。对于实时交互场景这个延迟不可接受。缓解方案对独立步骤做并行执行减少串行等待时间对高频工作流建立缓存命中时跳过规划直接执行。Prompt 级联错误的放大效应。步骤 1 的输出是步骤 2 的输入。如果步骤 1 的模型输出偏离预期格式错误、信息遗漏错误会级联放大到后续所有步骤。解决方案在每个步骤的输出解析环节加入格式校验和重试机制对关键步骤设置输出 schema 约束使用结构化输出如 JSON mode降低格式错误率。规划器的可靠性瓶颈。任务规划器本身是大模型驱动的它的输出质量直接影响整个工作流的成败。当用户目标模糊或涉及领域知识时规划器可能生成不合理的计划。工程上的折中对规划器的输出做 schema 校验和合理性检查不合理的计划要求重新生成同时支持人工干预允许用户修改自动生成的计划。状态管理的内存压力。全局上下文随着步骤执行不断膨胀。如果工作流包含 10 个步骤每个步骤的输出 500 tokens加上工具调用结果上下文可能累积到 5000 tokens。当工作流更长时上下文窗口可能溢出。解决方案对历史步骤的输出做摘要压缩只保留关键信息设置上下文窗口的上限超限时自动淘汰最早的步骤结果。适用边界。Agent 工作流适用于多步骤、需要工具调用、有条件分支的复杂任务数据分析管线、内容生成流水线、自动化运维流程。不适用于简单的单轮问答、对延迟极度敏感的实时场景、或步骤间无依赖关系的并行任务后者应直接使用并行调用而非串行 Agent。五、总结Prompt Engineering 驱动的 Agent 工作流本质上是将大模型的单次推理能力扩展为多步协作能力。任务规划器负责拆解Prompt 编排层负责渲染工具执行层负责外部交互状态管理层负责信息传递——四层协同才能让 Agent 从回答问题进化为完成任务。落地路线建议第一步定义工作流的步骤模板体系每个步骤的 Prompt 模板明确输入变量和输出格式第二步构建工具注册表统一管理外部工具的注册、调用和异常处理第三步实现 Agent 执行引擎支持步骤的依赖排序、重试机制和工具调用循环第四步搭建全局上下文管理确保步骤间的信息可靠传递并支持上下文压缩和断点续跑第五步建立执行监控和日志系统追踪每个步骤的延迟、成功率和工具调用情况持续优化 Prompt 模板和工作流编排。从一条 Prompt 到一个智能体跨越的不是技术的鸿沟而是工程体系的构建。每一层抽象都有代价每一次编排都有风险但正是这些严谨的工程实践让 Agent 从概念走向了生产。