Automatizando a ingestão de dados no data lake

Emerson Moizés
luizalabs
Published in
8 min readNov 12, 2020

Pode parecer uma afirmação meio óbvia, mas o dado é o cerne de qualquer plataforma de dados. Seja em um data lake com petabytes de registros ou em uma planilha com algumas centenas de linhas, o dado é o ativo mais importante, e de nada vale investir milhões em ferramentas de análise se o dado disponível não estiver saudável.

Dito isso, acho que é seguro dizer que todas as iniciativas de data lake sonham com o dia em que todos os dados desejados estarão atualizados, tratados e prontos para serem consumidos por cientistas, sistemas ou analistas. É aí que entram os famosos pipelines de ELT (extração, carga e transformação), uma das etapas mais críticas, trabalhosas, subestimadas e chatas de todo o processo (me julguem hehehe), já que em muitos casos os pipelines são criados manualmente, visando atender caso a caso.

Bem, aqui no Labs a cultura de micro serviços é bem sólida, o que é perfeito para garantir escala nos desenvolvimentos e agilidade nos projetos, mas gera um desafio ainda maior na hora de centralizar todos esses dados debaixo de um mesmo guarda chuva, já que a quantidade de bases de dados e eventos gerados cresce a cada dia e torna impraticável a criação de pipelines manuais.

Nesse cenário, o foco da nossa squad de engenharia de dados sempre foi o de criar ferramentas que automatizam ao máximo trabalhos repetitivos e que nos dê liberdade e tempo para pensar em coisas novas. Foi de uma dessas iniciativas que nasceu o projeto “Witcher”.

First things first

No começo da nossa plataforma, quando tudo ainda era mato, cada pipeline era criado manualmente, tabela por tabela. A extração dos dados era feita com jobs Sqoop, o processamento e tratamento em Spark, o resultado final era escrito no lake e no BigQuery, e a execução de todos os jobs era orquestrada pelo Airflow.

Fluxo dos nossos primeiros pipelines criados manualmente, todos com as mesmas características

Por mais manual e trabalhoso que tenha sido criar esses pipelines, essa etapa foi essencial para nos ajudar a entender quais eram nossas necessidades, o que fazia sentido ao nosso contexto, quais tratamentos os dados geralmente recebiam e quais problemas sempre encontrávamos. Com tudo isso bem mapeado, começamos a trabalhar em formas de automatizar as ingestões. Dessas automações nasceu o “Witcher”, uma ferramenta criada em PySpark com o objetivo de facilitar as ingestões de dados no nosso data lake.

Como o Witcher funciona?

Atualmente o Witcher suporta três tipos de origem de dados:

  • Arquivos CSV
  • Payloads de um tópico Kafka
  • Bancos de dados que possam ser acessado via JDBC

Uma vez que o dataset que precisa ser inserido no lake pertença a uma dessas origens, incluímos as informações do dataset em questão no arquivo YAML de configurações do Witcher. Essas informações incluem nome do tópico Kafka, frequência de atualização, nome da pasta onde o dado será salvo, e assim por diante.

Exemplo de parametrização do YAML para ingestão de um dataset

A imagem acima mostra o trecho do YAML em que configuramos a ingestão de um dataset fictício chamado “gender”. Nessa configuração, falamos que os dados devem ser extraídos do tópico Kafka diariamente às 5h da manhã, escritos no data lake e enviados automaticamente para o BigQuery, e caso ocorra qualquer problema um alerta deverá ser enviado para o canal do Slack parametrizado com os detalhes do erro.

Feita a configuração, o Witcher vai se encarregar de fazer toda a bruxaria e ingerir o dado no nosso data lake.

Mas que bruxaria é essa?

O conceito base por trás do Witcher foi entender que todo dado precisa receber as mesmas transformações e validações, como segurança, qualidade e deduplicação. Com isso em mente, nós lidamos com o dado como se ele fosse um produto em uma esteira de fabricação:

Quaisquer datasets que são processados pelo Witcher recebem as mesmas transformações

Para que a esteira funcione, partimos do princípio de que todo dado que entrar no fluxo está sempre no mesmo formato: os arquivos que estão na área raw do lake (área reservada aos arquivos com dados que ainda não foram tratados) devem ser do tipo parquet e devem estar no formato tabular.

Com essa premissa, qualquer dado que entrar nessa esteira sempre receberá os mesmos tratamentos e checagens, como:

  • Hashear o CPF do cliente
  • Excluir dados sensíveis
  • Extrair a versão mais recente de cada chave ou id
  • Analisar se houve queda na quantidade de registros
  • Aplicar cast para transformar o tipo de uma coluna

Ao final dessa esteira, o dado estará pronto para ser escrito na área trusted do nosso lake, área reservada aos dados tratados e prontos para serem consumidos por aplicações e usuários, e enviado para o BigQuery. E, claro, caso alguma etapa apresente quaisquer tipos de problema, uma notificação é enviada para nosso time via Slack com detalhes do erro e o processo é abortado.

Abaixo segue o mesmo diagrama ilustrando algumas das transformações que o dataset “gender” poderia receber:

