Pi Agent 对接实现:消息解析、重试与取消
JSON 事件。听起来也简单——拉起进程、读输出、解析就好——可真上手了你会发现对接一个 agent CLI跟对接一个普通 CLI完全是两码事。普通 CLI 你读完 stdout 拿个退出码事情也就过去了。可 agent CLI 偏偏有三个让人头疼的特点第一它的事件流是私有协议。turn_start、session、message_update、message_end、turn_end、agent_end——这些是 pi 自家定义的不是什么行业标准。每个想消费它的上层都得各自处理一遍等于把 pi 的内部细节泄漏得到处都是。就像隔着距离看一个人你以为看清了其实只是看到了她想给你看的那一面罢了。第二它的失败语义特别暧昧。agent 跑着跑着可能网络抖了一下、模型限流了、进程崩了这时候到底要不要重试在哪儿重试重试会不会把已经半截写出去的会话状态搞乱这是架构决策不是随手写个for循环就能解决的。第三它长且可中断。一个 turn 可能跑几十秒甚至几分钟中间用户随时可能想取消。取消的时候进程不能变孤儿工具调用不能留半成品已经吐出的内容又不能丢。这里的水比想象中深多了。为了解决这些痛点我们花了点时间把对接路径理顺了。后面会具体说这里先剧透一句真正的难点不在拉起进程而在分清职责。关于 HagiCode本文分享的方案来自 HagiCode 项目——一个 AI 代码助手支持多模型、多 agent CLI 后端。GitHub 仓库HagiCode-org/site欢迎来点个 Star。下面讲的所有代码、所有踩过的坑都是这个项目里真实跑着的。其实写出来也不过是给自己留个念想而已。整体分层HagiCode 把 AI 能力对接拆成两层底层是Hagicode.Libs提供可复用的 provider 原语ICliProviderTOptions专门负责拉起一个 CLI agent、把它的输出归一化成共享消息流。上层是hagicode-core提供项目级的 thin adapterIAIProvider负责把业务请求翻译成 provider 的参数、消费共享消息流、对外暴露统一的流式 chunk。Pi 的接入就走这条路。底层PiProvider拉 pi 进程、读 JSON 事件流、归一化成共享消息上层PiCliProvider把AIRequest翻成PiOptions、消费CliMessage、对外吐AIStreamingChunk。这三件事——消息解析、重试、取消——分别落在三个不同的地方PiJsonEventMapper、一个看似奇怪的归档提案、还有CliProcessManager。下面一个个说。消息解析Pi 私有事件怎么变成共享消息pi 在--mode json --print下按行输出 JSON 事件。这套事件是 pi 私有的绝不能直接漏给上层否则每个消费方都要耦合 pi 的内部细节pi 一升级事件结构你全项目跟着改。其实这种泄漏跟把心事写在脸上没什么两样——别人看着累自己也不见得舒服。我们用PiJsonEventMapper做了一层翻译把 pi 的事件归一化成共享的CliMessage。CliMessage定义在HagiCode.Libs.Core/Transport/CliMessage.cs结构非常简单就是一个(Type, Content)的 record。映射关系大致如下pi 事件共享消息用途sessionsession.started/session.resumed会话生命周期message_updatetext 类assistant流式正文增量message_updatethinking 类assistant.thought思考链message_updatetool 类tool.call/tool.update工具调用发起message_end/turn_endtoolResulttool.completed/tool.failed工具结果turn_end/agent_endterminal.completed本轮结束非零退出 / 解析失败terminal.failed终态失败这张表只是个速查里面有两个关键技巧是踩坑之后才摸索出来的值得展开讲。技巧一cumulative snapshot 转 delta这是最容易翻车的点。pi 的message_update事件发的不是增量而是累积全文——每来一个 token它把到目前为止的完整文本重新发一遍。如果你直接把收到内容转发给前端用户会看到内容反复重复第一条是你第二条是你好第三条是你好第四条是你好世……前端会以为这是四次独立的输出。其实重复这种东西看一次是新鲜看十次就是厌倦罢了。解决办法是前缀比对算出真正的增量// 关键pi 发的是累积快照不是增量// 用前缀比对把增量抠出来否则前端会看到重复内容if (text.StartsWith(_lastAssistantTextSnapshot, StringComparison.Ordinal)){var delta text[_lastAssistantTextSnapshot.Length..];_lastAssistantTextSnapshot text;return delta.Length 0 ? null : delta;}这里还有个隐藏的坑跨 turn 的前缀重放。pi 在工具调用结束、assistant 重新接着说的时候会再次把之前那段文本从头发一遍。如果你只记一个全局快照就会把重放的内容当成增量导致工具调用后又出现一段重复。PiProviderTests里专门有个用例ExecuteAsync_deduplicates_replayed_assistant_prefix_after_tool_turns覆盖这个场景。换句话说工具调用前后的快照要对齐处理不能各自为政。技巧二thinking 要缓冲到 turn 结束再发思考链thinking不能每收到一个 token 就往外吐。pi 在工具调用中途会塞进来一堆思考碎片如果实时转发流的顺序会乱成一锅粥——一会儿是 assistant 正文一会儿是思考碎片一会儿又是 tool.call。这意义吗其实也没什么意义只是徒增混乱而已。我们的做法是收到 thinking 事件时先放进BufferThinkingSnapshot暂存等message_end或turn_end且stopReason ! toolUse时再统一DrainBufferedThinkingMessages。这样工具调用中途的思考碎片就不会污染主流turn 结束时一次性给出完整的思考过程。容错坏的行不能让流崩掉agent CLI 不是教科书里的理想系统它偶尔会吐出一行非 JSON或者一个 JSON 没有type字段。如果你在这里抛异常整个流就死了用户什么也看不到。毕竟现实世界总有些不完美谁能保证每行都规规矩矩呢我们的策略是任何一行解析失败都不中断流而是收集到_invalidOutputLines。等进程结束后在Complete()里把这些坏行拼进terminal.failed的诊断文本。这样用户看到错误时能直接看到 pi 到底吐了什么乱七八糟的东西而不是一个干巴巴的parse error。重试provider 层不做谁做这是整个对接里最容易踩的坑。直觉上对接一个 CLI 应该带重试可 HagiCode 在一个归档提案里主动移除了provider 层的全部自动重试。提案叫remove-provider-auto-retry-support。为什么不自动重试提案背景写得非常直白。重试逻辑原本散落在两个地方Hagicode.Libs里有一份OpenCode 风格的 fresh-runtime replayhagicode-core里又有一份ProviderErrorAutoRetryCoordinator。两边都各搞各的导致到底重不重试成了一个隐藏在 provider 内部的隐式行为会偷偷改变失败时机、会话续跑方式和聊天状态流。你想想就头大用户发一条消息provider 内部自己重试了三次前两次都失败、第三次成功了。上层完全不知道中间发生了什么会话状态、token 计数、UI 进度全部对不上。这种隐式行为怎么说呢是架构里的慢性毒药罢了。于是边界被收敛成一句话provider 收敛回单次尝试语义调用方需将无重试状态视为正常单次执行结果。落到 PiProvider 上是什么样落到代码上就是三件事PiOptions里没有任何 retry 相关字段——没有maxAttempts、没有retryDelay、没有retryClassifier。ExecuteAsync一次 pi 进程跑完就结束失败直接给terminal.failed。之前为自动重试服务的那些分类器ClaudeCodeRetryableTerminalFailureClassifier、CodexRetryableTerminalFailureClassifier之类只要纯粹服务于自动重试的全部从活跃路径移除。但请注意重试能力并没有消失只是上移了。提案明确写着为后续由更高层统一接管重试留出稳定边界。配置项providerErrorAutoRetry的 DTO、归一化、序列化、前端设置页 round-trip 全部保留只是它不再驱动 provider 执行。毕竟有些东西不是真的不要了只是换了种方式留着而已。那要重试怎么办如果你要在 pi 之上加重试正确做法是在PiCliProvider的调用方做——比如你的会话编排层HagiCode 里是 Orleans 的SessionGrain前端可能是 chat 编排层。拿到terminal.failed后自己判断是否可重试自己决定延迟和次数再发一次ExecuteAsync。一个最小可用模式长这样// 重试逻辑放在调用方不要塞回 PiProvider// 否则会破坏 provider 刚建立起来的单次尝试边界async TaskAIResponse ExecuteWithRetryAsync(AIRequest req, int maxAttempts, CancellationToken ct){for (var attempt 1; ; attempt){var response await provider.ExecuteAsync(req, ct);// 成功或达到上限就返回if (response.FinishReason ! FinishReason.Unknown || attempt maxAttempts)return response;// 只对可重试的终态失败重试网络、5xx、进程崩溃// model rejected、auth failure 这类重试也无意义别重试await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), ct);}}判断可重试的分类逻辑现在不在 provider 里调用方自己定义。providerErrorAutoRetry配置maxAttempts、retryDelay、enabled仍可从前端设置页读到但真正驱动重试的是你的编排层不是PiProvider。这一点请反复念三遍。取消token 透传 三段式停机取消这事PiProvider 自己几乎不实现全委托给CliProcessManagerPiProvider 只负责两件事把CancellationToken传下去异常时做善后。全链路透传链路是这样的一路传到底调用方 CancellationToken→ PiCliProvider.StreamCoreAsync(cancellationToken)→ PiProvider.ExecuteProcessAsync([EnumeratorCancellation] cancellationToken)→ ReadLineAsync(cancellationToken) / WaitForExitAsync(cancellationToken)→ 异常时 _processManager.StopAsync(handle, CancellationToken.None)注意最后一行清理的时候用的是CancellationToken.None不是用户传进来的那个 token。这是个细节但极其重要。原因是用户的 token已经取消了。如果你拿这个已经取消的 token 去做清理清理任务会立刻被取消掉进程就变孤儿了——pi 还在后台跑没人收CPU 和内存白白占着。所以清理必须用CancellationToken.None确保清理动作一定能执行完。其实跟人一样有些事得在它彻底停下来之后再好好收尾否则就是留一地鸡毛罢了。三段式递进停机CliProcessManager.StopProcessAsync是个三段式递进的停机流程时间常量定义在文件顶部// 优雅停止的耐心先给进程自己收尾的时间private static readonly TimeSpan GracefulStopTimeout TimeSpan.FromSeconds(2);// 强制 kill 后等待进程真正退出的耐心private static readonly TimeSpan StopWaitTimeout TimeSpan.FromSeconds(5);三段是这样递进的中断信号。TryInterruptAsync先往 stdin 写一个\u0003就是 CtrlC 字符Unix 下再额外kill -INT pid。这一步是为了让 pi 自己优雅收尾——它能感知到中断把正在写的东西收个尾。优雅等待。最多等 2 秒看进程是不是自己退了。强制 kill。还没退就直接Process.Kill(entireProcessTree: true)把整棵进程树一起杀再最多等 5 秒确认它真死了。为什么要entireProcessTree: true因为 pi 跑工具的时候会派生子进程——比如 provider 路由到的本地模型进程、跑的 bash 子进程。只杀父进程子进程会变孤儿继续跑。整棵树一起杀才干净。Windows 下没有 SIGINT 这回事只能靠 CtrlC 字符所以跨平台行为会有差异这个心里要有数。PiProvider 的异常善后PiProvider 的ExecuteProcessAsync在ReadLineAsync抛异常时会用ExceptionDispatchInfo.Capture把异常暂存跳出循环后调StopAsync清理进程再pendingException.Throw()把原始异常重新抛给上层。为什么要暂存再抛因为如果直接抛进程还没来得及回收就成了孤儿如果在StopAsync之前抛清理逻辑根本走不到。暂存一下先保证进程一定被回收再把原始的OperationCanceledException语义完整保留给调用方——调用方拿到这个异常就能判断哦是用户主动取消而不是出错了。启动失败的统一契约还有个细节值得单独提一下。进程启动失败——比如 pi 可执行文件不存在、权限不对——PiProvider不抛异常而是合成一条terminal.failed消息然后yield break。为什么要这样因为如果抛异常上层消费方就得处理两种完全不同的语义一种是流式消费过程中正常的消息一种是还没开始流就抛的异常。这会让消费方的await foreach变得特别难写。统一成永远先给你消息、再结束流之后消费方的逻辑就一致了拿到terminal.failed就算失败拿到terminal.completed就算成功不需要 try/catch 分叉处理。这是个小但重要的设计决策让契约稳定下来。实践消费流的正确姿势参考 HagiCode 里PiScenarioMessageReaderlibs 的 console 测试场景和PiCliProvider.StreamCoreAsynccore 的 thin adapter消费方大概长这样await foreach (var message in provider.ExecuteAsync(options, prompt, cancellationToken)){// 1. 失败要优先短路别再处理后续消息if (NormalizedAcpCliAdapter.TryGetFailureMessage(message.Content, out var failure)){yield return new AIStreamingChunk { Type StreamingChunkType.Error, ErrorMessage failure };yield break; // terminal.failed 之后流就结束了}// 2. assistant 文本是 cumulative snapshot自己再做一次增量计算if (message.Type assistant TryGetText(message.Content, out var text)){var delta ReconcileSnapshot(text); // 前缀比对if (!string.IsNullOrEmpty(delta)) yield return Chunk(delta);}// 3. terminal.completed 是唯一可靠的结束信号if (message.Type terminal.completed) break;}常见坑速查把这一路踩过的坑整理成一张表方便后来人现象原因处理前端看到 assistant 文本重复没做 cumulative 转 delta用ReconcileAssistantTextSnapshot做前缀比对取消后进程还在跑清理用了已经取消的 token改用CancellationToken.None做清理重试不生效把重试写进了 PiProvider但 provider 是单次尝试语义上移到调用方编排层pi 报错信息丢失没读terminal.failed的诊断字段完整透传text/invalid_output_lines/stderr工具调用中途收到思考碎片直接转发了 thinking 事件缓冲到 turn 结束再DrainBufferedThinkingMessages怎么验证libs 层用StubCliProcessManagermock 进程单测覆盖参数构建、事件归一化、增量去重、失败透传这些纯逻辑。真实 CLI 路径用HAGICODE_REAL_CLI_TESTS环境变量 opt-in用真实模型跑 trip 场景。core 层的PiCliProviderTests验证 thin adapter 的AIStreamingChunk投影和 session binding。# 在 Hagicode.Libs 仓库跑 Pi 相关单测dotnet test --filter FullyQualifiedName~PiProviderTests# 跑真实 CLI 集成测试需要本地装好 piHAGICODE_REAL_CLI_TESTS1 dotnet test --filter FullyQualifiedName~PiProviderTests.RealCli总结把这三件事串起来对接 pi 的心智模型其实就一句话让每一层只做自己的事。消息解析交给PiJsonEventMapper私有事件归一化成共享CliMessagecumulative snapshot 转成 deltathinking 缓冲到 turn 结束。重试交给调用方provider 单次尝试谁想重试谁自己在上层做配置保留但不再驱动 provider。取消交给CliProcessManagerCancellationToken全链路透传清理用CancellationToken.None三段式递进停机中断信号 → 优雅等待 → 强制 kill 整棵进程树。这套边界划清楚之后对接一个新的 agent CLI 几乎成了流水线活——你只需要写一个新的XxxProvider和XxxJsonEventMapper重试、取消、消息契约、错误处理这些横切逻辑全部复用。这也是 HagiCode 能同时支持多个 agent CLI 后端claude code、codex、pi、gemini cli 等等而不至于乱套的根本原因。最后再说一遍那个最重要的边界不要在 provider 层加重试。把这一点想通对接 agent CLI 这件事也就过去大半了……总结回到“Pi Agent 对接实现消息解析、重试与取消”这个主题真正值得反复确认的不是零散技巧而是约束条件、实现边界和工程取舍是否已经看清。只要把文中的判断依据沉淀成稳定的检查项后续面对类似问题时就能更快做出可靠决策。