Celery Scheduler (Part 1): Setting up a Task Scheduler application with Celery & Flask

As part of the Data team, I’ve found it incredibly useful to have a reliable scheduler application that runs routine tasks. These tasks could be creating reports that power dashboards and visualisation tools, or workflows that feed data to other ingestion pipelines of a production system. That was how I discovered Celery.

Celery allows you to set up a powerful, distributed and fuss-free task scheduler. Once you set it up on a server, it will reliably run your scheduled tasks at regular, defined intervals.

All you need to do is to define your task method, the schedule to trigger the task, and Celery will handle the rest for you.

Some interesting uses for your own task scheduler include:

  • Home automation (IoT projects)
  • Data Workflow management for Business Intelligence (BI)
  • Triggering Email campaigns
  • Any other routine, periodic tasks

This series of articles will walk you through how you can set up your own task scheduler application with Celery and Flask, managed by Supervisord and deployed with Docker.

However, if you are eager to run the finished project, feel free to skip this tutorial and hop over to the Celery Scheduler GitHub project to get started.

In this article, I will cover the basics of setting up Celery with the Flask web application framework.

Let’s start by creating a project directory and a new virtual environment to work with!

# Create virtualenv
pip install --upgrade pip
pip install --upgrade virtualenv
virtualenv -p /usr/bin/python2.7 ~/.virtualenvs/celery_env
# Activate virtualenv
source ~/.virtualenvs/celery_env/bin/activate
pip install Flask==0.12.2 celery==4.1.0 redis==2.10.6
# Create project directory
mkdir ~/celery-scheduler
cd ~/celery-scheduler

To begin, let’s first set up the Flask framework with Celery.

Save this python script as app.py in the main project directory, and in your terminal, run: python ~/celery-scheduler/app.py. Now go to http://localhost:5000/. If everything is running fine, you should see “Hello, Flask is up and running!”

Next, I’m going to isolate the config variables from the application code. We’ll move the configuration variables to a config file.

In the same folder, save the following script as config.py:
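A config.py along these lines will do — the Redis URLs are the same ones we assumed in app.py:

```python
# config.py -- central place for the application's configuration variables
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
```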

Now that we have extracted our configuration variables in config.py, we can remove them from app.py. After removing the config variables, app.py should look much simpler, like the following:
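With the config variables gone, app.py reduces to something like this sketch (it assumes config.py sits alongside it in the project directory):

```python
# app.py -- Flask app that now loads its settings from config.py
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config.from_object('config')  # read config.py directly into the Flask app

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@app.route('/')
def index():
    return 'Hello, Flask is up and running!'

if __name__ == '__main__':
    app.run()
```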

Notice here we read the config directly into the Flask app. Again, run the command: python ~/celery-scheduler/app.py and go to http://localhost:5000/ to make sure that everything is running fine.

At this stage, you should now have 2 files: app.py and config.py within your project directory, ~/celery-scheduler/.

Now that we have Celery running on Flask, we can set up our first task!

First, create a new folder app with which we can organise our code.

mkdir ~/celery-scheduler/app
mv ~/celery-scheduler/app.py ~/celery-scheduler/app/__init__.py

Notice that we moved our original file app.py to app/__init__.py. This is purely for convenience, since it doesn’t change the import path of app.

Now, we can create a folder to contain all our tasks:

mkdir ~/celery-scheduler/app/tasks
touch ~/celery-scheduler/app/tasks/__init__.py

To create a celery task, define your task method and decorate the python method with (guess what?) @celery.task().

Let’s write our first task and save the file as test.py within tasks folder:
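A sketch of test.py — the print_hello task here simply prints a greeting, which is what you will see in the worker logs further down:

```python
# app/tasks/test.py -- our first scheduled task
from app import celery

@celery.task()
def print_hello():
    print('Hello')
```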

Next, we will have to set up a schedule for celerybeat to trigger the above task. In the main project directory ~/celery-scheduler/, save the following script as celeryconfig.py:

celeryconfig.py contains the configuration for Celery to execute tasks, including the import path, the task serialisation format and, of course, the schedule on which tasks should be triggered.

In the above, we have set up CELERYBEAT_SCHEDULE to trigger the task test-celery every minute.

Now that we have a new celeryconfig.py, we’ll need to update app/__init__.py to read from celeryconfig. To point Celery at our newly written celeryconfig, we import the config file and add two lines of code:

import celeryconfig

celery.config_from_object(celeryconfig)

At this point, app/__init__.py should look like this:
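Putting it together, a sketch of the updated app/__init__.py (it assumes config.py and celeryconfig.py both sit in the project root):

```python
# app/__init__.py -- Flask config comes from config.py,
# Celery config from celeryconfig.py
from flask import Flask
from celery import Celery

import celeryconfig

app = Flask(__name__)
app.config.from_object('config')

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
celery.config_from_object(celeryconfig)

@app.route('/')
def index():
    return 'Hello, Flask is up and running!'

if __name__ == '__main__':
    app.run()
```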

Great! One last step before we can see the task scheduler application in action. We will have to set up a message broker for celerybeat to transmit messages to the task workers.

For this, we will use Redis as our message broker.

Add the script install_redis.sh to the project directory:
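A simple install_redis.sh might look like this — a sketch that fetches redis.io’s standard stable tarball and compiles it (it assumes wget and a C toolchain are available):

```shell
#!/usr/bin/env bash
# install_redis.sh -- download and compile redis-server from source
set -e

cd ~/celery-scheduler

# Fetch the latest stable Redis tarball from redis.io
wget http://download.redis.io/redis-stable.tar.gz
tar xzf redis-stable.tar.gz
rm redis-stable.tar.gz

# Compile; the server binary ends up at redis-stable/src/redis-server
cd redis-stable
make
```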

Next, run bash ~/celery-scheduler/install_redis.sh to download and build redis-server.

With all the pieces set up, let’s spin them up and see our task in action.

  1. Run Redis: 
    ~/celery-scheduler/redis-stable/src/redis-server
  2. Run celerybeat: 
    ~/.virtualenvs/celery_env/bin/celery beat -A app.celery --schedule=/tmp/celerybeat-schedule --loglevel=INFO --pidfile=/tmp/celerybeat.pid
  3. Run celery worker: 
    ~/.virtualenvs/celery_env/bin/celery worker -A app.celery --loglevel=INFO

On the terminal screen running celerybeat, you should see the scheduler sending the task every minute on the dot:

[2017-10-22 23:47:28,861: INFO/MainProcess] beat: Starting...
[2017-10-22 23:48:00,019: INFO/MainProcess] Scheduler: Sending due task test-celery (app.tasks.test.print_hello)

On the other screen where you started the celery worker, you should see the task being received and logged:

[2017-10-22 23:48:06,890: INFO/MainProcess] Connected to redis://
[2017-10-22 23:48:06,898: INFO/MainProcess] mingle: searching for neighbors
[2017-10-22 23:48:07,917: INFO/MainProcess] mingle: all alone
[2017-10-22 23:48:07,936: INFO/MainProcess] celery@Shannons-MacBook-Air.local ready.
[2017-10-22 23:48:08,144: INFO/MainProcess] Received task: app.tasks.test.print_hello[1cf686bf-589d-4bbd-bd3e-a6a707431caa]
[2017-10-22 23:48:08,148: INFO/ForkPoolWorker-3] app.tasks.test.print_hello[1cf686bf-589d-4bbd-bd3e-a6a707431caa]: Hello
[2017-10-22 23:48:08,160: INFO/ForkPoolWorker-3] Task app.tasks.test.print_hello[1cf686bf-589d-4bbd-bd3e-a6a707431caa] succeeded in 0.0129454089329s: None

Great! We’ve got everything up and running now. But do we have to keep each of the programs running in a separate screen? Sure, we could use a terminal multiplexer like screen or tmux to keep them running, but what if any of the programs crashes?

In part 2 of this series, we will explore how we can use Supervisor to manage the redis, celerybeat and celery worker processes and keep them running in the background.

Check it out here: Managing Celery with Supervisor

Till then!

P.S. If this article has been helpful for you, I’d be happy to know so do give me a clap below!