从 Future 到 CompletableFuture

#Concurrency #JAVA #Future #ThreadPool #CompletableFuture

使用 Future 完成多线程任务 (ThreadPoolExecutor)

快速上手

  1. 创建 线程池

    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory());
    
  2. 完成多线程任务要执行的逻辑

    // 创建一个新的类 实现 Callable 接口中的 call() 方法
    public class Task implements Callable<Long> {
    	@Override
    	public Long call() throws Exception {
    		System.out.println("the Thread name is " + Thread.currentThread().getName());
    		return System.currentTimeMillis();
    		}
    }
    
  3. 创建 Future 实例提交任务

    Future<Long> future = executor.submit(new Task());
    

    ⚠️ 注意 submit() 只是提交任务到线程池,并非立即执行任务,执行时机由线程池自行决定。

  4. 获取返回值

    Long res = future.get(); // 阻塞式无限时返回
    Long res = future.get(1, TimeUnit.SECONDS); // 阻塞式限时返回
    

代码实例:

// Task.java
	public class Task implements Callable<Long> {
		@Override
		public Long call() throws Exception {
			System.out.println("the Thread name is " + Thread.currentThread().getName());
			return System.currentTimeMillis();
			}
	}

// main
public static void main(String[] args) {
	BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
	ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, queue,                 Executors.defaultThreadFactory());
	//提交任务
	Future<Long> future = executor.submit(new Task());
	try {

		try {
			//阻塞式限时 1s 获取返回值
			System.out.println("the return value is " + future.get(1, TimeUnit.SECONDS));	
		} catch(TimeoutException e) {
		//超时取消任务
			future.cancel(true);
		} catch (ExecutionException e) {
			System.err.println("Task failed: " + e.getCause());
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			System.err.println("Main thread interrupted");
		}

	//关闭线程池	
	} finally {
		executor.shutdown(); // 禁止新任务
		try {
			// 等待已提交任务完成
			if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
			executor.shutdownNow(); // 取消正在执行的任务
			}
		} catch (InterruptedException e) {
			executor.shutdownNow(); // 被中断时强制关闭
			
			// 为什么要恢复中断状态?
		    // 因为 catch 住异常后,中断标志被清除了
		    // 如果上层代码也想知道 "发生过中断",必须重新设置
			Thread.currentThread().interrupt(); // 恢复中断状态
		}
	}
	
	
	}

几个注意点

  1. cancel(boolean mayInterruptIfRunning) 向任务发送 Thread.interrupt() 只有任务能够响应中断才有效

    • 当 bool 为 false 时不中断当前正在执行的任务,只暂停未开始执行的任务
    • 当 bool 为 true 时会中断当前正在执行的任务 通用写法:
    public Long call() throws Exception {
    	while (!Thread.currentThread().isInterrupted()) { //
    		try {
    		//注意:这不是修改上面的 Task,而是展示一个通用写法。
    		//  Task 瞬间执行完毕而且 return 不会循环检查,没有响应中断。
    		// 在该例中不会被中断,因为没有可中断阻塞点和循环检查
    			System.out.println("the Thread name is " + Thread.currentThread().getName());
    			return System.currentTimeMillis();
    
    		} catch(InterruptedException e) {
    			Thread.currentThread().interrupt();
    			System.err.println("thread interrupted");
    		}
    	}
    }
    

可中断阻塞点是什么? 调用 thread.interrupt() 会: 将该线程的中断状态(interrupt status)设置为 true。 如果线程正处于 sleep()、wait()、join() 等阻塞状态,会立即抛出 InterruptedException,并清除中断状态(变回 false)。

  1. 如何优雅关闭线程池

    明晰 executor.shutdownNow()executor.shutdown()

方法 shutdown() shutdownNow()
线程池状态变化 不允许新任务提交,但已提交的任务继续执行 不允许新任务提交,且试图停止正在执行的任务
新任务提交 拒绝(抛出 RejectedExecutionException) 拒绝(抛出 RejectedExecutionException)
正在执行的任务 允许正常执行完成 尝试中断(调用 thread.interrupt())
等待队列中的任务 会继续执行(队列任务会逐个取出执行) 不执行,直接从队列中移除并返回
中断行为 只中断 空闲 的线程(interruptIdleWorkers()) 中断 所有 工作线程(interruptWorkers())
返回值 void(无返回值) List :返回队列中尚未开始执行的任务列表
适用场景 希望程序正常结束,不丢失任何已提交的任务 需要尽快停止,愿意牺牲队列中的任务
是否会等待任务结束 是(线程池会等到所有任务执行完才彻底终止) 否(立即尝试停止)
四步走关闭线程池:
1. 先优雅关闭 `shutdown()` 放在最外层,确保执行
2. 等待一定时间 `if(!executor.awaitTermination(5, TimeUnit.SECONDS))` 
3. 时间到后强行关闭 `shutdownNow()`
4. 包裹 2-3 catch 中断异常,防止意外退出未关闭线程池

