Parte (2/4) — Extraindo dados da API do Cartola com Apache Airflow
Vamos criar um processo para buscar os dados da primeira rodada do Cartola.
Parte (1/2) — Pipeline de dados com a API do Cartola
Parte (2/3) — Configurando o Apache Airflow e o Google Storage
Parte (3/4) — Google BigQuery e as primeiras visualizações no DataStudio
Parte (4/4) — Adicionando novos gráficos e filtros personalizados
Importante, para executar esta parte, crie um bucket no Storage. Para este tutorial o nome do meu Bucket é “cartolafc-2020”, caso queira modificar, atente-se para substituir no momento de criação das tasks.
Nesta parte vamos começar os trabalhos. A primeira coisa que precisamos fazer é habilitar e criar um ambiente do Airflow no Composer.
Ao criar o ambiente você irá se deparar com uma tela como esta abaixo:
Preencha as configurações escolhendo a América do sul como local, em “Zona” deixe como padrão. Em “Tipo de Máquina” utilize a Standard 1. Para “Tamanho de Disco” coloque 20, “Versão da Imagem” coloque a mais recente e “Versão do Python” escolha a 3. Feito isto, clique em criar e aguarde (demora um pouco). Após você observar o indicador verde na frente do nome do seu ambiente, significa que tudo deu certo, basta clicar no link abaixo do item “Servidor da Web do Airflow” que você será direcionado para a interface de gerenciamento da ferramenta.
Na tela inicial da interface web você irá encontrar todas as suas DAGs em execução ou não. Por ela você consegue habilitar ou desabilitar uma DAG, observar o status e o número de execuções das tarefas para cada etapa do seu ciclo de vida.
Conhecendo a estrutura do Airflow
No Apache Airflow, uma DAG (Directed Acyclic Graph) é uma coleção de tarefas que você deseja executar, organizadas de tal forma que são obedecidas suas relações e dependências. Ela é desenvolvida como um script em Python e é composta por um conjunto de argumentos, que em linhas gerais, personalizam a execução desta DAG, configurando sua data de início de execução, de quanto em quanto tempo que ela será executada, se haverá uma ação de disparo de e-mail caso encontre algum erro, etc.
As tarefas são uma unidade de trabalho dentro da DAG e são uma implementação de um Operador (Operator). Para saber mais sobre o funcionamento veja AQUI.
Em resumo, teremos um arquivo em Python (DAG) e outro com duas classes (Operators): uma para extrair os dados da API e outra para salvar os resultados da consulta no Cloud Storage.
Criando o Operator
Para o projeto, criaremos dois operadores personalizados, o primeiro para a extração dos dados da API e o segundo para salvar no Storage. Antes de irmos ao código, abaixo você pode observar como é uma estrutura básica de uma classe Operator.
Primeiramente é importante notar que ela herda a classe BaseOperator. Em __init__() (construtor) temos os argumentos que passaremos como parâmetro na instanciação do objeto. O método execute() fica responsável por executar o operator. Saiba mais sobre Custom Operators.
Sabido isto, vamos ao nosso código para o primeiro Operator.
No construtor temos como parâmetro a URL (endpoint) para a extração dos dados. No método __datasource_to_csv() criamos um array com os tipos de pontuações, que será usado para a geração do arquivo com as pontuações (scouts) dos atletas. Feito isto, criamos os três csvs que serão utilizados neste projeto (clubes.csv, posicoes.csv e atletas.csv).
No segundo operador, a estrutura é semelhante sendo a função __csv_to_storage() responsável por fazer o upload do arquivo para o Storage.
Veja aqui o código final:
Criando a DAG
Em nossa última etapa do código, vamos criar a nossa DAG e configurar as tasks para em seguida fazermos o upload para o servidor.
Vamos dividir o código em dois blocos, o primeiro traz os parâmetros default da DAG. Nele temos a data de início, se teremos o envio de um e-mail em caso de falha e em caso de uma nova tentativa, a identificação do projeto, o número de tentativas para rodar em caso de erros e o tempo entre estas tentativas.
Em models, temos o nome da DAG e o intervalo para a execução (no caso a cada 3 dias). O parâmetro catchup é utilizado para controlar o início de execução da DAG conforme seu agendamento e sua data de execução. Saiba mais AQUI.
Veja abaixo como fica o código:
A segunda parte da DAG é responsável pelas tarefas (tasks) onde basicamente instanciamos os objetos dos operadores que criamos acima. Por último, abaixo do comentário “fluxo de execução” , tratamos a ordem das tarefas, sendo o parâmetro “set_upstream()” responsável por indicar dependência de uma task com outra. Em nosso caso, as tarefas 2, 3 e 4 dependem da conclusão da tarefa 1, após isto, elas são executadas em paralelo.
Abaixo temos o código completo:
Upload no servidor
Por fim, precisamos fazer upload dos arquivos criados (Operadores e DAG) para o nosso servidor e aguardar a execução. Este processo é bem simples:
1 — Garanta que o servidor terá as bibliotecas Python que estamos utilizando no projeto. Para isto, clique no nome do seu ambiente (no meu caso, cartolafc).
Na tela que irá abrir, clique em PYPI PACKAGES e em nome do pacote insira os dados conforme a imagem abaixo. Por fim, clique em salvar.
2 — Fazer upload dos arquivos — Clique na pasta DAGs em sua página de listagem de ambientes do Composer:
Ele abrirá uma aba no storage, nela clique em “Fazer Upload de Arquivo”, insira os arquivos .py que você criou (dag e operator) e ao final feche a aba aberta.
3 — Se tudo correu bem, você será capaz de observar a sua DAG na interface web do Airflow, bem como acompanhar o seu fluxo de execução.
Ao clicar na DAG, veremos a estrutura no modo “Treeview”
Clicando em “Graph View” obtemos uma nova forma de visualização.
Neste momento, se as suas tasks estão em verde escuro, indica que tudo deu certo e que você já pode ir até seu storage pois os seus arquivos da rodada já estarão salvos lá e pronto para serem analisados.
Esta segunda etapa rendeu um tutorial um pouco mais longo, devido às várias etapas. O objetivo foi passar uma visão mais geral de todo o projeto. Colocarei abaixo alguns links caso queiram conhecer um pouco mais sobre o Apache Airflow.
Agora, vamos para a próxima parte, que é criar uma consulta com o BigQuery e acessá-la no DataStudio.
Abraços e até a próxima.
Referências:
Data Engineering Basicis of Apache Airflow — https://towardsdatascience.com/data-engineering-basics-of-apache-airflow-build-your-first-pipeline-eefecb7f1bb9
Primeiros passos com o Apache Airflow: ETL fácil, robusto e de baixo custo: https://medium.com/data-hackers/primeiros-passos-com-o-apache-airflow-etl-f%C3%A1cil-robusto-e-de-baixo-custo-f80db989edae
Apache Airflow: Tutorial and Beginners Guide: https://www.polidea.com/blog/apache-airflow-tutorial-and-beginners-guide/
ETL Pipelines With Airflow: http://michael-harmon.com/blog/AirflowETL.html#ETL-Pipelines-With-Airflow
How Apache Airflow is helping us evolve our data pipeline at QuintoAndar: https://medium.com/quintoandar-tech-blog/how-apache-airflow-is-helping-us-to-evolve-our-data-pipeline-at-quintoandar-7d157e9f9773