Scalable Job Scheduler at project44

Troy Mass
project44 TechBlog
Published in
5 min readApr 4, 2023

At project44, we use a lot of cron jobs. They are often used for polling 3rd party APIs to fetch data for shipments, recalculate things such as ETAs, or check to see if expected data never arrived. Previously, we used Quartz Framework but found that it struggled to scale.

Quartz is a great framework running arbitrary Cron or 1-time jobs because it guarantees that the execution starts on time and tracks execution failures. It relies on a few database tables to keep track of jobs and their state. The biggest issue for scaling is that it requires a lock on the table to acquire new jobs to run. This is especially problematic with a large number of jobs with relatively short execution times.

The jobs that project44 runs are less sensitive to execution failures since they are often polling jobs which, if they fail, will simply be re-triggered a few minutes later and pick up whatever they missed. Additionally, the timing is flexible; they need to run every N-minutes rather than at a specific minute of each hour.

System Requirements

  • Able to trigger execution of a job every N minutes
  • Support for a variety of intervals (1, 2, 5, 10, 15 min, 1hr, 6hr, 24hr)
  • Eliminate job creation as a point of failure during synchronous flows
  • Able to smooth out bursts of jobs

Load Testing:

In production, this service creates/updates/deletes 600+ jobs every minute and triggers well beyond 15k jobs per minute but it has been load tested well beyond these numbers. The values measured during load testing were:

CrUD operations: 10k / minute
Jobs triggered: 288k / minute
Load testing hardware: Postgres: 1 core, 6 GB RAM box. Service: 3 nodes
* note that the scheduler is triggering the jobs by sending Kafka messages, downstream services actually execute these jobs.

System Architecture

  1. Systems create/update/delete jobs via Kafka. The job id, interval, and output topic are specified by the owning service. Any information that will be required to execute the job is stored within the job details.
  2. CRUD operations are consumed and persisted. Jobs are assigned an offset within their interval and a random shard number.
  3. Every half second, the job scheduler uses Quartz to triggers 1 of 120 shards to execute.
  4. The scheduler queries the database for all jobs which need to be triggered during this minute (usually a few hundred per shard) and writes a message to the appropriate Kafka topic for each job.
  5. The service which wanted the job consumes the Kafka topic and executes any code necessary for each job triggering message.
  6. When the job is no longer needed, the owning service sends a delete command to Kafka

Job Object

{
"id": "", // String. Globally unique id for this job
"startDateTime": "", // Optional. Allows you to schedule a job to start in the future
"executionInterval": "EVERY_15_MINUTES", // Frequency of how often the job should trigger
"topic": "" // Indicates which Kafka topic the message should be sent in order to trigger the job
"jobDetails":{
// Freeform data for triggering the job
// Add whatever fields you need
}
}

Making it scale

Selecting jobs to trigger

The secret to making this system scale is be not running any arbitrary Cron expression. Instead the system limits clients to a few select intervals. The example below shows 1, 5, 15, 60 minute, and 24 hr jobs but we support a few additional intervals. The scheduler runs a single Postgres query to find all jobs that need to be triggered. A few simple B-Tree indexes on the Postgres table do all of the heavy lifting to find jobs and the Postgres query planner handles doing BitMap AND and OR operations between these indexes.

Sharding

Sharding helps the service in 3 ways, it lets us run multiple scheduler nodes, it reduces the number of records returned from each SQL query, and it helps to smooth out job execution across each minute. A single Quartz job is used to triggering each shard. The jobs are staggered every 1/2 second to smooth out the load on the system (instead of large bursts once per minute). If additional scaling is needed, more shards can be created and spaced out at 1/3 second, 1/4 second, or even smaller increments.

Reads vs Writes

When triggering a job, a single read query can select hundreds of jobs which need to be triggered. By not keeping track of state or having any time columns such as nextTriggerTime, the service does not have to make any writes when the jobs are triggered. The only writes are when the job is created, updated (such as changing the interval), and deleted. In practice this means about 4% of the overall operations are writes while 96% are reads.

Database Structure

The database primarily uses a single table which stores all active jobs that need to be triggered. Jobs are hard-deleted when they end. Below shows an example of the query used by a scheduler firing for shard 65 at a time of 13:37:**. Note that the offset values have been calculated by taking the UTC epoch minute and doing a modulus by the desired interval. The color coded columns and query fragments indicate the indexes used to find those records. The 2 bolded records are the only jobs which should be returned by this query.

Example table structure, scheduler query, and indexes

Below you can see the results from SQL’s EXPLAIN showing how Postgres’ query planner utilized the indexes to find the desired records.

Bitmap Heap Scan on job  (cost=951.29..2548.42 rows=423 width=210)
Recheck Cond: ((shard = 1) AND (((interval_minutes = 1) AND (offset_minutes = 0)) OR ((interval_minutes = 5) AND (offset_minutes = 2)) OR ((interval_minutes = 15) AND (offset_minutes = 7)) OR ((interval_minutes = 60) AND (offset_minutes = 7)) OR ((interval_minutes = 1440) AND (offset_minutes = 1027))))
Filter: (start_date_time <= '2021-12-31 00:00:00+00'::timestamp with time zone)
-> BitmapAnd (cost=951.29..951.29 rows=428 width=0)
-> Bitmap Index Scan on job2_shard_idx (cost=0.00..155.59 rows=12689 width=0)
Index Cond: (shard = 1)
-> BitmapOr (cost=795.34..795.34 rows=51667 width=0)
-> Bitmap Index Scan on job2_interval_offset_idx (cost=0.00..316.60 rows=20817 width=0)
Index Cond: ((interval_minutes = 1) AND (offset_minutes = 0))
-> Bitmap Index Scan on job2_interval_offset_idx (cost=0.00..76.87 rows=4844 width=0)
Index Cond: ((interval_minutes = 5) AND (offset_minutes = 2))
-> Bitmap Index Scan on job2_interval_offset_idx (cost=0.00..224.79 rows=14836 width=0)
Index Cond: ((interval_minutes = 15) AND (offset_minutes = 7))
-> Bitmap Index Scan on job2_interval_offset_idx (cost=0.00..171.67 rows=11124 width=0)
Index Cond: ((interval_minutes = 60) AND (offset_minutes = 7))
-> Bitmap Index Scan on job2_interval_offset_idx (cost=0.00..4.89 rows=46 width=0)
Index Cond: ((interval_minutes = 1440) AND (offset_minutes = 1027))

--

--