Certificação Airflow Fundamentals (Parte 1/3)

Anselmo Borges
Rescue Point
Published in
14 min readDec 27, 2022
Primeira parte do meu resumo pra certificação Airflow fundamentals.

Primeiramente

Esse é um resumo foi feito por mim com base no curso do curso de Airflow Fundamentals da Astronomer, que é uma empresa que tem uma versão suportada do AirFlow, lembrando que o AirFlow pode ser instalado Open Source pois é um projeto da Apache e assim como outras tecnologias como o Spark, Cassandra, Hadoop e outros existem empresas que encapsulam a solução e vendem o suporte.

Porque isso existe?

Porque se você coloca algo open source na sua empresa você precisa segurar o B.O de suporta-la e isso implica em de vez em quando cair em alguns Bugs e depender da comunidade pra resolver. Imagina, você colocou em produção, economizou uma bala, seu chefe pirou, mas você pegou um bug de desenvolvimento. Pra quem você vai chorar? Esperar a comunidade te ajudar no tempo deles? Sem SLA? As vezes o que você não paga de licença corre um risco como esse e tudo isso tem que ser posto na balança durante a implantação.

Varias empresas fazem isso como Cloudera (Hadoop e outras soluções), DataStax (Cassandra), Confluent (Kafka), Databricks (Spark e Delta Lake) e uma das mais conhecidas pra que você saiba que isso existe faz tempo é a Red Hat (Linux), só fechando o assunto, eles encapsulam o Open Source em versões estáveis e cobram o suporte caso queira usá-las.

Tem que explicar né? rs

Fiz questão de deixar isso claro pois tem gente que nem faz idéia disso, o cara ouve o nome e quer implantar, rs.

O curso

Esse curso se encontra no site da Astronomer.io conforme disse anteriormente, é em ingles e pra fazer a prova de certificação fiz um resumo em português e pode ser útil caso você tenha dificuldade na lingua inglesa. Dividi esse material em 2 partes, até pra que o post não vire uma obra de Tolkien. Peço desculpas caso você encontre algo errado, pois posso ter entendido errado, se puder leia esse post e faça o treinamento pra que fique tudo certinho, blza?

Como instalar na sua máquina?

Pra que você possa realizar fazer esse mini curso de forma síncrona e consiga testar todas as etapas, recomendo a configuração do astro cli inicialmente e ter o docker instalado na sua maquina pois o astro cli vai fazer a instalação dos containers necessários pra que você possa realizar o treinamento.

Instalando o Astro CLI conforme seu sistema operacional

Segue o link da documentação que ensina como instalar o astro cli no seu computador de acordo com o sistema operacional que está usando.

Instalando o Docker

Como disse você vai precisar do Docker instalado no seu computador pra que consiga subir o Airflow devidamente configurado para esse treinamento, mas lembrando que vai ser um Single node com tudo configurado, apenas pra fins de treinamento. Em ambientes produtivos a arquitetura é completamente outra e darei uns exemplos mais a diante.

Segue um link com uma documentação que te ajuda a instalar o Docker Desktop no sei computador.

Preparando o ambiente

Com o Docker e o Astro CLI instalados no seu ambiente você vai rodar os seguintes comandos abaixo (coloquei em shell mais você adapta pro seu sistema operacional ae):

# Crie uma nova pasta chamada astro e entre nela
mkdir astro
cd astro

# Rode o comando abaixo e ele vai baixar os aquivos para que a instalação
# do Airflow seja realizada (lembre-se que você precisa do Docker)
astro dev init

# Rode o comando abaixo e com base nos arquivos baixados no comando anterior
# será iniciado o download das imagens e subir os containers no Docker
astro dev start

# Rode o comando abaixo para ver se está tudo rodando conforme deve
astro dev ps

Feito isso você pode acessar o Airflow do seu navegador usando o endereço http://localhost:8080 e a telinha abaixo será exibida

para logar use admin como usuário e admin como senha

Apache Airflow

