More Akka actors… in Java!

Alejandro Picetti
Published in Globant
Nov 16, 2023
Photo by Kyle Head on Unsplash

In my previous article, I introduced a simple “Hello, World!”-like example to get our feet wet with the actor concurrency model and the Akka framework in particular. However, a single-actor example falls short of showing what can be done with actors, so in this article I’ll describe a more complex example: a simple processing pipeline.

Description of the example

Let’s see how our pipeline should look:

Block diagram showing processing stages and their interactions

Here, we have a pipeline with four stages, each represented by an actor.

  • Sending the StartProcess message to the Reader will kickstart the process. When the Reader receives this message, it will read lines from a CSV file, split them by the field separator into a list of strings, and send them as a ProcessLine message with that list to the Processor, one line at a time.
  • The Processor will examine each received line and check whether its third field has a value. If the field is empty, it’ll send the line to the Rejected Writer; otherwise, the line is OK, so it’ll send it to the Accepted Writer.
  • Each of the above Writers will write lines to its own output CSV file, so at the end of the process we’ll have two files, one with accepted lines and one with rejected ones.
  • The Reader will signal the end of processing by sending an EndProcess message to the Processor once it has sent all lines.
  • When the Processor receives the EndProcess message, it will send an EndWrite message to both the Accepted Writer and the Rejected Writer. They, in turn, will send a WriteEnded message back to the Processor, confirming they’ve closed their output files.
  • Only when the Processor has received both WriteEnded messages will it stop the Actor System, causing the entire program to stop.
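
The routing rule in the second bullet can be sketched in plain Java, without any Akka dependency. The names below (RoutingSketch, Route, route) are illustrative and not part of the article's project. The sketch also assumes that a line with fewer than three fields counts as rejected, which the description above does not specify.

```java
import java.util.List;

final class RoutingSketch {

    // Hypothetical destinations, standing in for the two Writer actors.
    enum Route { ACCEPTED, REJECTED }

    // Decide where a parsed CSV line goes, based on its third field.
    // Assumption: a line with fewer than three fields is treated as rejected.
    static Route route(List<String> fields) {
        if (fields.size() < 3 || fields.get(2).isBlank()) {
            return Route.REJECTED;
        }
        return Route.ACCEPTED;
    }

    public static void main(String[] args) {
        System.out.println(route(List.of("1", "Alice", "Sales")));
        System.out.println(route(List.of("2", "Bob", "")));
    }
}
```

Keeping the decision in a pure function like this makes the rule easy to test on its own, separately from the message passing.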

The implementation

This is the project’s source structure:

<project-root-dir>
|
+ -- src
     |
     + -- main
          |
          + -- java
               |
               + -- com.example.moreakka
                    |
                    + -- actors
                    |    |
                    |    + -- AbstractUntypedLoggingActor.java
                    |    + -- Processor.java
                    |    + -- Reader.java
                    |    + -- Writer.java
                    + -- Main.java
...
...

The Maven POM file is similar to the one in the previous article, so I won’t show it here. We’ll use Java 21 for this example as well.

All our actors will extend the AbstractUntypedLoggingActor abstract class, which extends UntypedAbstractActor and adds the necessary logic to initialize logging and log INFO and ERROR messages.

package com.example.moreakka.actors;

import akka.actor.UntypedAbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.Optional;

public abstract class AbstractUntypedLoggingActor
    extends UntypedAbstractActor {

    private Optional<LoggingAdapter> logger = Optional.empty();

    protected void initLogging() {
        logger = Optional.of(
            Logging.getLogger(this.getContext().getSystem(), this)
        );
    }

    protected void info(String message) {
        logger.ifPresent(log -> log.info(message));
    }

    protected void error(String message, Throwable e) {
        logger.ifPresent(log -> log.error(message, e));
    }
}

Now, we can implement our actors.

Reader

This actor will read all lines from an input file, split each one into its fields, and send them to the Processor:

package com.example.moreakka.actors;

import akka.actor.ActorRef;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;
import static com.example.moreakka.actors.Processor.EndProcess;
import static com.example.moreakka.actors.Processor.ProcessLine;

public class Reader extends AbstractUntypedLoggingActor {

    public record StartProcess() {}

    private final Stream<String> linesRead;
    private final ActorRef processor;

    public Reader(
        String inputFilePath, ActorRef processor
    ) throws IOException {
        initLogging();
        // Opens the file; lines are read lazily as the stream is consumed.
        this.linesRead = Files.lines(Path.of(inputFilePath));
        this.processor = processor;
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof StartProcess) {
            info("Processing started!");
            // try-with-resources closes the underlying file once all
            // lines have been sent; forEach() alone does not close it.
            try (Stream<String> lines = this.linesRead) {
                // The -1 limit keeps trailing empty fields.
                lines.map(line ->
                    Arrays.stream(line.split(",", -1)).toList()
                ).forEach(line ->
                    this.processor.tell(new ProcessLine(line), getSelf())
                );
            }
            this.processor.tell(new EndProcess(), getSelf());
        } else {
            this.unhandled(message);
        }
    }
}

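One thing to note is that the Reader uses Files.lines(Path) to read the file, which is useful for processing large files: the method returns a stream that lazily reads lines one by one from an underlying file input stream, instead of loading the whole file into memory. A terminal operation such as forEach() does not close that file by itself. The file is released only when the stream is closed, so the stream should be consumed inside a try-with-resources block (or closed explicitly) once processing finishes.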
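
As a minimal standalone sketch of this reading pattern, outside any actor: the class and method names below (LazyLinesSketch, forEachLine) are hypothetical, and the handler parameter merely stands in for the tell() call to the Processor.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class LazyLinesSketch {

    // Reads a CSV file lazily and passes each line, split into fields,
    // to the given handler (a stand-in for sending a message).
    static void forEachLine(Path file, Consumer<List<String>> handler)
            throws IOException {
        // try-with-resources closes the stream, and with it the file.
        try (Stream<String> lines = Files.lines(file)) {
            // The -1 limit keeps trailing empty fields, so "4,Dan,"
            // yields three fields instead of two.
            lines.map(line -> Arrays.asList(line.split(",", -1)))
                 .forEach(handler);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("lines-sketch", ".csv");
        Files.write(tmp, List.of("1,Alice,Sales", "4,Dan,"));
        forEachLine(tmp, fields -> System.out.println(fields));
        Files.delete(tmp);
    }
}
```

The limit argument matters for this pipeline: with a plain split(","), a line whose last field is empty loses that field, so a check on the third field could fail with an index error instead of rejecting the line.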

Processor

This actor splits lines into accepted and rejected, and sends them to the corresponding Writer:

package com.example.moreakka.actors;

import akka.actor.ActorRef;
import java.util.List;

import static com.example.moreakka.actors.Writer.*;

public class Processor extends AbstractUntypedLoggingActor {
    public record ProcessLine(List<String> line) {}
    public record EndProcess() {}

    private final ActorRef acceptedWriter;
    private final ActorRef rejectedWriter;

    // Number of WriteEnded confirmations received so far.
    private int writeEndCount = 0;

    public Processor(ActorRef acceptedWriter, ActorRef rejectedWriter) {
        initLogging();
        this.acceptedWriter = acceptedWriter;
        this.rejectedWriter = rejectedWriter;
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        switch (message) {
            case ProcessLine process -> {
                // Lines with fewer than three fields are treated as
                // rejected instead of failing on get(2).
                if (process.line().size() < 3
                        || process.line().get(2).isBlank()) {
                    this.rejectedWriter.tell(
                        new WriteLine(process.line()),
                        getSelf()
                    );
                } else {
                    this.acceptedWriter.tell(
                        new WriteLine(process.line()),
                        getSelf()
                    );
                }
            }
            case EndProcess ignored -> {
                var endWriteMsg = new EndWrite();
                info("Ending process...");
                this.acceptedWriter.tell(endWriteMsg, getSelf());
                this.rejectedWriter.tell(endWriteMsg, getSelf());
            }
            case WriteEnded ignored -> {
                writeEndCount += 1;
                // Both Writers have closed their files: shut down.
                if (writeEndCount >= 2) {
                    info("Process ended.");
                    getContext().getSystem().terminate();
                }
            }
            default -> this.unhandled(message);
        }
    }
}

Writer

This actor writes received lines to an output file.

package com.example.moreakka.actors;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

public class Writer extends AbstractUntypedLoggingActor {

    public record WriteLine(List<String> line) {}
    public record EndWrite() {}
    public record WriteEnded() {}

    private final PrintWriter writer;

    public Writer(String outputFilePath) throws IOException {
        // The second argument enables auto-flush on println().
        this.writer = new PrintWriter(
            new FileWriter(outputFilePath), true);
    }

    @Override
    public void onReceive(Object message) throws Throwable {
        switch (message) {
            case WriteLine write ->
                this.writer.println(String.join(",", write.line()));
            case EndWrite ignored -> {
                this.writer.close();
                // Confirm to whoever sent EndWrite that the file is closed.
                getContext().getSender().tell(new WriteEnded(), getSelf());
            }
            default -> this.unhandled(message);
        }
    }
}

Main program

Last but not least, our main program starts and drives the whole process:

package com.example.moreakka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.example.moreakka.actors.Processor;
import com.example.moreakka.actors.Reader;
import com.example.moreakka.actors.Writer;
import scala.concurrent.Await;
import java.nio.file.Path;
import java.util.concurrent.TimeoutException;
import static com.example.moreakka.actors.Reader.*;
import static scala.concurrent.duration.Duration.Inf;

public class Main {

    public static void main(String[] args)
            throws InterruptedException, TimeoutException {
        var system = ActorSystem.create("more-akka-java");
        // Build the pipeline back to front, so each stage can be given
        // references to the stages it sends messages to.
        var acceptedWriter = system.actorOf(
            Props.create(Writer.class, absolutePathFor("processed.csv"))
        );
        var rejectedWriter = system.actorOf(
            Props.create(Writer.class, absolutePathFor("error.csv"))
        );
        var processor = system.actorOf(
            Props.create(Processor.class, acceptedWriter, rejectedWriter)
        );
        var reader = system.actorOf(
            Props.create(
                Reader.class, absolutePathFor("input.csv"), processor
            )
        );

        // The main program is not an actor, so there is no sender.
        reader.tell(new StartProcess(), ActorRef.noSender());
        // Block until the Processor terminates the Actor System.
        Await.ready(system.whenTerminated(), Inf());
    }

    private static String absolutePathFor(String filename) {
        return Path.of(
            "C:", "experiments", "java", "akka-java-files", filename
        ).toAbsolutePath().toString();
    }
}

Some notes on the above code:

  • Actors reference each other to compose the pipeline using ActorRef. As its name implies, this class represents a reference to an actor created inside the Actor System and provides all mechanisms to send messages to that actor.
  • The EndWrite / WriteEnded message interaction between the Processor and the Writers has not been implemented using the “ask” pattern. Instead, it uses a form of “asynchronous request-response”: the Processor sends EndWrite messages, identifying itself as the sender with getSelf(), and later receives the WriteEnded response messages that the Writers send back to that sender.
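
On the Processor's side, this exchange amounts to waiting until every Writer has replied. The Processor above counts replies. The sketch below is a variation in plain Java, not the author's code, that tracks the pending Writers by a hypothetical string id so that a duplicate reply is not counted twice. The class name AckTrackerSketch and the ids are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

final class AckTrackerSketch {

    // Ids of the writers that have not confirmed yet.
    private final Set<String> pending;

    AckTrackerSketch(Set<String> expected) {
        this.pending = new HashSet<>(expected);
    }

    // Records a reply; returns true once every expected writer has replied.
    boolean acknowledge(String writerId) {
        pending.remove(writerId);
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        var tracker = new AckTrackerSketch(Set.of("accepted", "rejected"));
        System.out.println(tracker.acknowledge("accepted"));
        System.out.println(tracker.acknowledge("accepted")); // duplicate reply
        System.out.println(tracker.acknowledge("rejected"));
    }
}
```

Because an actor handles one message at a time, state like this needs no locking when it lives inside the actor.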

Building and running our code

Now, we will build and run our program. It reads an input.csv file with contents like these:

Records in input.csv

Note that records in lines 2, 4, and 8 do not have a value in their third field.

The program should run as below:

Our program executing

And these are the results! First, the processed.csv file showing accepted records:

Output file with accepted records

Then let’s look at the error.csv file with the rejected records:

Output file with rejected records

Summary

In this article, I presented a more complex example of using actors with the Akka framework. Instead of a single actor, the example shows a system of actors interacting with each other to perform a task. Of course, actor systems can have much more complex topologies, including hierarchical structures with supervisor actors controlling child actors. We’ve just scratched the surface, and I hope to have piqued the reader’s curiosity about the actor-based concurrency model and the Akka framework in particular.
