Scaling Sidekiq in production

At AirPR, we use Sidekiq to manage background jobs. To serve our business requirements, we’ve had to scale our processing to support tens of millions of jobs a day with hundreds of workers running concurrently. Overall, we have processed over 25 billion background jobs using Sidekiq. We use Aurora MySQL as our primary relational data store, with numerous tables containing hundreds of millions of records; our largest table holds about 2.5 billion records. Some of our tables contain vast amounts of textual data, which makes them far larger than others in size; the biggest is over 100GB.

Memory Constrained Efficiency

Our workers are hosted on Heroku’s infrastructure, which comes with strict limits on memory availability depending on the size of the dyno (a container, in Heroku parlance). When a worker uses more than the memory allotted for its dyno size, it starts swapping to disk, which substantially degrades performance and ultimately causes the dyno to be shut down.

(Table of dyno memory limits, sourced from the Heroku website)

To keep costs minimal at this scale, it’s important to ensure that our jobs can run on the smallest dyno possible. This means that even when querying millions of records, we need to be careful about how much we load into memory and how much data we process at a time, while still ensuring that our jobs are performant.

This creates two opposing forces. On the one hand, we would prefer to load as much data at once as possible to avoid query explosion and the associated network latency; on the other hand, we don’t want to load and process too many records at once, because that would force us onto a bigger dyno and lead to higher costs.

To solve for this, we can use an approach where we process N records at a time per worker — N being tuned to whatever the memory constraints would let us scale to without swapping.
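As a plain-Ruby sketch of the idea (outside of Sidekiq, with made-up ids and N for illustration), partitioning a set of sorted ids into contiguous ranges of up to N records looks like this; each range would then become one job’s arguments:

```ruby
# Partition sorted ids into contiguous [first, last] ranges of up to n records,
# so each range can be handed to one job.
ids = (1..10).to_a
n = 4
ranges = ids.each_slice(n).map { |slice| [slice.first, slice.last] }
puts ranges.inspect  # => [[1, 4], [5, 8], [9, 10]]
```

Tuning N is then just a matter of changing the slice size.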

I’ll walk you through an experiment that measures the performance implications of two approaches:

  1. Create one job to process one record
  2. Create one job to process multiple (N) records

One job per record

To simulate this experiment, I set up a sample dataset of about 500,000 employee records keyed by emp_no.
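As a minimal sketch of the schema in migration form (only emp_no is confirmed by the worker code below; the remaining columns are assumptions for illustration):

```ruby
# Hypothetical migration sketch; only emp_no is confirmed by the worker code.
class CreateEmployees < ActiveRecord::Migration[5.2]
  def change
    create_table :employees, id: false do |t|
      t.integer :emp_no, primary_key: true  # confirmed by Employee.where(emp_no: id)
      t.string  :first_name                 # assumed
      t.string  :last_name                  # assumed
      t.date    :hire_date                  # assumed
    end
  end
end
```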

We create one SingleEmployeeProcessJob per employee record. The job itself is deliberately simple: it looks up a single id in the database and does no processing at all. We use a Sidekiq batch to measure the total time the entire run takes from beginning to end.


class SingleEmployeeProcessJob
  include Sidekiq::Worker

  def perform(id)
    Employee.where(emp_no: id).first
  end
end

class SingleEmployeeProcessCallback
  def on_success(status, options)
    puts "Time to process: #{Time.zone.now - status.created_at}"
  end
end

batch = Sidekiq::Batch.new
batch.on(:success, SingleEmployeeProcessCallback)
Employee.select(:emp_no).find_in_batches do |ebatch|
  batch.jobs do
    Sidekiq::Client.push_bulk('class' => SingleEmployeeProcessJob,
                              'args' => ebatch.map { |e| [e.emp_no] })
  end
end

I ran one Sidekiq worker with a concurrency of 15 to process jobs from the default queue (since no queue is specified in the job definition, Sidekiq enqueues to it).

all_worker: bundle exec sidekiq -c 15 -q default

This took a total of 20 minutes from start to finish.

Note that in this approach, to keep enqueueing fast, we leverage push_bulk to enqueue 1,000 jobs at a time; pushing jobs one at a time would have made the enqueue step considerably slower.

Multiple records per job

The alternate approach is to process multiple records per job. This is how the code looks for this approach:

class BatchEmployeeProcessJob
  include Sidekiq::Worker

  def perform(start_id, end_id)
    Employee.where('emp_no >= ?', start_id)
            .where('emp_no <= ?', end_id).each do |employee|
      # no per-record processing, to mirror the single-record experiment
    end
  end
end

class BatchEmployeeProcessCallback
  def on_success(status, options)
    puts "Time to process: #{Time.zone.now - status.created_at}"
  end
end

batch = Sidekiq::Batch.new
batch.on(:success, BatchEmployeeProcessCallback)
Employee.select(:emp_no).find_in_batches do |ebatch|
  start_id = ebatch.first.emp_no
  end_id = ebatch.last.emp_no
  batch.jobs do
    BatchEmployeeProcessJob.perform_async(start_id, end_id)
  end
end

Here, the job takes a start and end id and processes through all records between those two ids. We create one job per 1000 records in this case, but that can be tuned depending on memory availability.

With the same worker as before, at a concurrency of 15, this approach processed all 500,000 records in about 10 seconds!

That’s a 120x speedup!

Tuning for maximum efficiency

When processing multiple records at a time, the BatchEmployeeProcessJob loads N records into memory at once. Since up to 15 BatchEmployeeProcessJob instances can run concurrently, that can put up to 15*N records in memory at one time. Whether that’s a problem depends on the value of N and the memory constraints of your workers. To solve for this, you can lower the worker’s concurrency, lower N, or both.
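A back-of-the-envelope check in plain Ruby shows how concurrency and N combine into peak memory; the per-record footprint here is an assumption you would measure for your own models:

```ruby
# Worst case: every concurrent job holds a full batch in memory at once.
concurrency = 15
n = 1_000                 # records per job
bytes_per_record = 1_024  # assumption; measure for your own models
peak_records = concurrency * n
peak_mb = (peak_records * bytes_per_record) / (1024.0 * 1024.0)
puts peak_records                # => 15000
puts format('%.1f MB', peak_mb)  # => "14.6 MB"
```

If the resulting figure approaches your dyno’s limit, shrink n or concurrency until it fits.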

The improved performance of BatchEmployeeProcessJob stems from two factors:

  1. Fewer enqueues and dequeues from Redis queues overall because there are N times fewer jobs.
  2. Reduced network latency from MySQL queries — this approach makes one query per N records as opposed to one query per record.
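Both factors scale the same way. With the experiment’s numbers, a plain-Ruby sketch of the job (and query) counts:

```ruby
total_records = 500_000
n = 1_000
single_record_jobs = total_records            # one job (and one query) per record
batched_jobs = (total_records.to_f / n).ceil  # one job (and one query) per N records
puts single_record_jobs  # => 500000
puts batched_jobs        # => 500
```

That is 1,000 times fewer Redis operations and MySQL round trips.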

If we added an update operation within the worker, the performance difference would be even greater, because issuing many single-record updates performs far worse than a single bulk update.
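To illustrate why, here is a plain-Ruby sketch that only builds SQL strings (the processed column is hypothetical); per-record updates cost one round trip each, while a ranged update costs one in total:

```ruby
# Hypothetical 'processed' flag; the point is the statement count, not the SQL itself.
ids = (1..1_000).to_a
per_record_sql = ids.map { |id| "UPDATE employees SET processed = 1 WHERE emp_no = #{id}" }
bulk_sql = ["UPDATE employees SET processed = 1 WHERE emp_no BETWEEN #{ids.first} AND #{ids.last}"]
puts per_record_sql.size  # => 1000 round trips
puts bulk_sql.size        # => 1 round trip
```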