从 Future 到 CompletableFuture
使用 Future 完成多线程任务 (ThreadPoolExecutor)
快速上手
-
创建 线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory()); -
完成多线程任务要执行的逻辑
// 创建一个新的类 实现 Callable 接口中的 call() 方法 public class Task implements Callable<Long> { @Override public Long call() throws Exception { System.out.println("the Thread name is " + Thread.currentThread().getName()); return System.currentTimeMillis(); } } -
创建 Future 实例提交任务
Future<Long> future = executor.submit(new Task());⚠️ 注意 submit() 只是提交任务到线程池,并非立即执行任务,执行时机由线程池自行决定。
-
获取返回值
Long res = future.get(); // 阻塞式无限时返回 Long res = future.get(1, TimeUnit.SECONDS); // 阻塞式限时返回
代码实例:
// Task.java
public class Task implements Callable<Long> {
@Override
public Long call() throws Exception {
System.out.println("the Thread name is " + Thread.currentThread().getName());
return System.currentTimeMillis();
}
}
// main
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory());
//提交任务
Future<Long> future = executor.submit(new Task());
try {
try {
//阻塞式限时 1s 获取返回值
System.out.println("the return value is " + future.get(1, TimeUnit.SECONDS));
} catch(TimeoutException e) {
//超时取消任务
future.cancel(true);
} catch (ExecutionException e) {
System.err.println("Task failed: " + e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Main thread interrupted");
}
//关闭线程池
} finally {
executor.shutdown(); // 禁止新任务
try {
// 等待已提交任务完成
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 取消正在执行的任务
}
} catch (InterruptedException e) {
executor.shutdownNow(); // 被中断时强制关闭
// 为什么要恢复中断状态?
// 因为 catch 住异常后,中断标志被清除了
// 如果上层代码也想知道 "发生过中断",必须重新设置
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
几个注意点
-
cancel(boolean mayInterruptIfRunning)向任务发送Thread.interrupt()只有任务能够响应中断才有效- 当 bool 为 false 时不中断当前正在执行的任务,只暂停未开始执行的任务
- 当 bool 为 true 时会中断当前正在执行的任务 通用写法:
public Long call() throws Exception { while (!Thread.currentThread().isInterrupted()) { // try { //注意:这不是修改上面的 Task,而是展示一个通用写法。 // Task 瞬间执行完毕而且 return 不会循环检查,没有响应中断。 // 在该例中不会被中断,因为没有可中断阻塞点和循环检查 System.out.println("the Thread name is " + Thread.currentThread().getName()); return System.currentTimeMillis(); } catch(InterruptedException e) { Thread.currentThread().interrupt(); System.err.println("thread interrupted"); } } }
可中断阻塞点是什么? 调用 thread.interrupt() 会: 将该线程的中断状态(interrupt status)设置为 true。 如果线程正处于 sleep()、wait()、join() 等阻塞状态,会立即抛出 InterruptedException,并清除中断状态(变回 false)。
-
如何优雅关闭线程池
明晰
executor.shutdownNow()与executor.shutdown()
| 方法 | shutdown() | shutdownNow() |
|---|---|---|
| 线程池状态变化 | 不允许新任务提交,但已提交的任务继续执行 | 不允许新任务提交,且试图停止正在执行的任务 |
| 新任务提交 | 拒绝(抛出 RejectedExecutionException) | 拒绝(抛出 RejectedExecutionException) |
| 正在执行的任务 | 允许正常执行完成 | 尝试中断(调用 thread.interrupt()) |
| 等待队列中的任务 | 会继续执行(队列任务会逐个取出执行) | 不执行,直接从队列中移除并返回 |
| 中断行为 | 只中断 空闲 的线程(interruptIdleWorkers()) | 中断 所有 工作线程(interruptWorkers()) |
| 返回值 | void(无返回值) | List :返回队列中尚未开始执行的任务列表 |
| 适用场景 | 希望程序正常结束,不丢失任何已提交的任务 | 需要尽快停止,愿意牺牲队列中的任务 |
| 是否会等待任务结束 | 是(线程池会等到所有任务执行完才彻底终止) | 否(立即尝试停止) |
四步走关闭线程池:
1. 先优雅关闭 `shutdown()` 放在最外层,确保执行
2. 等待一定时间 `if(!executor.awaitTermination(5, TimeUnit.SECONDS))`
3. 时间到后强行关闭 `shutdownNow()`
4. 包裹 2-3 catch 中断异常,防止意外退出未关闭线程池
实战示例
多线程并发执行任务 思路:
-
创建数组或 list
-
将任务通过 for 循环提交到线程池中
-
查看结果
框架:
//1.创建线程池
//2.创建待执行的任务队列
List<Future<Long>> futures = new ArrayList<>();
try {
//将任务提交到线程池
for (int i = 0; i < n; i++) {
futures.add(executor.submit(new Task()));
}
for (Future<Long> future : futures) {
//查看结果
}
} finally {
//关闭线程池
}
| 操作 | 关键代码 | 备注 |
|---|---|---|
| 定义任务 | implements Callable | 有返回值,能抛异常 |
| 提交任务 | Future f = exec.submit(task) | 异步,不阻塞 |
| 获取结果 | f.get(1, TimeUnit.SECONDS) | 阻塞,建议设超时 |
| 取消任务 | f.cancel(true) | 需要任务代码配合响应中断 |
| 大忌 | 在 submit 循环中直接 get | 会变成串行 |
使用现代的 CompletableFuture 完成多线程任务
快速上手
- 创建线程池
- 创建逻辑(与 Future 不同,要实现 Supplier 接口的 get() 方法)
- 使用 CompletableFuture 提交任务
- 添加你要的操作 代码实例:
try (ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory())) {
CompletableFuture<Long> cf = CompletableFuture.supplyAsync(new CFTask(), executor);
Long long1 = cf.join();
System.out.println(long1);
} catch (Exception e) {
// close thread pool
}
- supplyAsync() 异步非阻塞提交任务
| 方法 | 阻塞? | 异常处理 |
|---|---|---|
get() |
✅ 阻塞 | 抛 checked exception(必须 try-catch) |
join() |
✅ 阻塞 | 抛 unchecked CompletionException(可不 catch) |
常见的几种方法
创建异步任务
| 方法 | 作用 | 例子 |
| supplyAsync(Supplier, Executor) | 指定线程池执行,返回有结果的异步线程 | cf = CompletableFuture.supplyAsync(() -> dbQuery(), myExecutor); |
| runAsync(Runnable, Executor) | 无返回值异步任务,指定线程池 | CompletableFuture.runAsync(() -> sendEmail()); |
- supplyAsync 异步执行要有返回值,被执行的任务要去实现 Supplier 接口
- runAsync 异步执行但无返回值,要实现 Runnable 接口
转换结果(链式处理的核心)
| 方法 | 作用 | 例子 |
|---|---|---|
| thenApply(fn) | 获取上一方法的返回值并执行逻辑,再次返回 | .thenApply(user -> user.getName()) |
| thenApplyAsync(fn) | 异步转换(新线程) | 常用 |
| thenCompose(fn) | 上一步返回另一个 CF → 扁平化(避免嵌套) | .thenCompose(id -> getOrderCF(id)) |
| thenComposeAsync(fn) | 异步扁平化 | 常用 |
thenApply vs thenCompose:
- thenApply:函数返回普通值
- thenCompose:函数返回 CompletableFuture(必须用这个才能链下去不嵌套)
消费结果(只关心结果,不返回新值)
| 方法 | 作用 | 例子 |
|---|---|---|
| thenAccept(result -> …) | 拿到结果处理(比如打印、存库) | .thenAccept(pname -> System.out.println(pname)) |
| thenAcceptAsync(…) | 异步消费 | |
| thenRun(() -> …) | 不关心上一步结果,只做后续动作 | .thenRun(() -> {System.out.println(“flushing”);} |
异常处理
| 方法 | 作用 |
|---|---|
| exceptionally(ex -> defaultValue) | 出异常时返回备用值(继续链) |
| handle((result, ex) -> …) | 无论正常还是异常都能处理,最灵活 |
| whenComplete((result, ex) -> …) | 只观察,不改变结果(常用于日志) |
| 推荐优先用 handle 或 exceptionally。 |
两个任务组合
| 方法 | 作用 |
|---|---|
| thenCombine(otherCF, (r1, r2) -> newResult) | 两个任务都完成 → 合并结果 |
| thenCombineAsync(…) | 异步合并 |
| thenAcceptBoth(otherCF, (r1, r2) -> …) | 两个完成 → 消费,不返回 |
| runAfterBoth(otherCF, () -> …) | 两个完成 → 执行动作 |
多个任务全部完成
| 方法 | 作用 |
|---|---|
| CompletableFuture.allOf(cf1, cf2, cf3…) | 等待所有任务完成,返回 void 的 CF |
| CompletableFuture.anyOf(cf1, cf2…) | 哪个最快完成就返回那个结果 |
| 等待 n 个任务方式: | |
CompletableFuture.allof(futures.toArray(CompletableFuture[]::new)).join(); 或者 |
|
CompletableFuture.allof(futures.toArray(CompletableFuture[0])).join(); |
为什么要加 join()? 为了 阻塞等待所有任务完成, 如果不加 join 他会异步执行后续代码,未等待所有线程完成。

实战示例
-
练习 supplyAsync (发起) thenApply (加工) thenCombine(合并) exceptionally(异常处理) thenAccept (最终消费) 已有
public static String queryDB(String queryName); public static String unsafeApi()目标:
- 使用
queryDB查询 username 查到结果后加入前缀 “Proccessed_”, 打印 - 使用
queryDB查询 UserStats 和 OrderCounts 完成后组合结果合并成一个字符串 “UserStats + OrderCounts” 并打印 - 调用
unsafeApi()如果发生错误,进行处理并且返回默认值 “Default_Safe_Data”
//task.1 CompletableFuture.supplyAsync(() -> queryDB("username"), executor) //{ return queryDB("username");} //此处加入 executor,后续会自动沿用这个线程,如果不使用 executor 会使用 forkjoinpool .thenApply(res -> { //自己命名接收到的值然后进行处理 return "Proccessed_" + res; }) .thenAccept(name -> { System.out.println(name); }); //task.2 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> queryDB("UserStats"), executor); CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> queryDB("OrderCounts"), executor); cf1.thenCombine(cf2, (cf1, cf2) -> { // thenCombine 会自动等待两个任务完成,不需要加 join() 或 get() 来强制等待 return cf1 + ": " + cf2; }) .thenAccept(combined -> { log("合并报表: " + combined); }); CompletableFuture.supplyAsync(() -> queryDB("UserStats"), executor).thenCombine() //task.3 CompletableFuture.supplyAsync(() -> unsafeApi(), executor) .exceptionally((error) -> { log("error: " + error.getMessage()); return "Default_Safe_Data"; }) .thenAccept(data -> System.out.println(data)); - 使用
⚠️ 注意
- thenApply 必须返回对象,不能是 void(比如只 println 不 return)。
- 异步异常必须处理,否则链会断掉
-
练习 allOf(Future… cfs) (聚合多任务), completeOnTimeout() (超时控制) 已有
public static double fetchPrice(String platform); public static List<Double> batchQuery(List<String> platforms){}目的:
- 在 batchQuery() 中并发查询所有 platform
- 查询时长超过 2s 与错误的 返回 -1
- 等待所有任务完成后,获取数据返回
List<Double>
public static List<Double> batchQuery(List<String> platforms) {
ExecutorService executor = Executors.newFixedThreadPool(platforms.size());
Double defaultValue = -1.0;
List<Double> result = new ArrayList<>();
try {
List<CompletableFuture<Double>> futures = new ArrayList<>(); //创建数组用来查询和接收 N 个平台的数据
for (String platform : platforms) {
futures.add(CompletableFuture.supplyAsync(() -> fetchPrice(platform), executor)
.exceptionally((e) -> {
System.err.println(platform + " " + e.getMessage());
return -1.0; //报错,将异常吞掉,返回 -1 作为默认值
}).completeOnTimeout(defaultValue, 2, TimeUnit.SECONDS)); //(返回值, 时间, 单位)
}
//这里编排 有一些讲究
//❌ ~~如果 completeOnTimeout 在 exceptionally 前,exceptionally 会接收到超时的报错~~
//✅ exceptionally:会吞掉真实的异常,返回正常值。
// completeOnTimeout:不会抛异常,但它吞掉的不是“异常”,而是“超时未完成”的状态,直接转为正常完成 + 默认值。
//⚠️ 着重 必须加 join 存放任务方式是 cf.toArray(CompletableFuture []::new)
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
for (CompletableFuture<Double> future : futures) {
result.add(future.join()); // 这里拿真正的结果
}
return result;
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
⚠️ 注意
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();返回值是 void 需要手动对单个任务获取completeOnTimeout:不会抛异常,但它吞掉的不是“异常”,而是“超时未完成”的状态,直接转为正常完成 + 默认值。