Overview
This is an in-depth practical guide on how to use Java’s CompletableFuture
.
It is comprehensive and will serve you as a reference for when you have to work with them.
It provides unique insights into how they operate, which you won’t easily find in other blogs and books. This will help you develop a mental model for them.
You will learn:
- What they are and intuition for how they work internally.
- How to create them.
- How to chain/pipe/compose/transform them.
- How to cancel them
- How they handle exceptions.
Knowing how to use the API is valuable. Developing a good intuition to how they work is even better. We will learn both on this page. However, it comes short of mastery. For that, you need to know how they are implemented from the fundamentals of concurrency. If you are interested on exploring this topic:
Future vs. CompletableFuture
The key difference is chaining!
Future appeared in Java 5 (2004) and its an interface. Its a placeholder for a value which is not yet available. Once it is available – perhaps because the computation that computes it terminated – the future will contain said value. The API is: You can check whether the value is already present, and you can block the calling thread until the value arrives. This is very limiting and not in line with the concept of futures in other languages.
CompletableFuture appeared in Java 8 (2014). It implements the Future interface, but adds much more functionality. The critical feature is being able to register further tasks/actions/computations onto the future, which specify what needs to happen once the value is available BUT without the need to block for the value itself. Once the original future completes, those computations will run on a specified thread-pool. It is as if you specified: “Once the value is available, please do this, and then do that”.
Read more about the difference at Java’s CompletableFuture vs. Future.
Despite all their disadvantages, there are still many external libraries with APIs that return a “plain” Future. In such cases, you can transform it to a CompletableFuture as detailed in Transform a Future into CompletableFuture. But beware that are some caveats to it.
Terminology
This page is all about java’s CompletableFuture.
Sometimes I use the shorter term Future. Depending on the context, that refers to either a CompletableFuture, or to the conceptual idea of a future in programming languages. It never means the java.util.concurrent.Future
class though.
State and life-cycle
Futures are a placeholder for a value that is not yet known, and/or for an action that has not yet completed. This is modelled via 3 states. They start Incomplete (also referred to as empty) when the final value is not yet available and/or the action has not yet completed, and then transition to Complete. They can be completed successfully with a value (that can also be null), or completed exceptionally with a Throwable. Once complete though, their state is immutable! Further attempts to change it, either to a different state, or to swap the value inside Completed, do not take effect.
Lifecycle
- An uncompleted future is created.
- You share that future (i.e., its reference) with many parts of your program. Namely across many threads.
- This future is chained in various ways, creating new futures based off this one.
- Each new “stage” does or computes something meaningful.
- Some thread completes the future with a value or with an exception.
- The “child” futures from step 3 are triggered.
The life-cycle steps listed help to develop an understanding of how completable futures operate internally. However, these steps might seem unusual to those who are already familiar with using completable futures. This is because typical usage mainly involves chaining (step 3). In day-to-day scenarios, we developers usually receive a CompletableFuture from external code and then chain on it. It is less common to explicitly create an empty future (step 1) and then complete it explicitly (step 4).
Note
The concept of futures exists in other languages.
JavaScript calls it a promise, and the states are referred to as pending, fulfilled, and rejected. Scala also has them, but separated the responsibility of reading the “deferred” value onto an object called Future
, and the responsibility of writing the value onto an object called Promise
. In contrast Java’s CompletableFuture
embodies both reading and writing.
Intuition for chaining
What makes completable futures useful is being able to compose them easily.
With one future, you can register actions/computations to be run after it completes. Critically, this will not block the calling thread, and can be done before the future completes. Furthermore, every time you register a new action, you obtain a brand new future on which further actions can be registered. Conceptually, this leads to a graph or pipeline of computations. For the code snippet:
Future chaining
// Where it all starts.
// Imagine futA is obtained by using java's java.net.http.HttpClient.
CompletableFuture<java.net.http.HttpResponse<String>> futA;
// Registers action that logs the headers of the HTTP response
// but return same HTTPResponse value back.
CompletableFuture<java.net.http.HttpResponse<String>> futC =
futA.whenCompleteAsync(
(httpRes, error) -> {
if (error != null) error.printStackTrace();
else System.out.printf("Response received. Headers are: %s", httpRes.headers());
});
// Registers function that extracts the body of the HTTP request,
// once the HTTP request is available.
CompletableFuture<String> futB =
futA.thenApplyAsync(httpRequest -> httpRequest.body());
// Calculating the size of the body
CompletableFuture<Integer> futG =
futB.thenApplyAsync(requestBody -> requestBody.length());
// Parsing the raw string into JSON representation
// Method `parseJson` is not shown. Type `Json` is also not shown for brevity
CompletableFuture<Json> futE =
futB.thenApplyAsync(requestBody -> parseJson(requestBody));
// Registers function that extracts field `userName` from the JSON object,
// once the JSON object itself is available
CompletableFuture<String> futF =
futE.thenApplyAsync(json -> json.getAsString("userName"));
We can imagine the following computation graph:
Every call to a transformation method (e.g. thenApply, whenComplete, …) on an existing future which has not yet complete, adds the corresponding action/computation in an internal data-structure (which is a stack). Then, when that parent future completes, it pops those callbacks from the stack and runs them, which in turn will complete the new dependent futures. This internal data-structure also stores the executor on which the action/computation should run in. All this is done in a thread-safe manner.
For an advanced and insightful hands-on example of chaining, visit CompletableFuture Example: Crawler.
Creation
Below, we go over the 3 main ways to create a completable future.
This phrasing is misleading. Conceptually, there is only one way to create one, which is the standard approach. The other mechanisms are ultimately syntactic sugar: they make things easier, but are fundamentally doing the same thing under the hood.
The ways to create a completable future are:
- Already completed
- runAsync/supplyAsync
- Transformations
- Every-time we chain a future with an action, we create a new future.
- Not discussed on this chapter. Discussed in length in chapter Transformations.
- Standard approach
What separates all the other mechanisms from the standard approach is that the former actually do 2 things: They create a completable future AND specify how the future is going to be completed. For example, on the supplyAsync
, the future returned is completed when the lambda computation provided finishes.
On the standard approach however, there is no direct computation associated with it. It remains empty/incomplete until when (and if) it is completed explicitly.
The standard approach is the mechanism that http libraries use to complete the futures that they give back to user.
Already completed
You can create completable futures which are already completed. You can complete them successfully with a value, or unsuccessfully with an exception. Below, when the two calls return, the future is already completed:
import java.util.concurrent.CompletableFuture;
CompletableFuture<String> failedFuture =
CompletableFuture.failedFuture(new IllegalAccessException("kaBoOm!"));
CompletableFuture<String> successFuture =
CompletableFuture.completedFuture("Concurrency Deep Dives");
It might seem pointless to do this. If the value/error is already known, why bother wrapping it inside a future?
It boils down to being able to provide a consistent return type. For example, lets say you are implementing method fetchData
of an interface, as below. You will rely on a database connection that returns CompletableFuture
to do the actually heavy lifting. However, before, calling the database you want to validate the input. If validation fails, then you need to lift the exception into a future to respect the signature of the method.
public CompletableFuture<String> fetchData(String user) {
if (user == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("User cannot be null"));
}
// dbClient class/object is not shown.
// Simulate database access that returns a future
return dBClient.getUser(user);
}
Another example is unit tests and mocking. There would be no reason to start an asynchronous computation on a separate thread just to complete a future with a value to be used on a test. Its easier and cleaner to just:
// pseudo-code
val mockService: Service = mock[Service]
when(mockService.getData()).thenReturn(CompletableFuture.completedFuture("Dummy Data"))
runAsync / supplyAsync
These two methods are the most widely used by devs to create a future of their own.
Use these static methods if you have a computation that takes a “long time” to run, and you don’t want to block the calling thread. You “make it async” by deferring it to run on a specified (or default) thread pool.
import java.util.concurrent.Executor;
// how it is created not relevant
Executor myExecutor;
// default executor `ForkJoinPool.common` is used
CompletableFuture<String> cF1 =
CompletableFuture.supplyAsync(() -> "Very long computation");
// Same as cF1, but the action/computation occurs on our custom executor
CompletableFuture<String> cF2 =
CompletableFuture.supplyAsync(() -> "Very long computation", myExecutor);
// default executor `ForkJoinPool.common` is used
// Notice the return type is `Void`
CompletableFuture<Void> cF3 =
CompletableFuture.runAsync(
() -> {
System.out.println("Running");
// do something useful. Long computation
});
// continue doing useful things ....
Both methods are overloaded. When you don’t specify an executor, then Java’s ForkJoinPool
‘s default executor is used.
There are 2 key points.
Firstly, in both cases, the computation/task/action provided runs on a separate thread of the executor. It doesn’t block the calling thread. The 3 assignments above return immediately and the calling thread can continue doing useful work.
Secondly, the future returned will initially be incomplete. However, and this is the key part, when the computation submitted finishes, there is a mechanism that will – behind the covers – complete the future from within the thread of the executor.
supplyAsync
Use supplyAsync(Supplier<T> supplier)
when your task returns an actual value, like a String, Integer, or instance of a Class. The created CompletableFuture will be completed with the value obtained from calling the supplier
.
If the supplier throws a runtime exception, then the future will be completed exceptionally with that exception “wrapped” within a java.util.concurrent.CompletionException
.
runAsync
Use runAsync(Runnable runnable)
when there are only side-effects, and there is nothing meaningful for your task to return. The created CompletableFuture will be completed once the task finishes. If the task finishes succesfully, the value returned will be null
. If the task fails, the future will contain the exception wrapped with the CompletionException
as before.
For a more detail discussion about runAsync, supplyAsync, and their differences, read CompletableFuture runAsync & supplyAsync.
Standard approach
Using supplyAsync
or runAsync
implies that you have a self-contained task that can be represented as a Supplier<T>
or Runnable
.
Furthermore, using those methods masks how futures work under the hood. With them, the mechanism for the completion of the future is specified during its creation as being the result of a task/computation. You might therefore get the idea all futures are the 1) result of a computation, which is 2) known and specified by the party that creates it. But that is not the case.
The most generic way to create futures is:
- Create an empty future.
- Pass the future reference to other parts of the codebase, namely to other threads.
- Some other thread – in another part of the codebase – completes the future.
In fact, the completion of the futures obtained through supplyAsync
and runAsync
are only special cases of the 3 steps above, whereby the future is completed from within the thread of the executor passed in.
To create an empty future, the generic way is:
CompletableFuture<String> cF1 = new CompletableFuture<>();
We are just creating it. We are not completing it yet. If we were to block on it, it would block forever.
Naturally, such a future – unlike the ones created via supplyAsync
and runAsync
– is not of much use. So now we need to study future completion.
Completion
One core idea behind a future is that its state is one of:
- Incomplete
- Completed sucessfully with a value
- Completed unsuccessfully with an exception
Where once completed, the state cannot change – either back to incomplete, or to the alternative completed version.
There are two methods to “externally” complete a future. By externally, we mean by the client code. Internally, the implementation of the completable future can complete the future in many ways. For example, after the supplyAsync
lambda completes.
These 2 methods are:
CompletableFuture<String> cF1 = new CompletableFuture<>();
boolean wasSet = cF1.complete("Concurrency Deep Dives");
// or
boolean wasSet = cF1.completeExceptionally(new RuntimeException("kaboom"));
If the future was incomplete the methods return true
, and if it was already complete, they return false
. In the latter case, the operation doesn’t succeed, as you can’t complete a future twice (again, its immutable after completion)
Naturally, for the previous static methods such as runAsync
and supplyAsync
you don’t do this. The internal implementation conveniently changes the state of the future once the task completes.
However, out of curiosity, lets see what happens when we do:
Completing a future backed by a task
CompletableFuture<String> cF4 =
CompletableFuture.supplyAsync(
() -> {
try {
// We are sleeping the thread (on the executor) for 10 seconds.
Thread.sleep(10_000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// You would expect that future to eventually be completed with this value ...
return "Concurrency Deep Dives";
});
// However before the task runs, we complete the future "from the outside"
boolean wasCompleted = cF4.complete("Hello World");
// lets block this main thread for the result
String result = cf4.join();
System.out.printf("Completed from outside: %s. Result: %s", wasCompleted, result);
The snipped above would output:
Completed from outside: true. Result: Hello World
This would work similarly if we complete the future exceptionally, or if you use runAsync
instead.
But why would you need these complete
and completeExceptionally
? Why isn’t runAsync
and supplyAsync
sufficient?
We go back to the earlier arguments. With futures, the code creating them doesn’t always know how or when to complete them. So, again, its important to realize that those methods imply that the “client code” creating the future also has available a Runnable
or Supplier
instance.
But that is often not the case. In fact, that is the exception. Lets explore with an example.
Completing from another thread
The other core idea of Futures, is that they are a mechanism to communicate a single value from one thread to another.
Important
Futures are ultimately about thread communication!
Its a mechanism for one thread to send one single value to another thread.
Futures are rarely phrased or explained in this manner, but I find it is the best mental model for people exploring its inner workings.
Lets say you want a mechanism to determine when a user submits input data into the interactive console starting with a given prefix. For example, if you are interested in the prefix password=
, you want a mechanism to be notified when the user first writes password=<something>
. Additionally you want to determine what something
is.
You don’t know when or if, the user will write something that matches the prefix. Naturally you don’t want to block the calling thread. One way is to solve it with a future, where you provide the prefix
you are interested in, and the abstraction returns a future. The idea is the future to complete once the user writes a matching line.
But how to develop such a feature?
You could not construct this functionality with supplyAsync
, because there isn’t a way to correctly express the problem in a self contained task. Instead we need to separate the creation of the future from its completion. The code creating the future has no control over when and how it is completed. Consider:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
public class StdInputMonitor {
// we need to protect this shared map from concurrent access.
// For each prefix we need to track which future needs to be completed.
private final ConcurrentHashMap<String, CompletableFuture<String>> theMap =
new ConcurrentHashMap<>();
public StdInputMonitor() {
Thread threadHandle = new Thread(this::run, "StdInputMonitor");
threadHandle.start(); // from now on, the app is concurrent!!
}
private void run() {
while (true) { // loops forever
String line = System.console().readLine(); // blocks until user writes on keyboard
theMap.forEach(
(prefix, future) -> {
// we finally complete the future
// for conciseness, we are not removing the future from the map
if (line.startsWith(prefix)) future.complete(line.substring(prefix.length()));
});
}
}
public CompletableFuture<String> monitor(String prefix) {
//create empty future
CompletableFuture<String> cF = new CompletableFuture<>();
var previousFut = theMap.computeIfAbsent(prefix, s -> cF);
if (previousFut != cF)
return CompletableFuture.failedFuture(new IllegalArgumentException("Already registered."));
// we return the future reference
// client-code can then chain-it before it completes.
// the future is completed on the `StdInputMonitor` thread.
return cF;
}
}
Creating an instance of StdInputMonitor
starts a new solitary thread, which loops indefinitely. On each iteration, it reads a line from the console and checks which prefixes – if any – match the line. For those that do, we complete the respective future.
Client code only has access to method monitor
. When a client calls it, we create a new future associated with the prefix provided. The future is then returned to the calling thread only after being register on the shared map, so that it can been seen by StdInputMonitor
thread. At this moment, two threads have a reference to the future.
On the “client side” the usage would be:
// we are running on thread `main`
public static void main(String[] args) {
StdInputMonitor stdInputMonitor = new StdInputMonitor();
// this statement runs within the `main` thread.
// meaning the future is created on `main` thread, but completed
// on the `StdInputMonitor` thread.
CompletableFuture<String> passwordF = stdInputMonitor.monitor("password=");
// search for `.join` explanation
System.out.println("Password is: " + passwordF.join());
}
.get and .join
The methods serve the same purpose: To block the calling thread and wait for the result of the future to become available. They return the result, or throw an exception if the future completed exceptionally.
The power of the completable future is being able to represent asynchronous computations. Blocking is antithetical to this, and it is discouraged in the async community. So much so that in other languages like Scala, the API to block a future was designed to be less easy to access, aiming to discourage its usage.
Understanding why blocking is bad is beyond the scope of this guide. Register below, where we delve deeper into this topic:
The difference between the two methods is small, and it has mostly to do with the type of exceptions they throw.
.get
will throw mostly checked exceptions, .join
throws only runtime exceptions, which, as they are unchecked, don’t require the user to handle them explicitly.
Additionally, .get
allows the user to specify a timeout limit for how long to wait. Lastly, .get
comes from the java.util.concurrent.Future
interface.
.join()
It will throw:
- CompletionException
- CancellationException.
Both are unchecked, meaning the user doesn’t have to handle them explicitly. CompletionException is thrown when the future is completed exceptionally, or one transformation stage throws. In both cases, the exceptions will wrap the actual cause. This means that if your future completes exceptionally with IOException
, .join
throws a CompletionException
wrapping it.
On the other hand, CancellationException is thrown then the future is cancelled explicitly, and it wraps no exception as cause.
.get()
It will throw checked exceptions requiring the caller to handle them explicitly:
- ExecutionException (checked)
- TimeoutException (checked)
- InterruptedException (checked)
- CancellationException
ExecutionException is the equivalent of CompletionException
, and it will also hold the actual exception as its cause.
TimeoutException is for the case where the user provides a timeout to .get
, and the future does not complete within the time interval.
import java.util.concurrent.TimeUnit;
CompletableFuture<String> cF1; // not shown
cF1.get(5, TimeUnit.SECONDS);
This method overload is not available on .join
.
Lastly, InterruptedException is thrown when the thread blocking on the .get
is interrupted from the outside. This is another feature which is not available on .join:
CompletableFuture<String> cF1 = new CompletableFuture<>();
Thread threadHandle =
new Thread(
() -> {
try {
cF1.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
},
"ConcurrencyDeepDives-Example");
threadHandle.start();
Thread.sleep(1_000L);
threadHandle.interrupt();
leads to:
java.lang.InterruptedException
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:386)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at futures.Example.lambda$main$0(Example.java:78)
at java.base/java.lang.Thread.run(Thread.java:840)
Transformations
Futures are useful because they are composable. If you have one, you can create new ones in a structured, thread-safe, and expressive manner, by registering actions/tasks to be done after the “parent” future completes.
Terminology
Transforming, pipe-lining, composing, chaining…. All these mean the same thing. We will use mostly transforming.
Every time you “register” a new action, you get a new future back. This enables you to create a chain or graph of computational steps which are clear and intuitive, as already mentioned at Intuition for Chaining.
Consider the snippet of code below, showcasing 1) reading IO, 2) processing, and 3) writing IO all with CompletableFutures
Supporting code
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
public class ChainingExample {
private static final HttpClient httpClient = HttpClient.newHttpClient();
private static CompletableFuture<String> httpGetRequest(String url) {
return httpClient
.sendAsync(
HttpRequest.newBuilder(URI.create(url)).GET().build(),
HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body);
}
private static int countPrefix(String prefix, String contents) {
return (int) Pattern.compile(prefix).matcher(contents).results().count();
}
private static CompletableFuture<Void> writeFile(String content) {
CompletableFuture<Void> cF = new CompletableFuture<>();
AsynchronousFileChannel fC;
try {
fC =
AsynchronousFileChannel.open(
Path.of("results.txt"),
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE);
} catch (IOException e) {
return CompletableFuture.failedFuture(e);
}
fC.write(
ByteBuffer.wrap(content.getBytes()),
0,
null,
new CompletionHandler<>() {
public void completed(Integer result, Object attachment) {
cF.complete(null);
}
public void failed(Throwable exc, Object attachment) {
cF.completeExceptionally(exc);
}
});
// we should be closing the file channel
return cF;
}
// main method
}
// Remaining code where `httpGetRequest`, `countPrefix`, and `writeFile` are specified
public static void main(String[] args) {
StdInputMonitor stdInputMonitor = new StdInputMonitor();
CompletableFuture<Void> res =
stdInputMonitor
.monitor("url=")
.thenComposeAsync(url -> httpGetRequest(url)) // 1
.whenCompleteAsync( // 2
(contents, exception) -> {
if (exception == null) System.out.printf("Received: [%s]\n", contents);
else exception.printStackTrace();
})
.thenApplyAsync(siteContents -> countPrefix("monday", siteContents)) // 3
.thenComposeAsync(count -> writeFile(count.toString())); // 4
res.join();
}
The snippet above, takes an url from the standard input, makes a corresponding GET
request, and counts the number of substring monday
on the content returned, before saving that result onto a file.
In this pipeline, there are 4 “stages”, each depending on the previous. The first depends on the original future that completes when the user writes url=<some-valid-url>
on the interactive console. (Revisit StdInputMonitor
class)
Its important to understand that each stage returns a brand new future. This enables easy chaining. Each is also associated with some sort of action/computation in the form of a lambda, which is registered against the original future. Internally, once the parent future completes, that lambda runs, and completes the new future.
So for example, at stage (3), we count the number of occurrences of the sub-string “monday” on the html returned from the url. Naturally, function countPrefix
only runs when the content is available, which implies the corresponding future has already completed. But its the completion of that future itself that triggers this function.
Now we explore in detail all possible transformations available on the API.
API overview
Sadly, Java’s API on completable futures is over-bloated and un-intuitive. With 60+ methods, it is nearly impossible to quickly understand (let alone remember) which method you need to address a particular problem.
Below is an incomplete list of the transformations available. All these return a new future that is completed after the registered action finishes.
- Apply a function to a successful result: thenApply
- Apply a side-effectful action to a successful result: thenAccept
- Run an action after a successful result: thenRun
- Apply a function to a failed result, obtaining a successful result: exceptionally
- Apply a function – returning a new future – to a successful result: thenCompose
- Apply a function – returning a new future – to a failed result: exceptionallyCompose
- Apply one function if failed, or another function if successful result: handle
- Run an action when the original completes, but return original value/exception: whenComplete
- Take a 2nd future of same type, and apply a side-effectful action to the original or the 2nd future: acceptEither
- Take a 2nd future of same type, and apply a function to the original or the 2nd future: applyToEither
- Take a 2nd future of same type, and run an action after the original or the 2nd future finishes: runAfterEither
- Take a 2nd future, and apply a bi-function to the pair of successful results: thenCombine.
- Take a 2nd future, and apply a side-effectful action to the pair of successful results: thenAcceptBoth
- Take a 2nd future, and run an action after both complete successfully: runAfterBoth
These are 14 methods! As if this wasn’t complicated enough, for each of these there are actually 3 versions related to the executor in which the “transformation” takes place.
<method>(lambda ...)
. This is the Non-Async version. The transformation takes place in the thread/executor where the original future is completed, or if the future has already completed, on the calling thread.<method>Async(lambda ...)
Async on the default executor. The transformation takes place in the default executorForkJoinPool.commonPool()
<method>Async(lambda ..., Executor)
. The transformation takes place in the provided executor.
For example, for thenApply
we have:
import java.util.concurrent.Executor;
public class CompletableFuture<T> {
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
// ....
}
This brings the number of API methods to 14*3=42
. On top of that, we have a few standalone static methods that we will analyse further down.
Hidden naming logic
42
methods is a lot to remember. There is however, some logic behind the naming and type signature for all these methods. The rules are not very strict, but sufficient to save you from memorising all methods.
Note
The naming logic helps, but few people know it.
It didn’t have to be this way! Futures exist on other languages with a much simpler API. Scala for example, only has a handful of methods with a more obvious intent.
We have already seen the logic for 1) non-async, 2) async on default executor, and 3) async on specified executor. We will analyse the details here
apply, accept, run
Methods can also be divided between apply, accept, and run “types of methods”. The key to remember is that they are associated respectively, with the interfaces Function
, Consumer
, and Runnable
. Choose one or the other depending on whether your computation requires arguments and/or produces results.
Apply transformers take a function Function<T, U>
that is applied to the value T t
of the parent future and returns value U u
. This value will be what the new future will complete to.
Accept transformers take a Consumer<T>
. They are applied to the value T t
of the parent future’s value, but return void. This is useful for computations which only have side-effects, but which also depend on the value of the future. Naturally, the resulting future is always of type Void
Lastly, run transformers take a Runnable
instance. Like accept, they operate only via side-effects, and additionally they do not depend on the original future’s value:
CompletableFuture<String> cF1 = CompletableFuture.completedFuture("Concurrency Deep Dives");
CompletableFuture<Integer> cF2 =
cF1.thenApply(x -> x.length()); // Function[String,Integer]
CompletableFuture<Void> cF3 =
cF1.thenAccept(x -> writeToFile(x)); // Consumer[String], writeToFile not shown.
CompletableFuture<Void> cF4 =
cF1.thenRun(() -> System.out.println("Finished")); // Runnable
then, either, both
The examples above have been using methods prefixed then
. Here, the computation registered is triggered once the original future completes. But there is only one future to take into account: the one from which the method then
is called.
API methods containing both
, and either
on their names behave differently.
Alongside the computation, they also take a second CompletableFuture as arguments:
- for
either
methods, the computation – in the form of a Function, Consumer, or Runnable – runs on top of either of the futures. So it must wait for at least one to complete before applying the transformer action. Undetermined which one. - for
both
methods, the computation is applied to the result of both futures. Meaning, it must wait for both futures to complete. Naturally, here the computation provided must be expressed as aBiFunction
andBiConsumer
. TheRunnable
case remains the same as there is no such thing or need as aBiRunnable
.
These 2 conventions help you identify 9 methods, or 9*3=27
with the async versions:
public class CompletableFuture<T> {
CompletionStage<U> thenApply (Function<T,U> fn);
CompletionStage<Void> thenAccept (Consumer<T> action);
CompletionStage<Void> thenRun (Runnable action);
// ##
CompletionStage<V> thenCombine (CompletionStage<U> other, BiFunction<T,U,V> fn);
CompletionStage<Void> thenAcceptBoth (CompletionStage<U> other, BiConsumer<T, U> action);
CompletionStage<Void> runAfterBoth (CompletionStage<?> other, Runnable action);
// ##
CompletionStage<U> applyToEither (CompletionStage<T> other, Function<T, U> fn);
CompletionStage<Void> acceptEither (CompletionStage<T> other, Consumer<T> action);
CompletionStage<Void> runAfterEither (CompletionStage<?> other, Runnable action);
// other methods
}
There are other methods that do not fall onto these rules. Namely:
public class CompletableFuture<T> {
CompletableFuture<U> handle (BiFunction<T, Throwable, U> fn);
CompletableFuture<T> exceptionally (Function<Throwable,T> fn);
CompletableFuture<T> whenComplete (BiConsumer<T, Throwable> action);
CompletableFuture<U> thenCompose (Function<T,CompletionStage<U>> fn);
CompletableFuture<T> exceptionallyCompose (Function<Throwable,CompletionStage<T>> fn);
// other methods
}
We will analyze all of them in more detail below.
All transformations
Either transformations
public class CompletableFuture<T> {
CompletionStage<U> applyToEither (CompletionStage<T> other, Function<T, U> fn);
CompletionStage<Void> acceptEither (CompletionStage<T> other, Consumer<T> action);
CompletionStage<Void> runAfterEither (CompletionStage<?> other, Runnable action);
// other methods
}
Either transformations take a second future as argument (above, the other
argument). The idea is that the computation provided should run when either one of the two futures has completed.
There are no guarantees about which of the two futures are used as the “parent” of the dependent future’s computation, that is, of the lambda provided. That said, the implementation will use the first that completes, either successfully or exceptionally.
In the current implementation, the preference is the future on which the method is called (not the other
). So the following code:
CompletableFuture<String> cF1 = CompletableFuture.completedFuture("Hello");
CompletableFuture<String> cF2 = CompletableFuture.completedFuture("World");
cF1.acceptEither(cF2, System.out::println);
Will always print Hello
.
Both transformations
Straightforward. The computation provided will run once both futures have completed. That is clear from looking at the signatures. BiFunction and BiConsumer operate on both the value T t
and the value U u
from the result of the original and other
future respectively.
The Runnable case takes no arguments, but is still guaranteed to only run after both futures completed successfully.
public class CompletableFuture<T> {
CompletionStage<V> thenCombine (CompletionStage<U> other, BiFunction<T,U,V> fn);
CompletionStage<Void> thenAcceptBoth (CompletionStage<U> other, BiConsumer<T, U> action);
CompletionStage<Void> runAfterBoth (CompletionStage<?> other, Runnable action);
// other methods
}
But what happens when one of the futures completes exceptionally?
exceptionally
Sofar, when a future completes with an error (i.e. completes exceptionally), then dependent futures, that is, futures obtained by transforming the former, will also complete exceptionally and with the same error.
If you want to recover from such scenario, we can use API method exceptionally
. It takes a function mapping from a Throwable to a value T t
, and the behaviour is very similar to thenApply
, but instead of mapping a successful value T t
to a value U u
, it maps from the exception to a successfull value T t
. In other words, this acts as a try/catch on futures.
public class CompletableFuture<T> {
CompletableFuture<T> exceptionally(Function<Throwable, T> fn);
// other methods
}
Allows for:
CompletableFuture<?> cF1 =
CompletableFuture.supplyAsync(() -> "Concurrency Deep Dives")
.thenApply(
ignored -> {
throw new RuntimeException("kaBoom");
})
.exceptionally(
ex -> {
Throwable cause = ex.getCause();
if (ex.getCause().getMessage().equals("kaBoom")) return "Success: Hello";
else return "Success: World";
});
System.out.println(cF1.join());
Above, the future cF1
would complete successfully with the value Success: Hello
.
However, if the original future completes successfully, then the dependent future will also complete successfully with the same value. On the example above, if we remove the thenApply
transformation, then the output of the entire snippet would be Concurrency Deep Dives
.
handle
thenApply
transforms successful results, and exceptionally
transforms unsuccessful results.handle
does both. Instead of taking a function as argument, it takes a BiFunction. The first argument of the BiFunction lambda is the successful result, or null
if the future completes exceptionally, and the second argument is the exception, or null
if the future completes successfully.
CompletableFuture<String> cF1 = CompletableFuture.completedFuture("Concurrency Deep Dives");
CompletableFuture<String> cF2 = CompletableFuture.failedFuture(new RuntimeException("kaBoom"));
BiFunction<String, Throwable, Integer> transformer =
(s, throwable) -> {
if (throwable != null) return 0;
else return s.length();
};
System.out.println("cF1 result: " + cF1.handle(transformer).join());
System.out.println("cF2 result: " + cF2.handle(transformer).join());
leads to:
cF1 result: 22
cF2 result: 0
So handle is more generic method than thenApply and exceptionally.
compose
If you have a future and want to transform its value, you use applyThen
as seen before. This is similar to transforming the inner value of an Optional<T>
, and it is also know as a map.
But what if your lambda returns another completable future? In that case, you shouldn’t use applyThen
or you will get a CompletableFuture nested inside another CompletableFuture. You need what in funtional programming is called a flatMap.
CompletableFuture<String> httpCall(String url); // implementation not shown
CompletableFuture<String> urlF = CompletableFuture.completedFuture("https://www.reddit.com/r/java/");
CompletableFuture<String> htmlContentsF = urlF.thenCompose(url -> httpCall(url));
System.out.println(htmlContentsF.join());
exceptionallyCompose
By now it should be obvious what this one does.
It is to method exceptionally
what method compose
is to thenApply
.
whenComplete
This method is similar to handle. It takes a BiConsumer accounting for both the result and exception as arguments. The main difference is that while handle computes a future with a new value, whenComplete preserves the result of the original future. This makes it useful to run side-effect actions, but preserve the value.
All the transformations seen sofar returned a new future that completes with the return value of the action/computation provided.
Even Consumer and Runnable based transformations, whose computation returns void, still behave that way. Its just that the return value is null
.
whenComplete is slightly different. The returned future always maintains the value of the original future. Note however, that the returned future still only completes after the action is done.
CompletableFuture<String> cF2 =
CompletableFuture.supplyAsync(() -> "ConcurrencyDeepDives.com")
.whenComplete(
(value, ex) -> {
if (ex != null) System.out.println("Finished with an error!");
else System.out.println("Result is: " + value);
});
System.out.println("cF2 result is: " + cF2.join());
leads to:
Result is: ConcurrencyDeepDives.com
cF2 result is: ConcurrencyDeepDives.com
If however the action provided throws – which it shouldn’t – than the rules are:
- If the original future completed successfully, then the returned future completed exceptionally with the error thrown by the action provided.
- If the original future completed exceptionally, then the returned future will complete with that same error.
NonAsync/Async and executors
As mentioned before, all transformer methods have 3 versions. For thenApply:
public class CompletableFuture<T> {
<U> CompletableFuture<U> thenApply(Function<T,U> fn); // referred to as 'non-async'
<U> CompletableFuture<U> thenApplyAsync(Function<T,U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<T,U> fn, Executor executor);
// other methods
}
The difference is where the computation provided runs. That is, where does fn
above run? It must run in some thread. After all, that is the unit of computation in Java.
- Non-Async: Might run on the same thread that completes the source future, and it also might run on the thread that registers the transformation (the calling thread).
- Async with no executor: Runs on a thread of the Java’s common executor.
- Async with specified executor: Runs on a thread of the executor provided.
To develop an intuition on where tasks/actions/computations associated with a transformation run, we have to revisit how CompletableFutures work.
Non-Async
When a transformation is registered, the source future either has already completed, or it has not.
If it has not yet completed, then the calling thread, that is, the thread that is registering the transformation, cannot possibly be responsible for running the computation. It would have to block itself until the source future completed.
Instead, the registration operation returns immediately, and the computation lambda is stored inside the source future, as described at Intuition for chaining.
Once completed, the source future is aware that their are dependent actions that need to run. As such, it will run them immediately and on the same thread that completed it.
StdInputMonitor stdInputMonitor = new StdInputMonitor();
CompletableFuture<String> cF1 = stdInputMonitor.monitor("password=");
// Thread.sleep(20_000L) comment out this line afterwards
CompletableFuture<Integer> cF2 =
cF1.thenApply(
password -> {
System.out.printf(
"Password is: '%s'. I am running from thread: '%s'.\n",
password, Thread.currentThread().getName());
return password.length();
});
System.out.println("Password size: " + cF2.join());
Would output:
password=concurrencyDeepDives [input]
Password is: 'concurrencyDeepDives'. I am running from thread: 'StdInputMonitor'.
Password size: 20
If it has already completed, then the thread responsible for its completion is no longer available. Therefore, the only option is to run the computation on the calling thread.
If we go back to snippet above and introduce the thread sleep to simulate the scenario where the user inputs the password before line 5 where the transformation is registered, the output changes to:
password=concurrencyDeepDives [input]
Password is: 'concurrencyDeepDives'. I am running from thread: 'main'.
Password size: 20
Because it runs on the calling thread (main
in this case), this is a blocking operation that will prevent it from progressing until the registered computation completes.
Async
The two versions of async are identical. With the exception that for the method without the explicit Executor
, this is chosen to be Java’s default executor. This is done early, and the code paths from then one are the same.
Unlike the Non-Async case, the computation is always scheduled on the provided executor, and never on the calling thread, or on the thread that completed the parent future.
As before, when a transformation is registered, the source future either has already completed, or it has not.
If it has already completed, then the calling thread schedules the provided computation (i.e. the lambda of the transformation) on the provided executor immediately. Therefore it doesn’t block, as on the Non-Async case.
If it has not yet completed, then a similar behaviour as before occurs. The implementation of CompletableFuture takes this new lambda but also the provided executor and stores these inside the stack like structure discussed at Intuition for Chaining. Then, when the future does complete, that thread will pop these objects from the stack, and schedule the lambda/computation on the executor.
Exception Handling
When you create another future by registering a new transformation, what happens when that computation throws?
R: The new future will be completed exceptionally with that exception.
When the parent future is completed, and it is time to call your lambda, the implementation of CompletableFuture wraps the calling of the lambda in a try/catch. This prevents the thread where it is running from crashing. Then, it completes the future with a CompletionException
containing the exception thrown as the cause.
CompletableFuture<Void> cF1 =
CompletableFuture.supplyAsync(() -> "ConcurrencyDeepDives.com")
.thenApply(
ignored -> {
throw new RuntimeException("kaBoom!");
});
System.out.println(cF1.join());
leads to:
java.util.concurrent.CompletionException: java.lang.RuntimeException: kaBoom!
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
...
Caused by: java.lang.RuntimeException: kaBoom!
at futures.ExceptionHandling.lambda$main$1(ExceptionHandling.java:11)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
... 8 more
The behavior is very convenient. Otherwise, because the provided lambdas run on executor threads, it would be hard to have visibility over errors, let alone manage them.
This behavior occurs for all transformations. Alternatively, you could also use the transformation thenCompose
, thought that is not the intended usage.
CompletableFuture<Void> cF1 =
CompletableFuture.supplyAsync(() -> "ConcurrencyDeepDives.com")
.thenCompose(
ignored -> CompletableFuture.failedFuture(new RuntimeException("kaBoom!")));
Cancellation
Cancellation allows you to cancel futures that have not yet been triggered.
It does not allow you cancelling an already running computation.
This needs a bit of unpacking.
Consider these 3 categories of completable futures:
- Already completed.
- Not yet completed, but which will be completed by a “task” that is already running.
- Not yet completed, but which will be completed by a “task” that is NOT already running.
Again, completed means the future is in its final state and value.
Lets consider cF2
below as the future we want to cancel.cF2
is a dependent of cF1
whose definition is not shown. In turn, cF3
is a dependent of cF2
. The presence of the thread sleep before the cancellation already suggests that the behaviour is dependent of timing issues.
CompletableFuture<A> cF1 = ???; // not relevant how it is defined.
CompletableFuture<B> cF2 = cF1.thenApply(<lambda2>); // lambda2 runs after `cF1` completes
CompletableFuture<C> cF3 = cF2.thenApply(<lambda3>); // lambda3 runs after `cF2` completes
Thread.sleep(<interval>);
cF2.cancel(true);
Case 1 corresponds to the case where cF2
is already complete. This implied that cF1
is also already completed and lambda2
has run. You can’t cancel cF2
in this scenario. By definition its state cannot be changed. Additionally, its dependents (i.e., cF3
), will run even if you cancel the original future before they start.
Cases 2 and 3 are more subtle.
If by the time we reach line 5 and cancel cF2
the action that completes it (i.e. lambda2
) has already started, then we cannot stop that computation from continuing. However, the future will be completed exceptionally. This might seem odd. The action/lambda which is meant to complete it will still be running, but the future itself is already completed.
Additionally this means that all dependent futures such as cF3
will be triggered after cancellation and before lambda2
finishes.
In contrast, If by the time we reach line 5 and cancel cF2
the future cF1
has not yet completed, then that means the action associated with cF2
(i.e. lambda2
) has not yet started. In this case, that action will never start. This means that once cF1
this will not trigger lambda2
.
Example
public class CompleteFutureRunner {
private static final long start = System.currentTimeMillis();
private static long duration() {
return (System.currentTimeMillis() - start) / 1000L;
}
private static void sleepAndLog(String name, long sleepSeconds) {
System.out.printf("%d s. Starting %s.\n", duration(), name);
try {
Thread.sleep(sleepSeconds * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.printf("%d s. Ending %s.\n", duration(), name);
}
public static void main(String[] args) throws InterruptedException {
long cF1Sleep = Long.parseLong(args[0]);
long cF2Sleep = Long.parseLong(args[1]);
long mainThreadSleep = Long.parseLong(args[2]);
CompletableFuture<Void> cF1 = CompletableFuture.runAsync(() -> sleepAndLog("cF1", cF1Sleep));
CompletableFuture<Void> cF2 = cF1.thenRun(() -> sleepAndLog("cF2", cF2Sleep));
cF2.whenCompleteAsync(
(ignored, ex) -> {
long elapsed = duration();
if (ex != null) System.out.printf("%d s. cF2 failed with: %s\n", elapsed, ex);
else System.out.printf("%d s. cF2 succeeded\n", elapsed);
});
Thread.sleep(mainThreadSleep * 1000L);
boolean isCancelled = cF2.cancel(true);
System.out.printf("%d s. Was cF2 cancelled: %s\n", duration(), isCancelled);
Thread.sleep(100_000L);
}
}
For the listing above, the first case would correspond to:
$ java -classpath <cp> CompleteFutureRunner 5 5 11
0 s. Starting cF1.
5 s. Ending cF1.
5 s. Starting cF2.
10 s. Ending cF2.
10 s. cF2 succeeded
11 s. Was cF2 cancelled: false
Because we cancel cF2
after it was completed, it has no effect.
For case 2:
$ java -classpath <cp> CompleteFutureRunner 5 5 7
0 s. Starting cF1.
5 s. Ending cF1.
5 s. Starting cF2.
7 s. Was cF2 cancelled: true
7 s. cF2 failed with: java.util.concurrent.CancellationException
10 s. Ending cF2.
The future is cancelled before its action finishes. The future is completed immediately and dependent actions are also triggered immediately with the CancellationException.
Lastly case 3:
$ java -classpath <cp> CompleteFutureRunner 5 5 3
0 s. Starting cF1.
3 s. Was cF2 cancelled: true
3 s. cF2 failed with: java.util.concurrent.CancellationException
5 s. Ending cF1.
By the time the action of cF2
is meant to start, cF2
has already been completed, so no point in running it.
Very well written article.
Thank you for the support!