Agendamento Baseado em Datasets no Airflow: Um Guia Prático com DAG Factory

Neste artigo, o objetivo é falar sobre a funcionalidade de Datasets do Airflow e como construir DAGs data-aware, utilizando DAG Factory

Cícero Moura
Data Hackers
6 min readAug 13, 2023

--

Recentemente fiz uma contribuição para um projeto open source chamado DAG Factory, que é uma biblioteca para construção de DAGs do Airflow de forma declarativa, utilizando arquivos YAML, sem a necessidade de codificar em Python.

Minha contribuição foi adicionar suporte a funcionalidade de Data-aware Scheduling (Datasets) do Airflow, que foi lançada a partir da versão 2.4 (e no momento da escrita deste artigo, o Airflow está na versão 2.6.3).

O objetivo aqui é falar sobre a funcionalidade de Datasets do Airflow, apresentar a biblioteca DAG Factory e criar um exemplo prático utilizando os dois.

Abaixo você pode acessar o repositório com o código utilizado neste artigo:

O que são os Datasets no Airflow?

A funcionalidade de Data-aware Scheduling ou Agendamento com reconhecimento de dados, na tradução livre, permite criar DAGs vinculadas a arquivos, seja ele local ou remoto, para disparar processamentos de dados a partir da alteração de um ou vários arquivos, os datasets.

Os datasets ajudam a resolver o problema da dependência de dados entre as DAGs, que acontece quando uma DAG precisa consumir dados de outra para análise ou processamento posterior, assim é possível criar um agendamento mais inteligente e visível, uma dependência explícita entre as DAGs.

Basicamente temos dois conceitos fundamentais nos Datasets do Airflow:

  • DAG Producer: é uma DAG que vai criar ou atualizar um ou mais datasets, isso é realizado através das tasks utilizando um parâmetro chamado “outlets” para indicar um determinado dataset.
  • DAG Consumer: é uma DAG que vai consumir um ou mais datasets, será agendada e disparada assim que todos os datasets forem criados ou atualizados com sucesso pela DAG Producer. O agendamento será feito pela “schedule ” diretamente na configuração da DAG.

Agora temos duas formas de agendar DAGs no Airflow, por um horário recorrente (cron, timedelta, timetable…) ou através de um ou conjuntos de datasets. Importante destacar que não podemos utilizar as duas formas de agendamento, apenas uma em cada DAG.

DAG Factory: construindo DAGs por YAML

DAG Factory é uma biblioteca da comunidade que possibilita configurar o Airflow para gerar DAGs a partir de um ou vários arquivos no formato YAML.

A biblioteca tem o objetivo de facilitar a criação e configuração de novas DAGs, pelo fato da construção ser feita de forma declarativa com parâmetros no YAML. A biblioteca possibilita customizações por padrão e por ser código aberto, é muito fácil de criar e customizar novas funcionalidades.

A comunidade em torno da biblioteca está bem engajada, vale a pena conferir =)

Datasets na Prática

Neste artigo vamos trabalhar com o seguinte cenário:

Temos que construir uma pipeline que faça download dos dados de API, salvando assim os resultados no Amazon S3. Depois de extraídos os dados e salvos com sucesso, é necessário processá-los. Assim, teremos outra pipeline que será disparada a partir da atualização dos dados (datasets).

A infraestrutura para executar o Airflow e reproduzir o exemplo deste artigo pode ser encontrada aqui.

O primeiro passo que iremos executar é construir a pipeline que extrai e salva os dados no S3, ou seja, a DAG produtora do Dataset.

DAG produtora dos dados

Esta pipeline possui duas tasks para extrair os dados da API pública PokeAPI e duas tasks para salvar os dados no S3.

As tasks que extraem os dados da API utilizam o SimpleHttpOperator e as tasks que salvam os dados no S3, o S3CreateObjectOperator.

Como iremos utilizar YAML para construir nossas DAGs, o código abaixo faz a construção dessa primeira DAG com todas as suas tasks.

Um destaque para a configuração dos Datasets, que são feitos pela tag outlets” adicionadas nas tasks de save_items_data e save_items_attribute_data.

