Exploring the Java Aeron Framework: A Comprehensive Introduction to IPC

Andrew Huynh
Feb 19, 2023


If you’re using the Disruptor framework in your project, it’s likely that you’re already familiar with the Aeron framework. While exploring the fastest ways to send data between services, I found that Aeron is one of the best free solutions available. Aeron offers two primary transports for exchanging data between services: IPC and UDP. For those who are new to the framework, the following links provide a helpful starting point:

[Aeron github]

[Aeron cookbook]

In this blog post, I will walk you through the basic steps required to utilize Aeron IPC (Inter Process Communication) in two different use cases.

Use Case 1: Using IPC between multiple threads

The first use case involves a single service, called Service A, which runs two child threads: one producer thread and one consumer thread. In this scenario, the producer thread attempts to send one million messages as quickly as possible.

You can skip the steps by looking at the full code here: [github]

Below is the pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>aeron-agent-ipc-ping-pong-demo</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>16</maven.compiler.source>
<maven.compiler.target>16</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>io.aeron</groupId>
<artifactId>aeron-all</artifactId>
<version>1.40.0</version>
</dependency>
</dependencies>


</project>

The example code in the Aeron repository is built with Gradle, but this project uses Maven, so I ported the original code into a Maven-based project to fit our existing development environment. The only dependency you’ll need is aeron-all, declared in the Maven file above.

Main class:

package org.example;

import io.aeron.Aeron;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.BusySpinIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ShutdownSignalBarrier;

public class Main {
public static void main(String[] args) {
final String channel = "aeron:ipc";
final int streamId = 10;
final int sendCount = 1_000_000;
final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();

//Step 1: Construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
.dirDeleteOnStart(true)
.threadingMode(ThreadingMode.SHARED)
.sharedIdleStrategy(new BusySpinIdleStrategy())
.dirDeleteOnShutdown(true);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);

//Step 2: Construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
.aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);

//Step 3: Construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, streamId);
final Publication publication = aeron.addPublication(channel, streamId);

//Step 4: Construct the agents
final SendAgent sendAgent = new SendAgent(publication, sendCount);
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier,
sendCount);

//Step 5: Construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
Throwable::printStackTrace, null, sendAgent);
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
Throwable::printStackTrace, null, receiveAgent);

System.out.println("starting");

//Step 6: Start the runners
AgentRunner.startOnThread(sendAgentRunner);

long startTime = System.currentTimeMillis();
AgentRunner.startOnThread(receiveAgentRunner);

//wait for the final item to be received before closing
barrier.await();
long endTime = System.currentTimeMillis();
System.out.printf("Process time: %d ms%n", endTime - startTime);

//close the resources
receiveAgentRunner.close();
sendAgentRunner.close();

aeron.close();
mediaDriver.close();
}

}

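The basic code flow of the Main class follows the six steps marked in its comments: launch the embedded Media Driver, connect an Aeron client to it, add the subscription and the publication, construct the agents, wrap them in agent runners, and start the runners on their own threads.

In Use Case 2 we shall break this flow into three services.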

Next are the simple SendAgent and ReceiveAgent classes, which implement the Agent interface from Agrona (the utility library that Aeron builds on). Once we have these two objects, we just plug them into AgentRunners, and AgentRunner.startOnThread takes care of running each one on its own new thread.

package org.example;

import io.aeron.Publication;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.UnsafeBuffer;

import java.nio.ByteBuffer;

public class SendAgent implements Agent {
private final Publication publication;
private final int sendCount;
private final UnsafeBuffer unsafeBuffer;
private int currentCountItem = 1;

public SendAgent(final Publication publication, int sendCount) {
this.publication = publication;
this.sendCount = sendCount;
this.unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocate(64));
unsafeBuffer.putInt(0, currentCountItem);
}

@Override
public int doWork() {
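//all messages have been sent, nothing left to do
if (currentCountItem > sendCount) {
return 0;
}

//only offer once a subscriber is connected; a positive result means the message was accepted
if (publication.isConnected()) {
if (publication.offer(unsafeBuffer) > 0) {
currentCountItem += 1;
unsafeBuffer.putInt(0, currentCountItem);
return 1; //report one unit of work to the idle strategy
}
}
return 0;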
}

@Override
public String roleName() {
return "sender";
}
}
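The `publication.offer(...) > 0` check in SendAgent treats every non-positive result the same way: try again on the next `doWork()` call. Aeron reports several distinct negative results, though, and not all of them are worth retrying. Below is a minimal sketch of how they could be told apart. The class `OfferResult` and its methods are hypothetical names used for illustration, and the constants are local copies of the ones on `io.aeron.Publication` as I understand them, included only so the snippet compiles without Aeron on the classpath. Real code should reference `Publication.BACK_PRESSURED` and friends directly.

```java
// Hypothetical helper, not part of Aeron.
final class OfferResult {
    // Assumed to mirror the constants defined on io.aeron.Publication.
    static final long NOT_CONNECTED = -1;
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;
    static final long CLOSED = -4;
    static final long MAX_POSITION_EXCEEDED = -5;

    private OfferResult() {
    }

    /** True when offering the same message again later can still succeed. */
    static boolean isRetryable(final long result) {
        return result == NOT_CONNECTED
            || result == BACK_PRESSURED
            || result == ADMIN_ACTION;
    }

    /** Human-readable description, handy for logging while experimenting. */
    static String describe(final long result) {
        if (result > 0) {
            return "sent";
        }
        if (result == NOT_CONNECTED) {
            return "retry: no subscriber connected yet";
        }
        if (result == BACK_PRESSURED) {
            return "retry: subscriber is falling behind";
        }
        if (result == ADMIN_ACTION) {
            return "retry: driver housekeeping in progress";
        }
        if (result == CLOSED) {
            return "stop: publication is closed";
        }
        if (result == MAX_POSITION_EXCEEDED) {
            return "stop: stream position limit reached";
        }
        return "unknown result " + result;
    }
}
```

In SendAgent, the value returned by `publication.offer(unsafeBuffer)` could be passed to `isRetryable` to decide between retrying on the next duty cycle and shutting the sender down.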

package org.example;


import io.aeron.Subscription;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.ShutdownSignalBarrier;

public class ReceiveAgent implements Agent {
private final Subscription subscription;
private final ShutdownSignalBarrier barrier;
private final int sendCount;

public ReceiveAgent(final Subscription subscription, ShutdownSignalBarrier barrier, int sendCount) {
this.subscription = subscription;
this.barrier = barrier;
this.sendCount = sendCount;
}

@Override
public int doWork() throws Exception {
//poll up to 1000 fragments and report how many were handled as the work count
return subscription.poll(this::handler, 1000);
}

private void handler(DirectBuffer buffer, int offset, int length, Header header) {
final int lastValue = buffer.getInt(offset);
if (lastValue >= sendCount) {
barrier.signal();
}
}

@Override
public String roleName() {
return "receiver";
}
}
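To make the role of the runner less of a black box, here is a rough sketch of the duty-cycle idea behind it: call `doWork()` in a loop and idle only when the agent reports no work. This is not Agrona's implementation. `MiniAgent` and `MiniRunner` are hypothetical stand-ins written with the JDK only, and the spin hint is just one possible way to idle.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the two Agent methods used in this post. */
interface MiniAgent {
    int doWork() throws Exception;

    String roleName();
}

/** Hypothetical, simplified runner that only illustrates the duty cycle. */
final class MiniRunner implements Runnable {
    private final MiniAgent agent;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private long idleCycles;

    MiniRunner(final MiniAgent agent) {
        this.agent = agent;
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                final int workCount = agent.doWork();
                if (workCount <= 0) {
                    // Nothing was done this cycle: count it and hint a busy spin.
                    idleCycles++;
                    Thread.onSpinWait();
                }
            } catch (final Exception e) {
                // Report the failure and stop the loop.
                e.printStackTrace();
                running.set(false);
            }
        }
    }

    /** Asks the loop to exit after the current cycle. */
    void close() {
        running.set(false);
    }

    /** Number of cycles in which the agent reported no work. */
    long idleCycles() {
        return idleCycles;
    }
}
```

The work count matters once a busy spin is swapped for a strategy that backs off or sleeps: an agent that always reports zero would then be treated as idle even while it is moving messages.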

Use Case 2: Using IPC between multiple processes

In this use case, we have an independent MD (Media Driver) process running alongside two other processes: the Producer and Consumer services. The objective is for the Producer to send messages to the Consumer service as swiftly as possible.

I simply divided the Use Case 1 code into three independent services:

Media Driver process code [Github]

package org.example;

import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import org.agrona.concurrent.BusySpinIdleStrategy;
import org.agrona.concurrent.NoOpIdleStrategy;
import org.agrona.concurrent.ShutdownSignalBarrier;

public class Main {

//Start the Media Driver separately, as its own process
public static void main(String[] args) {
final MediaDriver.Context ctx = new MediaDriver.Context()
.termBufferSparseFile(false)
.useWindowsHighResTimer(true)
.threadingMode(ThreadingMode.DEDICATED)
.conductorIdleStrategy(BusySpinIdleStrategy.INSTANCE)
.receiverIdleStrategy(NoOpIdleStrategy.INSTANCE)
.senderIdleStrategy(NoOpIdleStrategy.INSTANCE);

try (MediaDriver ignored = MediaDriver.launch(ctx)) {
new ShutdownSignalBarrier().await();
System.out.println("Shutdown Driver...");
}
}
}

Producer service code: [github]

package org.example;

import io.aeron.Aeron;
import io.aeron.Publication;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.BusySpinIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ShutdownSignalBarrier;

public class Main {

//Requires the Media Driver process to be started separately first
public static void main(String[] args) {
final String channel = "aeron:ipc";
final int streamId = 10;
final int sendCount = 1_000_000;

final IdleStrategy idleStrategySend = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
final Aeron aeron = Aeron.connect();

//Step 1: Construct the subs and pubs
final Publication publication = aeron.addPublication(channel, streamId);

//Step 2: Construct the agent (this SendAgent variant also takes the barrier; its code is not shown in this post)
final SendAgent sendAgent = new SendAgent(publication, barrier, sendCount);

//Step 3: Construct agent runners
final AgentRunner sendAgentRunner = new AgentRunner(idleStrategySend,
Throwable::printStackTrace, null, sendAgent);


System.out.println("Starting SendingAgent....");
//Step 4: Start the runners
AgentRunner.startOnThread(sendAgentRunner);

//wait until the barrier is signalled before closing
barrier.await();

//close the resources
sendAgentRunner.close();
aeron.close();
}
}

Consumer service code [Github]

package org.example;

import io.aeron.Aeron;
import io.aeron.Subscription;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.BusySpinIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.ShutdownSignalBarrier;

public class Main {
public static void main(String[] args) {
final String channel = "aeron:ipc";
final int streamId = 10;
final int sendCount = 1_000_000;

final IdleStrategy idleStrategyReceive = new BusySpinIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
final Aeron aeron = Aeron.connect();

//Step 1: Construct the subs and pubs
final Subscription subscription = aeron.addSubscription(channel, streamId);

//Step 2: Construct the agents
final ReceiveAgent receiveAgent = new ReceiveAgent(subscription, barrier, sendCount);

//Step3: Construct agent runners
final AgentRunner receiveAgentRunner = new AgentRunner(idleStrategyReceive,
Throwable::printStackTrace, null, receiveAgent);

System.out.println("Starting Receive...");
long startTime = System.currentTimeMillis();
//Step 4: Start the runners
AgentRunner.startOnThread(receiveAgentRunner);

//wait for the final item to be received before closing
barrier.await();
System.out.println(String.format("Process time: %s", System.currentTimeMillis() - startTime));
//close the resources
receiveAgentRunner.close();
aeron.close();
}
}

To run the experiment, start the MD (Media Driver) process first, then the Producer service, which waits until the Consumer service is connected before it publishes messages. Start the Consumer service last.
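The Producer in this post simply keeps checking `publication.isConnected()` on every duty cycle, which means it waits forever if the Consumer never shows up. If you prefer a bounded wait, something like the sketch below could work. `ConnectionWait` and `awaitConnected` are hypothetical names, and the helper takes a plain `BooleanSupplier` so it runs without Aeron; in a real client you would pass `publication::isConnected`.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of Aeron. */
final class ConnectionWait {
    private ConnectionWait() {
    }

    /**
     * Spins until the supplier reports true or the timeout elapses.
     *
     * @return true if connected in time, false if the wait timed out
     */
    static boolean awaitConnected(final BooleanSupplier isConnected, final long timeoutNanos) {
        final long deadline = System.nanoTime() + timeoutNanos;
        while (!isConnected.getAsBoolean()) {
            // Compare via subtraction so the check survives nanoTime overflow.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.onSpinWait();
        }
        return true;
    }
}
```

A Producer could call this once before starting its agent runner and exit with a clear message when it returns false, rather than spinning silently.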

Final thoughts:

  • Aeron IPC offers a wide range of options for constructing your microservice architecture. You can embed the MD process inside your Consumer service and send data directly from the Producer to the Consumer, or you can run the MD process independently and share it among multiple processes. If you have only one Producer instance but ten Consumer instances, running the MD process independently can significantly improve your system throughput.
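  • It’s interesting to note that Use Case 2 has a process time of around 120 milliseconds, while Use Case 1 takes around 210 milliseconds, both running on my M1 Pro. This finding is surprising: running the MD process independently proved to be an effective way to boost overall performance. Keep in mind that the two setups also configure the driver differently (SHARED threading mode in Use Case 1, DEDICATED in Use Case 2), so the gap may not come from process separation alone.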
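To put the two timings in comparable terms, it helps to convert them into messages per second. The sketch below does that arithmetic; `Throughput` is a hypothetical helper name, and the inputs are the approximate single-run numbers quoted above, so treat the results as rough indications rather than benchmarks.

```java
/** Hypothetical helper for turning a run's elapsed time into a message rate. */
final class Throughput {
    private Throughput() {
    }

    /** Messages per second for messageCount messages handled in elapsedMillis. */
    static double messagesPerSecond(final long messageCount, final long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return messageCount * 1000.0 / elapsedMillis;
    }
}
```

With one million messages, 210 ms works out to roughly 4.8 million messages per second and 120 ms to roughly 8.3 million messages per second.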

Thanks for reading.
