跳到主要内容

CompletableFuture

问题

CompletableFuture 解决了 Future 的什么问题?如何使用 CompletableFuture 进行异步编排?异常处理怎么做?

答案

Future 的局限性

JDK 5 的 Future 接口只能实现最基本的异步:

FutureLimitations.java
Future<String> future = executor.submit(() -> fetchData());

// 问题 1:get() 阻塞当前线程
String result = future.get(); // 一直阻塞 ❌

// 问题 2:无法链式组合多个异步任务
// 问题 3:无法手动完成(设置结果)
// 问题 4:无法注册回调,只能轮询 isDone()

CompletableFuture(JDK 8)

CompletableFuture 实现了 FutureCompletionStage 接口,支持函数式的异步编排

创建异步任务

CreateCompletableFuture.java
// 1. supplyAsync - 有返回值
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return fetchData(); // 在 ForkJoinPool.commonPool 中执行
});

// 2. runAsync - 无返回值
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
sendNotification();
});

// 3. 指定线程池(推荐,避免使用公共 ForkJoinPool)
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
return fetchData();
}, executor);

// 4. 手动创建并完成
CompletableFuture<String> cf4 = new CompletableFuture<>();
cf4.complete("手动设置结果"); // 正常完成
cf4.completeExceptionally(new RuntimeException()); // 异常完成
指定线程池

默认使用 ForkJoinPool.commonPool()(共享线程池),不同业务的异步任务应使用独立线程池,避免相互影响。

链式转换(thenApply / thenAccept / thenRun)

ChainingExample.java
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUserId()) // 异步获取用户 ID
.thenApply(id -> fetchUserName(id)) // 用 ID 查名字(有入参有返回)
.thenApply(name -> "Hello, " + name) // 转换结果
.thenApply(String::toUpperCase); // 再次转换

// thenAccept:有入参,无返回值
CompletableFuture<Void> print = result.thenAccept(System.out::println);

// thenRun:无入参,无返回值(仅在前一步完成后执行)
CompletableFuture<Void> done = result.thenRun(() -> System.out.println("完成"));
方法入参返回值用途
thenApply前一步结果转换结果
thenAccept前一步结果消费结果
thenRun执行动作

每个方法都有对应的 Async 版本(在另一个线程中执行):

// 同步版本:使用前一步的线程执行
cf.thenApply(x -> transform(x));

// 异步版本:使用默认线程池执行
cf.thenApplyAsync(x -> transform(x));

// 异步版本:使用指定线程池执行
cf.thenApplyAsync(x -> transform(x), customExecutor);

组合两个异步任务

thenCompose(flatMap,串行依赖)

ThenCompose.java
// 第二个任务依赖第一个任务的结果
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUserId())
// thenCompose:接收一个函数,返回 CompletableFuture(避免嵌套)
.thenCompose(id -> CompletableFuture.supplyAsync(() -> fetchUserDetail(id)));

thenCombine(并行合并)

ThenCombine.java
// 两个任务并行执行,结果合并
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> fetchName());
CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> fetchAge());

// 两个都完成后合并结果
CompletableFuture<String> result = nameFuture.thenCombine(ageFuture,
(name, age) -> name + " is " + age + " years old");

多任务编排

allOf(全部完成)

AllOf.java
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> fetchA());
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> fetchB());
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> fetchC());

// 等待所有任务完成(注意:allOf 返回 CompletableFuture<Void>)
CompletableFuture<Void> allDone = CompletableFuture.allOf(cf1, cf2, cf3);

// 获取所有结果
allDone.thenRun(() -> {
String a = cf1.join(); // join() 不抛受检异常
String b = cf2.join();
String c = cf3.join();
System.out.println(a + ", " + b + ", " + c);
});

anyOf(任一完成)

AnyOf.java
// 任一任务完成即返回
CompletableFuture<Object> fastest = CompletableFuture.anyOf(cf1, cf2, cf3);
fastest.thenAccept(result -> System.out.println("最快的结果: " + result));

异常处理

ExceptionHandling.java
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "success";
})
// exceptionally:出错时返回默认值(类似 catch)
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "default";
});

