Implementing Go Concurrency Features in Java Part 4: Implementing the Sync Package

William Yin
10 min read · Jul 8, 2024


This article is the fourth and final entry in a series documenting my process of implementing Go concurrency features in Java. In the previous article, we implemented Go’s select statement; that article can be found here. In this article, I will document how I implemented the rest of Go’s sync package. The exceptions are WaitGroup, which we implemented in the first article in this series (found here), and Cond, Locker, Mutex, and RWMutex, which already have equivalents in Java.

Overview

Go’s sync package provides several primitives that are helpful for synchronization. For example, the OnceFunc type ensures that a function only runs once even if it is called multiple times, making it useful for initializing resources in a multithreaded environment. The Pool type provides a thread-safe tool for managing a collection of reusable objects. The full documentation for Go’s sync package can be found here. As stated in the documentation, these types are primarily intended for low-level synchronization, while higher-level synchronization should be achieved using channels and the select statement.

Implementing the Map Type

The Map type is a thread-safe collection of key-value pairs. There is not much to say here: our implementation is simply a repackaging of Java’s ConcurrentHashMap with methods renamed to match those in Go’s Map type.

The code for our implementation of the Map type is given below.

package io.javago.sync;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/**
* The {@code Map} class implements Go's {@code sync.Map}.
* A thread-safe map that extends {@link ConcurrentHashMap} and provides additional utility methods.
*
* @param <K> the type of keys maintained by this map
* @param <V> the type of mapped values
*/
public class Map<K, V> extends ConcurrentHashMap<K, V> {

/**
* Creates a new, empty map with the default initial capacity, load factor, and concurrency level.
*/
public Map() {}

/**
* Creates a new, empty map with the specified initial capacity, and with the default load factor and concurrency
* level.
*
* @param initialCapacity the initial capacity. The implementation performs internal sizing to accommodate this many elements.
*/
public Map(int initialCapacity) {
super(initialCapacity);
}

/**
* Creates a new map with the same mappings as the specified map.
*
* @param m the map whose mappings are to be placed in this map
*/
public Map(java.util.Map<? extends K, ? extends V> m) {
super(m);
}

/**
* Creates a new, empty map with the specified initial capacity and load factor, and with the default concurrency
* level.
*
* @param initialCapacity the initial capacity. The implementation performs internal sizing to accommodate this many elements.
* @param loadFactor the load factor threshold, used to control resizing. Resizing may be performed when the average number of elements per bin exceeds this threshold.
*/
public Map(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor);
}

/**
* Creates a new, empty map with the specified initial capacity, load factor, and concurrency level.
*
* @param initialCapacity the initial capacity. The implementation performs internal sizing to accommodate this many elements.
* @param loadFactor the load factor threshold, used to control resizing. Resizing may be performed when the average number of elements per bin exceeds this threshold.
* @param concurrencyLevel the estimated number of concurrently updating threads. The implementation performs internal sizing to try to accommodate this many threads.
*/
public Map(int initialCapacity, float loadFactor, int concurrencyLevel) {
super(initialCapacity, loadFactor, concurrencyLevel);
}

/**
* Removes the entry for a key only if currently mapped to a given value.
*
* @param key the key whose associated value is to be removed
* @param value the value expected to be associated with the specified key
* @return {@code true} if the value was removed
*/
public boolean compareAndDelete(K key, V value) {
return super.remove(key, value);
}

/**
* Replaces the entry for a key only if currently mapped to a given value.
*
* @param key the key with which the specified value is associated
* @param oldValue the value expected to be associated with the specified key
* @param newValue the value to be associated with the specified key
* @return {@code true} if the value was replaced
*/
public boolean compareAndSwap(K key, V oldValue, V newValue) {
return super.replace(key, oldValue, newValue);
}

/**
* Removes the mapping for a key from this map if it is present.
*
* @param key the key whose mapping is to be removed from the map
*/
public void delete(K key) {
super.remove(key);
}

/**
* Returns the value to which the specified key is mapped, or {@code null} if this map contains no mapping for the
* key.
*
* @param key the key whose associated value is to be returned
* @return the value to which the specified key is mapped, or {@code null} if this map contains no mapping for the key
*/
public V load(K key) {
return super.get(key);
}

/**
* Removes the mapping for a key from this map if it is present and returns the associated value.
*
* @param key the key whose mapping is to be removed from the map
* @return the previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}
*/
public V loadAndDelete(K key) {
return super.remove(key);
}

/**
* If the specified key is not already associated with a value, associates it with the given value and returns
* {@code null}, else returns the current value.
*
* @param key the key with which the specified value is to be associated
* @param value the value to be associated with the specified key
* @return the previous value associated with the specified key, or {@code null} if there was no mapping for the key
*/
public V loadOrStore(K key, V value) {
return super.putIfAbsent(key, value);
}

/**
* Performs the given action for each entry in this map until all entries have been processed or the action throws
* an exception.
*
* @param consumer the action to be performed for each entry
*/
public void range(BiConsumer<? super K, ? super V> consumer) {
super.forEach(consumer);
}

/**
* Associates the specified value with the specified key in this map.
*
* @param key the key with which the specified value is to be associated
* @param value the value to be associated with the specified key
*/
public void store(K key, V value) {
super.put(key, value);
}

/**
* Associates the specified value with the specified key in this map and returns the previous value associated with
* the key, or {@code null} if there was no mapping for the key.
*
* @param key the key with which the specified value is to be associated
* @param value the value to be associated with the specified key
* @return the previous value associated with {@code key}, or {@code null} if there was no mapping for {@code key}
*/
public V swap(K key, V value) {
return super.put(key, value);
}
}
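
One difference from Go is worth keeping in mind when using loadOrStore: Go’s LoadOrStore returns the value that ends up in the map together with a loaded flag, while the method above returns null when the key was absent, because that is what putIfAbsent does. The block below is a minimal sketch of how the Go-style result could be recovered on top of a plain ConcurrentHashMap. The LoadResult record and the LoadOrStoreSketch helper are hypothetical names used only for illustration; they are not part of the library.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder mirroring Go's (actual, loaded) return pair.
record LoadResult<V>(V actual, boolean loaded) {}

class LoadOrStoreSketch {

    // Returns the existing value with loaded = true if the key is present,
    // otherwise stores the given value and returns it with loaded = false.
    static <K, V> LoadResult<V> loadOrStore(ConcurrentHashMap<K, V> map, K key, V value) {
        // putIfAbsent is atomic and returns null only when the key was absent.
        V previous = map.putIfAbsent(key, value);
        return previous == null
                ? new LoadResult<>(value, false)
                : new LoadResult<>(previous, true);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        System.out.println(loadOrStore(map, "a", 1)); // prints LoadResult[actual=1, loaded=false]
        System.out.println(loadOrStore(map, "a", 2)); // prints LoadResult[actual=1, loaded=true]
    }
}
```

Because ConcurrentHashMap does not allow null values, a null return from putIfAbsent can only mean that the key was absent, which is what makes this mapping safe.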

Implementing the Once, OnceFunc, OnceValue, and OnceValues Types

These types are quite simple to implement: each takes in a function and ensures it only runs once. A single Once can be passed multiple different functions, but only one of them will run, while each of the other types is associated with exactly one function. OnceValue and OnceValues return whatever is returned by the function they are associated with. OnceValues returns two values, which Java does not support natively, so we need a special Values class to contain the two values returned by its function. In each of these implementations, we reuse a technique from our implementation of the select statement: an atomic boolean records whether this specific instance has already called a function, and the function is executed only if we can successfully update the atomic boolean’s value using its compareAndSet(boolean expectedValue, boolean newValue) method.
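
To illustrate the compareAndSet gate in isolation, here is a minimal sketch. The CasGateDemo class and its countWinners method are hypothetical names used only for this example. Several threads race on one AtomicBoolean, and only the thread whose compareAndSet(false, true) call succeeds is counted as a winner.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class CasGateDemo {

    // Starts the given number of threads that all race on one flag and
    // returns how many of them passed the compareAndSet gate.
    static int countWinners(int threads) {
        AtomicBoolean called = new AtomicBoolean(false);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line the threads up so they race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Only one thread can flip the flag from false to true.
                if (called.compareAndSet(false, true)) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown(); // release all threads at once
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(countWinners(8)); // prints 1
    }
}
```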

Below is the code for our implementations of Once, OnceFunc, OnceValue, and OnceValues.

package io.javago.sync;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* The {@code Once} class implements Go's {@code sync.Once}.
* A utility class that ensures a given {@link Runnable} is executed only once.
*/
public class Once {

private final AtomicBoolean called = new AtomicBoolean(false);

/**
* Constructs a new {@code Once} instance.
*/
public Once() {}

/**
* Executes the specified {@link Runnable} only once.
* Calls made while the first execution is still running block until it completes; calls made afterwards return
* immediately without running anything.
*
* @param func the {@code Runnable} to be executed once
*/
public synchronized void doOnce(Runnable func) {
// The method is synchronized, so callers that arrive during the first execution wait on the monitor until it
// finishes and then return without running func.
if (called.compareAndSet(false, true)) {
func.run();
}
}
}

package io.javago.sync;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* The {@code OnceFunc} class implements Go's {@code sync.OnceFunc}.
* A {@link Runnable} wrapper that ensures the wrapped {@code Runnable} is executed only once.
*/
public class OnceFunc implements Runnable {

private final AtomicBoolean called = new AtomicBoolean(false);
private final Runnable func;

/**
* Constructs a new {@code OnceFunc} that will wrap the given {@code Runnable}.
*
* @param func the {@code Runnable} to be wrapped and executed only once
*/
public OnceFunc(Runnable func) {
this.func = func;
}

/**
* Executes the wrapped {@code Runnable} only once.
* If this method is called multiple times, the wrapped {@code Runnable} will only be executed the first time.
*/
@Override
public void run() {
if (called.compareAndSet(false, true)) {
func.run();
}
}
}

package io.javago.sync;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
* The {@code OnceValue} class implements Go's {@code sync.OnceValue}.
* A {@link Supplier} wrapper that ensures the wrapped {@code Supplier} is executed only once.
*
* @param <T> the type of results supplied by this supplier
*/
public class OnceValue<T> implements Supplier<T> {

private final AtomicBoolean called = new AtomicBoolean(false);
private final Supplier<T> func;

/**
* Constructs a new {@code OnceValue} that will wrap the given {@code Supplier}.
*
* @param func the {@code Supplier} to be wrapped and executed only once
*/
public OnceValue(Supplier<T> func) {
this.func = func;
}

/**
* Executes the wrapped {@code Supplier} only once. If this method is called multiple times,
* the wrapped {@code Supplier} will only be executed the first time and its result will be returned.
* Subsequent calls will return {@code null}.
*
* @return the result supplied by the wrapped {@code Supplier} on its first execution, or {@code null} on subsequent calls
*/
@Override
public T get() {
if (called.compareAndSet(false, true)) {
return func.get();
}
return null;
}
}

package io.javago.sync;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
* The {@code OnceValues} class implements Go's {@code sync.OnceValues}.
* A {@link Supplier} wrapper that ensures the wrapped {@code Supplier} is executed only once and returns a pair of
* values.
*
* @param <S> the type of the first value
* @param <T> the type of the second value
*/
public class OnceValues<S, T> implements Supplier<OnceValues.Values<S, T>> {

private final AtomicBoolean called = new AtomicBoolean(false);
private final Supplier<Values<S, T>> supplier;

/**
* Constructs a new {@code OnceValues} that will wrap the given {@code Supplier}.
*
* @param supplier the {@code Supplier} to be wrapped and executed only once
*/
public OnceValues(Supplier<Values<S, T>> supplier) {
this.supplier = supplier;
}

/**
* Executes the wrapped {@code Supplier} only once. If this method is called multiple times,
* the wrapped {@code Supplier} will only be executed the first time and its result will be returned.
* Subsequent calls will return {@code null}.
*
* @return the result supplied by the wrapped {@code Supplier} on its first execution, or {@code null} on subsequent calls
*/
@Override
public Values<S, T> get() {
if (called.compareAndSet(false, true)) {
return supplier.get();
}
return null;
}

/**
* A record that holds a pair of values.
*
* @param <S> the type of the first value
* @param <T> the type of the second value
* @param first the first value
* @param second the second value
*/
public record Values<S, T>(S first, T second) {}
}
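
Note that Go’s sync.OnceValue returns the same cached result on every call, whereas the OnceValue and OnceValues implementations above return null after the first call. For readers who want the Go behavior, a minimal sketch of a caching variant follows. CachingOnceValue is a hypothetical class, not part of the library, and it uses a synchronized method with plain fields instead of an AtomicBoolean so that concurrent callers wait for the first computation to finish before reading the result.

```java
import java.util.function.Supplier;

// Hypothetical variant that caches the first result, as Go's sync.OnceValue does.
class CachingOnceValue<T> implements Supplier<T> {

    private final Supplier<T> func;
    private boolean called = false; // guarded by this
    private T value;                // guarded by this

    CachingOnceValue(Supplier<T> func) {
        this.func = func;
    }

    // Runs the wrapped Supplier on the first call and returns the cached
    // result on every call, including the first. If func throws, called
    // stays false and a later call tries again.
    @Override
    public synchronized T get() {
        if (!called) {
            value = func.get();
            called = true;
        }
        return value;
    }
}

class CachingOnceValueDemo {
    public static void main(String[] args) {
        int[] runs = {0};
        CachingOnceValue<String> config = new CachingOnceValue<>(() -> {
            runs[0]++;
            return "loaded";
        });
        System.out.println(config.get()); // prints loaded
        System.out.println(config.get()); // prints loaded again, from the cache
        System.out.println(runs[0]);      // prints 1
    }
}
```

A separate boolean is kept alongside the value so that a Supplier which legitimately returns null is still executed only once.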

Implementing the Pool Type

The Pool type manages a collection of reusable objects. Our implementation is backed by a LinkedBlockingQueue, which makes individual operations on that queue such as poll() and offer(E e) thread-safe. Since our implementation’s get() method performs more than one operation on the queue (an isEmpty() check followed by poll()), we mark the get() and put(T t) methods as synchronized to make Pool itself thread-safe. Just like in Go, if we call get() on an empty pool, it will return a new item using the pool’s Supplier, but that item will not be placed in the pool unless we explicitly pass it into the pool’s put(T t) method.

Below is the code for our implementation of Pool.

package io.javago.sync;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/**
* The {@code Pool} class implements Go's {@code sync.Pool}.
* A thread-safe object pool that manages a collection of reusable objects.
*
* @param <T> the type of objects managed by the pool
*/
public class Pool<T> {

private final BlockingQueue<T> pool = new LinkedBlockingQueue<>();
private final Supplier<T> creator;

/**
* Constructs a new {@code Pool} with the given object creator.
*
* @param creator a {@code Supplier} that provides new instances of the objects managed by the pool
*/
public Pool(Supplier<T> creator) {
this.creator = creator;
}

/**
* Retrieves an object from the pool. If the pool is empty, a new object is created using the {@code Supplier}.
*
* @return an object from the pool, or a newly created object if the pool is empty
*/
public synchronized T get() {
if (pool.isEmpty()) {
return creator.get();
}
return pool.poll();
}

/**
* Returns an object to the pool, making it available for future retrieval.
* If the number of objects in the pool is equal to {@link Integer#MAX_VALUE}, the object will not be returned to
* the pool.
*
* @param t the object to be returned to the pool
*/
public synchronized void put(T t) {
pool.offer(t);
}
}
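
As a design note, poll() already returns null when the queue is empty, so the emptiness check and the removal can be folded into a single queue operation. The sketch below is a hypothetical alternative (SimplePool is not the library’s class) that relies on this and therefore needs no synchronized methods. It assumes the pool never stores null, which LinkedBlockingQueue rejects anyway. The usage in main also shows the behavior described earlier: an object created by get() only enters the pool once it is passed to put.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Hypothetical pool variant that uses a single poll() call per get().
class SimplePool<T> {

    private final BlockingQueue<T> pool = new LinkedBlockingQueue<>();
    private final Supplier<T> creator;

    SimplePool(Supplier<T> creator) {
        this.creator = creator;
    }

    // poll() atomically removes and returns the head of the queue,
    // or returns null if the queue is empty.
    T get() {
        T item = pool.poll();
        return item != null ? item : creator.get();
    }

    // offer() on an unbounded LinkedBlockingQueue only fails at Integer.MAX_VALUE elements.
    void put(T t) {
        pool.offer(t);
    }
}

class SimplePoolDemo {
    public static void main(String[] args) {
        SimplePool<StringBuilder> pool = new SimplePool<>(StringBuilder::new);
        StringBuilder a = pool.get();        // pool is empty, so a new object is created
        StringBuilder b = pool.get();        // still empty: a was never put back
        System.out.println(a == b);          // prints false
        pool.put(a);
        System.out.println(pool.get() == a); // prints true
    }
}
```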

Conclusion

In this final article, we implemented Go’s Map, Once, OnceFunc, OnceValue, OnceValues, and Pool types, which gives us a set of useful synchronization tools. With this library completed, we now have the benefits of Go’s simple and intuitive concurrency within Java’s object-oriented structure. I hope you enjoyed following me in this process.

For the previous article in this series click here.

To return to the first article in this series click here.

To view the official web page for this library click here.
