Certificação Airflow Fundamentals (Parte 2/3)
Continuando nosso material sobre a certificação básica em Apache Airflow da Astronomer, vamos dar sequência falando de algumas funcionalidades que faltam pra fechar o conteúdo. E se você tá caindo de paraquedas nesse post, tem um outro post (na real um resumo do curso oficial da Astronomer), que tem a parte inicial, dá uma olhada lá antes de começar esse.
Tem até vídeo, tá bem detalhadinho, aproveita deixa like lá e segue a gente pra ter acesso a mais materiais, tô sempre postando algo legal do mundo de dados.
Comandos que precisamos saber
Existem alguns comandos que precisamos saber, seja pra fazer manutenções, subir serviços administrativos ou até coletar algumas informações caso não tenhamos acessos a UI, vou deixar uma listinha aqui em baixo.
Vou mostrar como rodar um desses comandos (o mais seguro, rs) que lista as DAGs do meu Airflow e como rodar dentro de um dos containers que subimos.
Interagindo com o Airflow via REST API
Agora como o último meio de comunicação com o Airflow faltante, vamos mostrar alguns exemplos de interagir com o Airflow via REST API. Essa interface de comunicação começou a partir do Airflow 2.0, sendo assim, se você tem uma versão mais antiga, não funciona, recomendo atualizar e segue um link com a documentação para atualização do Airflow também.
Existe uma documentação da API do Airflow onde podemos vasculhar de acordo com a nossa necessidade, segue o link dela abaixo.
Testando a API do Airflow
Vou fazer um teste usando Postman pedindo pra listar todas as DAGs no nosso Airflow (baixe o Postman na sua maquina e faça o teste também), as informações para nosso teste seguem abaixo.
- Método: GET
- Host: http://localhost:8080/api/v1/dags
- user: admin
- senha: admin
No gif abaixo vai mostrar como esse teste é feito
Estrutura de uma DAG
Por enquanto só tínhamos visto a ferramenta e o que ela oferece, chegou a hora de criarmos nossas primeiras DAGs, mas pra isso precisamos entender o básico dela e vamos com a estrutura do nosso arquivo.
Vou fazer um vídeo curtinho explicando como criar nosso primeiro DAG simples de tudo e vou usar o mesmo exemplo do professor do curso, realmente é bem simples (claro, o DAG não faz nada, rs).
OBS: Se você não configurar um Scheduler Time para o seu DAG ele assume o padrão default que é de executar a cada 24 horas.
Trabalhando com o scheduler
Deu pra ver que no nosso código malandro não configuramos absolutamente nada, só o que era preciso pro DAG ser um DAG, nesse ponto vamos evoluir nosso script inicial com as configurações de agendamento e intervalo de execução.
Segue abaixo nosso DAG python inicial.
## Importar a classe DAG do Airflow
from airflow import DAG
## Instanciando o objeto DAG
with DAG(dag_id='dag_do_anselmo') as dag:
None
Os parâmetros que iremos inserir nesse script agora são:
- start_date: É o dia e hora que seu DAG começa os trabalhos, quando ele inicia o processo de execução, por exemplo, poderia colocar agora '2022–12–27 23:40:00' e ele passaria a executar tendo sua primeira execução nesse horário e data.
- schedule_interval: é o tempo que vai levar para o DAG executar denovo entre as execuções, partindo do start_date como inicio a principio (na terceira execução por exemplo, ele pega quando rodou e acrescenta o intervalo definido nesse parâmetro).
OBS: Seu inicio real de execução é sempre o start_date + schedule_interval!
Por exemplo:
- start_date: 2022–12–26 10:00
- schedule_interval: 10 minutos
- inicio real de execução: 2022–12–26 10:10
Nem vou explicar denovo, qualquer coisa lê denovo se não entendeu e você vai conseguir. As vezes me acho meio confuso mas acredito que essa foi de boa, rs. Nesse intervalo entre 10:00 e 10:10 é quando o DAGRun é criado e o terreno preparado pra execução as 10:10.
Existe um outro parâmetro de schedule que não foi abordado e é bem importante, seria o end_date.
- end_date: A data e hora agendada de quando essa DAG não vai mais executar, um exemplo seria se você quer se sua DAG rode por 2 meses e tal.
Vamos schedular nossa DAG na prática
Com base no que entendemos vamos agora fazer a mágica da schedulagem acontecer, vamos dar uma melhorada no nosso script com o vídeo abaixo.
Como explico no vídeo, existem algumas formas de você criar seu intervalo de execução, segue abaixo uma tabelinha supimpa (adoro gírias idosas, rs), com os alias de intervalos que coloquei no vídeo.
Backfilling e Catchup
Backfill é o processo que permite executar ou re-executar DAGs que ficaram no passado, vamos supor que você subiu uma DAG que tinha um erro processual mas ela rodou por 5 dias, você corrigiu a DAG agora precisa corrigir as execuções desses 5 dias e rodar belezinha dali em diante, lembra-se, seu start_date continua sendo de 5 dias atrás. Por padrão o Airflow já faz isso pra você, se você simplesmente subir sua DAG corrigida ele vai pegar tudo que precisa e rodar novamente.
Agora existem casos que não vou precisar dessa feature, logo, posso desabilita-la e o parâmetro para isso é o catchup=False
o True é o padrão que roda o que ficou no passado.
Você adiciona dentro o with da DAG conforme código abaixo:
## Importar a classe DAG do Airflow
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
## Instanciando o objeto DAG
with DAG(dag_id='simple_dag',
schedule_interval=None,
start_date=datetime(2022,12,28),
catchup=False) as dag:
task_1 = DummyOperator(
task_id = 'task_1'
)
O lance de como o backfill funciona é bem parecido com o desenho de DAGruns que mostrei a pouco, o calculo da primeira execução até pra trás na linha do tempo, só conta após um ciclo de DAGRun, ou seja:
Ou seja, ai no desenho, a DAG só rodou uma vez, pois a segunda vai ser dia 29/12 as 00hs mas ainda não chegou, se ligou?
Agora vamos falar sobre o parâmetro catchup denovo:
- True: Valor padrão, se jogar um start_date de um mês atrás, se lascou, vai tentar rodar tudo que não rodou e possível mente o Airflow vire uma carroça.
- False: Desabilita o backfill e roda somente uma rodada anterior, ou seja, se hoje fosse dia 29 (e é) e eu colocasse o start_date mesmo como dia 27 ele ainda rodaria o dia 28 (acho que foi o que entendi mas prometo dar uma repassada)
Uma dica que o Marc Lambert dá é que eu posso setar um limite de execuções, adicionando mais um parâmetro lá na linha, max_active_runs=x
o numero que colocar no lugar de "x" é o numero de DAGRuns que ele vai permitir executar.
Outra dica bacana, pra não ter problemas lá pra frente, configure o catchup pra false e rode o que você precisa via command line, vou deixar um exemplo abaixo.
airflow backfill -s <DATA_DE_INICIO> -e <DATA_DE_FIM> --rerun_failed_tasks \
-B <NOME_DA_DAG>
Vamos falar de Operators e os parâmetros deles
Nos exemplos que usamos até agora, fizemos uso de uma task com um DummyOperator, existem vários tipos e vamos ver alguns deles, o importante agora é saber que um Operator está sempre atrelado a uma Task.
Seguindo uma boa pratica, imagine que você tem 2 tasks, importar_dados e limpar_dados, nunca o coloque no mesmo Operator, porque se importar_dados falhar, você terá que rodar novamente as 2 tarefas no mesmo Operator, gastando mais tempo e mais recursos de forma desnecessária.
Parâmetros
- retry: Entramos com um valor numérico com as vezes que vamos tentar re-executar a task novamente antes de dar como falha.
- retry_delay: o intervalo de tempo configurado pra reexecutar uma task caso ela falhe e seja solicitado o retry (parâmetro anterior), podemos usar o timedelta, que usamos, por exemplo
timedelta(minutes=5)
No código ficaria mais ou menos assim na parte da task:
...
task_1=DummyOperator(
task_id="task_1",
retry=5,
retry_delay=timedelta(minutes=5)
)
task_2=DummyOperator(
task_id="task_2",
retry=5,
retry_delay=timedelta(minutes=5)
)
task_3=DummyOperator(
task_id="task_3",
retry=5,
retry_delay=timedelta(minutes=5)
)
...
Nesse exemplo, criei 3 tasks com os mesmos parâmetros de retry e retry_delay. Em casos como esse, posso customizar esse código, criando default_args onde seto esses valores e qualquer task criada assume esses parâmetros. Como ficaria isso?
...
defaul_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='simple_dag', defaul_args=defaul_args, schedule_interval=@daily,
start_date=datetime(2022,12,28), catchup=False) as dag:
task_1=DummyOperator(
task_id="task_1"
)
task_2=DummyOperator(
task_id="task_2",
retry=3
)
task_3=DummyOperator(
task_id="task_3"
)
...
Nesse exemplo acima, criei um default_args, onde coloquei os parâmetros que se repetiam e dentro da definição da DAG (Linha do With) coloquei o default_args pra serem considerados.
Note outro ponto bacana é que na task_2, eu joguei um retry=3
e é importante saber que o valor setado na task sempre terá prioridade aos definidos no default_args.
Usando o Python Operator e File Sensor para alguns testes
Como disse, os Operators podem ser vários, pra que a gente teste algo mais funcional, vamos fazer alguns exemplos com o PythonOperator e BashOperator, o video abaixo explica como vamos fazer esses testes básicos mas antes já vamos adiantando algumas coisas:
- Importo a biblioteca do Python Operator e Bash Operator assim como fiz do Dummy, ná duvida dê uma olhada no vídeo e no código depois.
- Nos parâmetros nas tasks, observe o python_callable que por padrão chama uma function python, function qual, você precisa criar antes de chamar então crie sua função antes de tudo, pode ficar perto do default_args.
- Importo a biblioteca de File Sensors onde esse cara vai monitorar um diretório que eu determino pra saber se novos arquivos foram criados ou algo do gênero. Por padrão ele checa de 30 em 30 segundos mas esse valor pode ser mudado atribuindo o parâmetro poke_interval.
- Crio um Connection no UI de nome dados_do_temp, basicamente para dizer que tenho uma conexão com o diretório /tmp/ do container onde vou usar o sensor pra ver se um arquivo é gerado.
Configurando Dependências
Se você viu no vídeo se ligou que eu criei minhas 3 tasks mas não tenho dependência entre elas, então vamos resolver isso agora e vou mostrar os métodos possíveis de fazer isso, lembra do set_downstream e o set_upstream do último artigo?
Considerando que tenho as seguintes tasks:
- baixando_dados
- esperando_dados
- processando_dados
E quero colocar as dependências na mesma ordem acima os metodos que podemos usar são os seguintes:
Metodo 1: Escrevendo tudão
Posso ir no final do meu DAG config no arquivo e fazer o lance abaixo.
...
baixando_dados.set_downstream(esperando_dados)
esperando_dados.set_downstream(processando_dados)
O resultado segue abaixo:
Metodo 2: Usando bitshift operators (>> ou <<)
Até tinha colocado no post anterior mas pra refrescar sua memória quais são cada um deles:
- set_downstream: >>
- set_upstream: <<
Fica muito mais limpo o código e pode ser feito numa linha só com o mesmo resultado de cima.
...
baixando_dados >> esperando_dados >> processando_dados
...
Vamos supor que esperando_dados e processando_dados, sejam tarefas dependentes de baixando_dados e executem em paralelo, basta colocar o primeiro bitshift e colocar os outros 2 numa lista:
...
baixando_dados >> [esperando_dados, processando_dados]
...
O resultado seria o seguinte:
Metodo 3: Usando Chain
usar uma função do Airflow chamada Chain que coloco as tasks em uma lista e ele a executa na ordem que eu as colocar. Tenho só que baixar uma biblioteca conforme abaixo.
...
from airflow.models.baseoperator import chain
...
chain(baixando_dados,esperando_dados,processando_dados)
...
O resultado é o mostrado abaixo:
Ficou bem explicadinho mas segue um vídeo rápido explicando isso caso tenha ficado alguma duvida.
No post de hoje era isso, prometo que o próximo vai ser mais curto só com os últimos detalhes dessa trilha de certificação.
Deixa a palminha ae, se inscreve, dá like nos videos pra dar aquela força.
Espero que ajude!
Anselmo Borges.