实战示例

多线程并发执行任务 思路:

  1. 创建数组或 list

  2. 将任务通过 for 循环提交到线程池中

  3. 查看结果

框架:

//1.创建线程池
//2.创建待执行的任务队列 
List<Future<Long>> futures = new ArrayList<>();

try {
	//将任务提交到线程池
	for (int i = 0; i < n; i++) {
		futures.add(executor.submit(new Task()));	
	}
	for (Future<Long> future : futures) {
		//查看结果
	}
} finally {
	//关闭线程池
}
操作 关键代码 备注
定义任务 implements Callable 有返回值,能抛异常
提交任务 Future f = exec.submit(task) 异步,不阻塞
获取结果 f.get(1, TimeUnit.SECONDS) 阻塞,建议设超时
取消任务 f.cancel(true) 需要任务代码配合响应中断
大忌 在 submit 循环中直接 get 会变成串行

使用现代的 CompletableFuture 完成多线程任务

快速上手

  1. 创建线程池
  2. 创建逻辑(与 Future 不同,要实现 Supplier 接口的 get() 方法)
  3. 使用 CompletableFuture 提交任务
  4. 添加你要的操作 代码实例:
try (ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory())) {
	CompletableFuture<Long> cf = CompletableFuture.supplyAsync(new CFTask(), executor); 
	Long long1 = cf.join();
	System.out.println(long1);
} catch (Exception e) {
	// close thread pool
}
  • supplyAsync() 异步非阻塞提交任务
方法 阻塞? 异常处理
get() ✅ 阻塞 抛 checked exception(必须 try-catch)
join() ✅ 阻塞 抛 unchecked CompletionException(可不 catch)

常见的几种方法

创建异步任务

方法 作用 例子
supplyAsync(Supplier, Executor) 指定线程池执行,返回有结果的异步线程 cf = CompletableFuture.supplyAsync(() -> dbQuery(), myExecutor);
runAsync(Runnable, Executor) 无返回值异步任务,指定线程池 CompletableFuture.runAsync(() -> sendEmail());
  1. supplyAsync 异步执行要有返回值,被执行的任务要去实现 Supplier 接口
  2. runAsync 异步执行但无返回值,要实现 Runnable 接口

转换结果(链式处理的核心)

方法 作用 例子
thenApply(fn) 获取上一方法的返回值并执行逻辑,再次返回 .thenApply(user -> user.getName())
thenApplyAsync(fn) 异步转换(新线程) 常用
thenCompose(fn) 上一步返回另一个 CF → 扁平化(避免嵌套) .thenCompose(id -> getOrderCF(id))
thenComposeAsync(fn) 异步扁平化 常用

thenApply vs thenCompose

  • thenApply:函数返回普通值
  • thenCompose:函数返回 CompletableFuture(必须用这个才能链下去不嵌套)

消费结果(只关心结果,不返回新值)

