Parte (2/4) — Extraindo dados da API do Cartola com Apache Airflow

Vamos criar um processo para buscar os dados da primeira rodada do Cartola.

Tiago Augusto Ferreira
Academia dos Bots
6 min readAug 11, 2020

--

Logo Apache Airflow

Parte (1/2) — Pipeline de dados com a API do Cartola

Parte (2/3) — Configurando o Apache Airflow e o Google Storage

Parte (3/4) — Google BigQuery e as primeiras visualizações no DataStudio

Parte (4/4) — Adicionando novos gráficos e filtros personalizados

Importante, para executar esta parte, crie um bucket no Storage. Para este tutorial o nome do meu Bucket é “cartolafc-2020”, caso queira modificar, atente-se para substituir no momento de criação das tasks.

Nesta parte vamos começar os trabalhos. A primeira coisa que precisamos fazer é habilitar e criar um ambiente do Airflow no Composer.

Ao criar o ambiente você irá se deparar com uma tela como esta abaixo:

Figura 01 — Criação do Ambiente no Composer

Preencha as configurações escolhendo a América do sul como local, em “Zona” deixe como padrão. Em “Tipo de Máquina” utilize a Standard 1. Para “Tamanho de Disco” coloque 20, “Versão da Imagem” coloque a mais recente e “Versão do Python” escolha a 3. Feito isto, clique em criar e aguarde (demora um pouco). Após você observar o indicador verde na frente do nome do seu ambiente, significa que tudo deu certo, basta clicar no link abaixo do item “Servidor da Web do Airflow” que você será direcionado para a interface de gerenciamento da ferramenta.

Figura 02 — Ambiente criado

Na tela inicial da interface web você irá encontrar todas as suas DAGs em execução ou não. Por ela você consegue habilitar ou desabilitar uma DAG, observar o status e o número de execuções das tarefas para cada etapa do seu ciclo de vida.

Figura 03 — DAGs Apache Airflow

Conhecendo a estrutura do Airflow

No Apache Airflow, uma DAG (Directed Acyclic Graph) é uma coleção de tarefas que você deseja executar, organizadas de tal forma que são obedecidas suas relações e dependências. Ela é desenvolvida como um script em Python e é composta por um conjunto de argumentos, que em linhas gerais, personalizam a execução desta DAG, configurando sua data de início de execução, de quanto em quanto tempo que ela será executada, se haverá uma ação de disparo de e-mail caso encontre algum erro, etc.

As tarefas são uma unidade de trabalho dentro da DAG e são uma implementação de um Operador (Operator). Para saber mais sobre o funcionamento veja AQUI.

Em resumo, teremos um arquivo em Python (DAG) e outro com duas classes (Operators): uma para extrair os dados da API e outra para salvar os resultados da consulta no Cloud Storage.

Criando o Operator

Para o projeto, criaremos dois operadores personalizados, o primeiro para a extração dos dados da API e o segundo para salvar no Storage. Antes de irmos ao código, abaixo você pode observar como é uma estrutura básica de uma classe Operator.

Estrutra padrão de um Custom Operator

Primeiramente é importante notar que ela herda a classe BaseOperator. Em __init__() (construtor) temos os argumentos que passaremos como parâmetro na instanciação do objeto. O método execute() fica responsável por executar o operator. Saiba mais sobre Custom Operators.

Sabido isto, vamos ao nosso código para o primeiro Operator.

Operador para extração de dados da API

No construtor temos como parâmetro a URL (endpoint) para a extração dos dados. No método __datasource_to_csv() criamos um array com os tipos de pontuações, que será usado para a geração do arquivo com as pontuações (scouts) dos atletas. Feito isto, criamos os três csvs que serão utilizados neste projeto (clubes.csv, posicoes.csv e atletas.csv).

No segundo operador, a estrutura é semelhante sendo a função __csv_to_storage() responsável por fazer o upload do arquivo para o Storage.

Operador que salva os arquivos no storage

Veja aqui o código final:

Arquivo final de operadores

Criando a DAG

Em nossa última etapa do código, vamos criar a nossa DAG e configurar as tasks para em seguida fazermos o upload para o servidor.

Vamos dividir o código em dois blocos, o primeiro traz os parâmetros default da DAG. Nele temos a data de início, se teremos o envio de um e-mail em caso de falha e em caso de uma nova tentativa, a identificação do projeto, o número de tentativas para rodar em caso de erros e o tempo entre estas tentativas.

Em models, temos o nome da DAG e o intervalo para a execução (no caso a cada 3 dias). O parâmetro catchup é utilizado para controlar o início de execução da DAG conforme seu agendamento e sua data de execução. Saiba mais AQUI.

Veja abaixo como fica o código:

Primeira parte da DAG

A segunda parte da DAG é responsável pelas tarefas (tasks) onde basicamente instanciamos os objetos dos operadores que criamos acima. Por último, abaixo do comentário “fluxo de execução” , tratamos a ordem das tarefas, sendo o parâmetro “set_upstream()” responsável por indicar dependência de uma task com outra. Em nosso caso, as tarefas 2, 3 e 4 dependem da conclusão da tarefa 1, após isto, elas são executadas em paralelo.

Abaixo temos o código completo:

Arquivo final da DAG

Upload no servidor

Por fim, precisamos fazer upload dos arquivos criados (Operadores e DAG) para o nosso servidor e aguardar a execução. Este processo é bem simples:

1 — Garanta que o servidor terá as bibliotecas Python que estamos utilizando no projeto. Para isto, clique no nome do seu ambiente (no meu caso, cartolafc).

Figura 04 — Tela de Ambientes

Na tela que irá abrir, clique em PYPI PACKAGES e em nome do pacote insira os dados conforme a imagem abaixo. Por fim, clique em salvar.

Figura 05 — Adicionando pacotes Python

2 — Fazer upload dos arquivos — Clique na pasta DAGs em sua página de listagem de ambientes do Composer:

Figura 06 — Upload de DAGs

Ele abrirá uma aba no storage, nela clique em “Fazer Upload de Arquivo”, insira os arquivos .py que você criou (dag e operator) e ao final feche a aba aberta.

Figura 07 — Tela para upload de arquivos DAG e Operator

3 — Se tudo correu bem, você será capaz de observar a sua DAG na interface web do Airflow, bem como acompanhar o seu fluxo de execução.

Figura 08 — DAG carregada.

Ao clicar na DAG, veremos a estrutura no modo “Treeview”

Figura 09 — Modo Tree View

Clicando em “Graph View” obtemos uma nova forma de visualização.

Figura 10 — Modo Graph View

Neste momento, se as suas tasks estão em verde escuro, indica que tudo deu certo e que você já pode ir até seu storage pois os seus arquivos da rodada já estarão salvos lá e pronto para serem analisados.

Esta segunda etapa rendeu um tutorial um pouco mais longo, devido às várias etapas. O objetivo foi passar uma visão mais geral de todo o projeto. Colocarei abaixo alguns links caso queiram conhecer um pouco mais sobre o Apache Airflow.

Agora, vamos para a próxima parte, que é criar uma consulta com o BigQuery e acessá-la no DataStudio.

Abraços e até a próxima.

--

--