Boas práticas na utilização do Airflow

Lucas Misael
gb.tech
Published in
6 min readMay 19, 2023

Quando iniciei na área de engenharia de dados, lembro que fiz vários agendamentos de Pipelines utilizando a CRON do linux, e achava sensacional.

Até o momento que por algum motivo, meus pipelines não funcionavam e era necessário ficar cassando dentro da pasta de log, o que tinha dado de errado...

garoto chorando

Foi então que me apresentaram o maravilhoso AIRFLOW!!!

E sinceramente meus olhos brilharam desde o primeiro contato 😍

Mas claro que como todas os frameworks, ou modelos de desenvolvimento, para extrairmos sempre o melhor do que construímos, temos alguns padrões de boas práticas estabelecidos pela comunidade.

Pensando nisso, resolvi escrever um pouco desses padrões descritos na comunidade e documentação oficial do Airflow.

Criação de tasks

Segundo a documentação oficial, não é muito interessante usar insert quando for necessário re-executar uma task do Airflow, pois isso pode ocasionar em dados duplicados.

Em vez disso uma boa prática seria usar um Upsert, eliminando assim a probabilidade de duplicar os dados, por ter rodado duas vezes a mesma task.

Ler e escrever em uma partição específica

O segundo ponto abordado, é que não é interessante ler o último dado disponível em uma Task, pois pode ser que alguém tenha dado um update nesse dado, tornando-o obsoleto.

Uma boa prática, seria ler o dado de uma partição específica. Para isso pode-se utilizar o parâmetro data_interval_start como uma partição.

Não é um bom hábito utilizar o comando now() do Python em tasks, principalmente se for uma carga crítica, porém é uma boa saída para armazenar logs.

Deletando uma Task

Em um curso oferecido pelo Marc Lamber, ele comenta que devemos tomar muitocuidado ao deletar Tasks de uma DAG, pois você não conseguirá mais encontrar informações dessas tasks no Grid view , Graph View. Tornando difícil a tarefa de verificar logs dessa Task deletada.

Se isso não é o que deseja, é recomendado (quando possível, e se fizer sentido) criar uma nova DAG.

Comunicação entre Tasks ou Dags

As Tasks de execução de uma Dag serão armazenador em lugares diferentes quando for utilizado por exemplo Kubernets e Celery Executor, dificultando um pouco transferir informações e/ou metadados de uma Task para a outra.

Então uma boa prática para casos semelhantes, ou sempre que precisar transferir metadados entre tasks, seria usar as Xcoms.

Xcoms nada mais é que a abstração de acesso, leitura e escrita de dados no banco de dados do Airflow, permitindo que Tasks ou Dags se comuniquem.

Exemplo:

Imagine que precisamos transferir dados de um arquivo manual a um path dinâmico, onde o nome desse path será alterado a cada execução.

Uma boa prática para esse caso é guardar no Xcom da DAG o path da pasta criada dinamicamente, após a execução da primeira task, e no momento de executar a segunda Task, responsável por ler o arquivo e salvar em algum database, buscar no Xcom as informações de Path.

Uma observação importante: Não é uma boa prática armazenar nos Xcoms senhas, tokens ou autenticações!

Sempre que possível, armazenar esses dados nas connections do Airflow, guardando esses dados de uma forma segura, e obtendo um id único para ser utilizado.

Boas Práticas no Código

Segundo a documentação oficial, não devemos importar bibliotecas externas (que não sejam para o funcionamento do Airflow) no top level do código, isso pode implicar em Dags demoradas, pois estamos importado sempre bibliotecas desnecessária para todas as Tasks. Em vez disso criar a importação da biblioteca dentro da Task.

Importação não recomendada:

Exemplo não recomendado

Importação recomendada:

Exemplo recomendado

Gerar Dags Dinâmicas

Em alguns casos, escrever Dags manualmente, não é um trabalho muito interessante, visto que muitas das vezes possuímos Dags que fazem praticamente a mesma coisa, com apenas alguns parâmetros sendo alterado entre elas.

