EXPEDIA GROUP TECHNOLOGY — SOFTWARE

Actors for Akka Stream Throughput

Increase parallelism and throughput in streaming applications

Bradley Schmidt
Expedia Group Technology


The problem

Stream processing at scale frequently brings up issues around concurrency, particularly when writes are being made. Consider what happens when we have two events, A and B, that appear in rapid succession in our stream and both modify the same object. If the two events are processed in parallel then it’s possible that B will complete its write first and see its results subsequently overwritten by A, despite B technically being the fresher of the two data points. Alternatively, it’s possible that both will read, modify, and write to the same object, each stomping the other’s changes if a solution like optimistic concurrency is not in place (and one of the two requests is destined to return an error even if it is).

Two events modifying the same row in a database are shown processing in parallel. The newer event finishes first, writing the value “2” to the row, but is subsequently overwritten when the older event finishes processing and writes “1”.
Two events targeting the same object appear in the input topic. The first event is pulled by the service for processing (1) and the second, newer event is pulled immediately thereafter (2). Both process in parallel but in this case the newer, fresher event finishes processing first and gets written to the DB (3). When the older, stale data finishes and does its own write (4) it overwrites the newer data. The update is lost.

Solutions to this problem exist in various — though not always obvious — forms. Kafka itself makes our life easier by ensuring that events with the same key always appear on the same partition. So long as the key uniquely identifies the object being modified, and so long as the partition is only processed in one place at a time, then we can be sure that the problem can be managed within the scope of a single instance. Kafka Streams, one popular streaming framework, utilizes these guarantees to solve the problem by mandating that only one event will be processed at a time for each partition. This works, but uses a sledgehammer when a scalpel might suffice — if the events on our partition address different objects then it should be possible for us to process them in parallel, but Kafka Streams outright prevents us from doing so. [1]

An event with key “B” is forced to wait while another, unrelated event in the partition with key “A” is processed.
Since these two events target different objects they could potentially be run in parallel without fear of race conditions. However, since only one event is processed at a time the second one is forced to wait.

In contrast, Akka Streams allows us the flexibility of running multiple events per partition through the use of mapAsync(). That method, in short, allows us to map each incoming event in the stream to a future and run a configurable number of those futures in parallel, all with the guarantee that the resulting events will be emitted in-order (regardless of when the futures actually completed). This is a powerful tool, and one that we’ve used with considerable success in our services that DON’T have to worry about race conditions. However, for those that DO, we run into the problem described at the start of this blog. Without a solution for that, our only defense is to run mapAsync with a parallelism value of one, locking us into the same one-event-per-partition-at-a-time philosophy that Kafka Streams uses and robbing us of potential throughput.
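As a rough illustration (a minimal sketch rather than our production code; the Event type and writeToDb function stand in for a real Kafka-sourced event and persistence call), a stage like this runs up to eight writes concurrently while still emitting results downstream in their original order:

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.Source
import scala.concurrent.Future

object MapAsyncExample extends App {
  // Hypothetical event and persistence call, for illustration only.
  final case class Event(key: String, value: Int)
  def writeToDb(event: Event): Future[Event] =
    Future.successful(event) // pretend this hits a database

  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "example")

  // Up to 8 futures run at once, but results are emitted downstream in the
  // order the events arrived, regardless of which future finishes first.
  Source(List(Event("A", 1), Event("A", 2), Event("B", 1)))
    .mapAsync(parallelism = 8)(writeToDb)
    .runForeach(e => println(s"persisted $e"))
}
```

Note that nothing here stops the two "A" events from running at the same time, which is exactly the race we described above.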

Akka actors to the rescue

Akka Streams is interesting in that it’s possible to make great use of the framework without having the slightest idea what Akka actually is, or how it operates. It is, by and large, an implementation detail. For the uninitiated: Akka is a framework for building highly-concurrent systems using actors and messaging. The number of tools in that particular toolbox is extensive, as even a brief glimpse at the (ample) documentation will make clear, but for the most part Akka allows you to choose what is relevant to your needs and ignore the rest.

In this case, our needs don’t go much further than the actors themselves. So — what are actors? A (probably over-simplified) synopsis is that actors are objects that communicate by sending and receiving messages. For the most part Akka dictates very little about how these actors will be used — it’s up to the implementer to determine what data they encapsulate, how many there are, their lifecycle, their relationship to each other, and how and what they communicate with each other. What Akka DOES provide are some basic guarantees, most crucially that each actor will only process one message at a time. Any others that it might receive in the interim are entered into a “mailbox” queue to be processed later, in the order they were received, just as soon as the previous message completes.
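For a concrete, if toy, picture (the Counter actor and its Increment message are invented purely for illustration):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

// A toy actor: it encapsulates a count and changes it only in response to
// messages. Akka guarantees the handler below never runs concurrently with
// itself, so simultaneous Increments simply wait their turn in the mailbox.
object Counter {
  sealed trait Command
  final case class Increment(by: Int) extends Command

  def apply(count: Int = 0): Behavior[Command] =
    Behaviors.receiveMessage { case Increment(by) =>
      apply(count + by) // the next behavior carries the new state; no locks needed
    }
}
```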

This guarantee is intended first and foremost to eliminate concurrency concerns from actor logic, and underpins a great deal of the framework’s power. For us, it has another obvious benefit: it would appear, by all accounts, to be the perfect tool to address our race condition concerns. If we can ensure that each incoming key has a unique actor associated with it then we can use that actor to ensure that events affecting the same object process sequentially while events affecting others run in parallel on separate actors.

The first two events in the topic, both with key “A”, are forwarded to the same actor where one begins processing immediately and the other sits in the mailbox. Meanwhile a third event with key “B” is able to begin processing in parallel with the first “A” event.
Here the first event in the topic immediately begins processing in its actor (1) while the second event, which operates on the same object, gets enqueued in that actor’s mailbox (2) and must wait. In the meantime a third event, addressing a different object, is able to begin processing immediately in a separate actor (3).

Given all that, a solution begins to take form: instead of using mapAsync() to execute a future directly we instead have it call Akka’s ask() (which itself returns a future) to message the appropriate actor. That actor can then handle said message by executing the future logic formerly slated for mapAsync(), all with the guarantee that only one executes at a time. In theory it’s perfect, even if in practice it needs a bit more work.
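Sketched in code, under some assumptions (the KeyedWriter protocol, the Ack reply, and the writerFor lookup are all hypothetical; the lookup might be a local registry of spawned actors or, as discussed further below, a shard region), the stage might look roughly like this:

```scala
import akka.NotUsed
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.AskPattern._
import akka.stream.scaladsl.Flow
import akka.util.Timeout
import scala.concurrent.duration._

final case class Event(key: String, value: Int) // as in the earlier sketch
final case class Ack(key: String)

// Hypothetical message protocol for a per-key writer actor.
object KeyedWriter {
  sealed trait Command
  final case class Process(event: Event, replyTo: ActorRef[Ack]) extends Command
}

object KeyedWriteFlow {
  implicit val timeout: Timeout = 5.seconds

  // Hypothetical lookup: one actor per key, however that mapping is resolved.
  def writerFor(key: String): ActorRef[KeyedWriter.Command] = ???

  // Many asks can be in flight at once, but events that share a key all land
  // in the same actor's mailbox and are therefore handled one at a time.
  def apply()(implicit system: ActorSystem[_]): Flow[Event, Ack, NotUsed] =
    Flow[Event].mapAsync(parallelism = 32) { event =>
      writerFor(event.key).ask[Ack](replyTo => KeyedWriter.Process(event, replyTo))
    }
}
```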

The nitty-gritty details

After the actor receives its message and kicks off the desired future it has two obvious choices: 1.) return from the handler immediately or 2.) block until the future completes. Unfortunately, both have flaws. If we were to take the first route then the actor would immediately begin processing the next event in its mailbox (assuming one exists). Since we have no guarantee that the previous future has completed at this point (it almost certainly hasn’t) we find ourselves right back where we started: executing multiple events for the same object in parallel. The second approach would seem to be better, as it guarantees that the second event won’t process until the first completes, but falls flat when one realizes that Akka sticks multiple actors on the same software thread. Though the context switching necessary to manage that shared thread is embedded within the framework itself and generally invisible to the engineer, the ramification here is that blocking that thread for one actor risks robbing other actors of their own access to the thread, thereby starving them. In short: actors shouldn’t block.

It may seem, then, that we’re stuck between Scylla and Charybdis, and so we would be if not for two other Akka features: Stashing and Behavior Switching. The first allows us to buffer (“stash”) messages for later processing and then unbuffer (“unstash”) them in FIFO fashion at a time of our choosing. The second feature allows the actor to change its behavior in response to different messages, effectively making the actor a sort of finite state machine. Taken together, they allow our actor to alternate between two roles: a “receiving” mode which processes incoming messages, and a “buffering” mode which stashes them. Each actor starts in receiving mode, but switches to buffering mode as soon as it receives a message and initiates its first future. At that point any further messages would be buffered — that is, until the future completes, at which point the actor would unstash the first message in its buffer (if it exists) and return to receiving mode. From there the pattern repeats. In this way we accomplish our goal of ensuring that only one future gets processed at a time, all without blocking. [2]

The “Receiving” state is shown transitioning to “Buffering” when an event is received and a future initiated. It transitions back to “Receiving” when the future completes and the events are un-stashed.
The two states the actor may be in and what triggers the transition between them.
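Filling in the hypothetical KeyedWriter from the earlier sketch (writeToDb and the internal WriteCompleted message are likewise stand-ins, not our actual code), the two behaviors from the diagram above might look roughly like this:

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Future
import scala.util.Try

final case class Event(key: String, value: Int) // as in the earlier sketches
final case class Ack(key: String)

object KeyedWriter {
  sealed trait Command
  final case class Process(event: Event, replyTo: ActorRef[Ack]) extends Command
  // Internal message the actor sends itself once the in-flight future finishes.
  private final case class WriteCompleted(result: Try[Unit], replyTo: ActorRef[Ack]) extends Command

  // Stand-in for the real persistence call.
  private def writeToDb(event: Event): Future[Unit] = Future.successful(())

  def apply(key: String): Behavior[Command] =
    Behaviors.withStash(1000) { buffer => // bounded stash capacity
      Behaviors.setup { context =>
        // "Receiving" mode: start the write, then switch to buffering until it completes.
        def receiving: Behavior[Command] =
          Behaviors.receiveMessage {
            case Process(event, replyTo) =>
              context.pipeToSelf(writeToDb(event))(result => WriteCompleted(result, replyTo))
              buffering
            case _: WriteCompleted =>
              Behaviors.same // nothing in flight; ignore a stray completion
          }

        // "Buffering" mode: stash anything new until the in-flight write completes.
        def buffering: Behavior[Command] =
          Behaviors.receiveMessage {
            case WriteCompleted(result, replyTo) =>
              result.foreach(_ => replyTo ! Ack(key)) // reply on success (error handling elided)
              // Un-stash the oldest waiting event, if any, and go back to receiving.
              buffer.unstash(receiving, 1, identity)
            case other =>
              buffer.stash(other)
              Behaviors.same
          }

        receiving
      }
    }
}
```

Because the unstashed event is processed by the receiving behavior, it immediately kicks off its own write and flips the actor back to buffering, so the one-future-at-a-time invariant holds no matter how deep the backlog gets.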

Why Akka? A consideration of alternatives

You may be asking: why use Akka for this at all? What value do actors really provide, here? You could, after all, attack this problem by mapping each key to a mutex, or possibly a blocking queue. That would meet the broad goal of preventing parallel execution of each key, and might be faster to implement (or at least use more familiar constructs). It would work IF you can guarantee that all incoming events for a given object would be processed on a single box.

But consider if they weren’t. What if the key we cared about wasn’t the same key used by the topic, so that we might expect events from different partitions? Or what if we were aggregating events from separate topics altogether? If we scale up to multiple boxes then suddenly our mutex doesn’t work so well anymore, and we hit a whole host of other problems. Leader election and cross-node coordination become necessary to ensure that all events with the same key end up in a single, correct place. We need logic to ensure that the resulting distribution is balanced. Even the simple act of sending the message across the network to another box becomes a source of complexity as we have to worry about serialization and network protocols.

Which brings us back to Akka. Part of the appeal of the framework is that the constructs it uses are innately scalable. Sending a message to another box is easy because, hey, messages are how everything communicates, anyways, and Akka does the dirty work of figuring out how to get it there [3]. Ensuring that all instances of a key get handled on a single instance with a single actor is easy with Cluster Sharding (see documentation here), and there are features we haven’t even utilized yet for persistence, recovery, and reliable delivery that are just waiting in the wings. So even if we could maybe get away with not using Akka for some of our simpler use-cases we certainly can’t for others, and having a single, shareable approach to support both gives us big advantages.
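As a sketch of that last point (again using the hypothetical KeyedWriter from above; the entity type key name is invented too), Cluster Sharding can own the key-to-actor mapping for the whole cluster, so the stream simply asks the entity for its key:

```scala
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityRef, EntityTypeKey}

object KeyedWriterSharding {
  // One sharded entity per key; Akka distributes and balances them across nodes.
  val TypeKey: EntityTypeKey[KeyedWriter.Command] =
    EntityTypeKey[KeyedWriter.Command]("KeyedWriter")

  def init(system: ActorSystem[_]): Unit =
    ClusterSharding(system).init(
      Entity(TypeKey)(entityContext => KeyedWriter(entityContext.entityId))
    )

  // Same idea as the local writerFor lookup in the earlier sketch; an
  // EntityRef supports ask just like a plain ActorRef does.
  def writerFor(key: String)(implicit system: ActorSystem[_]): EntityRef[KeyedWriter.Command] =
    ClusterSharding(system).entityRefFor(TypeKey, key)
}
```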

Conclusion

Our team has utilized this approach in a number of internal streaming services with functions ranging from stream aggregation to database writes. With it, we can safely process hundreds of events at a time from each partition, increasing our throughput far beyond what might have been possible otherwise.

[1] Note that it IS possible to process multiple events in parallel using Kafka Streams by initiating an asynchronous job for each event and exiting the handler as soon as we do so. This would allow processing for the next event to start while the job was still in progress, but crucially would also commit the associated offset first, ensuring that if that job were to fail the event would still be considered complete and effectively be dropped. Additionally, this approach eliminates any protection against race conditions if additional measures aren’t taken.

[2] If you bothered to watch the “actors shouldn’t block” video above (which you should, as it’s very good) then you may have noticed that setting up a new “dispatcher” — basically, a separate thread pool — to handle the blocking actors is another viable solution to this problem. I wouldn’t discourage anyone from trying this, but it effectively solves the problem by throwing more memory at it (those thread pools come at a cost). In particular, it’s wasteful if the I/O operations themselves use non-blocking APIs that wouldn’t otherwise require a thread. Context-switching and stashing allow us to temporarily block actor activity without consuming those resources.

[3] Since our messages may theoretically get sent over the wire to another box it’s important that they be serializable. If you find yourself sticking non-serializable objects like connections or function objects in your application’s messages then I would advise you to take a step back and re-evaluate, even if you are quite certain that message will never actually get sent to another instance. As mentioned above, part of the power of the Akka Framework is that its abstractions scale with very little effort, enabling your deployment to go from two instances to a hundred with very little practical impact on the code (that’s an exaggeration, but only slightly). Locking yourself into a situation where a message can only stay on one box eliminates this potential.

If your futures need connections and functions to run (as ours certainly did) then I recommend you hold those references in your actor instances, not in the messages. It’s a simple exercise to supply these values to your actors as they get instantiated.
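For instance (a sketch with a made-up DatabaseClient trait), the dependency can simply be a parameter of the behavior factory, captured by the actor rather than carried by any message:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Future

// Hypothetical, non-serializable dependency.
trait DatabaseClient {
  def write(key: String, value: Int): Future[Unit]
}

object KeyedWriter {
  sealed trait Command // messages remain plain, serializable data

  // The client lives inside the actor, supplied when the behavior is built,
  // so it never needs to travel across the wire inside a message.
  def apply(key: String, db: DatabaseClient): Behavior[Command] =
    Behaviors.setup { _ =>
      Behaviors.receiveMessage { _ =>
        // ... use `db` here, as in the earlier sketches ...
        Behaviors.same
      }
    }
}
```

The same wiring works with Cluster Sharding: close over the client in the Entity factory, along the lines of Entity(TypeKey)(ctx => KeyedWriter(ctx.entityId, db)).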
