Photo by Silas Baisch on Unsplash

Asynchronous programming in Java with CompletableFuture

Introduction

The CompletableFuture API is a high-level API for asynchronous programming in Java. This API supports pipelining (also known as chaining or combining) of multiple asynchronous computations into a single result without the mess of nested callbacks (“callback hell“). This API also is an implementation of the future/promise concurrency constructs in Java.

  • to cancel a task
  • to wait for a task to complete (if necessary) and then get its result
  • it is impossible to pipeline futures in a non-blocking manner
  • it is impossible to manually complete a future

Futures and promises

Future/promise are the high-level concurrency constructs that decouple a value (a future) from the way it is computed (a promise). That allows writing more fluent concurrent programs that transfer objects between threads without using any explicit synchronization mechanisms. The future/promise constructs are often used when multiple threads work on different tasks, and the results need to be combined by the main thread.

  • Java: java.util.concurrent.Future, java.util.concurrent.CompletableFuture
  • Scala: scala.concurrent.Future
  • C#: Task, TaskCompletionSource

CompletableFuture in practice

The following code example can help you to understand the use of the CompletableFuture class as a future/promise implementation in Java.

  1. to get the EUR/USD exchange rate (a slow task)
  2. to calculate the net product price (a fast task, depends on tasks 1, 2)
  3. to get the tax amount (a slow task, depends on tasks 3)
  4. to calculate the gross product price (a fast task, depends on tasks 3, 4)
logger.info("this task started");int netAmountInUsd = getPriceInEur() * getExchangeRateEurToUsd(); // blocking
float tax = getTax(netAmountInUsd); // blocking
float grossAmountInUsd = netAmountInUsd * (1 + tax);
logger.info("this task finished: {}", grossAmountInUsd);
logger.info("another task started");
logger.info("this task started");Future<Integer> priceInEur = executorService.submit(this::getPriceInEur);
Future<Integer> exchangeRateEurToUsd = executorService.submit(this::getExchangeRateEurToUsd);
while (!priceInEur.isDone() || !exchangeRateEurToUsd.isDone()) { // non-blocking
Thread.sleep(100);
logger.info("another task is running");
}
int netAmountInUsd = priceInEur.get() * exchangeRateEurToUsd.get(); // actually non-blocking
Future<Float> tax = executorService.submit(() -> getTax(netAmountInUsd));
while (!tax.isDone()) { // non-blocking
Thread.sleep(100);
logger.info("another task is running");
}
float grossAmountInUsd = netAmountInUsd * (1 + tax.get()); // actually non-blockinglogger.info("this task finished: {}", grossAmountInUsd);
logger.info("another task is running");
CompletableFuture<Integer> priceInEur = CompletableFuture.supplyAsync(this::getPriceInEur);
CompletableFuture<Integer> exchangeRateEurToUsd = CompletableFuture.supplyAsync(this::getExchangeRateEurToUsd);
CompletableFuture<Integer> netAmountInUsd = priceInEur
.thenCombine(exchangeRateEurToUsd, (price, exchangeRate) -> price * exchangeRate);
logger.info("this task started");netAmountInUsd
.thenCompose(amount -> CompletableFuture.supplyAsync(() -> amount * (1 + getTax(amount))))
.whenComplete((grossAmountInUsd, throwable) -> {
if (throwable == null) {
logger.info("this task finished: {}", grossAmountInUsd);
} else {
logger.warn("this task failed: {}", throwable.getMessage());
}
}); // non-blocking
logger.info("another task started");

The CompletionStage interface

The CompletionStage interface represents a stage in a multi-stage (possibly asynchronous) computation where stages can be forked, chained, and joined.

  1. Stages can be chained in a pipeline. A stage can be started by finishing a single previous stage (or two previous stages) in the pipeline. A stage finishes when its computation is completed. Finishing a stage can start a single next stage in the pipeline.
  2. A stage can be executed synchronously or asynchronously. The appropriate execution type should be selected depending on the parameters of the computation.
  • methods to handle exceptions

Methods to pipeline computations

