Job Processing System at Motive

Padmaja Y
motive-eng
Published in
12 min readAug 12, 2024

Introduction

Asynchronous computing is an integral part of modern distributed architecture. Expensive operations are offloaded asynchronously to allow the synchronous flow to be performed with low latency.

A task queue is typically used to create tasks executed asynchronously.

This blog post describes Motive’s journey from a monolithic job queueing system to a distributed/horizontally scalable system.

Motive currently processes around 160M jobs daily from various hardware devices, mobile, reports, etc. Only some jobs need to be processed in near real-time. Considering the company’s growth, we want to support 1B+ jobs daily in the upcoming years. We see a significant bottleneck with the current job processing system(Beanstalkd).

Beanstalkd

Motive’s initial job processing system was based on Beanstalkd. This monolithic in-memory queueing system is hosted on a single EC2 instance, as it’s not distributed. Hence, it can only scale vertically. An EBS volume is mounted to take care of the storage.

Fig 1

The Beanstalkd needs manual maintenance, and it’s a single point of failure.

Beanstalkd Features in Use

  • List queues — This is used in the Beanstalkd dashboard to navigate to individual queue details.
  • Exactly once job processing — Message won’t be delivered to multiple consumers, which sometimes happens in distributed systems
  • Scheduled jobs — Frequently running a job based on cron expression.
  • Delayed jobs — Run a job after a specified delay.
  • Peek — Read the messages without deleting them from the queue.
  • Pause — Pause a queue that will pause the consumers and the producers from processing the jobs.
  • Kick-buried jobs(Redrive) — Redrive the failed messages back to the queue.
  • Dashboard — Provides a view for developers to have insights on the queues like listing the queues, number of ready(ready to be processed by a worker) jobs in a queue, number of buried jobs in a queue(failed and went to a buried state), kick buried jobs(redrive), peek jobs from a queue.
Fig 2

Backburner

Backburner is a Ruby library that creates background job processing using the Beanstalkd work queue. Beanstalk is a simple, fast work queue that follows the producer-consumer model. Backburner provides a high-level interface for managing jobs, making creating, enqueue, and processing background tasks in Ruby applications easier.

Limitations of Beanstalkd

  1. Availability — Since it’s hosted on a single EC2 instance, this leads to a single point of failure
  2. Reliability — Unknown downtime due to a single instance
  3. Scalability — Beanstalkd is not distributed. Hence, it cannot scale horizontally and is limited to vertical scaling with a maximum of EC2 capacity. Not having horizontal scaling prevents the job scheduler from fanning out its workload.
  4. Security — Beanstalkd does not support TLS out of the box. This means all communication between the application pods and Beanstalkd server is unencrypted and insecure. (We had to work around this limitation by using the Stunnel as an intermediate service for providing TLS encryption)
  5. Maintenance — Backburner (the job processor) has been in maintenance mode since the 2017 link. Beanstalkd Library was updated in 2019 after five years, link. This requires ongoing engineering effort for Transport Layer Security(TLS) support and monthly deployment of patches for SOC2 support, and currently, there is no proactive vulnerability assessment and patching happening.

Potential Solutions

With the current limitations mentioned above, our system is at a very high risk. Hence, we started looking for queueing and streaming systems that are scalable, reliable, and highly available.

SQS (Shoryuken ruby gem)

Amazon Simple Queue Service (SQS) is a fully managed message queuing service provided by Amazon Web Services (AWS). SQS allows you to send, store, and receive messages between software components at any volume without losing messages or requiring other services to be always available.

  • AWS does the heavy lifting.
  • No storage limit.
  • Availability, reliability, and scalability are taken care of.
  • Shoryuken Gem is readily available for writing jobs to SQS

Concerns with SQS

  • Can’t delay a job above 15 min. Further read
  • With standard queues, SQS guarantees at least one delivery; duplicates are possible. Hence, jobs need to be idempotent. Further read
  • Job processing time can’t exceed 12 hrs (Well, ideally, we shouldn’t have such long-running jobs), as SQS has a max of 12 hours as visibility timeout. Hence, this job will be available for other consumers to process. Further read

