Introduction to Akka Streams — Part 1

Sugandha Arora
Knoldus - Technical Insights
4 min readOct 15, 2018

Akka Streams is a library that is used to process and transform a stream of data. In this blog, I’ll be discussing the components of Akka Streams. Akka Streams is an implementation of Reactive Streams and uses bounded buffer space, and this property is known as boundedness.

To use Akka Streams, add the below dependency in your build.sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.15"

A stream is a flow of data that involves moving and transforming data. An element is the processing unit of the stream. Akka Stream consists of Source, Flow and Sink which are described below in detail.

Source:

It has exactly one output and is responsible for generating the data. Elements are emitted by the source.

There are multiple ways of implementing a source. Source can be implemented using an iterator, range, future and so on. Some of the ways of implementing the source are shown below:

//Create an empty source val emptySource = Source.empty //Source producing elements in a range val sourceUsingRange: Source[Int, NotUsed] = Source(1 to 100) //Source producing elements from a list val sourceUsingList: Source[Int, NotUsed] = Source(List(1, 2, 3)) //Source producing elements from future val sourceFromFuture: Source[Int, NotUsed] = Source.fromFuture(Future(2)) //Source producing a single element val singleSource: Source[String, NotUsed] = Source.single("1") //Source producing elements on repeat val repeatedSource: Source[Int, NotUsed] = Source.repeat(5)

In these examples, the source is parameterized with two values. The first parameter specifies the type of data that the source is going to emit. The other parameter specifies the auxiliary information the source produces once it starts to run. In the above examples, akka.NotUsed has been used since the source in these cases doesn’t produce any information.

Flow:

It has an input and an output. It takes data from a source and apply some processing and transformation over the elements and return the processed elements to the sink. The data generated by the source can be filtered, transformed as shown below:

val flowWithFilter = Flow[Int].filter(_ > 0) val flowWithMap = Flow[Int].map(_.toString)

Sink:

It has exactly one input that accepts elements from the flow. Sink is also parameterized with two values. The first parameter specifies the data type that the sink will accept and the other parameter defines the type of auxiliary information. Sink can be of many forms as shown below:

//Sink that adds all the elements of stream using fold method val sinkAddingElements: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) // Sink that returns a Future as its materialized value, containing the first element of the stream val sinkReturningFirstElement: Sink[Int, Future[Int]] = Sink.head // A Sink that consumes a stream without doing anything with the elements val sinkIgnoringElements: Sink[Int, Future[Done]] = Sink.ignore // A Sink that executes a side-effecting call for every element of the stream val sinkPrintingElements: Sink[String, Future[Done]] = Sink.foreach[String](println(_))

Runnable graph:
This represents how the elements will flow through the various sources and sinks. Note that the flow combined with a source results in a source itself and the flow combined with a sink works like a sink. The runnable graph should have a single source and a sink at both ends as shown in the above figure. After the runnable graph has been created, it is now ready to run.

implicit val system: ActorSystem = ActorSystem("akka-streams") implicit val materializer: ActorMaterializer = ActorMaterializer() val sourceProducingElements: Source[Int, NotUsed] = Source(1 to 100) val flowTransformingData: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0).map(_ * 2) val sinkPrintingElements: Sink[Int, Future[Done]] = Sink.foreach[Int](println(_)) val runnableGraph = source.via(flowTransformingData).toMat(sinkPrintingElements)(Keep.right) runnableGraph.run()

In the above example, the source is producing integers from 1 to 100. The flow has been created that filters the even numbers and doubles it. Then, there is a sink (named as sinkPrintingElements) which prints the elements on the console. The connection between the source, sink and flow is provided by the RunnableGraph[Int, Future[Done]]. To execute the created stream, run() method is called over the runnableGraph. Runnable graph can also be created without providing a flow, which means Source and Sink are the only necessary components that are used to run a stream.

Also, note that the stream requires a materializer to execute. The materializer is responsible for allocating the resources such as Actors to run the stream. ActorMaterializer can either be provided as an implicit variable in the scope or explicitly while running the stream.

To run the streams using RunnableGraph is one way of executing the streams. Another way is to use the inbuilt functionalities like runWith() method that takes Sink as an input parameter. In the below example, runForEach() method has been used that calls the runWith() method in its implementation. An example using this approach is shown below:

source .filter(_ % 2 == 0) .map(_* 2) .runForeach(println)

Another thing to note here is that the program doesn’t terminate even if the whole stream has been processed. The reason for this is the ActorSystem because the ActorSystem is never terminated.

Akka Streams provides a number of features like reusability of code, time- based processing and back-pressure(the backbone of Reactive Streams). I’ll be discussing these features in my next blog.

Originally published at blog.knoldus.com on October 15, 2018.

--

--