How it works in Java. CompletableFuture.

Sergey Kuptsov
Nov 18, 2017


A main principle of programming is not to reinvent the wheel. But sometimes, in order to understand what is under the hood and how to use a tool correctly, we need to do exactly that. Today we are reinventing completable futures.

CompletableFuture appeared in Java 8 and was inspired by Google’s ListenableFuture. As the Javadoc says, CompletableFuture is “A Future that may be explicitly completed”. But mostly it is a set of helpful methods for executing code asynchronously, with the possibility of combining async stages in a fluent API style.
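To make the two ideas above concrete, here is a small sketch using the real JDK class. The class name CompletableFutureDemo and the sample values are made up for illustration; only supplyAsync, thenApply, thenCombine, complete and join come from the JDK.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureDemo {

    // Runs two suppliers asynchronously and combines their results in one fluent chain.
    static int combineAsync() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 1)
                .thenApply(t -> t * 2);              // transform the value once it arrives
        return price.thenCombine(tax, Integer::sum)  // merge two independent stages
                .join();                             // block until the whole chain is done
    }

    // "Explicitly completed": no task is attached, another thread supplies the value by hand.
    static String completeByHand() {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> future.complete("done")).start();
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(combineAsync());   // 42
        System.out.println(completeByHand()); // done
    }
}
```

The rest of the article rebuilds only the "wait for a result" part; chaining stages is out of scope here.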

Let’s start with a minimal interface, WaitingFuture, with two methods:

public interface WaitingFuture<V> {

    V get() throws ExecutionException;

    V get(long timeout, TimeUnit unit) throws TimeoutException, ExecutionException;
}

The first method blocks until the execution finishes and fails if the execution failed. The second has a time limit and throws TimeoutException when the timeout expires.

Let’s now add a utility method that actually executes a user function asynchronously and lets us get its result. Asynchronously means that we can continue our own execution flow after starting the waitingFuture and come back to the result later. This means that we need another thread to execute the user function. The best pattern is to use a thread pool. For simplicity, let’s start a new thread for each user function.
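For comparison, this is roughly what the thread-pool variant mentioned above could look like with the JDK's own ExecutorService and Future. It is only a sketch: the class name PoolExecutionSketch, the pool size and the sample task are assumptions made for the example, and the rest of the article keeps the simpler thread-per-task approach.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

class PoolExecutionSketch {

    // Hypothetical helper: hands the user function to a pooled thread instead of a new Thread.
    static <V> Future<V> executeAsync(ExecutorService pool, Supplier<V> userFunction) {
        Callable<V> task = userFunction::get; // adapt Supplier to Callable explicitly
        return pool.submit(task);
    }

    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // pool size chosen arbitrarily
        try {
            Future<Integer> future = executeAsync(pool, () -> 6 * 7);
            // the caller could do other work here before asking for the result
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42
    }
}
```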

But how do we receive the user function’s result or exception? The idea is simple: run the user function asynchronously together with a data container that can hold the result or the exception. There is one problem: the lifetime of this container, as far as garbage collection is concerned, must be controlled by the client that started the async task, because the client wants to observe the result later. The obvious decision is to use the returned WaitingFuture itself as the container. As long as the client holds a reference to the waitingFuture, the execution results stay alive.

public class WaitingFutureImpl<V> implements WaitingFuture<V> {
    // result of user function
    private volatile V result;
    // exception if one occurred
    private volatile Throwable throwable;
    // marker that indicates that user function has finished
    private volatile boolean finished;
    ...
}

Then we need a Runnable that actually sets the execution results. Let’s make it a nested class so that it has direct access to the outer class’s private fields:

private static class RunnableWaitingFuture<V> implements Runnable {
    private final Supplier<V> userFunction;
    private final WaitingFutureImpl<V> waitingFuture;

    public RunnableWaitingFuture(Supplier<V> userFunction, WaitingFutureImpl<V> waitingFuture) {
        this.userFunction = userFunction;
        this.waitingFuture = waitingFuture;
    }

    @Override
    public void run() {
        try {
            waitingFuture.result = userFunction.get();
        } catch (Throwable throwable) {
            waitingFuture.throwable = throwable;
        } finally {
            waitingFuture.finished = true;
        }
    }
}

And we will start this runnable in our utility method:

public static <V> WaitingFuture<V> executeAsync(Supplier<V> userFunction) {
    // declared with the concrete type, because RunnableWaitingFuture writes its private fields
    WaitingFutureImpl<V> waitingFuture = new WaitingFutureImpl<>();
    new Thread(new RunnableWaitingFuture<>(userFunction, waitingFuture)).start();
    return waitingFuture;
}

As long as we hold a reference to the WaitingFuture returned from executeAsync, the execution results remain available.

Let’s now implement the get methods. We need to wait for the result. The simplest option is to spin-wait until the result is ready, constantly checking the “finished” flag. Since the flag is written by one thread and read by another, it must be volatile for visibility reasons:

@Override
public V get() throws ExecutionException {
    while (!finished) {
        // spin
    }

    if (throwable != null) {
        throw new ExecutionException(throwable);
    }

    return result;
}
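A side note on the visibility argument. Under the Java Memory Model, a write to a volatile field happens-before every later read of that field that sees the written value. So strictly speaking only the flag has to be volatile, as long as the result is written before the flag and read after it. The sketch below illustrates this with a plain result field. The class name VolatilePublication is made up for the example, and the article's own classes keep all three fields volatile, which is also correct.

```java
class VolatilePublication {
    private int result;                // plain field, written before the flag is set
    private volatile boolean finished; // the volatile write publishes the result above

    static int demo() {
        VolatilePublication box = new VolatilePublication();
        new Thread(() -> {
            box.result = 42;     // ordinary write
            box.finished = true; // volatile write: everything written before it becomes visible
        }).start();

        while (!box.finished) {
            // spin until the volatile read observes true
        }
        return box.result;       // guaranteed to see the value written by the other thread
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42
    }
}
```

Without volatile on the flag, the reading thread would have no such guarantee and the loop might never terminate.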

Generally speaking, calling get() without a timeout is not a good idea, because we risk hanging the calling thread forever if the user function never returns.

Let’s write the get(long timeout, TimeUnit unit) version:

@Override
public V get(long timeout, TimeUnit unit) throws ExecutionException, TimeoutException {
    long started = System.nanoTime();
    long allowedExecutionNanos = unit.toNanos(timeout);
    while (!finished) {
        if (System.nanoTime() - started > allowedExecutionNanos) {
            throw new TimeoutException();
        }
    }

    if (throwable != null) {
        throw new ExecutionException(throwable);
    }

    return result;
}
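From the caller's side, a timed get typically looks like the sketch below. To keep the example self-contained it uses the JDK's CompletableFuture as a stand-in, since its get(long, TimeUnit) has the same contract apart from also throwing InterruptedException. The class name TimedGetDemo, the 2 second task and the 100 ms limit are arbitrary choices for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {

    static String demo() {
        // A deliberately slow task: it needs about 2 seconds to produce its value.
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "late";
        });

        try {
            return slow.get(100, TimeUnit.MILLISECONDS); // give up after 100 ms
        } catch (TimeoutException e) {
            return "timed out";                          // the caller stays responsive
        } catch (InterruptedException | ExecutionException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // timed out
    }
}
```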

Here is the full implementation:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class SpinLoopWaitingFuture<V> implements WaitingFuture<V> {
    // result of user function
    private volatile V result;
    // exception if one occurred
    private volatile Throwable throwable;
    // marker that indicates that user function finished
    private volatile boolean finished;

    @Override
    public V get() throws ExecutionException {
        while (!finished) {
            // spin
        }

        if (throwable != null) {
            throw new ExecutionException(throwable);
        }

        return result;
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws ExecutionException, TimeoutException {
        long started = System.nanoTime();
        long allowedExecutionNanos = unit.toNanos(timeout);
        while (!finished) {
            if (System.nanoTime() - started > allowedExecutionNanos) {
                throw new TimeoutException();
            }
        }

        if (throwable != null) {
            throw new ExecutionException(throwable);
        }

        return result;
    }

    public static <V> WaitingFuture<V> executeAsync(Supplier<V> userFunction) {
        SpinLoopWaitingFuture<V> waitingFuture = new SpinLoopWaitingFuture<>();
        new Thread(new RunnableWaitingFuture<>(userFunction, waitingFuture)).start();
        return waitingFuture;
    }

    private static class RunnableWaitingFuture<V> implements Runnable {
        private final Supplier<V> userFunction;
        private final SpinLoopWaitingFuture<V> waitingFuture;

        public RunnableWaitingFuture(Supplier<V> userFunction, SpinLoopWaitingFuture<V> waitingFuture) {
            this.userFunction = userFunction;
            this.waitingFuture = waitingFuture;
        }

        @Override
        public void run() {
            try {
                waitingFuture.result = userFunction.get();
            } catch (Throwable throwable) {
                waitingFuture.throwable = throwable;
            } finally {
                // set last, so a reader that sees finished == true also sees result/throwable
                waitingFuture.finished = true;
            }
        }
    }
}

Frankly speaking, spin-waiting on long tasks is a bad idea: the waiting thread burns CPU time and scheduler time slices while doing no useful work. The alternative to spinning is blocking: the thread’s context is saved and the scheduler stops running the thread until it is woken up.

The most familiar way to block a thread is synchronized, and the choice of monitor object is obvious: the WaitingFuture instance itself. If RunnableWaitingFuture.run executes inside a block synchronized on the waitingFuture instance and get synchronizes on the same instance, we get the desired blocking, provided that run acquires the monitor first. get then waits until the monitor is released.

But how do we implement the timed get with this approach? Java has no way to wait to enter a synchronized block for a limited period of time. You could use Object.wait(long timeout, int nanos), but it can only be called from a synchronized context (by the thread “owning the monitor”), and while run() holds that monitor for the whole duration of the user function, get() cannot even enter its synchronized block.

Let’s recap: all we need is to park a thread until another thread unparks it or the timeout expires. Happily, we have both threads: one to block, and one that knows when the result is ready and can unpark the waiting one. And Java has a suitable API for this, LockSupport.park and LockSupport.parkNanos:

    ...
    // the thread blocked in get(), if any
    private volatile Thread thread;

    @Override
    public V get() throws ExecutionException {
        this.thread = Thread.currentThread();
        // park() may return spuriously, and the task may already be done,
        // so always re-check the flag instead of parking unconditionally
        while (!finished) {
            LockSupport.park(this);
        }

        if (throwable != null) {
            throw new ExecutionException(throwable);
        }

        return result;
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws ExecutionException, TimeoutException {
        this.thread = Thread.currentThread();
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!finished) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException();
            }
            // wakes up on unpark, on timeout, or spuriously; the loop sorts out which
            LockSupport.parkNanos(this, remainingNanos);
        }

        if (throwable != null) {
            throw new ExecutionException(throwable);
        }

        return result;
    }

    private static class RunnableWaitingFuture<V> implements Runnable {
        ...
        @Override
        public void run() {
            try {
                waitingFuture.result = userFunction.get();
            } catch (Throwable throwable) {
                waitingFuture.throwable = throwable;
            } finally {
                waitingFuture.finished = true;
                // unpark(null) has no effect, so this is safe when nobody waits yet
                LockSupport.unpark(waitingFuture.thread);
            }
        }
    }
}
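Two properties of LockSupport matter for the code above, and both can be checked in isolation. First, unpark hands the target thread a one-time permit, so an unpark that arrives before park is not lost. Second, park is allowed to return for no reason, so the condition has to be re-checked in a loop. The class ParkPermitDemo and its method names are made up for this sketch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class ParkPermitDemo {
    private static volatile boolean ready;

    // An unpark issued before park makes the next park return immediately.
    static boolean unparkBeforeParkDoesNotBlock() {
        LockSupport.unpark(Thread.currentThread());          // grant the permit in advance
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));  // consumes the permit, no 5 s wait
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return elapsedMillis < 1_000;
    }

    // The usual waiting idiom: park in a loop guarded by a volatile condition.
    static boolean waitForSignal() {
        ready = false;
        Thread waiter = Thread.currentThread();
        new Thread(() -> {
            ready = true;               // publish the condition first
            LockSupport.unpark(waiter); // then wake the waiter
        }).start();

        while (!ready) {                // guards against spurious wakeups
            LockSupport.park();
        }
        return ready;
    }

    public static void main(String[] args) {
        System.out.println(unparkBeforeParkDoesNotBlock()); // true
        System.out.println(waitForSignal());                // true
    }
}
```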

Of course, parking and unparking a thread is a costly operation, and it does not pay off for very short-lived tasks. So which approach should we use? The JDK actually combines the two: it spin-waits for a very short, randomized time, and if the result is still not ready after that, it parks the thread:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    ...
    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }
    ...
    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
        }
        ...
    }

    static final class Signaller extends Completion
        ...
        final CompletableFuture<?> tryFire(int ignore) {
            Thread w; // no need to atomically claim
            if ((w = thread) != null) {
                thread = null;
                LockSupport.unpark(w);
            }
            return null;
        }
        ...
        public boolean block() {
            ...
            LockSupport.parkNanos(this, nanos);
            ...
        }