Implementing Go Concurrency Features in Java Part 3: Implementing the Select Statement
This article is the third in a series documenting my process of implementing Go concurrency features in Java. In the previous article we implemented Go’s channels. That article can be found here. In this article I will document how I implemented the select statement in Java.
Overview
The select statement allows a goroutine to wait on multiple communication operations. A select statement blocks until one of its cases can run, and then it executes that case. If multiple cases are ready, it chooses one at random. A case can wait on sending a message to a channel, receiving a message from a channel, or a specified amount of time passing. You can also specify a default case, which runs immediately if none of the other cases are ready when the statement executes, so that the select statement does not block.
The select statement is especially useful when solving the sleeping barber problem. Without the select statement, we would have to use a semaphore and check whether it has any permits available before acquiring one, so that a customer does not wait for a seat when none is free. This would require us to keep track of the remaining permits in the semaphore separately and to lock the semaphore while we check the number of seats available and acquire a permit. By using the select statement in conjunction with channels, the problem becomes much more intuitive. We can represent the waiting room as a buffered channel with a capacity equal to the number of seats in the waiting room, and create a select statement for the customer that executes one case if the waiting room has space available and executes a default case immediately if not. This solution is shown below.
import io.javago.Channel;
import io.javago.InputChannel;
import io.javago.OutputChannel;
import io.javago.sync.WaitGroup;
import static io.javago.Go.go;
import static io.javago.Selector.select;
class SleepingBarber {
private static final int NUM_CUSTOMERS = 100;
private static final int NUM_CHAIRS = 5;
private static final WaitGroup wg = new WaitGroup();
public static void main(String[] args) {
wg.add(NUM_CUSTOMERS);
try (
Channel<Integer> customerArrived = Channel.make();
Channel<Boolean> barberDone = Channel.make();
Channel<Object> waitingRoom = Channel.make(NUM_CHAIRS)
) {
go(new Barber(customerArrived, barberDone));
for (int i = 0; i < NUM_CUSTOMERS; i++) {
go(new Customer(i, waitingRoom, customerArrived, barberDone));
try {
Thread.sleep((int) (Math.random() * 100 + 1));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
wg.await();
}
System.out.println("All customers have been served or left.");
}
private record Barber(
InputChannel<Integer> customerArrived,
OutputChannel<Boolean> barberDone
) implements Runnable {
@Override
public void run() {
for (Integer customer : customerArrived) {
System.out.println("Barber is cutting customer #" + customer + "'s hair.");
sleep((int) (Math.random() * 100 + 1));
System.out.println("Barber has finished cutting customer #" + customer + "'s hair.");
barberDone.send(true);
}
}
private void sleep(long millis) {
if (millis > 0) {
synchronized (this) {
try {
this.wait(millis);
} catch (InterruptedException ignored) {}
}
}
}
}
private record Customer(
int id,
Channel<Object> waitingRoom,
OutputChannel<Integer> customerArrived,
InputChannel<Boolean> barberDone
) implements Runnable {
@Override
public void run() {
try (wg) {
System.out.println("Customer #" + id + " has arrived.");
select()
.addCase(waitingRoom, new Object(), () -> {
System.out.println("Customer #" + id + " has taken a seat in the waiting room.");
customerArrived.send(id);
barberDone.receive();
waitingRoom.receive();
System.out.println("Customer #" + id + " received a haircut and is leaving.");
})
.addDefault(() -> System.out.println("Customer #" + id + " found no free chairs and is leaving."))
.run();
}
}
}
}
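For comparison, the semaphore-based alternative described at the start of this section might look roughly like the sketch below. It is only an illustration under stated assumptions: the class name SemaphoreWaitingRoom and its methods are hypothetical and not part of the library, and it uses java.util.concurrent.Semaphore with a ReentrantLock guarding the check-then-acquire step.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the waiting room, built only on java.util.concurrent.
class SemaphoreWaitingRoom {
    private final Semaphore seats;
    // Guards the check-then-acquire sequence so two customers cannot both claim the last seat.
    private final ReentrantLock lock = new ReentrantLock();

    SemaphoreWaitingRoom(int numChairs) {
        this.seats = new Semaphore(numChairs);
    }

    /** Returns true if the customer got a seat, false if the room was full. */
    boolean tryTakeSeat() {
        lock.lock();
        try {
            if (seats.availablePermits() == 0) {
                return false; // no free chair: the customer leaves instead of blocking
            }
            // Cannot block: permits are only acquired while holding the lock,
            // and this thread just observed at least one free permit.
            seats.acquireUninterruptibly();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Frees the seat once the customer's haircut is done. */
    void leaveSeat() {
        seats.release();
    }

    public static void main(String[] args) {
        SemaphoreWaitingRoom room = new SemaphoreWaitingRoom(2);
        System.out.println(room.tryTakeSeat()); // true
        System.out.println(room.tryTakeSeat()); // true
        System.out.println(room.tryTakeSeat()); // false, the room is full
        room.leaveSeat();
        System.out.println(room.tryTakeSeat()); // true again
    }
}
```

Even in this small form, the extra lock and the explicit permit check are bookkeeping that the channel and select version above does not need.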
Implementing the Select Statement
In Go, the condition for a case in a select statement to be executed can be when a message is sent to a channel, when a message is received from a channel, or when a certain amount of time passes. We will represent each of these cases with a Java class. A select statement may also contain a default case that executes if none of the other cases can immediately execute.
To mimic the select statement’s syntax as closely as possible, we will be using the builder design pattern so that the user can chain case additions. Below is an example of how the user will create and run a select statement using our library.
select()
    .addCase(inputChannel, msg -> System.out.println(msg)) // Message received case
    .addCase(outputChannel, new Object(), () -> System.out.println("Message sent!")) // Message sent case
    .addCase(Duration.ofMillis(500), () -> System.out.println("500 ms has passed.")) // Timeout case
    .addDefault(() -> System.out.println("Executing default case.")) // Default case
    .run();
Our select statement will be packaged in the Selector class, which we will create now. In our select statement, we will wait for each case on a separate thread and maintain a single channel that those threads use to report back. We will also assign an id to each case. When a case’s condition is met, it will send its id to the select statement’s thread, which will then interrupt every other case’s thread. The select statement’s thread will then wait for the case whose condition was met to finish executing. To ensure that only one case executes, we will use an AtomicBoolean in the Selector class to indicate whether a case is already executing. When a case tries to execute, it will first try to atomically update the atomic boolean's value to true if it is not already true, using the atomic boolean's compareAndSet(boolean expectedValue, boolean newValue) method. It will only execute its body if the update is successful. If the select statement has a default case, then before creating the threads for the cases, it will check each case’s respective channel to see if the case can execute. If no case’s channel is available, it will execute the default case and return after it completes.
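The compareAndSet guard can be tried in isolation. The following is a minimal sketch, not the library's code: the class SingleWinnerDemo and its race method are hypothetical names, and plain platform threads stand in for the cases. Several threads race to flip a shared AtomicBoolean from false to true, and only the one whose update succeeds runs its body.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shows that compareAndSet lets exactly one racing thread proceed.
class SingleWinnerDemo {
    /** Starts numCases threads that race to claim the flag; returns how many ran their body. */
    static int race(int numCases) {
        AtomicBoolean closed = new AtomicBoolean(false);
        AtomicInteger bodiesRun = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numCases; i++) {
            Thread t = new Thread(() -> {
                // Only the first thread to flip false -> true gets 'true' back.
                if (closed.compareAndSet(false, true)) {
                    bodiesRun.incrementAndGet(); // stands in for the case body
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return bodiesRun.get();
    }

    public static void main(String[] args) {
        System.out.println("Bodies run: " + race(8)); // prints "Bodies run: 1"
    }
}
```

No matter how many threads race or how they are scheduled, exactly one body runs, which is the guarantee the selector relies on.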
Below is the code to implement the select statement in Java. We will create the InputChannelCase, OutputChannelCase, and DelayedCase classes in the next section.
package io.javago;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* The {@code Selector} class implements Go's {@code select} statement.
* It implements a select-like mechanism for handling multiple asynchronous tasks.
* It manages cases involving input channels, output channels, delayed tasks, and a default case.
* When a {@code Selector}'s {@link #run} method is called, it creates a virtual thread for each case.
* In the event that one of the cases is a {@link DelayedCase}, the delayed case will also reserve a platform thread
* until either another case in the {@code Selector} or the delayed case's method is completed.
* This means that the amount of delayed cases in a {@code Selector} and their duration should be kept to a minimum.
* When a case is executed, the virtual threads for all other tasks are interrupted and any pending tasks on them are
* cancelled.
*/
public class Selector {
private final List<Thread> threads = new ArrayList<>();
private final List<Runnable> cases = new ArrayList<>();
private final Channel<Integer> toCases = Channel.make();
private final List<InputChannel<?>> inputChannels = new ArrayList<>();
private final List<OutputChannel<?>> outputChannels = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private Runnable defaultCase;
/**
* Private constructor for creating instances of {@code Selector}. Use {@link #select()} method to instantiate.
*/
private Selector() {}
/**
* Static factory method to create a new instance of {@code Selector}.
*
* @return a new instance of {@code Selector}
*/
public static Selector select() {
return new Selector();
}
/**
* Adds an {@link InputChannelCase} to the selector.
* The case will execute its {@link Consumer} after it receives a message from its associated {@link InputChannel}.
* The message received from the channel will be the input to the consumer.
*
* @param <T> the type of messages handled by the input channel
* @param ch the input channel to monitor
* @param c the consumer to execute when a message is received
* @return this {@code Selector} instance for method chaining
*/
public <T> Selector addCase(InputChannel<T> ch, Consumer<T> c) {
cases.add(new InputChannelCase<>(ch, c, toCases, cases.size(), closed));
inputChannels.add(ch);
return this;
}
/**
* Adds an {@link OutputChannelCase} to the selector.
* The case will execute its {@link Runnable} after it sends a message to its associated {@link OutputChannel}.
*
* @param <T> the type of message to send through the output channel
* @param ch the output channel to send the message
* @param message the message to send through the output channel
* @param r the callback to execute after sending the message
* @return this {@code Selector} instance for method chaining
*/
public <T> Selector addCase(OutputChannel<T> ch, T message, Runnable r) {
cases.add(new OutputChannelCase<>(ch, message, r, toCases, cases.size(), closed));
outputChannels.add(ch);
return this;
}
/**
* Adds a {@link DelayedCase} to the selector.
* The delayed case will reserve a platform thread until either another case in the {@code Selector} or the delayed
* case's method is completed.
* This means that the amount of delayed cases in a {@code Selector} and their duration should be kept to a minimum.
*
* @param d the duration to wait before executing the callback
* @param r the runnable to execute after the delay
* @return this {@code Selector} instance for method chaining
*/
public Selector addCase(Duration d, Runnable r) {
cases.add(new DelayedCase(d, r, toCases, cases.size(), closed));
return this;
}
/**
* Adds a default case to the selector.
* The default case will be executed if every {@link InputChannelCase} and {@link OutputChannelCase}'s channel is
* empty or full respectively.
*
* @param r the runnable to execute as the default case
* @return this {@code Selector} instance for method chaining
*/
public Selector addDefault(Runnable r) {
defaultCase = r;
return this;
}
/**
* Executes the selector logic.
* Creates a virtual thread for each case.
* In the event that one of the cases is a {@link DelayedCase}, the delayed case will also reserve a platform thread
* until either another case in the {@code Selector} or the delayed case's method is completed.
* This means that the amount of delayed cases in a {@code Selector} and their duration should be kept to a minimum.
* When a case is executed, the virtual threads for all other tasks are interrupted and any pending tasks on them
* are cancelled.
*/
public void run() {
if (defaultCase != null) {
boolean goToDefaultCase = true;
for (InputChannel<?> ch : inputChannels) {
if (!ch.isEmpty()) {
goToDefaultCase = false;
}
}
for (OutputChannel<?> ch : outputChannels) {
if (!ch.isClosed() && !ch.isFull()) {
goToDefaultCase = false;
}
}
if (goToDefaultCase) {
defaultCase.run();
return;
}
}
for (Runnable r : cases) {
threads.add(Thread.ofVirtual().start(r));
}
int runningThreadId = toCases.receive();
for (int i = 0; i < threads.size(); i++) {
if (i != runningThreadId) {
threads.get(i).interrupt();
}
}
toCases.receive();
toCases.close();
}
}
Creating the InputChannelCase, OutputChannelCase, and DelayedCase Classes
These three classes are relatively simple. All of the classes implement the Runnable interface so we can pass them directly to a Thread object. First, let’s begin with the InputChannelCase class whose code is given below.
package io.javago;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* The {@code InputChannelCase} class is used by the {@code Selector} class to create a case that is run when a message
* is received from the case's channel.
* It implements the {@link Runnable} interface and allows a callback to be run when a message is received.
* Additionally, it sends an identifier to a specified output channel before and after executing the callback.
*
* @param <T> the type of messages handled by the input channel
*/
public class InputChannelCase<T> implements Runnable {
private final InputChannel<T> inputChannel;
private final Consumer<T> callback;
private final OutputChannel<Integer> toSelector;
private final int id;
private final AtomicBoolean closed;
/**
* Constructs an {@code InputChannelCase} with the specified input channel, callback, output channel, identifier,
* and closed state.
*
* @param inputChannel the input channel from which messages are received
* @param callback the consumer to be executed when a message is received
* @param toSelector the output channel to send the identifier
* @param id the identifier to be sent to the output channel
* @param closed the atomic boolean indicating the closed state
*/
public InputChannelCase(
InputChannel<T> inputChannel,
Consumer<T> callback,
OutputChannel<Integer> toSelector,
int id,
AtomicBoolean closed
) {
this.inputChannel = inputChannel;
this.callback = callback;
this.toSelector = toSelector;
this.id = id;
this.closed = closed;
}
/**
* Runs the input channel case. It receives a message from the input channel and executes the callback.
* If the input channel is closed, the callback is executed immediately. If the thread is interrupted while the
* channel is still open, the case returns without executing the callback.
* The identifier is sent to the output channel before and after executing the callback.
*
* @throws NoSuchElementException if the channel is closed and empty
*/
@Override
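public void run() {
// A closed channel is always ready, so try to claim the selector right away.
if (inputChannel.isClosed() && closed.compareAndSet(false, true)) {
toSelector.send(id);
T message = inputChannel.receive();
callback.accept(message);
toSelector.send(id);
// This case has already run, so there is nothing left to wait for.
return;
}
// Wait until the channel has a message.
boolean hasNext = inputChannel.hasNext();
// Interrupted while the channel is still open: another case won, so cancel this one.
if (Thread.currentThread().isInterrupted() && !inputChannel.isClosed()) {
return;
}
// Only the first case to flip the flag from false to true runs its body.
if ((hasNext || inputChannel.isClosed()) && closed.compareAndSet(false, true)) {
toSelector.send(id);
T message = inputChannel.receive();
callback.accept(message);
toSelector.send(id);
}
}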
}
As stated in the previous section, we use the atomic boolean’s compareAndSet(boolean expectedValue, boolean newValue) method to ensure that this case only executes if no other case has run yet. Just like in Go, if the channel we are waiting to receive a message from is closed, we immediately execute the case’s body, represented by the Consumer object. Otherwise, we wait until the channel has a message, then receive it and execute the case’s body, sending the case’s id to the select statement’s thread before and after execution. If this case’s thread is interrupted and the channel is not closed, the select statement’s thread interrupted it because another case is already running, so we cancel the case by returning from its run() method.
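The cancellation path can be illustrated with the standard library alone. This is a rough analogy rather than the library's implementation: a LinkedBlockingQueue stands in for a channel, the class InterruptedCaseDemo is a hypothetical name, and the blocked take() reports the interrupt by throwing InterruptedException instead of through a flag check.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: a waiting case that is interrupted returns without running its body.
class InterruptedCaseDemo {
    /** Starts a case waiting on an empty queue, interrupts it, and reports whether its body ran. */
    static boolean bodyRanAfterInterrupt() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for a channel
        AtomicBoolean bodyRan = new AtomicBoolean(false);
        Thread waitingCase = new Thread(() -> {
            try {
                queue.take();      // blocks: nothing is ever sent in this demo
                bodyRan.set(true); // stands in for the case body
            } catch (InterruptedException e) {
                // Another case won, so this one is cancelled: return without running the body.
            }
        });
        waitingCase.start();
        waitingCase.interrupt(); // what the selecting thread does to every losing case
        try {
            waitingCase.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return bodyRan.get();
    }

    public static void main(String[] args) {
        System.out.println("Body ran: " + bodyRanAfterInterrupt()); // prints "Body ran: false"
    }
}
```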
Next up is the OutputChannelCase class whose code is given below.
package io.javago;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The {@code OutputChannelCase} class is used by the {@code Selector} class to create a case that is run when a message
* is sent to the case's channel.
* It implements the {@link Runnable} interface and allows a callback to be run after the message is sent.
* Additionally, it sends an identifier to a specified output channel before and after executing the callback.
*
* @param <T> the type of message to be sent to the output channel
*/
public class OutputChannelCase<T> implements Runnable {
private final OutputChannel<T> outputChannel;
private final T message;
private final Runnable callback;
private final OutputChannel<Integer> toSelector;
private final int id;
private final AtomicBoolean closed;
/**
* Constructs an {@code OutputChannelCase} with the specified output channel, message, callback, output channel for selector, identifier, and closed state.
*
* @param outputChannel the output channel to which the message will be sent
* @param message the message to be sent to the output channel
* @param callback the runnable to be executed after the message is sent
* @param toSelector the output channel to send the identifier
* @param id the identifier to be sent to the output channel
* @param closed the atomic boolean indicating the closed state
*/
public OutputChannelCase(
OutputChannel<T> outputChannel,
T message,
Runnable callback,
OutputChannel<Integer> toSelector,
int id,
AtomicBoolean closed
) {
this.outputChannel = outputChannel;
this.message = message;
this.callback = callback;
this.toSelector = toSelector;
this.id = id;
this.closed = closed;
}
/**
* Runs the output channel case. It sends a message to the output channel and executes the callback.
* If the output channel is closed or the thread is interrupted, the case returns immediately without executing the
* callback.
* This will not close its associated {@link Selector} object if it exists meaning it will still wait for a case to
* be completed.
* The identifier is sent to the output channel before and after executing the callback.
*/
@Override
public void run() {
boolean hasSpace = outputChannel.hasSpace();
if (Thread.currentThread().isInterrupted()) {
return;
}
if (hasSpace && closed.compareAndSet(false, true)) {
toSelector.send(id);
outputChannel.send(message);
callback.run();
toSelector.send(id);
}
}
}
As expected, the OutputChannelCase class is very similar to the InputChannelCase class except rather than waiting until the channel has a message and then receiving it, the case waits until the channel has space for a new message and then sends its message before executing its body. If the channel is closed while the case is waiting, then it does nothing and returns from its run() method.
Finally, we have the DelayedCase class.
package io.javago;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The {@code DelayedCase} class is used to create the timeout case found in Go's {@code select} statement.
* It represents a task that is executed after a specified delay.
* It implements the {@link Runnable} interface and allows a callback to be run after the delay.
* {@code DelayedCase} reserves a platform thread until its callback has completed execution so the delay and amount of
* instances of this class currently running should be kept to a minimum.
* Additionally, it sends an identifier to a specified output channel before and after executing the callback.
*/
public class DelayedCase implements Runnable {
private final Duration delay;
private final Runnable callback;
private final OutputChannel<Integer> toSelector;
private final int id;
private final AtomicBoolean closed;
/**
* Constructs a {@code DelayedCase} with the specified delay, callback, output channel, identifier, and closed
* state.
* {@code DelayedCase} reserves a platform thread until its callback has completed execution so the delay and amount
* of instances of this class currently running should be kept to a minimum.
*
* @param delay the duration to wait before executing the callback
* @param callback the runnable to be executed after the delay
* @param toSelector the output channel to send the identifier
* @param id the identifier to be sent to the output channel
* @param closed the atomic boolean indicating the closed state
*/
public DelayedCase(
Duration delay,
Runnable callback,
OutputChannel<Integer> toSelector,
int id,
AtomicBoolean closed
) {
this.delay = delay;
this.callback = callback;
this.toSelector = toSelector;
this.id = id;
this.closed = closed;
}
/**
* Runs the delayed task. After the specified delay, it sends the identifier to the output channel,
* runs the callback, and sends the identifier again. If interrupted, it cancels the timer.
* Reserves a platform thread until its callback has completed execution so the delay and amount of instances of
* this class currently running should be kept to a minimum.
*/
@Override
public void run() {
Timer timer = new Timer();
final Thread thread = Thread.currentThread();
timer.schedule(new TimerTask() {
@Override
public void run() {
if (closed.compareAndSet(false, true)) {
toSelector.send(id);
callback.run();
toSelector.send(id);
thread.interrupt();
}
}
}, calculateExecutionTime());
try {
Thread.sleep(delay.toMillis());
} catch (InterruptedException e) {
timer.cancel();
}
}
/**
* Calculates the execution time by adding the delay to the current time.
*
* @return the date representing the execution time
*/
private Date calculateExecutionTime() {
Instant currentInstant = new Date().toInstant();
Instant instantAfterDelay = currentInstant.plus(delay);
return Date.from(instantAfterDelay);
}
}
Just like Go’s timeout case, the DelayedCase class schedules its body to run after a specified delay using Java’s Timer class. As stated in the run() method’s comments, the Timer class reserves a platform thread to run the body, meaning the delay duration and number of delayed cases currently waiting should be kept to a minimum (the cases themselves run on virtual threads). After scheduling its body, the delayed case’s thread goes to sleep for the specified delay duration until it is interrupted, either by the thread running the case’s body or by the select statement’s thread. At that point, it cancels its Timer, which discards the scheduled TimerTask if it has not run yet. It is safe for the DelayedCase to do this even when it is interrupted by the task’s own thread, because cancelling a Timer does not interfere with a task that is currently executing, and cancelling a TimerTask that has already executed has no effect.
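The claim about late cancellation can be checked with a few lines of standard Java. The sketch below uses a hypothetical class named LateCancelDemo and a CountDownLatch to wait until a one-shot TimerTask has run. It then cancels the task and the timer. According to the java.util.TimerTask documentation, cancel() returns false for a one-shot task that has already run.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: cancelling after a one-shot task has run is harmless.
class LateCancelDemo {
    /** Runs a one-shot task, then cancels it; returns what TimerTask.cancel() reported. */
    static boolean cancelAfterRun() {
        Timer timer = new Timer(true); // daemon timer thread so the JVM can exit
        CountDownLatch ran = new CountDownLatch(1);
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                ran.countDown(); // stands in for the case body
            }
        };
        timer.schedule(task, 10); // one-shot execution after 10 ms
        try {
            ran.await(); // wait until the task has actually executed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        boolean prevented = task.cancel(); // false: nothing was left to prevent
        timer.cancel();                    // also safe: only terminates the timer thread
        return prevented;
    }

    public static void main(String[] args) {
        System.out.println("Cancel prevented a run: " + cancelAfterRun()); // prints false
    }
}
```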
Conclusion
In this article we implemented Go’s select statement in Java. We are now able to conditionally execute logic based on which communication operation resolves first. In the next article we will implement the rest of Go’s sync package. We will skip WaitGroup, which we implemented in the first article in this series (found here), as well as Cond, Locker, Mutex, and RWMutex, which already have equivalents in Java.
For the next article in this series click here.
For the previous article in this series click here.
To view the official web page for this library click here.