ENGENHARIA DE DADOS

Como usar o Apache-Airflow: Integrando com o S3 e Amazon Athena

Arnaldo Carlos Lucas
Data Team Stone
Published in
7 min readMay 19, 2021

--

Realize suas operações diárias de forma automatizada, agendadas e de maneira simples.

O Apache-Airflow é um grande orquestrador, no qual permite programar e monitorar fluxos de trabalho, dos mais simples aos mais complexos, possuindo uma interface simples e intuitiva, tornando-o uma excelente ferramenta, com muitas integrações, inclusive com o Amazon Simple Storage Service (S3) e o Amazon Athena do AWS, ferramentas no qual utilizo neste tutorial. Um ponto importante é que neste tutorial é explicado o uso da ferramenta, para realizar sua instalação e ajustar o ambiente, recomendo visualizar na página oficial deles.

1. INTERFACE AIRFLOW

A interface do Airflow é bastante simples e possui várias utilidades ótimas para monitorar os workflows (fluxos de trabalho), assim como iniciar e interromper processos e muitos outros recursos.

A porta de entrada do Airflow é a página das DAGs, onde estão listadas todas as DAGs alocadas na pasta fonte.

Olhando a imagem abaixo é indicado cada um dos 8 recursos que o Airflow possui na página principal:

Fonte: Apache Airflow

1 — Botão ON/OFF: Nele é possível pausar uma DAG, mesmo que ela esteja agendada para iniciar em uma data ou hora específica, se ela estiver como “off”, ela não será iniciada.

2 — DAG: A informação aqui indica o nome da DAG, onde explicarei mais a frente como defini-la durante a criação do arquivo.

3 — Schedule: Nele é possível verificar o “Schedule” (cronograma) daquela DAG, ou seja, o agendamento para iniciar ou um cronograma para sempre rodar em uma data ou hora específica. A informação que aparece no Schedule está no formato CRON, onde pode ser verificado neste site os seus horários e dias.

4 — Owner: Indica o nome do proprietário da DAG apresentada, também irei mostrar mais a frente como defini-lo.

5 — Recent Tasks: Tarefas recentes possuem a atualização do processo em tempo real, com múltiplas possibilidades descritas à seguir:

Fonte: Apache Airflow
Fonte: Apache Airflow

a — success: Tarefa finalizada.

b — running: O worker pegou uma tarefa e está executando-a.

c — failed: A tarefa falhou.

d — upstream_failed: Indica que alguma tarefa ou tarefas anteriores falharam.

e — skipped: Tarefa ignorada.

f — up_for_retry: Tentando executar novamente.

g — up_for_reschedule: Ele “libera” o slot do worker, re-agendando a tarefa, sem bloquear o slot do worker em casos onde todos os slots estão em uso.

h — queued: O scheduler enviou uma tarefa para o executor para adicioná-la na fila.

i — no_status: O scheduler criou uma instância de tarefa vazia.

j — scheduled: O scheduler determinou que uma instância de tarefa precisa ser executada.

6 — Last Run: Indica o dia/hora da última execução da DAG.

7 — DAG Runs: Status de todas as execuções anteriores da DAG. Pode possuir 3 status: success, running e failed.

8 — Links: Links de redirecionamento para outros recursos.

Ao clicar em uma DAG como por exemplo a “arnaldo_dag” , pode-se coletar várias informações, executar processos e até excluir a mesma caso necessário.

A visualização do “Tree View” mostra como a workflow está configurado. Nesse exemplo, a DAG possui duas etapas: A criação de uma tabela seguida da inserção de dados na mesma. Ao lado direito, é possível verificar o status de cada tarefa, nesse caso, todas estão concluídas com sucesso (verde escuro). Em cima é possível verificar a data e hora que aconteceu sua última execução e o número total de execuções dessa DAG.

Fonte: Apache Airflow

A opção do “Graph View” permite visualizar as tarefas em forma de grafos, melhorando a visualização do workflow, além de mostrar o status atual de cada tarefa, seja durante a execução ou após a mesma.

Fonte: Apache Airflow

