Nacos 2.x 源码深度解析 (四):配置中心服务端 —— 事件总线与数据持久化
一、架构通信篇《Nacos 2.x 源码深度解析 (一)架构整体全貌 —— 核心模块划分与版本演进》《Nacos 2.x 源码深度解析 (二)通信协议迭代 —— HTTP长轮询到gRPC演进》二、配置中心篇《Nacos 2.x 源码深度解析 (三)配置中心客户端 —— 启动加载与自动装配》《Nacos 2.x 源码深度解析 (四)配置中心服务端 —— 事件总线与数据持久化》《Nacos 2.x 源码深度解析 (五)gRPC 推送链路 —— 配置变更下发与动态刷新》《Nacos 2.x 源码深度解析 (六)三级缓存体系 —— 降级兜底与故障自愈机制》三、服务注册发现篇《Nacos 2.x 源码深度解析 (七)服务注册流程 —— 客户端上报与服务端存储》《Nacos 2.x 源码深度解析 (八)服务订阅机制 —— 从首次订阅到gRPC双向流变更通知》四、grpc连接内核篇《Nacos 2.x 源码深度解析 (九)双向流设计 —— 连接创建复用与销毁》《Nacos 2.x 源码深度解析 (十)心跳保活策略 —— 断线检测与重连源码》《Nacos 2.x 源码深度解析 (十一)RPC 请求调度 —— 收发模型与线程池处理》五、集群一致性篇《Nacos 2.x 源码深度解析 (十二)集群基础交互 —— 节点感知与基础数据同步》《Nacos 2.x 源码深度解析 (十三)Distro 协议 ——AP 模式异步数据同步原理》目录一、Nacos 2.x核心机制服务端配置变更的实时推送1.1 配置变更的触发入口ConfigOperationService1.2 事件总线NotifyCenter1.3 事件接力DumpService的本地转储1.4 异步调度核心NacosDelayTaskExecuteEngine1.5 本地转储处理机制ConfigCacheService1.6 集群同步机制AsyncNotifyService1.7 gRPC推送RpcConfigChangeNotifier二、全文小结在上一篇文章中我们深入分析了配置中心客户端的启动加载与自动装配机制理解了客户端如何通过 Bootstrap 父子上下文拉取远程配置、注册 gRPC 监听器并桥接 Spring Cloud 热刷新。本篇将视角转向服务端聚焦配置变更从控制台发布到推送至客户端的完整链路。当管理员在 Nacos 控制台点击“发布”按钮后一系列精密的事件驱动机制便悄然运转ConfigOperationService根据普通、标签、灰度三种发布模式将配置持久化到对应数据库表随后通过NotifyCenter事件总线发布ConfigDataChangeEvent驱动三条支线并行推进——DumpService负责将配置从数据库加载到内存缓存并写入本地磁盘AsyncNotifyService通过 gRPC 异步通知集群其他节点刷新缓存RpcConfigChangeNotifier则通过BiRequestStream双向流将轻量级变更通知推送到所有监听该配置的客户端。本文将沿着这条事件驱动链路逐一拆解NotifyCenter的发布订阅机制、NacosDelayTaskExecuteEngine的异步调度设计、ConfigCacheService的内存磁盘双存储策略以及集群间同步的平方退避重试算法。理解了服务端的这套机制就掌握了 Nacos 配置中心实现秒级推送与高可用保障的核心原理。一、Nacos 2.x核心机制服务端配置变更的实时推送配置变更实时推送功能是Nacos作为配置中心的核心能力之一。在2.x版本中这一能力完全建立在NotifyCenter事件总线之上实现了发布、存储、同步与推送的彻底解耦。当用户在控制台发布配置后请求会经过ConfigController处理并持久化到数据库随后通过NotifyCenter发布ConfigDataChangeEvent事件。事件总线会将事件广播给所有订阅者DumpService负责本地数据转储AsyncNotifyService负责集群节点同步RpcConfigChangeNotifier则通过gRPC向客户端推送变更通知完成配置变更从发布到生效的闭环。1.1 配置变更的触发入口ConfigOperationService当管理员在Nacos控制台修改配置并点击发布按钮时请求会到达服务端的ConfigController.publishConfig()方法。该方法会根据请求参数自动判断发布类型采用不同的策略将配置持久化并推送给目标客户端。Nacos一共支持三种发布方式-普通发布最常见的发布方式。配置保存后立即对所有订阅了该dataId group的客户端生效适合日常的配置变更操作。-标签发布按tag维度将同一个dataId group的配置划分为多个版本。不同客户端通过携带不同的tag获取对应的配置内容适合灰度分组、AB测试等场景——比如让tagcanary的客户端先尝鲜新配置tagstable的客户端继续使用旧配置。-灰度发布Beta配置只会推送给指定IP列表中的客户端其他客户端不受影响。这种方式粒度更细适合在小范围机器上做功能验证确认无误后再转为普通发布全量推送给所有客户端。三种发布方式的核心区别在于持久化存储的表不同以及推送通知的客户端范围不同。同时更新过程中依casMd5实现乐观锁并发控制规避多人编辑引发的数据覆盖问题。casMd5代表配置修改前的内容摘要充当版本标识请求携带该参数时服务端会比对摘要一致性匹配成功才允许更新不一致则判定配置已被改动直接驳回更新请求未携带参数则跳过版本校验直接覆盖原有配置。无论选用何种发布类型与更新策略数据都会同步存入数据库、更新内存缓存与本地磁盘快照最终统一通过ConfigChangePublisher.notifyConfigChange()发出变更事件经由gRPC双向流通道实时通知到目标客户端完整走完配置发布—存储—通知全流程。1.2 事件总线NotifyCenterNotifyCenter基于发布 - 订阅模式实现服务端事件的异步解耦与统一调度。各类服务组件在实例初始化时会主动向NotifyCenter完成订阅注册。服务通过指定事件类型声明自身关注的事件并重写onEvent()方法定义事件触发后的业务处理逻辑。整体注册流程分为两层绑定服务组件以Subscriber订阅者身份先注册到对应EventPublisher的subscribers集合容器中再将该EventPublisher与对应的事件类型完成关联存入核心映射容器publisherMap。后续系统产生事件时NotifyCenter可通过事件类型快速匹配到对应发布器遍历所有注册的订阅者并回调onEvent()方法完成事件的一对多广播分发最终实现业务模块的彻底解耦与异步联动。以DumpService为例在 Spring 容器初始化单例 Bean 时其构造方法会主动向Nacos NotifyCenter注册自身为订阅者声明订阅ConfigDataChangeEvent事件并实现onEvent方法定义事件触发后的业务逻辑。注册流程中NotifyCenter会以事件的全类名作为 Key以EventPublisherFactory创建的EventPublisher作为Value将二者存入publisherMap完成映射关系绑定。最后将当前DumpService订阅者实例注册到对应的EventPublisher中由其内部的subscribers集合统一管理实现事件与订阅者的解耦存储。com.alibaba.nacos.config.server.service.dump.DumpService // 在Spring容器初始化这个Bean时立即向NotifyCenter注册一个订阅者,此后任何ConfigDataChangeEvent事件都会回调到本类的onEvent()方法 public DumpService() { // 向Nacos事件中心注册订阅者 NotifyCenter.registerSubscriber(new Subscriber() { // 事件回调当有配置变更事件发生时被调用 Override public void onEvent(Event event) { handleConfigDataChange(event); } // 声明订阅的事件类型,只接收ConfigDataChangeEvent类型的事件 Override public Class? extends Event subscribeType() { return ConfigDataChangeEvent.class; } }); } —————————————————————————————————————————————————————————————————————————————— com.alibaba.nacos.common.notify.NotifyCenter // 事件发布者注册表Nacos事件中心的核心存储结构 private final MapString, EventPublisher publisherMap new ConcurrentHashMap(16); // 向事件中心添加订阅者 private static void addSubscriber(final Subscriber consumer, Class? extends Event subscribeType, EventPublisherFactory factory) { // 不同的事件类型对应不同的 topic互不干扰 final String topic ClassUtils.getCanonicalName(subscribeType); // synchronized保证线程安全多个订阅者同时注册同一个topic时只创建一个Publisher synchronized (NotifyCenter.class) { //如果 publisherMap 中不存在这个 topic就调用 factory 创建一个新的 EventPublisher MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize); } // 从Map中取出刚创建或已存在的EventPublisher EventPublisher publisher INSTANCE.publisherMap.get(topic); // 将订阅者添加到 Publisher 的订阅者列表中,当Publisher收到事件时会遍历这个列表逐个回调订阅者的onEvent()方法 publisher.addSubscriber(consumer); }1.3 事件接力DumpService的本地转储DumpService是Nacos服务端负责配置本地磁盘持久化的核心服务它在Spring容器初始化、构造对象时就主动向NotifyCenter注册事件订阅成为配置变更事件的第一级消费者。NotifyCenter是Nacos服务端统一事件调度总线作为全局事件分发中枢统一接收上层发布的ConfigDataChangeEvent并根据订阅者声明的事件类型进行精准路由实现事件生产与消费的解耦支持多路订阅、批量分发。DumpService在构造函数中通过NotifyCenter.registerSubscriber()注册一个匿名订阅者重写subscribeType()明确只监听ConfigDataChangeEvent类型事件确保只处理配置变更相关消息。当配置发布、更新等操作触发ConfigDataChangeEvent事件后NotifyCenter会自动匹配订阅关系回调订阅者的onEvent()方法最终进入handleConfigDataChange()执行本地转储逻辑。dump()方法根据发布类型将请求路由到不同的处理分支本节以最常见的普通发布为例来分析。dumpFormal()的核心职责是将数据库中的最新配置同步到内存缓存CacheItem并更新MD5校验和。它不会直接处理转储逻辑而是通过DumpTaskMgr提交一个异步任务来执行实际的加载和推送工作。之所以采用异步设计是因为dump()操作涉及内存、磁盘的转储和客户端通知如果同步执行会阻塞配置发布的主线程影响整体吞吐量。内存缓存是Nacos服务端性能的关键客户端查询配置时服务端优先从CacheItem中读取只有缓存未命中时才会回源到数据库。这种缓存优先的策略将大部分读请求挡在了数据库之外支撑起Nacos在海量客户端场景下的高并发查询能力。com.alibaba.nacos.config.server.service.dump.DumpService public DumpService() { NotifyCenter.registerSubscriber(new Subscriber() { Override public void onEvent(Event event) { handleConfigDataChange(event); } Override public Class? extends Event subscribeType() { return ConfigDataChangeEvent.class; } }); } —————————————————————————————————————————————————————————————————————————————— public void dump(DumpRequest dumpRequest) { // 灰度发布存储灰度配置存储在config_info_beta表中只对BetaIps列表中的客户端生效 if (dumpRequest.isBeta()) { dumpBeta(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(), dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp()); // 批量存储用于批量发布时一次性处理多个dataId比如全量刷新或集群同步场景 } else if (dumpRequest.isBatch()) { dumpBatch(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(), dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp()); // 标签发布存储标签配置存储在config_info_tag表中不同客户端通过tag获取不同版本的配置 } else if (StringUtils.isNotBlank(dumpRequest.getTag())) { dumpTag(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(), dumpRequest.getTag(), dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp()); // 普通发布存储存储在config_info主表中对所有客户端生效 } else { dumpFormal(dumpRequest.getDataId(), dumpRequest.getGroup(), dumpRequest.getTenant(), dumpRequest.getLastModifiedTs(), dumpRequest.getSourceIp()); } } // 将配置从数据库加载到内存缓存更新 MD5 校验和并通知所有订阅了该dataIdgroup的客户端 private void dumpFormal(String dataId, String group, String tenant, long lastModified, String handleIp) { String groupKey GroupKey2.getKey(dataId, group, tenant); String taskKey groupKey; dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, false, false, false, null, lastModified, handleIp)); }1.4 异步调度核心NacosDelayTaskExecuteEngineNacosDelayTaskExecuteEngine是Nacos中处理延迟任务的异步执行引擎其设计围绕四个核心原则展开。单线程定时轮询引擎通过ExecutorFactory.newSingleScheduledExecutorService创建只有一个核心线程的定时调度器所有任务由该线程串行执行从根本上消除了多线程并发带来的数据竞争问题同时即便某个任务处理耗时较长也不会额外创建线程来分担保持资源消耗可控。任务去重合并同一个keydataId group tenant的任务在入队时会被合并为一条避免相同任务重复堆积执行。失败自动重试任务执行失败后会被重新放回队列等待下一轮轮询再次尝试。线程安全引擎选用ConcurrentHashMap作为任务存储容器利用其分段锁机制获得优异的并发读写性能但对于“检查—合并—替换”这种复合操作仍需要额外的ReentrantLock来保证原子性。在调度策略上引擎选择了scheduleWithFixedDelay而非scheduleAtFixedRate。两者的区别在于FixedDelay会在上一次执行完成后再等待固定的延迟时间默认 100ms才触发下一次执行天然避免了任务积压问题而FixedRate则严格按照固定周期触发不管上一次是否执行完毕单线程模型下未完成的任务只能排队等待可能导致延迟累积。选择FixedDelay的考量正在于此保证每轮处理之间有固定的间歇即使某次processTasks耗时较长也不会让 CPU 持续满载。后台线程每隔100ms扫描一次任务队列取出到达执行时间的任务批量处理从而在吞吐量和延迟之间取得平衡。com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine // Nacos 延迟任务执行引擎 public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngineAbstractDelayTask { // 单线程定时调度器 private final ScheduledExecutorService processingExecutor; // 任务存储容器 protected final ConcurrentHashMapObject, AbstractDelayTask tasks; // 显式锁,保证检查、合并、替换操作的原子性 protected final ReentrantLock lock new ReentrantLock(); public NacosDelayTaskExecuteEngine(String name) { this(name, null); } // 带日志的构造函数,默认初始容量32默认轮询间隔 100ms public NacosDelayTaskExecuteEngine(String name, Logger logger) { this(name, 32, logger, 100L); // 默认初始容量 32默认轮询间隔 100ms } // 完整参数构造函数,默认任务扫描间隔为100ms public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { // 初始化任务存储Map,指定初始容量为32避免频繁扩容带来的性能损耗 tasks new ConcurrentHashMap(initCapacity); // 创建单线程定时调度器 processingExecutor ExecutorFactory.newSingleScheduledExecutorService( new NameThreadFactory(name)); // 启动定时轮询任务上一次执行完成后再等100ms才执行下一次这里选择 FixedDelay保证每次执行之间有固定的休息时间即使某次 processTasks 耗时很长也不会让 CPU 持续满载 processingExecutor.scheduleWithFixedDelay( new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); } // 该方法向引擎中添加一个延迟任务虽然ConcurrentHashMap的单个操作是原子的但getmergeput不是所以使用ReentrantLock保证整个复合操作的原子性加锁保证了不会出现两个线程主线程和线程池线程同时对同一个key执行addTask的竞态条件 // key以dataId group tenant的组合为唯一键 Override public void addTask(Object key, AbstractDelayTask newTask) { lock.lock(); try { // 检查是否已存在相同 key 的任务 AbstractDelayTask existTask tasks.get(key); if (null ! existTask) { // 将已存在任务的信息合并到新任务中,对于DumpTask来说merge()是空实现,对于其他类型的任务可能有自己特定的处理 newTask.merge(existTask); } // 将新任务放入Map替换旧任务 tasks.put(key, newTask); } finally { // finally 块保证即使merge或put抛出异常锁也会被释放 lock.unlock(); } } // 处理所有到期的任务 protected void processTasks() { // 获取所有任务key的快照 CollectionObject keys getAllTaskKeys(); // 遍历每个key处理到期任务 for (Object taskKey : keys) { // 尝试移除到期的任务 AbstractDelayTask task removeTask(taskKey); // 获取对应的Processor如DumpTask.class—DumpProcessor NacosTaskProcessor processor getProcessor(taskKey); try { // 执行任务,DumpProcessor.process()会查数据库、更新内存缓存、计算 MD5、通知客户端这些操作都在当前这个单一线程中同步执行 if (!processor.process(task)) { // 处理失败重新放回队列等待下次执行 retryFailedTask(taskKey, task); } } catch (Throwable e) { // 如果 processor.process() 抛出任何异常包括数据库异常、网络异常等记录错误日志然后将任务重新放回队列 getEngineLog().error(Nacos task execute error , e); retryFailedTask(taskKey, task); } } } // 定时轮询任务,该任务被scheduleWithFixedDelay调度需要上一次processTasks()执行完成后再等100ms才执行下一次 private class ProcessRunnable implements Runnable { Override public void run() { try { processTasks(); } catch (Throwable e) { // 即使 processTasks 本身抛出异常也不影响下次定时执行 getEngineLog().error(e.toString(), e); } } } }1.5 本地转储处理机制ConfigCacheServiceConfigCacheService是Nacos服务端处理配置缓存的核心服务其dumpWithMd5()方法在配置变更时负责将最新内容同步持久化到磁盘与内存形成“内存优先、磁盘兜底”的双存储保障。方法入口首先将dataId、group和tenant拼接为groupKey作为缓存唯一索引随后通过tryWriteLock()获取写锁保证同一groupKey同一时刻只有一个线程执行更新。拿到锁后进入三重校验先检查传入的时间戳是否比缓存中的更旧若已过期则直接丢弃再判断时间戳是否比缓存中的更新以此区分本次是内容变更还是仅元信息变更最后比对MD5确认内容是否真正发生了变化。内容确实变更时通过ConfigDiskServiceFactory.saveToDisk()将配置写入本地文件作为数据库不可用时的降级恢复数据源内存侧调用updateMd5()同步刷新缓存中的MD5、时间戳和加密密钥。若内容未变仅时间戳更新则走updateTimeStamp()只更新时间戳和密钥。两者均未变化则直接返回。整个过程在finally块中释放写锁确保异常情况下也不会死锁。com.alibaba.nacos.config.server.service.ConfigCacheService public static boolean dumpWithMd5(String dataId, String group, String tenant, String content, String md5, long lastModifiedTs, String type, String encryptedDataKey) { // 将dataId group tenant拼接为groupKey作为缓存的唯一索引 String groupKey GroupKey2.getKey(dataId, group, tenant); // 同一时刻只允许一个线程更新同一个groupKey的缓存返回值小于0表示获取失败其他线程正在处理直接返回 false final int lockResult tryWriteLock(groupKey); try { // 如果传入的时间戳比缓存中已有的更旧说明有更新的配置已被处理当前任务携带的数据已过时直接丢弃 boolean lastModifiedOutDated lastModifiedTs ConfigCacheService.getLastModifiedTs(groupKey); if (lastModifiedOutDated) { return true; } // 判断时间戳是否比缓存中的更新 boolean newLastModified lastModifiedTs ConfigCacheService.getLastModifiedTs(groupKey); // 获取当前缓存中的MD5 String localContentMd5 ConfigCacheService.getContentMd5(groupKey); // 比较新MD5和缓存中的MD5判断内容是否真实变更 boolean md5Changed !md5.equals(localContentMd5); // 内容变更时写磁盘 if (md5Changed) { // 将最新配置写入本地磁盘文件路径为${nacos.home}/data/config-data/下数据库不可用时作为降级恢复的数据源 ConfigDiskServiceFactory.getInstance().saveToDisk(dataId, group, tenant, content); } // 内容变更时更新JVM内存缓存并推送消息到客户端 if (md5Changed) { // 内容变更更新缓存中的MD5、最后修改时间、加密密钥然后推送消息到客户端 updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey); } else if (newLastModified) { // 内容未变但时间戳更新时仅更新时间戳和加密密钥 updateTimeStamp(groupKey, lastModifiedTs, encryptedDataKey); } // 内容和时间戳都没变返回 return true; } finally { // 释放写锁finally保证即使发生异常也会释放锁避免死锁 releaseWriteLock(groupKey); } }1.6 集群同步机制AsyncNotifyServiceAsyncNotifyService在构造函数中订阅了ConfigDataChangeEvent。当事件触发后handleConfigDataChangeEvent()从事件中提取出dataId、group、tenant、tag和lastModifiedTs随后通过memberManager.allMembersWithoutSelf()获取集群中除本机外的所有节点为每个节点创建一个NotifySingleRpcTask统一放入队列由ConfigExecutor.executeAsyncNotify()异步执行。AsyncRpcTask.run()调用executeAsyncRpcTask()逐节点出队处理。先检查节点是否仍在集群中再检查节点健康状态是否正常健康节点直接通过configClusterRpcClientProxy.syncConfigChange()发送ConfigChangeClusterSyncRequest的gRPC请求不健康节点则走asyncTaskExecute()延期重试。对端由ConfigChangeClusterSyncRequestHandler.handle()接收请求直接在远端调用dumpService.dump()刷新本地缓存。异步回调AsyncRpcNotifyCallBack在响应失败或网络异常时同样触发asyncTaskExecute()重试重试间隔按500failCount²×1000毫秒退避第0次500ms第1次1.5s第2次4.5s以此类推最多重试6次以防止无效任务堆积。com.alibaba.nacos.config.server.service.notify.AsyncNotifyService // AsyncNotifyService的构造函数在构造函数中向NotifyCenter注册了一个订阅者专门监听ConfigDataChangeEvent事件 public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager memberManager; // 预先注册Publisher确保事件通道已就绪 NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // 将一个Subscriber注册到事件中心当任何地方发布了ConfigDataChangeEvent时这个订阅者的onEvent()方法会被回调 NotifyCenter.registerSubscriber(new Subscriber() { Override public void onEvent(Event event) { handleConfigDataChangeEvent(event); } Override public Class? extends Event subscribeType() { return ConfigDataChangeEvent.class; } }); } —————————————————————————————————————————————————————————————————————————————— // 收到本地配置变更事件后构造通知任务并发送到集群其他节点 void handleConfigDataChangeEvent(Event event) { ConfigDataChangeEvent evt (ConfigDataChangeEvent) event; // 获取集群中除本机外的所有节点 CollectionMember ipList memberManager.allMembersWithoutSelf(); // 为每个目标节点构造通知任务 QueueNotifySingleRpcTask rpcQueue new LinkedList(); for (Member member : ipList) { rpcQueue.add(new NotifySingleRpcTask( dataId, group, tenant, tag, dumpTs, evt.isBeta, evt.isBatch, member)); } // 将所有任务封装为AsyncRpcTask提交到专用的异步通知线程池, 不阻塞当前事件回调线程避免拖慢本地 dump 流程 if (!rpcQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); } } —————————————————————————————————————————————————————————————————————————————— // 异步RPC任务 public class AsyncRpcTask implements Runnable { private QueueNotifySingleRpcTask queue; public AsyncRpcTask(QueueNotifySingleRpcTask queue) { this.queue queue; } Override public void run() { // 移交到executeAsyncRpcTask逐节点处理 executeAsyncRpcTask(queue); } } —————————————————————————————————————————————————————————————————————————————— // 遍历任务队列逐个向目标节点发送集群同步请求 void executeAsyncRpcTask(QueueNotifySingleRpcTask queue) { while (!queue.isEmpty()) { NotifySingleRpcTask task queue.poll(); Member member task.member; // 集群拓扑可能在同步过程中发生变化节点下线、扩缩容等 if (!memberManager.hasMember(member.getAddress())) { continue; // 节点已离线跳过 } // 节点健康检查 if (memberManager.stateCheck(member.getAddress(), HEALTHY_CHECK_STATUS)) { try { // 通过gRPC发送请求,异步发送带回调不阻塞当前线程 configClusterRpcClientProxy.syncConfigChange( member, syncRequest, new AsyncRpcNotifyCallBack(AsyncNotifyService.this, task)); } catch (Exception e) { // 发送异常如序列化失败时进入延期重试 asyncTaskExecute(task); } } else { // 进入延期重试等节点恢复后再试 asyncTaskExecute(task); } } } —————————————————————————————————————————————————————————————————————————————— // 将失败的任务放入延迟重试队列,延迟时间由getDelayTime()根据失败次数计算,失败次数越多延迟越久平方退避策略 private void asyncTaskExecute(NotifySingleRpcTask task) { int delay getDelayTime(task); // 计算退避延迟 QueueNotifySingleRpcTask queue new LinkedList(); queue.add(task); AsyncRpcTask asyncTask new AsyncRpcTask(queue); // scheduleAsyncNotify将任务提交到定时线程池延迟delay毫秒后执行 ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS); } —————————————————————————————————————————————————————————————————————————————— // gRPC集群同步请求的回调处理器 public static class AsyncRpcNotifyCallBack implements RequestCallBackConfigChangeClusterSyncResponse { Override public void onResponse(ConfigChangeClusterSyncResponse response) { if (!response.isSuccess()) { // 对端返回错误时重新调度等待下次重试 asyncNotifyService.asyncTaskExecute(task); } } Override public void onException(Throwable ex) { // 网络异常时重新调度等待下次重试 asyncNotifyService.asyncTaskExecute(task); } }1.7 gRPC推送RpcConfigChangeNotifier当ConfigCacheService.updateMd5()完成服务端JVM内存中配置MD5的更新后会通过NotifyCenter发布LocalDataChangeEvent事件。RpcConfigChangeNotifier作为该事件的订阅者收到通知后从事件中解析出groupKey还原出dataId、group、tenant三元组信息随后调用configDataChanged()执行推送逻辑。该方法通过ConfigChangeListenContext.getListeners(groupKey)获取当前监听该配置的所有RPC连接ID逐一校验连接存活状态与灰度标签匹配度。对符合条件的连接构建仅包含配置三元组的轻量级通知请求ConfigChangeNotifyRequest并提交至push()方法执行异步推送。推送采用指数退避重试策略首次立即执行失败后按tryTimes×2秒递增间隔重试超出最大重试次数默认50次可通过环境变量调整后将失效连接从ConnectionManager中移除。真正的gRPC推送由RpcPushTask.run()中的rpcPushService.pushWithCallback()完成底层通过BiRequestStream双向流通道将ConfigChangeNotifyRequest消息发送至客户端。这条通道正是客户端与Server之间建立gRPC长连接时绑定的双向流Server端借助它实现了从“被动应答”到“主动推送”的通信模式转变。客户端收到通知后仅将对应配置的缓存状态标记为consistentWithServerfalse并触发notifyListenConfig()。当客户端下一次发起ConfigBatchListenRequest配置监听请求时感知到MD5不一致便会主动向服务端拉取最新完整配置。整个推送过程仅传输配置三元组不携带真实配置内容以轻量化的方式实现高效变更通知。com.alibaba.nacos.config.server.remote.RpcConfigChangeNotifier // 在Spring容器初始化这个Bean时直接把自己注册到事件中心。此后任何LocalDataChangeEvent事件都会回调该类的onEvent()方法。 public RpcConfigChangeNotifier() { NotifyCenter.registerSubscriber(this); } // 声明订阅LocalDataChangeEvent事件类型 Override public Class? extends Event subscribeType() { return LocalDataChangeEvent.class; } Override public void onEvent(LocalDataChangeEvent event) { configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag); } —————————————————————————————————————————————————————————————————————————————— // 配置变更后向所有订阅了该配置的客户端推送变更通知。这里推送的是变更信号dataId group tenant而非配置内容本身。客户端收到信号后会主动调用 getConfig() 拉取最新内容。这样做有3点好处 // 1、避免大体积配置占用推送通道带宽 // 2、客户端可以按自己的节奏拉取不受推送速率限制 // 3、即使推送通知丢失客户端也能通过定时轮询 MD5 兜底 public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta, ListString betaIps, String tag) { // configChangeListenContext维护了groupKey与connectionId集合的映射客户端在建立gRPC双向流时通过 ConnectionSetupRequest声明自己监听的配置服务端将这些映射关系存储在这里用于变更时快速定位目标客户端 SetString listeners configChangeListenContext.getListeners(groupKey); if (CollectionUtils.isEmpty(listeners)) { return; // 没有客户端监听这个配置无需推送直接返回 } int notifyClientCount 0; // 遍历所有监听该配置的客户端连接 for (final String connectionId : listeners) { // 获取客户端对应的 gRPC 长连接 Connection connection connectionManager.getConnection(connectionId); if (connection null) { continue; // 连接已断开跳过 } // 从连接里提取连接元信息 ConnectionMeta metaInfo connection.getMetaInfo(); String clientIp metaInfo.getClientIp(); // 客户端 IP String clientTag metaInfo.getTag(); // 客户端标签 // 只有tag匹配的客户端才会收到通知其他客户端不受影响 if (StringUtils.isNotBlank(tag) !tag.equals(clientTag)) { continue; } // 构造轻量通知请求只含dataId/group/tenant因为不包含配置内容体积非常小可以快速推送给大量客户端 ConfigChangeNotifyRequest notifyRequest ConfigChangeNotifyRequest.build(dataId, group, tenant); // 创建带重试机制的推送任务 RpcPushTask rpcPushRetryTask new RpcPushTask(notifyRequest, ConfigCommonConfig.getInstance().getMaxPushRetryTimes(), connectionId, clientIp, metaInfo.getAppName()); // 调度推送任务 push(rpcPushRetryTask, connectionManager); notifyClientCount; } } —————————————————————————————————————————————————————————————————————————————— // 推送任务调度 private static void push(RpcPushTask retryTask, ConnectionManager connectionManager) { if (retryTask.isOverTimes()) { // 重试耗尽 → 注销该连接认为客户端已失联 connectionManager.unregister(retryTask.getConnectionId()); return; } if (connectionManager.getConnection(retryTask.getConnectionId()) null) { return; // 客户端已离线静默丢弃 } // 将任务提交到定时线程池 // 采用线性递增延迟策略 // 首次推送tryTimes0延迟 0 秒立即执行 // 第 1 次重试tryTimes1延迟 2 秒 // 第 2 次重试tryTimes2延迟 4 秒 // 第 n 次重试tryTimesn延迟 n*2 秒 // 目的是为了给网络抖动留出恢复时间避免短暂故障期间频繁无效重试浪费服务端资源 ConfigExecutor.scheduleClientConfigNotifier(retryTask, retryTask.getTryTimes() * 2, TimeUnit.SECONDS); } —————————————————————————————————————————————————————————————————————————————— // gRPC推送任务,每个客户端对应一个RpcPushTask实例 class RpcPushTask implements Runnable { Override public void run() { tryTimes; // 每次执行递增重试计数用于计算下次重试的延迟时间 // 通过RpcPushService向客户端推送ConfigChangeNotifyRequest异步回调由RpcPushCallback处理 rpcPushService.pushWithCallback(connectionId, notifyRequest, new RpcPushCallback(this), ConfigExecutor.getClientConfigNotifierServiceExecutor()); } } —————————————————————————————————————————————————————————————————————————————— // gRPC推送结果回调,设置超时时间为3000毫秒,如果3秒内gRPC没有返回成功或失败超时机制会自动触发onFail() class RpcPushCallback extends AbstractPushCallBack { public RpcPushCallback(RpcPushTask rpcPushTask) { super(3000L); this.rpcPushTask rpcPushTask; } Override public void onSuccess() { // 推成功什么都不用做 } Override public void onFail(Throwable e) { // 推失败安排重试 push(rpcPushTask, connectionManager); } }二、全文小结本文聚焦配置中心服务端的事件总线与数据持久化机制详细分析了配置变更从控制台发布到推送至客户端的全链路源码实现。配置变更的入口是ConfigOperationService.publishConfig()它支持普通发布、标签发布和灰度发布三种模式通过casMd5实现乐观锁并发控制数据分别持久化到config_info、config_info_tag和config_info_beta三张表最终统一通过ConfigChangePublisher.notifyConfigChange()发布ConfigDataChangeEvent事件。NotifyCenter是 Nacos 服务端的全局事件总线基于发布订阅模式实现事件的生产与消费彻底解耦。DumpService、AsyncNotifyService等组件在构造函数中向NotifyCenter注册自身为订阅者声明关注的事件类型与回调逻辑。NotifyCenter通过publisherMap维护事件类型与EventPublisher的映射关系事件发布时精准路由到对应发布器遍历订阅者集合并回调onEvent()方法完成一对多广播分发。DumpService收到事件后通过DumpTaskMgr将任务提交给NacosDelayTaskExecuteEngine异步执行。该引擎采用单线程定时轮询、任务去重合并、失败自动重试和ReentrantLock保护复合操作的设计以scheduleWithFixedDelay策略每 100ms 扫描一次任务队列。ConfigCacheService.dumpWithMd5()在执行转储时通过写锁与三重校验将配置同步写入内存缓存和本地磁盘形成内存优先、磁盘兜底的双存储保障。AsyncNotifyService负责集群节点间的配置同步通过memberManager.allMembersWithoutSelf()获取除本机外的所有节点逐一发送ConfigChangeClusterSyncRequest的 gRPC 请求。对不健康节点或失败响应采用平方退避算法延期重试重试间隔按500 failCount² × 1000毫秒递增最多重试 6 次。RpcConfigChangeNotifier负责向客户端推送变更通知。当ConfigCacheService.updateMd5()更新内存缓存后通过NotifyCenter发布LocalDataChangeEventRpcConfigChangeNotifier收到事件后遍历所有监听该配置的 gRPC 连接构建仅含配置三元组的轻量通知通过BiRequestStream双向流通道推送给客户端。推送采用指数退避重试超出最大次数后注销失效连接。服务端的事件驱动链路梳理完毕后下篇将转向客户端接收推送后的处理流程深入分析 gRPC 推送链路的源码实现——GrpcClient如何接收ConfigChangeNotifyRequestConfigRpcTransportClient如何通过executeConfigListen()进行批量 MD5 校验与按需拉取以及配置拉取完成后如何通过NacosConfigRefreshEventListener桥接 Spring Cloud 事件驱动RefreshScopeBean 热刷新。原创不易如果本文对您有帮助带来了些许灵感或启发烦请动动小手点赞、关注、转发、收藏。这是作者持续更新的动力源泉衷心感谢您的支持。我会尽量在工作之余为大家带来更高质量的内容努力保持周更。