DeepAgents - Human in the loop
在开始之前先理清几个关键概念概念说明interrupt中断当 Agent 准备调用某个被监控的 tool 时HumanInTheLoopMiddleware调用 LangGraph 的interrupt()暂停图执行并抛出包含action_requests和review_configs的请求checkpoint检查点中断时图状态会被持久化。必须配置 checkpointer否则中断后无法恢复。生产环境建议用AsyncPostgresSaver测试用InMemorySaverversionv2LangGraph 1.0 的 v2 模式ainvoke()返回GraphOutput对象含.interrupts属性astream()的updates流中会出现__interrupt__事件Command(resume)用户做出决策后用Command(resume{decisions: [...]})从断点恢复执行Decision决策四种类型approve批准、reject拒绝并反馈、edit修改参数后执行、respond人类直接回答跳过 tool 执行执行生命周期用户提问 → Agent 调用 LLM 生成回复 → LLM 决定调用 tool如 execute_shell_command → after_model 钩子检查 tool 是否在 interrupt_on 中 → 是构建 HITLRequest → interrupt() → 暂停 ⌛ → 否继续执行 → 人类做出决策approve / reject / edit / respond → 恢复执行 → 执行/拒绝 tool → LLM 生成最终回复 → 返回流程逻辑以 Chainlit 聊天应用为交互载体消息处理流程如下用户在聊天页面发送消息如检查下系统负载Agent 调用 LLM 生成回复LLM 决定调用execute_shell_commandHumanInTheLoopMiddleware检测到该 tool 在interrupt_on列表中触发中断Chainlit 应用检测到中断向用户展示审批提示用户回复批准/拒绝应用用Command(resume)恢复执行Agent 根据决策执行或拒绝 tool最终返回结果给用户配置中断首先需要在创建 Agent 时配置HumanInTheLoopMiddlewarefrom deepagents import create_deep_agent from langchain.agents.middleware import HumanInTheLoopMiddleware agent create_deep_agent( modelllm, tools[execute_shell_command], checkpointercheckpointer, # 必须配置 system_prompt你是一位智能助手..., middleware[ HumanInTheLoopMiddleware( interrupt_on{ # 对 execute_shell_command 进行审批 execute_shell_command: { allowed_decisions: [approve, reject] } } ), ], )interrupt_on是一个字典key 为 tool 名称value 的可配置项True— 允许所有四种决策approve / edit / reject / respondFalse— 不拦截该 tool等同于不写{allowed_decisions: [...]}— 只允许指定决策类型还可以配置when谓词按参数条件判断是否拦截、description自定义中断提示文本invoke 模式中的实现v2 模式下的ainvoke()返回GraphOutput对象可通过.interrupts属性直接获取中断数据不需要去查 state。检测中断resp await agent.ainvoke( input{messages: [HumanMessage(contentquery)]}, configconfig, versionv2, ) if resp.interrupts: # 存在中断resp.interrupts 是 Interrupt 对象的元组 interrupt resp.interrupts[0] # interrupt.value 是 HITLRequest包含 action_requests 和 review_configs print(interrupt.value[action_requests])恢复中断用户做出决策后用Command(resume)恢复from langgraph.types import Command await agent.ainvoke( Command(resume{ decisions: [{type: approve}] # 或 {type: reject, message: ...} }), configconfig, # 必须用同一个 thread_id versionv2, )关键如何分辨新消息还是中断恢复在聊天应用中用户发来的每条消息都走同一个cl.on_message处理函数。用户说检查负载和回复批准都只是文本。解决方法是——调用前先检查是否有待处理的中断# 检查当前会话是否有待处理的中断 state await agent.aget_state(config) if state.next: # 有待处理中断 → 本次消息是审批回复构建 resume 命令 cmd Command(resume{decisions: [{type: approve}]}) await agent.ainvoke(cmd, configconfig, versionv2) else: # 无中断 → 正常对话 resp await agent.ainvoke( {messages: [HumanMessage(contentquery)]}, configconfig, versionv2 )state.next不为空表示图执行被暂停了有中断等待处理。stream 模式中的实现流式模式需要用stream_mode[messages, updates]官方推荐同时开启两种流messages流获取 LLM 的 token 级输出updates流检测中断事件__interrupt__async for chunk in agent.astream( inputinput_data, stream_mode[messages, updates], versionv2, configconfig, ): if chunk[type] messages: msg, _meta chunk[data] # msg 是 AIMessageChunk包含 content 和 tool_calls if isinstance(msg, AIMessageChunk) and msg.content: yield extract_text(msg) # 流式输出文本 elif chunk[type] updates: if __interrupt__ in chunk[data]: interrupt chunk[data][__interrupt__][0] yield format_question(interrupt) # 输出审批问题stream 模式的恢复与 invoke 类似——在调用astream()之前同样要先检查state.next来判断是正常对话还是中断恢复。完整示例下面是核心代码。checkpointer和llm的配置函数、日志模块等非核心代码省略。Agent 封装internal/agent/agent.py核心部分# --- 审批关键词匹配 --- _APPROVE_KEYWORDS frozenset( {yes, accept, approve, ok, 是, 允许, 同意, 批准} ) def _parse_decision(query: str) - str: return approve if query.strip().lower() in _APPROVE_KEYWORDS else reject def _build_resume_command(decision_type: str, actions_count: int) - Command: item {type: decision_type} if decision_type reject: item[message] user rejected this action return Command(resume{decisions: [item for _ in range(actions_count)]}) def _extract_text(message) - str: 从消息中提取纯文本兼容 str 和 list[dict] 两种 content 格式。 if not message or not hasattr(message, content): return content message.content if isinstance(content, str): return content if isinstance(content, list): return .join( b.get(text, ) for b in content if isinstance(b, dict) and b.get(type) text ) return def _format_interrupt_question(interrupt) - str: 将中断数据格式化为用户的审批问题。 action_requests interrupt.value.get(action_requests, []) review_configs interrupt.value.get(review_configs, []) allowed ( review_configs[0].get(allowed_decisions, [approve, reject]) if review_configs else [approve, reject] ) lines [] for req in action_requests: lines.append( Do you approve me to execute this action?\n\n f- name: {req[name]}\n f- args: {req[args]}\n ) lines.append(fInput your decision: {, .join(allowed)}\n) return \n.join(lines) class AIAgent: # ... __init__, _init_deep_agent, _init_tools 省略 ... async def _has_pending_interrupt(self, config: RunnableConfig) - bool: state await self._agent.aget_state(config) return bool(state.next) # --- invoke 模式 --- async def ainvoke(self, query: str, config: RunnableConfig) - str: if not self._agent: await self._init_deep_agent() # 优先处理中断恢复 if await self._has_pending_interrupt(config): state await self._agent.aget_state(config) actions_count len( state.interrupts[0].value[action_requests] ) decision _parse_decision(query) cmd _build_resume_command(decision, actions_count) await self._agent.ainvoke(cmd, configconfig, versionv2) # 恢复后取最新消息 state await self._agent.aget_state(config) if state.values and messages in state.values: return _extract_text(state.values[messages][-1]) return Oops, something went wrong. # 正常对话 resp await self._agent.ainvoke( input{messages: [HumanMessage(contentquery)]}, configconfig, versionv2, ) if resp.interrupts: return _format_interrupt_question(resp.interrupts[0]) return _extract_text(resp.value[messages][-1]) # --- stream 模式 --- async def astream(self, query: str, config: RunnableConfig): if not self._agent: await self._init_deep_agent() # 判断是中断恢复还是正常对话 state await self._agent.aget_state(config) if state.next: actions_count len( state.interrupts[0].value[action_requests]