Asynchronous jobs at Alan: how we use probabilities to optimize our setup and reduce complexity
Everybody loves asynchronous tasks, right? They help manage background work that must be executed outside of an HTTP request / response context in order to keep the HTTP cycle fast. At Alan, we use them quite a lot: computing reimbursements, checking documents, generating PDFs, … and for this, we need to have a robust system.
Recently, we made some changes to make our system more generic and more resilient, as well as easier to work with and better at absorbing load.
Let me take you through the details!
🗺️ Lay of the land
To handle asynchronous tasks (or “jobs”), we use the RQ library, backed by Redis, in which our jobs are stored.
We have different processes that create jobs: an HTTP request, manual commands, other jobs … we’ll call them “producers”; and we have machines that run the jobs when needed: we’ll call them “consumers”.
All jobs are not created equal — some are relatively high priority (computing a reimbursement for instance, or sending a refund order), while some others can definitely wait a bit, like generating a thumbnail for a document a member uploaded.
When they are created, jobs are added to a single, specialized queue, which is basically a very simple first-in, first-out (FIFO) pipe that carries an intrinsic priority: for instance, we know that the queue holding reimbursement jobs is higher priority than the others.
Then, consumers drain each queue by taking the job at the head of it and running it.
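For context, creating a job with RQ looks roughly like the sketch below. The queue name and the compute_reimbursement function are made up for the example; in a real application the function would live in a module that workers can import.

```python
# Minimal producer sketch (illustrative names, not our actual code).
from redis import Redis
from rq import Queue


def compute_reimbursement(member_id: int) -> None:
    ...  # business logic; must be importable by the workers in a real setup


redis_connection = Redis()
reimbursement_queue = Queue("reimbursement_urgent", connection=redis_connection)

# The producer only stores a reference to the function and its arguments in Redis;
# a consumer (an `rq worker` process) will pick it up and run it later.
reimbursement_queue.enqueue(compute_reimbursement, member_id=42)
```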
From a few queues to a lot of queues
As our system needed to accommodate more and more varied use cases, and we added more and more jobs each day, we started to create quite a few queues to distinguish, at a finer level, the jobs that were high priority from those that were not.
This led to creating new queues often, and we ended up with 50+ various priority queues to drain jobs from.
With all these jobs in all these differentiated queues, how do we prioritize jobs? We only have a finite set of consumers (ie, machines that run jobs), so we need to decide somehow which jobs are treated first.
And it turns out, it’s not that simple
A first prioritizing solution
As we need priority and want some jobs to be processed quickly to the detriment of other, low-priority ones, we need to tell our consumers to drain the queues in a specific order.
That’s what the underlying library actually implements, with a simple stanza:
> rq worker urgent_queue normal_queue low_prio_queue
Here, the urgent_queue will always be polled first to see if something is waiting in it. If nothing is, the worker will poll normal_queue and see if there are jobs, etc, etc…
If there are no jobs in these queues, the worker remains idle until a job arrives.
We have quite a handful of queues, each quite specialized, that we can group in different “business” entities: for instance, we have reimbursement queues:
reimbursement_urgent
reimbursement_high
reimbursement_low
or billing queues:
billing
or even mail queues:
mail_urgent
mail_low
With this in mind, how do we prioritize reimbursement_urgent over mail_urgent? Both are urgent and need to be drained ASAP (an operator is waiting for the job to be done, or a member is waiting for money on their account), but if we just use the standard way described above, inevitably, one will be drained before the other, which is not ideal at all.
Onto a less naive solution
To accommodate this case, we need to duplicate our workers and launch several of them concurrently. For example (with the queues I mentioned before), we would launch three workers:
> rq worker reimbursement_urgent reimbursement_high reimbursement_low
> rq worker mail_urgent mail_low
> rq worker billing
This way, both urgent queues are drained concurrently, and a long-running job in one of the urgent queues won’t prevent the other urgent queue from being drained. Great! 🎉
Oh, … but wait
That means that we need at least one worker invocation per “urgent queue” type, which is not ideal:
- We need to have quite a few different worker ‘types’
- These worker types are hardcoded, so changing them is costly (ie, we need to deploy)
- It becomes complicated to autoscale as we would like to scale individual queues, not the whole workers setup (which would cost too much for idle machines)
👉🏼 Why is this a nightmare to autoscale? Well, a single machine will host all these workers together.
Machines have a fixed count of CPUs, and thus, a fixed count of workers (very naively, if our asynchronous jobs were CPU-bound, we would tend to say 1 CPU for 1 worker)
So what we’re autoscaling is machines (or instances): adding new instances when we need to drain queues faster, and terminating them when we have too many idle ones.
If we have, say, a 16 vCPU instance spinning 16 different workers, and if in these workers, only a handful have non-empty queues, we are basically wasting the majority of our vCPU power. As a consequence, we’re paying an extremely high price (💸) in instances to compensate for the general idleness of the workers on each instance, when only one queue is very busy.
For instance, here is a real-life Procfile listing all the different non-claims workers (circa 2023, colorized):
web_worker: flask rq worker main low_priority
email_worker: flask rq worker email email_bulk
noemie_parsing_worker: flask rq worker noemie_parsing
noemie_processing_worker: flask rq worker noemie_processing
prevoyance_claim_engine_worker: flask rq worker prevoyance_claim_engine
pricing_precalculation_worker: flask rq worker pricing_precalculation
thumbnail_processing_worker: flask rq worker thumbnail_processing
contracting_worker: flask rq worker contracting
pricing_worker: flask rq worker pricing
billing_worker: flask rq worker billing
fraud_detection_worker: flask rq worker fraud_detection_consolidation
fraud_backtest_worker: flask rq worker fraud_detection_backtest
employee_movement_engine_worker: flask rq worker employee_movement_engine employee_movement_engine_low
payfit_affiliation_api_worker: flask rq worker payfit_affiliation_api
care_context_conversation_worker: flask rq worker care_conversation_context
cache_builder_worker: flask rq worker cache_value_builder_queue
📐 Introducing Queue pools + probabilistic workers
This previous approach works, but is not scalable and not as flexible as we’d like.
So, instead of assigning each worker a fixed set of queues, we try to keep workers as generic as possible, and feed them the correct job (ie. the highest-priority one) from the correct queue in a different way: introducing probabilistic workers and queue pooling.
What is a Queue Pool?
A queue pool is an ordered set of queues that are linked by a common business use case and that have a clear relative priority relationship, ie: it is always pertinent to drain the first queue of the set before the second, the second before the third, etc…
Each queue pool has a ⚖️ weight (an integer, for the sake of simplicity) that represents its probability of being elected as the chosen pool by a worker at any time (the weight becomes a probability when divided by the sum of the weights of all existing pools).
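For instance, with three pools weighted 50, 30 and 20 (made-up numbers), the election probabilities would be 0.5, 0.3 and 0.2:

```python
# Made-up weights, just to illustrate the weight -> probability conversion.
pool_weights = {"reimbursement": 50, "mail": 30, "billing": 20}
total_weight = sum(pool_weights.values())
probabilities = {name: weight / total_weight for name, weight in pool_weights.items()}
# -> {'reimbursement': 0.5, 'mail': 0.3, 'billing': 0.2}
```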
And what is a probabilistic worker?
It’s a worker:
- that has no specific set of queues: it can take a job from any available queue declared in the application
- that will elect, at the end of each job, a queue pool to choose its next job from. It will do so randomly, but taking into account the weight of each pool
When a pool gets elected, the worker will select the first job in the first queue of the set (if any), or default to the second queue (if any), etc …
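Here is a rough sketch of these two steps (the weighted election, then the walk through the pool’s queues), assuming a small QueuePool structure holding an ordered list of RQ queues and an integer weight. The names and structure are illustrative, not our actual implementation.

```python
from __future__ import annotations

import random
from dataclasses import dataclass

from rq import Queue


@dataclass
class QueuePool:
    name: str
    queues: list[Queue]  # ordered: index 0 is the highest-priority queue of the pool
    weight: int


def elect_pool(pools: list[QueuePool]) -> QueuePool:
    # Weighted random election: each pool is picked with probability weight / sum(weights).
    return random.choices(pools, weights=[pool.weight for pool in pools], k=1)[0]


def first_non_empty_queue(pool: QueuePool) -> Queue | None:
    # Walk the pool's queues in priority order and return the first one holding a job.
    for queue in pool.queues:
        if queue.count > 0:  # `count` is the number of jobs currently waiting in this queue
            return queue
    return None
```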
Fallback pools
If, after having inspected all queues in the chosen pool, a worker does not find a job to process, it will automatically choose a job from one of the remaining queue pools, sorted in descending weight order. A more “correct” approach would be to re-elect a pool randomly (according to remaining weights, but discarding the empty pool), but it’s far easier (and heuristically sufficient) to just sort the rest of the pools.
Failsafe
Finally, if, after having inspected all pools, we have no jobs to process, the worker will shuffle the remaining, unpooled queues and take any job from this set, so that we are sure that no worker can sit idle when at least one job is waiting to be started somewhere (by construction)
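Putting the election, the fallback and the failsafe together, the queue selection could look like this (continuing the sketch above, still illustrative):

```python
def next_queue(pools: list[QueuePool], unpooled_queues: list[Queue]) -> Queue | None:
    # 1. Weighted random election of a pool, then its first non-empty queue.
    elected = elect_pool(pools)
    queue = first_non_empty_queue(elected)
    if queue is not None:
        return queue

    # 2. Fallback: inspect the remaining pools in descending weight order.
    for pool in sorted(pools, key=lambda p: p.weight, reverse=True):
        if pool is not elected:
            queue = first_non_empty_queue(pool)
            if queue is not None:
                return queue

    # 3. Failsafe: shuffle the unpooled queues and take any waiting job from them.
    for queue in random.sample(unpooled_queues, k=len(unpooled_queues)):
        if queue.count > 0:
            return queue

    return None  # nothing to do anywhere: the worker can wait for the next job
```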
⏹️ Let’s recap, shall we
- We gather most queues in queue pools, which are ordered sets of queues with a weight
- Workers are perfectly generic, ie not tied to any queue
- At the end of each job processing, a worker will select, in a weighted-random fashion, a pool to drain a job from. The queue pool will give the worker the first job in its first non-empty queue (aka: its most prio job)
- 🔁 Rinse and repeat
🤔 Wait, does that even work?
Yes! Because, ultimately, the random choices converge towards the weights; that is, if we process “enough” jobs, we’ll see that the weight of each pool is respected, and the jobs are processed according to the weight of the pool in which their queue is.
Ex: If we have two pools, each with a weight of 50, the job distribution will converge towards half the jobs being taken from the queues of pool 1, and half from pool 2 (if there are enough jobs to keep the workers always busy, obviously)
Said differently: using a weighted random choice is a stateless way of storing what job should come next taking into account the weights (≈ priority) of each pool.
👉🏼 Note: It would not work well if we had a very limited number of jobs (because the relative error of a random choice would be too big compared to the number of jobs), but for us (we have more than 500k jobs per day on average) it fits perfectly, and it converges quite fast.
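A quick simulation shows how fast the election converges towards the weights (two made-up pools with equal weights, as in the example above):

```python
import random
from collections import Counter

# 500k elections between two pools of equal weight: both shares land very close to 0.5.
weights = {"pool_1": 50, "pool_2": 50}
draws = random.choices(list(weights), weights=list(weights.values()), k=500_000)
shares = {pool: count / len(draws) for pool, count in Counter(draws).items()}
print(shares)  # e.g. {'pool_1': 0.4996, 'pool_2': 0.5004}
```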
🤌🏼 Well then, prove it!
Thanks to the two fallback mechanisms, we’re deterministically sure that we can’t have an idle worker if there is at least one job, somewhere, that needs to be done.
In this setting, however, observing the probabilities of each queue pool in production via metrics is nearly impossible (queues are not filled evenly with a constant stream of jobs, some jobs take far less time than others, etc.). But if we zoom in closely enough on a timeframe where queues are evenly full, we can actually observe it directly!
Let’s take these queue pools, for instance, and zoom in on the “offer” pool, which has a probability of ~6.8%:
If we extract the relevant data for a short while in “perfect conditions”, we can see that indeed, the probability is respected:
➕ Okay, so what are the pros?
This approach brings quite a lot of advantages:
- Our workers are generic, which means they are easily scalable, and that our deployment strategy is now simpler
- The queue pool system is inherently stateless, which makes it even more scalable! (I mean, deducing state from probabilities, how cool is that?)
- By construction, we don’t have idle workers anymore, which means we optimize our costs by using as much of our compute resources as possible to run asynchronous jobs
- Changing priority is instant (ie. the next job election of a probabilistic worker will use the new value) since it’s just a record somewhere holding an integer
➖ And the cons?
Well, not many. One downside is that each worker has to elect a pool before going to its next job, and it takes a little bit of time. It’s a very very small price to pay (we’re talking a few microseconds here compared to jobs that can take up to 10 minutes).
🚒 Bonus: Incident mitigation gets easier
Apart from providing an optimized way of draining jobs and a more generic, scalable system, this setup also brings a very interesting feature: real-time redirection of computing power.
We have a handy tool (a simple one-pager) that engineers can use to see the queue pools, their sorted queues and their weights, in real time:
From there, they can tweak the weights and reorder everything by dragging and dropping queues around. As changes are effective in real time, that means we can easily repurpose ~100% of our active workers to any task that we deem extremely urgent, like running code to mitigate an incident for instance.
👉🏼 Suppose we need to reprocess some cares for our whole member base (😱). As queues can be dynamically created, we could very well create a new queue incident_mitigation, put it alone in a queue pool with an inflated weight, and all workers will very soon start to drain the jobs from this queue 🎉.
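As a hypothetical sketch (the function, the ids and the weight below are made up, and QueuePool refers to the illustrative structure from earlier), the whole mitigation could be as simple as:

```python
from redis import Redis
from rq import Queue

redis_connection = Redis()


def reprocess_care(care_id: int) -> None:
    ...  # the actual remediation logic


# 1. Create the dedicated queue and fill it with remediation jobs.
incident_queue = Queue("incident_mitigation", connection=redis_connection)
for care_id in (1, 2, 3):  # placeholder ids for the example
    incident_queue.enqueue(reprocess_care, care_id=care_id)

# 2. Put the queue alone in a pool with an inflated weight, so that nearly every
#    worker election picks this pool first.
incident_pool = QueuePool(name="incident_mitigation", queues=[incident_queue], weight=10_000)
```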
Conclusion
Results
We saw a great improvement in the overall throughput of jobs (meaning: we could process the same amount of jobs with fewer instances, since they are now optimally used), and our infra costs for “compute” instances consequently went down by more than 10%!
We’re always on the lookout to find interesting solutions to optimize our infrastructure and the way we deal with the problems we have at hand.
For this, we could also very well have chosen a stateful mechanism (such as keeping track in a database of which jobs or which queues needed to be drained first), but we didn’t deem the overhead of tooling and state management worth it. A simpler, low-complexity solution enabled us to ship fast and have something that met our needs in a frugal way, while also improving our setup quite significantly and allowing new features!
If you want to work on interesting initiatives like this one, come talk to us, we’re hiring :)