Django + Singer ETL + Airflow: blood, sweat and Docker

Jperozo
Sep 1, 2020 · 7 min read

Author’s comment: If you want to know why we did this, you can take a look at Building our Tower of Babel: Django + Singer ETL + Airflow for all audiences. This text is 100% technical.

If you’ve arrived here, chances are you already know about Django, Singer ETL and Apache Airflow and how we wanted to find a way to make them work together. In this text you’ll find the solution that we put forward in the hope that it will be useful to someone else. If, on the other hand, you arrived by chance and you don’t know any of these tools, don’t worry, in the text we will give a brief explanation of each one.

Let’s take a bottom-up approach to this project, starting from the simplest piece and working up to the most complicated, with the intention of making the path more didactic. Our first step will be to connect Singer ETL to Apache Airflow.

Singer ETL

As we explained in the previous post, Singer ETL is presented as a standard for extracting, transforming and loading data from any source of information, always sharing the same format.

This “one ring” of APIs is divided into two elements:

Tap

It’s the element that connects to the API using a configuration file (whose format can vary between taps), condensing the information into three structures: Schema (which defines the fields of each structure), Record (the representation of each individual record) and State (a final summary of the entire extraction process), with output similar to this one:

{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}
{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Mike"}}
{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Philadelphia"}}
{"type": "STATE", "value": {"users": 2, "locations": 1}}

The way to invoke them is with a syntax similar to this:

tap-custom --config [configurationfile]

However, there are taps that, due to the complexity of their integrations, require a prior discovery step in order to extract the corresponding Schemas. This step generates a properties file with the configuration needed to extract all the information, using a syntax like the one shown below:

tap-custom --config [configurationfile] --discover

The --properties argument is then added to the extraction syntax:

tap-custom --config [configurationfile] --properties [propertiesfile]

In both cases the results are written to standard output, unless a target is defined to receive them.

With MySQL it would be something similar to this:

tap-mysql --config [configurationfile] --discover > [propertiesfile]
tap-mysql --config [configurationfile] --properties [propertiesfile]

Whereas, for GitLab, it’s simply:

tap-gitlab --config [configurationfile]

It is important to note that each tap must be contained in its own environment (virtualenv, Docker image), as python-singer versions may vary between taps and could generate conflicts.

Target

It is the component that receives the information extracted by the tap and saves it in the required format.

A great advantage of Singer ETL is that the community has taken care of defining a large number of taps for database engines (MySQL, PostgreSQL, MongoDB, among others) and for external services such as Slack and Salesforce (to mention a few), all returning a similar format so that every integration can be interpreted in the same way. In addition, its repository offers several targets, among which CSV and Google Sheets stand out.

If we apply a target to our GitLab example, it would look something like this:

tap-gitlab --config [configurationfile] | ~/.virtualenvs/target-csv/bin/target-csv

As with taps, each target should be self-contained in its own environment, with the intention of avoiding conflicts between versions.

The official Singer ETL community repository provides a number of templates and/or cookiecutters, which makes it flexible enough to develop your own taps and targets.
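To give an idea of what that looks like, here is a minimal sketch of a custom tap built on the singer-python library; the stream name and the records are made up for illustration, and a real tap would read its --config file and call an external API:

# tap_example.py -- minimal custom tap sketch using the singer-python
# library. The "users" stream and its records are illustrative only.
import singer

schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
    },
}


def main():
    # Announce the structure of the "users" stream
    singer.write_schema("users", schema, key_properties=["id"])
    # Emit each record to standard output so a target can consume it
    singer.write_records("users", [{"id": 1, "name": "Chris"}])
    # Emit a final STATE message summarizing the extraction
    singer.write_state({"users": 1})


if __name__ == "__main__":
    main()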

Apache Airflow

It is a platform to programmatically create, schedule and monitor workflows, written in Python and built around the concept of Directed Acyclic Graphs (DAGs). The particularity of these graphs is that all their edges point in a common direction, so there is no path that starts and ends at the same vertex, which guarantees finite workflows.

Each of these DAGs is a Python script in which every node is defined as a bash, Python or other type of operation. They are hosted in a directory that can be configured through airflow.cfg, located in the main folder of the application, ~/airflow/.

In turn, Airflow has support for several database engines, although it uses SQLite by default. Needless to say, this is for development phases only.

Singer ETL + Apache Airflow = Pure Love

Connecting Airflow with Singer ETL is an extremely simple task: just generate a DAG with a bash operation, similar to the one below, creating the tap configuration file as the case requires. Likewise, the BashOperator could be replaced by a PythonOperator.

For didactic purposes, we’ll make integration with GitLab:

config.json

{
  "api_url": "https://gitlab.com/api/v3",
  "private_token": "your-access-token",
  "groups": "myorg mygroup",
  "projects": "myorg/repo-a myorg/repo-b",
  "start_date": "2018-01-01T00:00:00Z"
}

gitlab_dag.py


from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="gitlab_dag",
    description="This is a testing dag for GitLab",
    default_args={
        'start_date': datetime(2020, 7, 28, 18, 32),
        'owner': 'airflow'
    }
)

sync_data = BashOperator(
    task_id="sync_data",
    bash_command="tap-gitlab --config config.json",
    dag=dag
)


def echo_function(*args, **kwargs):
    print("Successfully Done!")


successfully_done = PythonOperator(
    task_id="done",
    python_callable=echo_function,
    dag=dag
)

sync_data >> successfully_done

Although the flows can grow in complexity with other integrations, the principle is essentially the same. The >> operator indicates the relationship between tasks: in order to execute successfully_done, sync_data must have completed successfully.
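For example, a slightly larger flow can fan out and back in using the same >> syntax (the task names here are hypothetical operator instances):

# Hypothetical tasks: extract fans out to two transforms, and load runs
# only after both transforms have finished.
extract >> [transform_users, transform_locations]
[transform_users, transform_locations] >> load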

Apache Airflow + Django = It’s complicated

Airflow is designed to be a framework in its own right, since it has different strategies for generating DAGs and a web interface built on Flask to monitor the tasks.

But the situation gets complicated when you need to use information directly from the Django ORM or, failing that, from a component of your application that depends on the context of the web framework. Under that premise, the approach proposed in this StackOverflow thread can be helpful (https://stackoverflow.com/questions/47470245/django-orm-in-airflow-is-it-possible) but, unfortunately, it is not applicable to every way a Django codebase can be structured.
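The gist of that approach is to bootstrap Django inside the DAG file before importing any models, roughly as in the sketch below; the settings module and model names are placeholders for your own project:

# Sketch of the "configure Django inside the DAG" approach discussed in
# the StackOverflow thread. "myproject.settings" and "myapp" are
# placeholders, and the Django project must be importable from the
# scheduler's PYTHONPATH.
import os
import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django.setup()

# Only after django.setup() can ORM models be imported and used in tasks
from myapp.models import SomeModel  # hypothetical app and model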

However, the tool’s own scheduler constantly monitors a DAGs directory ($AIRFLOW_HOME/dags by default) and, when a new file with the correct syntax is created, Airflow picks it up almost instantly.

Taking this feature into account, the reasonable option was to create DAGs dynamically from Django using BashOperators and Django management commands, which do have access to the Django context.
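A management command is just a Python class inside an app’s management/commands/ folder that runs with full access to the ORM; the inittest command invoked from a BashOperator further below could look roughly like this sketch (what the command actually does is a placeholder):

# myapp/management/commands/inittest.py -- sketch of a Django management
# command such as the "inittest" one called from a BashOperator later on;
# the body of handle() is illustrative only.
from django.core.management.base import BaseCommand


class Command(BaseCommand):
    help = "Prepare data before the Singer tap runs"

    def handle(self, *args, **options):
        # The Django ORM and settings are fully available at this point
        self.stdout.write("Initializing test data...")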

To generate these DAGs dynamically, we took inspiration from how Airflow generates DAGs through its web interface: building our templates with Jinja, whose syntax Django’s template system fortunately shares.

Generating that file from a Django template looks something like this:

template.txt

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
{% autoescape off %}
{{ dag }}
{% for task in tasks %}
{{task}}
{% endfor %}
{{ tasks_order }}
{% endautoescape %}

generator.py

from django.template import Context, Template


def generate_dag():
    dag = "dag = DAG(dag_id='test_dag', description='test', default_args={'start_date': datetime(2020, 7, 28, 18, 32), 'owner': 'airflow'})"
    task_1 = "task_1 = BashOperator(task_id='init', bash_command='./manage.py inittest', dag=dag)"
    task_2 = "task_2 = BashOperator(task_id='sync', bash_command='tap-gitlab --config config.json', dag=dag)"
    tasks = [task_1, task_2]
    tasks_order = "task_1 >> task_2"
    # Load template.txt and render it with the DAG pieces defined above
    with open("template.txt") as template_file:
        template = Template(template_file.read())
    context = Context({"dag": dag, "tasks": tasks, "tasks_order": tasks_order})
    rendered_file = template.render(context)
    # Write the rendered DAG where the Airflow scheduler will pick it up
    with open("dags/test.py", "w") as dag_file:
        dag_file.write(rendered_file)

Now, simply by running our generator, it creates the corresponding file with the desired configuration, and the flow can grow in complexity if needed.
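Assuming the template and the strings above, the rendered dags/test.py should end up looking roughly like the hand-written GitLab DAG shown earlier (whitespace may differ slightly):

# dags/test.py -- approximate result of rendering template.txt with
# the strings produced by generate_dag()
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='test_dag', description='test', default_args={'start_date': datetime(2020, 7, 28, 18, 32), 'owner': 'airflow'})

task_1 = BashOperator(task_id='init', bash_command='./manage.py inittest', dag=dag)
task_2 = BashOperator(task_id='sync', bash_command='tap-gitlab --config config.json', dag=dag)

task_1 >> task_2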

With these problems solved, the next challenge was to deploy our applications in the same server, but in different environments. It was time for Docker.

It’s your turn, Docker!

If you are a developer but have no idea what Docker is, you need to integrate it into your life. This project provides operating-system-level virtualization, packaging software into isolated images that run as containers. If you want to know more about this technology, you can take a look at its official website.

To run a multi-container application, depending on its complexity, various tools can be used to orchestrate the containers in production environments, such as Kubernetes or Docker Swarm. For the purposes of this article, we will use Docker Compose.

Compose is a tool that allows us to define and execute the different containers using a YAML file for configuration. For this to work properly, we need to create a Dockerfile for each of our applications, similar to the one below:

Dockerfile

FROM python:3.7
ENV PYTHONUNBUFFERED 1
WORKDIR /code

COPY requirements/ requirements/
RUN pip install -r requirements/[django_or_airflow_file].txt

COPY . .

With the two files ready, we only need to define our docker-compose.yml, similar to this one, where we orchestrate both containers:

version: '3.7'

services:
  django-app:
    build:
      context: ./django
      dockerfile: Dockerfile
    command: >
      bash -c "python manage.py runserver 0.0.0.0:8080"
    image: django-app
    container_name: django-app
    volumes:
      - ./tmp:/code/tmp
    ports:
      - '8080:8080'
    restart: always

  airflow-singer:
    build:
      context: ./airflow
      dockerfile: Dockerfile
    command: >
      bash -c "airflow initdb && airflow scheduler && airflow webserver -p 8085"
    image: airflow-singer
    container_name: airflow-singer
    volumes:
      - ./tmp:/code/tmp
    ports:
      - '8085:8085'
    restart: always

The most important thing to note about this configuration is that both volumes must point to a common destination in order to achieve the desired effect: the Django application can create DAGs that are consumed directly by Apache Airflow, as mentioned above.
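In practice this means the generator writes its files into the shared mount and Airflow’s dags_folder (in airflow.cfg) is pointed at the same path; something along these lines, where the exact paths are an assumption of ours rather than a requirement:

# Hypothetical constant on the Django side: both containers mount ./tmp
# at /code/tmp, so DAG files written here become visible to Airflow's
# scheduler (assuming dags_folder in airflow.cfg points at this path).
DAGS_OUTPUT_DIR = "/code/tmp"

# e.g. inside generate_dag():
#     with open(os.path.join(DAGS_OUTPUT_DIR, "test.py"), "w") as dag_file:
#         dag_file.write(rendered_file)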

Perhaps this is not a commonly used configuration, and you may never even need to replicate it, but we wanted to share it with the world because it may be as useful to others as it was to us.

Talpor

We build fantastic digital products for startups and major brands. Let’s build something big together.