Ultima parte do material de certificação em Apache Airflow

Fiz a prova semana passada, antes do ano novo, passei e não vou mentir que deu uma preguiça vir aqui e escrever esse conteúdo mas compromisso é compromisso.

Nessa última parte vamos ver:

  • Compartilhando dados entre tarefas no Airflow
  • Trabalhando com falhas, como agir
  • Parâmetros sobre concorrência e executors
  • Como funciona a prova e onde fazer
  • Agradecimentos

Compartilhando dados entre tarefas no Airflow

Vamos supor que dentro do nosso código a gente precise compartilhar informações entre as tasks do Airflow, o caminho para fazer isso é usando as chamadas Xcoms (Cross Comunication).

Com base nos nossos cenários anteriores vamos simular o compartilhamento de dados específicos entre as tasks "baixando_dados" e "esperando_dados". A task "baixando_dados" chama uma function python de nome "_baixando_dados", vamos altera-la a fim de simular esse compartilhamento de informações.

Coloquei no final da função um "return 42" conforme podem ver no código abaixo:

def _baixando_dados(**kwargs):
with open('/tmp/meu_arquivo.txt', 'w') as f:
f.write('meus dados')
return 42

Esse valor é automaticamente encapsulado em uma Xcom, salvei o arquivo e iniciei a DAG "simple_dag", voltando para a UI do Airflow coloco pra rodar e verifico na aba Admin >> Xcoms conforme gif abaixo que a informação 42 é gerada pela tarefa "baixando_dados".

Teste de Xcoms

Se você reparar o valor 42 está vinculado a uma key de nome "return_value", ou seja, um registro chave/valor onde todas as outras informações eu conseguiria pegar também, como nome da DAG, nome da DAG, timestamp e etc.

Vamos agora na segunda task de nome "esperando_dados" usar o parâmetro xcom_pull mas antes vamos criar uma nova function de nome "_usando_xcom" e ai sim atribuir a task "esperando_dados" usando o python_callable.

def _usando_xcom(ti):
my_xcom = ti.xcom_pull(key='return_value', task_ids=['baixando_dados'])
print(my_xcom)

Agora atribuimos ao Task "esperando_dados"

    esperando_dados = PythonOperator(
task_id='esperando_dados',
python_callable=_usando_xcom
)

Eu alterei a task 2 para PythonOperator apenas para fazer o teste, não é o ideal a se fazer. Salvei e coloquei pra rodar, vamos ver como se sai.

Coloquei pra rodar e valido o log da task 2 e vemos o 42 lá

Usamos o 42 mas poderia ser qualquer atributo e eu poderia usar também como no exemplo abaixo, em vez do return, o parâmetro "xcom_push" e seto um valor pra pegar no meu caso será 43, vou alterar a função "_baixando_dados" com essas informações.

def _baixando_dados(ti, **kwargs):
with open('/tmp/meu_arquivo.txt', 'w') as f:
f.write('meus dados')
ti.xcom_push(key='key_anselmo', value=43)

def _usando_xcom(ti):
my_xcom = ti.xcom_pull(key='key_anselmo', task_ids=['baixando_dados'])
print(my_xcom)

Note que criei uma chave chamada key_anselmo no push e no pull estou colhendo ela, mas naquele esquema anterior de cada função em uma task, vai dar pra ver no gif.

Usando o xcom_push e xcom_pull com a key_anselmo

Trabalhando com falhas no Airflow

Vamos simular uma falha e ver que parâmetros podemos usar para usar um tipo de notificação se necessário. Na terceira task vamos alterar o exit 0 que é uma saída com sucesso para exit 1 que simula um erro.

...
default_args = {
'retry': 2,
'retry_interval': timedelta(seconds=10)
}

...
processando_dados = BashOperator(
task_id='processando_dados',
bash_command='exit 1'
)

Pra ficar mais rápido o erro, mudei o retry pra 2 e o intervalo de retry pra 10 segundos. Salve e vamos rodar.

Simulamos um erro na 3a task da DAG

Bom mas e ae? Toda vez vou ter que olhar? Quem vai avisar? Então, existem alguns parâmetros que você pode configurar na task pra que ele envie uma notificação, vamos começar pela mais simples, email.

  • email_on_failure: Quando seto esse parâmetro pra TRUE, caso sua DAG falhe ele manda um email caso você tenha os outros parametros configurados e o servidor SMTP configurado no cluster Airflow (outra conversa, outro dia).
  • email_on_retry: Mesmo esquema do de cima mas envia só em caso de retry.
  • email: Você coloca o email que vai enviar, por exemplo, admin@rescuepoint.com.br

