Hacking with Akka Streams

hAkka :)

In this post I’m going to cover some common patterns in Akka Streams and some of the powerful things Akka Streams can do. All the source code related to this post is available on GitLab.

The cover image of this post was taken by me in my hometown, Matale, Sri Lanka. It’s quite a beautiful place on top of the hills :)

1. Filter

With filter we keep only the elements of the stream that satisfy a predicate, so we can remove unwanted messages from the stream.

In this example we filter out odd numbers from the stream. When we run this program it produces the output below.
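A minimal sketch of such a filter, assuming Akka 2.6+ (where an implicit ActorSystem supplies the materializer). The object and value names are illustrative and not taken from the repository.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FilterExample {
  // Numbers 1 to 20; the predicate keeps the even ones and drops the odd ones.
  val evens = Source(1 to 20).filter(_ % 2 == 0)

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so the ActorSystem doubles as the materializer.
    implicit val system: ActorSystem = ActorSystem("filter-example")
    try {
      Await.ready(evens.runWith(Sink.foreach(println)), 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```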

2
4
6
8
10
12
14
16
18
20

2. Combine

With the combine function in Akka Streams we can join two or more sources into a single stream. We pass it a combining strategy, for example merge or concat. concat preserves the order of the sources (source1 elements before source2 elements). merge emits elements as they become available, with no ordering guarantee between the sources.

In this example, the combined source flows through three filters and ends up in a sink (with println). For simplicity I have used several simple flows on it. Following is the output when we run the combine function.
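As a rough sketch of both strategies, assuming Akka 2.6+: the two sources and the three filter predicates below are made up for illustration, so this is not expected to reproduce the numbers from the original program.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object CombineExample {
  // Illustrative sources, not the ones used in the original program.
  val source1 = Source(1 to 10)
  val source2 = Source(11 to 20)

  // Concat drains source1 completely before it pulls from source2.
  val concatenated = Source.combine(source1, source2)(Concat[Int](_))

  // Merge emits whichever element is ready first; order across sources is not guaranteed.
  val merged = Source.combine(source1, source2)(Merge[Int](_))

  // Three simple filter stages applied to the combined stream.
  val filtered = concatenated.filter(_ % 2 == 0).filter(_ > 4).filter(_ < 16)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("combine-example")
    try {
      Await.ready(filtered.runWith(Sink.foreach(println)), 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Swapping `concatenated` for `merged` keeps the same set of elements but may change the order in which they arrive.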

6
14
20
20
40
60
30
38

If you want, you can combine more complex sources as well, for example sources from two database tables.

In the above example I’m combining two Cassandra sources. These sources are generated by the Alpakka Cassandra connector. You can read more about the Alpakka Cassandra connector in my previous post.

3. Grouped

With grouped we can process the stream in batches. It takes the specified number of elements from the source as a batch and passes them downstream together.

This function groups the original stream into 10 batches, each containing 10 elements. Following is the output when we run this function.
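A minimal sketch of batching with grouped, assuming Akka 2.6+ and a source of the numbers 1 to 100 (the names are illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupedExample {
  // Emits immutable sequences of up to 10 elements each.
  val batches = Source(1 to 100).grouped(10)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("grouped-example")
    try {
      Await.ready(batches.runWith(Sink.foreach(println)), 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

If the number of elements is not a multiple of the batch size, the last batch is simply shorter.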

Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Vector(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
Vector(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
Vector(31, 32, 33, 34, 35, 36, 37, 38, 39, 40)
Vector(41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
Vector(51, 52, 53, 54, 55, 56, 57, 58, 59, 60)
Vector(61, 62, 63, 64, 65, 66, 67, 68, 69, 70)
Vector(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)
Vector(81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
Vector(91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

4. MapAsync

With mapAsync we can run operations that return a Future concurrently, which is a common way to keep slow or blocking calls from stalling the stream. The parallelism parameter of mapAsync specifies how many operations may be in flight at the same time, and results are emitted downstream in the same order as the upstream elements. As an example, consider the following database query scenario.

querySync is a blocking function. If we combine querySync with the other flows using a plain map operation, one actor executes the entire process sequentially. It will take at least 5 seconds to complete the full operation. The output looks like below.
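A sketch of the sequential version, assuming Akka 2.6+. Here `querySync` is a hypothetical stand-in for a real database call, and the one second sleep is an assumption used to simulate a slow query.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapSyncExample {
  // Hypothetical blocking query; the sleep simulates a 1 second database round trip.
  def querySync(id: Int): String = {
    println(s"start query - $id")
    Thread.sleep(1000)
    println(s"finish query - $id")
    s"user$id"
  }

  // With a plain map, each query must finish before the next one starts.
  val users = Source(List(3, 2, 5, 7, 8)).map(querySync)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("map-sync-example")
    try {
      Await.ready(users.runWith(Sink.foreach(println)), 30.seconds)
    } finally {
      system.terminate()
    }
  }
}
```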

start query - 3
finish query - 3
user3
start query - 2
finish query - 2
user2
start query - 5
finish query - 5
user5
start query - 7
finish query - 7
user7
start query - 8
finish query - 8
user8

We can use mapAsync to execute the query function concurrently and speed up the process. Consider the following example, where I have used the queryAsync function with a mapAsync flow.

queryAsync wraps the query operation in a Future, so it returns immediately instead of blocking the stream. With the parallelism parameter set to 5, up to 5 queries run at the same time. Note that the queries start and finish in no particular order, while the results still arrive in upstream order. Following is the output when running this function.
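A sketch of the concurrent version, assuming Akka 2.6+. `queryAsync` is again a hypothetical stand-in, and running the simulated blocking calls on a dedicated fixed thread pool is my own choice here, not necessarily what the original code does.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object MapAsyncExample {
  // Dedicated pool so the simulated blocking calls do not occupy Akka's default dispatcher.
  val blockingEc: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(5))

  // Hypothetical query wrapped in a Future; the sleep simulates a 1 second database call.
  def queryAsync(id: Int): Future[String] = Future {
    println(s"start query - $id")
    Thread.sleep(1000)
    println(s"finish query - $id")
    s"user$id"
  }(blockingEc)

  // Up to 5 futures are in flight at once; results are emitted in upstream order.
  val users = Source(List(3, 2, 5, 7, 8)).mapAsync(parallelism = 5)(queryAsync)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("map-async-example")
    try {
      Await.ready(users.runWith(Sink.foreach(println)), 30.seconds)
    } finally {
      blockingEc.shutdown()
      system.terminate()
    }
  }
}
```

If the order of results does not matter, mapAsyncUnordered takes the same arguments and emits each result as soon as its Future completes.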

start query - 2
start query - 7
start query - 8
start query - 5
start query - 3
finish query - 7
finish query - 2
finish query - 5
finish query - 3
finish query - 8
user3
user2
user5
user7
user8

5. Broadcast

In order to maximize throughput we can partition an Akka stream. One of the simplest ways to partition a stream is broadcast. With broadcast we send upstream elements to two or more downstreams, and every downstream receives all the elements from upstream. In other words, it emits each and every upstream element to all of the downstreams.

In this example all the elements in the source flow through both nameFlow and rateFlow. When we run this function it produces the output below.
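A sketch of a broadcast graph built with the GraphDSL, assuming Akka 2.6+. The tuple element type and the sink definitions are assumptions made for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BroadcastExample {
  // Assumed element type: (language name, rate).
  val source = Source(List(("Scala", 5), ("Golang", 8), ("Haskell", 7), ("Erlang", 5)))

  val nameFlow = Flow[(String, Int)].map { case (name, _) => s"Name - $name" }
  val rateFlow = Flow[(String, Int)].map { case (_, rate) => s"Rate - $rate" }

  val nameSink = Sink.foreach[String](println)
  val rateSink = Sink.foreach[String](println)

  // Passing the sinks to create exposes their completion futures as the materialized value.
  val graph = RunnableGraph.fromGraph(
    GraphDSL.create(nameSink, rateSink)((_, _)) { implicit builder => (names, rates) =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[(String, Int)](2))
      source ~> bcast.in
      bcast.out(0) ~> nameFlow ~> names.in // every element goes to this branch
      bcast.out(1) ~> rateFlow ~> rates.in // and also to this one
      ClosedShape
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("broadcast-example")
    try {
      val (namesDone, ratesDone) = graph.run()
      Await.ready(namesDone, 10.seconds)
      Await.ready(ratesDone, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```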

Name - Scala
Rate - 5
Name - Golang
Rate - 8
Name - Haskell
Rate - 7
Name - Erlang
Rate - 5

6. Balance

Another way of partitioning a stream is balance. With balance, each upstream element is emitted to the first available downstream, so every element goes to exactly one of them.

In this example there are two flows (backPressureFlow and normalFlow). backPressureFlow is throttled and slow. normalFlow runs at normal speed, so balance sends more elements through normalFlow. When we run this function it produces the output below.
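A sketch of a balance graph, assuming Akka 2.6+. The throttle rate of one element per second and the final merge into a single printing sink are assumptions; the exact split between the two flows can vary from run to run.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BalanceExample {
  val source = Source(1 to 20).filter(_ % 2 == 0)

  // Slow branch: the throttle lets through one element per second (assumed rate).
  val backPressureFlow = Flow[Int]
    .throttle(1, 1.second)
    .map(n => s"Coming from back-pressured flow - $n")

  // Fast branch: no artificial delay.
  val normalFlow = Flow[Int].map(n => s"Coming from normal flow - $n")

  val sink = Sink.foreach[String](println)

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create(sink) { implicit builder => printer =>
      import GraphDSL.Implicits._
      val balance = builder.add(Balance[Int](2))
      val merge = builder.add(Merge[String](2))
      source ~> balance.in
      balance.out(0) ~> backPressureFlow ~> merge.in(0)
      balance.out(1) ~> normalFlow ~> merge.in(1)
      merge.out ~> printer.in
      ClosedShape
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("balance-example")
    try {
      Await.ready(graph.run(), 30.seconds)
    } finally {
      system.terminate()
    }
  }
}
```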

Coming from back-pressured flow - 2
Coming from normal flow - 4
Coming from normal flow - 8
Coming from normal flow - 10
Coming from normal flow - 12
Coming from normal flow - 14
Coming from normal flow - 16
Coming from normal flow - 18
Coming from normal flow - 20
Coming from back-pressured flow - 6

7. Group by

Another way of partitioning a stream is groupBy. Unlike broadcast or balance, groupBy splits the original stream into a SubFlow per key. A SubFlow is a kind of sub-stream. We can process these sub-streams asynchronously with the async command. After processing the sub-streams we need to merge them back into one stream with the mergeSubstreams command.

The above function partitions the source into sub-streams based on the % 3 operator. The sub-streams are handled asynchronously. Finally we merge all the processed sub-streams into one stream. When we run this function it produces the output below.
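A sketch of groupBy with async sub-streams, assuming Akka 2.6+. Filtering the even numbers inside each sub-stream is an assumption about the per-sub-stream processing, and the order of the merged output is not guaranteed.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupByExample {
  val regrouped = Source(1 to 20)
    .groupBy(3, _ % 3)   // at most 3 sub-streams, keyed by the remainder 0, 1 or 2
    .filter(_ % 2 == 0)  // per-sub-stream processing (assumed)
    .async               // asynchronous boundary around each sub-stream
    .mergeSubstreams     // back to a single stream

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("group-by-example")
    try {
      Await.ready(regrouped.runWith(Sink.foreach(println)), 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The first argument to groupBy is an upper bound on the number of sub-streams; the stream fails if more distinct keys than that show up.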

6
12
18
4
10
16
2
8
14
20

8. Partition

Like broadcast, balance and groupBy, Partition can also split a stream into multiple streams and handle them as separate streams. The main difference is that Partition splits the stream based on a condition: depending on the condition, different objects from upstream go to different downstreams.

In the above function the response list is partitioned based on the Result value. If a response object contains a Result, we send the object to one stream (ratingFlow ~> ratingSink). Otherwise we send the object to the other stream (argFlow ~> argSink). These two streams are handled separately. When we run this function it produces the output below.
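A sketch of a Partition graph, assuming Akka 2.6+. The `Response` and `Result` case classes, their fields, and the sample data are hypothetical shapes chosen for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionExample {
  // Hypothetical data model: a response optionally carries a rating result.
  final case class Result(name: String, rating: Int)
  final case class Response(arg: String, result: Option[Result])

  val responses = List(
    Response("scala", Some(Result("scala", 4))),
    Response("golang", Some(Result("golang", 6))),
    Response("java", None),
    Response("haskell", Some(Result("haskell", 5))),
    Response("erlang", Some(Result("erlang", 5))))

  // Only responses with a Result reach this flow.
  val ratingFlow = Flow[Response].collect {
    case Response(_, Some(r)) => s"Rating found, name - ${r.name}, rating - ${r.rating}"
  }
  // Only responses without a Result reach this flow.
  val argFlow = Flow[Response].map(r => s"Rating not found, arg - ${r.arg}")

  val ratingSink = Sink.foreach[String](println)
  val argSink = Sink.foreach[String](println)

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create(ratingSink, argSink)((_, _)) { implicit builder => (ratings, args) =>
      import GraphDSL.Implicits._
      // The partitioner returns the index of the output port for each element.
      val partition = builder.add(
        Partition[Response](2, r => if (r.result.isDefined) 0 else 1))
      Source(responses) ~> partition.in
      partition.out(0) ~> ratingFlow ~> ratings.in
      partition.out(1) ~> argFlow ~> args.in
      ClosedShape
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("partition-example")
    try {
      val (ratingsDone, argsDone) = graph.run()
      Await.ready(ratingsDone, 10.seconds)
      Await.ready(argsDone, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```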

Rating found, name - scala, rating - 4
Rating found, name - golang, rating - 6
Rating not found, arg - java
Rating found, name - haskell, rating - 5
Rating found, name - erlang, rating - 5
