CompletableFuture是如何提升性能的?

嗨,你好,我是猿java

在日常开发中,为了提高程序的性能,我们经常会使用异步方式来完成,在本文中,我们将学习一种常用的工具类: CompletableFuture,并且学习如何使用它来提高 Java 应用程序的性能,让我们开始学习旅程吧!

在分析 CompletableFuture 之前,我们先来看看 Future

什么是 Future?

Future 是 Java 5 中引入的 Java 接口,用于表示将来可用的值,使用 Future 给 Java 带来了巨大的好处,它可以帮助我们异步执行一些非常密集的计算,而不会阻塞当前线程,因此,我们可以在当前线程中继续做一些工作。

我们可以把 Future 想象成去餐厅吃晚饭,在晚餐准备期间,我们可以干些其他的事情,一旦晚餐准备好,就可以吃饭了。

Future 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package java.util.concurrent;

public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

什么是 CompletableFuture?

CompletableFuture 是 Java 8 引入的一个类,用于处理异步编程。它是 Future 接口的一个增强版,提供了一些有用的方法来管理异步任务。其源码如下:

1
2
3
4
5
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack; // Top of Treiber stack of dependent actions
// 方法太多,此处省略
}

CompletableFuture 和 Future

在本节中,我们将了解 Future 接口以及它的一些局限性,并且分析如何使用 CompletableFuture 类来解决这些问题。

定义超时

Future 接口只提供了 get() 方法来获取计算结果,但如果计算时间过长,我们的线程就会一直堵塞。

为了更好地理解,让我们看一些示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.util.concurrent.*;

public class FutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> threadSleep());
System.out.println("The result is: " + future.get());
}

private static String threadSleep() throws InterruptedException {
TimeUnit.DAYS.sleep(365 * 10);
return "finishSleep";
}
}

上述示例中,我们创建了一个 ExecutorService 实例,并且使用它来提交一个线程睡眠的任务(threadSleep()),然后通过调用 future.get() 方法在控制台上打印结果值,因为 threadSleep() 方法中,我们让线程睡眠了 10年,所以控制台要等待 10年才会打印值,而且 Future 也没有任何方法可以手动完成任务。

那么,CompletableFuture 是如何克服这个问题的呢?

我们还是使用相同的场景,并且在流程中调用了 CompletableFuture.complete() 方法,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.*;

public class CompletableFutureDemo {

public static void main(String[] args) {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> threadSleep());
completableFuture.complete("Completed");
System.out.println("result: " + completableFuture.get());
System.out.println("completableFuture done ? " + completableFuture.isDone());
}

private static String threadSleep(){
try {
TimeUnit.DAYS.sleep(365 * 10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "finishSleep";
}
}

在上述示例中:

  • 首先,通过调用 CompletableFuture.supplyAsync() 方法创建一个 String 类型的 CompletableFuture;
  • 然后,调用 completableFuture.complete()方法;
  • 接着,调用 isDone() 方法,并打印其结果值;
  • 最后,main() 方法的输出如下:
    1
    2
    result: Completed
    completableFuture done ? true

通过运行结果我们可以看到:需要睡眠 10年的 threadSleep() 居然完成了,为什么呢?在整个代码中嫌疑最大的是 completableFuture.complete(),因此我们来看看这个方法到底做了什么?其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* If not already completed, sets the value returned by {@link
* #get()} and related methods to the given value.
*
* @param value the result value
* @return {@code true} if this invocation caused this CompletableFuture
* to transition to a completed state, else {@code false}
*/
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}

通过源码我们可以知道:complete(T value) 方法是用于手动完成一个 CompletableFuture 任务(即使任务尚未执行或未完成)并且返回 value。

因此,CompletableFuture 是通过 complete(T value)方法手动结束 任务,从而客服了 Futrue 无法手动结束任务的限制。

组合异步操作

假设我们需要调用两个 Method:firstMethod() 和 secondMethod(),并且将 firstMethod() 的结果作为 secondMethod() 的输入。

通过使用 Future 接口,我们无法异步组合这两个操作,只能同步完成,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.*;

public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> firstFuture = executor.submit(() -> firstMethod(1));

int firstMethodResult = firstFuture.get(); // 获取 firstMethod的结果值
System.out.println("firstMethodResult:" + firstMethodResult);
Future<Integer> secondFuture = executor.submit(() -> secondMethod(firstMethodResult));
System.out.println("secondMethodResult:" + secondFuture.get());
executor.shutdown();
}

private static int firstMethod(int num) {
return num;
}

private static int secondMethod(int firstMethodResult) {
return 2 + firstMethodResult;
}
}

在上述示例代码中:

  • 首先,通过 ExecutorService 提交一个返回 Future 的任务来调用 firstMethod;
  • 然后,需要将 firstMethod 的结果值传递给第 secondMethod,但检索 firstMethod 结果值的唯一方法是使用 Future.get(),该方法会阻塞主线程;
  • 接着,我们必须等到 firstMethod 返回结果,然后再执行 secondMethod 操作,整个流程就变成同步过程;
  • 最后,main() 的输出如下:
    1
    2
    firstMethodResult:1
    secondMethodResult:3
    通过运行结果可以看出,结果符合预期而且整个过程是串行执行的。

