This is a story of importing a large dataset into Akka Cluster and the pitfalls we hit along the way. It is a technical one, but I won’t explain the features of Akka Cluster itself. Introducing the basics of Akka Cluster and Kafka would make for a really sizable blog post, so I will assume you have a basic knowledge of them.
Rather than describing individual features, I will tell the story of how it worked for our use case: importing a large Internet of Things (IoT) dataset.
1. Requirements and architecture overview
Our goal was to create a new platform to integrate with IoT device data. The requirements include:
1. IoT data is sent continuously, from around 5,000 devices
2. We need to consume not only the current data, but also data from the entire last year, which adds up to over 50 million measurements
3. IoT messages need to be available, sorted in the temporal order in which they are received
There are many reasons why we chose Akka Cluster + PostgreSQL (with akka-persistence-jdbc), but from a technical point of view, these include:
- Easy scalability when more devices are added (by just adding more nodes to the cluster)
- An easy way of sending data to other systems, with a possibility to replay events (with Akka Projection)
- Enforcing an ES/CQRS approach (Commands/Events)
- Keeping the latest IoT device state in memory (in actors) for fast access
There was one more requirement: minimize the chance of losing data from devices. This sounds simple, but it can go wrong in many ways and for different reasons, such as network problems, application lifecycle issues, and business logic updates.
Therefore, we decided to put a message queue in front of Akka Cluster as a place where the data can be stored before consumption. One of the most well-known and battle-tested message queues is Kafka, and there’s also good integration of Kafka with Akka through the Alpakka Kafka connector, so we chose it for our solution.
Within our code, each IoT device in the Akka Cluster is represented as an actor. The stream from Kafka delivers each message to the actor for its device, and receives an acknowledgement after the message is processed. If the message is not processed by an actor for some reason, Kafka replays it.
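The acknowledge-then-advance idea can be illustrated with a minimal sketch in plain Python. It does not use Kafka or Akka: the `consume` function, the in-memory `log`, and the `flaky_process` handler are hypothetical names invented for this illustration. The point it shows is that the read position only moves forward after a message has been processed, so an unacknowledged message is read again.

```python
def consume(log, process, committed=0):
    """Process messages from `log`, starting at the committed offset.

    The offset only advances after `process` succeeds, so a failed
    message is read again on the next call (at-least-once delivery).
    """
    offset = committed
    while offset < len(log):
        try:
            process(log[offset])
        except Exception:
            return offset  # not acknowledged: resume from here next time
        offset += 1  # acknowledged: safe to move past this message
    return offset


# Hypothetical handler that fails once on the message "m2".
seen = []
failed_once = set()


def flaky_process(message):
    if message == "m2" and message not in failed_once:
        failed_once.add(message)
        raise RuntimeError("actor did not acknowledge")
    seen.append(message)


log = ["m1", "m2", "m3"]
offset = consume(log, flaky_process)          # stops at "m2"
offset = consume(log, flaky_process, offset)  # retries "m2", then "m3"
```

In this toy run nothing is lost and nothing is skipped. In a real system a handler may also see the same message twice, so processing should tolerate duplicates.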
Kafka also gives us one more benefit: since we can buffer current messages, why not also queue the data we need to replay from last year? Well, that sounded good and a plan emerged. We created two main topics: iot_replay for data from last year, and iot for current data (i.e., the continuous stream from devices). As shown in this image, the import process starts with iot_replay, and then we connect the iot topic:
It’s also worth mentioning that we didn’t create a Kafka partition for each IoT device. The data rate (in the continuous stream) is not too high, and the number of devices can change over time. We created 10 partitions, so each partition carries messages from about 500 devices.
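To illustrate how 5,000 devices can share 10 partitions, here is a small sketch. It assumes messages are keyed by a device ID, which the setup above implies but does not state, and it uses CRC32 as a stand-in hash (Kafka's default partitioner hashes keys with murmur2). The `device-N` IDs and the `partition_for` helper are hypothetical.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 10
NUM_DEVICES = 5000


def partition_for(device_id: str) -> int:
    # Stable hash of the key modulo the partition count. Kafka's default
    # partitioner uses a different hash (murmur2); CRC32 is a stand-in.
    return zlib.crc32(device_id.encode("utf-8")) % NUM_PARTITIONS


# Hypothetical device IDs, one per device.
devices = [f"device-{i}" for i in range(NUM_DEVICES)]

# Number of devices that land on each partition.
load = Counter(partition_for(d) for d in devices)
```

Because the mapping depends only on the key, all messages from one device land in the same partition and keep their relative order. The split across partitions is only roughly even, so 500 devices per partition is an average.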
2. The problem
We created a platform, and for a test import of one month of data, it worked. We connected the platform to the iot stream, and it worked too. It seemed that the setup was ready to start the real import.
On the day of the import we created a cluster with a size of 10 nodes, and connected to “replay.” Data from the Kafka iot_replay was consumed at the pace of 2,000 msg/s. After a day, our system had all the data from last year!
Next, we changed the Kafka consumer to point to the iot topic and redeployed the Akka Cluster nodes. After that we looked at Akka Telemetry, and our consumption was… less than 10 msg/s, as shown in this image:
Our first thought was that since the cluster had processed a lot of data, it might take some time for it to come back up to full speed. Also, there were more business-critical matters to solve, so we switched to other tasks and left the cluster to recover by itself.
3. The culprit?
As we worked on other matters, days passed, and the system still wasn’t consuming messages at its initial speed. We stopped hoping that it would get better, and started investigating.
After searching through logs and checking metrics we found out that most of our actors didn’t even start. The stream was slowed down because the actors couldn’t consume messages. And the reason was… a DB query timeout.
Let’s look at metrics. Ideally, recovery failure should not happen in your Akka Cluster at all, but as shown in this image, it was indeed happening:
We saw that each message spent a lot of time in the stash:
But why are there any messages in the stash? Because they wait there until the actor recovers:
A week before the import, we ran a test with 500,000 messages in the journal, and there was no query timeout. When we imported 50M messages, the timeouts started to occur. They only showed up after the replay because that is when we restarted the cluster, and Akka Cluster tried to start the actors. Immediately upon seeing the data, we understood our mistake: we forgot to add a snapshotting mechanism. Without snapshots, an actor has to replay its entire event history from the journal to recover. In theory snapshotting is optional, but it’s pretty much a requirement in a bigger system. If someone forgets it… well… a failure like ours might happen.
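Why snapshots shorten recovery can be shown with a toy model of an event-sourced actor. This is a plain Python sketch, not Akka code: `DeviceState`, `apply_event`, and `recover` are hypothetical names, and the journal is just a list of measurements.

```python
from dataclasses import dataclass


@dataclass
class DeviceState:
    last_value: float = 0.0
    count: int = 0


def apply_event(state, event):
    # Event handler: fold one measurement into the state.
    return DeviceState(last_value=event, count=state.count + 1)


def recover(journal, snapshot=None):
    """Rebuild state; return it with the number of events replayed."""
    if snapshot is None:
        state, start = DeviceState(), 0
    else:
        state, start = snapshot  # (state, number of events it covers)
    for event in journal[start:]:
        state = apply_event(state, event)
    return state, len(journal) - start


journal = [float(i) for i in range(10_000)]

# Without a snapshot, every event is replayed.
full_state, full_replayed = recover(journal)

# With a snapshot taken after 9,900 events, only the tail is replayed.
snap_state, _ = recover(journal[:9_900])
fast_state, fast_replayed = recover(journal, snapshot=(snap_state, 9_900))
```

Both paths end in the same state, but the second replays 100 events instead of 10,000. The recovery cost then depends on how often snapshots are taken, not on the total length of the journal.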
So two fixes were applied:
a. We added snapshot generation to the code
b. We needed to start processing again, so we increased timeouts in persistence configuration:
- akka.persistence.journal-plugin-fallback.circuit-breaker.call-timeout: the max time of a query to the DB
- akka.persistence.journal-plugin-fallback.recovery-event-timeout: the max recovery time for getting an event (we changed this just to be sure it isn’t a problem)
4. Friend or foe?
We thought this might be the end of this story: the new configuration was in place, and snapshotting was added. The metrics no longer showed actors failing to start, and we waited for the cluster to recover. But unfortunately, the story doesn’t end here, because no recovery happened.
Again, we fooled ourselves with thoughts like, “the cluster needs time,” “snapshots are not ready yet,” “let’s wait some more.” But another day passed.
To be fair, there was some improvement: we had no recovery failures. But stream processing was still far away from the initial results. Also, actor recovery was still taking a long time (from 5 to 20 seconds) and was happening all the time. We checked the database, and snapshots were there. That was off… if the cluster had already been working the whole day, why were the actors constantly recovering?
Then it struck us… passivation! Enabled by default, this mechanism in Akka Cluster is used to reduce memory usage. Simply speaking, it stops idle actors and restarts them only when a new message is received. By default, after 120 seconds with no messages coming in, an actor is removed from memory. This is possible because the whole state of the actor is already in the journal (stored as events), and/or snapshots.
So our cluster ended up in a loop. Consumption of messages was slow because actors had to be restarted to receive messages, and messages were consumed slowly, so the actors were passivated.
To make this clearer, let’s go back to the numbers. We had 10 partitions (separate streams) connected to 10 instances and 5,000 actors. So each instance had its own stream and 500 actors. Messages are sent from the stream sequentially, and need to be acknowledged by the actor. Also, messages are ordered by time, so in the worst case scenario, 499 other messages must be consumed before the same actor gets its second message. If consumption of each message takes 5 seconds (the recovery time of the actor), that adds up to about 2,495 seconds, so there’s no way to consume 499 messages before passivation happens after 120 seconds.
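The arithmetic behind this loop can be checked with a few lines of Python. The constants come from the numbers above; the helper names are made up for this sketch, and it assumes the worst case where every other actor on the instance gets one message before the same actor is addressed again.

```python
PASSIVATION_TIMEOUT_S = 120   # default idle timeout quoted above
ACTORS_PER_INSTANCE = 500     # 5,000 actors across 10 instances
RECOVERY_TIME_S = 5           # lower bound of the observed 5 to 20 s


def idle_gap_seconds(actors, seconds_per_message):
    # Worst case: every other actor on the instance gets one message
    # before the same actor is addressed again.
    return (actors - 1) * seconds_per_message


def max_seconds_per_message(actors, timeout_s):
    # Slowest per-message handling that still brings the next message
    # to an actor before its idle timeout expires.
    return timeout_s / (actors - 1)


gap = idle_gap_seconds(ACTORS_PER_INSTANCE, RECOVERY_TIME_S)
stays_in_memory = gap < PASSIVATION_TIMEOUT_S
budget = max_seconds_per_message(ACTORS_PER_INSTANCE, PASSIVATION_TIMEOUT_S)
```

Here `gap` is 2,495 seconds, far above the 120-second timeout, so `stays_in_memory` is false. The `budget` works out to roughly 0.24 seconds per message: under these assumptions, each message would have to be handled about twenty times faster than a 5-second recovery allows for the actors to stay in memory.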
Therefore, we decided to disable passivation for the duration of the import process, and we were quickly back on track, at a pace of 2,000 msg/s once again!
5. The real culprit
The system worked fast again, but we still had a lot of questions:
- Why was passivation enabled by default if the recovery time can be so long?
- Was our data that huge? (At that time we had 69,000,000 events in the journal.)
- Why do we still need 20 seconds to start some of the actors?
We decided to do another test: how much does a redeployment interrupt a working cluster?
As shown in this image, we did the first deployment before 9:55, then again at 10:10:
The actors started slowly, because of the long recovery time:
Finally, after 30 minutes the cluster was fully operational again:
From a business perspective we could accept this redeployment time. From a technical perspective, not really. Deployment can be frequent, and in Reactive Systems it needs to be fast. Fortunately, my teammate Jacek didn’t give up, and dug into the DB queries of the akka-persistence-jdbc code. And guess what? He found that the slow query could be optimized! (You can see the details of his findings in this pull request on GitHub.) After that fix, the cost of the query dropped drastically, so we enabled passivation again.
If you’re running Akka Cluster, don’t forget to configure snapshots, and keep the implications of passivation in mind. Also, as shown in the charts in this article, it’s good to observe metrics on how actors behave under a real load.
We also recommend keeping your project dependencies up to date. In many cases updates can provide significant improvements for your applications, such as the fix for the performance issue we discovered. (Thanks to Lightbend, the akka-persistence-jdbc fix was released really fast, and is available in version 5.0.1.)
As a final note, it’s also worth mentioning that finding and fixing such issues wouldn’t be possible if Akka were not open source.