Vamos supor que você quer algo mais incrementado (acredito que você queira, pois ninguém lê emails nem quando backup falha, desde quando eu era DBA Oracle e finge que não leu isso).

Hoje existe outros métodos de notificação como canal no Teams, no Slack e alguns até notificando no Whatsapp numa gravidade maior, via webhook, uma requisição API simples pra onde você deseja notificar, até como um sistema de chamados. Você pode criar uma função python que faça o que você precisa e chama-la através do parâmetro on_failure_callback, vou usar um exemplo com uma funçãozinha tosca mas pense que pode ser o que vc quiser.

## Função
def _funcao_tosca(context):
print("deu erro aqui!")
print(context)
...
## Task chamando a função no caso de falha:
processando_dados = BashOperator(
task_id='processando_dados',
bash_command='exit 1',
on_failure_callback=_funcao_tosca
)

No script acima criei uma função que vai fazer prints simples no caso de falha chamado pela task 3 que estamos testando simulando erros.

Funcionou a tosquice

Mas deu pra entender que pode fazer o que eu jogar na função, inclusive um plano de correção temporário, sei lá (vale mais a pena arrumar a DAG).

Parâmetros de concorrência

Recaptulando rapidão, existem alguns tipos de executors:

  • Sequenciais: Esse que estamos usando aqui, pois temos um ambiente bem limitado e de um único node, ou seja, todas as tasks rodam de forma sequencial no mesmo node de executor (é o padrão)
  • Local: Vamos supor que tenho um node só mas ele é parrudão, com bastante memória e bastante processamento, quero rodar paralelo por minha conta e risco. Esse é o modelo de executor que você seta na configuração do seu Airflow.
  • Celery: Tenho um cluster fisico, sem ser container nem nada, composto por 5 maquinas por exemplo, separo webserver e metadata em uma, scheduler e um serviço de fila como RabbitMQ em outra (já explico mas não vou entrar em detalhes) e as outras 3 passam a ser nodes, que vão no serviço de fila e pega o que tem pra executar, esse nodes são chamados worker nodes.
  • Kubernetes: Mesmo esquema de cluster citado acima mas usando PODs Kubernetes ao invés de nodes fisicos.

Usando os modelos em cluster, conforme sua necessidade de paralelizar mais tasks aumentar, basta adicionar novos worker nodes e vai conseguir dar conta da demanda, mas tem alguns parâmetros que podem ajudar também.

  • Parallelism: O numero de tasks que você pode executat em paralelo no seu cluster Airflow inteiro. O valor padrão é 32, ou seja, por padrão é permitido você executar 32 tasks em paralelo no seu cluster.
  • DAG_concurrency: Numero de tasks que podem ser executadas em paralelo de todas as suas DAGRuns. O Valor padrão é 16.
  • max_active_runs_per_DAG: Numero de DAGRuns que você pode ter rodando ao mesmo tempo. O valor padrão também é 16.
  • max_active_runs: Numero de DAGRuns setado em uma DAG específica (vimos isso na parte 1)
  • concurrency: Numero de tasks em paralelo que consigo rodar numa DAG especificada. Funciona parecido com o de cima, não é um parâmetro global como os 3 primeiros.

Como fazer a prova

Se você chegou até aqui e entendeu tudo, PARABÉNS!

Alegria

Agora vamos ver como fazer a prova.

  1. Crie uma conta no site da Astronomer: https://academy.astronomer.io/
  2. Você vai selecionar esses cursos aqui nessa pagina: https://academy.astronomer.io/page/astronomer-certification
O segundo é a prova, de graça

A prova tem 75 questões e 120 minutos pra fazer, exige um acerto de pelo menos 70% dela e as perguntas são bem baseadas nesses 3 posts, se você fez tudo aqui não tem como dar ruim.

Se passar me marca lá no Linkedin pra dar essa moral e marca também o autor do curso Mark Lamberti, que alias tem um canal no Youtube falando muito mais sobre Airflow caso você vá usar mais a ferramenta e a didática do cara é incrível e por menos que eu fale inglês é bem "entendível", rs.

Bom é isso, espero que te ajude, se pintar alguma dúvida e eu puder ajudar me chama lá no Linkedin e nos falamos.

Deixa a palminha ae, se inscreve aqui e no Youtube pra dar aquela força.

Valew e até o próximo!
Anselmo Borges.

--

--

Anselmo Borges
Rescue Point

Bigdata Engineer, Cloud Architect, Nerd, Alcoholic, Brazilian Jiujitsu Black belt and hide and seek World champion.