From Celery’s PersistentScheduler to Heroku’s RedBeat

Mai Nguyen
Published in Deliveree
Oct 8, 2019


Prologue

In Deliveree’s data-driven culture, every employee is encouraged to learn and write SQL to build their own reports. A Finance or CS person often has a better perspective on the product’s operation than a Software Engineer does. Naturally, their SQL isn’t what you would call precise or optimized. That means an extremely heavy load on our analytics databases and very long execution times.

Deliveree’s DataOps team could either (1) chase down each of those queries to optimize it or (2) build a Data Pipeline system that can scale and handle everything thrown at it. The former would prove to be a serious burden, so we opted for the latter and only optimize the queries most critical to the business.

In this series of blog posts, starting with this one, we will share how we handle millions of far-from-optimized queries executed periodically every day. In this first post, Mai Nguyen, our data engineer, shares our experience choosing the right Producer (in our Data Pipeline’s Producer-Consumers architecture) for the job.

Our current Data Pipeline’s tech stack:

  • Python, SQL
  • Celery (Future blog posts will explore this choice)
  • PostgreSQL
  • RabbitMQ
  • Redis
  • MooseFS
  • Graylog

Our requirements for the task scheduler:

  • Tasks must be scheduled within an interval of a few milliseconds for workers to pick up, as there can be more than 100000 tasks to run per second.
  • Adding a new task, removing a task, and updating a task’s metadata must be quick at runtime, without having to restart the Producer.
  • Our pipelines vary greatly in schedule, from being updated every second to only once per day. One pipeline’s schedule must never interfere with another’s.

First, we tried Celery’s default, PersistentScheduler

At first, we started with a few hundred tasks and everything was good. Then we added a lot more pipelines to the system. Problems arose: many pipelines did not run on time or never seemed to run at all.

Digging through the logs, we found that some pipelines interfered with others. The voluminous, real-time pipelines, whose tasks must run very frequently, blocked and delayed the scheduling of less frequent pipelines.

For example, we had 3000 tasks to run every minute, 100 tasks to run every 10 minutes, and more tasks with longer intervals. It took around 300–500 ms for PersistentScheduler to schedule each task, and tasks with shorter schedules were scheduled first. So the 3000 tasks took about 1,200,000 ms (at 400 ms per task), which is 20 minutes, and the other tasks had to wait until all 3000 were sent. The 10-minute tasks were not performed on time, which affected the updates of our reports.

To confirm this, we tested PersistentScheduler with the below schedules:

  • 100000 tasks: each runs every 1 second
  • 1000 tasks: each runs every 10 seconds
  • 100 tasks: each runs every 1 minute
  • 100 tasks: each runs every 1 hour
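
For reference, a beat schedule with that mix of intervals can be generated programmatically. The sketch below is ours, not the original test code; the module name, broker URL, and the no-op task are assumptions:

# speed_test.py — a sketch of how such a schedule could be built
# (module name, broker URL, and the no-op task are assumptions)
from celery import Celery

app = Celery('speed_test', broker='amqp://guest@localhost//')

@app.task
def noop(i):
    return i

def make_schedule():
    schedule = {}
    mix = [(100000, 1.0), (1000, 10.0), (100, 60.0), (100, 3600.0)]
    for count, interval in mix:
        for i in range(count):
            schedule[f'noop-{int(interval)}s-{i}'] = {
                'task': 'speed_test.noop',
                'schedule': interval,  # run_every, in seconds
                'args': (i,),
            }
    return schedule

app.conf.beat_schedule = make_schedule()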

This was the result we got from PersistentScheduler:

Log of tasks sent by PersistentScheduler (around 300–500 ms per task)

Besides the speed issue, we found it hard to update PersistentScheduler’s data at runtime. As PersistentScheduler stores beat’s data in a .db file using SQLite, our celerybeat-schedule.db became too big within a few days of running. We had to remove that file and restart beat for the process to continue.

We could modify the .db file using Python’s SQLite connector, but that approach is very limited since SQLite is not designed for a high level of write concurrency.

Then, we started looking for other options

We realized that our main issues lay with PersistentScheduler’s use of a file on disk: the speed of reading and updating was restricted by Python’s speed of parsing and writing the .db file. The only way we could improve this was to replace the database used by Celery’s scheduler, so we started looking for options to load the entire scheduler database into memory.

Hey, Redis!

Photo by James Pond on Unsplash

We found two open-source projects that already implement a Celery scheduler on Redis: celerybeatredis and RedBeat.

We tried celerybeatredis first but eventually settled on RedBeat, as it provides slightly stronger configuration options. celerybeatredis sends tasks at almost the same speed as RedBeat, but the data structure it stores in Redis makes it difficult to modify.

Heroku’s RedBeat:

We ran the same speed test with RedBeat and were amazed at its speed: the average interval between tasks sent was 2–3 milliseconds, compared with 300–500 milliseconds for PersistentScheduler.

You can see the interval between tasks sent by RedBeat in the logs below:

Log of tasks sent by RedBeat

RedBeat stores beat data in Redis and provides a model for us to update that data without having to modify it directly in Redis.

The data stored by RedBeat is clear:

Tasks are listed with clear naming
Metadata of a task
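
If you want to peek at the raw entries yourself, you can scan Redis for RedBeat’s keys. The snippet below is a sketch, assuming RedBeat’s default 'redbeat:' key prefix and a local Redis instance:

# sketch: listing the keys RedBeat keeps in Redis
# (assumes the default 'redbeat:' key prefix and a local Redis instance)
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
for key in r.scan_iter('redbeat:*'):
    print(key.decode(), r.type(key).decode())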

Update data in Redis using RedBeat:

We made a basic example of how to delete a task and update a task’s metadata and schedule with RedBeat’s model. The example uses RabbitMQ as the broker.

Make sure Redis and RabbitMQ are running when you follow the example.

Create a .py file named update_task_redbeat.py:
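
The original gist is not reproduced here, but a minimal sketch of what update_task_redbeat.py could contain looks like this (the broker URL, Redis URL, and the greet task are assumptions):

# update_task_redbeat.py — a minimal sketch, not the original gist
# (broker URL, Redis URL, and the greet task are assumptions)
from celery import Celery

app = Celery('update_task_redbeat', broker='amqp://guest@localhost//')

# RedBeat reads its Redis connection from the Celery app's configuration
app.conf.redbeat_redis_url = 'redis://localhost:6379/0'

@app.task
def greet(message):
    # simply print whatever argument the scheduler passes in
    print(message)
    return message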

Run beat and worker from update_task_redbeat.py:

celery beat -A update_task_redbeat -S redbeat.RedBeatScheduler -l INFO
celery -A update_task_redbeat worker -l INFO

Then try the code below in a Python console to update the metadata:

https://gist.github.com/nvpmai/bd475b5d562811dadc86381a49759040
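
In case the embedded gist doesn’t load, the update roughly follows RedBeat’s RedBeatSchedulerEntry model. The sketch below is ours, with the entry name and interval chosen as assumptions; “Hello” and “Goodbye” follow the arguments shown in the example output:

# sketch: creating, updating, and deleting a RedBeat entry from a Python console
# (entry name and interval are assumptions; 'Hello'/'Goodbye' follow the example output)
from celery.schedules import schedule
from redbeat import RedBeatSchedulerEntry

from update_task_redbeat import app

# create an entry that runs greet('Hello') every 5 seconds
entry = RedBeatSchedulerEntry(
    'say-hello', 'update_task_redbeat.greet',
    schedule(run_every=5), args=['Hello'], app=app)
entry.save()

# load the entry back by its Redis key, change the argument, and save the update
entry = RedBeatSchedulerEntry.from_key(entry.key, app=app)
entry.args = ['Goodbye']
entry.save()

# delete the entry when it is no longer needed
entry.delete()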

If you update the metadata successfully, you should see that the printed content is different:

Update the task to print different content. “Hello” and “Goodbye” are arguments passed to the Worker

Please find the full code for the speed test and the metadata update in the repository.