The CompletionStage interface has 43 public methods, most of which follow three clear naming patterns.

  • if a method name has fragment “either“, then the new stage starts after completion of the first of two previous stages
  • if a method name has fragment “both“, then the new stage starts after completion of both of two previous stages
  • if a method name has fragment “accept“, then the new stage consumes an argument by the given Consumer
  • if a method name has fragment “run“, then the new stage runs an action by the given Runnable
  • if a method has fragment “somethingAsync(…)“, then the new stage is executed by the default asynchronous facility
  • if a method has fragment “somethingAsync(…, Executor)“, then the new stage is executed by the given Executor
CompletableFuture<Double> pi = CompletableFuture.supplyAsync(() -> Math.PI);
CompletableFuture<Integer> radius = CompletableFuture.supplyAsync(() -> 1);
// area of a circle = π * r^2
CompletableFuture<Void> area = radius
.thenApply(r -> r * r)
.thenCombine(pi, (multiplier1, multiplier2) -> multiplier1 * multiplier2)
.thenAccept(a -> logger.info("area: {}", a))
.thenRun(() -> logger.info("operation completed"));
area.join();

Methods to handle exceptions

Each computation may complete normally or exceptionally. In asynchronous computations, the source of the exception and the recovery method can be in different threads. Therefore in this case it is not possible to use the try-catch-finally statements to recover from exceptions. So the CompletionStage interface has special methods to handle exceptions.

CompletableFuture.supplyAsync(() -> 0)
.thenApply(i -> { logger.info("stage 1: {}", i); return 1 / i; }) // executed and failed
.thenApply(i -> { logger.info("stage 2: {}", i); return 1 / i; }) // skipped
.whenComplete((value, t) -> {
if (t == null) {
logger.info("success: {}", value);
} else {
logger.warn("failure: {}", t.getMessage()); // executed
}
})
.thenApply(i -> { logger.info("stage 3: {}", i); return 1 / i; }) // skipped
.handle((value, t) -> {
if (t == null) {
return value + 1;
} else {
return -1; // executed and recovered
}
})
.thenApply(i -> { logger.info("stage 4: {}", i); return 1 / i; }) // executed
.join();

The CompletableFuture class

The CompletableFuture class represents a stage in a multi-stage (possibly asynchronous) computation where stages can be created, checked, completed, and read. The CompletableFuture class is the main implementation of the CompletionStage interface, and it also implements the Future interface. That means the CompletableFuture class can simultaneously represent a stage in a multi-stage computation and the result of such a computation.

  1. A reading thread waits (in a blocking or non-blocking manner) until the future is completed normally or exceptionally.
  2. A completing thread completes the future and unblocks the reading thread.
  • methods to check futures
  • methods to complete futures
  • methods to read futures
  • methods for bulk futures operations
ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<String> future = new CompletableFuture<>(); // creating an incomplete futureexecutorService.submit(() -> {
Thread.sleep(500);
future.complete("value"); // completing the incomplete future
return null;
});
while (!future.isDone()) { // checking the future for completion
Thread.sleep(1000);
}
String result = future.get(); // reading value of the completed future
logger.info("result: {}", result);
executorService.shutdown();

Methods to create futures

In the most general case, a future is created incompleted in one thread and is completed in another thread. However, in some cases (for example, for testing), it may be necessary to create an already completed future.

Methods to check futures

The CompletableFuture class has non-blocking methods for checking whether a future is incomplete, completed normally, completed exceptionally, or canceled.

Methods to complete futures

The CompletableFuture class has methods for completing futures, which means transferring incomplete futures to one of the completed states: normal completion, exceptional completion, and cancellation.

Methods to read futures

The CompletableFuture class has methods for reading futures, waiting if necessary. Note that in most cases, these methods should be used as the final step in a computation pipeline.

Methods for bulk future operations

The CompletionStage interface has methods to wait for all (thenCombine, thenAcceptBoth, runAfterBoth) and any (applyToEither, acceptEitherrun, runAfterEither) of two computations to complete. The CompletableFuture class extends this functionality and has two static methods to wait for all or any of many futures to complete.

Conclusion

The CompletableFuture API is a high-level API that allows you to develop fluent asynchronous code. This API is not simple, but it is worth learning if you want to write efficient asynchronous code.

  • Avoid blocking methods inside a computation pipeline
  • Avoid short (hundreds of milliseconds) asynchronous computations because frequent context switching can introduce significant overhead
  • Be aware of the new exception handling mechanism that works differently than the try-catch-finally statements
  • Manage timeouts not to wait too long (perhaps indefinitely) for a stuck computation

Senior Software Engineer