O Airflow é um orquestrador de jobs de dados ETL(extract, transform and load), com uma ampla variedades de soluções para que você consiga alem de executar o seu job de forma correta poder fazer isso de forma escalavel, monitoravel e controlavel de diversos modos:

  • Via UI (User Interface)
  • Via CLI (command line interface)
  • Via Rest API (requisições usando HTTP)

Um dos exemplos bem legais usados no curso é que se eu tiver um front end, posso criar um botão que chama uma REST API que pode controlar meu job conforme a minha necessidade.

Componentes básicos do AirFlow

Existem componentes básicos do AirFlow:

  • Web Server: Ele é o responsável pela exibição do UI pra que você controle e monitore seus jobs via interface Web.
  • Scheduler: O Agendador do Airflow e ele é muito importante por que sem ele você não consegue agendar ou executar nenhum job no AirFlow por isso no caso ele é o coração da solução já que ela é feita pra isso, orquestrar seu jobs. Existe inclusive a possibilidade de você ter mais de um Scheduler rodando no mesmo cluster o que traz uma alta disponibilidade já que se um scheduler não estiver disponível, você pode executar em outro.
  • MetaData Database: Todos os dados relacionados aos usuários, jobs, conexões, qualquer dado relacionado ao AirFlow, está armazenado no Metadata Database e qualquer base que suporta SQL pode ser usada como metadata database, no meu caso estou usando postrgres mas poderia ser MySQL, Oracle, SQL Server e ate um mongo DB que não é um banco parecido com os demais por isso não é 100% recomendado devido a algumas limitações.
  • Executor: Define como suas tasks serão executadas pelo AirFlow, por exemplo, se você tem um servidor Kubernetes e precisa executar sua task nele, você vai precisar de um Kubernetes executor, se for rodar em um Celery cluster com multiplas maquinas você vai ter que usar o Celery executor, se você tem uma maquina extremamente poderosa e quer executar varias tasks nela, você vai precisar de um local executor, ou seja, pra cada ambiente que você for rodar sua task um executor especifico será necessário. Um ponto importante do executor que ele executa sequencialmente, um após o outro.
  • Worker: Quando o executor é definido o worker é o cara que está executando ela onde você definiu, ele pode ser um processo ou um subprocesso. Um exemplo é quando você por exemplo rodou uma execução no kubernetes, dentro de um POD a PID é o processo do worker que está executando a sua task. Outro exemplo, caso esteja executando uma task localmente que está usando múltiplos processos, cada processo desse é um worker.
  • Queue: Um serviço de fila onde eu gerencio a ordem de execuções dessas atividades, pode ser um RabbitMQ ou um Redis, no ambiente de exemplo não configuramos um.

Como eles funcionam juntos?

Nesse ponto vou explicar a arquitetura de como esses componentes trabalhando em conjunto. Existem algumas possibilidades mas os próximos exemplos vão se basear em somente 2 delas

Single node

Quando todos os componentes estão sendo usados na mesma maquina e seriam WebServer, Scheduler, Metadata e o Executor (o Worker não aparece pois ele é o fruto da execução).
O Webserver, Scheduler e Executor estão sempre interagindo com o Metadata, pois o Webserver precisa de informações dele pra exibição e controle, o Scheduler as informações dos jobs, se eles rodaram ou não, parâmetros entre outras informações e por sua vez o executor também consulta e grava informações sobre andamentos dos jobs e coisas do genero. Pra começar o uso é legal uma arquitetura dessa mas tudo no mesmo cluster gera um risco em ambientes de produção, caiu o cluster, acabou a brincadeira.

Nesse exemplo todos os componentes encontram-se na mesma maquina (node)

Multi nodes

Uma arquitetura bem mais voltada a ambientes produtivos, com alta disponibilidade, escalabilidade, onde consigo dividir os meus core components em mais de um node e até replica-los entre eles pra que eu crie algum tipo de redundância. Esse modelo é chamado de celery conforme citado anteriomente. Nos nodes convencionais posso separa os core components por exemplo no node 1 posso ter o Webserver, o Scheduler e o Executor, no node 2 posso ter o Metadata e o serviço de Queue.

