Não escreva uma DAG do Airflow como um código python normal

Luciano Marqueto
gb.tech
Published in
5 min readSep 13, 2022

Dicas de boas práticas ao escrever uma DAG para o Airflow

O Airflow é um ferramenta para agendamento, e monitoramento de fluxos de trabalho muito utilizada atualmente, aqui no GB(Grupo Boticário) usamos o Composer que é Airflow gerenciado pelo Google Cloud, ele é a nossa principal ferramenta de orquestração, sendo responsável pela maioria dos fluxos de dados e de ML, porém a utilização do Airflow requer algumas boas táticas que veremos abaixo para evitarmos alguma dor de cabeça no futuro.

Fotos com vários dados organizados, Photo by Mick Haupt on Unsplash

TRTL;

O código python é executado diversas vezes pelo scheduler.
Não execute funções no nível superior, use jinja template quando possível;
Sempre que possível importe os pacotes no nível da função e não no cabeçalho do arquivo;

Você já deve ter visto em diversos lugares que escrever uma DAG para o Airflow é tão simples quanto escrever um script python, não é mentira, porém a forma que o arquivo .py da sua DAG é utilizando pelo Airflow é um pouco diferente do convencional de um programa python e isso pode te levar para algumas armadilhas e é sobre isso quer vamos falar neste artigo.

Imagem “meme star wars” é uma armadilha

Primeiramente, precisamos entender alguns dos componentes do airflow, o Scheduler e o Executor (o Airflow tem vários outros componentes importantes mas vamos manter o foco apenas nesses dois)

  • O scheduler de forma simplificada é responsável por monitorar e acionar as dags/tasks.
  • O executor também de forma simplificada é responsável por executar as tasks.

Ou seja, quando criamos uma DAG o que esperamos é que o schedule faça o trabalho que antigamente era executado pelo crontab e o executor rode nosso código no momento que o scheduler solicitar.

Certo até aqui sem nenhuma novidade, então vamos ao que interessa, o que nem todos se atentam é que o scheduler precisa analisar a nossa dag de tempo em tempos (por padrão é a cada 30 segundos, mas pode ser alterado modificando o parâmetro min_file_process_interval ) e ao fazer isso ele vai executar parte do código da sua dag.

Vamos a uns exemplos para ficar mais claro, considere o código abaixo

Cenário A

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
"dag_a",
default_args={
"owner": "airflow",
"start_date": pendulum.datetime(2022, 8, 1, tz="UTC")
},
schedule_interval=None,
catchup=False
)

def get_some_param():
open(f"/tmp/get-some-param-{pendulum.now().txt", "w").close()
return {'my_param': "test"}

def run_inside_python_operator(param, **context):
print(param)

task_a = PythonOperator(
dag=dag,
task_id="task_a",
python_callable=run_inside_python_operator,
op_kwargs={"param": get_some_param()}
)

task_b = PythonOperator(
dag=dag,
task_id="task_b",
python_callable=run_inside_python_operator,
op_kwargs={"param": get_some_param()}
)

Temos duas tasks (task_a e task_b) que precisam receber um parâmetro em comum, então criamos a função get_some_param() que retorna o tal parâmetro. Para podermos monitorar de forma simples adicionamos na função get_some_param() a criação de um arquivo de log sempre que a função é chamada. Note que a dag não está agendada (schedule_interval=None) ou seja ela não será executada automaticamente.

Após colocar nossa dag no diretório de Dags do Airflow e aguardar alguns minutos ao olhar no diretório /tmp teremos o seguinte cenário:

Imagem contendo uma lista de arquivos

Como podemos ver a cada 30 segundos nossa função é chamada duas vezes (uma para cada task), independente da DAG estar rodando ou não.

Vamos considerar também esse segundo cenário

Cenário B

Arquivo dag_b.py

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from my_lib import my_lib

dag = DAG(
"dag_b",
default_args={
"owner": "airflow",
"start_date": pendulum.datetime(2022, 8, 1, tz="UTC")
},
schedule_interval=None,
catchup=False
)

def run_inside_python_operator( **context):
my_lib()

task_a = PythonOperator(
dag=dag,
task_id="task_a",
python_callable=run_inside_python_operator,
)

Arquivo my_lib.py

import pendulum

class my_lib:

open(f"/tmp/my-lib-{pendulum.now()}.txt", "w").close()

def do_something(self):
pass

Nesse cenário importamos a lib my_lib na dag_b, e novamente para registrarmos o que está ocorrendo geramos um arquivo dentro da classe importada, e como você já deve estar imaginando um novo arquivo foi criado a cada 30 segundos

Imagem contendo uma lista de arquivos

Isso ocorre pois o scheduler executa o código da DAG para poder efetuar as validações e detectar as tasks e suas dependências, o código é executado da mesma forma que executar o comando python dag-file.py.

Agora imagine o cenário onde você está importando diversas biblioteca como pandas ou numpy, que por sua vez importa diversas outras, ou então que precisa obter um parâmetro que depende de uma consulta ao banco de dados como as variáveis do airflow (airflow.models.Variable), e tudo isso ocorrendo em centenas de Dags, o scheduler vai ficar executando essas atividades milhares de vezes elevando consumo elevado de CPU e possivelmente deixando o scheduler mais lento.

O guia de boas práticas do Airflow diz que não devemos executar nenhum cálculo pesado, acesso a base de dados, ou acesso a rede fora de um operador.

…you should not run any database access, heavy computations and networking operations….

OK, entendemos o problema, mas qual botão eu aperto para resolver o problema?

Imagem de um botão com o texto “Resolva meu problema”

Existem diversas técnicas que podem ser utilizadas e qual usar vai depender do seu caso de uso, vou abordar duas que resolvem os problemas propostos nos cenários acima.

Cenário A

Utilização de Macro customizada, o Airflow possui nativamente uma série de variáveis que podem ser utilizadas via template Jinja, além disso podemos criar funções que podem ser acionadas via template e que só são acionadas pelo executor quanto a task é realmente executada

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

def get_some_param():
return 'my_param'

dag = DAG(
"dag_a",
default_args={
"owner": "airflow",
"start_date": pendulum.datetime(2022, 8, 1, tz="UTC")
},
schedule_interval=None,
catchup=False,
user_defined_macros={
'get_custom_param': get_some_param,
}

)

def run_inside_python_operator(param, **context):
print(param)

task_a = PythonOperator(
dag=dag,
task_id="task_a",
python_callable=run_inside_python_operator,
op_kwargs={"param": "{{ get_custom_param() }}"}
)


task_b = PythonOperator(
dag=dag,
task_id="task_b",
python_callable=run_inside_python_operator,
op_kwargs={"param": "{{ get_custom_param() }}"}
)

Cenário B

Importe as bibliotecas necessárias no nível da função, não é o padrão recomenda no PEP-8 mas é uma boa prática no Airflow

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
"dag_b",
default_args={
"owner": "airflow",
"start_date": pendulum.datetime(2022, 8, 1, tz="UTC")
},
schedule_interval=None,
catchup=False
)


def run_inside_python_operator( **context):
from my_lib import my_lib

my_lib()


task_a = PythonOperator(
dag=dag,
task_id="task_a",
python_callable=run_inside_python_operator,
)

Caso queira se aprofundar mais a documentação de boas praticas do airflow traz essa e outras informações muito relevantes para você poder escrever sua DAG com mais qualidade https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html

--

--