// handle:无论成功还是失败都执行(类似 try-catch-finally)
CompletableFuture<String> result2 = CompletableFuture
.supplyAsync(() -> riskyOperation())
.handle((value, ex) -> {
if (ex != null) {
return "出错了: " + ex.getMessage();
}
return "成功: " + value;
});

// whenComplete:观察结果,不修改返回值
CompletableFuture<String> result3 = CompletableFuture
.supplyAsync(() -> riskyOperation())
.whenComplete((value, ex) -> {
if (ex != null) {
log.error("任务失败", ex);
} else {
log.info("任务成功: {}", value);
}
});
方法出错时成功时返回值
exceptionally执行,返回替代值不执行可修改
handle执行执行可修改
whenComplete执行执行不可修改

超时控制(JDK 9+)

Timeout.java
// JDK 9+
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> slowOperation())
.orTimeout(3, TimeUnit.SECONDS) // 超时抛 TimeoutException
.completeOnTimeout("默认值", 3, TimeUnit.SECONDS); // 超时返回默认值

实战示例:并行调用多个服务

ParallelServiceCalls.java
public UserProfile getUserProfile(long userId) {
// 并行调用三个微服务
CompletableFuture<UserInfo> userFuture =
CompletableFuture.supplyAsync(() -> userService.getUser(userId), executor);
CompletableFuture<List<Order>> orderFuture =
CompletableFuture.supplyAsync(() -> orderService.getOrders(userId), executor);
CompletableFuture<Integer> scoreFuture =
CompletableFuture.supplyAsync(() -> scoreService.getScore(userId), executor);

// 等待三个结果,组装返回
return CompletableFuture.allOf(userFuture, orderFuture, scoreFuture)
.thenApply(v -> {
UserInfo user = userFuture.join();
List<Order> orders = orderFuture.join();
int score = scoreFuture.join();
return new UserProfile(user, orders, score);
})
.join(); // 阻塞获取最终结果
}

常见面试问题

Q1: CompletableFuture 和 Future 的区别?

答案

对比FutureCompletableFuture
获取结果get() 阻塞非阻塞回调 + get()/join()
任务组合不支持thenApplythenComposethenCombine
多任务编排不支持allOfanyOf
异常处理只能 try-catchexceptionallyhandle
手动完成不支持complete()completeExceptionally()
API 风格命令式函数式

Q2: thenApply 和 thenCompose 的区别?

答案

类似 Stream 的 mapflatMap

  • thenApplyFunction<T, U>,返回值直接作为新 CompletableFuture 的结果
  • thenComposeFunction<T, CompletableFuture<U>>,函数本身返回一个 CompletableFuture,避免嵌套
// thenApply → CompletableFuture<CompletableFuture<String>> 嵌套
cf.thenApply(id -> fetchAsync(id));

// thenCompose → CompletableFuture<String> 扁平化
cf.thenCompose(id -> fetchAsync(id));

Q3: get() 和 join() 的区别?

答案

对比get()join()
受检异常抛出 ExecutionException(需要 try-catch)抛出 CompletionException(非受检)
超时支持 get(timeout, unit)不支持
用途需要超时控制时使用链式调用中更方便

Q4: CompletableFuture 默认使用什么线程池?

答案

默认使用 ForkJoinPool.commonPool()。这是一个全局共享的线程池,默认线程数为 Runtime.getRuntime().availableProcessors() - 1

生产环境建议:为不同业务传入自定义线程池,避免不同任务竞争同一个线程池资源,也避免慢任务拖垮所有异步操作。

Q5: CompletableFuture 的异常处理策略?

答案

  • exceptionally:只处理异常,返回替代值
  • handle:无论成功失败都执行,可修改返回值
  • whenComplete:无论成功失败都执行,不可修改返回值(只做观察/记录)

链式调用中异常会传播,直到遇到异常处理方法。

Q6: 如何实现超时控制?

答案

JDK 9+:orTimeout()completeOnTimeout()

JDK 8 需要手动实现:

public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future, long timeout, TimeUnit unit) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> {
future.completeExceptionally(new TimeoutException("超时"));
}, timeout, unit);
return future;
}

相关链接