outlets:
file: /opt/airflow/dags/dags_config/datasets_config.yml
datasets: ['dataset_poke_items_attribute']

Nessa configuração, temos o caminho do arquivo (file) onde é declarado todos os Datasets de forma centralizada para serem reaproveitados, e os nomes dos Datasets (datasets) contidos no arquivo para serem utilizados.

Abaixo temos o arquivo datasets_config.yml utilizado neste exemplo, onde contém o nome do dataset, que é algo utilizado apenas no Airflow, e a URI, que é o caminho onde o arquivo real é armazenado, neste caso no S3.

A seguir temos a imagem de como ficará a DAG no Airflow:

DAG Consumidora dos dados

Agora iremos construir a DAG consumidora dos dados, que realiza o processamento e tratamento dos datasets.

A DAG é agendada por Datasets e não por horário de execução, assim ela só será disparada quando todos os Datasets que ela dependa forem atualizados.

Atualmente não é possível utilizar dois tipos de agendamento, é por schedule interval ou datasets.

Neste exemplo, apenas iremos construir uma DAG com PythonOperator, que simula o consumo e processamento dos dados produzidos.

A seguir temos o arquivo de configuração da DAG consumidora:

Um destaque para a configuração do schedule por datasets, que é bem parecido com a configuração dos outlets da DAG produtora dos dados:

schedule:
file: /opt/airflow/dags/dags_config/datasets_config.yml
datasets: ['dataset_poke_items', 'dataset_poke_items_attribute']

A seguir temos também a imagem da DAG dentro do Airflow:

Overview das DAGs com Datasets

Quando temos DAGs que utilizam os datasets do Airflow, podemos verificar alguns pontos interessantes:

  • A DAG Consumidora na listagem de todas as DAGs fica com a flag indicando o agendamento por datasets:
  • Ainda temos uma visualização própria no menu do Airflow -> Datasets. Onde é possível verificar os datasets configurados, as dependências entre as DAGs e também o log de criação, atualização e consumo dos datasets:
  • Por último na visualização de DAG Dependencies, temos a visualização do relacionamento entre as DAGs, o que é muito interessante para ter essa malha de processamento e dependência entre os dados:

Pontos importantes sobre Datasets

A funcionalidade de Datasets no Airflow ainda é recente, e tem muitas coisas no backlog da comunidade de melhorias, mas tenho alguns pontos neste momento para destacar sobre:

  • atualmente o Airflow pela funcionalidade de Dataset não olha o arquivo físico em sí, ele agenda a pipeline consumidora direto pelo banco de dados, é como se fosse um Trigger DAG implícito;
  • considerando o ponto anterior, caso você precise realmente “enxergar e acessar” o dado no momento de disparar a DAG Consumer, melhor utilizar um Sensor;
  • a documentação oficial não recomenda utilizar expressões regulares na URI dos Datasets, mas como ele ainda não olha para o arquivo físico em si, todos os testes que eu fiz utilizando, não tiveram nenhum tipo de problema.
  • como a DAG Consumer não tem agendamento, não conseguimos medir se ela foi disparada em um horário planejado, o que fica difícil definir um SLA, precisando de um monitoramento mais refinado para ela não perder agendamentos críticos.

Conclusão

Ao utilizar a biblioteca DAG Factory, simplificamos o processo de criação e configuração de novas DAGs, aproveitando a extensibilidade proporcionada pelo código aberto da biblioteca.

Os Datasets do Airflow permitem um agendamento mais eficiente, disparando DAGs apenas quando os dados necessários estiverem disponíveis, evitando execuções desnecessárias e atrasadas.

Espero que este artigo tenha sido útil para entender a funcionalidade de Datasets do Airflow e como aplicá-la em seus projetos. Com essa abordagem, você poderá construir pipelines mais robustas e eficientes, aproveitando ao máximo o potencial do Airflow.

--

--

Cícero Moura
Data Hackers

Arquiteto de Dados, pós-graduado em Big Data e Machine Learning. Palestrante em Big Data. Também sou AWS Community Builder e AWS Community Leader.