方法 作用 例子
thenAccept(result -> …) 拿到结果处理(比如打印、存库) .thenAccept(pname -> System.out.println(pname))
thenAcceptAsync(…) 异步消费
thenRun(() -> …) 不关心上一步结果,只做后续动作 .thenRun(() -> {System.out.println(“flushing”);}

异常处理

方法 作用
exceptionally(ex -> defaultValue) 出异常时返回备用值(继续链)
handle((result, ex) -> …) 无论正常还是异常都能处理,最灵活
whenComplete((result, ex) -> …) 只观察,不改变结果(常用于日志)
推荐优先用 handle 或 exceptionally。

两个任务组合

方法 作用
thenCombine(otherCF, (r1, r2) -> newResult) 两个任务都完成 → 合并结果
thenCombineAsync(…) 异步合并
thenAcceptBoth(otherCF, (r1, r2) -> …) 两个完成 → 消费,不返回
runAfterBoth(otherCF, () -> …) 两个完成 → 执行动作

多个任务全部完成

方法 作用
CompletableFuture.allOf(cf1, cf2, cf3…) 等待所有任务完成,返回 void 的 CF
CompletableFuture.anyOf(cf1, cf2…) 哪个最快完成就返回那个结果
等待 n 个任务方式:
CompletableFuture.allof(futures.toArray(CompletableFuture[]::new)).join(); 或者
CompletableFuture.allof(futures.toArray(CompletableFuture[0])).join();

为什么要加 join()? 为了 阻塞等待所有任务完成, 如果不加 join 他会异步执行后续代码,未等待所有线程完成。

whyNeedJoin.png


实战示例

  1. 练习 supplyAsync (发起) thenApply (加工) thenCombine(合并) exceptionally(异常处理) thenAccept (最终消费) 已有

        public static String queryDB(String queryName);
        public static String unsafeApi() 
    

    目标:

    • 使用 queryDB 查询 username 查到结果后加入前缀 “Proccessed_”, 打印
    • 使用 queryDB 查询 UserStats 和 OrderCounts 完成后组合结果合并成一个字符串 “UserStats + OrderCounts” 并打印
    • 调用 unsafeApi() 如果发生错误,进行处理并且返回默认值 “Default_Safe_Data”
    //task.1
    CompletableFuture.supplyAsync(() -> 
    					queryDB("username"), executor) //{ return queryDB("username");} 
    					//此处加入 executor,后续会自动沿用这个线程,如果不使用 executor 会使用 forkjoinpool
    				.thenApply(res -> { 
    				//自己命名接收到的值然后进行处理
    					return "Proccessed_" + res;
    				})
    				.thenAccept(name -> {
    					System.out.println(name);
    				});
    
    
    //task.2
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> queryDB("UserStats"), executor);
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> queryDB("OrderCounts"), executor);
    cf1.thenCombine(cf2, (cf1, cf2) -> { // thenCombine 会自动等待两个任务完成,不需要加 join() 或 get() 来强制等待
    	return cf1 + ": " + cf2; 
    })
    .thenAccept(combined -> {
    log("合并报表: " + combined); 
    });
    CompletableFuture.supplyAsync(() -> queryDB("UserStats"), executor).thenCombine()
    
    //task.3
    CompletableFuture.supplyAsync(() -> unsafeApi(), executor)
    		.exceptionally((error) -> {
                log("error: " + error.getMessage());
                return "Default_Safe_Data";
            })
            .thenAccept(data -> System.out.println(data));
    

⚠️ 注意

  • thenApply 必须返回对象,不能是 void(比如只 println 不 return)。
  • 异步异常必须处理,否则链会断掉

  1. 练习 allOf(Future… cfs) (聚合多任务), completeOnTimeout() (超时控制) 已有

    public static double fetchPrice(String platform);
    public static List<Double> batchQuery(List<String> platforms){}
    

    目的:

    • 在 batchQuery() 中并发查询所有 platform
    • 查询时长超过 2s 与错误的 返回 -1
    • 等待所有任务完成后,获取数据返回 List<Double>
public static List<Double> batchQuery(List<String> platforms) {

        ExecutorService executor = Executors.newFixedThreadPool(platforms.size());
        Double defaultValue = -1.0;
        List<Double> result = new ArrayList<>();
        try {
            List<CompletableFuture<Double>> futures = new ArrayList<>(); //创建数组用来查询和接收 N 个平台的数据

            for (String platform : platforms) { 
                futures.add(CompletableFuture.supplyAsync(() -> fetchPrice(platform), executor)
                        .exceptionally((e) -> {
                            System.err.println(platform + "  " + e.getMessage());
                            return -1.0; //报错,将异常吞掉,返回 -1 作为默认值
                        }).completeOnTimeout(defaultValue, 2, TimeUnit.SECONDS)); //(返回值, 时间, 单位)
            }
            //这里编排 有一些讲究
            //❌ ~~如果 completeOnTimeout 在 exceptionally 前,exceptionally 会接收到超时的报错~~ 
			//✅ exceptionally:会吞掉真实的异常,返回正常值。 
			// completeOnTimeout:不会抛异常,但它吞掉的不是“异常”,而是“超时未完成”的状态,直接转为正常完成 + 默认值。
			
			
			//⚠️ 着重 必须加 join 存放任务方式是 cf.toArray(CompletableFuture []::new)
            CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
            for (CompletableFuture<Double> future : futures) {
                result.add(future.join()); // 这里拿真正的结果
            }


            return result;

        } finally {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }

        }
    }

⚠️ 注意 CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); 返回值是 void 需要手动对单个任务获取 completeOnTimeout:不会抛异常,但它吞掉的不是“异常”,而是“超时未完成”的状态,直接转为正常完成 + 默认值。


Reference

ExecutorService (Java SE 17 & JDK 17)

Guide To CompletableFuture | Baeldung