[MAF Workflow Framework Unveiled-10] Distributed Tracing with OpenTelemetry
Observability has become a must-have capability for applications deployed to production, and OpenTelemetry is arguably among the most popular open-source tools for it today. OpenTelemetry-based tracing and performance monitoring are applied throughout MAF. The following two articles cover tracing and performance monitoring for ChatClient and Agent in detail:

- OpenTelemetryChatClient: Implementing Tracing and Performance Monitoring
- Agent-Based Tracing and Performance Monitoring

As far as tracing goes, a Workflow actually needs it even more. The reason is simple: a Workflow often has a complex network topology, and the concrete execution path is determined by message routing. Each invocation may take a different path depending on the input, the current external state, and the LLM's output. OpenTelemetry-based tracing lets us see exactly which path each invocation took, which helps us judge and optimize the Workflow's design and quickly locate the failing step when something goes wrong.

1. Tracing a Workflow's execution path

Before formally introducing the design and implementation of Workflow tracing on top of OpenTelemetry, let's look at a simple sample program to see how it behaves in practice. To collect trace data and visualize the complete call chain, we install Jaeger locally with the following command (port 16686 serves the Jaeger UI and port 4317 receives OTLP over gRPC):

```bash
docker run -d --name jaeger \
  -p 16686:16686 \
  -p 4317:4317 \
  jaegertracing/all-in-one:latest
```

The demo program below builds a simple Workflow with four nodes: Foo, Bar, Baz, and Qux. The output of Foo is the input of both Bar and Baz, and Qux can only run after Bar and Baz have both completed. When building the Workflow we call the WithOpenTelemetry method to enable OpenTelemetry tracing and pass in an ActivitySource that collects the trace data. Each node waits for a random delay to simulate the cost of real work.

```csharp
using Microsoft.Agents.AI.Workflows;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using System.Diagnostics;

var serviceName = "maf.workflow";
var serviceVersion = "1.0.0";
var random = new Random();

var foo = CreateExecutor("Foo");
var bar = CreateExecutor("Bar");
var baz = CreateExecutor("Baz");
var qux = CreateExecutor("Qux");

// Export spans from the "maf.workflow" source to the console and to Jaeger (OTLP over gRPC).
using (Sdk.CreateTracerProviderBuilder()
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName, serviceVersion: serviceVersion))
    .AddSource(serviceName)
    .AddConsoleExporter()
    .AddOtlpExporter(options =>
    {
        options.Endpoint = new Uri("http://localhost:4317");
        options.Protocol = OpenTelemetry.Exporter.OtlpExportProtocol.Grpc;
    })
    .Build())
{
    // Run the Workflow over and over so that traces keep arriving.
    while (true)
    {
        await using var run = await InProcessExecution.Default.RunStreamingAsync(BuildWorkflow(), "start");
        await run.RunToCompletionAsync();
    }
}

// Each executor waits 100-500 ms to simulate work, then returns its own ID.
ExecutorBinding CreateExecutor(string id) => new Func<string, ValueTask<string>>(async input =>
{
    await Task.Delay(random.Next(100, 500));
    return id;
}).BindAsExecutor(id);

Workflow BuildWorkflow()
{
    return new WorkflowBuilder(foo)
        .AddFanOutEdge(foo, [bar, baz])          // Foo -> Bar and Baz
        .AddFanInBarrierEdge([bar, baz], qux)    // Qux waits for both Bar and Baz
        .WithOutputFrom(qux)
        .WithOpenTelemetry(options => options.EnableSensitiveData = true, new ActivitySource(serviceName))
        .Build();
}
```

The Workflow made up of Foo, Bar, Baz, and Qux has a fan-out/fan-in structure: Foo feeds Bar and Baz, and both of them feed Qux. The trace rendered by Jaeger has a matching structure, so we can clearly see the execution path of each invocation. From the outside in, the trace records the following Spans:

- workflow.session: a Workflow Session that can cover multiple invocations; a Session is bound to one StreamingRun or Run object
- workflow_invoke: one invocation of the Workflow within the Session
- executor.process: one execution of an Executor node in the Workflow
- message_send: a node sending a message to the next node
- edge_group.process: the processing of the edges that start from a given node

Besides the trace for executing the Workflow, the construction of the Workflow is recorded in a separate trace.

![Alternative Text][1782049741740]

Although this trace contains only a single Span, that Span uses a Tag named workflow.definition to record the Workflow's definition, which can help us check whether the Workflow was built as expected. In our example the Workflow is described by the following JSON:

```json
{
  "executors": {
    "Foo": {
      "executorType": {
        "assemblyName": "Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3",
        "typeName": "Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"
      },
      "executorId": "Foo"
    },
    "Bar": {
      "executorType": {
        "assemblyName": "Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3",
        "typeName": "Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"
      },
      "executorId": "Bar"
    },
    "Baz": {
      "executorType": {
        "assemblyName": "Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3",
        "typeName": "Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"
      },
      "executorId": "Baz"
    },
    "Qux": {
      "executorType": {
        "assemblyName": "Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3",
        "typeName": "Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"
      },
      "executorId": "Qux"
    }
  },
  "edges": {
    "Foo": [
      { "$type": 1, "hasAssigner": false, "kind": 1, "connection": { "sourceIds": ["Foo"], "sinkIds": ["Bar", "Baz"] } }
    ],
    "Bar": [
      { "$type": 2, "kind": 2, "connection": { "sourceIds": ["Bar", "Baz"], "sinkIds": ["Qux"] } }
    ],
    "Baz": [
      { "$type": 2, "kind": 2, "connection": { "sourceIds": ["Bar", "Baz"], "sinkIds": ["Qux"] } }
    ]
  },
  "requestPorts": [],
  "startExecutorId": "Foo",
  "outputExecutorIds": { "Qux": [] }
}
```

2. Including Agent and LLM calls in the trace

A trace is only as valuable as it is complete. If a Workflow has one or more AIAgent-based nodes, the Agent and LLM calls need to be part of the trace as well. We can enable tracing for them in the way described in the two articles mentioned at the beginning. In the demo program below, the Workflow has a single AIAgent-based node. When building the AIAgent pipeline we insert an OpenTelemetryAgent to trace Agent invocations, and the inner ChatClient pipeline calls the UseOpenTelemetry method to trace LLM calls.

```csharp
using Azure;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using OpenAI;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using System.ComponentModel;
using System.Diagnostics;

dotenv.net.DotEnv.Load();
var serviceName = "maf.workflow";
var serviceVersion = "1.0.0";
var endpoint = Environment.GetEnvironmentVariable("OPENAI_URL")!;
var model = Environment.GetEnvironmentVariable("MODEL")!;
var apiKey = Environment.GetEnvironmentVariable("API_KEY")!;

var agent = new OpenAIClient(
        credential: new AzureKeyCredential(apiKey),
        options: new OpenAIClientOptions { Endpoint = new Uri(endpoint) })
    .GetChatClient(model: model)
    .AsIChatClient()
    .AsBuilder()
    .UseOpenTelemetry(sourceName: serviceName)    // trace LLM calls
    .Build()
    .AsAIAgent(name: "MyAgent", tools: [AIFunctionFactory.Create(GetWeather, nameof(GetWeather))])
    .AsBuilder()
    .Use(inner => new OpenTelemetryAgent(inner, serviceName, true))    // trace Agent invocations
    .Build();

using (Sdk.CreateTracerProviderBuilder()
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName, serviceVersion: serviceVersion))
    .AddSource(serviceName)
    .AddOtlpExporter(options =>
    {
        options.Endpoint = new Uri("http://localhost:4317");
        options.Protocol = OpenTelemetry.Exporter.OtlpExportProtocol.Grpc;
    })
    .Build())
{
    while (true)
    {
        var workflow = new WorkflowBuilder(agent)
            .WithOutputFrom(agent)
            .WithOpenTelemetry(options => options.EnableSensitiveData = true, new ActivitySource(serviceName))
            .Build();
        await using var run = await InProcessExecution.Default.RunAsync(workflow, "Suggest what to wear based on the current weather in Suzhou.");
        foreach (var result in run.NewEvents.OfType<WorkflowOutputEvent>())
            Console.Write(result.Data);
        await Task.Delay(5000);
    }
}

[Description("Gets the weather for the specified city")]
static string GetWeather([Description("City name")] string city) => $"The weather in {city} is currently sunny, 25 degrees Celsius.";
```

In Jaeger we can likewise see that the Agent, tool, and LLM calls have all been included in the trace.

3. WorkflowTelemetryContext

In .NET, OpenTelemetry-based tracing is implemented with three core types: Activity, ActivitySource, and ActivityListener. Each Span that represents a traced operation corresponds to one Activity object. In the execution paths of Workflow construction and invocation that we viewed in Jaeger, the Activity for every operation is created by the WorkflowTelemetryContext class.

```csharp
internal sealed class WorkflowTelemetryContext
{
    public Activity? StartWorkflowBuildActivity();
    public Activity? StartWorkflowSessionActivity();
    public Activity? StartWorkflowRunActivity();
    public Activity? StartExecutorProcessActivity(string executorId, string? executorType, string messageType, object? message);
    public Activity? StartEdgeGroupProcessActivity();
    public Activity? StartMessageSendActivity(string sourceId, string? targetId, object? message);
}
```

The methods that WorkflowTelemetryContext uses to create an Activity are as follows:

- StartWorkflowBuildActivity: the Workflow build operation, named workflow.build
- StartWorkflowSessionActivity: the start of a Workflow Session, named workflow.session
- StartWorkflowRunActivity: the start of a Workflow Run, named workflow.invoke
- StartExecutorProcessActivity: the execution of an Executor node in the Workflow, named executor.process
- StartEdgeGroupProcessActivity: the processing of the edges that start from a given node, named edge_group.process
- StartMessageSendActivity: a node sending a message to the next node, named message.send

For a high-frequency operation such as tracing, which can significantly affect performance, an explicit switch is generally required. For a Workflow, an Activity is created to record trace data only when the IsEnabled property of WorkflowTelemetryContext is true. The static Disabled property provides a default WorkflowTelemetryContext instance whose IsEnabled is false. Only when tracing is enabled does WorkflowTelemetryContext create a real ActivitySource for producing Activity objects. If no ActivitySource is specified explicitly, WorkflowTelemetryContext creates one named Microsoft.Agents.AI.Workflows by default.

```csharp
internal sealed class WorkflowTelemetryContext
{
    public bool IsEnabled { get; }
    public static WorkflowTelemetryContext Disabled { get; } = new WorkflowTelemetryContext();
    public WorkflowTelemetryOptions Options { get; }
    public ActivitySource ActivitySource { get; }
    public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource? activitySource = null);
}
```

4. How tracing is implemented

WorkflowBuilder defines a field named _telemetryContext to hold the WorkflowTelemetryContext object; its default value comes from the static Disabled property of WorkflowTelemetryContext. WorkflowBuilder also provides a SetTelemetryContext method to set this field. The WithOpenTelemetry extension method creates a new WorkflowTelemetryContext instance and assigns it to the WorkflowBuilder by calling SetTelemetryContext.

```csharp
public class WorkflowBuilder
{
    private WorkflowTelemetryContext _telemetryContext = WorkflowTelemetryContext.Disabled;
    internal void SetTelemetryContext(WorkflowTelemetryContext context) => _telemetryContext = context;
}

public static WorkflowBuilder WithOpenTelemetry(
    this WorkflowBuilder builder,
    Action<WorkflowTelemetryOptions>? configure = null,
    ActivitySource? activitySource = null)
{
    WorkflowTelemetryOptions workflowTelemetryOptions = new WorkflowTelemetryOptions();
    configure?.Invoke(workflowTelemetryOptions);
    WorkflowTelemetryContext telemetryContext = new WorkflowTelemetryContext(workflowTelemetryOptions, activitySource);
    builder.SetTelemetryContext(telemetryContext);
    return builder;
}
```

Besides the optional activitySource parameter, which specifies the ActivitySource used when constructing the WorkflowTelemetryContext instance, the WithOpenTelemetry extension method also takes an optional configure parameter for configuring the WorkflowTelemetryOptions object.

```csharp
public sealed class WorkflowTelemetryOptions
{
    public bool EnableSensitiveData { get; set; }
    public bool DisableWorkflowBuild { get; set; }
    public bool DisableWorkflowRun { get; set; }
    public bool DisableExecutorProcess { get; set; }
    public bool DisableEdgeGroupProcess { get; set; }
    public bool DisableMessageSend { get; set; }
}
```

The options provided by WorkflowTelemetryOptions are as follows:

- EnableSensitiveData: whether the trace includes sensitive data such as message content; defaults to false
- DisableWorkflowBuild: whether tracing of the Workflow build operation is disabled; defaults to false
- DisableWorkflowRun: whether tracing of Workflow invocations is disabled; defaults to false
- DisableExecutorProcess: whether tracing of Executor node executions is disabled; defaults to false
- DisableEdgeGroupProcess: whether tracing of the processing of edges that start from a given node is disabled; defaults to false
- DisableMessageSend: whether tracing of a node sending a message to the next node is disabled; defaults to false

When we call the Build method of WorkflowBuilder to create the Workflow object, it internally calls the StartWorkflowBuildActivity method of WorkflowTelemetryContext to create an Activity that represents the build operation, and sets Tags on it, the most important being the Workflow definition. At the same time, WorkflowBuilder passes the WorkflowTelemetryContext it holds to the Workflow constructor, and the Workflow keeps a reference to it. During execution the Workflow calls the methods of WorkflowTelemetryContext as needed to create Activity objects, which is how OpenTelemetry-based tracing is realized.

```csharp
public class Workflow
{
    internal WorkflowTelemetryContext TelemetryContext { get; }
}
```
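
To make the gating described above more concrete, here is a minimal sketch that uses only System.Diagnostics. It is not MAF's actual implementation: the source name demo.workflow, the isEnabled and disableEdgeGroupProcess variables, the StartIfEnabled helper, and the executor.id tag are all hypothetical stand-ins that merely mimic the shape of IsEnabled and WorkflowTelemetryOptions. It illustrates two reasons a Start...Activity method may return null: nothing is listening to the ActivitySource, or the corresponding switch has turned that kind of Span off.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical source name; MAF's default is Microsoft.Agents.AI.Workflows.
var source = new ActivitySource("demo.workflow");
var started = new List<string>();

// With no listener attached, StartActivity returns null, hence the Activity? return type.
Activity? beforeListener = source.StartActivity("workflow.build");
Console.WriteLine(beforeListener is null); // True

// An ActivityListener plays the role that the OpenTelemetry SDK's AddSource call plays in a real application.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "demo.workflow",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStarted = activity => started.Add(activity.OperationName),
};
ActivitySource.AddActivityListener(listener);

// Hypothetical stand-ins for IsEnabled and WorkflowTelemetryOptions.DisableEdgeGroupProcess.
bool isEnabled = true;
bool disableEdgeGroupProcess = true;

// Assumed helper: start an Activity only if tracing is on and this kind of Span is not disabled.
Activity? StartIfEnabled(string name, bool disabledByOption) =>
    isEnabled && !disabledByOption ? source.StartActivity(name) : null;

using (Activity? session = StartIfEnabled("workflow.session", false))
using (Activity? process = StartIfEnabled("executor.process", false))
using (Activity? edgeGroup = StartIfEnabled("edge_group.process", disableEdgeGroupProcess))
{
    // Callers use ?. because any of these may be null.
    process?.SetTag("executor.id", "Foo");
}

Console.WriteLine(string.Join(", ", started)); // workflow.session, executor.process
```

Because every call site has to tolerate a null Activity anyway, a disabled context costs little more than a boolean check, which is presumably why the Disabled instance can serve as the default.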