More Akka actors… in Java!
In my previous article, I introduced a simple “Hello, World!”-like example to get our feet wet with the actor concurrency model and the Akka framework in particular. However, a single-actor example falls short of showing what can be done with actors, so in this article I’ll describe a more complex example: a simple processing pipeline.
Description of the example
Let’s see how our pipeline should look:
Here, we have a pipeline with four stages, each represented by an actor.
- Sending the StartProcess message to the Reader will kickstart the process. When the Reader receives this message, it will read lines from a CSV file, split each line by the field separator into a list of strings, and send each list to the Processor in a ProcessLine message, one line at a time.
- The Processor will examine each received line, checking whether the third field holds a value. If the value is absent, it’ll send the line to the Rejected Writer; otherwise, the line is OK, so it’ll send it to the Accepted Writer.
- Each of the above Writers will write lines to its own output CSV file, so at the end of the process we’ll have two files, one with accepted lines and one with rejected ones.
- The Reader will signal the end of processing by sending an EndProcess message to the Processor once it has sent all lines.
- When the Processor receives the EndProcess message, it will send an EndWrite message to both the Accepted Writer and the Rejected Writer. They, in turn, will send a WriteEnded message back to the Processor, confirming they’ve closed their output files.
- Only when the Processor has received both WriteEnded messages will it stop the Actor System, causing the entire program to stop.
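The routing rule in the second bullet can be sketched in plain Java, without any Akka dependency. This is only a minimal sketch: the names LineRoutingSketch, splitFields, and isAccepted are hypothetical, the separator is assumed to be a comma, and quoted fields are not handled. The -1 limit passed to split() is a choice made here so that a trailing empty field is kept rather than dropped.

```java
import java.util.Arrays;
import java.util.List;

class LineRoutingSketch {
    // Splits a CSV line on commas; the -1 limit keeps trailing empty fields,
    // so "2,Bob," yields three fields instead of two.
    static List<String> splitFields(String line) {
        return Arrays.asList(line.split(",", -1));
    }

    // A line is accepted only when it has a non-blank third field.
    static boolean isAccepted(List<String> fields) {
        return fields.size() > 2 && !fields.get(2).isBlank();
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(splitFields("1,Alice,42"))); // third field present
        System.out.println(isAccepted(splitFields("2,Bob,")));     // third field empty
    }
}
```

Keeping the rule in a small pure function like this makes it easy to check on its own before wiring it into an actor.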
The implementation
This is the project’s source structure:
<project-root-dir>
|
+-- src
    |
    +-- main
        |
        +-- java
            |
            +-- com.example.moreakka
                |
                +-- actors
                |   |
                |   +-- AbstractUntypedLoggingActor.java
                |   +-- Processor.java
                |   +-- Reader.java
                |   +-- Writer.java
                +-- Main.java
...
...
The Maven POM file is similar to the one in the previous article, so I won’t show it here. We’ll use Java 21 for this example as well.
All our actors will extend the AbstractUntypedLoggingActor abstract class, which extends UntypedAbstractActor and adds the necessary logic to initialize logging and log INFO and ERROR messages.
package com.example.moreakka.actors;
import akka.actor.UntypedAbstractActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import java.util.Optional;
public abstract class AbstractUntypedLoggingActor
extends UntypedAbstractActor {
private Optional<LoggingAdapter> logger = Optional.empty();
protected void initLogging() {
logger = Optional.of(
Logging.getLogger(this.getContext().getSystem(), this)
);
}
protected void info(String message) {
logger.ifPresent(log -> log.info(message));
}
protected void error(String message, Throwable e){
logger.ifPresent(log -> log.error(message, e));
}
}
Now, we can implement our actors.
Reader
This actor will read all lines from an input file, split each one into its fields, and send them to the Processor:
package com.example.moreakka.actors;
import akka.actor.ActorRef;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;
import static com.example.moreakka.actors.Processor.EndProcess;
import static com.example.moreakka.actors.Processor.ProcessLine;
public class Reader extends AbstractUntypedLoggingActor {
public record StartProcess() {}
private Stream<String> linesRead;
private ActorRef processor;
public Reader(
String inputFilePath, ActorRef processor
) throws IOException {
initLogging();
this.linesRead = Files.lines(Path.of(inputFilePath));
this.processor = processor;
}
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof StartProcess) {
info("Processing started!");
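// Files.lines keeps the file open until the stream is closed, so close it here.
try (var lines = this.linesRead) {
// A limit of -1 keeps trailing empty fields, e.g. "a,b," gives three fields.
lines.map(line ->
Arrays.stream(line.split(",", -1)).toList()
).forEach(line ->
this.processor.tell(new ProcessLine(line), getSelf())
);
}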
this.processor.tell(new EndProcess(), getSelf());
} else {
this.unhandled(message);
}
}
}
One thing to note is that the Reader uses Files.lines(Path) to read the file, which is useful for processing large files: the method returns a stream that lazily reads lines one by one from an underlying file input stream. Bear in mind that a terminal operation such as forEach() does not close that file by itself. The file is released only when the stream is closed, so the stream should be consumed inside a try-with-resources block or closed explicitly.
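As an illustration of lazy reading with Files.lines(Path), here is a minimal standalone sketch that wraps the stream in try-with-resources so the file is closed when the block exits. The class and method names (LazyLinesSketch, readFields) are hypothetical, and the comma separator with a -1 split limit is an assumption of this sketch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class LazyLinesSketch {
    // Reads a CSV file lazily and returns each line split into its fields.
    // try-with-resources closes the underlying file when the block exits,
    // whether it completes normally or with an exception.
    static List<List<String>> readFields(Path csv) throws IOException {
        try (Stream<String> lines = Files.lines(csv)) {
            return lines
                .map(line -> Arrays.asList(line.split(",", -1)))
                .toList();
        }
    }

    public static void main(String[] args) throws IOException {
        // Temporary file used only to make the sketch runnable.
        Path tmp = Files.createTempFile("input", ".csv");
        Files.write(tmp, List.of("1,Alice,42", "2,Bob,"));
        System.out.println(readFields(tmp));
        Files.delete(tmp);
    }
}
```

Collecting to a list is done here only to keep the example small; for a large file, forwarding each line as it is read keeps memory use flat.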
Processor
This actor splits lines into accepted and rejected, and sends them to the corresponding Writer:
package com.example.moreakka.actors;
import akka.actor.ActorRef;
import java.util.List;
import static com.example.moreakka.actors.Writer.*;
public class Processor extends AbstractUntypedLoggingActor {
public record ProcessLine(List<String> line) {}
public record EndProcess() {}
private final ActorRef acceptedWriter;
private final ActorRef rejectedWriter;
private int writeEndCount = 0;
public Processor(ActorRef acceptedWriter, ActorRef rejectedWriter) {
initLogging();
this.acceptedWriter = acceptedWriter;
this.rejectedWriter = rejectedWriter;
}
@Override
public void onReceive(Object message) throws Throwable {
switch (message) {
case ProcessLine process -> {
// Lines with fewer than three fields count as having no third value.
if (process.line().size() < 3 || process.line().get(2).isBlank()) {
this.rejectedWriter.tell(
new WriteLine(process.line()),
getSelf()
);
} else {
this.acceptedWriter.tell(
new WriteLine(process.line()),
getSelf()
);
}
}
case EndProcess ignored -> {
var endWriteMsg = new EndWrite();
info("Ending process...");
this.acceptedWriter.tell(endWriteMsg, getSelf());
this.rejectedWriter.tell(endWriteMsg, getSelf());
}
case WriteEnded ignored -> {
writeEndCount += 1;
if (writeEndCount >= 2) {
info("Process ended.");
getContext().getSystem().terminate();
}
}
default -> this.unhandled(message);
}
}
}
Writer
This actor writes received lines to an output file.
package com.example.moreakka.actors;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
public class Writer extends AbstractUntypedLoggingActor {
public record WriteLine(List<String> line) {}
public record EndWrite() {}
public record WriteEnded() {}
private final PrintWriter writer;
public Writer(String outputFilePath) throws IOException {
this.writer = new PrintWriter(
new FileWriter(outputFilePath), true);
}
@Override
public void onReceive(Object message) throws Throwable {
switch(message) {
case WriteLine write ->
this.writer.println(String.join(",", write.line()));
case EndWrite ignored -> {
this.writer.close();
getContext().getSender().tell(new WriteEnded(), getSelf());
}
default -> this.unhandled(message);
}
}
}
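The file handling inside the Writer can be tried out in isolation. The following is a minimal sketch with hypothetical names (CsvWriteSketch, writeAll). It uses the same PrintWriter and FileWriter combination, but it closes the writer with try-with-resources rather than in response to an EndWrite message.

```java
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

class CsvWriteSketch {
    // Writes each record as one comma-separated line, then closes the file.
    static void writeAll(Path output, List<List<String>> records) throws IOException {
        // The second constructor argument enables auto-flush on println();
        // try-with-resources closes the writer when the block exits.
        try (PrintWriter writer =
                 new PrintWriter(new FileWriter(output.toFile()), true)) {
            for (List<String> fields : records) {
                writer.println(String.join(",", fields));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Temporary file used only to make the sketch runnable.
        Path tmp = Files.createTempFile("processed", ".csv");
        writeAll(tmp, List.of(List.of("1", "Alice", "42"), List.of("2", "Bob", "")));
        Files.readAllLines(tmp).forEach(System.out::println);
        Files.delete(tmp);
    }
}
```

Note that PrintWriter does not throw on write failures; if that matters, checkError() can be consulted after writing.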
Main program
Last but not least, our main program starts and drives the whole process:
package com.example.moreakka;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.example.moreakka.actors.Processor;
import com.example.moreakka.actors.Reader;
import com.example.moreakka.actors.Writer;
import scala.concurrent.Await;
import java.nio.file.Path;
import java.util.concurrent.TimeoutException;
import static com.example.moreakka.actors.Reader.*;
import static scala.concurrent.duration.Duration.Inf;
public class Main {
public static void main(String[] args)
throws InterruptedException, TimeoutException {
var system = ActorSystem.create("more-akka-java");
var acceptedWriter = system.actorOf(
Props.create(Writer.class, absolutePathFor("processed.csv"))
);
var rejectedWriter = system.actorOf(
Props.create(Writer.class, absolutePathFor("error.csv"))
);
var processor = system.actorOf(
Props.create(Processor.class, acceptedWriter, rejectedWriter)
);
var reader = system.actorOf(
Props.create(
Reader.class, absolutePathFor("input.csv"), processor
)
);
reader.tell(new StartProcess(), null);
Await.ready(system.whenTerminated(), Inf());
}
private static String absolutePathFor(String filename) {
return Path.of(
"C:","experiments","java","akka-java-files", filename
).toAbsolutePath().toString();
}
}
Some notes on the above code:
- Actors reference each other to compose the pipeline using ActorRef. As its name implies, this class represents a reference to an actor created inside the Actor System and provides all mechanisms to send messages to that actor.
- The EndWrite/WriteEnded message interaction between the Processor and the Writers has not been implemented using the “ask” pattern but as a form of “asynchronous request-response”, in which the Processor sends EndWrite messages, identifying itself as the sender with getSelf(), and then waits for the WriteEnded response messages that the Writers will send to the provided sender.
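To make the EndWrite/WriteEnded exchange easier to follow, here is a toy, single-threaded model of it in plain Java. It is not Akka code: the queue stands in for the actors’ mailboxes, and the Envelope record, the actor names as strings, and the RequestResponseSketch class are all hypothetical constructs of this sketch. It only shows the shape of the protocol, in which each request carries its sender, the reply goes back to that sender, and the requester counts replies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class RequestResponseSketch {
    // Stand-ins for the article's messages.
    record EndWrite() {}
    record WriteEnded() {}

    // An envelope carries a message together with its sender and receiver.
    record Envelope(String sender, String receiver, Object message) {}

    // Runs the exchange on a single queue and returns a log of what happened.
    static List<String> run() {
        Queue<Envelope> mailbox = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        int writeEndCount = 0;

        // The processor asks both writers to finish, naming itself as sender.
        mailbox.add(new Envelope("processor", "acceptedWriter", new EndWrite()));
        mailbox.add(new Envelope("processor", "rejectedWriter", new EndWrite()));

        while (!mailbox.isEmpty()) {
            Envelope env = mailbox.remove();
            if (env.message() instanceof EndWrite) {
                log.add(env.receiver() + " closed its file");
                // Reply to whoever sent the request.
                mailbox.add(new Envelope(env.receiver(), env.sender(), new WriteEnded()));
            } else if (env.message() instanceof WriteEnded) {
                writeEndCount++;
                log.add("processor got WriteEnded from " + env.sender());
                if (writeEndCount == 2) {
                    log.add("processor terminates the system");
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Unlike the “ask” pattern, nothing here blocks or returns a future: the requester simply handles the replies as ordinary messages when they arrive.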
Building and running our code
Now, we will build and run our program. It will use a file input.csv set like this:
Note that records in lines 2, 4, and 8 do not have a value in their third field.
The program should run as below:
And these are the results! First, the processed.csv file showing accepted records:
Then let’s look at the error.csv file with the rejected records:
Summary
In this article, I presented a more complex example of using actors with the Akka framework. The example shows a system of actors interacting among themselves to perform a task instead of just showing a single actor. Of course, actor systems can have much more complex topologies, including hierarchical structures with supervisor actors controlling child actors. We’ve just scratched the surface, and I hope to have piqued the reader’s curiosity about the actor-based concurrency model and the Akka framework in particular.