Para esses casos, uma boa prática é utilizar o conceito de Dags dinâmicas. Um ponto importante ao criarmos Dags dinâmicas, é que para evitarmos processamentos excessivos e desnecessários no “top level” do código, o ideal é criar as configurações das Dags em um desses caminhos:

  • Via environment variables (not to be mistaken with the Airflow Variables)
  • Via Código externo em Python, fornecendo os metadados necessários na pasta das Dags
  • Via arquivo de configuração com os metadados na pasta das Dags

Criação de variáveis

Como mencionado anteriormente, é recomendável evitar o uso de variáveis no top level. Isso não quer dizer que você não possa utilizar essas variáveis dentro dos métodos dos seus operadores, mas sim que você pode utilizar os templates jinja para facilitar muito sua vida.

Mas porque usar o Jinja?

No top level, variáveis utilizando o Jinja não produzem request enquanto uma task esta rodando, porém, quando utilizamos Variable.get(), para buscarmos uma informação, é feito um request a cada vez que um dag file é passado ao scheduler. E isso pode ocasionar em timeout da Dag, antes mesmo dela ter terminado todo seu processamento.

Exemplo não recomendado:

Exemplo não recomendado

Exemplo recomendado:

Exemplo recomendado

Timetables

É recomendado evitar o uso de variáveis de configuração e/ou conexões no top level de sua TimeTable, pois acessos a bancos de dados podem ter algum delay, e isso poderá ocasionar em não obter os dados de suas variáveis da forma que necessita.

Exemplo não recomendado:

Exemplo não recomendado

Exemplo recomendado:

Exemplo recomendado

Triggering DAGs após alterações

O processo para atualizar uma Dag após uma mudança, pode ter vários fatores que o fazem ser um pouco demorado.

Primeiramente temos que dar o tempo suficiente para o processamento dos arquivos, então esses arquivos serão distribuídos para o scheduler, seja via Git-Sync ou FileSystem Distribuídos, após isso esse arquivo Python será armazenado.

Mas vale lembrar que além disso alguns fatores podem ser levados em consideração, como a configuração utilizada, poder de processamento do seu Filesystem, quantidade de arquivos, quantidade de Dags, quantidade de schedulers. Ufaa.. Vários empecilhos não é mesmo rsrs?

Mas caso, você esteja notando um delay muito grande para atualizar sua Dag, é interessante dar uma olhada nos seguintes parâmetros:

Reduza a complexidade das Dags

Por mais que o Airflow seja bom em manusear várias Dags com suas dezenas de dependências entre elas, quanto mais complexas forem suas Dags, e quanto maior o numero dessas Dags complexas forem, poderão impactar na performance do scheduler.

Vale lembrar que não há métricas de qualidade de código para Dags, ou uma receita mágica para desenvolve-los, mas talvez possamos pensar em alguns pontos.

Crie Dags que rodem o mais rápido possível

Claro que nem sempre isso é possível, mas sempre que for, busque criar Dags que rodem rápido, pois assim seu scheduler não vai desistir de você rsrs.

cachorrinho dormindo, simbolizando scheduler dormindo

Crie uma estrutura simplificada para sua DAG

O ideal seria se todas as Dags fossem simples, fazendo sempre o mesmo processo, mas sabemos que nem sempre é assim que a banda toca! Então, sempre busque criar as estruturas menos “mirabolantes” possível.

Crie uma Dag por arquivo

Por mais que o airflow seja otimizado para funcionar com mais de uma Dag por arquivo, é mais interessante criar uma Dag por arquivo.

Testes Unitários

Uma das melhores formas de testar operadores, e resultados das suas Tasks com toda certeza, é criando testes unitários!

Muitas das vezes precisamos ir testando, e debugando os operadores que estamos desenvolvendo, e subir seu script a cada nova linha acrescentada no código, o que pode não ser tão prático.

Pensando nisso, testes unitários podem ser uma boa saída!

That’s All Folks!

Espero ter contribuido de alguma forma, caso note algo que eu possa melhorar nesse post fique à vontade para me chamar em minhas redes sociais.

Linkedin: https://www.linkedin.com/in/lucasmisael/

--

--