Posso executar os Workers nesse nodes, mas não ficaria performático, por isso existe a possibilidade de nesse cluster eu ainda ter os Worker nodes, maquinas especificas para execução dos workers solicitados e que me daria a possibilidade de dividi-los entre maquinas evitando sobrecargas.

nessa imagem uma arquitetura com 6 nodes dividindo recursos entre eles inclusive a fila (queue)

DAGs

DAG (Direct Acyclic Graph), uma DAG no Airflow é um Data pipeline, se você cria um Data Pipeline no Airflow, automaticamente você está criando uma DAG. Uma DAG basicamente é um fluxo de ações direcionadas, que por mais que suas ações se dividam, ele tem um ponto final onde ele se encontra, outro ponto é que o DAG nunca é um loop, ele sempre tem um destino final que orienta o caminho. Podemos falar de dependências sequenciais também pois no caso uma depende do termino da anterior para que a sequencia flua. No seu DAG sempre haverão steps dependentes um dos outros conforme desenho, esses steps são chamados de Operators

Uma DAG tem um sentido fim, se tem loop não é DAG

Operators

Um operador é tipo um objeto, vamos supor que ao instanciar esse objeto você diz que ele vai ler um arquivo e printar o conteúdo desse arquivo na tela. Note que esse operator é uma task do seu Data Pipeline/DAG. Os tipos de operators possíveis que são:

  • Action Operators: Permite que você execute algo em seu Data Pipeline, por exemplo, se vc precisar executar uma função Python, você usa o python operator, bash operator para processos bash e SQL Operator para queries SQL, lembrando que todos esses como ações (actions).
  • Transfer Operators: Como o próprio nome já diz trata-se de transferencia de dados de uma fonte para um destino definido, por exemplo a transferencia de um dado do MySQL para Presto DB usa-se o Presto Operator.
  • Sensor Operators: É o operador responsável por aguardar um resultado para que a tarefa possa seguir. Por exemplo, o File sensor pode aguardar que um arquivo seja gerado em uma determinada pasta para que o processo tenha inicio.

Quando o Operator está atribuído a uma DAG ele se torna automaticamente uma Task, quando essa task é schedulada, ela se torna uma task instance operator.

Dependencies

Para que o Data Pipeline funciona tenho que atribuir ordem na execução dela, por sua vez, uma task deve geralmente numa sequencia iniciar a primeira por um scheduler e as demais pelas dependências da conclusão ou não da task anterior, lembre-se, DAG é um pipeline orientado que tem uma direção definida. A ordem que vincula essas dependencias são:

  • set_downstream (<<)
  • set_upstream (>>)

Voltaremos a falar deles mais pra frente

Workflow

Workflow é a combinação de todos esses pontos anteriores a DAG que contem operators ordenados por suas dependências, o conjunto todo compõe um belo Workflow.

Task Life Cycle

O cliclo de vida de uma task no Airflow passa pelos seguintes componentes:

  1. Vamos dar um exemplo em que tenho uma pasta chamada DAGs onde coloco meu pipelines lá dentro, vamos supor que jogo um arquivo python chamado DAG.py.
  2. Quando você joga o arquivo nessa pasta ele vai ser consumido por 2 dos seus recursos, o Webserver e o Scheduler. Um ponto importante desses 2 recursos referente ao arquivo que jogamos lá na pasta é que o Webserver por padrão pesquisa por novos arquivos nela a cada 30 segundos, enquanto o Scheduler faz a verificação de novos arquivos nela em um padrão de 5 minutos. Esse valores podem ser ajustados mas o padrão inicial é esse.
  3. Quando a DAG é consumida pelo Web Server e pelo scheduler ela está pronta pra uso e pode ser iniciada, sendo assim o scheduler manda gravar no MetaStore um DAGRun, mas sem nenhuma informação de status pois ele não rodou ainda. Com isso uma task instance vai ser designada para execução e aguardar. Nesse ponto a task está no status de “queued”, ou seja, está na fila pronta pra ser executada.
  4. Com a task instance criada, o scheduler vai madar ela pro executor conforme o agendamento do start dela, é nesse momento que o recurso é alocado no Worker, conforme falado anteriormente. Quando ele vai pra esse ponto, ela passa do status de “queued” para “running” mostrando que agora a task está em execução.
  5. Com o termino da execução o executor atualiza o status da task instance no metastore para “done”, isso caso ela tenha sido executada com sucesso e com os demais status caso haja algum problema.
  6. O scheduler fica como responsável em ver se a task rodou com sucesso no Metastore, se não teve nenhum problema e então o Webserver é atualizado com o novo status da Task no Metastore, basta dar um refresh na tela e pronto.

