Juggle: Concurrent Task Processing with a Single Actor

Léo Grimaldi
Published in Keep It Up · Jun 27, 2016

TL;DR: https://github.com/kifi/juggle

Concurrent Task Processing

Last year, we open sourced our Reactive Lock, a lightweight library that we use at Kifi to limit concurrency, typically as multiple requests come in contending for some resource.

In this post, we’re introducing a pattern that we use for concurrent task processing, in order to control periodicity and throughput. Here’s what we want to do.

  1. Every period of time, check whether new tasks are available.
  2. If new tasks are available, start processing them concurrently.

At Kifi, we have been using Akka since the early days. However, we haven’t deployed large actor hierarchies. Instead, we often use singleton actors to encapsulate a mutable state: we rely on the guarantee that actors process messages one at a time to safely maintain this state in a multi-threaded context.
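As a contrived sketch of that singleton-actor pattern (illustrative code, not from the Juggle codebase), an actor guarding a mutable counter might look like:

```scala
import akka.actor.Actor

// Hypothetical example: a singleton actor encapsulating mutable state.
// Because Akka delivers messages to an actor one at a time, `count`
// needs no locks even though senders live on many threads.
class CounterActor extends Actor {
  private var count: Long = 0L // mutable state, safely confined to this actor

  def receive = {
    case "increment" => count += 1
    case "get"       => sender() ! count
  }
}
```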

This is how we intend to juggle multiple concurrent tasks with a single actor.

Introducing Juggle

Juggle is a small library: it provides just two actor traits:

  • ConcurrentTaskProcessingActor[T]
  • BatchProcessingActor[T]

ConcurrentTaskProcessingActor[T] is the trait we want to pay most attention to. It requires the following implementations:
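The original post embeds a gist here. As a sketch, the abstract members look roughly like this (the member names follow the post's discussion, but treat the exact signatures as approximate; see github.com/kifi/juggle for the authoritative code):

```scala
import scala.concurrent.Future

// Sketch of what a ConcurrentTaskProcessingActor[T] implementation provides:
trait ConcurrentTaskProcessing[T] {
  protected val minConcurrentTasks: Int // pull more once concurrency drops below this
  protected val maxConcurrentTasks: Int // never exceed this many in-flight tasks

  // Fetch up to `limit` new tasks from wherever they live (database, queue...).
  protected def pullTasks(limit: Int): Future[Seq[T]]

  // Start processing tasks; each task reports its own completion Future.
  protected def processTasks(tasks: Seq[T]): Map[T, Future[Unit]]
}
```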

It fires Futures that asynchronously pullTasks and processTasks, then send messages back to report their results once tasks of type T have actually been Pulled or Processed.

In doing so, this actor encapsulates a fairly simple state:
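The embedded snippet is missing from this copy; conceptually, the state amounts to something like the following (a hedged reconstruction with illustrative names, not the exact source):

```scala
// Rough shape of the actor's internal state:
class StateSketch[T] {
  private var closing: Boolean = false       // once true, no new work is started
  private var pulling: Int = 0               // tasks that may still come back from pullTasks
  private var processing: Set[T] = Set.empty // tasks currently in flight

  // Effective concurrency is processing.size + pulling; the actor pulls at most
  // maxConcurrentTasks - (processing.size + pulling) new tasks at a time.
}
```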

Setting closing aside for now, our actor keeps track of both the individual tasks currently being processed and the number of additional tasks that may come back from pulling. It therefore knows to cap concurrency at maxConcurrentTasks as it pulls for more tasks. Then, as tasks complete, it holds off on pulling for more until concurrency actually drops below minConcurrentTasks.

Intuitively, ConcurrentTaskProcessingActor will constantly adjust the concurrency to keep it between minConcurrentTasks and maxConcurrentTasks.

Concurrency Considerations

Different values of minConcurrentTasks and maxConcurrentTasks yield different periodicity and concurrency behaviors, depending on business logic and on the relative costs of the pulling and processing operations.

For instance, minConcurrentTasks = maxConcurrentTasks will ensure the actor aggressively pulls for new tasks after any one task has been processed. This is fine if the cost of processing a task dominates the cost of pulling it.

However, the pulling cost often comes down to the fixed cost of a round trip to a database or to a queue like SQS, and is independent of how many tasks are actually fetched. In such a scenario, we might rather have minConcurrentTasks << maxConcurrentTasks in order to amortize that fixed pulling cost over maxConcurrentTasks - minConcurrentTasks tasks.

In general, the ratio of minConcurrentTasks to maxConcurrentTasks should increase with the ratio of processing costs to pulling costs, while absolute values are then derived from hardware specifications.
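For instance, the two regimes above might be configured like this (hypothetical numbers, purely illustrative):

```scala
// Processing dominates (e.g. CPU-heavy work, cheap pulls): pull eagerly.
val eagerMin = 8
val eagerMax = 8  // minConcurrentTasks = maxConcurrentTasks

// Pulling has a fixed round-trip cost (e.g. SQS): pull in large, rare batches.
val lazyMin = 1
val lazyMax = 32  // amortizes one pull over up to 31 fetched tasks
```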

Note that minConcurrentTasks > 0 is a necessary condition for the actor to keep pulling for new tasks until there are none left. An actor with minConcurrentTasks = 0 will quickly die down, happily idling even when more tasks could be pulled.

Scheduling

If the concurrency ever dies down to zero, our actor assumes there is no more work for now and stops pulling altogether. At this point, we must explicitly send a message to wake it up: this is usually where a scheduler comes in.

The scheduler should periodically send IfYouCouldJustGoAhead to our actor in order to make sure it checks on new tasks at least every given period. If there are indeed new tasks, our actor kicks back in and starts processing them.
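With Akka's classic scheduler, that wiring might look like this (a sketch; the message name is from the library, but the setup details are assumptions):

```scala
import scala.concurrent.duration._
import akka.actor.{ ActorRef, ActorSystem }

// Wake the juggling actor up at least once a minute; harmless if it is busy.
// Assumes Juggle's IfYouCouldJustGoAhead message object is in scope.
def scheduleWakeUp(system: ActorSystem, juggler: ActorRef): Unit = {
  import system.dispatcher // execution context for the scheduler
  system.scheduler.schedule(
    initialDelay = 30.seconds,
    interval     = 1.minute,
    receiver     = juggler,
    message      = IfYouCouldJustGoAhead
  )
}
```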

Naturally, IfYouCouldJustGoAhead is just another message, and is safely handled by our actor with regard to its internal state: if processing is already under way, a request to go ahead will not lead to extravagant task pulling. Therefore even aggressive messaging from the scheduler cannot cause the concurrency to blow up.

Finally, we can tell ConcurrentTaskProcessingActor to Close for good, typically before the application shuts down. As it’s closing, our actor will not start processing any new tasks whatsoever.

A Common Special Case: Batch Processing

In this context, BatchProcessingActor[T] is merely a special case of ConcurrentTaskProcessingActor that we found common enough to provide explicitly in the Juggle library. It requires the following implementations:
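The embedded gist is missing here; a plausible shape of the required members is the following (hypothetical names, not verified against the repo):

```scala
import scala.concurrent.Future

// Sketch of what a BatchProcessingActor[T] implementation provides:
trait BatchProcessing[T] {
  // Pull the next batch of tasks; an empty batch means no work is left for now.
  protected def nextBatch: Future[Seq[T]]

  // Process one batch; the returned Future reports completion.
  protected def processBatch(batch: Seq[T]): Future[Unit]
}
```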

Thus BatchProcessingActor[T] lets you asynchronously pull and process one batch of tasks after another. In practice, BatchProcessingActor[T] is just a ConcurrentTaskProcessingActor[Seq[T]] with both minConcurrentTasks and maxConcurrentTasks set to 1.

maxConcurrentTasks = 1 ensures the actor will process no more than one Seq[T] batch at a time, while minConcurrentTasks = 1 ensures it keeps going for new batches until there’s no task left to pull.

Here’s the full implementation:
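The gist is missing from this copy; the idea, in a hedged, self-contained sketch (illustrative member names, not the verbatim library source), is:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// Batch processing as a special case of concurrent task processing over Seq[T].
trait ConcurrentTaskProcessing[T] {
  protected val minConcurrentTasks: Int
  protected val maxConcurrentTasks: Int
  protected def pullTasks(limit: Int): Future[Seq[T]]
  protected def processTasks(tasks: Seq[T]): Map[T, Future[Unit]]
}

trait BatchProcessing[T] extends ConcurrentTaskProcessing[Seq[T]] {
  // One batch at a time, and keep going while batches keep coming back.
  protected val minConcurrentTasks: Int = 1
  protected val maxConcurrentTasks: Int = 1

  // What implementers actually provide (hypothetical names):
  protected def nextBatch: Future[Seq[T]]
  protected def processBatch(batch: Seq[T]): Future[Unit]

  // Adapt the batch API to the task API: a "task" is a whole Seq[T].
  protected def pullTasks(limit: Int): Future[Seq[Seq[T]]] =
    if (limit <= 0) Future.successful(Seq.empty)
    else nextBatch.map(batch => if (batch.isEmpty) Seq.empty else Seq(batch))

  protected def processTasks(batches: Seq[Seq[T]]): Map[Seq[T], Future[Unit]] =
    batches.map(batch => batch -> processBatch(batch)).toMap
}
```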

Conclusion

Juggle actors provide easy-to-implement, lightweight solutions to a wide range of concurrent task processing use cases.

For instance, BatchProcessingActor is widely used in our system to handle cross-service data ingestion via sequence numbers, the same way we architected our real-time indexing.

Another common use case is distributing work within a service cluster. One machine (the leader) periodically pushes new tasks to a distributed queue (e.g. SQS). All other machines in the cluster then pull tasks from the queue and process them concurrently. The producer typically extends BatchProcessingActor[T], while consumers extend ConcurrentTaskProcessingActor[SQSMessage[T]].

We’ve been relying on such a setup for a number of tasks such as scraping and image processing, and in key components of our Slack integration.

We hope you like juggling as much as we do. 😼
