[QuintoCast] Dependências Cross-DAGs no Airflow e nosso Mediador de DAGs

Lucas Fonseca
Blog Técnico QuintoAndar
11 min readJul 1, 2021

A orquestração de fluxos é fundamental para as soluções de engenharia de dados, pois o processamento dos dados costuma ser periódico e encapsulado em fluxos de trabalho, o cenário ideal para automatização. Dese modo, um orquestrador de fluxos é a ferramenta chave para viabilizar a escalabilidade do processamento dos dados.

No entanto, conforme o fluxo de dados se torna mais complexo e elaborado, torna-se um desafio cada vez maior garantir que as dependências entre as várias etapas de processamento de dados funcionem de forma eficaz e eficiente. No QuintoAndar não foi diferente e, visando manter nossos fluxos de trabalho escaláveis e com fácil manutenção, utilizamos uma solução bastante elegante para gerenciar as dependências cruzadas entre DAGs do Airflow.

Script

[Abertura]

Host: Olá, ouvintes! Sejam muito bem-vindos à mais uma edição do podcast de Tecnologia do QuintoAndar. Eu sou o Danilo Figueiredo, engenheiro de dados, e serei o host no episódio de hoje.
Comigo está o Lucas Fonseca, também engenheiro de dados. Fala aí, Lucas, obrigado por estar conosco.

Interviewee: Fala, Danilo! Valeu pelo convite, é um prazer estar aqui compartilhando com a comunidade um pouquinho do que a gente faz aqui no QuintoAndar. E bora falar do Mediator, a nossa solução para dependências cruzadas entre DAGs.

Host: Bora! Lembrando a quem está nos ouvindo, as tecnologias que vamos abordar aqui, os textos e vídeos que vamos citar, estarão todos com os links na descrição para quem quiser se aprofundar nos assuntos.

[Contextualização do problema e ecossistema de data no quintoandar]

Host: Lucas, você falou o nome Mediator, e quem é de engenharia de software provavelmente já deve conhecer esse nome, mas aqui estamos falando desse princípio aplicado a um serviço.

Na verdade, é uma ferramenta construída pela equipe de engenharia de dados do QuintoAndar que gerencia a execução das DAGs do Airflow. Inclusive, o Mediator foi apresentado no evento da Apache Foundation, o Airflow Summit de 2020… Podemos começar por aí? O que é o Airflow?

Interviewee: Então, o Airflow, em resumo, é um orquestrador de tarefas que nos permite criar, agendar e monitorar nossos fluxos de trabalho, ou os nossos workflows como são chamados. Funciona mais ou menos como um cron, o Luigi, Control-M… e atualmente ele é mantido pela Apache Foundation, como você mesmo disse. Ele é a plataforma que nós usamos aqui no QuintoAndar, como vocês já devem ter imaginado.

E a gente vai falar muito aqui também sobre DAGs: DAG é um acrônimo em inglês que significa grafo direcionado acíclico. No Airflow uma DAG é um grafo, que tem como seus vértices as tarefas que são executadas e como arestas a dependência entre essas tarefas. Ou seja, quando a gente falar aqui de DAG, estaremos nos referindo a um grupo de tarefas.

Dependendo da maneira que você use o Airflow, é possível que você possua várias DAGs, e tenha também dependências entre elas. O que nós chamamos de dependência cruzada entre DAGs. Como hoje o Airflow não dispõe de um recurso nativo para lidar com essa dependência cruzada, nós desenvolvemos nossa própria solução chamada Mediator.

Host: Ou seja, o Mediator que vamos tratar aqui é um gerenciador de dependências entre DAGs, certo? Mas antes de entrar nisso, Lucas, pode contar pra gente qual era o contexto do nosso pipeline de dados quando o Mediator foi criado?

Interviewee: Então Danilo, nós usamos o Airflow somente para orquestração do nosso pipeline de dados (inclusive a gente tem um artigo bem legal no tech blog do Quintoandar falando em mais detalhes sobre essa arquitetura, o qual a gente vai deixar o link aqui na descrição do episódio também).