Custom Beanstalkd

Beanstalkd in K8s — The Idea was to spin up the Beanstalkd pods based on the upcoming traffic. However, redistributing the existing jobs while scaling up and down involves effort.

Separate Beanstalkd for each team — Even though the scale is more or less handled here, the maintenance and reliability will be the issue. For each team, now this will become a single point of failure as it is still not distributed and will run on a single EC2 instance.

Others

Kafka

Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to the data streams. It combines messaging, storage, and stream processing to allow storing and analyzing historical and real-time data.

  • Kafka is heavy for our simple job processing. We don’t want to replay the messages.
  • Also, consumption and production are stopped during the consumer group’s rebalance, which is unacceptable.

Redis (Sidekiq gem)

Sidekiq uses Redis as its backend to store job data, manage queues, and track job statuses. Sidekiq uses Redis lists to manage job queues. Each queue is a list where new jobs are pushed to the end and processed from the front.

  • Redis also provides a queuing system but lacks scheduled jobs(e.g., cron-based).
  • Sidekiq gem provides scheduled jobs functionality.
  • Using the Redis sentinel(redis on a single node) is advised, again a big limitation on the scale.

Final Verdict

The merits of SQS are

  • Cloud Scale, reliability, availability, durability.
  • Language & stack agnostic
  • Managed service
  • Secure
  • Dev familiarity with development & production support
  • Insignificant human intervention for maintenance

Also, we were okay with investing our time in fixing the drawbacks of Shoryuken(SQS), such as

  • delayed jobs
  • scheduled jobs

If a job is being run in a distributed environment, atomic and out-of-order execution are expected. Hence, considering the growing traffic and the above merits, we chose Shoryuken(SQS).

Also, AWS SQS offers nearly infinite throughput when using standard queues for the receive, send, and delete message operations.

New Architecture

Fig 3

Challenges with Shoryuken

  1. No support for delayed jobs with delays> 15 min
  2. No support for scheduled jobs(recurring jobs)
  3. Autoscaling
  4. Dev familiarity(dealing with missing features like pause, peek, redrive few jobs from DLQ, dashboard)

Shoryuken Infrastructure

ActiveJob is the Rails standard abstraction for representing background async jobs. Shoryuken is one of the many implementations of the ActiveJob standard.

ActiveJob standards are as follows

  1. Every job is represented as a separate class
  2. Each job class must inherit from the ActiveJob::Base.
  3. The “perform” method should have the processing logic.

Refer the code snippet below

