目录简介条件路由的应用场景实验案例准备工作定义数据传输模型入口节点垃圾邮件检测Executor下游节点A正常邮件处理发送下游节点B垃圾邮件处理构建工作流小结示例源码简介大家好我是Edison。上一篇我们学习了MAF中如何进行工作流的状态共享相信你一定对WorkflowContext有了更多的了解。本篇我们来了解下MAF中工作流是如何实现条件路由的。条件路由的应用场景在实际业务场景中一个AI工作流的多个步骤之间往往会传递一些数据有些步骤只会在数据满足一定条件下才会被触发而有些步骤只会在数据不满足条件的时候才被触发总之这种if-else的决策在工作流中很常见。在MAF中我们可以使用 Conditional Edge 即条件边 或 待决策函数的边来实现这种工作流内部if-else决策需求。实验案例今天来实践上面这个企业内部邮件检测的工作流案例假设我们是一个企业客服团队我们每天会接收成百上千封用户发来的Email邮件其中既包含正常咨询 也夹杂很多 垃圾信息。那么如何降低人工分拣成本减少人工负担 又同时保留 风控可观测性便于扩展更多节点就是我们需要解决的问题。因此我们可以构建一个基于LLM的工作流首先通过一个垃圾邮件检测器接收客户发来的邮件通过LLM判断是否为垃圾邮件输出一个判断结果关键数据IsSpam true/false。其次系统根据判断结果IsSpam决定交由哪个后续节点处理。如果判断结果是非垃圾邮件则转交给发送邮件执行器进行处理比如发送给对应的客服代表否则则交给垃圾邮件处理执行器进行截断并记录或者上报人工处理等等。最后结束工作流。这个案例展示了Conditional Edge 条件路由的经典场景后续它也可以扩展为多级路由。准备工作在今天的这个案例中我们仍然创建了一个.NET控制台应用程序安装了以下NuGet包Microsoft.Agents.AI.OpenAIMicrosoft.Agents.AI.WorkflowsMicrosoft.Extensions.AI.OpenAI我们的配置文件中定义了LLM API的信息{ OpenAI: { EndPoint: https://api.siliconflow.cn, ApiKey: ******************************, ModelId: Qwen/Qwen3-30B-A3B-Instruct-2507 } }这里我们使用 SiliconCloud 提供的 Qwen/Qwen3-30B-A3B-Instruct-2507 模型之前的 Qwen2.5 模型在这个案例中不适用。你可以通过这个URL注册账号https://cloud.siliconflow.cn/i/DomqCefW 获取大量免费的Token来进行本次实验。然后我们将配置文件中的API信息读取出来var config new ConfigurationBuilder() .AddJsonFile($appsettings.json, optional: false, reloadOnChange: true) .Build(); var openAIProvider config.GetSection(OpenAI).GetOpenAIProvider();定义数据传输模型首先我们定义一下在这个工作流中需要生成传递的数据模型1DetectionResult 拉件邮件检测结果public sealed class DetectionResult { [JsonPropertyName(is_spam)] public bool IsSpam { get; set; } [JsonPropertyName(reason)] public string Reason { get; set; } string.Empty; [JsonIgnore] public string EmailId { get; set; } string.Empty; }2EmailStateConstants 常量类似于Cache Key的作用​​​​​​​internal static class EmailStateConstants { public const string EmailStateScope EmailState; }3EmailMessage EmailResponse DTO作用​​​​​​​internal sealed class EmailMessage { [JsonPropertyName(email_id)] public string EmailId { get; set; } string.Empty; [JsonPropertyName(email_content)] public string EmailContent { get; set; } string.Empty; } public sealed class EmailResponse { [JsonPropertyName(response)] public string Response { get; set; } string.Empty; }入口节点垃圾邮件检测Executor这个垃圾邮件检测是本流程的核心节点它接收用户邮件内容并调用LLM做检测最后生成该邮件的唯一ID并将邮件原文写入工作流共享状态存储区返回唯一ID供下游节点使用。​​​​​​​internal sealed class SpamDetectionExecutor : ExecutorChatMessage, DetectionResult { private readonly AIAgent _agent; private readonly AgentThread _thread; public SpamDetectionExecutor(AIAgent agent) : base(SpamDetectionExecutor) { // 创建 Agent 和对话线程 this._agent agent; this._thread this._agent.GetNewThread(); } public override async ValueTaskDetectionResult HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken default) { var trackedEmail new EmailMessage { EmailId Guid.NewGuid().ToString(N), EmailContent message.Text }; await context.QueueStateUpdateAsync(trackedEmail.EmailId, trackedEmail, scopeName: EmailStateConstants.EmailStateScope, cancellationToken); var agentResponse await _agent.RunAsync(message, cancellationToken: cancellationToken); var detectionResult JsonSerializer.DeserializeDetectionResult(agentResponse.Text) ?? throw new InvalidOperationException(无法解析 Spam Detection 响应。); detectionResult.EmailId trackedEmail.EmailId; return detectionResult; } }在这个Executor中它接收我们如下所示定义好的Agent来实现​​​​​​​var spamDetectionAgent new ChatClientAgent( chatClient, new ChatClientAgentOptions(instructions: You are a spam detection assistant that labels spam emails with reasons.) { ChatOptions new() { ResponseFormat ChatResponseFormat.ForJsonSchemaDetectionResult() } });可以看到我们在ChatOptions中指定了该Agent返回的消息需要进行序列化到强类型便于后续通过强类型数据进行决策路由。下游节点A正常邮件处理发送这里我们针对识别到的正常邮件开发两个执行器假设其用于邮件处和转发1邮件处理读取共享状态区的原文然后调用Agent输出JSON回复。​​​​​​​internal sealed class EmailAssistantExecutor : ExecutorDetectionResult, EmailResponse { private readonly AIAgent _agent; private readonly AgentThread _thread; public EmailAssistantExecutor(AIAgent agent) : base(EmailAssistantExecutor) { // 创建 Agent 和对话线程 this._agent agent; this._thread this._agent.GetNewThread(); } public override async ValueTaskEmailResponse HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken default) { if (message.IsSpam) throw new InvalidOperationException(Spam 邮件不应进入 EmailAssistantExecutor。); var email await context.ReadStateAsyncEmailMessage(message.EmailId, scopeName: EmailStateConstants.EmailStateScope, cancellationToken) ?? throw new InvalidOperationException(找不到对应 Email 内容。); var agentResponse await _agent.RunAsync(email.EmailContent, _thread, cancellationToken: cancellationToken); var emailResponse JsonSerializer.DeserializeEmailResponse(agentResponse.Text) ?? throw new InvalidOperationException(无法解析 Email Assistant 响应。); return emailResponse; } }这里的Agent定义如下​​​​​​​var emailAssistantAgent new ChatClientAgent( chatClient, new ChatClientAgentOptions(instructions: You are an enterprise email assistant. You can provide professional Chinese responses to user.) { ChatOptions new() { ResponseFormat ChatResponseFormat.ForJsonSchemaEmailResponse() } });2邮件转发模拟邮件转发到具体的客服这里仅仅使用YieldOutputAsync完成工作流输出消息内容。​​​​​​​internal sealed class EmailSendingExecutor() : ExecutorEmailResponse(EmailSendingExecutor) { public override async ValueTask HandleAsync(EmailResponse message, IWorkflowContext context, CancellationToken cancellationToken default) { await context.YieldOutputAsync($ Email 已发送{message.Response}, cancellationToken); } }下游节点B垃圾邮件处理当判断到是垃圾邮件时转交给该执行器处理这里模拟输出了一段风险提示实际中可能是上报人工跟进等等操作​​​​​​​internal sealed class SpamHandlingExecutor() : ExecutorDetectionResult(SpamHandlingExecutor) { public override async ValueTask HandleAsync(DetectionResult message, IWorkflowContext context, CancellationToken cancellationToken default) { if (!message.IsSpam) throw new InvalidOperationException(非垃圾邮件不应进入 SpamHandlingExecutor。); await context.YieldOutputAsync($ 垃圾邮件{message.Reason}, cancellationToken); } }构建工作流现在万事俱备只欠一个Workflow现在Lets do it!Step1: 获取ChatClient​​​​​​​var chatClient new OpenAIClient( new ApiKeyCredential(openAIProvider.ApiKey), new OpenAIClientOptions { Endpoint new Uri(openAIProvider.Endpoint) }) .GetChatClient(openAIProvider.ModelId) .AsIChatClient();Step2: 实例化自定义Agent Executors​​​​​​​var spamDetectionExecutor new SpamDetectionExecutor(spamDetectionAgent); var emailAssistantExecutor new EmailAssistantExecutor(emailAssistantAgent); var sendEmailExecutor new EmailSendingExecutor(); var handleSpamExecutor new SpamHandlingExecutor();Step3: 创建路由决策工作流​​​​​​​Funcobject?, bool BuildCondition(bool expectedSpamFlag) detection detection is DetectionResult dr dr.IsSpam expectedSpamFlag; var conditionalWorkflow new WorkflowBuilder(spamDetectionExecutor) .AddEdge(spamDetectionExecutor, emailAssistantExecutor, condition: BuildCondition(false)) .AddEdge(emailAssistantExecutor, sendEmailExecutor) .AddEdge(spamDetectionExecutor, handleSpamExecutor, condition: BuildCondition(true)) .WithOutputFrom(handleSpamExecutor, sendEmailExecutor) .Build(); Console.OutputEncoding Encoding.UTF8; Console.WriteLine(✅ Conditional Workflow 构建完成);注意这里我们首先创建了一个委托方法写明了我们如何做决策的即通过判断DetectionResult的IsSpam字段来决定的。Step4: 测试工作流首先为了便于后续测试我们将执行工作流封装为一个静态方法​​​​​​​static async Task RunConditionalWorkflowAsync( Workflow conditionalWorkflow, string scenarioName, string emailContent, CancellationToken cancellationToken default) { Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━); Console.WriteLine($ 测试场景{scenarioName}); Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━); var chatMessage new ChatMessage(ChatRole.User, emailContent); await using var run await InProcessExecution.StreamAsync(conditionalWorkflow, chatMessage); await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); await foreach (WorkflowEvent evt in run.WatchStreamAsync()) { switch (evt) { case ExecutorCompletedEvent completedEvent: Console.WriteLine($✅ {completedEvent.ExecutorId} 完成); break; case WorkflowOutputEvent outputEvent: Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━); Console.WriteLine( 工作流执行完成); Console.WriteLine(━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n); Console.WriteLine(${outputEvent.Data}); break; case WorkflowErrorEvent errorEvent: Console.WriteLine(✨ 收到 Workflow Error Event); Console.WriteLine(${errorEvent.Data}); break; default: break; } } }测试用例1正常咨询邮件输入​​​​​​​var scenarioName1 正常咨询 → EmailAssistant 分支; var emailContent1 客服团队你好我想确认上周提交的采购订单是否已经发货如果还有缺少信息请告知。; await RunConditionalWorkflowAsync( conditionalWorkflow, scenarioName1, emailContent1); Console.WriteLine(✅ 正常邮件路径验证完成);测试结果如下图所示可以看见对于正常咨询邮件进行正常的邮件处理和转发。测试用例2垃圾邮件输入​​​​​​​var scenarioName2 垃圾邮件 → HandleSpam 分支; var emailContent2 令人惊喜的投资机会只需支付保证金即可在 24 小时内获得 10 倍收益点击可疑链接领取奖励。; await RunConditionalWorkflowAsync( conditionalWorkflow, scenarioName2, emailContent2); Console.WriteLine(✅ 垃圾邮件路径验证完成);测试结果如下图所示可以看见对于垃圾邮件进行有效的拦截后续还可以进行上报人工跟踪等等。小结本文介绍了MAF中的条件路由Conditional Edge以及如何实现条件路由最后给出了一个企业内部邮件检测工作流案例供参考。下一篇​​​​​​​我们将继续学习MAF中工作流的swtich-case路由模式来增强企业内部邮件安全案例的功能做到更为精细的分类处理。示例源码Github: https://github.com/EdisonTalk/MAFD引入地址