Implementing Go Concurrency Features in Java Part 3: Implementing the Select Statement

William Yin
13 min read · Jul 7, 2024

This article is the third in a series of articles documenting my process implementing Go concurrency features in Java. In the previous article we implemented Go’s channels. That article can be found here. In this article I will document how I implemented the select statement in Java.

Overview

The select statement allows a goroutine to wait on multiple communication operations. A select statement blocks until one of its cases can run, and then it executes that case. If multiple cases are ready, it chooses one at random. The condition for a case to run can be sending a message to a channel, receiving a message from a channel, or a specified amount of time passing. You can also specify a default case, which runs immediately if none of the other cases are ready when the statement executes, so the select statement never blocks.

The select statement is especially useful when solving the sleeping barber problem. Without it, we would have to use a semaphore and check whether it has any permits available before acquiring one, so that a customer does not wait for a seat when none is free. That would require us to track the semaphore’s remaining permits separately and to hold a lock while we check the number of free seats and acquire a permit. With the select statement and channels, the problem becomes much more intuitive. We can represent the waiting room as a buffered channel whose capacity equals the number of seats in the waiting room, and give each customer a select statement that executes one case if the waiting room has space available and executes a default case immediately if it does not. This solution is shown below.

import io.javago.Channel;
import io.javago.InputChannel;
import io.javago.OutputChannel;
import io.javago.sync.WaitGroup;

import static io.javago.Go.go;
import static io.javago.Selector.select;

class SleepingBarber {

private static final int NUM_CUSTOMERS = 100;
private static final int NUM_CHAIRS = 5;

private static final WaitGroup wg = new WaitGroup();

public static void main(String[] args) {
wg.add(NUM_CUSTOMERS);

try (
Channel<Integer> customerArrived = Channel.make();
Channel<Boolean> barberDone = Channel.make();
Channel<Object> waitingRoom = Channel.make(NUM_CHAIRS)
) {
go(new Barber(customerArrived, barberDone));
for (int i = 0; i < NUM_CUSTOMERS; i++) {
go(new Customer(i, waitingRoom, customerArrived, barberDone));
try {
Thread.sleep((int) (Math.random() * 100 + 1));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
wg.await();
}
System.out.println("All customers have been served or left.");
}

private record Barber(
InputChannel<Integer> customerArrived,
OutputChannel<Boolean> barberDone
) implements Runnable {

@Override
public void run() {
for (Integer customer : customerArrived) {
System.out.println("Barber is cutting customer #" + customer + "'s hair.");
sleep((int) (Math.random() * 100 + 1));
System.out.println("Barber has finished cutting customer #" + customer + "'s hair.");
barberDone.send(true);
}
}

private void sleep(long millis) {
if (millis > 0) {
synchronized (this) {
try {
this.wait(millis);
} catch (InterruptedException ignored) {}
}
}
}
}

private record Customer(
int id,
Channel<Object> waitingRoom,
OutputChannel<Integer> customerArrived,
InputChannel<Boolean> barberDone
) implements Runnable {

@Override
public void run() {
try (wg) {
System.out.println("Customer #" + id + " has arrived.");
select()
.addCase(waitingRoom, new Object(), () -> {
System.out.println("Customer #" + id + " has taken a seat in the waiting room.");
customerArrived.send(id);
barberDone.receive();
waitingRoom.receive();
System.out.println("Customer #" + id + " received a haircut and is leaving.");
})
.addDefault(() -> System.out.println("Customer #" + id + " found no free chairs and is leaving."))
.run();
}
}
}
}

Implementing the Select Statement

In Go, the condition for a case in a select statement to be executed can be when a message is sent to a channel, when a message is received from a channel, or when a certain amount of time passes. We will represent each of these cases with a Java class. A select statement may also contain a default case that executes if none of the other cases can immediately execute.

To mimic the select statement’s syntax as closely as possible, we will be using the builder design pattern so that the user can chain case additions. Below is an example of how the user will create and run a select statement using our library.

select()
.addCase(inputChannel, msg -> System.out.println(msg)) // Message received case
.addCase(outputChannel, new Object(), () -> System.out.println("Message sent!")) // Message sent case
.addCase(Duration.ofMillis(500), () -> System.out.println("500 ms has passed.")) // Timeout case
.addDefault(() -> System.out.println("Executing default case.")) // Default case
.run();

Our select statement will be packaged in the Selector class, which we will create now. In our select statement, we will wait for each case on a separate thread and maintain a channel that those threads use to report back to the select statement’s thread. We will also assign an id to each case. When a case’s condition is met, it will send its id to the select statement’s thread, which will then interrupt every other case’s thread. The select statement’s thread will then wait for the case whose condition was met to finish executing. To ensure that only one case executes, we will use an AtomicBoolean in the Selector class to indicate whether a case is already executing. When a case tries to execute, it will first try to atomically update the atomic boolean’s value to true, if it is not already true, using the atomic boolean’s compareAndSet(boolean expectedValue, boolean newValue) method. It will only execute its body if the update is successful. If the select statement has a default case, then before creating the threads for the cases, it will check each case’s respective channel to see if the case can execute. If no case’s channel is available, it will execute the default case and return after it completes.
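To see the claim step in isolation, here is a minimal sketch that uses only the JDK. The ClaimDemo class and its countWinners method are hypothetical names made up for this illustration and are not part of the library, and plain platform threads stand in for the cases’ virtual threads. However many threads race, compareAndSet(false, true) should succeed for exactly one of them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the library.
class ClaimDemo {

    /** Starts the given number of racing threads and returns how many of them won the claim. */
    static int countWinners(int contenders) {
        AtomicBoolean closed = new AtomicBoolean(false); // plays the role of Selector's "closed" flag
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[contenders];

        for (int i = 0; i < contenders; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // line every thread up so that they really race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Only the first thread to flip false -> true gets 'true' back.
                if (closed.compareAndSet(false, true)) {
                    winners.incrementAndGet(); // stands in for running the case's body
                }
            });
            threads[i].start();
        }

        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("Cases that ran: " + countWinners(8)); // prints "Cases that ran: 1"
    }
}
```

Because the check and the update happen in a single atomic step, there is no window in which two cases can both observe false and both proceed.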

Below is the code to implement the select statement in Java. We will create the InputChannelCase, OutputChannelCase, and DelayedCase classes in the next section.

package io.javago;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* The {@code Selector} class implements Go's {@code select} statement.
* It implements a select-like mechanism for handling multiple asynchronous tasks.
* It manages cases involving input channels, output channels, delayed tasks, and a default case.
* When a {@code Selector}'s {@link #run} method is called, it creates a virtual thread for each case.
* In the event that one of the cases is a {@link DelayedCase}, the delayed case will also reserve a platform thread
* until either another case in the {@code Selector} or the delayed case's method is completed.
* This means that the amount of delayed cases in a {@code Selector} and their duration should be kept to a minimum.
* When a case is executed, the virtual threads for all other tasks are interrupted and any pending tasks on them are
* cancelled.
*/
public class Selector {

private final List<Thread> threads = new ArrayList<>();
private final List<Runnable> cases = new ArrayList<>();
private final Channel<Integer> toCases = Channel.make();
private final List<InputChannel<?>> inputChannels = new ArrayList<>();
private final List<OutputChannel<?>> outputChannels = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private Runnable defaultCase;

/**
* Private constructor for creating instances of {@code Selector}. Use {@link #select()} method to instantiate.
*/
private Selector() {}

/**
* Static factory method to create a new instance of {@code Selector}.
*
* @return a new instance of {@code Selector}
*/
public static Selector select() {
return new Selector();
}

/**
* Adds an {@link InputChannelCase} to the selector.
* The case will execute its {@link Consumer} after it receives a message from its associated {@link InputChannel}.
* The message received from the channel will be the input to the consumer.
*
* @param <T> the type of messages handled by the input channel
* @param ch the input channel to monitor
* @param c the consumer to execute when a message is received
* @return this {@code Selector} instance for method chaining
*/
public <T> Selector addCase(InputChannel<T> ch, Consumer<T> c) {
cases.add(new InputChannelCase<>(ch, c, toCases, cases.size(), closed));
inputChannels.add(ch);
return this;
}

/**
* Adds an {@link OutputChannelCase} to the selector.
* The case will execute its {@link Runnable} after it sends a message to its associated {@link OutputChannel}.
*
* @param <T> the type of message to send through the output channel
* @param ch the output channel to send the message
* @param message the message to send through the output channel
* @param r the callback to execute after sending the message
* @return this {@code Selector} instance for method chaining
*/
public <T> Selector addCase(OutputChannel<T> ch, T message, Runnable r) {
cases.add(new OutputChannelCase<>(ch, message, r, toCases, cases.size(), closed));
outputChannels.add(ch);
return this;
}

/**
* Adds a {@link DelayedCase} to the selector.
* The delayed case will reserve a platform thread until either another case in the {@code Selector} or the delayed
* case's method is completed.
* This means that the amount of delayed cases in a {@code Selector} and their duration should be kept to a minimum.
*
* @param d the duration to wait before executing the callback
* @param r the runnable to execute after the delay
* @return this {@code Selector} instance for method chaining
*/
public Selector addCase(Duration d, Runnable r) {
cases.add(new DelayedCase(d, r, toCases, cases.size(), closed));
return this;
}

/**
* Adds a default case to the selector.
* The default case will be executed if every {@link InputChannelCase} and {@link OutputChannelCase}'s channel is
* empty or full respectively.
*
* @param r the runnable to execute as the default case
* @return this {@code Selector} instance for method chaining
*/
public Selector addDefault(Runnable r) {
defaultCase = r;
return this;
}

/**
* Executes the selector logic.
* Creates a virtual thread for each case.
* In the event that one of the cases is a {@link DelayedCase}, the delayed case will also reserve a platform thread
* until either another case in the {@code Selector} or the delayed case's method is completed.
* This means that the amount of delayed cases in a {@code Selector} and their duration should be kept to a minimum.
* When a case is executed, the virtual threads for all other tasks are interrupted and any pending tasks on them
* are cancelled.
*/
public void run() {
if (defaultCase != null) {
boolean goToDefaultCase = true;
for (InputChannel<?> ch : inputChannels) {
if (!ch.isEmpty()) {
goToDefaultCase = false;
}
}
for (OutputChannel<?> ch : outputChannels) {
if (!ch.isClosed() && !ch.isFull()) {
goToDefaultCase = false;
}
}
if (goToDefaultCase) {
defaultCase.run();
return;
}
}

for (Runnable r : cases) {
threads.add(Thread.ofVirtual().start(r));
}
int runningThreadId = toCases.receive();
for (int i = 0; i < threads.size(); i++) {
if (i != runningThreadId) {
threads.get(i).interrupt();
}
}
toCases.receive();
toCases.close();
}
}
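For readers who want to experiment with the default-case behaviour without the library, the idea of "take the case if it is ready right now, otherwise fall through" can be sketched with the JDK’s ArrayBlockingQueue, whose offer method returns false immediately instead of blocking when the queue is full. This is only an illustration of the semantics, not how Selector is implemented. DefaultCaseDemo and arrive are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the library.
class DefaultCaseDemo {

    /** Tries to seat a customer without blocking and reports which branch was taken. */
    static String arrive(BlockingQueue<Integer> waitingRoom, int customerId) {
        // offer() never blocks: it returns false right away when the queue is full.
        if (waitingRoom.offer(customerId)) {
            return "seated"; // corresponds to the send case
        }
        return "left";       // corresponds to the default case
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> waitingRoom = new ArrayBlockingQueue<>(2); // two chairs
        for (int id = 0; id < 3; id++) {
            System.out.println("Customer #" + id + ": " + arrive(waitingRoom, id));
        }
        // Customer #0: seated
        // Customer #1: seated
        // Customer #2: left
    }
}
```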

Creating the InputChannelCase, OutputChannelCase, and DelayedCase Classes

These three classes are relatively simple. All of the classes implement the Runnable interface so we can pass them directly to a Thread object. First, let’s begin with the InputChannelCase class whose code is given below.

package io.javago;

import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
* The {@code InputChannelCase} class is used by the {@code Selector} class to create a case that is run when a message
* is received from the case's channel.
* It implements the {@link Runnable} interface and allows a callback to be run when a message is received.
* Additionally, it sends an identifier to a specified output channel before and after executing the callback.
*
* @param <T> the type of messages handled by the input channel
*/
public class InputChannelCase<T> implements Runnable {

private final InputChannel<T> inputChannel;
private final Consumer<T> callback;
private final OutputChannel<Integer> toSelector;
private final int id;
private final AtomicBoolean closed;

/**
* Constructs an {@code InputChannelCase} with the specified input channel, callback, output channel, identifier,
* and closed state.
*
* @param inputChannel the input channel from which messages are received
* @param callback the consumer to be executed when a message is received
* @param toSelector the output channel to send the identifier
* @param id the identifier to be sent to the output channel
* @param closed the atomic boolean indicating the closed state
*/
public InputChannelCase(
InputChannel<T> inputChannel,
Consumer<T> callback,
OutputChannel<Integer> toSelector,
int id,
AtomicBoolean closed
) {
this.inputChannel = inputChannel;
this.callback = callback;
this.toSelector = toSelector;
this.id = id;
this.closed = closed;
}

/**
* Runs the input channel case. It receives a message from the input channel and executes the callback.
* If the input channel is closed, the callback is executed immediately, which will close its associated
* {@link Selector} object if it exists.
* If the thread is interrupted while the channel is still open, the case returns without executing the callback.
* The identifier is sent to the output channel before and after executing the callback.
*
* @throws NoSuchElementException if the channel is closed and empty
*/
@Override
public void run() {
if (inputChannel.isClosed() && closed.compareAndSet(false, true)) {
toSelector.send(id);
T message = inputChannel.receive();
callback.accept(message);
toSelector.send(id);
return; // This case has already run, so skip the blocking wait below.
}
boolean hasNext = inputChannel.hasNext();
if (Thread.currentThread().isInterrupted() && !inputChannel.isClosed()) {
return;
}
if ((hasNext || inputChannel.isClosed()) && closed.compareAndSet(false, true)) {
toSelector.send(id);
T message = inputChannel.receive();
callback.accept(message);
toSelector.send(id);
}
}
}

As stated in the previous section, we use the atomic boolean’s compareAndSet(boolean expectedValue, boolean newValue) method to ensure that this case only executes if no other case has run yet. Just like in Go, if the channel we are waiting to receive from is closed, we immediately execute the case’s body, which is represented by the Consumer object. Otherwise, we wait until the channel has a message, receive it, and execute the case’s body, sending the case’s id to the select statement’s thread both before and after execution. If this case’s thread is interrupted while the channel is still open, the select statement’s thread interrupted it because another case is already running, so we cancel the case by returning from its run() method.
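The wait, claim, report, and cancel steps described here can be tried out with nothing but the JDK. The sketch below is a rough stand-in and not the library’s code. ReceiveRaceDemo and receiveFirst are hypothetical names, a LinkedBlockingQueue substitutes for each channel, platform threads substitute for virtual threads, and a short polling loop around peek() imitates a blocking hasNext(). It assumes at least one queue eventually holds a message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the library.
class ReceiveRaceDemo {

    /** Waits on every queue at once and returns the index of the case that ran. */
    static int receiveFirst(List<BlockingQueue<String>> queues) {
        AtomicBoolean closed = new AtomicBoolean(false);
        BlockingQueue<Integer> toSelector = new LinkedBlockingQueue<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < queues.size(); i++) {
            final int id = i;
            final BlockingQueue<String> queue = queues.get(i);
            Thread t = new Thread(() -> {
                try {
                    while (queue.peek() == null) {
                        Thread.sleep(1); // throws InterruptedException once we are cancelled
                    }
                } catch (InterruptedException e) {
                    return; // another case won, so give up quietly
                }
                if (closed.compareAndSet(false, true)) {
                    toSelector.add(id); // report "I am running"
                    System.out.println("Case " + id + " received " + queue.poll());
                    toSelector.add(id); // report "I am done"
                }
            });
            threads.add(t);
            t.start();
        }

        try {
            int winner = toSelector.take();
            for (int i = 0; i < threads.size(); i++) {
                if (i != winner) {
                    threads.get(i).interrupt(); // cancel every losing case
                }
            }
            toSelector.take(); // wait for the winning case's body to finish
            return winner;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a case", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> a = new LinkedBlockingQueue<>();
        BlockingQueue<String> b = new LinkedBlockingQueue<>();
        b.add("ping");
        System.out.println("Winner: case " + receiveFirst(Arrays.asList(a, b)));
    }
}
```

With only the second queue holding a message, case 1 is the only one that can leave its wait loop before the interrupt arrives, so it wins the claim and case 0 is cancelled.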

Next up is the OutputChannelCase class whose code is given below.

package io.javago;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* The {@code OutputChannelCase} class is used by the {@code Selector} class to create a case that is run when a message
* is sent to the case's channel.
* It implements the {@link Runnable} interface and allows a callback to be run after the message is sent.
* Additionally, it sends an identifier to a specified output channel before and after executing the callback.
*
* @param <T> the type of message to be sent to the output channel
*/
public class OutputChannelCase<T> implements Runnable {

private final OutputChannel<T> outputChannel;
private final T message;
private final Runnable callback;
private final OutputChannel<Integer> toSelector;
private final int id;
private final AtomicBoolean closed;

/**
* Constructs an {@code OutputChannelCase} with the specified output channel, message, callback, output channel for selector, identifier, and closed state.
*
* @param outputChannel the output channel to which the message will be sent
* @param message the message to be sent to the output channel
* @param callback the runnable to be executed after the message is sent
* @param toSelector the output channel to send the identifier
* @param id the identifier to be sent to the output channel
* @param closed the atomic boolean indicating the closed state
*/
public OutputChannelCase(
OutputChannel<T> outputChannel,
T message,
Runnable callback,
OutputChannel<Integer> toSelector,
int id,
AtomicBoolean closed
) {
this.outputChannel = outputChannel;
this.message = message;
this.callback = callback;
this.toSelector = toSelector;
this.id = id;
this.closed = closed;
}

/**
* Runs the output channel case. It sends a message to the output channel and executes the callback.
* If the output channel is closed or the thread is interrupted, the case returns immediately without executing the
* callback.
* This will not close its associated {@link Selector} object if it exists meaning it will still wait for a case to
* be completed.
* The identifier is sent to the output channel before and after executing the callback.
*/
@Override
public void run() {
boolean hasSpace = outputChannel.hasSpace();
if (Thread.currentThread().isInterrupted()) {
return;
}
if (hasSpace && closed.compareAndSet(false, true)) {
toSelector.send(id);
outputChannel.send(message);
callback.run();
toSelector.send(id);
}
}
}

As expected, the OutputChannelCase class is very similar to the InputChannelCase class. The difference is that, rather than waiting until the channel has a message and then receiving it, the case waits until the channel has space for a new message and then sends its message before executing its body. If the channel is closed while the case is waiting, the case does nothing and returns from its run() method.

Finally, we have the DelayedCase class.

package io.javago;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The {@code DelayedCase} class is used to create the timeout case found in Go's {@code select} statement.
* It represents a task that is executed after a specified delay.
* It implements the {@link Runnable} interface and allows a callback to be run after the delay.
* {@code DelayedCase} reserves a platform thread until its callback has completed execution so the delay and amount of
* instances of this class currently running should be kept to a minimum.
* Additionally, it sends an identifier to a specified output channel before and after executing the callback.
*/
public class DelayedCase implements Runnable {

private final Duration delay;
private final Runnable callback;
private final OutputChannel<Integer> toSelector;
private final int id;
private final AtomicBoolean closed;

/**
* Constructs a {@code DelayedCase} with the specified delay, callback, output channel, identifier, and closed
* state.
* {@code DelayedCase} reserves a platform thread until its callback has completed execution so the delay and amount
* of instances of this class currently running should be kept to a minimum.
*
* @param delay the duration to wait before executing the callback
* @param callback the runnable to be executed after the delay
* @param toSelector the output channel to send the identifier
* @param id the identifier to be sent to the output channel
* @param closed the atomic boolean indicating the closed state
*/
public DelayedCase(
Duration delay,
Runnable callback,
OutputChannel<Integer> toSelector,
int id,
AtomicBoolean closed
) {
this.delay = delay;
this.callback = callback;
this.toSelector = toSelector;
this.id = id;
this.closed = closed;
}

/**
* Runs the delayed task. After the specified delay, it sends the identifier to the output channel,
* runs the callback, and sends the identifier again. If interrupted, it cancels the timer.
* Reserves a platform thread until its callback has completed execution so the delay and amount of instances of
* this class currently running should be kept to a minimum.
*/
@Override
public void run() {
Timer timer = new Timer();
final Thread thread = Thread.currentThread();
timer.schedule(new TimerTask() {
@Override
public void run() {
if (closed.compareAndSet(false, true)) {
toSelector.send(id);
callback.run();
toSelector.send(id);
thread.interrupt();
}
}
}, calculateExecutionTime());
try {
Thread.sleep(delay.toMillis());
} catch (InterruptedException e) {
timer.cancel();
}
}

/**
* Calculates the execution time by adding the delay to the current time.
*
* @return the date representing the execution time
*/
private Date calculateExecutionTime() {
Instant currentInstant = new Date().toInstant();
Instant instantAfterDelay = currentInstant.plus(delay);
return Date.from(instantAfterDelay);
}
}

Just like Go’s timeout case, the DelayedCase class schedules its body to run after a specified delay, here using Java’s Timer class. As stated in the run() method’s comments, the Timer class reserves a platform thread to run the body, so the delay duration and the number of delayed cases currently waiting should be kept to a minimum (the cases themselves run on virtual threads). After scheduling its body, the delayed case’s thread sleeps for the specified delay duration or until it is interrupted, either by the thread running the case’s body or by the select statement’s thread. When it is interrupted, it cancels its Timer along with the scheduled TimerTask. It is safe for the DelayedCase to do this even when the interrupt comes from the task’s own thread, because cancelling after the task has already executed has no effect.
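Both cancellation outcomes can be checked with a small standalone sketch. TimerCancelDemo and taskRan are hypothetical names used only for this illustration, and the delays are arbitrary values chosen to leave a comfortable margin. Cancelling before the deadline should stop the task from ever running, while cancelling after the task has run should change nothing.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the library.
class TimerCancelDemo {

    /**
     * Schedules a task to run after delayMillis, cancels the timer after cancelAfterMillis,
     * then waits past the original deadline and reports whether the task ever ran.
     */
    static boolean taskRan(long delayMillis, long cancelAfterMillis) {
        AtomicBoolean ran = new AtomicBoolean(false);
        Timer timer = new Timer(true); // daemon thread so the demo can exit cleanly
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                ran.set(true); // stands in for the delayed case's body
            }
        }, delayMillis);

        try {
            Thread.sleep(cancelAfterMillis);
            timer.cancel();                 // discards any task that has not started yet
            Thread.sleep(delayMillis + 100); // wait until well past the original deadline
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("Cancelled early, task ran: " + taskRan(300, 50)); // false
        System.out.println("Cancelled late, task ran: " + taskRan(50, 300));  // true
    }
}
```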

Conclusion

In this article we implemented Go’s select statement in Java. We are now able to conditionally execute logic based on which communication operation resolves first. In the next article we will implement the rest of Go’s sync package. We will skip WaitGroup, which we implemented in the first article in this series (found here), as well as Cond, Locker, Mutex, and RWMutex, which already have equivalents in Java.

For the next article in this series click here.

For the previous article in this series click here.

To view the official web page for this library click here.