Extras e providers

Quando você instala o Airflow, ele vem com as funcionalidades necessárias pra que você use de forma básica, porém caso precise configurar algo em específico, como por exemplo autenticar os usuários do LDAP no Airflow, ou executar suas tasks em um celery, pra esses casos você precisa instalar extras para extender as funcionalidades padrões do Airflow.

  • Extras: Pacotes que instalam as funcionalidades que você precisa para o funcionamento do Airflow.

Agora vamos falar de funcionalidades do seu DAG, por exemplo como a conexão com um banco Postgres (isso tem a ver com o seu Data Pipeline e não as funcionalidades core do Airflow). Pra esses casos fazemos a instalação de um provider Postgres.

  • Providers: Não tem a ver com o core do AirFlow e sim com funcionalidades da sua DAG e assim como os Extras podem ser instalados conforme a sua necessidade.

As vantagens dos Providers é que eles podem ser atualizados de forma independente do Airflow no geral. Isso te livra da dependência da atualização ou até parada do AirFlow para suas configurações.

Exemplo de extras e providers

Meios de controle dos Data Pipelines no Airflow

Existem 3 meios de manipulação e controle dos seus data pipelines no AirFlow:

  1. UI (User interface): Usando a interface web através do navegador é possível gerenciar e monitorar seus data pipelines no Airflow, por exemplo, se você quiser checar os logs da sua task ou se quiser ver o histórico dos seus diagrams você pode usar o UI. De longe é o método mais usado para esses tipos de tarefas no Airflow.
  2. CLI (Command line Interface): Podemos usar ele também para a alguns casos acima mas ele é extremamente útil caso você queira testar as suas tasks, caso necessite atualizar ou até inicializar o Airflow. Caso você não tenha acesso ao User Interface é uma alternativa de se usar o AirFlow.
  3. Rest API (Requisições HTTP): Util quando você que precisa criar algo no AirFlow ou usar seu próprio front end como dado num exemplo anterior, criando um botão na sua pagina que executa alguma ação ou até iniciar uma DAG no AirFlow. Em resumo o Rest API é extremamente útil quando você quer que outras soluções interajam com o Airflow.

Vamos dar uma visão mais aprofundada de como usar cada um deles:

UI (User Interface)

Fiz um vídeo por que acho mais fácil de entender do que se eu escrevesse cada uma das funções que serão citadas, logo, segue abaixo uma visão inicial sobre a aba DAGs. Logue no endereço do Webserver http://localhost:8080 com o user admin e a senha admin . Feito isso, automaticamente você vai cair na view de DAGs.

Aba DAGs

Logo que logamos caímos por padrão nessa aba, ela mostra as informações principais sobre as DAGs conforme video abaixo:

Overview sobre a aba DAGs

Como foi visto, dentro da Aba DAGs eu tenho algumas possibilidade de ver informações mais detalhadas sobre meus Data Pipelines, vamos dar uma aprofundada em cada um deles.

Grid View

Centraliza as informações principais sobre a DAG, onde você consegue filtrar as execuções por datas, saber o numero de vezes que rodou, se foi com sucesso ou não, bem útil para uma avaliação inicial e menos detalhada das suas DAGs.

