第4章人机协作——interrupt 与审批网关本章目标读完本章你会能解释 Human-in-the-Loop 为什么是生产级 Agent 的必需品而非可选项能用interrupt()在图执行的任意节点暂停等待人类输入能用Command(resume...)传入人类决策并恢复执行能设计一个完整的AI 生成 → 人类审核 → 批准/驳回 → 发布工作流知识讲解从一个生活例子开始你是一家新媒体编辑部的编辑。你的工作流是这样的选题策划 → 撰稿 → 主编审核 → ┬→ 通过 → 排版发布 └→ 驳回 → 修改 → 主编再审关键在于主编审核这个环节。没有它文章直接发出去了——标题党、事实错误、观点跑偏全是你的锅。有它AI 可以尽情发挥因为最终把关的是人。LangGraph 里这个主编审核环节就是interrupt()。它在图的执行路径上设了一道闸门——Agent 跑到这里就停下把结果展示给你等你的指令。你可以批准继续往下走进入发布环节驳回带着修改意见回到生成环节Agent 重新写甚至修改 State 里的任意字段再继续——不只是过/不过二选一编辑部的角色LangGraph 的对应撰稿编辑Agent 的生成节点主编你人类操作者审稿环节interrupt()通过发吧 / 重写导语Command(resume...)工作原理interrupt() 做了什么interrupt()是 LangGraph 内置的函数。当一个节点调用它时图立即暂停执行——后续节点不会被调用Checkpointer 保存当前 State——包括 interrupt 发生时的完整上下文控制权交还给调用方——invoke()或stream()会返回一个特殊结果包含 interrupt 信息等待Command(resume...)恢复——调用方准备好后传入人类决策图从暂停处继续agent 节点执行 → 调用 interrupt(请审核) → ⏸️ 暂停 ↓ 人类审核... → Command(resume{approved: True}) → ▶️ 继续和 checkpointer 的关系interrupt()能工作的前提是图有 checkpointer。没有 checkpointer中断时的 State 无处存放恢复也无从谈起。这是第 3 章是本章前置依赖的原因。实际上interrupt()的实现就是在当前 super-step 的末尾把 State 写入 checkpoint然后抛出GraphInterrupt异常——这个异常被 LangGraph 运行时捕获把控制权还给调用方。恢复时从 checkpoint 读取 State从interrupt()的下一行继续执行。不只通过/不通过——Command 的三种恢复方式# 方式 1只恢复继续最简单 Command(resume批准发布) # 方式 2恢复同时更新 State Command(resume驳回, update{feedback: 标题不够吸引人重写}) # 方式 3恢复同时跳转到指定节点动态改道 Command(resume驳回, update{feedback: 重写}, gotogenerate_content)方式 3 特别有用——驳回时可以直接跳回生成节点跳过发布节点的前置检查。思考一下如果你在interrupt()暂停后不小心关掉了终端下次重启还能继续吗提示想想第 3 章 SqliteSaver 的持久化能力。代码实战基础版理解 interrupt 的暂停与恢复先从一个最简的二节点图开始纯粹理解interrupt()的机制。新建文件chapter04_interrupt_basics.py 第 4 章 基础演示 Ainterrupt 的暂停与恢复机制 理解图如何在中途等待人类输入 from typing import TypedDict from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from langgraph.types import interrupt, Command # interrupt 和 Command # # State # class ReviewState(TypedDict): draft: str # AI 生成的草稿 feedback: str # 人类的审核意见 final: str # 最终版本 # # 节点——interrupt 放在等待审核节点中 # def generate_draft(state: ReviewState) - dict: 模拟 AI 生成内容 print([generate_draft] AI 正在撰写草稿...) draft LangGraph 是 2026 年最值得学习的 AI Agent 框架——它用图的思维编排复杂工作流。 return {draft: draft} def human_review(state: ReviewState) - dict: 人类审核节点——调用 interrupt 暂停 draft state[draft] print(f\n{*50}) print(f 待审核内容\n{draft}) print(f{*50}) # interrupt()——图在这里暂停 # 参数是你想展示给人类的信息 human_decision interrupt(请审核以上内容。输入 approve 批准或输入修改建议。) # ⚠ 下面这行代码在人类调用 Command(resume...) 之后才会执行 print(f[human_review] 收到审核结果: {human_decision}) # 根据人类输入做不同处理 if human_decision approve: return {feedback: approved} else: return {feedback: human_decision} def finalize(state: ReviewState) - dict: 根据审核结果决定最终版本 if state[feedback] approved: final f✅ 已发布{state[draft]} else: final f 需修改意见{state[feedback]}——原稿{state[draft]} print(f[finalize] {final}) return {final: final} # # 构建图 # def build_review_graph(): builder StateGraph(ReviewState) builder.add_node(generate_draft, generate_draft) builder.add_node(human_review, human_review) builder.add_node(finalize, finalize) builder.add_edge(START, generate_draft) builder.add_edge(generate_draft, human_review) builder.add_edge(human_review, finalize) builder.add_edge(finalize, END) # ⚠ interrupt 必须配合 checkpointer return builder.compile(checkpointerMemorySaver()) # # 运行——分两步先 invoke 触发中断再 invoke 传入人类决策 # if __name__ __main__: graph build_review_graph() config {configurable: {thread_id: review-demo-1}} # --- 第一步invoke——图跑到 human_review 节点中的 interrupt 就会暂停 --- print( 启动图...) result graph.invoke( {draft: , feedback: , final: }, config ) # 图暂停后会返回当前 State包含 interrupt 信息 print(f\n⏸️ 图已暂停) print(f draft: {result[draft][:60]}...) print(f feedback: {result[feedback]}) print(f final: {result[final]}) # --- 第二步人类做决定后用 Command 恢复 --- print(\n * 50) print( 人类审核批准发布) print( * 50) # Command(resume...) 传入人类的决定图从 interrupt 的下一行继续 result graph.invoke( Command(resumeapprove), # resume 的值就是 interrupt() 的返回值 config # 必须用同一个 config同一个 thread_id ) print(f\n✅ 最终结果: {result[final]})运行这段代码观察控制台输出 1. 第一次invoke打印了待审核内容后没有继续——图在interrupt()处暂停了 2. 第二次invoke传入Command(resumeapprove)后human_review节点从interrupt()的下一行继续执行最终走到finalize逐行解析interrupt(请审核以上内容...)这是本章最核心的 API。参数是一个字符串人类可读的描述返回值是Command(resume...)中传入的值。关键理解interrupt()被调用时它下面的代码不会立即执行——控制权交还给 Python 调用方。只有当你再次invoke并传入Command(resume...)后interrupt()才会返回那个值继续往下走。Command(resumeapprove)Command是恢复执行的核心原语。resume参数的值会作为interrupt()的返回值。没有它图永远停在中断点。两次invoke必须用相同的config相同的thread_id因为中断时的 State 存储在 checkpoint 中恢复时需要通过同一个thread_id找到它。如果换了thread_idLangGraph 会把它当成全新的会话——找不到中断点从START开始执行。扩展版LocalTrend 内容审核工作流现在把 interrupt 接入 LocalTrend 项目——Agent 搜索趋势 → 分析风格 → 生成内容 →暂停等人类审核→ 批准后生成发布稿。 第 4 章 扩展演示LocalTrend 内容审核工作流 AI 生成 → 人类审核 → 批准/驳回 → 发布 import os from typing import TypedDict, Annotated, Literal from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langgraph.checkpoint.sqlite import SqliteSaver from langgraph.types import interrupt, Command from langchain_core.messages import HumanMessage, AIMessage, ToolMessage from langchain_core.tools import tool from langchain_openai import ChatOpenAI import sqlite3 # # 配置 # llm ChatOpenAI( modeldeepseek-chat, api_keyos.getenv(DEEPSEEK_API_KEY, your-api-key-here), base_urlhttps://api.deepseek.com, temperature0.7, ) # # State——新增审核相关字段 # class LocalTrendState(TypedDict): messages: Annotated[list, add_messages] iteration_count: int # 审核相关字段 generated_content: str # AI 生成的待审核内容 platform: str # 目标平台公众号/小红书/微博... review_decision: str # 审核结果approved / rejected review_feedback: str # 审核意见 published: bool # 是否已发布 # # 工具 # tool def search_trending(query: str) - str: 搜索当前热门话题趋势 print(f [工具] 搜索: {query}) knowledge { AI: AI 爆款Agent 开发实战、RAG 从零到一、LLM 应用架构是近期高流量话题。, 职场: 职场爆款AI 时代的核心竞争力、副业月入过万复盘情绪共鸣强。, } for key, val in knowledge.items(): if key in query: return val return f关于{query}的趋势分析。 tool def analyze_style(content: str) - str: 分析内容的写作风格和爆款特征 print(f [工具] 分析风格) return 风格标题含数字情感词开头用痛点切入正文分点论述案例结尾设互动问题。 tools [search_trending, analyze_style] llm_with_tools llm.bind_tools(tools) # # 节点 # def agent_node(state: LocalTrendState) - dict: ReAct 思考节点 response llm_with_tools.invoke(state[messages]) return { messages: [response], iteration_count: state.get(iteration_count, 0) 1, } def tool_node(state: LocalTrendState) - dict: 工具执行节点 last_msg state[messages][-1] tool_map {t.name: t for t in tools} results [] for tc in last_msg.tool_calls: result tool_map[tc[name]].invoke(tc[args]) results.append(ToolMessage(contentresult, tool_call_idtc[id])) return {messages: results} def should_continue(state: LocalTrendState) - Literal[tools, generate, __end__]: 增强路由ReAct 结束后进入内容生成 last_msg state[messages][-1] if state.get(iteration_count, 0) 8: return generate if hasattr(last_msg, tool_calls) and last_msg.tool_calls: return tools # 不再直接 END而是进入内容生成阶段 return generate # # 内容生成 审核节点 # def generate_content(state: LocalTrendState) - dict: 基于分析结果生成内容 print(\n[generate_content] 正在基于趋势分析生成内容...) # 从消息历史中提取关键信息 context for msg in state[messages]: if hasattr(msg, content) and msg.content: if isinstance(msg, ToolMessage): context str(msg.content) \n platform state.get(platform, 微信公众号) prompt HumanMessage(content( f基于以下趋势分析为{platform}写一篇爆款短文200字以内\n{context}\n\n f要求标题有吸引力开头有痛点结尾有互动引导。 )) response llm.invoke([prompt]) print(f 生成完成内容长度: {len(response.content)} 字符) return {generated_content: response.content} def review_gate(state: LocalTrendState) - dict: 审核网关——Human-in-the-Loop 的核心 content state[generated_content] platform state.get(platform, 微信公众号) print(f\n{*60}) print(f 待审核内容{platform}:) print(f{*60}) print(content) print(f{*60}) # 中断等待人类决策 decision interrupt( f请审核以上 {platform} 内容。\n f回复 approve 批准发布或输入修改意见驳回。 ) print(f[review_gate] 审核结果: {decision}) if decision.lower() approve or decision.lower() 批准: return {review_decision: approved, review_feedback: } else: return {review_decision: rejected, review_feedback: decision} def handle_approved(state: LocalTrendState) - dict: 批准后的发布处理 print(\n[handle_approved] ✅ 审核通过准备发布...) final_content f【已发布】\n平台{state[platform]}\n\n{state[generated_content]} return {published: True, generated_content: final_content} def handle_rejected(state: LocalTrendState) - dict: 驳回后的处理——把修改意见返回给 Agent 重新生成 print(f\n[handle_rejected] ❌ 审核驳回) print(f 修改意见: {state[review_feedback]}) # 构造带修改意见的消息让 Agent 重新生成 feedback_msg HumanMessage( contentf上一篇内容被驳回。修改意见{state[review_feedback]}。 f请根据意见重新生成一篇内容200字以内。 ) return { messages: [feedback_msg], published: False, } def review_router(state: LocalTrendState) - Literal[approved, rejected]: 根据审核结果路由 if state[review_decision] approved: return approved return rejected # # 构建完整的 LocalTrend 审核工作流 # def build_localtrend_graph(db_path: str localtrend_review.db): builder StateGraph(LocalTrendState) # 原有节点 builder.add_node(agent, agent_node) builder.add_node(tools, tool_node) # 新节点 builder.add_node(generate_content, generate_content) builder.add_node(review_gate, review_gate) builder.add_node(handle_approved, handle_approved) builder.add_node(handle_rejected, handle_rejected) # 图的拓扑结构 builder.add_edge(START, agent) # ReAct 循环 builder.add_conditional_edges( agent, should_continue, {tools: tools, generate: generate_content} ) builder.add_edge(tools, agent) # 内容生成 → 审核网关 builder.add_edge(generate_content, review_gate) # 审核分支 builder.add_conditional_edges( review_gate, review_router, {approved: handle_approved, rejected: handle_rejected} ) builder.add_edge(handle_approved, END) # 驳回后回到 agent_node——利用已有的 ReAct 能力重新生成 builder.add_edge(handle_rejected, agent) conn sqlite3.connect(db_path, check_same_threadFalse) return builder.compile(checkpointerSqliteSaver(conn)) # # 运行演练 # if __name__ __main__: graph build_localtrend_graph() config {configurable: {thread_id: localtrend-review-1}} # --- 第一步启动探索 生成 --- print( * 60) print( LocalTrend 启动搜索 AI 趋势 → 分析 → 生成公众号内容) print( * 60) result graph.invoke( { messages: [ HumanMessage( content搜索 AI 领域的爆款趋势然后为微信公众号写一篇爆款文章。 ) ], iteration_count: 0, generated_content: , platform: 微信公众号, review_decision: , review_feedback: , published: False, }, config ) # 图在 review_gate 中断了 print(f\n⏸️ 图已在审核网关暂停) print(f 等待人类审核...) # --- 第二步模拟终端交互--- print(\n * 60) print( 进入人工审核环节) print( * 60) print(在真实应用中这里会展示给用户在 UI 上审核。) print(本演示中我们模拟两种场景\n) # 场景 A批准 print(--- 场景 A批准发布 ---) result graph.invoke(Command(resumeapprove), config) print(f发布状态: {已发布 ✅ if result[published] else 未发布}) print(f发布内容预览: {result[generated_content][:120]}...) # 场景 B用新 thread 模拟驳回流程 print(\n\n--- 场景 B驳回重写 ---) config_b {configurable: {thread_id: localtrend-review-2}} # 第一步启动 graph.invoke( { messages: [HumanMessage(content搜索 AI 爆款趋势写一篇小红书风格的内容。)], iteration_count: 0, generated_content: , platform: 小红书, review_decision: , review_feedback: , published: False, }, config_b ) print(⏸️ 暂停等待审核...) # 第二步驳回模拟主编不满意 print(\n 主编驳回标题不够吸引人小红书风格需要更多 emoji 和短句) result_b graph.invoke( Command(resume标题不够吸引人小红书风格需要更多 emoji 和短句), config_b ) # 驳回后回到 agentagent 会调用 LLM 重新搜索和生成 # 再次跑到 review_gate 中断 print(f\n⏸️ 再次暂停等待二次审核...) print(f 新内容: {result_b[generated_content][:120]}...) # 第三步批准修改后的版本 result_b graph.invoke(Command(resumeapprove), config_b) print(f\n✅ 最终发布状态: {已发布 if result_b[published] else 未发布})运行后观察注意场景 B 的执行路径 1. Agent 搜索趋势 → 分析 → 生成内容 →review_gate中断2. 你传入驳回意见 →handle_rejected将意见包装为HumanMessage→ 回到agent_node3. Agent 看到驳回意见重新调用 LLM → 这次不会再自动调用工具因为消息历史里已经有分析结果直接在上下文中重写 4. 新内容再次到达review_gate→再次中断等你的最终决定这就是一个完整的人机协作循环AI 干活 → 人类把关 → 不行就重做 → 通过就发布。⚠️常见坑驳回后回到 agent_node 时iteration_count可能已经很高。如果你的should_continue中设置了 8的上限驳回导致的额外轮次可能会触及这个上限。在实际项目中应该在驳回时重置迭代计数器——练习中会让你做这个改进。本章小结Human-in-the-Loop 是生产级 Agent 的必需品——AI 生成的内容必须有人把关特别是涉及对外发布、资金操作等高风险场景。interrupt()在图执行路径上设暂停点——节点函数调用它时图保存 State 并交还控制权。Command(resume...)恢复暂停的图——resume的值成为interrupt()的返回值节点从下一行继续执行。Checkpointer 是 interrupt 的前置依赖——没有 checkpointer中断的 State 无处存储。同一个thread_id跨越暂停和恢复——换了 thread_id 等于找不到案发现场。驳回不是失败是工作流的一部分——通过条件路由驳回后可以回到生成节点重做形成 AI-人类-AI 协作循环。关键术语术语释义Human-in-the-Loop (HITL)人机协作模式AI 执行关键步骤前暂停等待人类决策interrupt()LangGraph 内置函数在节点中调用后暂停图执行等待 Command 恢复Command恢复图执行的原语resume传值给 interruptupdate更新 Stategoto可动态跳转审核网关interrupt 的典型应用——在生成和发布之间设置审核节点GraphInterruptinterrupt 底层抛出的异常被 LangGraph 运行时静默捕获并转为暂停resumeCommand 的核心参数其值作为 interrupt() 的返回值传递给暂停的节点