E o nosso pipeline de dados é dividido em 4 macro camadas: extração, limpeza, enriquecimento e modelagem. Pensando na camada de extração, nós possuímos diversas fontes e que são de vários tipos e cada extração é implementada em uma DAGs dedicada, com um agendamento específico para aquela DAG, de acordo com a fonte. Essa é a primeira camada, além dela, nós temos as outras que eu mencionei (a de limpeza, enriquecimento e modelagem).

E nós separamos, cada uma dessas camadas, em DAGs dedicadas. Dessa maneira, nós mantemos a especificidade das configurações de agendamento e recursos e também a especificidade do contexto ou domínio de negócio.

Host: E você pode comentar por que a gente divide as DAGs dessa maneira?

Interviewee: Nós optamos por separar o processamento em múltiplas DAGs porque assim nós temos um código mais modular do ponto de vista de implementação, manutenção, deploy. E ter um código modular assim, trás para gente um ganho na questão de não ter um ponto único de falha. Assim nós conseguimos reprocessar partes específicas do pipeline (por exemplo: somente a extração ou somente a limpeza).

Além desse ponto, pensando especificamente na extração, nós também temos o fato de que cada fonte tem a sua especificidade: por exemplo, uma precisa de extração 4 vezes durante o dia e outras 1 vez apenas, outras precisam de extração a partir de uma determinada hora da manhã, e por aí vai. E os scripts de extração são especializados, nós precisamos por exemplo, de um script para extração de um banco MySQL, outro para Mongo, outro para extração de uma API, etc.

Host: Então pensando no fluxo macro, a gente tem uma DAG que extrai, outra que enriquece os dados, outra que faz a modelagem… E o enriquecimento dos dados só pode rodar depois da extração na origem, certo?

Mas dado que o Airflow não tem um recurso nativo para gerenciar essas dependências entre as DAGs, quais são as atuais soluções para manter essa sincronia?

Interviewee: bom, o time avaliou algumas opções para resolver essa questão. Uma opção trivial seria que os engenheiros calculassem manualmente o tempo médio que uma DAG leva para executar, e então programassem o início das DAGs dependentes também manualmente. Mas isso a gente já vê que não é viável nem escalável.

Dentre as opções viáveis que nós consideramos, nós pensamos em ter apenas uma DAG, com todos os subgrafos dentro dela. O que alguns chamam de Über-DAG. Algumas empresas seguem essa abordagem inclusive. Mas um dos pontos negativos dessa opção seria que a visualização do Airflow (ao menos com com os seus recursos nativos) ficaria comprometida, já que a interface das DAGs não lida bem com muitas tasks. Outro ponto dessa opção, é que o fluxo de dados inteiro estaria sempre atrelado a mesma execução da DAG. E aí a gente teria alguma questões para serem resolvidas como por exemplo: em quais horários a DAG executaria? Nós teríamos tasks executadas sem necessidade ou elas precisariam ser puladas diversas vezes; Teríamos um monitoramento mais complexo; entre outras coisas.

Host: É, realmente isso também não me parece um cenário ideal. Porque aí temos um forte acoplamento entre os fluxos de dados dentro dessa DAG. Tanto do ponto de vista de código quanto de processo.

Mas e o operador nativo do Airflow que é um sensor que fica aguardando por outra DAG? O ExternalTaskSensor, não era uma boa opção?

Interviewee: Era uma opção sim, nós pensamos também em usar esses sensores nas DAGs dependentes para saber se elas poderiam executar ou não quando elas fossem disparadas. Nesse caso, teríamos cenários em que uma DAG iniciaria, e ficaria checando através do Sensor se ela pode continuar ou não. Essa parecia a melhor opção para a gente. Mas, por exemplo, se a dependência fosse re-executada (e eu falo aqui de apenas algumas tasks), a DAG dependente jamais saberia. O fluxo ainda ficaria manual nesse caso. Então acaba sendo ainda uma abordagem mais reativa.

Além disso, essa opção acaba caindo também no que eu levantei sobre as questões de responsabilidade da DAG que não vai ser única, porque a DAG teria a responsabilidade adicional de se comunicar com outras DAGs.

Então a gente explorou mais essas ideias para tentar resolver o problema e foi aí que a gente parou para pensar em alguma solução mais inteligente. E chegamos no Mediator.

