Fan-Out Sidekiq Jobs to Manage Large Workloads
Improve Operation Control and Resilience
Performing a resilient operation on bulk data can be challenging, especially if the operation relies on a third party. You can safely do this by fanning out the work to idempotent background jobs that operate on only one piece of data at a time. Those jobs can retry independently as needed, making the entire operation easier to manage. This post will show an example of how that works and why you might want to use this pattern.
Fanning out is a way to perform work in parallel batches instead of inside a loop. Executing an operation this way provides more control and more resilience. Doing this well requires a combination of both job and database design.
Simple Domain of Charging Subscriptions
Let’s take the simplified domain of charging customers for a subscription each month. Let’s say we have a subscriptions
table that has a customer ID, an amount to charge each month, and the date on which to charge them. Each month when we charge them, we’ll update that date to be the next month. Let’s assume there is a customers
table that has some sort of identifier to a third party payment processor as well.
A simple way of making the update is to loop over each subscription, check if next_charge_on
is today and, if so, charge the customer. Assume there is a ThirdPartyPaymentProcessor
class that handles talking to our credit card payment service.
We’ll put this task into a Sidekiq job and arrange for it to run every day.
class ChargeSubscriptionsJob
  include Sidekiq::Job

  def perform
    payment_processor = ThirdPartyPaymentProcessor.new
    Subscription.where(next_charge_on: Date.today).find_each do |subscription|
      payment_processor.charge!(
        subscription.customer.payment_processor_id,
        subscription.monthly_charge_cents
      )
      subscription.update!(next_charge_on: Date.today + 1.month)
    end
  end
end
Even at a moderate scale, this job can become difficult to manage.
Difficulties with Long-Running Batch Jobs
Suppose our payment processor experiences an outage partway through processing. The job will fail and be retried. The subscription being charged during the failure may or may not have been charged. If it was, retrying this job will charge it again.
What if we have so many subscriptions that we can’t charge them all in one job? Most payment processors take a few seconds to complete a charge. If we had 1,000 customers to charge on any given day, that means this job would take about an hour to complete.
If you were to deploy or cycle infrastructure (as is common with cloud-hosted services), the job could fail partway through. What if there is some bug or problem with the data such that a particular subscription always causes a failure? Every retry of the job would fail at the same place, never getting past that one errant subscription (a so-called “poison pill”).
Large jobs that operate on a lot of data and run for a long time are magnets for failures. It is often difficult to unwind what went wrong and correct it. If we could break up the logic into manageable chunks, the job would be easier to manage.
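To make the double-charge risk concrete, here is a minimal plain-Ruby simulation. FlakyProcessor, run_batch, and the hash-based subscriptions are hypothetical stand-ins, not the real ThirdPartyPaymentProcessor or Sidekiq. The processor records a charge and then raises once, mimicking an outage that hits after the charge went through but before we could record it.

```ruby
# Hypothetical stand-in for a payment processor whose response is lost once,
# after the charge has already gone through on its side.
class FlakyProcessor
  attr_reader :charges

  def initialize(fail_on_customer:)
    @fail_on_customer = fail_on_customer
    @charges = Hash.new(0) # customer id => number of times charged
    @already_failed = false
  end

  def charge!(customer_id, _cents)
    @charges[customer_id] += 1
    if customer_id == @fail_on_customer && !@already_failed
      @already_failed = true
      raise "timeout talking to processor"
    end
  end
end

# The loop-style job: charge, then mark the subscription as done.
def run_batch(subscriptions, processor)
  subscriptions.reject { |s| s[:done] }.each do |s|
    processor.charge!(s[:customer_id], s[:cents])
    s[:done] = true # never reached if charge! raises
  end
end

# Runs the batch once and retries once after a failure, like a job retry.
def simulate_double_charge
  subscriptions = [
    { customer_id: "cus_1", cents: 1000, done: false },
    { customer_id: "cus_2", cents: 1000, done: false },
    { customer_id: "cus_3", cents: 1000, done: false }
  ]
  processor = FlakyProcessor.new(fail_on_customer: "cus_2")
  begin
    run_batch(subscriptions, processor)
  rescue RuntimeError
    run_batch(subscriptions, processor) # the retry
  end
  processor.charges
end

p simulate_double_charge
```

In this sketch the customer whose charge was interrupted ends up charged twice, while the others are charged once.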
Breaking Up Batch Operations into Small Chunks
Let’s keep ChargeSubscriptionsJob
selecting subscriptions to charge but, instead of charging them, have it queue a job for each subscription to charge. This process is called fanning out because, when diagrammed, one job branching into many looks like a fanned-out hand of playing cards.
Let’s try it. ChargeSubscriptionsJob
will queue ChargeJob
like so:
class ChargeSubscriptionsJob
  include Sidekiq::Job

  def perform
    Subscription.where(next_charge_on: Date.today).find_each do |subscription|
      # Sidekiq::Job classes are queued with perform_async
      ChargeJob.perform_async(subscription.id) # <---
      subscription.update!(next_charge_on: Date.today + 1.month)
    end
  end
end
The ChargeJob
contains all the code we just removed:
class ChargeJob
  include Sidekiq::Job

  def perform(subscription_id)
    payment_processor = ThirdPartyPaymentProcessor.new
    subscription = Subscription.find(subscription_id)
    payment_processor.charge!(
      subscription.customer.payment_processor_id,
      subscription.monthly_charge_cents
    )
  end
end
Now, ChargeSubscriptionsJob
doesn’t depend on the payment processor. It depends on just the database and the Redis being used for Sidekiq. These are under our control and less likely to fail. And, since we update next_charge_on
only after we successfully queue ChargeJob
, if ChargeSubscriptionsJob
gets retried, it won’t queue the same subscription twice.
The new design also ensures that any problematic subscription won’t spoil the entire batch. The so-called poison pill subscription would continue to fail, but each time it got retried, other subscriptions would get processed first. Eventually only the poison pill would remain and, presumably, we’d be notified of a failure and could address it.
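The way a poison pill gets isolated can be sketched in plain Ruby. The drain_queue method below is a hypothetical in-memory stand-in for Sidekiq, and the three-attempt limit is an assumption chosen to keep the example short. Real Sidekiq retries are scheduled with a backoff rather than pushed straight back onto the queue.

```ruby
# Hypothetical in-memory queue standing in for Sidekiq: each entry is one
# per-subscription job. A failing job goes back to the end of the queue
# until it runs out of attempts.
def drain_queue(subscription_ids, max_attempts: 3)
  queue = subscription_ids.map { |id| { id: id, attempts: 0 } }
  charged = []
  dead = []

  until queue.empty?
    job = queue.shift
    job[:attempts] += 1
    begin
      yield job[:id]      # the per-subscription work
      charged << job[:id]
    rescue StandardError
      if job[:attempts] < max_attempts
        queue.push(job)   # retry later, behind the other jobs
      else
        dead << job[:id]  # give up and surface it for a human
      end
    end
  end

  { charged: charged, dead: dead }
end

# Subscription 2 plays the poison pill: it fails every time it is tried.
def simulate_poison_pill
  drain_queue([1, 2, 3, 4]) do |id|
    raise ArgumentError, "bad data on subscription #{id}" if id == 2
  end
end

p simulate_poison_pill
```

Every other subscription is processed, and only the failing one is left over for someone to investigate.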
Of course, changing our design to fan out jobs introduces other failure modes we need to address.
Failures When Fanning Out
If you think about our updated design, the ChargeJob
instances queued to Sidekiq are the only place we have a record of what subscriptions to charge and how much to charge them. Sidekiq is a great job processor, but it’s not a database.
One problem we’d face is that if monthly_charge_cents
changed after it queued a ChargeJob
, but before it was processed, we’d charge the wrong amount. Worse, if we lost Redis, we could lose some ChargeJob
s and have no idea what subscriptions needed to get charged. Sidekiq does its best to avoid this situation, but Redis is not a resilient database like Postgres.
What we should do is use our database to store information that we need to persist, and have our Sidekiq jobs fetch the data they need from there. The ChargeJob
is really an intention to charge money that, when processed, becomes realized. We should store that intention in our database.
Using the Database To Store Operational Data
Let’s call this an invoice. It’ll reference a subscription, hold the amount to charge, the original charge_on
date, and a nullable value for when the charge was completed.
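As a rough picture of the data an invoice carries, here is a plain-Ruby sketch. InvoiceRow is a hypothetical stand-in for a row in an invoices table, not an ActiveRecord model. The field names follow the jobs in this post, while subscription_id and the outstanding? helper are assumptions added for illustration.

```ruby
require "date"

# Hypothetical plain-Ruby stand-in for an invoices row.
InvoiceRow = Struct.new(:subscription_id, :charge_cents, :charge_on, :charged_at,
                        keyword_init: true) do
  # An invoice is outstanding until charged_at is filled in.
  def outstanding?
    charged_at.nil?
  end
end

# charged_at is omitted, so it starts out nil (not yet charged).
invoice = InvoiceRow.new(subscription_id: 42, charge_cents: 1500,
                         charge_on: Date.new(2024, 3, 1))
puts invoice.outstanding?
```

In a real application these would be columns, with charged_at allowed to be null.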
Now, ChargeSubscriptionsJob
will create an invoice and ChargeJob
will accept an invoice id to charge. Because ChargeSubscriptionsJob
now has to both create the invoice and update the subscription, we want to perform both of those inside a database transaction. That way, either both changes are made or neither is, and we don’t end up in a half-updated state.
class ChargeSubscriptionsJob
  include Sidekiq::Job

  def perform
    Subscription.where(next_charge_on: Date.today).find_each do |subscription|
      invoice = nil # declared here so it is still visible after the block
      ActiveRecord::Base.transaction do
        invoice = subscription.invoices.create!(
          charge_on: subscription.next_charge_on,
          charge_cents: subscription.monthly_charge_cents,
          charged_at: nil
        )
        subscription.update!(
          next_charge_on: Date.today + 1.month
        )
      end
      # Queue only after the transaction has committed
      ChargeJob.perform_async(invoice.id)
    end
  end
end
Note that ChargeJob
is now queued after all the database updates. While, in theory, we could queue it right after creating the invoice, that would require doing so inside an open database transaction. This is bad. At even moderate scale, queueing inside the transaction keeps it open longer than necessary, so the locks it holds are held longer too, which can have a cascading effect on the rest of the system. This effect can be extremely hard to trace back to the open transaction.
Queueing jobs this way has implications we’ll get to in a minute, but let’s see the updated ChargeJob
:
class ChargeJob
  include Sidekiq::Job

  def perform(invoice_id)
    payment_processor = ThirdPartyPaymentProcessor.new
    invoice = Invoice.find(invoice_id)

    # Guard against charging the same invoice twice
    if invoice.charged_at.present?
      Rails.logger.info "Invoice #{invoice.id} already charged"
      return
    end

    customer = invoice.subscription.customer
    payment_processor.charge!(
      customer.payment_processor_id,
      invoice.charge_cents
    )
    invoice.update!(charged_at: Time.zone.now)
  end
end
ChargeJob
is mostly the same, except it now updates the invoice to indicate it was charged. It also checks to make sure the invoice wasn’t already charged.
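The effect of that check can be seen in a small plain-Ruby sketch. FakeInvoice, CountingProcessor, and charge_invoice are hypothetical stand-ins that mirror the shape of the job without ActiveRecord or a real processor. The sketch only models two jobs for the same invoice running one after another.

```ruby
# Hypothetical in-memory stand-ins: not ActiveRecord, not a real processor.
FakeInvoice = Struct.new(:id, :charge_cents, :charged_at)

class CountingProcessor
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def charge!(_customer_id, _cents)
    @calls += 1
  end
end

# Same shape as ChargeJob#perform: skip if charged, else charge and record it.
def charge_invoice(invoice, processor)
  return :skipped unless invoice.charged_at.nil?

  processor.charge!("cus_123", invoice.charge_cents)
  invoice.charged_at = Time.now
  :charged
end

# Two jobs for the same invoice, run back to back.
def simulate_duplicate_jobs
  invoice = FakeInvoice.new(1, 1500, nil)
  processor = CountingProcessor.new
  results = [charge_invoice(invoice, processor), charge_invoice(invoice, processor)]
  [results, processor.calls]
end

p simulate_duplicate_jobs
```

The second run sees charged_at already set and returns without calling the processor, so the processor is called only once.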
ChargeJob
now relies only on data stored in the database, which holds everything needed to manage these jobs. If we lost Redis entirely, we could look at any invoice where charged_at
is null
and know that it hasn’t been charged. In fact, we could eliminate the need for ChargeSubscriptionsJob
to queue ChargeJobs
entirely by creating a new job called ChargeOutstandingInvoicesJob
.
Using the Database to Drive Job Queueing
First, we remove the call to ChargeJob.perform_async
:
class ChargeSubscriptionsJob
  include Sidekiq::Job

  def perform
    Subscription.where(next_charge_on: Date.today).find_each do |subscription|
      ActiveRecord::Base.transaction do
        invoice = subscription.invoices.create!(
          charge_on: subscription.next_charge_on,
          charge_cents: subscription.monthly_charge_cents,
          charged_at: nil
        )
        subscription.update!(
          next_charge_on: Date.today + 1.month
        )
      end
      # XXX ChargeJob.perform_async(invoice.id)
    end
  end
end
This change means that ChargeSubscriptionsJob
is always safe to retry under any circumstance, since it will always pick up where it left off—as long as it completes all subscriptions before the end of the day.
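Here is a minimal plain-Ruby sketch of why the retry picks up where it left off. Hashes stand in for database rows, create_invoices and its crash_after parameter are hypothetical, and the two writes per subscription are simply done together to mimic the transaction.

```ruby
require "date"

# Hypothetical in-memory model: hashes stand in for rows. The two writes
# per subscription are applied together, mimicking the database transaction.
def create_invoices(subscriptions, invoices, today, crash_after: nil)
  due = subscriptions.select { |s| s[:next_charge_on] == today }
  due.each_with_index do |sub, index|
    raise "worker restarted" if crash_after && index == crash_after

    invoices << { subscription_id: sub[:id], charge_on: sub[:next_charge_on],
                  charge_cents: sub[:cents], charged_at: nil }
    sub[:next_charge_on] = today.next_month
  end
end

# First run crashes after two subscriptions; the retry handles the rest.
def simulate_retry
  today = Date.new(2024, 3, 1)
  subscriptions = (1..4).map { |id| { id: id, cents: 1000, next_charge_on: today } }
  invoices = []

  begin
    create_invoices(subscriptions, invoices, today, crash_after: 2)
  rescue RuntimeError
    create_invoices(subscriptions, invoices, today) # retry picks up the rest
  end

  invoices.map { |i| i[:subscription_id] }
end

p simulate_retry
```

Because the subscriptions handled before the crash already have a later next_charge_on, the retry no longer selects them, and each subscription ends up with exactly one invoice.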
To get the invoices charged, ChargeOutstandingInvoicesJob
will look like so:
class ChargeOutstandingInvoicesJob
  include Sidekiq::Job

  def perform
    # Queue one ChargeJob per invoice that has not been charged yet
    Invoice.where(charged_at: nil).find_each do |invoice|
      ChargeJob.perform_async(invoice.id)
    end
  end
end
Is ChargeOutstandingInvoicesJob
safe to retry? Yes, with a qualification. Since ChargeJob
checks that charged_at
is null, this operation avoids a race condition where a retry of ChargeOutstandingInvoicesJob
could queue two ChargeJob
s for the same invoice.
The remaining problem with ChargeJob
, regardless of how ChargeOutstandingInvoicesJob
(or ChargeSubscriptionsJob
) is implemented, is that the third party payment processor call needs to be idempotent. However many times the job runs, we need to make sure the charge itself happens exactly once.
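One common way to approach this, when the payment processor supports it, is to send an idempotency key with each charge. The sketch below is an illustration under that assumption and is not necessarily the approach the book takes. IdempotentProcessor, idempotency_key_for, and the key format are all hypothetical and do not come from any real client library.

```ruby
# Hypothetical processor that de-duplicates on an idempotency key, the way
# some payment APIs do.
class IdempotentProcessor
  attr_reader :charges

  def initialize
    @charges = {} # idempotency key => cents charged
  end

  # Charges only the first time a given key is seen; repeats are no-ops.
  def charge!(_customer_id, cents, idempotency_key:)
    return :duplicate if @charges.key?(idempotency_key)

    @charges[idempotency_key] = cents
    :charged
  end
end

# Deriving the key from the invoice id means every retry of the same
# invoice sends the same key.
def idempotency_key_for(invoice_id)
  "invoice-#{invoice_id}"
end

def simulate_retried_charge
  processor = IdempotentProcessor.new
  key = idempotency_key_for(7)
  first = processor.charge!("cus_123", 1500, idempotency_key: key)
  second = processor.charge!("cus_123", 1500, idempotency_key: key) # retry
  [first, second, processor.charges.size]
end

p simulate_retried_charge
```

With a stable key per invoice, a retried job can repeat the call without the customer being charged a second time, provided the processor honors the key.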
This problem is covered in detail in my Sidekiq book. Ruby on Rails Background Jobs with Sidekiq includes a sample app that demonstrates this exact problem, and a detailed discussion of how to manage it. The book shows you some code to address it, and you can see it working with the example app.
Addendum — Bulk Queueing API
Sidekiq has a Bulk Queueing API that you may not be familiar with. Using it, a better way to implement ChargeOutstandingInvoicesJob
would be to bulk queue the ids in batches of 1000, like so:
class ChargeOutstandingInvoicesJob
  include Sidekiq::Job

  def perform
    array_of_job_args = Invoice.
      where(charged_at: nil). # Get all not charged
      pluck(:id).             # Get only their ids
      zip                     # turn each element into
                              # a single-element array

    # batch size is 1000 by default
    ChargeJob.perform_bulk(array_of_job_args)
  end
end
Bulk queueing is a more efficient — and thus less error prone — way to queue a bunch of jobs based on the results of a database query. array_of_job_args
is an array where each element represents an invoice, and those elements are themselves arrays that contain a single argument: the invoice’s id.
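The shape of that argument array is easy to check in plain Ruby, without Sidekiq. In this sketch job_args_for and batches_for are hypothetical helpers, and each_slice is used only to illustrate how 2,500 ids would split into groups of 1000; it is not a claim about how Sidekiq does its batching internally.

```ruby
# Plain Ruby, no Sidekiq: shows the argument shape and how ids split
# into groups of 1000.
def job_args_for(ids)
  ids.zip # zip with no arguments wraps each element in its own array
end

def batches_for(job_args, batch_size: 1000)
  job_args.each_slice(batch_size).to_a
end

ids = (1..2500).to_a # pretend these came from pluck(:id)
args = job_args_for(ids)
batches = batches_for(args)

p args.first(3)        # prints [[1], [2], [3]]
p batches.map(&:size)  # prints [1000, 1000, 500]
```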
If you found this article helpful, check out Ruby on Rails Background Jobs with Sidekiq by David Bryant Copeland.