Transform a Future into CompletableFuture

It’s 2024. Completable futures were introduced over 10 years ago in Java 8, and virtual threads – available since Java 21 – have the potential to remove the need for futures altogether.

Despite this, many external libraries still return a plain java.util.concurrent.Future, and will never be updated. Additionally, meaningful adoption of Java 21+ might take years (never mind the uncertain impact of virtual threads on futures).

An example of the prevalence of plain Futures is Java’s own AsynchronousFileChannel. It is part of the standard library, yet in Java 17 its API offers a method that returns a plain Future and a corresponding callback-based method, but nothing that returns a CompletableFuture:

Contract of method write() on AsynchronousFileChannel
package java.nio.channels;

import java.nio.ByteBuffer;
import java.util.concurrent.Future;

public abstract class AsynchronousFileChannel {

    // write() is overloaded. This overload returns a plain Future.
    public abstract Future<Integer> write(ByteBuffer src, long position);

    // This overload accepts a callback (the handler).
    // You can use the callback to complete a CompletableFuture.
    // Not all APIs provide a callback-based method.
    public abstract <A> void write(ByteBuffer src,
                                   long position,
                                   A attachment,  // ignore this parameter
                                   CompletionHandler<Integer, ? super A> handler);

    // ... other methods
}

CompletableFuture is much more powerful and useful than Future for modelling async computations. The key difference is chaining (pipelining). You can chain operations on top of existing futures, creating a sort of pipeline where, once one computation completes, the others start. You can do this without blocking, and before the original future completes. Read more on A Guide to CompletableFuture.
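As a small illustration of this chaining, here is a minimal sketch. The class and method names (ChainingSketch, describe) are made up for this example. The whole pipeline is assembled up front, and the only blocking call is the final join() used to print the result.

```java
import java.util.concurrent.CompletableFuture;

class ChainingSketch {

  // Builds a two-stage pipeline on top of an asynchronous computation.
  // Neither thenApply call blocks; each stage runs once the previous one is done.
  static CompletableFuture<String> describe(int value) {
    CompletableFuture<Integer> source = CompletableFuture.supplyAsync(() -> value);
    return source
        .thenApply(v -> v * 2)            // first stage: transform the number
        .thenApply(v -> "total=" + v);    // second stage: format the result
  }

  public static void main(String[] args) {
    // join() blocks only here, at the very end, so we can print the result
    System.out.println(describe(21).join()); // prints "total=42"
  }
}
```

A plain Future offers no equivalent: the only way to react to its result is to call get() and wait.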

However, even though they were introduced over 10 years ago, many external libraries still don’t support them and return a plain Future or, sometimes, offer callback-based methods.

In such scenarios, can you transform the plain Future into a CompletableFuture to take advantage of its composability? Yes, but there are some caveats.

Don’t Block

The most straightforward way is to sacrifice a thread:

// Overloaded method. If no executor is passed in explicitly, use Java's common pool.
// Not advised: ForkJoinPool.commonPool() is not designed for blocking operations.
static <T> CompletableFuture<T> toCompletableFuture(Future<T> original) {
  return toCompletableFuture(original, ForkJoinPool.commonPool());
}

static <T> CompletableFuture<T> toCompletableFuture(Future<T> original, Executor executor) {
  return CompletableFuture.supplyAsync(
      () -> {
        try {
          // blocks the executor thread until the original future completes
          return original.get();
        } catch (ExecutionException e) {
          // unwrap, so that callers see the original failure as the cause
          throw new CompletionException(e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the interrupt flag
          throw new CompletionException(e);
        }
      },
      executor);
}

Here, the blocking happens on a separate thread. This has the advantage that it doesn’t block the calling thread, and you get a completable future that you can compose. Additionally, the returned CompletableFuture will complete as soon as the original future finishes, meaning any subsequent async computations registered on the completable future will be scheduled immediately.

However, although we don’t block the calling thread, we are still blocking some thread from the executor passed in.

That executor thread will block and wait indefinitely until the future completes. During this time, it will be de-scheduled by the operating system and therefore not waste CPU resources. But during this time the thread is not used for anything else.

This works for a handful of futures, but is not feasible for hundreds or thousands.

On one hand, each thread, even if de-scheduled by the OS, still consumes resources, namely a certain amount of memory for its stack.

On the other hand, the executor that you passed in will likely have a maximum number of threads that it can create and manage. If you try to transform many futures, and if each future takes a long time, then that executor will have no threads left over for other computations, leading to thread-pool starvation.
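The starvation can be reproduced in a few lines. The sketch below is an illustration under assumed names (StarvationSketch, probeWasStarved): it repeats the blocking conversion so that it is self-contained, uses a deliberately tiny pool of 2 threads, and converts 2 futures that stay pending. A third, trivial task submitted to the same pool cannot start until one of the conversions finishes.

```java
import java.util.concurrent.*;

class StarvationSketch {

  // Same blocking conversion as above, repeated so the sketch is self-contained.
  static <T> CompletableFuture<T> toCompletableFuture(Future<T> original, Executor executor) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        return original.get(); // parks one pool thread until 'original' completes
      } catch (Exception e) {
        throw new CompletionException(e);
      }
    }, executor);
  }

  // Returns true if an unrelated task could not run while the conversions were pending.
  static boolean probeWasStarved() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      CompletableFuture<String> slow1 = new CompletableFuture<>();
      CompletableFuture<String> slow2 = new CompletableFuture<>();
      // both pool threads are now blocked inside original.get()
      CompletableFuture<String> converted1 = toCompletableFuture(slow1, pool);
      CompletableFuture<String> converted2 = toCompletableFuture(slow2, pool);

      // a trivial task that only needs a free thread
      Future<String> probe = pool.submit(() -> "probe ran");
      boolean starved;
      try {
        probe.get(300, TimeUnit.MILLISECONDS);
        starved = false;
      } catch (TimeoutException e) {
        starved = true; // no thread was available within 300 ms
      } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
      }

      // release the blocked threads so the pool can shut down cleanly
      slow1.complete("a");
      slow2.complete("b");
      converted1.join();
      converted2.join();
      return starved;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("probe starved: " + probeWasStarved());
  }
}
```

A larger pool only moves the threshold: with N threads, the N+1th pending conversion has the same effect.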

Poll on a scheduler

To transform a plain future into a CompletableFuture, a smarter move is not to block on a dedicated thread, but to poll on a common thread.

With the help of a scheduler, like java.util.Timer, for every future that we want to transform, we submit a periodic task that keeps checking if it has already completed. Timer in particular uses only 1 thread in the background to schedule and execute such tasks. Because isDone returns immediately without blocking, each “check” doesn’t impact subsequent checks. Read more about Timer at Guide to java.util.Timer.

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;

public class FutureToCompletableFuture {

  private FutureToCompletableFuture() {}

  // Timer uses only one thread and can handle tens of thousands of
  // the polling tasks we require
  private static final Timer timer = new Timer("future-to-completablefuture-thread", false);
  private static final long pollPeriodMillisDefault = 50L;

  static <T> CompletableFuture<T> poll(Future<T> original) {
    return poll(original, pollPeriodMillisDefault);
  }

  static <T> CompletableFuture<T> poll(Future<T> original, long pollPeriodMillis) {
    // two threads will keep a reference to this future
    // the calling thread (calling the method), and the thread of the timer.
    CompletableFuture<T> cF1 = new CompletableFuture<>();

    TimerTask timerTask =
        new TimerTask() {
          @Override
          public void run() {

            // this operation is non-blocking.
            // this means we are not halting subsequent "polling operations" 
            // on other future conversions.
            if (original.isDone()) {
              try {
                T t = original.get();
                cF1.complete(t);
              } catch (ExecutionException e) {
                cF1.completeExceptionally(new CompletionException(e.getCause()));
              } catch (CancellationException | InterruptedException e) {
                cF1.completeExceptionally(e);
              }
              
              // after it has completed, further polling is unnecessary and wasteful
              this.cancel();
            }
          }
        };

    // run the task at a given frequency.
    timer.schedule(timerTask, pollPeriodMillis, pollPeriodMillis);

    return cF1;
  }
}  

This way, you can transform thousands of plain Futures using only the one extra thread of the Timer.

The disadvantage is that we need to specify the polling frequency. Too high and you waste resources; too low and any pipeline transformations registered against the completable future will be scheduled with an unacceptable delay.

The approach above works well. However, if you need this functionality, look into the futurity library for a more robust solution. It doesn’t use the Timer class, but the principle is the same.
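To show that the principle does not depend on Timer, here is a minimal sketch of the same polling idea on top of a ScheduledExecutorService. This is not the code of any library; the names (ScheduledPollSketch, future-poller) are assumptions made for this example. It uses a daemon thread so the JVM can exit, and it stops polling as soon as the returned future completes, including when the caller cancels it.

```java
import java.util.concurrent.*;

class ScheduledPollSketch {

  // One daemon thread performs the polling for every converted future.
  private static final ScheduledExecutorService SCHEDULER =
      Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "future-poller");
        thread.setDaemon(true);
        return thread;
      });

  static <T> CompletableFuture<T> poll(Future<T> original, long periodMillis) {
    CompletableFuture<T> result = new CompletableFuture<>();

    ScheduledFuture<?> pollingTask = SCHEDULER.scheduleWithFixedDelay(() -> {
      if (!original.isDone()) {
        return; // not ready yet, check again on the next tick
      }
      try {
        result.complete(original.get()); // get() returns immediately here
      } catch (ExecutionException e) {
        result.completeExceptionally(e.getCause());
      } catch (CancellationException | InterruptedException e) {
        result.completeExceptionally(e);
      }
    }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

    // Stop polling once 'result' is complete, however that happened.
    result.whenComplete((value, error) -> pollingTask.cancel(false));
    return result;
  }

  public static void main(String[] args) {
    CompletableFuture<String> source = new CompletableFuture<>();
    CompletableFuture<String> converted = poll(source, 10L);
    source.complete("done");
    System.out.println(converted.join());
  }
}
```

The trade-off around the polling period is unchanged: the converted future completes at most about one period after the original.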

Don’t block the worker thread

There is a caveat to this approach.

As discussed in more detail in A Guide to CompletableFuture, most chaining/transformation operations on the CompletableFuture API have 3 versions, one of which is “non-async”. For example, for thenApply:

  • thenApply(<lambda>) [non-async]
  • thenApplyAsync(<lambda>) [async]
  • thenApplyAsync(<lambda>, <executor>) [async]

With the non-async version, if the original future has not yet completed when you register the transformation (that is, when you chain a new operation), then that operation runs on the thread that completes the future. In our case, that means the lonely thread of our java.util.Timer! Therefore, if the operation takes a long time, it delays all other scheduled tasks; namely, the polling and completion of all the other futures being transformed.
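This rule can be observed in isolation, without any timer. In the sketch below (the names CallbackThreadSketch and completer-thread are made up for this example), both callbacks are registered before the future completes, and the future is then completed from a named thread. The non-async callback reports that thread; the async one reports a different thread.

```java
import java.util.concurrent.CompletableFuture;

class CallbackThreadSketch {

  // Returns {thread that ran thenApply, thread that ran thenApplyAsync}.
  static String[] callbackThreads() {
    CompletableFuture<String> source = new CompletableFuture<>();

    // Both stages are registered while 'source' is still incomplete.
    CompletableFuture<String> syncStage =
        source.thenApply(value -> Thread.currentThread().getName());
    CompletableFuture<String> asyncStage =
        source.thenApplyAsync(value -> Thread.currentThread().getName());

    // Complete the future from a thread with a recognisable name.
    new Thread(() -> source.complete("done"), "completer-thread").start();

    return new String[] {syncStage.join(), asyncStage.join()};
  }

  public static void main(String[] args) {
    String[] names = callbackThreads();
    System.out.println("thenApply ran on:      " + names[0]);
    System.out.println("thenApplyAsync ran on: " + names[1]);
  }
}
```

In the polling converter, the role of completer-thread is played by the timer thread.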

For example:

Supporting methods and variables

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class Utils {
  // Returns a future that completes with the specified result
  // after the specified delay.
  // Yes, it returns a CompletableFuture, but we pretend that it is a
  // plain Future (CompletableFuture implements the Future interface).
  public static <T> CompletableFuture<T> sleepAsync(T t, long sleepMillis) {
    return CompletableFuture.supplyAsync(
        () -> t, CompletableFuture.delayedExecutor(sleepMillis, TimeUnit.MILLISECONDS));
  }
}


public class BlockingSchedulerThread {

  private static long startReference = System.currentTimeMillis();

  private static int lapsedS() {
    return (int) ((System.currentTimeMillis() - startReference) / 1000L);
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  // Prints the 1) argument, 2) the thread executing the method, and 3) the time elapsed
  // since the start of the program
  private static void printResultAndThread(String result) {
    System.out.printf("%s # %s # %ds\n", result, Thread.currentThread().getName(), lapsedS());
  }
}

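public class BlockingSchedulerThread {

  // Same class as in the previous listing; the helper methods
  // (lapsedS, sleep, printResultAndThread) are omitted here.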

  public static void main(String[] args) {

    Future<String> pF1 = Utils.sleepAsync("Concurrency Deep Dives - 1", 10_000L);
    Future<String> pF2 = Utils.sleepAsync("Concurrency Deep Dives - 2", 12_000L);
    Future<String> pF3 = Utils.sleepAsync("Concurrency Deep Dives - 3", 13_000L);

    CompletableFuture<String> cF1 = FutureToCompletableFuture.poll(pF1);
    CompletableFuture<String> cF2 = FutureToCompletableFuture.poll(pF2);
    CompletableFuture<String> cF3 = FutureToCompletableFuture.poll(pF3);

    // the problem is that 'whenComplete' is synchronous. It runs on the thread
    // that completes 'cF1', which is the timer thread
    cF1.whenComplete(
        (result, exception) -> {
          printResultAndThread(result);
          sleep(21_000L); // prevents the timer thread from executing other tasks
        });

    cF2.whenCompleteAsync((result, exception) -> printResultAndThread(result));
    cF3.whenCompleteAsync((result, exception) -> printResultAndThread(result));
  }
}

Above, the second and third futures complete 12 and 13 seconds after the start of the program, respectively. Therefore, the printResultAndThread computations that we chain onto cF2 and cF3 with whenCompleteAsync should start at approximately those times as well. Instead, the output is:

Concurrency Deep Dives - 1 # future-to-completablefuture-thread # 10s
Concurrency Deep Dives - 3 # ForkJoinPool.commonPool-worker-1 # 31s
Concurrency Deep Dives - 2 # ForkJoinPool.commonPool-worker-2 # 31s

The computations that we pipelined were delayed by 19 and 18 seconds because we chained the first future, cF1, with a synchronous whenComplete computation that takes 21 seconds. This computation (that is, the print method and the sleep) runs on the same thread that completes cF1. That thread is the timer thread (named future-to-completablefuture-thread). Because the timer has no other threads available, the polling of the other 2 futures does not occur until it is unblocked at around the 31-second mark, and by then we are already delayed.

This would be mitigated if we used a scheduler with more worker threads rather than java.util.Timer, but that would only postpone the problem.

Therefore, it’s important not to use the synchronous version of any of the pipelining methods of CompletableFuture. If we swap whenComplete for whenCompleteAsync, the problem disappears:

Concurrency Deep Dives - 1 # ForkJoinPool.commonPool-worker-1 # 10s
Concurrency Deep Dives - 2 # ForkJoinPool.commonPool-worker-2 # 12s
Concurrency Deep Dives - 3 # ForkJoinPool.commonPool-worker-2 # 13s

Notice in particular how the computation pipelined on cF1 no longer runs on the Timer’s thread.
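Relying on every caller to pick the async variants is fragile. One possible defensive variation, sketched below as an assumption rather than as part of the approach above, is to have the timer thread hand the completion off to a separate executor. The timer thread then only polls, and even a synchronous callback runs on the executor’s thread. The names HandOffPollSketch, hand-off-poller and completer-thread are hypothetical.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;

class HandOffPollSketch {

  private static final Timer TIMER = new Timer("hand-off-poller", true); // daemon thread

  static <T> CompletableFuture<T> poll(Future<T> original, long periodMillis, Executor completer) {
    CompletableFuture<T> result = new CompletableFuture<>();
    TIMER.schedule(new TimerTask() {
      @Override
      public void run() {
        if (!original.isDone()) {
          return;
        }
        cancel(); // the original is done, so stop polling
        // The timer thread only submits the completion; it never runs callbacks.
        completer.execute(() -> {
          try {
            result.complete(original.get());
          } catch (ExecutionException e) {
            result.completeExceptionally(e.getCause());
          } catch (CancellationException | InterruptedException e) {
            result.completeExceptionally(e);
          }
        });
      }
    }, periodMillis, periodMillis);
    return result;
  }

  // Registers a synchronous callback and reports which thread ran it.
  static String syncCallbackThread() {
    ExecutorService completer = Executors.newSingleThreadExecutor(runnable -> {
      Thread thread = new Thread(runnable, "completer-thread");
      thread.setDaemon(true);
      return thread;
    });
    try {
      CompletableFuture<String> source = new CompletableFuture<>();
      CompletableFuture<String> threadName =
          poll(source, 10L, completer).thenApply(value -> Thread.currentThread().getName());
      source.complete("x");
      return threadName.join();
    } finally {
      completer.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("sync callback ran on: " + syncCallbackThread());
  }
}
```

The cost is an extra executor to size and manage, and a slow synchronous callback can still hog that executor’s threads, so the advice to prefer the async variants still stands.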

Leverage Callbacks

Sometimes, you don’t need to transform a plain Future into a CompletableFuture at all.

Some APIs have callback-based alternatives. You pass in a function that the API is responsible for invoking with the result of the asynchronous operation (once that result becomes available).

For example, the AsynchronousFileChannel shown above has an overloaded write method that returns void and receives a callback handler, which is executed once the data has been written to the file:

Using callback API of AsynchronousFileChannel

public static void main(String[] args) throws IOException {
  // not important how we obtain it for the purposes of this example;
  // the channel must be opened for writing
  AsynchronousFileChannel fC =
      AsynchronousFileChannel.open(Paths.get(args[0]), StandardOpenOption.WRITE);

  String contents = "Concurrency Deep Dives";
  fC.write(
      ByteBuffer.wrap(contents.getBytes()),
      0,
      null,
      new CompletionHandler<Integer, Object>() {

        // method called after contents successfully written
        // `result` is the number of bytes written (not relevant for this example)
        @Override
        public void completed(Integer result, Object attachment) {
          doSomethingOnSuccess(result); // method not shown
          System.out.printf("Finished writing %d bytes into the file%n", result);
        }

        // method called after writing to file-system failed (e.g. permissions issue)
        @Override
        public void failed(Throwable exc, Object attachment) {
          doSomethingOnFailure(exc); // method not shown
          exc.printStackTrace();
        }
      });
}

But callbacks have many problems. Just ask JavaScript about callback hell. They are not composable, and when you pass the callback method to the external API, you are at its mercy. What we really want is to use completable futures.

Fortunately, you can easily transform callback APIs into CompletableFutures. Simply use the callback to complete the future:

// the method that allows you to transform a callback API into completable futures
static CompletableFuture<Integer> write(AsynchronousFileChannel fC, String contents) {
  CompletableFuture<Integer> cF1 = new CompletableFuture<>();

  fC.write(
      ByteBuffer.wrap(contents.getBytes()),  // the contents to write to the file
      0,     // file position from where to start writing - irrelevant for this explanation
      null,  // irrelevant for this explanation
      new CompletionHandler<>() { // the callback!!
        
        // This is called by the AsynchronousFileChannel implementation
        // after it successfully writes the contents to file system
        // `result` is the number of bytes written (not relevant for this example)
        public void completed(Integer result, Object attachment) {
          cF1.complete(result);  // we complete the future
        }

        // alternatively, if the writing to the filesystem fails
        // this method is called
        public void failed(Throwable exc, Object attachment) {
          cF1.completeExceptionally(exc);  // we complete the future
        }
      });


  return cF1;
}


public static void main(String[] args) throws IOException, InterruptedException {

  AsynchronousFileChannel fC = AsynchronousFileChannel.open(Paths.get(args[0]), StandardOpenOption.WRITE);
  
  
  CompletableFuture<Integer> cF1 = write(fC, "Concurrency Deep Dives");
  // we now have a completable future that is composable!
}

This approach is preferable to transforming a Future into a CompletableFuture because you avoid the overhead of a scheduler, and the completion occurs immediately rather than being delayed by up to one polling period.
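To see the callback-based conversion working end to end, the following sketch writes to a temporary file and chains a further step onto the result. The class name CallbackToFutureSketch, the helper writeAndDescribe and the temporary file prefix are assumptions made for this example; the write method repeats the conversion shown above so that the sketch is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

class CallbackToFutureSketch {

  // Completes the returned future from inside the channel's callback.
  static CompletableFuture<Integer> write(AsynchronousFileChannel channel, String contents) {
    CompletableFuture<Integer> result = new CompletableFuture<>();
    channel.write(
        ByteBuffer.wrap(contents.getBytes(StandardCharsets.UTF_8)),
        0L,
        null,
        new CompletionHandler<Integer, Object>() {
          @Override
          public void completed(Integer bytesWritten, Object attachment) {
            result.complete(bytesWritten);
          }

          @Override
          public void failed(Throwable exc, Object attachment) {
            result.completeExceptionally(exc);
          }
        });
    return result;
  }

  // Writes to a temporary file and chains a formatting step onto the result.
  static String writeAndDescribe(String contents) {
    try {
      Path file = Files.createTempFile("callback-sketch", ".txt");
      try (AsynchronousFileChannel channel =
          AsynchronousFileChannel.open(file, StandardOpenOption.WRITE)) {
        return write(channel, contents)
            .thenApplyAsync(bytes -> bytes + " bytes written") // async, as advised above
            .join(); // wait before the channel is closed
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(writeAndDescribe("Concurrency Deep Dives"));
  }
}
```

No scheduler or polling period is involved: the future completes on whichever thread the channel uses to invoke the handler.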