那么,CompletableFuture 是如何在不阻塞主线程的前提下,异步组合两个过程的呢?具体操作如下示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.concurrent.*;

public class CompletableFutureDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {
var finalResult = CompletableFuture.supplyAsync(() -> firstMethod(1))
.thenApply(firstMethodResult -> secondMethod(firstMethodResult));
System.out.println("finalResult:" + finalResult.get());
}
private static int firstMethod(int num) {
return num;
}

private static int secondMethod(int firstMethodResult) {
return 2 + firstMethodResult;
}
}

在上述示例代码中:

  • 首先,通过 CompletableFuture.supplyAsync 方法,返回一个新的 CompletableFuture,该 CompletableFuture 是在 ForkJoinPool.commonPool() 中异步完成的,并且将结果值赋值给 Supplier;
  • 接着,获取 firstMethod() 的结果并使用 thenApply() 方法,将其传递给另一个调用 secondMethod();
  • 最后,main() 的输出如下:
    1
    finalResult:3

通过运行结果可以看出,结果符合预期而且 CompletableFuture 成功的把 firstMethod() 和 secondMethod() 两个异步过程整合。

CompletableFuture如何提升性能?

在本节中,我们将通过设定的场景来验证 CompletableFuture 是如何提升性能的。

场景设定

  • 定义一个 Transaction 类,并且包含 id 属性;
  • 定义一个 TransactionExecutor类,包含 transactionId 属性和静态方法 doTransaction(Transaction transaction),方法接收 Transaction对象;
  • 在 doTransaction() 方法中,通过线程睡眠 1秒来模拟业务耗时;

场景涉及的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TransactionExecutor {
private final String transactionId;

public TransactionExecutor(String transactionId) {
this.transactionId = transactionId;
}

public static TransactionExecutor doTransaction(Transaction transaction) {
Thread.sleep(1000L); // 通过线程睡眠来模拟真实的业务耗时
return new TransactionExecutor("transactionId: " + transaction.getId());
}

@Override
public String toString() {
return "TransactionExecutor{" +
"transactionId='" + transactionId + '\'' +
'}';
}
}

public class Transaction {
private String id;

public Transaction(String id) {
this.id = id;
}
// getter setter方法
}

接着,我们将通过以下 3种方式来执行给定的场景:

  • 同步实现
  • 并行流实现
  • CompletableFuture 实现

同步实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Demo {

public static void main(String[] args) {
long start = System.currentTimeMillis();
var executor = Stream.of(
new Transaction("1"),
new Transaction("2"),
new Transaction("3"))
.map(TransactionExecutor::doTransaction)
.collect(Collectors.toList());
long end = System.currentTimeMillis();

System.out.printf("The operation take %s ms%n", end - start);
System.out.println("TransactionExecutor are: " + executor);
}
}

运行代码结果如下:

1
2
3
4
The operation take 3087 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'}]

从运行结果可以看出:该程序花费了 3 秒多,因为每个事务都是串行执行,并且每个事务消耗 1秒,和预期时间比较吻合。

并行流实现

我们使用 Lambda 的 parallel并行流来实现,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Demo {

public static void main(String[] args) {
long start = System.currentTimeMillis();
var executor = Stream.of(
new Transaction("1"),
new Transaction("2"),
new Transaction("3"))
.parallel()
.map(TransactionExecutor::doTransaction)
.collect(Collectors.toList());
long end = System.currentTimeMillis();

System.out.printf("The operation took %s ms%n", end - start);
System.out.println("TransactionExecutor are: " + executor);
}
}

通过运行结果,我们发现它合同步方法的差异是巨大的!应用程序运行速度几乎快了三倍,如下所示:

