ETL Process with Scala and Akka Framework (Concurrency, Distributed, and Resilient Systems)

M. Cagri AKTAS
Sep 23, 2024


🛠️ System and Software Versions:

  • Scala: 3.5.1
  • Akka (akka-actor-typed): 2.7.0
  • Logback Classic: 1.4.11
  • MUnit (test only): 1.0.0

🛠️ Akka:

Akka is a toolkit for building highly concurrent, distributed, and resilient systems in Scala. It helps manage complex applications by using the actor model, which simplifies how systems handle concurrency and failure.

You can check the Akka documentation here: Akka Framework Documentation.

🛠️ How Does Akka Work with Big Data?

In Akka, the basic unit of computation is an actor. An actor is like a small, isolated “worker” that can (as sketched in code right after this list):

  1. Receive messages.
  2. Process these messages.
  3. Send messages to other actors.
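
To make those three steps concrete, here is a minimal sketch of an actor. The Greeter/Greet/Greeted names are purely illustrative and not part of this article’s CSV example; the point is only to show receive, process, and send in the typed Akka style used later in the post.

import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical Greeter actor: receives a Greet message, processes it,
// and sends a Greeted message on to another actor.
object Greeter {
  final case class Greet(name: String, replyTo: ActorRef[Greeted])
  final case class Greeted(message: String)

  def apply(): Behaviors.Receive[Greet] = Behaviors.receive { (context, message) =>
    context.log.info(s"Greeting ${message.name}")          // 1–2: receive the message and process it
    message.replyTo ! Greeted(s"Hello, ${message.name}!")  // 3: send a message to another actor
    Behaviors.same
  }
}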

Notes:

  • Actors do not share state, which removes the need for complex synchronization, making it easier to write concurrent systems.
  • Akka’s built-in supervision model lets you define how to handle failures. If an actor crashes, its “supervisor” can restart it, stop it, or escalate the error (see the sketch just after this list).
  • Akka is also designed for distributed computing. You can deploy actors across multiple machines and still have them communicate as if they were on the same system.
  • This is ideal for scaling applications to handle large workloads.
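
As an example of the supervision point above, the restart strategy can be declared right where the actor’s behavior is defined. This is a minimal sketch with a hypothetical Worker actor (not part of the article’s CSV code) that is restarted whenever its processing throws:

import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical Worker actor whose failures are handled by a restart strategy.
object Worker {
  final case class Job(payload: String)

  private def processing: Behavior[Job] = Behaviors.receiveMessage { job =>
    if (job.payload.isEmpty) throw new IllegalArgumentException("empty payload")
    println(s"Processed ${job.payload}")
    Behaviors.same
  }

  // Wrap the behavior in a supervisor: on failure the actor is restarted
  // instead of taking the rest of the system down with it.
  def apply(): Behavior[Job] =
    Behaviors.supervise(processing).onFailure[IllegalArgumentException](SupervisorStrategy.restart)
}

SupervisorStrategy.stop and SupervisorStrategy.resume can be plugged in the same way if restarting is not what you want.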

When to Use Akka:

  • High-Concurrency Systems: When you need to manage many tasks simultaneously without running into thread limitations.
  • Fault Tolerance: When building systems that need to recover gracefully from crashes.
  • Distributed Applications: When your application needs to run on multiple machines but act as a cohesive system.
  • Real-time Processing: Ideal for real-time systems that handle messaging, data streams, and other time-sensitive operations.

Scala Example:
In the sample code below, you’ll see that the syntax is quite simple, but once I show the output, you’ll understand how nicely Akka runs. You might ask: aren’t there plenty of other tools for concurrency and distributed computing? That’s true, but the Scala ecosystem handles big data workloads remarkably well; you can check big data benchmarks to see its real strength.

@main def hello(): Unit = {
  println("Starting Akka CSV processing...")

  val system: ActorSystem[CsvReaderActor.ReadCsv] = ActorSystem(CsvReaderActor(), "CsvProcessorSystem")

  val transformActor = system.systemActorOf(CsvTransformActor(), "transformActor")

  system ! CsvReaderActor.ReadCsv("data.csv", transformActor)
}

When you run it, you can see the data being processed concurrently. This is how Akka’s logic works:

CsvReaderActor reads line 1 -> Sends to CsvTransformActor
CsvTransformActor is transforming line: id, first_name, last_name, email, gender, ip_address
Transformed line: id | first_name | last_name | email | gender | ip_address

CsvReaderActor reads line 2 -> Sends to CsvTransformActor
CsvTransformActor is transforming line: 1, Josefa, Kirman, jkirman0@yandex.ru, Female, 81.141.120.132
Transformed line: 1 | Josefa | Kirman | jkirman0@yandex.ru | Female | 81.141.120.132

CsvReaderActor reads line 3 -> Sends to CsvTransformActor
CsvTransformActor is transforming line: 2, Dieter, Blunden, dblunden1@tamu.edu, Male, 198.123.213.130
Transformed line: 2 | Dieter | Blunden | dblunden1@tamu.edu | Male | 198.123.213.130

Note: I’m using a basic PC right now, so our reading process is faster than the transformation process :) But as you can see, both the extraction (reading) and transformation are running concurrently.

We’ve read 650 lines, but only transformed 119 so far. Normally, you’d expect to read all 1,000 rows, then transform them afterward. However, with Akka, reading and transforming happen simultaneously, which is really cool… :))

Full Code and SBT:

import akka.actor.typed.ActorSystem
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors

import scala.io.Source


@main def hello(): Unit = {
  println("Starting Akka CSV processing...")

  // The CsvReaderActor behavior is the guardian (root) actor of the system.
  val system: ActorSystem[CsvReaderActor.ReadCsv] = ActorSystem(CsvReaderActor(), "CsvProcessorSystem")

  // Spawn the transform actor as a system-level actor.
  val transformActor = system.systemActorOf(CsvTransformActor(), "transformActor")

  // Kick off the pipeline: read data.csv and stream each line to the transformer.
  system ! CsvReaderActor.ReadCsv("data.csv", transformActor)
}

object CsvReaderActor {
  final case class ReadCsv(filePath: String, transformActor: ActorRef[CsvTransformActor.TransformLine])

  def apply(): Behaviors.Receive[ReadCsv] = Behaviors.receive { (context, message) =>
    // Read the CSV file line by line and forward each line to the transform actor.
    val source = Source.fromFile(message.filePath)
    var lineNumber = 1
    for (line <- source.getLines()) {
      println(s"CsvReaderActor reads line $lineNumber -> Sends to CsvTransformActor")
      message.transformActor ! CsvTransformActor.TransformLine(line)
      lineNumber += 1
    }
    source.close()

    // Ask the actor system to shut down once every line has been dispatched.
    context.system.terminate()
    Behaviors.same
  }
}

object CsvTransformActor {
  final case class TransformLine(line: String)

  def apply(): Behaviors.Receive[TransformLine] = Behaviors.receive { (context, message) =>
    println(s"CsvTransformActor is transforming line: ${message.line}")
    // Split the CSV line on commas and re-join the trimmed fields with " | ".
    val transformed = message.line.split(",").map(_.trim).mkString(" | ")
    println(s"Transformed line: $transformed")
    Behaviors.same
  }
}

SBT:

val scalaVersionUsed = "3.5.1"

lazy val root = project
  .in(file("."))
  .settings(
    name := "akkaproject",
    version := "0.1.0-SNAPSHOT",

    scalaVersion := scalaVersionUsed,

    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor-typed" % "2.7.0", // Akka Typed dependency
      "org.scalameta" %% "munit" % "1.0.0" % Test,         // Testing library
      "ch.qos.logback" % "logback-classic" % "1.4.11"      // Logging dependency
    )
  )

I hope you enjoyed learning about the Akka Framework… :)

Cheers and peace, everyone…
