LLM 工作流自动化:从 Prompt 拼接到 DAG 编排的工程跃迁
LLM 工作流自动化从 Prompt 拼接到 DAG 编排的工程跃迁一、Prompt 拼接的天花板当 LLM 调用链变成意大利面条LLM 集成的初级形态是Prompt 拼接——将用户输入塞进模板调用 API返回结果。这在单轮问答场景下足够用。但当业务需要多步骤推理时问题迅速暴露第一步的输出是第二步的输入第二步的分支取决于第一步的判断第三步需要并行调用两个不同的 LLM。代码中充斥着嵌套的await、散落的try-catch、硬编码的 Prompt 模板字符串。一个 5 步的审批工作流代码量超过 300 行修改任意一步都需要通读全部逻辑。更致命的是这种面条式代码无法可视化、无法回溯中间状态、无法在失败后从断点恢复。核心痛点LLM 调用链缺乏结构化编排能力导致复杂工作流的代码可维护性急剧恶化。二、DAG 编排模型将工作流从代码中解放出来解决面条式 LLM 调用链的方案是有向无环图DAG编排。将每个 LLM 调用抽象为 DAG 中的一个节点节点间的数据依赖关系用有向边表示。执行引擎按拓扑排序调度节点自动处理并行和依赖。graph TB START([用户输入]) -- A[意图分类节点] A --|分类查询| B[知识库检索节点] A --|分类操作| C[参数提取节点] B -- D[上下文组装节点] C -- E[工具调用节点] D -- F[回答生成节点] E -- F F -- G{质量校验节点} G --|通过| END([输出结果]) G --|不通过| A style A fill:#e1f5fe style F fill:#fff3e0 style G fill:#ffebee上图是一个典型的 LLM 工作流 DAG。关键设计点条件分支意图分类节点的输出决定后续走查询路径还是操作路径。DAG 引擎根据节点输出的路由键选择下游节点。并行扇出-扇入知识库检索和参数提取可以并行执行回答生成节点等待两者都完成后才触发。DAG 引擎自动识别无依赖关系的节点并行调度。循环重试质量校验节点不通过时将结果回传意图分类节点重新处理。DAG 中允许回边但必须设置最大重试次数防止死循环。节点抽象每个节点只关心输入 Schema 和输出 Schema不关心上下游是谁。这使得节点可以独立测试、独立替换 LLM 提供商。三、生产级代码实现轻量 DAG 编排引擎以下实现不依赖 LangGraph 或 Prefect用约 150 行 TypeScript 构建一个可用的 DAG 引擎// workflow/dag-engine.ts — 轻量 DAG 编排引擎 interface WorkflowNode { id: string; // 节点执行函数接收上游输出返回本节点结果 execute: (input: Recordstring, unknown) PromiseRecordstring, unknown; // 路由函数可选用于条件分支返回下游节点 ID 列表 route?: (output: Recordstring, unknown) string[]; } interface WorkflowEdge { from: string; to: string; // 条件谓词可选仅当谓词返回 true 时边才激活 condition?: (output: Recordstring, unknown) boolean; } interface WorkflowDefinition { nodes: WorkflowNode[]; edges: WorkflowEdge[]; entryNode: string; maxIterations: number; // 防止循环导致无限执行 } class DAGEngine { private nodeMap: Mapstring, WorkflowNode new Map(); private adjacency: Mapstring, WorkflowEdge[] new Map(); private definition: WorkflowDefinition; constructor(definition: WorkflowDefinition) { this.definition definition; // 构建节点索引和邻接表加速运行时查找 definition.nodes.forEach((n) this.nodeMap.set(n.id, n)); definition.edges.forEach((e) { const edges this.adjacency.get(e.from) ?? []; edges.push(e); this.adjacency.set(e.from, edges); }); } async run( initialInput: Recordstring, unknown ): PromiseRecordstring, unknown { const context: Recordstring, Recordstring, unknown {}; let currentNodeId: string | null this.definition.entryNode; let iterations 0; while (currentNodeId iterations this.definition.maxIterations) { const node this.nodeMap.get(currentNodeId); if (!node) { throw new Error(节点 ${currentNodeId} 未注册); } // 聚合上游节点的输出作为当前节点的输入 const upstreamOutputs this.getUpstreamOutputs(currentNodeId, context); const mergedInput { ...initialInput, ...upstreamOutputs }; // 执行当前节点捕获异常并记录到上下文 let output: Recordstring, unknown; try { output await node.execute(mergedInput); } catch (err) { output { _error: true, _message: (err as Error).message, _node: currentNodeId }; } context[currentNodeId] output; iterations; // 确定下一个节点优先使用路由函数否则按边条件过滤 const nextNodes this.resolveNextNodes(currentNodeId, output); if (nextNodes.length 0) { // 无下游节点工作流结束 break; } // 简化处理线性 DAG 取第一个下游节点 // 完整实现应支持并行扇出此处从简 currentNodeId nextNodes[0]; } if (iterations this.definition.maxIterations) { throw new Error(工作流超过最大迭代次数可能存在循环); } // 返回最后一个节点的输出作为工作流结果 const lastNodeId Object.keys(context).pop()!; return context[lastNodeId]; } // 获取上游节点的输出合并为当前输入 private getUpstreamOutputs( nodeId: string, context: Recordstring, Recordstring, unknown ): Recordstring, unknown { const merged: Recordstring, unknown {}; this.definition.edges .filter((e) e.to nodeId) .forEach((e) { if (context[e.from]) { Object.assign(merged, context[e.from]); } }); return merged; } // 解析下游节点路由函数 边条件 默认 private resolveNextNodes( nodeId: string, output: Recordstring, unknown ): string[] { const node this.nodeMap.get(nodeId)!; // 优先使用节点的路由函数 if (node.route) { return node.route(output); } // 按边条件过滤 const edges this.adjacency.get(nodeId) ?? []; return edges .filter((e) !e.condition || e.condition(output)) .map((e) e.to); } } // 使用示例构建一个意图分类 分支处理的工作流 const workflow new DAGEngine({ entryNode: classify, maxIterations: 10, nodes: [ { id: classify, execute: async (input) { // 调用 LLM 进行意图分类 const intent await classifyIntent(input.userMessage as string); return { intent, userMessage: input.userMessage }; }, // 路由函数根据分类结果决定下游节点 route: (output) { return output.intent query ? [retrieve] : [extractParams]; }, }, { id: retrieve, execute: async (input) { const docs await retrieveFromKnowledgeBase(input.userMessage as string); return { ...input, documents: docs }; }, }, { id: extractParams, execute: async (input) { const params await extractParameters(input.userMessage as string); return { ...input, params }; }, }, { id: generate, execute: async (input) { const answer await generateAnswer(input); return { answer }; }, }, ], edges: [ { from: retrieve, to: generate }, { from: extractParams, to: generate }, ], });设计要点maxIterations防止循环导致的无限执行。节点执行异常不抛出而是将错误信息写入上下文让下游节点或路由函数自行决策。route函数让条件分支逻辑内聚在节点内部而非散落在全局条件判断中。四、DAG 编排的暗面状态管理与调试的深层挑战DAG 编排解决了代码结构问题但引入了新的复杂性。状态持久化的缺失。上述轻量引擎将所有中间状态保存在内存中。一旦进程重启工作流状态全部丢失。对于耗时较长的工作流如人工审批节点等待数小时必须将状态持久化到数据库。这需要引入检查点Checkpoint机制每个节点执行完毕后将上下文快照写入持久存储。恢复时从最近的检查点继续执行。实现检查点并不复杂但需要处理 Schema 版本兼容——工作流定义更新后旧检查点的数据结构可能不兼容。LLM 调用的不确定性。DAG 的路由依赖 LLM 输出但 LLM 输出具有非确定性。同一个输入意图分类可能有时返回query有时返回operation。这导致工作流执行路径不可复现。解决方案是对路由节点的 LLM 调用设置temperature: 0并在 Prompt 中严格约束输出格式。但即使如此仍无法 100% 保证路由稳定性。并行节点的错误传播。当两个并行节点中一个失败时扇入节点应该如何处理等待全部完成忽略失败节点还是整体回滚不同业务场景需要不同策略DAG 引擎必须提供可配置的错误传播语义。适用边界DAG 编排适合步骤数 ≥ 3、存在条件分支或并行执行、且需要可视化调试的 LLM 工作流。对于简单的线性调用链步骤 ≤ 2直接写await更直观。五、总结LLM 工作流自动化的核心跃迁是从命令式的 Prompt 拼接走向声明式的 DAG 编排。将每个 LLM 调用抽象为 DAG 节点用边定义数据依赖让执行引擎自动处理调度、并行和路由。落地路线建议第一步用线性await链快速验证业务逻辑的可行性第二步当步骤数超过 3 且出现条件分支时引入 DAG 引擎重构第三步为长耗时工作流添加检查点持久化确保故障恢复能力。编排不是目的可维护性才是。