[Mediator: conceito]

Host: Então, em resumo, o time ponderou as opções de tratar as dependências com base nos horários de execução, com uma DAG gigante que engloba tudo e com operadores sensores. Mas acabamos indo por outro caminho. Então, por favor, explique um pouco mais pra gente a ideia dessa solução e como ela funciona.

Interviewee: Bora lá, então, Mediator é um design pattern comportamental de engenharia de software (como você comentou no início da nossa conversa), que visa restringir qualquer comunicação direta entre os objetos em questão. E a orquestração dessa comunicação fica na responsabilidade de um objeto mediador. Trazendo então para um exemplo do cotidiano.

É como um semáforo que coordena um cruzamento. Os motoristas, eles não conversam entre si, eles apenas esperam a mudança de estados do semáforo para saberem o que eles devem fazer. O semáforo é o objeto Mediador nesse exemplo.

Trazendo para o nosso caso, o nosso Mediator é o responsável por analisar as execuções das DAGs e então iniciar a execução daquelas que têm as dependências satisfeitas.

Host: Então pegando um exemplo, vamos pensar que estamos enriquecendo dados do contexto de vendas de imóveis com uma DAG, e existe a dependência de duas outras DAGs, uma de extração de dados de imóveis vendidos e outra de anúncios de imóveis.

Então, o Mediator vai ficar de olho para saber se essas duas DAGs já finalizaram com sucesso. E quando essas duas dependências finalizarem com sucesso, o Mediator inicia a execução da DAG de enriquecimento de dados. Certo?

Interviewee: É isso aí!

[Dependências e dependentes]

Host: Mas aí, pensando que uma DAG vai ter várias tasks dentro dela, todas precisam terminar para que a dependência execute?

Interviewee: Não necessariamente, porque uma DAG pode ter como dependências tanto DAGs como também tasks. Então se uma DAG B depende apenas de uma task da DAG A, e não da DAG A inteira, isso vai permitir que a DAG B seja iniciada mais rapidamente, porque o Mediator não precisa esperar que todas as tasks da DAG A sejam finalizadas, para então iniciar a próxima. Isso permite que o pipeline seja executado mais rapidamente e sem dependências desnecessárias.

Host: Mas onde que é definido isso, como que eu sei, ou o Mediator no caso sabe quem são as dependências?

Interviewee: Bem, na declaração da DAG do Mediator, nós passamos para o operador um dicionário com as dependências de cada DAG. Dessa maneira ele consegue saber quais são as dependências.

[Detalhes de implementação]

Host: Então conta pra gente como foi a implementação dessa solução, por favor?

Interviewee: Então, nós decidimos criar essa primeira versão desse serviço implementando através de uma DAG mesmo, usando os recursos do próprio Airflow.

Host: Então é uma DAG que controla outras DAGs?

Interviewee: isso. E vou explicar melhor como ela funciona. Essa DAG tem uma task responsável por checar as dependências e outras que disparam as DAGs específicas de modo individual. Essas tasks de disparo têm relação um-para-um com as DAGs gerenciadas pelo Mediator. Ou seja, uma única task checa todas as dependências, e, em sequência, essas outras várias tasks com relação um-para-um que disparam as suas respectivas DAGs.

Essa DAG, do Mediator tem um agendamento e roda dentro de um intervalo de tempo como qualquer outra.

E, apenas fazendo uma observação, as DAGs que são disparadas pelo Mediator não têm um agendamento fixo, então elas necessariamente dependem do Mediator para serem executadas.

Host: E por que precisa de um intermediário? Por que não uma DAG simplesmente triggar uma outra direto? Teria o problema que você mencionou de a DAG não ter a responsabilidade única, mas funcionaria? Isso foi considerado também?

Interviewee: Até complementando as opções que a gente falou anteriormente, nós pensamos também em adicionar triggers direto nas DAGs dependentes. O problema dessa solução é que se por exemplo uma DAG A depende de outras 3 DAGs, qual delas vai ativar o trigger? Ou seja, a gente precisa garantir que ao ser ativada, essa DAG A só execute de fato quando todas as suas outras dependências também estejam concluídas.

