Primeiros passos com o Apache Airflow: ETL fácil, robusto e de baixo custo

Aprenda a instalar o Airflow e começar a criar seus primeiros jobs em Python

Pablo Brenner
Data Hackers
7 min readFeb 16, 2019

--

Esse é um artigo introdutório que tem o objetivo de ajudar você a colocar em pé o Apache Airflow e entender os seus conceitos básicos de funcionamento e utilização, através de um exemplo muito simples onde vamos criar o nosso primeiro workflow para ETL.

O ETL está morto?

Nos últimos dois anos a frase “O ETL está morto” começou a aparecer com certa recorrência em conteúdos sobre engenharia de dados, se referindo a capacidade de processamento de dados em streaming de ferramentas como o Apache Spark ou o Apache Kakfa em comparação com o paradigma tradicional de ETL.

É bem verdade que a capacidade desse tipo de ferramenta impressiona e em certos caso é a única opção viável dado o volume e velocidade em que os dados precisam fluir, mas uma verdade precisa ser dita: a maioria das empresas e times de dados não precisam (pelo menos não ainda) de coisas tão sofisticadas assim.

Para a grande maioria dos cenários uma infraestrutura de ETL simples, com scripts bem escritos e bem documentada é muito mais do que o suficiente.

Nesse cenário o Apache Airflow surge como uma ótima solução para nos ajudar na jornada do ETL.

O que é o Apache Airflow

Em junho de 2015 Maxime Beauchemin publica aqui mesmo no Medium o artigo Airflow: a workflow management platform explicando como a crescente complexidade do ambiente de dados no Airbnb levou alguns engenheiros de software (incluindo o próprio Maxime) a começarem o desenvolvimento de uma plataforma de gerenciamento de workflows, batizada de Airflow.

O projeto desde então caiu na graça de muita gente boa na comunidade de engenharia de dados e software, sendo utilizado em empresas que trabalham dados no estado da arte como Tesla, Square e Spotify, entre outros nomes de peso.

Alguns motivos que enxergo para o sucesso recente do Apache Airflow (e bons motivos para você também usar):

  • A instalação e manutenção são fáceis;
  • Você consegue rodar uma quantidade muito razoável em volume de ETL mesmo em máquinas baratas com poucos recursos;
  • Bugs são corrigidos rapidamente e novos recursos adicionados a todo momento (Os recursos de escalabilidade e de integração com AWS, Google Cloud e Azure melhoram a cada dia);
  • A interface gráfica é muito boa e os recursos de monitoramento de execução dos workflows são excepcionais;
  • Python hoje é uma quase unanimidade no mundo da ciência de dados, o Airflow é escrito em Python e é muito fácil criar workflows usando a linguagem.

Algumas empresas já estão até criando versões próprias do Airflow para as suas demandas específicas, no caso do Twitter foi desenvolvida uma versão que ajuda o time de dados a entregar modelos de ML.

Além disso começam a surgir opções gerenciadas do Airflow para quem não quiser se preocupar em gerenciar instalações e infraestrutura.

Instalando e configurando o Apache Airflow

Para esse tutorial usei uma máquina virtual com Ubuntu 16.04 e um banco de dados PostgreSQL 9.6 no Google Cloud, a versão mais recente do Airflow na publicação do artigo é a 1.10.2.

Passo 1. Vamos começar atualizando os pacotes do Ubuntu:

sudo su
cd ~
sudo apt-get update

Passo 2. Exporte algumas variáveis de ambiente que podem nos salvar de dores de cabeça em momentos futuros:

export SLUGIFY_USES_TEXT_UNIDECODE=yes
export LC_ALL=”en_US.UTF-8"
export LC_CTYPE=”en_US.UTF-8"
sudo dpkg-reconfigure locales

Passo 3. Agora vamos instalar o pip e Psycopg para conectar no banco de dados que o Airflow vai utilizar para sua gestão própria:

sudo apt install python3-pip
pip3 install psycopg2

Passo 4. Crie um diretório próprio para o Airflow:

export AIRFLOW_HOME=/airflow
sudo mkdir $AIRFLOW_HOME
sudo chmod 777 $AIRFLOW_HOME
cd $AIRFLOW_HOME

Passo 5. Vamos instalar o próprio Airflow:

pip3 install apache-airflow

Passo 6. Feito isso com sucesso, agora vamos rodar o Airflow pela primeira vez. Você deve ser capaz de ver a versão instalada no log que será exibido:

airflow version
Versão do Airflow exibida no console

Passo 7. Ao rodar o Airflow pela primeira vez o arquivo de configurações airflow.cfg será criado, vamos acessá-lo usando o Vim:

vim $AIRFLOW_HOME/airflow.cfg

Passo 8. Altere as seguintes configurações (nesse caso criei um banco de dados chamado airflow dentro do PostgreSQL para que a gestão dos dados da aplicação sejam feitas nesse ambiente):

executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://user:password@host:port/airflow
load_examples = False

Passo 9. Vamos inicializar o banco de dados para o Airflow criar todas as tabelas necessárias:

airflow initdb

Passo 10. Tendo sucesso até aqui, estamos prontos para iniciar o webserver pela primeira vez (como exemplo iniciando na porta 80 padrão do http):

airflow webserver -p 80

Acessando o endereço de nossa máquina já podemos ver a interface inicial do Airflow:

Página inicial da interface do Airflow

Ainda não temos nenhuma DAG e não iniciamos o scheduler, então nada vai acontecer. Fazendo uma navegação rápida pelos menus você vai ver a grande quantidade de recursos disponíveis, estamos abordando apenas o básico do básico nesse momento.

Criando a primeira DAG

Nossa primeira DAG (Directed Acyclic Graph, que é uma “coleção” de tarefas a serem executadas como e quando) vai ser responsável por executar 2 programas Python a cada 1 minuto, que simplesmente printam uma mensagem em cada. A ideia é entender como as coisas se encaixam, assim você pode substituir esses dois programas de exemplo por scripts de qualquer natureza e complexidade.

Código da nossa primeira DAG (salvando o arquivo como my_first_dag.py):

# Importando as bibliotecas que vamos usar nesse exemplofrom airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
# Definindo alguns argumentos básicosdefault_args = {
'owner': 'pablo_brenner',
'depends_on_past': False,
'start_date': datetime(2019, 1, 1),
'retries': 0,
}
# Nomeando a DAG e definindo quando ela vai ser executada (você pode usar argumentos em Crontab também caso queira que a DAG execute por exemplo todos os dias as 8 da manhã)with DAG(
'my-first-dag',
schedule_interval=timedelta(minutes=1),
catchup=False,
default_args=default_args
) as dag:
# Definindo as tarefas que a DAG vai executar, nesse caso a execução de dois programas Python, chamando sua execução por comandos basht1 = BashOperator(
task_id='first_etl',
bash_command="""
cd $AIRFLOW_HOME/dags/etl_scripts/
python3 my_first_etl_script.py
""")
t2 = BashOperator(
task_id='second_etl',
bash_command="""
cd $AIRFLOW_HOME/dags/etl_scripts/
python3 my_second_etl_script.py
""")
# Definindo o padrão de execução, nesse caso executamos t1 e depois t2t1 >> t2

Código dos programas my_first_etl_script.py e my_second_etl_script.py:

print(“Executed code from my Python ETL!”)

Na pasta raíz do Airflow vamos criar o diretório /dags (o Airflow vai procurar as DAGs dentro deste diretório por padrão) e dentro deste o diretório /etl_scripts para organizarmos nossas dags e scripts. O arquivo my_first_dag.py vai para dentro de /dags e os arquivos my_first_etl_script.py e my_second_etl_script.py para dentro de /dags/etl_scripts

Quando carregarmos os arquivos na máquina vamos ver que o webserver vai passar a mostrar nossa DAG na home da interface:

Nossa primeira DAG é enxergada pelo webserver

Mas, por enquanto nada vai ser executado pois o scheduler ainda está “dormindo”, e é ele o responsável por fazer as coisas acontecerem na hora certa. Vamos iniciar o scheduler com o seguinte comando (estando no diretório AIRFLOW_HOME):

airflow scheduler

Agora você vai perceber o ícone ON/OFF surgir ao lado da DAG, inicialmente em OFF, assim que você ligar a DAG, ela vai começar a ser executada dentro dos parâmetros que definimos, nesse exemplo a cada 1 minuto.

O scheduler começa as execuções da DAG

Vamos verificar pela interface a estrutura de execução da nossa DAG:

Estrutura de execução das tarefas

Veja que é exatemente o que queríamos, executando a tarefa first_etl e após a second_etl. Podemos acessar o log da sua última execução e verificar que está acontecendo exatamente o desejado:

Log de execução da tarefa second_etl

E podemos muito facilmente visualizar a performance de execução desse nosso job de ETL de exemplo:

Gráfico Gantt de execução da DAG
Tempos de execução das tarefas da DAG

Construímos um exemplo bastante simples, mas suficiente para entender o básico de como o Airflow funciona, a partir daqui você pode começar a desenvolver suas DAGs e scripts de ETL com maior sofisticação.

Lembrando que esse é realmente o básico do básico e que o Airflow possui inúmeros recursos.

Importante: por padrão o Airflow não configura acessos e segurança, essa é provavelmente uma das primeiras coisas que você deve configurar ao começar a usar o Airflow de verdade.

Finalizando

Com ajuda do Airflow você pode rapida e facilmente criar pipelines de dados para resolver uma boa parte dos problema de integração e tratamento de dados que existem no dia a dia.

Tem alguma dúvida, discordância, sugestão? Fique à vontade para comentar ou se conectar comigo pelo LinkedIn.

Links úteis

--

--