Yet Another Scalable Apache Airflow With Docker Example Setup
There are plenty of articles describing what Apache Airflow is and when you would want to use it. As it turns out, the problem it solves is really common, and not only in data science environments.
The TL;DR of all those articles is basically: have workflows doing stuff? If so, why not use Airflow to manage them for you?
This example cookbook (or a scaffold you could use directly in your project) shows yet another way to bootstrap Apache Airflow to be:
- friendly for a data science team, as the main idea shown is to run parametrized notebooks…
- …but also not bound to a particular technology (task-wise), as in the end it runs Docker containers that could have literally anything inside
- simple to scale later using your own servers or the cloud
Here, we make this diagram come true:
If you are looking for the scaffold, just dive right into https://github.com/spaszek/airflow_project or scroll down to the “Summary of current state” section below (far below).
This article is quite lengthy and describes the process as thoroughly as possible. It is also not an introduction to or overview of Apache Airflow, merely a “solution” to the diagram above. I assume the reader has Python, Docker and Linux experience.
It was written continuously while developing, so it reads more like a diary. Regardless, I sincerely hope it proves useful for somebody, be it idea-wise or implementation-wise.
Oh, and most of the inspiration comes from this great article by Netflix. I have yet to see it implemented as an example anywhere, so I did it myself.
Part one — parametrized Docker Jupyter notebook
#1. Set up a Jupyter “basic lab environment”
mkvirtualenv airflow_jupyter --python=python3.6
pip install jupyter ipython [and whatever libraries you need]
ipython kernel install --user --name=airflow_jupyter
pip install nteract_on_jupyter # if you wish to have prettier UI
pip install papermill
jupyter notebook # or jupyter nteract
Warning! The name of the virtualenv of choice, in this case airflow_jupyter, will be used later. Because we’d rather not clutter our workstation, we may want to use a separate kernel for each task. But in the end, the notebook getting scheduled expects that kernel to actually exist. We will make sure it does by creating it later in the Dockerfile, just before spinning up the notebook.
#2. Create example notebook
(this could really do anything)
%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
x = np.arange(0, input_size, 1)
y = np.random.gamma(2., 2., input_size)
plt.figure(figsize=(18,10))
plt.scatter(x, y, c='r')
plt.show()
and params.yaml:
input_size: 500
#3. Add parameters cell
First, enable cell tags:
then create a cell with the tag parameters:
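(The original screenshot showed this cell; a minimal sketch of what it could contain, where the default value is an assumption since papermill overrides it with whatever params.yaml provides:)
# cell tagged "parameters": papermill injects the values from params.yaml right after this cell
input_size = 500  # assumed default so the notebook also runs manually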
#4. Run papermill to ensure it works
Depending on your directory structure, the command will look approximately like this:
papermill task_1/code.ipynb task_1/output/code_execution_1.ipynb -f task_1/params.yaml
If all went well (just browse the output file to see that it actually got executed), proceed to the next step.
#5. Wrap up the notebook in a docker container
First off, dump a requirements.txt into the task folder, as each task should have its own, as tiny as possible, virtual environment:
pip freeze > requirements.txt
Now create a basic Dockerfile that spins up run.sh (which will be created later).
Note that while jessie is not always the best choice of Docker base image considering its size, the benefit of alpine quickly diminishes when using huge libraries like numpy, scipy or pandas. If you are comfortable with Docker and Linux, feel free to use alpine as your base image; it will, however, require tweaking the Dockerfiles a lot.
Make sure the name of the virtualenv matches below where necessary:
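The exact Dockerfile is in the repository; below is a minimal sketch of what it could look like, assuming a python:3.6-jessie base, a /notebook working directory and the airflow_jupyter kernel created from the requirements dumped above:
FROM python:3.6-jessie

COPY requirements.txt /tmp/requirements.txt
# recreate the kernel the notebook expects (see the warning in step #1)
RUN pip install --no-cache-dir -r /tmp/requirements.txt \
    && python -m ipykernel install --user --name=airflow_jupyter

WORKDIR /notebook
COPY code.ipynb params.yaml run.sh ./
RUN mkdir -p output

ENTRYPOINT ["bash", "run.sh"]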
#6. Create params.yaml and run.sh
Now create a little run.sh one-liner to run the script:
#!/usr/bin/env bash
papermill code.ipynb output/code_execution_${EXECUTION_ID}.ipynb -f params.yaml --log-output
#7. Run the example
Build the container with docker build . -t task1
and then run it:
>>> docker run -it -e EXECUTION_ID=444444 task1
Input Notebook: code.ipynb
Output Notebook: output/code_execution_444444.ipynb
Executing notebook with kernel: airflow_jupyter
Executing Cell 1---------------------------------------
Ending Cell 1------------------------------------------
Executing Cell 2---------------------------------------
Ending Cell 2------------------------------------------
Executing Cell 3---------------------------------------
<Figure size 1296x720 with 1 Axes>
Ending Cell 3----------
Note that the EXECUTION_ID actually got passed in correctly. We can also retrieve the resulting notebook using docker cp <id_of_container>:/notebook/output/code_execution_444444.ipynb ./
Part two — run Airflow
We have just separated our notebooks so they run inside a virtualized environment and can be parametrized. Now let us launch Apache Airflow, enable it to run them, and pass the data between tasks properly.
#1. Run docker-compose with Airflow
We will be using the dockerized Apache Airflow image by puckel.
First, download docker-compose-CeleryExecutor.yml from https://github.com/puckel/docker-airflow and rename it to docker-compose.yml
Then create a separate virtualenv (which will be used in the IDE to develop DAGs without cluttering our Jupyter one):
mkvirtualenv airflow_dag
export AIRFLOW_GPL_UNIDECODE=yes
pip install apache-airflow
Mount the ./dags directory inside docker-compose.yml for the scheduler, webserver and worker:
volumes:
- ./dags:/usr/local/airflow/dags
then run everything with docker-compose up and add a sample DAG at ./dags/pipeline.py:
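The actual pipeline.py is in the repository; here is a minimal sketch of such a DAG, including the xcom push/pull mechanism mentioned below (task names and values are placeholders):
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def generate_value(**context):
    # whatever a python_callable returns is automatically xcom-pushed
    return 42


def consume_value(**context):
    value = context["ti"].xcom_pull(task_ids="generate_value")
    print("received from upstream: %s" % value)


dag = DAG(
    dag_id="pipeline",
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
    catchup=False,
)

t1 = PythonOperator(task_id="generate_value", python_callable=generate_value,
                    provide_context=True, dag=dag)
t2 = PythonOperator(task_id="consume_value", python_callable=consume_value,
                    provide_context=True, dag=dag)

t1 >> t2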
Go to http://localhost:8080/admin/ and trigger it.
Should all go well, a (pretty dumb) DAG will be run. We have also shown how one can pass results between dependent tasks (the xcom push/pull mechanism). This will be useful later on, but let’s leave it for now.
Our scheduling system is ready, our tasks however, are not. Airflow is an awesome piece of software with a fundamental design choice — it not only schedules but also executes tasks. There is a great article describing the issue.
The article mentioned solves that by using the KubernetesOperator. This is probably one of the best solutions, but also one requiring a fair amount of DevOps work. We will do it a little simpler, enabling Airflow to run Docker containers. This will separate workers from the actual tasks, as their only job will be spinning up the containers and waiting until they finish.
#2. Mount docker.sock and rewrite launch_docker_container
Airflow must be able to use the docker command (as a result, the workers, dockerized themselves, will launch docker containers on the airflow-host machine — in this case on the same OS running Airflow).
We have to tweak the puckel/docker-airflow image so that, inside it, the user airflow has full permission to use the docker command. Create a Dockerfile extending the base image with the following lines, and then build it:
Ensure that --gid 999 matches the id of the host’s docker group. If you are on macOS, please proceed further, as you will inevitably hit a wall soon — there is no docker group there. We will handle it differently though.
FROM puckel/docker-airflow:1.10.2
USER root
RUN groupadd --gid 999 docker \
&& usermod -aG docker airflow
USER airflow
Then build the image with the tag puckel-airflow-with-docker-inside and, inside docker-compose.yml:
- replace puckel/docker-airflow:1.10.2 with puckel-airflow-with-docker-inside:latest
- create a requirements.txt containing docker-py and mount it:
volumes:
- ./requirements.txt:/requirements.txt
- mount the docker socket for the worker:
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
Add another task to pipeline.py:
import logging
import docker
def do_test_docker():
    client = docker.from_env()
    # old docker-py API; with the newer docker SDK this would be client.images.list()
    for image in client.images():
        logging.info(str(image))
and add it to the DAG:
t1_5 = PythonOperator(
    task_id="test_docker",
    python_callable=do_test_docker
)
# ...
t1 >> t1_5 >> [t2_1, t2_2] >> t3
Running docker-compose up and triggering the DAG should result in a working solution… on Linux. On macOS, however:
# logs of test_docker task
# ...
File "/usr/local/lib/python3.6/http/client.py", line 964, in send
self.connect()
File "/usr/local/airflow/.local/lib/python3.6/site-packages/docker/transport/unixconn.py", line 33, in connect
sock.connect(self.unix_socket)
PermissionError: [Errno 13] Permission denied
We will use a pretty neat solution by mingheng posted here. Modify docker-compose.yml:
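The core of that workaround is to expose the Docker socket over a small TCP proxy instead of relying on socket file permissions; a sketch of the docker-compose.yml changes, assuming the bobrik/socat image (the service name is an assumption, and the existing worker service just gains one environment variable):
  docker-socket-proxy:
    image: bobrik/socat
    command: TCP4-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  worker:
    environment:
      - DOCKER_HOST=tcp://docker-socket-proxy:2375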
In the meantime, create another task in the /jupyter/task2/ directory; this time let it just sleep 20 seconds. Build the image with the tag task2.
Lastly, rewrite the method inside launcher.py to actually run the containers:
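The full launcher.py is in the repository; a rough sketch of the idea with the old docker-py API (how the image name reaches the function, e.g. via op_kwargs, is an assumption):
import logging

import docker


def launch_docker_container(**context):
    image_name = context["image_name"]  # assumed to be passed in via op_kwargs
    client = docker.from_env()

    container = client.create_container(
        image=image_name,
        environment={"EXECUTION_ID": context["run_id"]},
    )
    client.start(container=container["Id"])

    # stream the papermill output straight into the Airflow task log
    for line in client.logs(container=container["Id"], stream=True):
        logging.info(line.decode("utf-8", errors="replace").rstrip())

    exit_code = client.wait(container=container["Id"])
    if exit_code != 0:
        raise RuntimeError("container %s exited with code %s" % (image_name, exit_code))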
If you run the DAG now and wait until do_task_one and do_task_two run, you can use docker ps to see the docker containers actually getting launched:
This is how it looks in the UI:
You are also able to read the logs directly from Jupyter:
Neat!
(If you follow the code by checking out commits, we are currently here: 21395ef1b56b6eb56dd07b0f8a7102f5d109fe73)
#3. Rewrite task2 to save its result to a tar file
code.ipynb should contain one cell:
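A sketch of what that single cell could look like; the payload itself is just an example, the only thing that matters is the /tmp/result.tgz convention:
import json
import tarfile

result = {"sleeping_time": 20}  # whatever this task wants to hand downstream

with open("/tmp/result.json", "w") as f:
    json.dump(result, f)

with tarfile.open("/tmp/result.tgz", "w:gz") as tar:
    tar.add("/tmp/result.json", arcname="result.json")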
This is pretty basic — we save our result to /tmp/result.tgz and will retrieve it using the docker API. You could of course save the JSON to a database or S3 instead.
#4. Push && pull results automatically
In launcher.py, add some more methods required to push and pull xcoms between tasks and to load result.tgz, then tweak the launch_docker_container method to use them:
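The exact methods are in the repository; a sketch of the two pieces, assuming the result.json-inside-result.tgz convention from the previous step (the helper names are mine):
import io
import json
import tarfile


def _pull_params_from_parents(context):
    # gather the xcom results of all direct upstream tasks into one dict
    params = {}
    for parent_id in context["task"].upstream_task_ids:
        pulled = context["ti"].xcom_pull(task_ids=parent_id)
        if pulled:
            params.update(pulled)
    return params


def _read_result_from_container(client, container_id):
    # copy /tmp/result.tgz out of the finished container and decode the json inside
    stream, _ = client.get_archive(container=container_id, path="/tmp/result.tgz")
    raw = stream.read() if hasattr(stream, "read") else b"".join(stream)
    outer = tarfile.open(fileobj=io.BytesIO(raw))  # the docker archive wrapping the file
    inner = tarfile.open(fileobj=outer.extractfile("result.tgz"), mode="r:gz")
    return json.loads(inner.extractfile("result.json").read().decode("utf-8"))

Returning the loaded dict from launch_docker_container is then enough for Airflow to xcom-push it for the downstream tasks.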
#5. Replace run.sh with run.py and push the params inside the container
Remove run.sh, replacing it with run.py, and change the Dockerfile:
COPY run.py ./notebook/run.py
ENTRYPOINT ["python", "run.py"]
and run.py:
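A sketch of what run.py could do: rebuild params.yaml from an environment variable and call papermill exactly like run.sh used to (the PARAMS variable name is an assumption):
import os
import subprocess

execution_id = os.environ.get("EXECUTION_ID", "manual")
params_yaml = os.environ.get("PARAMS", "")  # yaml handed over by the launcher, may be empty

with open("params.yaml", "w") as f:
    f.write(params_yaml or "{}")

subprocess.run(
    ["papermill", "code.ipynb",
     "output/code_execution_{}.ipynb".format(execution_id),
     "-f", "params.yaml", "--log-output"],
    check=True,
)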
Push the params inside the container:
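On the launcher side this simply means serializing the parents’ xcom results and handing them over as environment variables when creating the container, e.g. (a sketch reusing the hypothetical helpers above):
params = _pull_params_from_parents(context)
container = client.create_container(
    image=image_name,
    environment={
        "EXECUTION_ID": context["run_id"],
        "PARAMS": yaml.safe_dump(params, default_flow_style=False),
    },
)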
#6. Change tasks so that there is some kind of dependency
Just pass one parameter from one task to another and use it. Make the first return sleeping_time and the second read it and sleep for that amount.
Copy-paste (for now) each Dockerfile and run.py, and rebuild each container.
We are at 86b0697cf2831c8d2f25f45d5643aef653e30a6e if you want to check it out.
After all those steps, rebuild the images and run the DAG. You should see that the task i_require_data_from_previous_task has indeed correctly received the parameter from generate_data_for_next_task and was sleeping for 12 seconds (and then re-sent the value later as its own result):
Part three — refactor the unmaintainable code and automate the process
We have just created the basic pipeline. Airflow schedules DAGs whose tasks are then run as separate Docker containers, yet they are still able to send and retrieve results between one another.
However, it still is just a stub. The code works but is not reusable at all. Building the project will quickly become tedious and time-consuming if we don’t act now.
Our next steps:
- rewrite scripts into classes and create separate packages
- create a script to build each image and install the required dependencies
#1. make launcher.py a class
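The refactored class lives in the repository; a rough sketch of its shape, reusing the hypothetical helpers sketched in part two (the class name matches how it is referenced further below, everything else is an assumption):
import logging

import docker
import yaml

# _pull_params_from_parents and _read_result_from_container are the helpers from the earlier sketch


class ContainerLauncher:
    def __init__(self, image_name):
        self.image_name = image_name

    def run(self, **context):
        client = docker.from_env()
        params = _pull_params_from_parents(context)

        container = client.create_container(
            image=self.image_name,
            environment={
                "EXECUTION_ID": context["run_id"],
                "PARAMS": yaml.safe_dump(params, default_flow_style=False),
            },
        )
        client.start(container=container["Id"])

        for line in client.logs(container=container["Id"], stream=True):
            logging.info(line.decode("utf-8", errors="replace").rstrip())

        if client.wait(container=container["Id"]) != 0:
            raise RuntimeError("container %s failed" % self.image_name)

        # returning the dict makes Airflow xcom-push it for the downstream tasks
        return _read_result_from_container(client, container["Id"])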
#2. …and use it in the DAG
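In the DAG this boils down to one PythonOperator per image, e.g. (a sketch assuming the ContainerLauncher above; the module name is made up):
# inside dags/pipeline.py, next to the existing dag object
from container_launcher import ContainerLauncher  # hypothetical module name

do_task_one = PythonOperator(
    task_id="do_task_one",
    python_callable=ContainerLauncher("task1").run,
    provide_context=True,
    dag=dag,
)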
#3. rewrite run.py (in one of the tasks)
#4. make PapermillRunner and ResultSaver separate modules
Create a new directory at the top level and move the respective implementations to __init__.py (the class definition from run.py should end up there).
The setup.py of papermill_runner (the one for result_saver is similar, but with an empty list of requirements):
from setuptools import setup
setup(name='papermill_runner',
version='0.1',
packages=['papermill_runner'],
install_requires=[
'papermill',
],
zip_safe=False)
and the __init__.py of result_saver:
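The real implementation is in the repository; a sketch of what result_saver/__init__.py could provide, keeping the /tmp/result.tgz convention (the class and method names here are assumptions, used consistently in the snippets below):
import json
import os
import tarfile


class ResultSaver:
    """Packs a result dict into /tmp/result.tgz so the launcher can pick it up."""

    def __init__(self, archive_path="/tmp/result.tgz"):
        self.archive_path = archive_path

    def save(self, result):
        tmp_json = "/tmp/result.json"
        with open(tmp_json, "w") as f:
            json.dump(result, f)
        with tarfile.open(self.archive_path, "w:gz") as tar:
            tar.add(tmp_json, arcname="result.json")
        os.remove(tmp_json)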
#5. modify Dockerfiles
(NOTE: you could use PyPI instead and install these inside the tasks using requirements.txt, as with any other Python package. We will instead copy the directories containing the packages, build the images and then remove the temporary directories. This way we do not have to publish our code anywhere. You could also use your own PyPI repository, which would solve the public-publishing problem, but that is not in the scope of this tutorial.)
You may also remove run.py, as we no longer need it.
#6. create a build script
This will allow us to use:
- python build_images.py to build every task in the /docker/ directory
- python build_images.py -t task1 to build a specific task
- python build_images.py -l to browse the docker logs
Create build_images.py at the top of our project:
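A sketch of what build_images.py could look like; it covers building everything and the -t flag, while the -l log browsing is left out for brevity (the package-copying approach matches the note in step #5, the details are assumptions):
import argparse
import os
import shutil
import subprocess

ROOT = os.path.dirname(os.path.abspath(__file__))
DOCKER_DIR = os.path.join(ROOT, "docker")
PACKAGES = ["papermill_runner", "result_saver"]


def build_image(task_name):
    task_dir = os.path.join(DOCKER_DIR, task_name)
    # temporarily copy our private packages next to the Dockerfile so COPY can reach them
    for package in PACKAGES:
        shutil.copytree(os.path.join(ROOT, package), os.path.join(task_dir, package))
    try:
        subprocess.run(["docker", "build", "-t", task_name, task_dir], check=True)
    finally:
        for package in PACKAGES:
            shutil.rmtree(os.path.join(task_dir, package), ignore_errors=True)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--task", help="build only this task instead of the whole /docker/ directory")
    args = parser.parse_args()

    tasks = [args.task] if args.task else sorted(
        d for d in os.listdir(DOCKER_DIR) if os.path.isdir(os.path.join(DOCKER_DIR, d))
    )
    for task in tasks:
        build_image(task)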
#7. Modify notebooks to use ResultSaver
Finally, modify all notebooks to use our ResultSaver (you probably have to switch the venv to airflow_jupyter, cd into the result_saver directory and run pip install . for it to work), for example:
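(A sketch, assuming the ResultSaver from the sketch above; sleeping_time comes in through the parameters cell.)
from result_saver import ResultSaver

# re-send the received value downstream as this task's own result
ResultSaver().save({"sleeping_time": sleeping_time})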
Then run python build_images.py and trigger the final DAG. If all went well, we are done (and at b95c17bd38bc394b9f6f8dfb3c2e68f597ef57d6).
Summary of current state
- our DAG looks like this:
- all those blocks can be easily run as separate, isolated Docker containers (Airflow is not a true worker anymore!)
- we are able to pass data from inside the containers downstream to the dependent tasks
- each task also has access to its parents’ results
- our script build_images.py can traverse the /docker/ directory and, for each subdirectory there, build a docker image (and provide it with our custom-made Python libraries)
Last minute edits:
- The final implementation in the repository linked above differs a little from this text, as I noticed that the docker-py library was pretty old. I upgraded it to the newest one, so everything stays the same except for the docker calls being slightly different.
What has not been done/shown or is done poorly:
- copying back papermill's output notebook (fairly simple to do; after that you might want to store it somewhere, e.g. in S3)
- running containers with Scala or R (also simple, just make sure to follow the same convention of saving the result as result.tgz and reading the args/yaml)
- passing credentials to the container (use Airflow’s Variables or Connections mechanism)
- how to build more complicated DAGs (but that was never the goal of this tutorial)
- …testing
- container logging could use some work, as the binary strings provided by docker-py are not the prettiest
- versioning the docker images (why not use Airflow’s Variable mechanism so that ContainerLauncher fetches the specified version, and in the meantime tweak our build_images.py to ask which version it should build?)
- actual scaling (to do so you can either use docker-swarm or rewrite ContainerLauncher to launch the tasks in the cloud; for example, AWS Lambda launching an AWS Batch job and then polling the result will do the trick)
- deployment (with docker-compose that should be fairly easy; you might have to add docker image pushing/pulling when building/launching respectively, and also use a docker registry)
However, I am certain this example will speed you up on your way to implementing Airflow. You can take it from here and fit the system to your actual needs. Happy scheduling!