Aba DAG — Grid View

Graph View

Essa visualização é boa pra você checar todas as dependências do seu Data Pipeline e ver por etapa como está ou foi o andamento dela, lembra que falamos que a DAG é um conjunto de tasks executadas num sentido, certo?

Aba DAG — Graph View

Nesse tipo de visão podemos segmentar por data e os filtros de cores serve para cada um dos status possíveis, como podem ver aí, o verdinho é pro status de "success" que no meu caso só rodei uma vez e com sucesso. Alem dos status no lado esquerdo posso filtrar pelo tipo de operator que estou usando (no caso só um, o _pythonDecoratedOperator).

Calendar

Nessa aba como o próprio nome já diz podemos ver o status das nossas execuções de acordo com o calendário e junto com ele o status das execuções.

tenho apenas 1 execução e com sucesso em Monday

Essa view pode parecer um pouco estranha mas ela tira a média das execuções, por exemplo, caso você tenha tido problemas nas execuções da terça por exemplo, ele tira uma média naquela escala de cores e coloca no dia, caso não esteja verde escura, vale a pena dar uma olhada no que rolou. Uma outra vantagem é que as vezes posso ter erros por dias da semana, ou fim de mês, devido a uma concorrência ou algo do tipo, esse é um bom método pra pegar esse tipo de anomalia.

Gantt View

O Gantt View (Baseado no gráfico de Gantt) é bem util quando quero saber o tempo que cada uma das tasks da DAG rodou, até pra saber qual parte do data pipeline está consumindo mais recursos, ou precisa ser ajustado, ou as vezes até divido em novas tasks.

Note no exemplo o tempo de execução de cada uma das tasks da minha DAG de exemplo

Details

Precisa de alguma informação mais específica sobre a DAG é aqui que vai achar. Infos como timezone, qual é o arquivo relacionado a DAG, dono e muitas outras informações que você pode não encontrar em outras views.

Aba DAGs — View Datails

Code

Como o nome já diz, se preciso ver o código da DAG é aqui e consigo visualizar, sem a necessidade de ir no arquivo, ter acesso a pasta e etc, outra vantagem é que você sempre vai ver o mais atualizado caso esteja trabalhando com controle de versão no Git.

Aba DAGs — Code View

AuditLogs

Deu erro em alguma das suas tasks? Quer entender o porque? Basta entrar na view AuditLogs e filtrar por qual step quer ver os logs, qual data. Mas nessa primeira etapa mostra logs bem superficiais e sinceramente, achei que a busca poderia ser melhor mas é isso que tem, rs.

Aba DAGs — AuditLogs

Existe uma outra forma de acessar seus logs que por sua vez te dão bem mais informações sobre sua task, por exemplo, usando o Graph View, clicando em uma das tasks da DAG. Se liga.

nesse exemplo mostro detalhes da task e os logs

Não foram abordados todos os temas mesmo porque não tenho informações de logs ou execuções o suficiente pra isso mas já dá pra viver. Se ficou duvidas segue um videozinho abaixo recaptulando todos os pontos discutidos na aba DAGs.

Resumo sobre as visualizaçoes de DAGs na UI do Airflow

Como disse pra não ficar muito pesado, optei por quebrar esse material em 2, no próximo post iremos abordar de temas como:

  • Comando uteis no Airflow CLI
  • Agendando uma DAG
  • Mais informações sobre os operators
  • Definir o Path de suas DAGs
  • Fazermos alguns testes com dados
  • Como fazer a prova de certificação.

Se curtiu esse material, deixa a palminha ae, deixa um like nos vídeos, se inscreva no canal, essa métrica ajuda a saber se esse material está sendo relevante e se devo continuar postando coisas desse tipo.

Te aguardo no próximo!

Anselmo Borges

--

--

Anselmo Borges
Rescue Point

Bigdata Engineer, Cloud Architect, Nerd, Alcoholic, Brazilian Jiujitsu Black belt and hide and seek World champion.