In one of our projects we are doing for a client, Akka is a central part of the Big Data backend that is collecting, processing, and analyzing events generated by different apps in the client’s infrastructure.
All apps in this ecosystem send events to Kafka, then Akka Streams consume data from Kafka and does the following:
- Filters and enriches data received from Kafka
- Feeds Akka Cluster that does real-time analytics on data
- Stores data in Cassandra used as a data lake
- Stores data in Elasticsearch used by other apps for searching purposes
In this post, I’ll focus on solving a problem we’ve encountered a couple of times — handling Akka Streams failures because of temporary external resource unavailability. Since we are communicating with multiple external systems in Akka Streams (Kafka, Cassandra, Elasticsearch) such errors are expected and can be for example network glitches or temporary outages of some of those systems.
To accompany this post, I’ve created a test project that can be used as a showcase on how to solve this problem. The code can be accessed here.
Akka Streams topology in the test app is simplified, but it serves the purpose and it goes like this:
- Read data from Kafka
- Store data in Elasticsearch
- Store data in Cassandra
- Commit offsets to Kafka
Also, a simple Kafka Producer is included in the project, so everything explained here can be easily tested.
Each part of the stream is built using the Alpakka project that offers implementation of different integration pipelines built on top of Akka Streams. Following is an explanation of each part of the stream.
Reading from Kafka
For reading from Kafka commitableSource is used so committing offsets can be done explicitly at the end of the stream — after data is stored in Elasticsearch and Cassandra:
Storing to Elasticsearch
Elasticsearch as a Flow is used for writing data to Elasticsearch:
Here, passThrough is used so once data is stored in Elasticsearch, it is passed down to the next stage in Akka Streams.
Storing to Cassandra
CassandraFlow fits perfectly as part of the stream topology that stores data to Cassandra. After inserting in Cassandra is completed, this flow emits commitableOffset that is used in the last stage of the stream to commit offsets to Kafka.
Committing offsets to Kafka
At the end of the processing offsets are committed to Kafka with either Commiter sink or flow, depending on the implementation explained further in the post:
Now that all important parts of the stream are explained, let’s glue them together and run them with different error handling options.
When temporary external resource unavailability occurs, Akka Streams should behave as follows:
- Stop reading data from Kafka until the failure of external resources is resolved.
- Don’t lose any data — data that is being processed in the stream when the failure occurred should be re-processed after failure is resolved.
- After the failure is resolved, continue with processing without any human intervention.
Let’s see how this can be accomplished.
Option 1 — No error handling
Akka streams implementation in this option looks like this:
This stream works perfectly in ideal conditions but it doesn’t have any error handling implemented. As soon as an error occurs in this stream, the whole stream fails and is torn down. So this is not what we want, as it doesn’t satisfy any of our 3 requirements.
According to official Akka Streams docs, there are several options you can use to handle errors in streams. For our case, an excellent option that is available out of the box is to use delayed restarts with a backoff operator. As documentation states, this pattern is useful when the operator fails or completes because some external resource is not available and we need to give it some time to start-up again.
We can use RestartSource and RestartFlow, so we have covered cases when either source fails (error occurs while reading from Kafka) or flow fails (error occurs while communicating with either Cassandra or Elasticsearch).
Option 2 — Using RestartSource & RestartFlow
For the first solution with restarts, we have chosen a combination of RestartSource and RestartFlow.
Streams implementation, in this case, looks like this:
Does this stream’s flow satisfy our three requirements written above?
Based on the explanation in the GitHub project (docker setup of Cassandra and Elasticsearch), we can easily test this. For example, while the test app is running (Akka stream is processing records from Kafka), we can simply stop Cassandra docker instance, leave it like that for some time (let’s say ~30 sec), and then start it again. While Cassandra is down, RestartFlow kicks in, since cassandraFlow throws an exception, and does restart with backoff of the flow. After Cassandra is up again, the flow inside RestartFlow starts processing records from Kafka.
So what is wrong with this solution? Requirement no. 2 (don’t lose any data) is not met. In this case, when the stream fails, the record that caused breakage will not be inserted in Cassandra. It is even worse since batching is done while inserting in Cassandra (or Elasticsearch) more records will be lost.
This can be easily checked. After Cassandra docker instance is restarted, and the stream finishes processing all the records, the number of records stored in Cassandra can be compared with the number of produced records, or even with the number of records stored in Elasticsearch. They will be different.
Why are records lost? By lost here it’s meant that they are not stored in an external system (Cassandra or Elasticsearch). There is no retry mechanism implemented to re-process record(s) that caused the flow to break. RestartFlow just ensures that flow is restarted, and after the flow is restarted and is functional again, the next batch of records from Kafka will be processed.
One other interesting thing that can be noticed while part of the stream is down is that despite there is nothing wrong with Kafka Source, records are not read from Kafka. How can this be confirmed? By tracking lag for Kafka consumer group while the stream is active. This can be done with the command:
While the stream is running OK, lag should be close to 0 (or at least constant over time) since records are processed at the same rate they are written to Kafka. Once a part of the flow is down, in the provided test case Cassandra is down, it can be noticed that lag starts to increase, meaning Kafka Source is not reading data from Kafka. This is a back-pressure mechanism that is implemented in Akka Streams, that allows a downstream stage to create a demand request and send it to its immediate upstream stage (pull). If the downstream stage is not ready to receive more data, the upstream stage (in this case Kafka Source) won’t push any data. A great article on how back-pressure works in Alpakka Kafka and improvements made in Kafka v.2.4 can be read here.
So, this option satisfies only 2 out of 3 requirements (1. temporarily stop reading data Kafka and 3. after the problem is resolved, continue with processing data without any human intervention).
This is not good enough for our use case, so we came up with another solution.
Option 3 — Using RestartSource only
In this option we didn’t use RestartFlow at all, instead, we wrapped the whole flow in RestartSource. The implementation looks like this:
With this approach, when an error occurs in any part of the flow, the whole stream is restarted, since the whole stream is wrapped in RestartSource. In the test case, Cassandra went down, and even though everything is OK with KafkaSource, it will be restarted also.
Committing offsets is not done for records that caused failure since the stream failed before offsets are committed. This means that once Kafka Source is restarted, records that caused failure will be read again from Kafka. This way when the stream is fully functional again, record(s) that caused the stream to fail will be read from Kafka again, so no data loss will occur.
On the other hand, this case is an at-least-once processing guarantee, so some records will be processed more than once. This is something that definitely has to be taken into consideration. In this use case, this is not a problem, since both Cassandra and Elasticsearch operations are idempotent, and inserting the same values multiple times won’t cause duplicates in the end. This can also be tested easily with the test project, just choose the implementation shown above, start the app, stop the Cassandra docker instance for a while, then start it again. When processing the whole dataset is done, the number of sent records by Kafka Producer will be the same as the number of records stored in Cassandra and Elasticsearch.
Depending on the duration of failure of an external resource, a restart of the stream can happen multiple times. So, if the failure on the external resource is not resolved after the first restart, the stream will fail and will be restarted again, each time with a growing time delay between restarts.
The third option meets all three requirements that we have set as a goal. The first one (temporarily stop reading data Kafka) is not ideal, since the stream can be restarted multiple times before a temporary glitch is resolved, but this is something we can tolerate in our case. Other than that, this solution is working like a charm for us.