class ShoryukenLatencySampleJob < ActiveJob::Base
queue_as "my-shoryuken-queue"
def perform(name1, name2)
LOGGER.info(Howdy #{name1} from #{name2}")
end
end

Job Enqueue

  • This enqueues a job to the SQS queue “my-Shoryuken-queue”
ShoryukenSampleJob.perform_later("John", "Tom")

Job Execution

  • The following command starts the worker process.
bundle exec shoryuken -R -c 1 -q my-shoryuken-queue
  • This will keep on polling the SQS queue “my-shoryuken-queue” (using the ReceiveMessage operation) for messages.
  • Once a message is received, the worker will parse the payload, extract the name of the job class and the arguments and then call the perform method with the provided arguments.

Resolutions for the challenges with Shoryuken

Event Bridge for Delayed and Scheduled jobs

We have handled these challenges using another Amazon service called EventBridge.

Amazon EventBridge. EventBridge is a serverless service that uses events to connect application components, making it easier for you to build scalable event-driven applications. This seamlessly integrates with various AWS services and helps in building event-driven architectures with ease specially for Amazon SQS services in this case.

Architecture for Delayed Jobs

Fig 4

How does it work?

  • There is a scheduled group for delayed jobs along with the corresponding DLQs, which will be used for jobs that have failed to be pushed to the SQS. A schedule group is an Amazon EventBridge Scheduler resource used to organize the schedules. Using DLQs with scheduled groups can help in handling failed or unprocessed events and provide mechanisms for debugging and error handling.Further Read
  • If the Job’s delay is less than 15 minutes, we label it as a “delayed” task and enqueue to the designated SQS queue
  • if the delay is > 15 minutes, it’s converted into a “long delayed” task in Event Bridge Scheduler. After completion of that delay, it will trigger the job to push it into the SQS queue.

Eventbridge for Scheduled Jobs

EventBridge Scheduler provides two types of schedule creation:

  1. One-off Schedules
  2. Recurring Schedules

We are also using the event bridge for recurring schedules, enabling us to replicate the delayed jobs architecture for recurring scheduled jobs.

Again, for the scheduled jobs, this is pushed to the event bridge, but in another group specific to scheduled jobs.

Autoscaling Shoryuken Workers

Although Shoryuken can concurrently process one or more SQS queues, we chose to disable concurrency and process a single SQS queue per server. We then containerized the Shoryuken server and parameterized the CLI arguments as env variables so that teams could quickly spin up K8 deployments for message processing. For high-throughput Beanstalkd tubes, we recommended choosing a fixed number of worker pods based on peak traffic.

Single Queue Single Deployment Strategy (SQSD)

To realize cost savings by autoscaling deployments, we explored an existing open-source project called Worker Pod Autoscaler (WPA) by Practo. They designed a custom Kubernetes controller that dynamically scales any number of k8 deployments where each deployment processes a single SQS queue.

The controller’s algorithm is based on the following configuration parameters and real-time metrics collected per SQS queue:

Based on the configuration and traffic, the number of worker pods to scale up/down to is then calculated as

We have migrated a high-throughput job from Beanstalkd to Shoryuken. Due to the job’s cyclical traffic pattern, it is ideal for autoscaling. Upon enabling WPA for this deployment, the controller continuously autoscaled the fault code’s deployment to match the queue’s traffic patterns. The processing of messages was almost real-time, with the age of the oldest message reaching at most 1 minute.

Multiple Queue Single Deployment Strategy (MQSD)

Although WPA reduced the overall costs, the development effort to migrate 100s of Beanstalkd tubes needed to be more scalable. Each tube required the creation of infrastructure, monitoring, application code changes, and a rollout plan. Internally, to reduce some of the overhead, teams were enqueuing different jobs to a single queue, which required carefully updating the WPA configuration. To overcome these challenges, we wanted to take advantage of Shoryuken’s capability to process multiple queues, but unfortunately, WPA does not support this use case.

To dynamically scale a deployment that processes multiple queues based on their traffic patterns, we modified the scaling algorithm by aggregating real-time metrics across all queues. The controller then scales the deployment based on the computed number of worker pods, similarly to WPA.

After deploying WPA-multi queue to our k8 cluster, we merged 35 different jobs from the same domain, spread across 13 different queues/deployments to a single deployment. Based on the traffic patterns of all the queues, the deployment was autoscaled in such a way that at max, the age of the oldest message across all queues was less than 30 seconds during the week.

Fig 5

Fig 5 shows the message enqueue rate across 13 queues. All messages are being processed by a single k8 deployment that WPA-multiqueue is scaling. Over this week, the age of the oldest message was less than 30 seconds across all queues.

Fig 6

Fig 6 shows the k8 custom controller autoscaling the multi queue deployment based on the aggregated queue traffic patterns. Although not as smooth, it closely follows the enqueue rate in the previous figure.

Optimizations and Future Work

As teams continue to migrate and we get closer to sunsetting Beanstalkd, our initial simplifications and configuration choices have paid off:

  • Each pod running Shoryuken workers (sqs consumers) has a concurrency of 1
  • Shoryuken polls each queue using a weighted round robin strategy but we have set all weights to 1
  • If polling the SQS queue results in an empty receive, Shoryuken “sleeps” the queue for a configurable delay. We have kept this at 20 seconds.
  • Configured SQS client to receive at max one message and long poll time to 1 second.

As each deployment can process many queues, each queue’s latency and throughput characteristics can impact others. As a result, we established the following guidelines on which queues to merge into a single deployment

  • All jobs where P90 processing latency <= 5 seconds belong to “low latency” deployment group
  • All jobs where 5 s < P90 processing latency <= 180 seconds belong to “high latency” deployment group
  • For queues that do not meet these requirements, we recommend falling back to the single queue single deployment strategy

Pros

Overall, this project is tackling pending issues with the previous strategy and increasing migration velocity from Beanstalkd and reducing total deployments in our cluster. In the future, we plan to explore batching and concurrency to improve throughput further.

Cons

As we get closer to sunsetting Beanstalkd, we are starting to push the limits of Shoryuken in conjunction with WPA multi-queue. For high-throughput deployments, untuned configuration to Multi-Queue can result in over/under provisioning, leading to either runaway K8 and SQS costs or accumulated backlog. Though challenging, we are exploring options to improve further and optimize.

As teams familiarise themselves with Shoryuken operationally, one issue we have found is that with MQDS, pausing a single queue is no longer possible without affecting the processing of other shared queues.

Pause Queue Feature in MultiQueue Deployment Approach

“Pausing a queue” is a feature available in Beanstalkd. Teams use it whenever they want to pause the processing of jobs in a particular queue during incidents/regressions, etc. We still want to keep this parity with the new system.

Shoryuken internally uses a WeightedRoundRobin polling strategy for message polling in multi queue deployments ensuring 0 starvation. This strategy involves taking turns going through the queues and polling messages from each one. We extended this strategy to address the pausing challenge and created our custom polling strategy with a dynamic pausing mechanism that did not require any hotfix or deployment restart. At the core of our solution is using Statsig(Feature Flag) to define a dynamic config shared across all deployments. This config includes a list of queues specified for pausing.

During the polling process, before moving to the next queue, our extended strategy checks the Statsig dynamic config. If the upcoming queue is on the pausing list, it is paused indefinitely before advancing to the subsequent queue. Similarly, the unpause logic is also extended, resuming queues not present in the dynamic configuration.

We later made this custom strategy the default, allowing us to pause single-queue deployments as well dynamically.

Rails Admin panel integration for Shoryuken Dashboard

Active Admin is a popular open-source Rails engine that provides a web-based interface for managing data in your Rails application’s database.

Custom pages are added in our admin panel for the following dashboard components

  1. List queues
  2. Queue details
  3. Peek from DLQ
  4. Redrive from DLQ
  5. Create/Update/delete scheduled jobs

AWS sqs sdk is used for the above operations.

Summary

Beanstalkd served Motive well in the past but started to become a bottleneck for future growth due to its limitations in scalability and reliability, particularly as a single-point-of-failure system. Recognizing the need for a more robust solution, we extensively evaluated alternatives. We ultimately chose AWS SQS with Shoryuken to achieve the scalability and reliability required for processing over 160 million jobs daily and scaling towards 1 billion jobs per day.

The migration to SQS had some critical limitations for us, such as supporting long delays and queue pausing. Solutions like using AWS EventBridge for delayed and scheduled jobs, along with multi-queue worker pool architectures, have been instrumental in overcoming these challenges.

This transition not only resolves immediate issues but also positions us for future success. While the migration to SQS continues, we are already processing over 100 million jobs per day. With improved scalability, reliability, and reduced manual intervention, this shift lays a solid foundation for handling the anticipated job volumes as we scale further.

Acknowledgements

This project and blog post embody the spirit of collaboration. Special thanks to Jibin Jose, Agraj Garg, Dhruv Panchal, Harsha Lokavarapu, Aitizaz Khan, Dhawal Sawla, and Gautam Roy. Their dedication to brainstorming, crafting custom solutions for Motive’s use cases, and providing rigorous support during migration was invaluable. This achievement is a testament to their collective effort and commitment.

References

--

--