嗨,你好,我是猿java
在日常开发中,为了提高程序的性能,我们经常会使用异步方式来完成,在本文中,我们将学习一种常用的工具类: CompletableFuture
,并且学习如何使用它来提高 Java 应用程序的性能,让我们开始学习旅程吧!
在分析 CompletableFuture
之前,我们先来看看 Future
。
什么是 Future? Future
是 Java 5 中引入的 Java 接口,用于表示将来可用的值,使用 Future
给 Java 带来了巨大的好处,它可以帮助我们异步执行一些非常密集的计算,而不会阻塞当前线程,因此,我们可以在当前线程中继续做一些工作。
我们可以把 Future
想象成去餐厅吃晚饭,在晚餐准备期间,我们可以干些其他的事情,一旦晚餐准备好,就可以吃饭了。
Future
的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package java.util.concurrent;public interface Future <V> { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
什么是 CompletableFuture? CompletableFuture
是 Java 8 引入的一个类,用于处理异步编程。它是 Future 接口的一个增强版,提供了一些有用的方法来管理异步任务。其源码如下:
1 2 3 4 5 public class CompletableFuture <T> implements Future <T>, CompletionStage<T> { volatile Object result; volatile Completion stack; }
CompletableFuture 和 Future 在本节中,我们将了解 Future
接口以及它的一些局限性,并且分析如何使用 CompletableFuture
类来解决这些问题。
定义超时 Future
接口只提供了 get() 方法来获取计算结果,但如果计算时间过长,我们的线程就会一直堵塞。
为了更好地理解,让我们看一些示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import java.util.concurrent.*;public class FutureDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> future = executor.submit(() -> threadSleep()); System.out.println("The result is: " + future.get()); } private static String threadSleep () throws InterruptedException { TimeUnit.DAYS.sleep(365 * 10 ); return "finishSleep" ; } }
上述示例中,我们创建了一个 ExecutorService 实例,并且使用它来提交一个线程睡眠的任务(threadSleep()),然后通过调用 future.get() 方法在控制台上打印结果值,因为 threadSleep() 方法中,我们让线程睡眠了 10年,所以控制台要等待 10年才会打印值,而且 Future
也没有任何方法可以手动完成任务。
那么,CompletableFuture
是如何克服这个问题的呢?
我们还是使用相同的场景,并且在流程中调用了 CompletableFuture.complete() 方法,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import java.util.concurrent.*;public class CompletableFutureDemo { public static void main (String[] args) { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep()); completableFuture.complete("Completed" ); System.out.println("result: " + completableFuture.get()); System.out.println("completableFuture done ? " + completableFuture.isDone()); } private static String threadSleep () { try { TimeUnit.DAYS.sleep(365 * 10 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return "finishSleep" ; } }
在上述示例中:
首先,通过调用 CompletableFuture.supplyAsync() 方法创建一个 String 类型的 CompletableFuture;
然后,调用 completableFuture.complete()方法;
接着,调用 isDone() 方法,并打印其结果值;
最后,main() 方法的输出如下:1 2 result: Completed completableFuture done ? true
通过运行结果我们可以看到:需要睡眠 10年的 threadSleep() 居然完成了,为什么呢?在整个代码中嫌疑最大的是 completableFuture.complete(),因此我们来看看这个方法到底做了什么?其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public boolean complete (T value) { boolean triggered = completeValue(value); postComplete(); return triggered; }
通过源码我们可以知道:complete(T value) 方法是用于手动完成一个 CompletableFuture 任务(即使任务尚未执行或未完成)并且返回 value。
因此,CompletableFuture
是通过 complete(T value)方法手动结束 任务,从而客服了 Futrue
无法手动结束任务的限制。
组合异步操作 假设我们需要调用两个 Method:firstMethod() 和 secondMethod(),并且将 firstMethod() 的结果作为 secondMethod() 的输入。
通过使用 Future
接口,我们无法异步组合这两个操作,只能同步完成,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import java.util.concurrent.*;public class FutureDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Integer> firstFuture = executor.submit(() -> firstMethod(1 )); int firstMethodResult = firstFuture.get(); System.out.println("firstMethodResult:" + firstMethodResult); Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult)); System.out.println("secondMethodResult:" + secondFuture.get()); executor.shutdown(); } private static int firstMethod (int num) { return num; } private static int secondMethod (int firstMethodResult) { return 2 + firstMethodResult; } }
在上述示例代码中:
首先,通过 ExecutorService 提交一个返回 Future
的任务来调用 firstMethod;
然后,需要将 firstMethod 的结果值传递给第 secondMethod,但检索 firstMethod 结果值的唯一方法是使用 Future.get(),该方法会阻塞主线程;
接着,我们必须等到 firstMethod 返回结果,然后再执行 secondMethod 操作,整个流程就变成同步过程;
最后,main() 的输出如下:1 2 firstMethodResult:1 secondMethodResult:3
通过运行结果可以看出,结果符合预期而且整个过程是串行执行的。
那么,CompletableFuture
是如何在不阻塞主线程的前提下,异步组合两个过程的呢?具体操作如下示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import java.util.concurrent.*;public class CompletableFutureDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1 )) .thenApply(firstMethodResult -> secondMethod(firstMethodResult)); System.out.println("finalResult:" + finalResult.get()); } private static int firstMethod (int num) { return num; } private static int secondMethod (int firstMethodResult) { return 2 + firstMethodResult; } }
在上述示例代码中:
首先,通过 CompletableFuture.supplyAsync 方法,返回一个新的 CompletableFuture
,该 CompletableFuture
是在 ForkJoinPool.commonPool() 中异步完成的,并且将结果值赋值给 Supplier;
接着,获取 firstMethod() 的结果并使用 thenApply() 方法,将其传递给另一个调用 secondMethod();
最后,main() 的输出如下:
通过运行结果可以看出,结果符合预期而且 CompletableFuture
成功的把 firstMethod() 和 secondMethod() 两个异步过程整合。
CompletableFuture如何提升性能? 在本节中,我们将通过设定的场景来验证 CompletableFuture
是如何提升性能的。
场景设定
定义一个 Transaction 类,并且包含 id 属性;
定义一个 TransactionExecutor类,包含 transactionId 属性和静态方法 doTransaction(Transaction transaction),方法接收 Transaction对象;
在 doTransaction() 方法中,通过线程睡眠 1秒来模拟业务耗时;
场景涉及的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class TransactionExecutor { private final String transactionId; public TransactionExecutor (String transactionId) { this .transactionId = transactionId; } public static TransactionExecutor doTransaction (Transaction transaction) { Thread.sleep(1000L ); return new TransactionExecutor ("transactionId: " + transaction.getId()); } @Override public String toString () { return "TransactionExecutor{" + "transactionId='" + transactionId + '\'' + '}' ; } } public class Transaction { private String id; public Transaction (String id) { this .id = id; } }
接着,我们将通过以下 3种方式来执行给定的场景:
同步实现
并行流实现
CompletableFuture 实现
同步实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class Demo { public static void main (String[] args) { long start = System.currentTimeMillis(); var executor = Stream.of( new Transaction ("1" ), new Transaction ("2" ), new Transaction ("3" )) .map(TransactionExecutor::doTransaction) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.printf("The operation take %s ms%n" , end - start); System.out.println("TransactionExecutor are: " + executor); } }
运行代码结果如下:
1 2 3 4 The operation take 3087 ms TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1' }, TransactionExecutor{transactionId='transactionId: 2' }, TransactionExecutor{transactionId='transactionId: 3' }]
从运行结果可以看出:该程序花费了 3 秒多,因为每个事务都是串行执行,并且每个事务消耗 1秒,和预期时间比较吻合。
并行流实现 我们使用 Lambda 的 parallel并行流来实现,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Demo { public static void main (String[] args) { long start = System.currentTimeMillis(); var executor = Stream.of( new Transaction ("1" ), new Transaction ("2" ), new Transaction ("3" )) .parallel() .map(TransactionExecutor::doTransaction) .collect(Collectors.toList()); long end = System.currentTimeMillis(); System.out.printf("The operation took %s ms%n" , end - start); System.out.println("TransactionExecutor are: " + executor); } }
通过运行结果,我们发现它合同步方法的差异是巨大的!应用程序运行速度几乎快了三倍,如下所示:
1 2 3 4 The operation took 1007 ms TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1' }, TransactionExecutor{transactionId='transactionId: 2' }, TransactionExecutor{transactionId='transactionId: 3' }]
CompletableFuture实现 现在将重构我们的客户端应用程序以利用 CompletableFuture:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Demo { public static void main (String[] args) { Executor executor = Executors.newFixedThreadPool(10 ); long start = System.currentTimeMillis(); var future = Stream.of( new Transaction ("1" ), new Transaction ("2" ), new Transaction ("3" ), new Transaction ("4" ), new Transaction ("5" ), new Transaction ("6" ), new Transaction ("7" ), new Transaction ("8" ), new Transaction ("9" ), new Transaction ("10" ) ).map(transaction -> CompletableFuture.supplyAsync( () -> TransactionExecutor::doTransaction(transaction), executor) ).collect(toList()); var categories = future.stream() .map(CompletableFuture::join) .collect(toList()); long end = System.currentTimeMillis(); System.out.printf("The operation took %s ms%n" , end - start); System.out.println("TransactionExecutor are: " + categories); } }
使用 CompletableFuture
,对于 10 个线程,执行的时间也在 1 秒左右,性能简直杆杆的,运行结果如下:
1 2 3 4 5 6 7 8 9 10 11 The operation took 1040 ms TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1' }, TransactionExecutor{transactionId='transactionId: 2' }, TransactionExecutor{transactionId='transactionId: 3' }, TransactionExecutor{transactionId='transactionId: 4' }, TransactionExecutor{transactionId='transactionId: 5' }, TransactionExecutor{transactionId='transactionId: 6' }, TransactionExecutor{transactionId='transactionId: 7' }, TransactionExecutor{transactionId='transactionId: 8' }, TransactionExecutor{transactionId='transactionId: 9' }, TransactionExecutor{transactionId='transactionId: 10' }]
CompletableFuture工作流程 CompletableFuture
的执行流程可以分为以下 3个阶段:
创建阶段: 创建 CompletableFuture 实例,并提交异步任务。
执行阶段: 异步任务在线程池中执行,并在完成后触发相关的回调函数。
完成阶段: 异步任务完成,CompletableFuture 的状态更新,并执行后续操作。
创建阶段 CompletableFuture
的创建可以通过以下几种方法:
CompletableFuture.supplyAsync(Supplier<U> supplier): 提交一个返回结果的异步任务;
CompletableFuture.runAsync(Runnable runnable): 提交一个不返回结果的异步任务;
1 2 3 4 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Result" ; });
执行阶段 异步任务提交后,会在线程池中执行。默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 作为它的线程池。如果需要自定义线程池,可以使用以下方法:
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor);
CompletableFuture.runAsync(Runnable runnable, Executor executor);
如下代码:
1 2 3 4 5 Executor executor = Executors.newFixedThreadPool(3 );CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Result" ; }, executor);
完成阶段 任务完成后,CompletableFuture 的状态将从未完成(未完成或未异常)更新为已完成(正常完成或异常完成)。你可以定义一系列操作来处理任务的结果或异常。
正常完成 任务正常完成后,可以使用以下方法进行链式调用:
thenApply(Function<T, U> fn): 对任务结果进行转换;
thenAccept(Consumer action): 对任务结果进行消费;
thenRun(Runnable action): 执行一个额外的任务,不依赖于任务结果;
如下代码:
1 2 3 4 5 future.thenApply(result -> { return "Processed " + result; }).thenAccept(processedResult -> { System.out.println(processedResult); });
异常完成 如果任务执行过程中抛出了异常,可以使用以下方法处理异常:
exceptionally(Function<Throwable, T> fn): 在任务抛出异常时提供一个默认值;
handle(BiFunction<T, Throwable, U> fn): 任务完成后处理结果或异常;
如下代码:
1 2 3 4 future.exceptionally(ex -> { System.out.println("Exception: " + ex.getMessage()); return "Default Result" ; });
合并多个CompletableFuture 可以合并多个CompletableFuture
,等待它们全部完成或其中任意一个完成:
allOf(CompletableFuture<?>… cfs): 等待所有 CompletableFuture 完成;
anyOf(CompletableFuture<?>… cfs): 等待任意一个 CompletableFuture 完成。
如下代码:
1 2 3 4 5 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1" ); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2" ); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2); combinedFuture.thenRun(() -> System.out.println("All tasks completed." ));
CompletableFuture
的执行流程分为创建、执行和完成三个阶段。通过创建异步任务、在线程池中执行任务,并在任务完成后进行结果处理,可以实现复杂的异步编程逻辑。
适用场景 CompletableFuture
适合在处理异步编程和并发任务时使用。以下是一些适合使用 CompletableFuture 的典型场景:
异步 I/O 操作 在进行 I/O 操作时,例如文件读取、数据库访问或网络请求,使用 CompletableFuture 可以使这些操作异步执行,从而避免阻塞主线程,提高应用程序的响应性。
并行处理任务 当需要并行处理多个独立任务时,使用 CompletableFuture 可以有效利用多核 CPU,提高计算效率。
流水线处理 当有一系列依赖的操作需要按顺序执行时,CompletableFuture 可以使这些操作异步执行,形成处理流水线,从而提高处理效率。
事件驱动的异步处理 在事件驱动的系统中,例如 GUI 应用或服务器请求处理,CompletableFuture 可以在事件发生时异步处理任务。
异常处理和回退机制 CompletableFuture 提供了灵活的异常处理机制,可以在异步任务发生异常时执行回退操作或提供默认值。
并发任务的组合 CompletableFuture 可以组合多个并发任务的结果,例如使用 allOf 和 anyOf 方法。
总结 在本文中,我们学习了如何在 Java 中使用 Future
接口以及它的局限性,同时,我们还学习了如何使用 CompletableFuture
类来克服 Future
的这些限制。
接着,我们通过一个业务场景演示,通过同步执行,并发执行,CompletableFuture
执行来比较它们的执行效率。
最后,CompletableFuture
适合在异步编程、并发任务、非阻塞 I/O、事件驱动处理、异常处理、任务组合等场景中使用。它提供了丰富的 API 来处理异步操作,使代码更加简洁、可读和高效。通过使用 CompletableFuture,开发者可以更好地利用系统资源,提高应用程序的性能和响应性。
学习交流 如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。