Então isso vai gerar algumas execuções desnecessárias da DAG A. E como na opção usando Sensors, a DAG A teria a responsabilidade adicional de se comunicar com outras DAGs. Isso se torna bem problemático em um cenário complexo com centenas de DAGs.

Host: E me conta mais, por favor, sobre a validação que é feita pra saber se uma DAG está apta pra rodar ou não.

Interviewee: Então, aquela validação, que é executada por uma task, vai analisar a dependência de cada DAG e checar se cada uma delas está satisfeita. Essa checagem envolve uma comparação entre o estado da última execução da DAG triggada com a última execução da DAG dependente.

Host: E se a gente precisar fazer um reprocessamento da DAG? Tem que configurar alguma coisa no Mediator, dá trigger numa DAG nova… E o que acontece com as dependências dessa DAG?

Interviewee: Então, vamos lá. É importante primeiro fazermos uma distinção aqui: uma coisa é o reprocessamento da última execução da DAG. Que em uma DAG com execução diária, seria o equivalente a execução do dia corrente dado que ela tem uma execução por dia. Outra coisa é backfilling (ou reprocessamento de execuções antecedentes à última).

No que diz respeito a backfilling, o Mediator ainda não trabalha com esse caso. Então se você re-executar algum processamento antecedente ao último, ou um processamento de 5 dias atrás, o Mediator não vai saber que isso aconteceu e consequentemente não vai executar nada.

Agora se você re-executar o último processamento da DAG, nesse caso, assim que o Mediator identificar essa execução, ele vai triggar as DAGs dependentes (caso a execução tenha ocorrido com sucesso né).

[Planos futuros e comunidade]

Host: Já precisei fazer esse reprocessamento do último dia, e funciona muito bem. É só clicar no botão “Clear”, pra re-executar a DAG que, se tiver dependências, o Mediator já vai executar tudo automaticamente.

E me conta agora dos planos futuros. O que está no pipeline de evolução do Mediator?

Interviewee: Bom nós temos alguns pontos na estrutura da análise de dependências que nós precisamos atacar para que o plugin fique mais flexível e atenda mais casos. Como o suporte à backfilling, por exemplo. Além disso, uma coisa interessante seria adicionar no plugin do mediator uma visualização mais amigável do nosso fluxo de dados. Porque hoje nós armazenamos as dependências em um arquivo através de um dicionário. E nós construímos uma representação estática desse grafo, através de uma DAG mesmo, somente para visualização. Porém, em termos de visualização, o Airflow não lida bem com grafos com muitas arestas (ou DAGs com muitas tasks). Então seria muito interessante conseguir visualizar todo o grafo de dependências de uma maneira mais amigável.

Host: E imagino que quem está nos ouvindo pode ter ficado interessado nessa solução. Há planos de tornar isso open source, código aberto?

Interviewee: Então, ele ainda não está open source. Mas, sim, nosso plano é abri-lo para comunidade em breve. E nós desenvolvemos ele em um plugin à parte do nosso codebase (como um plugin do Airflow mesmo) então será bem simples de alguém usá-lo.

[Fechamento e agradecimentos]

Host: Lucas, agradeço muito a sua participação. Muito obrigado pelo papo!

Interviewee: Obrigado a você Danilo por me convidar e ao QuintoAndar pela oportunidade de compartilhar com a comunidade esse dentre tantos trabalhos incríveis que o time está desenvolvendo por aqui.

E agradeço é claro também a você ouvinte e espero que você possa ter aproveitado e tirado alguns insights legais dessa nossa conversa.

Host: Também agradeço a todos que nos ouviram, e confiram a apresentação que o QuintoAndar fez do Mediator no evento da Apache Foundation, o Airflow Summit de 2020.

O link para a apresentação no YouTube estará na descrição do episódio, assim como o link do nosso blog, onde vocês podem conferir esse e outros assuntos de tecnologia que o time do QuintoAndar vai postando por lá.

Obrigado, até a próxima!

--

--

Lucas Fonseca
Blog Técnico QuintoAndar

Data Engineer Manager@Inventa. Data and Gin lover, always curious, amateur musician, and Gym rat.