CompletableFuture runAsync & supplyAsync

As explained in greater detail in A Guide to CompletableFuture, there are four ways to create a CompletableFuture:

  1. Already completed
  2. runAsync/supplyAsync
  3. Transformations
    • Every time we chain a future with an action, we create a new future.
  4. Standard approach

Here we analyse runAsync and supplyAsync in detail, with some practical examples. These are the two methods programmers use most in day-to-day work.

These methods are static methods of the CompletableFuture class, so you don’t need a reference to an already existing future object:

runAsync and supplyAsync API
import java.util.function.Supplier;
import java.util.concurrent.Executor;

public class CompletableFuture<T> {

  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

  public static CompletableFuture<Void> runAsync(Runnable runnable);
  public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
  // other methods
}

Each method comes in two versions that differ only in where the provided computation runs or, more precisely, in which executor (i.e. thread pool) it runs. One version lets you pass the executor explicitly. The other uses the default, ForkJoinPool.commonPool() (unless the common pool’s parallelism is below two, in which case a new thread is created to run each task). Apart from that, the two versions are identical: the choice of executor doesn’t change the overall behaviour of runAsync/supplyAsync.
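To see the two overloads side by side, here is a minimal sketch that runs the same Supplier once without an executor and once with a dedicated pool, and reports which thread ran each. The class name ExecutorChoice, the pool size of 4 and the io-worker- thread-name prefix are illustrative assumptions, not part of the API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorChoice {

  // Runs the same Supplier on the default executor and on a dedicated pool,
  // returning the name of the thread each one ran on.
  static String[] threadNames() {
    AtomicInteger counter = new AtomicInteger();
    // Hypothetical dedicated pool; the "io-worker-" prefix is only for illustration.
    ExecutorService ioPool =
        Executors.newFixedThreadPool(
            4, r -> new Thread(r, "io-worker-" + counter.incrementAndGet()));
    try {
      // No executor argument: ForkJoinPool.commonPool() is used
      // (or a new thread per task if the common pool's parallelism is below 2).
      CompletableFuture<String> onDefault =
          CompletableFuture.supplyAsync(() -> Thread.currentThread().getName());

      // Explicit executor: the Supplier runs on one of ioPool's threads.
      CompletableFuture<String> onIoPool =
          CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), ioPool);

      return new String[] {onDefault.join(), onIoPool.join()};
    } finally {
      ioPool.shutdown(); // we created this executor, so we shut it down
    }
  }

  public static void main(String[] args) {
    String[] names = threadNames();
    System.out.println("default executor:  " + names[0]);
    System.out.println("explicit executor: " + names[1]);
  }
}
```

Because the sketch creates ioPool itself, it is also responsible for shutting it down; the common pool, by contrast, is unaffected by shutdown().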

Both methods are used when you have a computation that “takes a long time” and you don’t want to block the calling thread. You “make it async” by scheduling it to run on a thread pool (i.e. the executor). The CompletableFuture you get back lets you be notified when the computation finishes and then chain/compose its result with further computations. Chaining is beyond the scope of this post, but you can read more about it in A Guide to CompletableFuture.

By “takes a long time” we normally mean operations that do file or network IO, like reading files and making HTTP requests. IO is much slower than operations on data already in memory. Computationally expensive CPU operations, like sorting a large list, also qualify as “taking a long time”.

runAsync vs. supplyAsync

What is the difference between the 2 methods? When should you use one versus the other?

The difference is very small:

  • runAsync is for computations that do not return a result, only side-effects.
  • supplyAsync is for computations that do return a result, though they might also have side-effects.

If in doubt, you can use supplyAsync, which can do what runAsync does, but not vice-versa.

Everything else is the same, namely how they behave with errors and what you can do afterwards with the CompletableFuture instance you get back.

runAsync

The runAsync method accepts a Runnable instance. Runnable.run() returns void, which is reflected in the type of the future you get back: CompletableFuture<Void>.

Even though the Runnable computation doesn’t return a value and the future is typed as Void, you can be sure that the associated CompletableFuture completes only after the Runnable terminates, and not before. That is of central importance; otherwise there would be no reason to use runAsync.
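This guarantee can be checked directly. In the sketch below (class and method names are hypothetical), the Runnable blocks on a CountDownLatch, so the future cannot be done before the latch is released; once join() returns, the Runnable’s side-effect is visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class RunAsyncCompletion {

  // Returns {doneBeforeRelease, doneAfterJoin, sideEffectVisible}.
  static boolean[] observe() {
    CountDownLatch release = new CountDownLatch(1);
    AtomicBoolean sideEffectDone = new AtomicBoolean(false);

    CompletableFuture<Void> future =
        CompletableFuture.runAsync(
            () -> {
              try {
                release.await(); // the Runnable cannot finish until we count down
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
              }
              sideEffectDone.set(true); // the computation's only "output" is this side-effect
            });

    boolean doneBefore = future.isDone(); // false: the Runnable is still blocked
    release.countDown();                   // let the Runnable run to completion
    future.join();                         // returns only after the Runnable has returned
    return new boolean[] {doneBefore, future.isDone(), sideEffectDone.get()};
  }

  public static void main(String[] args) {
    boolean[] r = observe();
    System.out.println("done before release: " + r[0]); // false
    System.out.println("done after join:     " + r[1]); // true
    System.out.println("side-effect visible: " + r[2]); // true
  }
}
```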

runAsync is helpful when your computation only has side-effects and does not return a meaningful result that you want to use later. Potential examples include writing to a file, sending an email over the network, and interacting with a database.

A trivial example of how to use it:

// This assignment does not block the current thread.
CompletableFuture<Void> cF1 =
    CompletableFuture.runAsync(
        () -> { // we are using lambda syntax
          try {
            // pretend this is a long-running or blocking computation
            Thread.sleep(10_000L); // 10 seconds
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
          // notice this lambda function is not returning a value!
          System.out.println("Finishing running computation ...");
        });

// We "chain" cF1 to register async actions to be done once cF1 completes
// "chaining" is beyond the scope of this post.
CompletableFuture<Void> cF2 =
    cF1.thenRunAsync(
        () -> {
          // Do something else
        });

cF1.join(); // Just to use as example. Normally you don't want to do this.
System.out.println("This line will only run after the future cF1 completes ...");

Above, the cF1 assignment returns immediately, and the associated computation runs on ForkJoinPool.commonPool(). Even before that computation finishes (after 10 seconds), we can already “chain” the original future with new operations, as is done with cF2.

For the snippet above, after 10 seconds the output would be:

Finishing running computation ...
This line will only run after the future cF1 completes ...

supplyAsync

Unlike runAsync, the supplyAsync method accepts a Supplier<U>:

package java.util.function;

public interface Supplier<U> {
    U get();
}

You use this method if the computation you want to “make async” returns a meaningful result, such as a String, an Integer, or a custom type.

The CompletableFuture you get back completes with the value of type U once get() finishes. Essentially, get() is scheduled to run on the executor (either the one provided or the common pool), and the value u it returns is then used to complete the future.

A trivial example of how to use it:

// This assignment does not block the current thread.
CompletableFuture<String> cF1 =
    CompletableFuture.supplyAsync(  // the Supplier runs on a thread of ForkJoinPool.commonPool()
        () -> {  // we are using lambda syntax
          try {
            // pretend this is a long-running or blocking computation
            Thread.sleep(10_000L); // sleep for 10 seconds
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }

          // We are returning a "meaningful" result value of type String.
          return "Concurrency Deep Dives";
        });

// We "chain" cF1. The lambda takes the String value and upper-cases it.
CompletableFuture<String> cF2 = cF1.thenApplyAsync(s -> s.toUpperCase());

// Just to use as example. Normally you don't want to do this.
System.out.println(cF1.join());
System.out.println(cF2.join());

After 10 seconds, the output of the above would be:

Concurrency Deep Dives
CONCURRENCY DEEP DIVES

Note that nothing stops the Supplier from returning null. In that case, the behaviour is similar to using runAsync.
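A minimal sketch of that case, using a hypothetical sideEffectOnly helper: the Supplier’s type is inferred as Supplier<Void>, so the resulting future has the same type as one returned by runAsync and completes with null.

```java
import java.util.concurrent.CompletableFuture;

public class SupplyNull {

  // A Supplier<Void> can only return null, so this future behaves like a runAsync one.
  static CompletableFuture<Void> sideEffectOnly() {
    return CompletableFuture.supplyAsync(
        () -> {
          System.out.println("Only a side-effect here ...");
          return null;
        });
  }

  public static void main(String[] args) {
    System.out.println(sideEffectOnly().join()); // prints "null"
  }
}
```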

Practical examples

runAsync

As mentioned, runAsync is useful when your computation only has side-effects and does not need to return a result.

Let’s imagine you want to take a large text file and replace occurrences of one word with another. You might want to make this action asynchronous because the file is large and the work consists of reading and writing files. Such IO operations are relatively slow.

You want to read the file (not entirely into memory, because it may be too large), replace the words, and then write the result to a different file.

This operation returns no value, but it is still helpful to be notified when the reading and writing have finished. Perhaps you then want to read the output file, or send it over the network. runAsync allows you to achieve this.

Let’s see how to replace “cat” with “dog”.

textFilter function: Copy a file, swapping “cat” for “dog”

import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;

// only contains side-effects
private static void textFilter(Path in, Path out) {
  try (Scanner input = new Scanner(Files.newInputStream(in), StandardCharsets.UTF_8);
      OutputStream output = Files.newOutputStream(out)) {

    while (input.hasNextLine()) {
      String newLine = input.nextLine().replace("cat", "dog");
      output.write((newLine + "\n").getBytes(StandardCharsets.UTF_8));
    }
  } catch (Exception e) {
    throw new RuntimeException(String.format("Copying '%s' to '%s'.", in, out), e);
  }
}

public static void main(String[] args) {
  Path fileIn = Path.of(args[0]);
  Path fileOut = Path.of(args[1]);

  CompletableFuture<Void> cF1 = CompletableFuture.runAsync(() -> textFilter(fileIn, fileOut));
  // cF1 only completes when `textFilter` has finished writing to the output file

  // Common-pool threads are daemon threads: without waiting here, the JVM could
  // exit before the output file is written.
  cF1.join();
}

Importantly, notice that even though there is no value associated with the future, it completes only after textFilter returns. That means the entire process of reading the input file, swapping the words, and writing and closing the output file has finished.
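As mentioned earlier, you might then want to read the output file. A sketch of that, assuming Java 11+ for Files.readString and using temporary files for the demo: it repeats textFilter (wrapping IOException in UncheckedIOException instead of RuntimeException) and uses thenApply, which only runs after the runAsync future has completed, so the output file is complete when it is read. The class and helper names are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;

public class TextFilterThenRead {

  // Same logic as textFilter above: copy `in` to `out`, replacing "cat" with "dog".
  static void textFilter(Path in, Path out) {
    try (Scanner input = new Scanner(Files.newInputStream(in), StandardCharsets.UTF_8);
        OutputStream output = Files.newOutputStream(out)) {
      while (input.hasNextLine()) {
        String newLine = input.nextLine().replace("cat", "dog");
        output.write((newLine + "\n").getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Runs textFilter asynchronously; the read only starts after the filter has finished.
  static String filterThenRead(Path in, Path out) {
    return CompletableFuture.runAsync(() -> textFilter(in, out))
        .thenApply(
            ignored -> {
              try {
                return Files.readString(out, StandardCharsets.UTF_8);
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
            })
        .join();
  }

  // Demo on temporary files (deleted when the JVM exits).
  static String demo() {
    try {
      Path in = Files.createTempFile("cats", ".txt");
      Path out = Files.createTempFile("dogs", ".txt");
      in.toFile().deleteOnExit();
      out.toFile().deleteOnExit();
      Files.writeString(in, "the cat sat\nno cats here\n");
      return filterThenRead(in, out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.print(demo()); // prints two lines: "the dog sat" and "no dogs here"
  }
}
```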

supplyAsync

Again, use supplyAsync when the computation you want to perform asynchronously returns a meaningful result that you want to use afterwards.

For example, imagine you want to process a large text file to count the frequency of each word. You want the result as a map, linking each word to the number of occurrences in the file.

Count word occurrences in a file

import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;

private static Map<String, Integer> countOccurrence(Path filePath) {
  Map<String, Integer> occurrences = new HashMap<>();
  try (Scanner scanner = new Scanner(filePath, StandardCharsets.UTF_8)) {
    while (scanner.hasNext()) {
      String word = scanner.next();
      // store 1 for a new word, otherwise add 1 to its current count
      occurrences.merge(word, 1, Integer::sum);
    }
  } catch (Exception e) {
    throw new RuntimeException(String.format("Counting word occurrence in '%s'", filePath), e);
  }
  return occurrences;
}

public static void main(String[] args) {
  Path inputFile = Paths.get(args[0]);

  // Once complete, it contains the number of times every word appears in the file
  CompletableFuture<Map<String, Integer>> cF1 =
      CompletableFuture.supplyAsync(() -> countOccurrence(inputFile));
}

Above, the rationale for making the computation async is that we are reading from a file. The file might be large, so counting may take a long time. But even if it were small, it might still make sense to make the operation async, because IO operations (like reading files) are orders of magnitude slower than computations that only use main memory and the CPU.

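Once we have cF1, we can chain further transformations. These run once the value of the “parent” future is available: a non-async variant such as thenApply typically runs on the thread that completed the parent (or on the calling thread if the parent had already completed), while the …Async variants run on the executor passed to them, or on the default executor. For example, imagine we want to list the occurrences of the top three names mentioned in Crime and Punishment: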

Name occurrences in Crime and Punishment

Set<String> wordsToFilter = Set.of(".... not shown, too big"); // words excluded from the ranking

CompletableFuture<Map<String, Integer>> cF1 =
    CompletableFuture.supplyAsync(() -> countOccurrence(inputFile));

CompletableFuture<List<Map.Entry<String, Integer>>> cF2 =
    cF1.thenApply(
        wordsToCount ->
            wordsToCount.entrySet().stream()
                .filter(p -> !wordsToFilter.contains(p.getKey()))
                .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                .toList());

// Just to use as example. Normally you don't want to do this.
List<Map.Entry<String, Integer>> res = cF2.join();
System.out.println(res.subList(0, 3));

Would result in:

[Raskolnikov=547, Katerina=211, Razumihin=200]
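Note that res.subList(0, 3) throws an IndexOutOfBoundsException if fewer than three words survive the filter. A variant using Stream.limit avoids that. This sketch (hypothetical names, with a tiny temporary file standing in for the novel) also uses Map.merge for the counting and reverses comparingByValue() directly:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class TopWords {

  // Same idea as countOccurrence above, using Map.merge to increment each count.
  static Map<String, Integer> countOccurrence(Path filePath) {
    Map<String, Integer> occurrences = new HashMap<>();
    try (Scanner scanner = new Scanner(filePath, StandardCharsets.UTF_8)) {
      while (scanner.hasNext()) {
        occurrences.merge(scanner.next(), 1, Integer::sum);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return occurrences;
  }

  // Up to `n` words not in `ignore`, most frequent first; fewer than n is not an error.
  static List<Map.Entry<String, Integer>> topWords(Path file, Set<String> ignore, int n) {
    return CompletableFuture.supplyAsync(() -> countOccurrence(file))
        .thenApply(
            counts ->
                counts.entrySet().stream()
                    .filter(e -> !ignore.contains(e.getKey()))
                    .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                    .limit(n)
                    .toList())
        .join();
  }

  // Demo on a small temporary file.
  static List<Map.Entry<String, Integer>> demo() {
    try {
      Path file = Files.createTempFile("words", ".txt");
      file.toFile().deleteOnExit();
      Files.writeString(file, "a b a c a b the the the the");
      return topWords(file, Set.of("the"), 5);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [a=3, b=2, c=1]
  }
}
```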

But chaining is beyond the scope of this post. To learn chaining properly, have a look at A Guide to CompletableFuture.

Naturally, supplyAsync can also be used for a CPU-intensive computation, like finding the smallest element in a list:

import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// returns a list of `size` elements, each between 0 and 50 million.
public static List<Integer> generateRandomList(long size) {
  Random random = new Random();
  return random.ints(size, 0, 50_000_000).boxed().collect(Collectors.toList());
}

public static void main(String[] args) {
  // List of 10 million random elements
  List<Integer> unsortedList = generateRandomList(10_000_000L);
  CompletableFuture<Integer> cF1 =
      CompletableFuture.supplyAsync(() -> unsortedList.stream().sorted().toList().get(0));

  System.out.println("Smallest element: " + cF1.join());
}
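Sorting the whole list does more work than necessary if you only need the minimum: IntStream.min() finds it in a single pass. The sketch below also passes an explicit executor, assuming you want CPU-bound work kept off the common pool; the pool sized to availableProcessors(), the list size and the fixed seed are illustrative choices.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SmallestElement {

  // Finds the minimum asynchronously on a caller-supplied pool, in one O(n) pass.
  static int smallest(List<Integer> values, ExecutorService cpuPool) {
    return CompletableFuture.supplyAsync(
            () -> values.stream().mapToInt(Integer::intValue).min().orElseThrow(),
            cpuPool)
        .join();
  }

  public static void main(String[] args) {
    List<Integer> values = new Random(42).ints(1_000_000, 0, 50_000_000).boxed().toList();
    // Hypothetical dedicated pool for CPU-bound work, sized to the number of cores.
    ExecutorService cpuPool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    try {
      System.out.println("Smallest element: " + smallest(values, cpuPool));
    } finally {
      cpuPool.shutdown();
    }
  }
}
```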

Error Handling

What happens when the lambda computations provided to runAsync and supplyAsync throw an exception?

Here again, there is no difference between the two methods. For both runAsync and supplyAsync, the CompletableFuture implementation runs the Runnable or Supplier inside a try/catch block, on a thread of the executor. If the computation throws, the exception is caught and the future is completed exceptionally.

So you have a mechanism to know 1) that an exception was thrown, 2) which exception it was, and 3) a way to recover from it.
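The three points can be sketched with a Supplier that always throws: isCompletedExceptionally() answers 1), the CompletionException thrown by join() carries the original exception as its cause for 2), and exceptionally() supplies a fallback value for 3). The class name and the -1 fallback are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class FailedSupply {

  // A Supplier that always throws; the exception is raised on an executor thread.
  static CompletableFuture<Integer> failing() {
    return CompletableFuture.supplyAsync(
        () -> {
          throw new IllegalStateException("boom");
        });
  }

  // 1) Was an exception thrown?  2) Which one?
  static String describe() {
    CompletableFuture<Integer> future = failing();
    try {
      future.join();
      return "completed normally";
    } catch (CompletionException e) {
      // join() wraps the Supplier's exception in a CompletionException
      return "failed=" + future.isCompletedExceptionally()
          + ", cause=" + e.getCause().getClass().getSimpleName();
    }
  }

  // 3) Recover: replace the failure with a fallback value.
  static int recovered() {
    return failing().exceptionally(ex -> -1).join();
  }

  public static void main(String[] args) {
    System.out.println(describe());  // failed=true, cause=IllegalStateException
    System.out.println(recovered()); // -1
  }
}
```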

To learn more about error handling and what it means for a future to be completed exceptionally, visit A Guide to CompletableFuture.

For example, if we go back to the supplyAsync example and provide an input file that does not exist:

public static void main(String[] args) {
  Path inputFile = Paths.get("this-file-does-not-exist");
  
  // Once complete, it contains the number of times every word appears in the file
  CompletableFuture<Map<String, Integer>> cF1 =
      CompletableFuture.supplyAsync(() -> countOccurrence(inputFile));

  // This will throw because `countOccurrence` throws an exception.
  System.out.println(cF1.join());
}

then countOccurrence would throw.

However, this exception would not be lost. You would be “notified” of it because the future cF1 is completed exceptionally. Calling join() then throws a CompletionException that wraps the actual cause (get() would instead throw an ExecutionException). The above would output to the console:

Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: Counting word occurrence in 'this-file-does-not-exist'
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
..... truncated
Caused by: java.lang.RuntimeException: Counting word occurrence in 'this-file-does-not-exist'
	at futures.CountWordOccurrence.countOccurrence(CountWordOccurrence.java:24)
	at futures.CountWordOccurrence.lambda$main$1(CountWordOccurrence.java:32)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	... 6 more
Caused by: java.nio.file.NoSuchFileException: this-file-does-not-exist
	at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
  ... truncated
	at java.base/java.util.Scanner.makeReadable(Scanner.java:621)
	at java.base/java.util.Scanner.<init>(Scanner.java:756)
	at futures.CountWordOccurrence.countOccurrence(CountWordOccurrence.java:13)
	... 8 more

The same would happen for runAsync.
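For completeness, a sketch of the runAsync case with a Runnable that always throws. join() again reports a CompletionException, and handle() receives either the (null) result or the failure. Since the exception handed to dependent stages may be wrapped in a CompletionException, the sketch unwraps it before reading the message. The names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class FailedRun {

  // A Runnable that always throws.
  static CompletableFuture<Void> failing() {
    return CompletableFuture.runAsync(
        () -> {
          throw new IllegalArgumentException("bad input");
        });
  }

  // Dependent stages may receive the failure wrapped in a CompletionException.
  static Throwable unwrap(Throwable ex) {
    return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
  }

  // Simple name of the cause reported by join(), or "none" if it completed normally.
  static String causeSeenByJoin() {
    try {
      failing().join();
      return "none";
    } catch (CompletionException e) {
      return e.getCause().getClass().getSimpleName();
    }
  }

  // handle() maps both outcomes (null result or failure) to a message.
  static String handled() {
    return failing()
        .handle((ignored, ex) -> ex == null ? "ok" : "recovered from " + unwrap(ex).getMessage())
        .join();
  }

  public static void main(String[] args) {
    System.out.println(causeSeenByJoin()); // IllegalArgumentException
    System.out.println(handled());         // recovered from bad input
  }
}
```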
