Implementing Go Concurrency Features in Java Part 2: Implementing Channels

William Yin
11 min read · Jul 6, 2024

This is the second article in a series documenting how I implemented Go’s concurrency features in Java. In the previous article, we implemented the go keyword and wait groups; that article can be found here. In this article, I document how I implemented Go’s channels in Java.

Overview

Go’s philosophy on concurrency is that programmers should not communicate by sharing memory; instead, they should share memory by communicating. Channels are the epitome of this philosophy because they allow you to easily send and receive data between threads.

The utility of channels is best demonstrated by the famous producer-consumer problem. In plain Java, the easiest way of solving the producer-consumer problem is by using BlockingQueue and having the producer call the put(E e) method to put objects into the queue, waiting if necessary for space to be available, and having the consumer call the take() method to take objects out of the queue, waiting if necessary for an object to become available. Another approach involves using a Queue and synchronizing or locking it whenever it is modified by the producer or consumer and having them notify each other when an item can be placed into the queue or removed from it respectively.
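For comparison, the BlockingQueue approach described above might look roughly like the sketch below. It is a minimal illustration rather than code from this library: the class name BlockingQueueDemo, the run method, and the DONE sentinel (a "poison pill" that tells the consumer to stop) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {

    // Hypothetical sentinel value: BlockingQueue has no notion of "closed",
    // so the producer has to tell the consumer explicitly when it is finished.
    private static final String DONE = "__DONE__";

    /** Runs one producer and one consumer; returns what the consumer received, in order. */
    static List<String> run(List<String> items) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String item : items) {
                    queue.put(item); // blocks while the queue is full
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take(); // blocks while the queue is empty
                    if (item.equals(DONE)) {
                        break;
                    }
                    received.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            // join() also makes the consumer's writes to "received" visible here.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the demo threads", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Coca-Cola", "Sprite", "Pepsi")));
    }
}
```

The sentinel is the awkward part of this version: the queue itself cannot signal that no more items are coming, which is exactly the gap that closing a channel fills.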

However, with Go’s channels this problem becomes trivial as we can simply send items from the producer to the consumer through a channel and the consumer can iterate through the items as they arrive. Thus, by the end of this article our solution to the producer-consumer problem will look like this.

import io.javago.Channel;
import io.javago.InputChannel;
import io.javago.OutputChannel;
import io.javago.sync.WaitGroup;

import java.util.Arrays;

import static io.javago.Go.go;

class ProducerConsumer {

    private static final String[] sodas = {
        "Coca-Cola",
        "Sprite",
        "Pepsi",
        "7-Up",
        "Canada Dry",
        "Dr. Pepper",
        "Mountain Dew",
        "Fanta",
        "Crush",
        "Sierra Mist"
    };

    public static void main(String[] args) {
        Channel<String> channel = Channel.make();
        WaitGroup wg = new WaitGroup();
        wg.add(2);
        go(new Producer(channel, wg));
        go(new Consumer(channel, wg));
        wg.await();
    }

    private record Producer(OutputChannel<String> channel, WaitGroup wg) implements Runnable {

        @Override
        public void run() {
            try (channel; wg) {
                Arrays.stream(sodas).forEach(channel::send);
            }
        }
    }

    private record Consumer(InputChannel<String> channel, WaitGroup wg) implements Runnable {

        @Override
        public void run() {
            try (wg) {
                for (String soda : channel) {
                    System.out.printf("Consumer received %s.%n", soda);
                }
            }
        }
    }
}

Creating the InputChannel, OutputChannel, and Channel Interfaces

Go allows you to create three different types of channels: chan, which can both send and receive messages; <-chan, which can only receive messages; and chan<-, which can only send messages. In Go, you can pass a chan to a function that takes a <-chan or chan<- as a parameter. To replicate this behavior in Java, we will make use of polymorphism by creating two interfaces that the Channel interface will extend: InputChannel and OutputChannel. The Channel interface will be the one implemented by our channel implementation, allowing the user to pass it wherever an InputChannel or OutputChannel is expected.
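The typing idea on its own can be sketched as follows. The names Sender, Receiver, Pipe, and QueuePipe are hypothetical stand-ins invented for this example, each reduced to a single method; they are not the library's interfaces, and the queue-backed implementation is deliberately unsynchronized because it only exists to show how the types widen.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class DirectionDemo {

    // Hypothetical, stripped-down counterparts of OutputChannel, InputChannel, and Channel.
    interface Sender<T> { void send(T message); }
    interface Receiver<T> { T receive(); }
    interface Pipe<T> extends Sender<T>, Receiver<T> { }

    // Single-threaded implementation: enough to demonstrate the typing, nothing more.
    static final class QueuePipe<T> implements Pipe<T> {
        private final Queue<T> queue = new ArrayDeque<>();

        @Override
        public void send(T message) {
            queue.add(message);
        }

        @Override
        public T receive() {
            return queue.remove();
        }
    }

    // Comparable to a Go function taking chan<- string: it can only send.
    static void produce(Sender<String> out) {
        out.send("Sprite");
    }

    // Comparable to a Go function taking <-chan string: it can only receive.
    static String consume(Receiver<String> in) {
        return in.receive();
    }

    static String roundTrip() {
        Pipe<String> pipe = new QueuePipe<>();
        produce(pipe);        // widened to Sender<String> implicitly, no cast needed
        return consume(pipe); // widened to Receiver<String> implicitly
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

Because the narrower types expose fewer methods, calling out.receive() inside produce would not compile, which mirrors the compile-time restriction Go enforces on directional channels.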

Below are the InputChannel and OutputChannel interfaces.

package io.javago;

import java.util.NoSuchElementException;

/**
* The {@code InputChannel} interface defines the operations for a Go {@code channel} in Java that can receive messages
* of a specified type.
* It extends {@link AutoCloseable} to support resource management and {@link Iterable} to allow for iteration over the
* messages in the channel.
*
* @param <T> the type of messages handled by the channel
*/
public interface InputChannel<T> extends AutoCloseable, Iterable<T> {

/**
* Receives a message from the channel, waiting if necessary for a message to be sent.
*
* @return the received message
* @throws NoSuchElementException if the channel is both closed and empty
*/
T receive();

/**
* Checks if the channel is closed.
*
* @return {@code true} if the channel is closed, {@code false} otherwise
*/
boolean isClosed();

/**
* Closes the channel.
* Once closed, no more messages can be sent, but any remaining messages can still be received.
* Closing an already closed channel has no effect.
*/
void close();

/**
* Checks if the channel is empty.
*
* @return {@code true} if the channel is empty, {@code false} otherwise
*/
boolean isEmpty();

/**
* Checks if the channel is full.
*
* @return {@code true} if the channel is full, {@code false} otherwise
*/
boolean isFull();

/**
* Waits until the channel has space for another message or is closed.
*
* @return {@code true} if the channel has space, {@code false} if the channel is closed
*/
boolean hasSpace();

/**
* Waits until the channel has another message.
*
* @return {@code true} if there are more messages, {@code false} if the channel is both closed and empty.
*/
boolean hasNext();

/**
* Creates a new channel with a default capacity.
*
* @param <T> the type of messages handled by the channel
* @return a new {@code Channel} instance
*/
static <T> Channel<T> make() {
return new BufferedQueueChannel<>();
}

/**
* Creates a new channel with the specified capacity.
*
* @param <T> the type of messages handled by the channel
* @param capacity the capacity of the channel
* @return a new {@code Channel} instance with the specified capacity
*/
static <T> Channel<T> make(int capacity) {
return new BufferedQueueChannel<>(capacity);
}
}

package io.javago;

/**
* The {@code OutputChannel} interface defines the operations for a Go {@code channel} in Java that can send messages of
* a specified type.
* It extends {@link AutoCloseable} to support resource management and {@link Iterable} to allow for iteration over the
* messages in the channel.
*
* @param <T> the type of messages handled by the channel
*/
public interface OutputChannel<T> extends AutoCloseable, Iterable<T> {

/**
* Sends a message through the channel, waiting if necessary for space to become available.
*
* @param message the message to be sent
* @throws IllegalStateException if the channel is closed
*/
void send(T message);

/**
* Checks if the channel is closed.
*
* @return {@code true} if the channel is closed, {@code false} otherwise
*/
boolean isClosed();

/**
* Closes the channel.
* Once closed, no more messages can be sent, but any remaining messages can still be received.
* Closing an already closed channel has no effect.
*/
void close();

/**
* Checks if the channel is empty.
*
* @return {@code true} if the channel is empty, {@code false} otherwise
*/
boolean isEmpty();

/**
* Checks if the channel is full.
*
* @return {@code true} if the channel is full, {@code false} otherwise
*/
boolean isFull();

/**
* Waits until the channel has space for another message or is closed.
*
* @return {@code true} if the channel has space, {@code false} if the channel is closed
*/
boolean hasSpace();

/**
* Waits until the channel has another message.
*
* @return {@code true} if there are more messages, {@code false} if the channel is both closed and empty.
*/
boolean hasNext();

/**
* Creates a new channel with a default capacity.
*
* @param <T> the type of messages handled by the channel
* @return a new {@code Channel} instance
*/
static <T> Channel<T> make() {
return new BufferedQueueChannel<>();
}

/**
* Creates a new channel with the specified capacity.
*
* @param <T> the type of messages handled by the channel
* @param capacity the capacity of the channel
* @return a new {@code Channel} instance with the specified capacity
*/
static <T> Channel<T> make(int capacity) {
return new BufferedQueueChannel<>(capacity);
}
}

I extended the AutoCloseable interface to allow the channel to be used in a try-with-resources block. Extending the Iterable interface is necessary to allow the channel to be iterated over in a for-loop, as it can be in Go. The make function is intended to imitate how channels are typically created in Go using the make function. Don’t worry about the BufferedQueueChannel class, as we will create it in the next section.
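To show what those two interfaces buy us in isolation, here is a small sketch using a hypothetical Crate class (not part of the library) that implements both. It assumes Java 9 or later, where an effectively final variable can be named directly as a try-with-resources resource.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ResourceDemo {

    // Hypothetical class for illustration only.
    static final class Crate implements AutoCloseable, Iterable<String> {
        private final List<String> items;
        private boolean closed = false;

        Crate(List<String> items) {
            this.items = items;
        }

        // Implementing Iterable is what makes the enhanced for-loop work.
        @Override
        public Iterator<String> iterator() {
            return items.iterator();
        }

        // Declaring close() without "throws Exception" spares callers a catch block.
        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    static List<String> drain(Crate crate) {
        List<String> seen = new ArrayList<>();
        try (crate) { // the parameter is effectively final, so it can be used as a resource
            for (String item : crate) {
                seen.add(item);
            }
        } // crate.close() runs here, even if the loop throws
        return seen;
    }

    public static void main(String[] args) {
        Crate crate = new Crate(List.of("Crush", "Fanta"));
        System.out.println(drain(crate));
        System.out.println(crate.isClosed());
    }
}
```

The channel interfaces redeclare close() without a throws clause for the same reason as Crate does here: a try-with-resources block over a channel then needs no catch clause.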

With the InputChannel and OutputChannel interfaces created, we can now create the Channel interface which is given below.

package io.javago;

import java.util.NoSuchElementException;

/**
* The {@code Channel} interface defines the operations to create Go's {@code channel} in Java that can send and receive
* messages of a specified type.
* It extends {@link AutoCloseable} to support resource management and {@link Iterable} to allow for iteration over the
* messages in the channel.
*
* @param <T> the type of messages handled by the channel
*/
public interface Channel<T> extends InputChannel<T>, OutputChannel<T> {

/**
* Sends a message through the channel, waiting if necessary for space to become available.
*
* @param message the message to be sent
* @throws IllegalStateException if the channel is closed
*/
@Override
void send(T message);

/**
* Receives a message from the channel, waiting if necessary for a message to be sent.
*
* @return the received message
* @throws NoSuchElementException if the channel is both closed and empty
*/
@Override
T receive();

/**
* Checks if the channel is closed.
*
* @return {@code true} if the channel is closed, {@code false} otherwise
*/
@Override
boolean isClosed();

/**
* Closes the channel.
* Once closed, no more messages can be sent, but any remaining messages can still be received.
* Closing an already closed channel has no effect.
*/
@Override
void close();

/**
* Checks if the channel is empty.
*
* @return {@code true} if the channel is empty, {@code false} otherwise
*/
@Override
boolean isEmpty();

/**
* Checks if the channel is full.
*
* @return {@code true} if the channel is full, {@code false} otherwise
*/
@Override
boolean isFull();

/**
* Waits until the channel has space for another message or is closed.
*
* @return {@code true} if the channel has space, {@code false} if the channel is closed
*/
@Override
boolean hasSpace();

/**
* Waits until the channel has another message.
*
* @return {@code true} if there are more messages, {@code false} if the channel is both closed and empty.
*/
@Override
boolean hasNext();

/**
* Creates a new channel with a default capacity.
*
* @param <T> the type of messages handled by the channel
* @return a new {@code Channel} instance
*/
static <T> Channel<T> make() {
return new BufferedQueueChannel<>();
}

/**
* Creates a new channel with the specified capacity.
*
* @param <T> the type of messages handled by the channel
* @param capacity the capacity of the channel
* @return a new {@code Channel} instance with the specified capacity
*/
static <T> Channel<T> make(int capacity) {
return new BufferedQueueChannel<>(capacity);
}
}

This is the interface that our channel implementation, BufferedQueueChannel, will implement.

Implementing Channels

The class BufferedQueueChannel will be our implementation of the Channel interface. The channel will be backed by a Queue with a default capacity of one, and the user can create a channel with a larger buffer by specifying a different capacity. Note that this differs slightly from Go, where a channel created without a capacity is unbuffered and a send blocks until a receiver is ready; here, a single send can complete before any receiver arrives. When the user tries to send a message to the channel, the calling thread should block until the queue isn’t full. When the user tries to receive a message from the channel, the calling thread should block until the queue has a message. As with Go channels, a channel should be able to receive messages as long as the queue is not empty, even if the channel itself is closed. When the channel is closed, it should notify all threads currently waiting on it to prevent deadlocks. Finally, the hasSpace() and hasNext() methods will be needed when we implement the select statement in the next article; hasNext() is also used by the ChannelIterator class.
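Before looking at the full class, the two rules that matter most (messages can still be drained after a close, and a close wakes every waiting thread) can be tried out with a condensed stand-in. MiniChannel below is a hypothetical, simplified sketch written for this example only; unlike the real class, it turns an interruption into an unchecked exception instead of returning early.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

class CloseSemanticsDemo {

    // Hypothetical, condensed stand-in for the channel described above.
    static final class MiniChannel<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        private final int capacity;
        private boolean closed = false;

        MiniChannel(int capacity) {
            this.capacity = capacity;
        }

        synchronized void send(T message) {
            while (!closed && queue.size() >= capacity) {
                await(); // block until there is space or the channel is closed
            }
            if (closed) {
                throw new IllegalStateException("Channel is closed");
            }
            queue.add(message);
            notifyAll(); // wake receivers waiting for a message
        }

        synchronized T receive() {
            while (!closed && queue.isEmpty()) {
                await(); // block until there is a message or the channel is closed
            }
            if (queue.isEmpty()) {
                throw new NoSuchElementException("Channel is closed and empty");
            }
            T message = queue.poll();
            notifyAll(); // wake senders waiting for space
            return message;
        }

        synchronized void close() {
            closed = true;
            notifyAll(); // release every thread blocked in send() or receive()
        }

        // Only called from synchronized methods, so the monitor is already held.
        private void await() {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    /** Buffered messages survive a close; only the receive after them fails. */
    static String drainAfterClose() {
        MiniChannel<String> channel = new MiniChannel<>(2);
        channel.send("Pepsi");
        channel.send("Fanta");
        channel.close();
        String first = channel.receive();
        String second = channel.receive();
        try {
            channel.receive();
            return "unexpected message";
        } catch (NoSuchElementException e) {
            return first + "," + second + ",closed";
        }
    }

    /** A receiver waiting on an empty channel is released when the channel is closed. */
    static boolean closeReleasesReceiver() {
        MiniChannel<String> channel = new MiniChannel<>(1);
        boolean[] released = {false};
        Thread receiver = new Thread(() -> {
            try {
                channel.receive();
            } catch (NoSuchElementException e) {
                released[0] = true;
            }
        });
        receiver.start();
        channel.close();
        try {
            receiver.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return released[0];
    }

    public static void main(String[] args) {
        System.out.println(drainAfterClose());
        System.out.println(closeReleasesReceiver());
    }
}
```

Both waiting loops re-check their condition after every wake-up, which matters because notifyAll() wakes senders and receivers alike and because Java permits spurious wake-ups.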

Our BufferedQueueChannel class can be found below.

package io.javago;

import java.util.*;

/**
* The {@code BufferedQueueChannel} class is an implementation of the {@link Channel} interface, providing a Go
* {@code channel} backed by a Queue for passing messages between threads.
* It supports both sending and receiving messages with a specified capacity.
*
* @param <T> the type of messages handled by the channel
*/
public class BufferedQueueChannel<T> implements Channel<T> {

private final Queue<T> channelQueue;
private boolean closed = false;
private final int capacity;

/**
* Constructs a {@code BufferedQueueChannel} with a default capacity of 1.
*/
public BufferedQueueChannel() {
this(1);
}

/**
* Constructs a {@code BufferedQueueChannel} with the specified capacity.
*
* @param capacity the capacity of the channel
*/
public BufferedQueueChannel(int capacity) {
channelQueue = new ArrayDeque<>(capacity);
this.capacity = capacity;
}

/**
* Sends a message through the channel. If the channel is full, this method blocks until space becomes available.
* Returns immediately if it is interrupted while blocking.
*
* @param message the message to be sent
* @throws IllegalStateException if the channel is closed
*/
@Override
public synchronized void send(T message) {
while (true) {
if (closed) {
throw new IllegalStateException("Channel is closed");
}
if (channelQueue.size() >= capacity) {
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
} else {
break;
}
}
channelQueue.add(message);
this.notifyAll();
}

/**
* Receives a message from the channel. If the channel is empty, this method blocks until a message becomes
* available. Returns immediately if it is interrupted while blocking.
*
* @return the received message
* @throws NoSuchElementException if the channel is closed and empty
*/
@Override
public synchronized T receive() {
while (true) {
if (closed) {
if (channelQueue.isEmpty()) {
throw new NoSuchElementException("Channel is closed and empty");
} else {
break;
}
} else if (channelQueue.isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
} else {
break;
}
}
T message = channelQueue.poll();
this.notifyAll();
return message;
}

/**
* Checks if the channel is closed.
*
* @return {@code true} if the channel is closed, {@code false} otherwise
*/
@Override
public synchronized boolean isClosed() {
return closed;
}

/**
* Checks if the channel is empty.
*
* @return {@code true} if the channel is empty, {@code false} otherwise
*/
@Override
public synchronized boolean isEmpty() {
return channelQueue.isEmpty();
}

/**
* Checks if the channel is full.
*
* @return {@code true} if the channel is full, {@code false} otherwise
*/
@Override
public synchronized boolean isFull() {
return channelQueue.size() == capacity;
}

/**
* Closes the channel.
* Once closed, no more messages can be sent, but any remaining messages can still be received.
* Closing an already closed channel has no effect.
*/
@Override
public synchronized void close() {
if (!closed) {
closed = true;
this.notifyAll();
}
}

/**
* Returns an iterator over the elements in this channel.
*
* @return an {@code Iterator} over the elements in this channel
*/
@Override
public Iterator<T> iterator() {
return new ChannelIterator();
}

/**
* Waits until the channel has space for another message or is closed. Returns immediately if it is interrupted
* while blocking.
*
* @return {@code true} if the channel has space, {@code false} if the channel is closed
*/
@Override
public synchronized boolean hasSpace() {
while (true) {
if (closed) {
return false;
}
if (channelQueue.size() < capacity) {
return true;
}
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}

/**
* Waits until the channel has another message or is closed. Returns immediately if it is interrupted while
* blocking.
*
* @return {@code true} if there are more messages, {@code false} if the channel is empty and closed
*/
@Override
public synchronized boolean hasNext() {
while (true) {
if (!channelQueue.isEmpty()) {
return true;
}
if (closed) {
return false;
}
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}

/**
* The {@code ChannelIterator} class provides an iterator over the elements in the {@code BufferedQueueChannel}. If
* there are no more messages, the iterator will block until a new message is received or the channel is closed.
*/
private class ChannelIterator implements Iterator<T> {

/**
* Waits until the channel has another message or is closed. Returns immediately if it is interrupted while
* blocking.
*
* @return {@code true} if there are more messages, {@code false} if the channel is empty and closed
*/
@Override
public boolean hasNext() {
synchronized (BufferedQueueChannel.this) {
return BufferedQueueChannel.this.hasNext();
}
}

/**
* Receives a message from the channel. If the channel is empty, this method blocks until a message becomes
* available. Returns immediately if it is interrupted while blocking.
*
* @return the received message
* @throws NoSuchElementException if the channel is closed and empty
*/
@Override
public T next() {
synchronized (BufferedQueueChannel.this) {
return receive();
}
}
}
}

Conclusion

In this article, we implemented Go’s channels in Java. We are now able to easily send and receive data between threads, making tasks such as solving the producer-consumer problem trivial. In the next article, we will implement Go’s select statement, which will allow us to execute logic based on which channel becomes available first.

For the next article in this series, click here.

For the previous article in this series, click here.

To view the official web page for this library, click here.
