Yet Another Scalable Apache Airflow With Docker Example Setup

There are plenty of articles describing what Apache Airflow is and when you would want to use it. As it turns out, the problem it solves is common well beyond data science environments.

Each of those blocks above is some kind of task to perform, be it pulling data from x, aggregating y, querying z or sending an email to q. If you need a system that does exactly this, you will probably love Apache Airflow.

TL;DR of all those articles is basically: Have workflows doing stuff? If yes, why not use Airflow to manage them for you?

This example cookbook (or a scaffold you could use directly in your project) shows yet another way to bootstrap Apache Airflow to be:

  • friendly for a data science team, as the main idea shown is to run parametrized notebooks…
  • …but also not bound to a particular technology (task-wise), as in the end it will run Docker containers that could have literally anything inside
  • simple to scale later using your own servers or the cloud

Here, we make this diagram come true:

2019 is awesome

If you are looking for the scaffold, just dive in at https://github.com/spaszek/airflow_project or scroll down to the “Summary of current state” section below (far below).

This article is quite lengthy and describes the process as thoroughly as possible. It is also not an introduction to or preview of Apache Airflow, merely a “solution” to the diagram above. I assume the reader has Python, Docker and Linux experience.

It was written continuously during development and reads more like a diary. Regardless, I sincerely hope it proves useful to somebody, be it idea-wise or implementation-wise.

Oh, most of the inspiration comes from this great article by Netflix. I have yet to see an example implementation of it anywhere, so I did it myself.


Part one — parametrized Docker Jupyter notebook

we will build the block above

#1. Set up Jupyter “basic lab environment”

mkvirtualenv airflow_jupyter --python=python3.6
pip install jupyter ipython [and whatever libraries you need]
ipython kernel install --user --name=airflow_jupyter
pip install nteract_on_jupyter # if you wish to have prettier UI
pip install papermill
jupyter notebook # or jupyter nteract

Warning! The name of the virtualenv of choice, in this case airflow_jupyter, will be used later. Because we’d rather not clutter our workstation, we may want to use separate kernels for each task. But in the end, the scheduled notebook expects the kernel to actually exist. We will make sure it does by creating it later in the Dockerfile, just before spinning up the notebook.

#2. Create example notebook

(this could really do anything)

%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
x = np.arange(0, input_size, 1)
y = np.random.gamma(2., 2., input_size)
plt.figure(figsize=(18,10))
plt.scatter(x, y, c='r')
plt.show()

and params.yaml :

input_size: 500

#3. Add parameters cell

First, enable cell tags:

Then create a cell with the tag parameters:
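
For reference, a minimal sketch of what the tagged cell might contain. The default value of 100 is an assumption, any placeholder works. Papermill leaves this cell in place and injects a new cell right after it with the values passed in (here from params.yaml), so the default only applies when you run the notebook by hand.

```python
# Cell tagged "parameters": defaults used when the notebook is run interactively.
# papermill adds an "injected-parameters" cell after this one, which overrides
# the value with input_size: 500 from params.yaml.
input_size = 100
```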

#4. Run papermill to ensure it works

Depending on your directory structure, the command will look approximately like this:

papermill task_1/code.ipynb task_1/output/code_execution_1.ipynb -f task_1/params.yaml

If all went well (just browse the output file to see that it actually got executed), proceed to the next step.

#5. Wrap up the notebook in a docker container

First off, dump a requirements.txt to the task folder, as each task should have its own virtual environment, as small as possible:

pip freeze > requirements.txt

Now create a basic Dockerfile that spins up run.sh (which will be created later).

Note that while jessie is not always the best choice of Docker base image given its size, the benefit of alpine quickly diminishes when using huge libraries like numpy, scipy or pandas. If you are comfortable with Docker and Linux, feel free to use alpine as your base image. This will, however, require tweaking the Dockerfiles a lot.

Make sure the name of the virtualenv matches below where necessary:

#6. Create params.yaml and run.sh

Now create a little run.sh one-liner to run the script:

#!/usr/bin/env bash

papermill code.ipynb output/code_execution_${EXECUTION_ID}.ipynb -f params.yaml --log-output

#7. Run the example

Build the container with docker build . -t task1 and then run it:

>>> docker run -it -e EXECUTION_ID=444444 task1
Input Notebook: code.ipynb
Output Notebook: output/code_execution_444444.ipynb
Executing notebook with kernel: airflow_jupyter
Executing Cell 1---------------------------------------
Ending Cell 1------------------------------------------
Executing Cell 2---------------------------------------
Ending Cell 2------------------------------------------
Executing Cell 3---------------------------------------
<Figure size 1296x720 with 1 Axes>
Ending Cell 3----------

Note that the EXECUTION_ID actually got passed in correctly. We can also retrieve the resulting notebook using docker cp <id_of_container>:/notebook/output/code_execution_444444.ipynb ./


Part two — run Airflow

We just separated our notebooks so that they run inside a virtualized environment and enabled them to be parametrized. Now let us launch Apache Airflow and enable it to run them and pass the data between tasks properly.

#1. Run docker-compose with Airflow

We will be using the Dockerized Apache Airflow image by puckel.

First, download the docker-compose-CeleryExecutor.yml from here https://github.com/puckel/docker-airflow and rename it to docker-compose.yml

Then create a separate virtualenv (which will be used in the IDE to develop DAGs, so as not to clutter our Jupyter one):

mkvirtualenv airflow_dag
export AIRFLOW_GPL_UNIDECODE=yes
pip install apache-airflow

Mount the ./dags directory inside docker-compose for the scheduler, webserver and worker:

volumes:
- ./dags:/usr/local/airflow/dags

Then run everything with docker-compose up and add a sample DAG in ./dags/pipeline.py

Go to http://localhost:8080/admin/ and trigger it.

Should all go well, a (pretty dumb) DAG will be run. We have also shown how one could pass results between dependent tasks (the xcom push/pull mechanism). This will be useful later on, but let’s leave it for now.
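
As an illustration of that xcom mechanism, here is a minimal sketch of a two-task DAG, not the one from the repository. The names produce_value, consume_value, build_dag and the dag_id are hypothetical. The import paths assume Airflow 1.10, where PythonOperator needs provide_context=True to pass the task instance to the callable. A value returned from a callable is stored as an xcom automatically, and the downstream task pulls it by task id.

```python
from datetime import datetime


def produce_value(**context):
    """Return a value; Airflow stores a callable's return value as an XCom."""
    return {"rows": 3}


def consume_value(**context):
    """Pull the upstream task's result from XCom and build on it."""
    upstream = context["task_instance"].xcom_pull(task_ids="produce")
    return {"rows": upstream["rows"] * 2}


def build_dag():
    # Imported lazily so the callables above can be tested without Airflow.
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator  # 1.10 path

    dag = DAG(
        dag_id="example_pipeline",        # hypothetical name
        start_date=datetime(2019, 1, 1),
        schedule_interval=None,           # run only when triggered manually
    )
    produce = PythonOperator(task_id="produce", python_callable=produce_value,
                             provide_context=True, dag=dag)
    consume = PythonOperator(task_id="consume", python_callable=consume_value,
                             provide_context=True, dag=dag)
    produce >> consume
    return dag
```

In ./dags/pipeline.py the file would end with dag = build_dag(), since the scheduler only discovers DAG objects that exist at module level.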

Our scheduling system is ready, our tasks however, are not. Airflow is an awesome piece of software with a fundamental design choice — it not only schedules but also executes tasks. There is a great article describing the issue.

The article mentioned solves that by running KubernetesOperator. This is probably one of the best solutions, but also one requiring a fair amount of DevOps work. We will do it a little more simply, enabling Airflow to run Docker containers. This will separate the workers from the actual tasks, as their only job will be spinning up the containers and waiting until they finish.

#2. Mount docker.sock and rewrite launch_docker_container

Airflow must be able to use the docker command (as a result the workers, dockerized themselves, will launch docker containers on the airflow-host machine, in this case on the same OS running Airflow).

We have to tweak the puckel/airflow image so that, inside, the user airflow has full permission to use the docker command. Create a Dockerfile extending the base image with the following lines and then build it:

Ensure that --gid 999 matches the id of the host’s docker group. If you are on macOS, please proceed further, as you will inevitably hit a wall soon: there is no docker group there. We will handle it differently though.

FROM puckel/docker-airflow:1.10.2

USER root
RUN groupadd --gid 999 docker \
    && usermod -aG docker airflow
USER airflow

Then build the image with the tag puckel-airflow-with-docker-inside and, inside docker-compose.yml:

  • replace puckel/docker-airflow:1.10.2 with puckel-airflow-with-docker-inside:latest
  • create a requirements.txt containing docker-py and mount it:
volumes:
- ./requirements.txt:/requirements.txt
  • mount docker socket for the worker:
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro

Add another task to pipeline.py:

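import logging
import docker


def do_test_docker():
    client = docker.from_env()
    # docker-py 1.x: images() returns a list of dicts
    # (with the newer docker SDK this would be client.images.list())
    for image in client.images():
        logging.info(str(image))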

to the DAG:

t1_5 = PythonOperator(
    task_id="test_docker",
    python_callable=do_test_docker
)

# ...

t1 >> t1_5 >> [t2_1, t2_2] >> t3

Running docker-compose up and triggering the DAG should result in a working solution… on Linux. On macOS however:

# logs of test_docker task
# ...
File "/usr/local/lib/python3.6/http/client.py", line 964, in send
self.connect()
File "/usr/local/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 33, in connect
sock.connect(self.unix_socket)
PermissionError: [Errno 13] Permission denied

We will use a pretty neat solution by mingheng posted here. Modify docker-compose.yml:

In the meantime, create another task in the /jupyter/task2/ directory; this time let it just sleep for 20 seconds. Build the image with the tag task2.

Lastly, rewrite the method inside launcher.py to actually run the containers:
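
A minimal sketch of what such a method could look like, assuming the newer Docker SDK for Python (the docker package, 3.x or later) rather than the old docker-py calls. The signature, including the optional client argument, is hypothetical and is not the repository's implementation. The worker only starts the container, forwards its logs to the Airflow task log, and waits for the exit code.

```python
import logging


def launch_docker_container(image_name, environment=None, client=None):
    """Run an image to completion, forward its logs, return its exit code."""
    if client is None:
        import docker  # newer Docker SDK for Python, imported lazily
        client = docker.from_env()

    container = client.containers.run(
        image_name, detach=True, environment=environment or {}
    )
    # With follow=True the iterator yields raw byte chunks until the
    # container stops, so this loop also blocks while the task is running.
    for chunk in container.logs(stream=True, follow=True):
        logging.info(chunk.decode("utf-8", errors="replace").rstrip())

    status = container.wait()  # a dict such as {"StatusCode": 0} in SDK 3.x+
    exit_code = status["StatusCode"]
    if exit_code != 0:
        # Raising makes the Airflow task fail when the container fails.
        raise RuntimeError("%s exited with code %d" % (image_name, exit_code))
    return exit_code
```

Accepting the client as an argument keeps the function testable without a Docker daemon; in the DAG it would be called with just the image name and the environment.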

If you run the DAG now and wait until do_task_one and do_task_two run, you can use docker ps to see the docker containers actually getting launched:

This is how it looks in the UI:

You are also able to read the logs directly from Jupyter:

Neat!

(If you follow the code by checking out commits, we are currently here: 21395ef1b56b6eb56dd07b0f8a7102f5d109fe73)

#3. Rewrite task2 to save its result to tar file

code.ipynb should contain one cell:

This is pretty basic: we save our result to /tmp/result.tgz and will retrieve it using the Docker API. You could of course save the json to a database or S3.
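
A minimal sketch of such a cell, using only the standard library. The function name save_result and the member name result.json are assumptions; the only convention taken from the text is the /tmp/result.tgz path.

```python
import json
import os
import tarfile
import tempfile


def save_result(result, archive_path="/tmp/result.tgz"):
    """Serialize `result` to result.json and pack it into a gzipped tar."""
    with tempfile.TemporaryDirectory() as workdir:
        json_path = os.path.join(workdir, "result.json")
        with open(json_path, "w") as handle:
            json.dump(result, handle)
        # arcname keeps the temporary directory out of the archive paths
        with tarfile.open(archive_path, "w:gz") as archive:
            archive.add(json_path, arcname="result.json")
    return archive_path
```

In the notebook, the last line of the cell would then be something like save_result({"sleeping_time": 12}).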

#4. Push && pull results automatically

In launcher.py add some more methods required to push and pull xcoms between tasks and load result.tgz
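
The loading half can be sketched as follows, again assuming the newer Docker SDK for Python, where container.get_archive returns a pair of (iterator of byte chunks, stat dict). The function name read_result and the result.json member name are hypothetical. The detail that is easy to miss is that the Docker API wraps the requested file in its own uncompressed tar stream, so result.tgz has to be unpacked twice.

```python
import io
import json
import tarfile


def read_result(container, path="/tmp/result.tgz", member="result.json"):
    """Fetch result.tgz from a finished container and return the decoded JSON."""
    chunks, _stat = container.get_archive(path)
    outer_bytes = b"".join(chunks)

    # Outer layer: plain tar produced by the Docker API, containing result.tgz
    with tarfile.open(fileobj=io.BytesIO(outer_bytes), mode="r") as outer:
        inner_name = outer.getnames()[0]
        inner_bytes = outer.extractfile(inner_name).read()

    # Inner layer: the gzipped tar written by the task itself
    with tarfile.open(fileobj=io.BytesIO(inner_bytes), mode="r:gz") as inner:
        return json.load(inner.extractfile(member))
```

The returned dict can then be pushed with the task instance's xcom_push, or simply returned from the Python callable so Airflow stores it as the task's xcom.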

then tweak launch_docker_container method to use it:

#5. Replace run.sh with run.py and push the params inside the container

Remove run.sh, replacing it with run.py, and change the Dockerfile:

COPY run.py ./notebook/run.py

ENTRYPOINT ["python", "run.py"]

and run.py :

Push the params inside the container:
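
One way to do this, sketched under the assumption that the launcher serializes the parameters to JSON and passes them in an environment variable. The variable name PARAMS and the helper read_params are hypothetical; EXECUTION_ID is the variable used earlier. On the Airflow side the container would be started with an environment such as {"EXECUTION_ID": ..., "PARAMS": json.dumps(params)}, and run.py decodes it and hands it to papermill.

```python
import json
import os


def read_params(environ=os.environ):
    """Decode the JSON parameters that the launcher put in the environment."""
    return json.loads(environ.get("PARAMS", "{}"))


def main():
    # Imported here so read_params stays usable without papermill installed.
    import papermill as pm

    execution_id = os.environ["EXECUTION_ID"]
    pm.execute_notebook(
        "code.ipynb",
        "output/code_execution_%s.ipynb" % execution_id,
        parameters=read_params(),
    )
```

The real run.py would end with a call to main(). Passing JSON through the environment avoids mounting files into the container, at the cost of being practical only for small parameter sets.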

#6. Change tasks so that there is some kind of dependency

Just pass one parameter from one task to another and use it. Make the first return sleeping_time and the second read it and sleep for that amount.

Copy-paste (for now) each Dockerfile and run.py and rebuild each container.

We are at 86b0697cf2831c8d2f25f45d5643aef653e30a6e if you want to check it out.

After all those steps, rebuild the images and run the DAG. You should see that the task i_require_data_from_previous_task has indeed correctly received the parameter from generate_data_for_next_task and slept for 12 seconds (and then re-sent the value later as its own result):

read_xcoms logs

Part three — refactor the unmaintainable code and automate the process

We have just created the basic pipeline. Airflow schedules DAGs whose tasks are then run as separate Docker containers but are still able to send and retrieve results between them.

However, it still is just a stub. The code works but is not reusable at all. Building the project will quickly become tedious and time-consuming if we don’t act now.

Our next steps:

  • rewrite scripts into classes and create separate packages
  • create scripts to build each image and install required dependencies

#1. make launcher.py a class

#2. ..and use it in the DAG

#3. rewrite run.py (in one of the tasks)

#4. make PapermillRunner and ResultSaver separate modules

Create a new directory at the top level and move the respective implementations to __init__.py (the class definition from run.py should go there)

setup.py of papermill_runner (the result_saver is similar, but has empty list of requirements):

from setuptools import setup

setup(name='papermill_runner',
      version='0.1',
      packages=['papermill_runner'],
      install_requires=[
          'papermill',
      ],
      zip_safe=False)

and the __init__.py of result_saver :

#5. modify Dockerfiles

(NOTE: You can use PyPI instead and install them inside tasks using requirements.txt, as with any other Python package. We will instead copy the directories containing the packages, build the images and then remove the temporary directories. This way we do not have to publish our code anywhere.

You could use your own PyPI repository as well, which would solve the public-publish problem, but that is outside the scope of this tutorial.)

You may also remove run.py as we no longer need it.

#6. create buildscript

This will allow us to use:

  • python build_images.py to build every task in the /docker/ directory
  • python build_images.py -t task1 to build a specific task
  • python build_images.py -l to browse docker logs

Create build_images.py at the top of our project:
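
A reduced sketch of such a script, not the repository's version. It covers only the traversal and the -t option; the -l option and the copying of our custom packages into each build context are left out. The helper names and the --docker-dir option are hypothetical, and images are built by shelling out to the docker CLI, tagged with the name of their directory.

```python
import argparse
import os
import subprocess


def find_tasks(docker_dir):
    """Return the sorted names of subdirectories that contain a Dockerfile."""
    return sorted(
        name for name in os.listdir(docker_dir)
        if os.path.isfile(os.path.join(docker_dir, name, "Dockerfile"))
    )


def build_command(docker_dir, task):
    """Build the `docker build` argument list for one task directory."""
    return ["docker", "build", "-t", task, os.path.join(docker_dir, task)]


def main(argv=None):
    parser = argparse.ArgumentParser(description="Build task images")
    parser.add_argument("-t", "--task", help="build only this task")
    parser.add_argument("-d", "--docker-dir", default="docker")
    args = parser.parse_args(argv)

    tasks = [args.task] if args.task else find_tasks(args.docker_dir)
    for task in tasks:
        # check_call raises if a build fails, stopping the script early
        subprocess.check_call(build_command(args.docker_dir, task))
```

The real script would end with a call to main(). Keeping the command construction in its own function makes it easy to check what would be run without invoking Docker.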

#7. Modify notebooks to use ResultSaver

Finally, modify all notebooks to use our ResultSaver (you probably have to switch the venv to airflow_jupyter, cd into the result_saver directory and run pip install . for it to work), for example:

Then run python build_images.py and trigger the final DAG. If all went well we are done (and at b95c17bd38bc394b9f6f8dfb3c2e68f597ef57d6).


Summary of current state

  • our DAG looks like this:
  • all those blocks can be easily run as separate, isolated Docker containers (Airflow is not a true worker anymore!)
  • we are able to pass data from inside the containers downstream to the dependent tasks
  • each task also has access to its parents’ results
  • our script build_images.py can traverse the /docker/ directory and build a docker image for each subdirectory there (and provide it with our custom-made Python libraries)

Last minute edits:

  • The final implementation in the repository linked above differs a little from this text as I have noticed that docker-py library was pretty old. I upgraded it to the newest one though, so everything stays the same except for the docker calls being slightly different.

What has not been done/shown or is done poorly:

  • copying back the papermill's output notebook (fairly simple to do, after that you might want to store it somewhere, e.g. in S3)
  • running a container with Scala or R (also simple, just make sure to follow the same convention of saving the result in result.tgz and reading args/yaml)
  • passing credentials to the container (use Airflow’s Variables or Connections mechanism)
  • how to build more complicated DAGs (but it was never the goal of this tutorial)
  • …testing
  • container logging could use some work, as binary strings provided by docker-py are not the prettiest
  • versioning the docker images (why not use Airflow’s Variable mechanism so that ContainerLauncher fetches the specified version, and in the meantime tweak our build_images.py to ask which version it should build?)
  • actual scaling (to do so you can either use docker-swarm or rewrite ContainerLauncher to launch the tasks in the cloud, for example AWS Lambda launching AWS Batch job and then polling the result will do the trick)
  • deployment (with docker-compose that should be fairly easy, you might have to add docker images pushing/pulling when building/launching respectively, also use docker registry)

However, I am certain this example will speed you up on your way to implement Airflow. You can take it from here and fit the system to your actual needs. Happy scheduling!