1
2
3
4
The operation took 1007 ms
TransactionExecutor are: [TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'}]

CompletableFuture实现

现在将重构我们的客户端应用程序以利用 CompletableFuture:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Demo {

public static void main(String[] args) {
Executor executor = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
var future = Stream.of(
new Transaction("1"),
new Transaction("2"),
new Transaction("3"),
new Transaction("4"),
new Transaction("5"),
new Transaction("6"),
new Transaction("7"),
new Transaction("8"),
new Transaction("9"),
new Transaction("10")
).map(transaction -> CompletableFuture.supplyAsync(
() -> TransactionExecutor::doTransaction(transaction), executor)
).collect(toList());

var categories = future.stream()
.map(CompletableFuture::join)
.collect(toList());
long end = System.currentTimeMillis();

System.out.printf("The operation took %s ms%n", end - start);
System.out.println("TransactionExecutor are: " + categories);
}
}

使用 CompletableFuture,对于 10 个线程,执行的时间也在 1 秒左右,性能简直杆杆的,运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
The operation took 1040 ms
TransactionExecutor are:[TransactionExecutor{transactionId='transactionId: 1'},
TransactionExecutor{transactionId='transactionId: 2'},
TransactionExecutor{transactionId='transactionId: 3'},
TransactionExecutor{transactionId='transactionId: 4'},
TransactionExecutor{transactionId='transactionId: 5'},
TransactionExecutor{transactionId='transactionId: 6'},
TransactionExecutor{transactionId='transactionId: 7'},
TransactionExecutor{transactionId='transactionId: 8'},
TransactionExecutor{transactionId='transactionId: 9'},
TransactionExecutor{transactionId='transactionId: 10'}]

CompletableFuture工作流程

CompletableFuture 的执行流程可以分为以下 3个阶段:

  1. 创建阶段: 创建 CompletableFuture 实例,并提交异步任务。
  2. 执行阶段: 异步任务在线程池中执行,并在完成后触发相关的回调函数。
  3. 完成阶段: 异步任务完成,CompletableFuture 的状态更新,并执行后续操作。

创建阶段

CompletableFuture 的创建可以通过以下几种方法:

  • CompletableFuture.supplyAsync(Supplier<U> supplier): 提交一个返回结果的异步任务;
  • CompletableFuture.runAsync(Runnable runnable): 提交一个不返回结果的异步任务;
1
2
3
4
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行任务
return "Result";
});

执行阶段

异步任务提交后,会在线程池中执行。默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 作为它的线程池。如果需要自定义线程池,可以使用以下方法:

  • CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor);
  • CompletableFuture.runAsync(Runnable runnable, Executor executor);

如下代码:

1
2
3
4
5
Executor executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行任务
return "Result";
}, executor);

完成阶段

任务完成后,CompletableFuture 的状态将从未完成(未完成或未异常)更新为已完成(正常完成或异常完成)。你可以定义一系列操作来处理任务的结果或异常。

正常完成

任务正常完成后,可以使用以下方法进行链式调用:

  • thenApply(Function<T, U> fn): 对任务结果进行转换;
  • thenAccept(Consumer action): 对任务结果进行消费;
  • thenRun(Runnable action): 执行一个额外的任务,不依赖于任务结果;

如下代码:

1
2
3
4
5
future.thenApply(result -> {
return "Processed " + result;
}).thenAccept(processedResult -> {
System.out.println(processedResult);
});

异常完成

如果任务执行过程中抛出了异常,可以使用以下方法处理异常:

  • exceptionally(Function<Throwable, T> fn): 在任务抛出异常时提供一个默认值;
  • handle(BiFunction<T, Throwable, U> fn): 任务完成后处理结果或异常;

如下代码:

1
2
3
4
future.exceptionally(ex -> {
System.out.println("Exception: " + ex.getMessage());
return "Default Result";
});

合并多个CompletableFuture

可以合并多个CompletableFuture,等待它们全部完成或其中任意一个完成:

  • allOf(CompletableFuture<?>… cfs): 等待所有 CompletableFuture 完成;
  • anyOf(CompletableFuture<?>… cfs): 等待任意一个 CompletableFuture 完成。

如下代码:

1
2
3
4
5
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");

CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.thenRun(() -> System.out.println("All tasks completed."));

CompletableFuture 的执行流程分为创建、执行和完成三个阶段。通过创建异步任务、在线程池中执行任务,并在任务完成后进行结果处理,可以实现复杂的异步编程逻辑。

适用场景

CompletableFuture 适合在处理异步编程和并发任务时使用。以下是一些适合使用 CompletableFuture 的典型场景:

异步 I/O 操作

在进行 I/O 操作时,例如文件读取、数据库访问或网络请求,使用 CompletableFuture 可以使这些操作异步执行,从而避免阻塞主线程,提高应用程序的响应性。

并行处理任务

当需要并行处理多个独立任务时,使用 CompletableFuture 可以有效利用多核 CPU,提高计算效率。

流水线处理

当有一系列依赖的操作需要按顺序执行时,CompletableFuture 可以使这些操作异步执行,形成处理流水线,从而提高处理效率。

事件驱动的异步处理

在事件驱动的系统中,例如 GUI 应用或服务器请求处理,CompletableFuture 可以在事件发生时异步处理任务。

异常处理和回退机制

CompletableFuture 提供了灵活的异常处理机制,可以在异步任务发生异常时执行回退操作或提供默认值。

并发任务的组合

CompletableFuture 可以组合多个并发任务的结果,例如使用 allOf 和 anyOf 方法。

总结

在本文中,我们学习了如何在 Java 中使用 Future 接口以及它的局限性,同时,我们还学习了如何使用 CompletableFuture 类来克服 Future 的这些限制。

接着,我们通过一个业务场景演示,通过同步执行,并发执行,CompletableFuture执行来比较它们的执行效率。

最后,CompletableFuture 适合在异步编程、并发任务、非阻塞 I/O、事件驱动处理、异常处理、任务组合等场景中使用。它提供了丰富的 API 来处理异步操作,使代码更加简洁、可读和高效。通过使用 CompletableFuture,开发者可以更好地利用系统资源,提高应用程序的性能和响应性。

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。

drawing