Nessas duas opções é possível ver os botões “Trigger DAG”, “Refresh”, “Delete” e “Code” na parte superior direita. Eles servem para:

Trigger DAG: Executar manualmente o workflow, iniciar sem depender de agendamentos.

Refresh: Atualiza o workflow para verificar seus status durante uma execução. O botão

dentro da área do grafo na opção “Graph View” também possui a mesma função.

Delete: Deletar uma DAG.

Code: Serve para visualizar o código principal da DAG no próprio Airflow. Qualquer atualização feita no código da DAG localizado dentro da pasta “dags” no seu projeto, deverá ser atualizado automaticamente aqui após salvá-lo.

Fonte: Apache Airflow

2. ARQUIVO DA DAG

Na DAG à seguir, será apresentado as etapas para a construção de um workflow simples, onde uma tarefa irá criar uma tabela (chaves_clientes) dentro de um database (database_exemplo) e outra tarefa para inserir dados nela referentes de outra tabela (tabela_fonte) que já se encontra no Athena (AWS).

Todas as informações sobre cada linha estarão como comentários dentro do código à seguir:

Fonte: O autor

3. EXECUTANDO A DAG

Inicialmente, o arquivo está localizado dentro de uma pasta que foi configurada durante a instalação para simular o Airflow localmente.

Uma observação interessante é que caso não exista a pasta de saída para os arquivos que serão gerados durante as tarefas no parâmetro output_location dentro das DAGs, ela será criada durante a execução. Nas imagens à seguir é visto que ainda não existe a pasta chaves_clientes dentro do bucket (pastas dentro do S3):

Fonte: AWS

A tabela chaves_clientes não existe ainda no Athena dentro do AWS. Após a criação da DAG na pasta correta do seu projeto local, é possível visualizá-la igual nessa imagem:

Fonte: Apache Airflow

Nessa etapa, você deve “ligar” sua DAG no botão que está ao lado esquerdo do nome da DAG marcado como “Off”. Em nosso exemplo, temos uma data especificada no parâmetro start_date porém é uma data antiga e consequentemente a DAG nunca seria iniciada sozinha. Mas para isso, o botão “Trigger DAG” serve para executar manualmente a nossa DAG. Antes de executar, é legal ter a visão proporcionada no ”Graph View” onde é possível ver todas as atualizações de acordo com o avanço das tarefas ou suas falhas. Como ainda não foi iniciada, o contorno das tarefas estão na cor branca (sem status).

Fonte: Apache Airflow

Assim que a DAG for ligada e executada através do Trigger DAG, as cores vão alterando para verde claro (em andamento) e verde escuro (concluído com sucesso):

Fonte: Apache Airflow

Após a conclusão da última tarefa, pode-se observar no S3 que a pasta chave_clientes foi criada dentro do bucket de datalake:

Fonte: AWS

E agora também é possível visualizar a nova tabela dentro do database database_exemplo no Athena e realizar qualquer consulta. Observe que os nomes das colunas foram definidas na tarefa de criação da tabela (os dados foram ocultados parcialmente para não divulgar dados internos).

Fonte: AWS

4. FALHAS NA DAG?

Para verificar falhas nas tarefas, basta clicar na tarefa com o erro para retornar essa tela e clicar em “View Log” para melhor entendimento da falha:

Fonte: Apache Airflow

Nessa mesma tela é possível também reiniciar apenas a tarefa em questão, pois em uma DAG onde existem mais de 20 tarefas, seja mais interessante executar apenas aquela com o erro e não todas anteriormente supondo que ela seja a de número 18 na fila, custando tempo desnecessário de execução. Para isso, primeiro corrija o erro no código, e depois clique em “Clear” para deixar apenas ela sem status e depois apenas atualize para iniciá-la novamente. Se não executar sozinha, basta clicar em “Run” para iniciar.

Não percam os próximos artigos. 😉

Pra fechar com chave de ouro, é com muito orgulho que trago vagas para o nosso time de dados de crédito da Stone 💚.

Machine Learning Engineer

Analista Business Intelligence Sênior

Data Analytics

Se você curtiu o Airflow, inscreva-se e vem pintar o mundo de verde com a gente.

--

--