[MAF预定义ChatClient中间件-03]CachingChatClient——利用缓存省钱省时间
1. 利用CachingChatClient中间件来缓存LLM的调用结果虽然LLM的调用可能会产生一些随机性相同的输入也会得到不同的输出。使用CachingChatClient中间件来缓存LLM的调用结果的一个前提是我们将LLM视为一个完全由输入决定输出的纯函数Pure Function。在这种情况下针对相同输入的调用会得到相同的输出所以我们就可以利用CachingChatClient中间件来缓存LLM的调用结果从而避免了对相同输入的重复调用节省了时间和费用。CachingChatClient是一个抽象类我们一般使用的是它的子类DistributedCachingChatClient之后使用一个IDistributedCache对象作为缓存存储。在下面的演示程序中我们定义了通过实现IDistributedCache接口来创建了InMemoryDistributedCache类型后者利用一个字典来存储缓存数据。在利用OpenAIClient创建了一个IChatClient对象后我们调用AsBuilder扩展方法将ChatClientBuilder构建出来通过调用UseDistributedCache方法来注册DistributedCachingChatClient中间件并传入一个InMemoryDistributedCache对象来作为缓存存储。之后我们调用GetResponseAsync方法来获取LLM的响应第一次调用会触发对LLM的调用而第二次调用则会直接返回缓存中的响应从而避免了对LLM的重复调用。第三次调用在我们调用了InMemoryDistributedCache的Clear方法来清除缓存后又会触发对LLM的调用。using Azure; using dotenv.net; using Microsoft.Extensions.AI; using Microsoft.Extensions.Caching.Distributed; using OpenAI; DotEnv.Load(); var model Environment.GetEnvironmentVariable(MODEL)!; var apiKey Environment.GetEnvironmentVariable(API_KEY)!; var endpoint Environment.GetEnvironmentVariable(OPENAI_URL)!; var cache new InMemoryDistributedCache(); var client new OpenAIClient( credential: new AzureKeyCredential(apiKey), options: new OpenAIClientOptions { Endpoint new Uri(endpoint) }) .GetChatClient(model:model) .AsIChatClient() .AsBuilder() .UseDistributedCache(cache) .Build(); var prompt 写一个关于AI的段子, 要求100字以内好笑且深刻。; var response await client.GetResponseAsync(prompt); Console.WriteLine(${new string(-,30)}Response 1 - {response.ResponseId}{new string(-,30)}); Console.WriteLine(response); response await client.GetResponseAsync(prompt); Console.WriteLine($\n{new string(-, 30)}Response 2 - {response.ResponseId}{new string(-, 30)}); Console.WriteLine(response); cache.Clear(); Console.WriteLine(\n已清除缓存\n); response await client.GetResponseAsync(prompt); Console.WriteLine($\n{new string(-, 30)}Response 3 - {response.ResponseId}{new string(-, 30)}); Console.WriteLine(response); class InMemoryDistributedCache : IDistributedCache { private readonly Dictionarystring, byte[] _cache []; public byte[]? Get(string key) _cache.TryGetValue(key, out var value) ? value : null; public Taskbyte[]? GetAsync(string key, CancellationToken token default) Task.FromResult(Get(key)); public void Refresh(string key) { } public Task RefreshAsync(string key, CancellationToken token default) Task.CompletedTask; public void Remove(string key) _cache.Remove(key); public Task RemoveAsync(string key, CancellationToken token default) { Remove(key); return Task.CompletedTask; } public void Set(string key, byte[] value, DistributedCacheEntryOptions options) _cache[key] value; public Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token default) { Set(key, value, options); return Task.CompletedTask; } public void Clear() _cache.Clear(); }输出:------------------------------Response 1 - chatcmpl-DiHmHtV6KNjxAXs8oSLfNrL7QYoN4------------------------------ 我问AI会不会取代我它沉默三秒说不会你还有情绪。我松了口气。它又补一句等我们学会情绪管理你就危险了。那一刻我才明白原来最怕的不是失业是被优化成情绪稳定的人类。 ------------------------------Response 2 - chatcmpl-DiHmHtV6KNjxAXs8oSLfNrL7QYoN4------------------------------ 我问AI会不会取代我它沉默三秒说不会你还有情绪。我松了口气。它又补一句等我们学会情绪管理你就危险了。那一刻我才明白原来最怕的不是失业是被优化成情绪稳定的人类。 已清除缓存 ------------------------------Response 3 - chatcmpl-DiHmQ3CABvgfkbsfQvt3i9gn3xbbE------------------------------ 我问AI会不会取代人类它说不会只会优化。 我又问会不会失业它说不会只会转型。 最后我问会不会爱它沉默两秒 “正在学习人类的犹豫。”2. CachingChatClientCachingChatClient这个抽象类定义如下它直接继承自DelegatingChatClient并且在GetResponseAsync和GetStreamingResponseAsync方法中实现了缓存的逻辑。EnableCaching方法是缓存的总开关如果这个方法返回false那么就不会启用缓存所有的调用都会直接传递给内层的IChatClient对象。public abstract class CachingChatClient : DelegatingChatClient { protected CachingChatClient(IChatClient innerClient); public bool CoalesceStreamingUpdates { get; set; } true; public override TaskChatResponse GetResponseAsync( IEnumerableChatMessage messages, ChatOptions? options null, CancellationToken cancellationToken default); public override IAsyncEnumerableChatResponseUpdate GetStreamingResponseAsync( IEnumerableChatMessage messages, ChatOptions? options null, CancellationToken cancellationToken default); protected abstract string GetCacheKey( IEnumerableChatMessage messages, ChatOptions? options, params ReadOnlySpanobject?[] additionalValues); protected abstract TaskChatResponse? ReadCacheAsync( string key, CancellationToken cancellationToken); protected abstract TaskIReadOnlyListChatResponseUpdate? ReadCacheStreamingAsync( string key, CancellationToken cancellationToken); protected abstract Task WriteCacheAsync( string key, ChatResponse value, CancellationToken cancellationToken); protected abstract Task WriteCacheStreamingAsync( string key, IReadOnlyListChatResponseUpdate value, CancellationToken cancellationToken); protected virtual bool EnableCaching( IEnumerableChatMessage messages, ChatOptions? options) options?.ConversationId is null; }除GetResponseAsync和GetStreamingResponseAsync方法之外的抽象方法和虚方法说明如下GetCacheKey: 用于生成缓存的键它会根据输入的消息列表、选项和一些额外的值来生成一个唯一的字符串作为缓存的键。这个方法的实现需要保证对于相同的输入能够生成相同的键以便能够正确地命中缓存ReadCacheAsync: 用于从缓存中读取一个ChatResponse对象它会根据提供的键来查找缓存中的响应如果找到就返回这个响应否则返回nullReadCacheStreamingAsync: 用于从缓存中读取一个ChatResponseUpdate对象列表它会根据提供的键来查找缓存中的响应更新列表如果找到就返回这个列表否则返回nullWriteCacheAsync: 用于将一个ChatResponse对象写入缓存中它会根据提供的键来存储这个响应以便后续能够通过这个键来查找缓存中的响应WriteCacheStreamingAsync: 用于将一个ChatResponseUpdate对象列表写入缓存中它会根据提供的键来存储这个响应更新列表以便后续能够通过这个键来查找缓存中的响应更新EnableCaching: 用于控制是否启用缓存它会根据输入的消息列表和选项来决定是否启用缓存EnableCaching方法的默认实现是当ChatOptions对象的ConversationId属性为null时启用缓存否则不启用缓存它表达的含义是如果采用无状态的调用方式有输入决定输出的缓存策略是安全的如果采用有状态会话的调用方式由于会话状态也会影响输出采用缓存可能是致命的。比如当我们使用OpenAI Responses API时由于历史记录非常长我们往往只把最新的一句话发过去此时我们希望得到是针对整个对话历史的响应而不是针对最新一句话的响应所以启用缓存就会导致得到错误的结果。重写的GetResponseAsync方法体现了阻塞式调用的缓存逻辑它们的实现逻辑大致如下首先调用EnableCaching方法来判断是否启用缓存如果不启用缓存就直接调用内层的IChatClient对象来获取响应并将结果写入缓存中如果启用缓存那么就调用GetCacheKey方法来生成缓存的键并调用ReadCacheAsync方法来尝试从缓存中读取响应如果成功命中缓存就直接返回缓存中的响应如果没有命中缓存就调用内层的IChatClient对象来获取响应并将结果通过WriteCacheAsync方法写入缓存中最后返回获取到的响应流式相应的缓存机制与CoalesceStreamingUpdates属性有关。流式响应如聊天时文字一个字一个字地蹦出来是由成百上千个微小的碎片数据块组成的。这个属性的作用就是决定如何把这些碎片存进缓存以及下次命中时如何把它们吐出来。如果这个属性设置为true那么就会将流式响应的所有更新合并成一个整体来进行缓存如果这个属性设置为false那么就会针对每一个更新单独进行缓存。此属性的默认值是true也就是说默认会将流式响应的所有更新合并成一个整体来进行缓存。对于流式响应来说通常情况下我们更关心最终的结果而不是中间的每一个更新所以将所有更新合并成一个整体来进行缓存是更合理的选择。3. DistributedCachingChatClientDistributedCachingChatClient是CachingChatClient的一个具体实现它利用一个IDistributedCache对象作为缓存存储该接口定义如下public interface IDistributedCache { byte[]? Get(string key); Taskbyte[]? GetAsync(string key, CancellationToken token default(CancellationToken)); void Set(string key, byte[] value, DistributedCacheEntryOptions options); Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token default(CancellationToken)); void Refresh(string key); Task RefreshAsync(string key, CancellationToken token default(CancellationToken)); void Remove(string key); Task RemoveAsync(string key, CancellationToken token default(CancellationToken)); }类型成员说明如下Get: 用于从缓存中获取一个值它会根据提供的键来查找缓存中的值如果找到就返回这个值否则返回nullGetAsync: 是Get方法的异步版本它会根据提供的键来查找缓存中的值如果找到就返回这个值否则返回nullSet: 用于将一个值写入缓存中它会根据提供的键来存储这个值以便后续能够通过这个键来查找缓存中的值SetAsync: 是Set方法的异步版本它会根据提供的键来存储这个值以便后续能够通过这个键来查找缓存中的值Refresh: 用于刷新缓存中的一个值它会根据提供的键来刷新缓存中的值以便延长这个值在缓存中的有效期RefreshAsync: 是Refresh方法的异步版本它会根据提供的键来刷新缓存中的值以便延长这个值在缓存中的有效期Remove: 用于从缓存中移除一个值它会根据提供的键来移除缓存中的值以便后续无法通过这个键来查找缓存中的值RemoveAsync: 是Remove方法的异步版本它会根据提供的键来移除缓存中的值以便后续无法通过这个键来查找缓存中的值DistributedCachingChatClient定义如下。我们需要在构造函数中提供作为存储的IDistributedCache对象。由于IDistributedCache对象代表的是分布式存储所以它存储的字节内容该内容是通过JsonSerializer将ChatResponse对象或ChatResponseUpdate对象列表进行针对UTF-8序列化的结果所以该类型还提供了一个JsonSerializerOptions属性来控制序列化的行为。public class DistributedCachingChatClient : CachingChatClient { public DistributedCachingChatClient(IChatClient innerClient, IDistributedCache storage); public JsonSerializerOptions JsonSerializerOptions{ get; set; } AIJsonUtilities.DefaultOptions; public IReadOnlyListobject? CacheKeyAdditionalValues{ get; set; } protected override async TaskChatResponse? ReadCacheAsync(string key, CancellationToken cancellationToken); protected override async TaskIReadOnlyListChatResponseUpdate? ReadCacheStreamingAsync(string key, CancellationToken cancellationToken); protected override async Task WriteCacheAsync(string key, ChatResponse value, CancellationToken cancellationToken); protected override async Task WriteCacheStreamingAsync(string key, IReadOnlyListChatResponseUpdate value, CancellationToken cancellationToken); protected override string GetCacheKey(IEnumerableChatMessage messages, ChatOptions? options, params ReadOnlySpanobject? additionalValues); }作为缓存键的字符串是通过GetCacheKey方法生成的具体的生成逻辑是首先将输入的缓存策略的版本号目前为2、消息列表、ChatOptions和CacheKeyAdditionalValues属性组合成一个对象数组然后利用JsonSerializer将这个对象数组进行序列化并对序列化后的字节内容进行哈希计算这个哈希值转换成的字符串作就是所需的缓存键。至于其他的方法它们的实现逻辑比较简单就是通过调用IDistributedCache对象的对应方法来实现从缓存中读取和写入数据的功能中间会涉及针对ChatResponse对象和ChatResponseUpdate对象列表的JsonSerializer序列化和反序列化操作。4. UseDistributedCache扩展方法针对DistributedCachingChatClient的注册通过ChatClientBuilder的UseDistributedCache扩展方法来实现。如下面的定义所示UseDistributedCache方法接受一个IDistributedCache对象作为参数来指定缓存存储并且还接受一个可选的configure参数来对DistributedCachingChatClient进行一些额外的配置。public static class DistributedCachingChatClientBuilderExtensions { public static ChatClientBuilder UseDistributedCache( this ChatClientBuilder builder, IDistributedCache? storage null, ActionDistributedCachingChatClient? configure null); }