Cada uma dessas transformações nada mais é que funções genéricas em PySpark que recebem o dataframe como parâmetro, checam se a transformação se aplica ao data frame, e retornam o data frame transformado ou não de volta ao fluxo do pipe.

def example_saving_dataset(yaml_params):
dataframe = read_data_in_raw_zone(yaml_params)
dataframe = ignore_sensitive_columns(dataframe)
dataframe = get_most_recent_record(dataframe)
write_data_in_trusted_zone(dataframe)

Essa abordagem permite que novas funcionalidades sejam acrescentadas ao fluxo sempre que necessário.

Como os dados vão parar nessa esteira?

Até o momento o Witcher possui 3 módulos principais para processar os dados:

  • O módulo “Yennefer”, responsável pela ingestão de dados CSV, como planilhas e dados salvos de jobs
  • O módulo “Ciri”, responsável por se conectar em bases de dados e extrair as informações via querys (a conexão com as bases é feita via drivers JDBC — Java Database Connectivity)
  • O módulo “Geralt”, responsável pela ingestão de payloads do Kafka

Apesar de serem 3 módulos diferentes, o princípio de cada um deles é exatamente o mesmo:

  • Capturar o dado na origem
  • Realizar o parse necessário para transformá-lo em um formato tabular
  • Salvar o resultado em arquivos parquet na zona raw do nosso lake
  • Chamar a esteira de processamento do Witcher, que nada mais é que uma função PySpark como vimos no código de exemplo logo acima:

Criamos módulos específicos por tipos de origem para que cada um seja especialista no que faz e consiga tratar suas exceções de maneira organizada, mantendo o código mais legível e simples.

Geralt

A ingestão de payloads do Kafka feita pelo “Geralt” sempre ocorrerá da mesma forma, já que a leitura de um tópico, o controle do que já foi lido (checkpoint), o parse do payload para o formato tabular e a escrita do resultado na área raw do lake serão sempre iguais. O que muda é o formato do payload: com um módulo específico para ingestões de Kafka, podemos simplesmente criar funções de parse diferentes para cada tópico, e todo o resto continuaria sendo o mesmo.

def example_module_geralt(yaml_params):
dataframe = read_data_in_kafka(yaml_params)
dataframe = parse_payload(dataframe, yaml_params)
write_payload_parsed_in_raw_zone(dataframe)

Hoje, o maior volume de dados recebidos pelo “Geralt” são atualizações de registros de bases extraídas via CDC (Change Data Capture):

Ciri

Da mesma forma, a extração de bases relacionais segue os mesmos passos onde todas as etapas são praticamente as mesmas. A diferença é que ao invés de ler dados do Kafka, o Witcher realiza querys na base de origem, podendo essas querys serem full ou incremental, de acordo com o que for parametrizado no YAML do Witcher.

Nas extrações full o objetivo é sempre retornar uma cópia da tabela a fim de atualizar o dado no lake. Nesses casos a Ciri realiza um “select” na tabela sem a cláusula “where”, retornando sempre todos os dados da tabela:

select * from gender

Já nas extrações incrementais, onde parte do dataset já está escrito no lake e desejamos obter apenas o que aconteceu de diferente na origem, a query inclui ainda a cláusula “where”, filtrando a coluna que foi definida como checkpoint para o dataset no arquivo YAML,e o maior valor dessa coluna no dataset já gravado no lake:

select * from gender where dt_update > '<maior valor no lake>'

Nós utilizamos a Ciri sempre que há alguma limitação na utilização de CDC nas bases.

Yennefer

O módulo “Yennefer” é especializado em ler arquivos do tipo CSV e escrevê-los na área raw do nosso lake em formato parquet.

Apesar de sua arquitetura ser a mais simples dos três, é extremamente útil para ingerir dados que não estão em bases relacionais ou eventos no Kafa, como extrações de APIs externas.

E quem executa esses módulos?

Como vimos no YAML de exemplo no início do artigo, o dataset gênero está em um tópico Kafka, logo, o módulo “Geralt” precisa ser executado e precisa receber como parâmetro tudo o que foi configurado para esse dataset no arquivo YAML. Por fim, essa execução precisa acontecer a cada 5 horas.

Para orquestrar essas execuções, assinalando o módulo correto para cada dataset, passar os parâmetros a cada execução e garantir que tudo seja executado nos intervalos definidos, temos um 4º módulo chamado “Jaskier” (não íamos querer que o grande compositor da balada “Toss a coin to your Witcher” ficasse de fora!).

O Jaskier é o responsável por ler o arquivo YAML, identificar todos os datasets que foram parametrizados, o respectivo módulo de ingestão de cada um, e criar DAGs do Airflow dinamicamente.

Conclusão

Com apenas algumas linhas de configuração e um pouco de criatividade, conseguimos automatizar a ingestão de dados no nosso data lake de ponta a ponta, com segurança e qualidade. Além disso, justamente pela facilidade de configuração do arquivo YAML e por requerer zero conhecimento de tecnologias de Big Data, pudemos disponibilizar o Witcher também para todas as squads do Labs, alcançando assim a escala que necessitamos para focar em outros projetos.

--

--