一个聚合接口需要依次查询用户资料、最近订单和积分余额。三个下游各耗时 200 毫秒时串行调用至少需要 600 毫秒如果三者互不依赖并行执行后的理论耗时接近最慢的那个请求。但把三行代码套进supplyAsync并不等于完成了异步治理。我们还需要回答任务在哪个线程池运行某个下游超时后是否拖垮整个接口异常是继续传播还是返回降级值请求链路中的 TraceId 能否传到异步线程本文示例基于Java 21 Spring Boot 3.x。一、先区分并发、异步与并行这三个概念经常被混在一起并发多个任务在同一时间段内推进并行多个任务在同一时刻由不同计算资源执行异步调用方提交任务后不必原地等待任务结束。CompletableFuture负责描述异步任务及其依赖关系但实际是否并行取决于使用的Executor以及可用线程数。下面的写法虽然返回CompletableFuture却没有产生异步执行CompletableFutureStringfutureCompletableFuture.completedFuture(already done);而supplyAsync会将有返回值的任务提交给执行器CompletableFutureStringfutureCompletableFuture.supplyAsync(()-remoteClient.query(),executor);Oracle 的 Java 21 文档明确说明未显式传入Executor的异步方法默认使用ForkJoinPool.commonPool()。这对简单演示很方便对生产服务却通常缺少隔离、容量控制和清晰的线程命名。二、为什么不要直接依赖 commonPool以下代码能运行但不适合作为生产默认方案CompletableFutureUserProfilefutureCompletableFuture.supplyAsync(()-userClient.query(userId));问题在于JVM 内其他代码也可能使用公共池容易互相争抢线程阻塞式 HTTP、JDBC 调用会长期占用工作线程无法针对某类下游单独设置队列、拒绝策略和监控指标默认线程名难以快速关联具体业务。为聚合查询创建独立线程池ConfigurationpublicclassAsyncExecutorConfig{Bean(dashboardExecutor)publicThreadPoolTaskExecutordashboardExecutor(){ThreadPoolTaskExecutorexecutornewThreadPoolTaskExecutor();executor.setCorePoolSize(8);executor.setMaxPoolSize(16);executor.setQueueCapacity(200);executor.setThreadNamePrefix(dashboard-);executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(20);executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());executor.initialize();returnexecutor;}}线程池参数不能照抄固定公式需要从目标并发量、平均耗时、下游容量和允许排队时间反推。尤其要注意扩大线程池只能增加并发不能扩大数据库连接池或下游接口的真实容量。CallerRunsPolicy会让提交任务的线程执行被拒绝的任务从而形成一定反压但它也会抬高当前请求延迟。对于必须快速失败的接口可以改用AbortPolicy并将RejectedExecutionException映射为明确的降级响应。三、示例并行聚合三个下游结果先定义返回模型publicrecordUserProfile(LonguserId,Stringnickname){}publicrecordOrderSummary(intcount,BigDecimaltotalAmount){publicstaticOrderSummaryempty(){returnnewOrderSummary(0,BigDecimal.ZERO);}}publicrecordPointsBalance(longavailable){publicstaticPointsBalanceunavailable(){returnnewPointsBalance(-1);}}publicrecordDashboardView(UserProfileprofile,OrderSummaryorders,PointsBalancepoints){}聚合服务如下ServicepublicclassDashboardService{privatestaticfinalLoggerlogLoggerFactory.getLogger(DashboardService.class);privatefinalUserClientuserClient;privatefinalOrderClientorderClient;privatefinalPointsClientpointsClient;privatefinalExecutordashboardExecutor;publicDashboardService(UserClientuserClient,OrderClientorderClient,PointsClientpointsClient,Qualifier(dashboardExecutor)ExecutordashboardExecutor){this.userClientuserClient;this.orderClientorderClient;this.pointsClientpointsClient;this.dashboardExecutordashboardExecutor;}publicDashboardViewquery(LonguserId){CompletableFutureUserProfileprofileFutureCompletableFuture.supplyAsync(()-userClient.query(userId),dashboardExecutor).orTimeout(500,TimeUnit.MILLISECONDS);CompletableFutureOrderSummaryordersFutureCompletableFuture.supplyAsync(()-orderClient.summary(userId),dashboardExecutor).completeOnTimeout(OrderSummary.empty(),800,TimeUnit.MILLISECONDS).exceptionally(ex-{log.warn(query order summary failed, userId{},userId,unwrap(ex));returnOrderSummary.empty();});CompletableFuturePointsBalancepointsFutureCompletableFuture.supplyAsync(()-pointsClient.balance(userId),dashboardExecutor).orTimeout(300,TimeUnit.MILLISECONDS).exceptionally(ex-{log.warn(query points failed, userId{},userId,unwrap(ex));returnPointsBalance.unavailable();});CompletableFuture.allOf(profileFuture,ordersFuture,pointsFuture).join();returnnewDashboardView(profileFuture.join(),ordersFuture.join(),pointsFuture.join());}privatestaticThrowableunwrap(Throwablethrowable){if(throwableinstanceofCompletionExceptionthrowable.getCause()!null){returnthrowable.getCause();}returnthrowable;}}这段代码刻意区分了关键数据和非关键数据用户资料是页面主体失败时让异常继续传播订单汇总超时或失败时返回空汇总积分不可用时返回-1由前端展示“暂不可用”。降级值应当在业务语义上可区分。“查询失败”不能悄悄伪装成“真实余额为 0”否则系统会把可用性问题变成数据正确性问题。四、allOf 为什么不能直接拿到所有结果CompletableFuture.allOf的返回类型是CompletableFutureVoid。它只表达“这些任务全部结束”不会自动收集结果CompletableFutureVoidallCompletableFuture.allOf(f1,f2,f3);all.join();任务类型一致时可以封装一个通用方法publicstaticTCompletableFutureListTsequence(ListCompletableFutureTfutures){CompletableFutureVoidallCompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));returnall.thenApply(ignored-futures.stream().map(CompletableFuture::join).toList());}使用方式ListCompletableFutureProductfuturesproductIds.stream().map(id-CompletableFuture.supplyAsync(()-productClient.query(id),executor)).toList();ListProductproductssequence(futures).join();需要注意allOf中任意任务异常完成组合结果也会异常完成。如果希望“成功几个返回几个”必须先为每个子任务转换结果例如包装成TaskResultT而不是在最外层统一吞掉异常。publicrecordTaskResultT(Tvalue,Throwableerror){publicstaticTTaskResultTsuccess(Tvalue){returnnewTaskResult(value,null);}publicstaticTTaskResultTfailure(Throwableerror){returnnewTaskResult(null,error);}}CompletableFutureTaskResultProductsafeFutureCompletableFuture.supplyAsync(()-productClient.query(productId),executor).handle((value,error)-errornull?TaskResult.success(value):TaskResult.failure(error));这样既能保留部分成功结果也不会丢失失败原因。五、thenApply、thenCompose 与 thenCombine 怎么选1. thenApply同步转换结果当后续步骤只需要转换上一步结果时使用CompletableFutureStringnicknameFutureprofileFuture.thenApply(UserProfile::nickname);它类似Stream.map。如果转换函数返回的本身也是CompletableFuture就会出现嵌套CompletableFutureCompletableFutureAddressnestedprofileFuture.thenApply(profile-addressService.queryAsync(profile.userId()));2. thenCompose串联两个异步任务后一异步任务依赖前一任务结果时使用thenCompose展平CompletableFutureAddressaddressFutureprofileFuture.thenCompose(profile-addressService.queryAsync(profile.userId()));它类似flatMap。3. thenCombine合并两个互相独立的结果CompletableFutureAccountViewaccountFutureprofileFuture.thenCombine(pointsFuture,(profile,points)-newAccountView(profile,points));一个实用判断方法是只转换一个结果thenApply下一个异步任务依赖当前结果thenCompose两个独立任务完成后合并thenCombine等待任意一个任务完成applyToEither等待一组任务全部完成allOf。六、超时不是取消最容易被忽略的陷阱Java 9 以后提供了两个常用超时 APIfuture.orTimeout(500,TimeUnit.MILLISECONDS);future.completeOnTimeout(fallback,500,TimeUnit.MILLISECONDS);它们的差别是orTimeout到期后以TimeoutException异常完成completeOnTimeout到期后用指定默认值正常完成。但Future 超时不代表底层 I/O 已停止。例如 HTTP 请求仍可能占用连接和线程随后才真正返回。CompletableFuture.cancel(true)也不能被理解成可靠地中断所有底层操作。因此必须同时配置资源自身的超时HttpClientclientHttpClient.newBuilder().connectTimeout(Duration.ofMillis(300)).build();HttpRequestrequestHttpRequest.newBuilder(uri).timeout(Duration.ofMillis(500)).GET().build();数据库查询也应设置连接获取、事务和语句执行超时。合理的超时层级通常满足下游单次请求超时 聚合任务超时 Web 接口总超时 网关超时如果内层超时反而比网关更长请求即使已经被客户端放弃服务端仍会继续消耗资源。七、exceptionally、handle、whenComplete 的区别三个方法都能观察异常但语义不同方法正常时执行异常时执行能否改变结果典型用途exceptionally否是能异常降级handle是是能将成功和失败统一转换whenComplete是是通常不改变日志、指标、清理exceptionally适合提供同类型降级值future.exceptionally(ex-fallbackValue);handle适合将结果转换为统一包装future.handle((value,ex)-exnull?ApiResult.success(value):ApiResult.failure(unwrap(ex).getMessage()));whenComplete适合记录耗时和异常longstartSystem.nanoTime();future.whenComplete((value,ex)-{longcostMsTimeUnit.NANOSECONDS.toMillis(System.nanoTime()-start);metrics.record(points,costMs,exnull);});不要在whenComplete中做可能失败的业务补偿。观察逻辑一旦抛出新异常可能覆盖或干扰原任务结果让排障更加困难。八、异步线程中的 TraceId 为什么丢了日志框架的 MDC 通常基于ThreadLocal。任务切换到线程池后原请求线程中的 TraceId 不会自动出现。Spring 的TaskDecorator可以在提交任务时复制上下文publicclassMdcTaskDecoratorimplementsTaskDecorator{OverridepublicRunnabledecorate(Runnablerunnable){MapString,StringcallerContextMDC.getCopyOfContextMap();return()-{MapString,StringworkerContextMDC.getCopyOfContextMap();try{if(callerContext!null){MDC.setContextMap(callerContext);}else{MDC.clear();}runnable.run();}finally{if(workerContext!null){MDC.setContextMap(workerContext);}else{MDC.clear();}}};}}注册到线程池executor.setTaskDecorator(newMdcTaskDecorator());finally中恢复或清理上下文非常重要。线程池会复用线程如果只设置不清理下一个请求可能继承上一个请求的 TraceId。九、CompletableFuture 与 Async 如何选择Async适合将某个 Spring Bean 方法声明为异步入口CompletableFuture更适合在方法内部表达复杂编排。两者可以组合但需要注意 Spring AOP 的代理边界同一个对象内部的自调用不会经过代理因此Async可能失效。ServicepublicclassReportService{Async(dashboardExecutor)publicCompletableFutureReportgenerate(LonguserId){returnCompletableFuture.completedFuture(doGenerate(userId));}}调用generate的代码应来自另一个 Spring Bean。对于聚合接口直接注入Executor并显式使用supplyAsync依赖关系通常更直观也更容易测试。此外返回void的Async方法无法通过 Future 把异常交还调用方只能依赖AsyncUncaughtExceptionHandler。涉及可靠业务结果时应优先返回CompletableFutureT或者使用消息队列承载真正的后台任务。十、如何测试异步编排业务测试不应该依赖真实等待。可以给服务注入一个当前线程执行器使异步代码同步运行ExecutordirectExecutorRunnable::run;针对超时和异常路径再使用可控的 StubTestvoidshould_fallback_when_points_service_fails(){PointsClientpointsClientuserId-{thrownewIllegalStateException(points unavailable);};DashboardServiceservicenewDashboardService(userClient,orderClient,pointsClient,Runnable::run);DashboardViewviewservice.query(1L);assertThat(view.points().available()).isEqualTo(-1);}集成测试还应覆盖一个关键任务失败时接口是否返回预期错误非关键任务失败时降级结果是否可识别线程池队列满时系统是反压、降级还是快速失败总耗时是否接近最慢子任务而不是所有任务耗时之和TraceId 是否存在串号或丢失。十一、生产环境检查清单上线前至少确认以下事项每个*Async调用是否显式使用了合适的执行器阻塞任务与 CPU 密集任务是否使用不同线程池线程池大小是否受到下游连接池和限流能力约束队列是否有界拒绝策略是否符合接口语义每个远程调用是否配置连接超时和读取超时关键数据与非关键数据是否采用不同失败策略降级值是否会和真实业务数据混淆是否监控活跃线程、队列长度、拒绝次数和任务耗时MDC、安全上下文等 ThreadLocal 数据是否正确传递和清理异常是否保留原始 cause而不是只记录CompletionException。十二、总结高质量的CompletableFuture代码核心不在于链式调用写得多漂亮而在于四个边界是否清晰执行边界任务在哪个线程池运行时间边界任务和底层资源何时超时失败边界哪些错误传播哪些结果允许降级容量边界线程、队列和下游连接数能承受多少并发。当任务之间确实存在依赖或并行聚合需求时CompletableFuture很有表达力如果只是把长任务扔到后台且要求可靠执行消息队列、任务表和调度系统通常比进程内 Future 更合适。参考资料Java 21 CompletableFuture APISpring FrameworkTask Execution and SchedulingSpring BootTask Execution and Scheduling