Simulando o Databricks AutoLoader

Anselmo Borges
Rescue Point
Published in
7 min readDec 2, 2022
Aproveitando meu desemprego pra fazer uns posts.

O intuito

Tudo bem que o AutoLoader já é usado a algum tempo no Databricks, mas a minha idéia aqui é te explicar como funciona o AutoLoader e posteriormente te explicar como otimizar seus pipelines que usam essa tecnologia pra Delta Live Tables.

Ou não também, você que sabe! rs

A parte teórica

Digamos que o ele vem do Spark Structured Programming trazendo otimizações no Databricks, pois uso a fonte dos dados de storages cloud (S3 da Amazon AWS, ADLS e Blob da Microsoft Azure e GCS da Google Cloud), usa uma pasta também da cloud como zona de checkpoint (desenho tosco abaixo explicando) e grava dados usando tabelas (usando as funcionalidades Delta), views e temp_views.

Em um resumo, deu uma otimizada no que existe no cenário open source.

Segue abaixo o desenho tosco que mostra mais ou menos como funciona nesse caso um exemplo.

Um rabisco que não explica nada, mas o post explica! rs

Mostrando na prática

Claro que existem vários cenários possíveis, mas eu vou mostrar um exemplo simples de dados sendo gerados no Twitter que caem numa pasta do Storage Account Azure (ADLS), o Databricks lê o dado dessa pasta no ADLS assim que o dado chega e joga numa tabela delta pra mim, quase em tempo real. Simplão.

Segue um desenho da idéia, se liga:

Solução semi caseira do Anselmo

Composição do ambiente

  1. Foi criado um App no Twitter Developer, lá são gerados os acessos pra logar no e coletar dados usando API. Houve uma evolução no método de conexão que usei da ultima vez, agora ele usa OAUTH pra conectar, então ele nem precisa mais daquelas 4 chaves que usava antes, agora usamos apenas um Hash Token que você gera lá no Twitter mesmo e boas.
  2. 2. Subi um Apache Nifi num Docker na minha maquina mesmo (poderia ser criado na Azure também mas tô economizando, rs). Fique ligeiro que tem um pulo do gato, as ultimas versões do Nifi já vem com HTTPS e uma pré-autenticação configurada, o usuário e a senha você pega no log de start do container no Docker (guarde pra não perder).
  3. Deixei um template pronto do que eu fiz, você pode só importar e editar com seus parâmetros, baixe ele nesse link e importe lá no Nifi.
  4. Tenho um Workspace Databricks criado, um clusterzinho single instance mais barato e um notebookzinho Python básico (vou jogar aqui mais pra frente). Quando você cria o Databricks na Azure, automaticamente um storage account é criado pra ele, então esse Storage Account que você vê no desenho bem no canto a direita é esse.
  5. Storage Account de dados: Esse é o que chamei de "sarescue001", criei um container nele de nome "dados" e criei 2 pastas lá dentro, uma chamada "twitter" e outra chamada "checkpoint" (falaremos sobre elas logo mais).

A conexão do Databricks com esse segundo Storage Account (sarescue001) não é nativa, teremos que configurar. Tenho 2 posts aqui no Medium que explicam como você pode fazer isso (e de 2 formas por que a gente é enjoado)

Você escolhe jovem!

Já aproveita e deixa a palminha e se inscreve ae

Imagino que você tem tudo pronto, vamos começar!

Vou começar mostrando bem por cima o que o Apache Nifi está fazendo.

Apache Nifi

Em resumo ele se conecta ao Twitter via API (estou usando um processor chamado "Consume Twitter", ele trabalha de uma forma estranha de coletar dados, nem me aprofundei muito e procurei trazer tweets aleatórios pra dar volume e com campos bem básicos).

O dado é tratado pois ele vem num arquivo mais ou menos assim.

[{data:
{ "nome" : "Anselmo",
"sobrenome" : "Borges",
"profissao" : "Desempregado"
},
{ "nome" : "Michael",
"sobrenome" : "Scott",
"profissao" : "Melhor chefe do mundo"
}
}
]

Note que eu tenho colchetes no começo, o que zoa meu role inicial, aí que entram meus 2 splitJson (fiz 2 porque fiquei com preguiça), no primeiro tiro o colchetes e depois tenho um grupo data nas chaves, com 2 registros, o segundo quebra essa segunda etapa e o registro de cada pessoa vira um JSON único, não sei se deu pra entender mas é isso. O exemplo de um arquivo final ficaria assim.

{ "nome" : "Anselmo",
"sobrenome" : "Borges",
"profissao" : "Desempregado"
}

Fiz uma configuração no ultimo step "PutAzureDataLakeStorage" que dá um nome pra cada um dos caras (twitter_${nextInt()}.json) e coloco os dados de acesso ao meu storage account.

  • Crio um controller service onde coloco as credenciais de acesso e o storage account que vou usar "sarescue001", vou presumir que você sabe fazer isso. Com ele feito atribuo ao Processor
  • Coloco o nome do container "dados" e coloco o caminho da pasta twitter que no meu caso tá "pasta1/twitter".

Segue abaixo um exemplo de como está:

Configuração do processor "PutAzureDataLakeStorage"

Com meu cara configurado, vou mostrar as pastas vazias, pra ver que não tem bruxaria aqui.

Usando o Azure Storage Explorer na minha maquina mostrando que não tem nada lá

Agora vamos iniciar Processor group do NIFI e vamos monitorar a criação dos arquivos, vou deixar ligado por uns 30 segundinhos e parar pra que a gente tenha a primeira leva de arquivos no Storage Account.

Iniciando e gravando os arquivos no Storage Account Azure

Agora com os arquivos iniciais já no Storage Account, vamos pro assunto do post, o AutoLoader, por enquanto só simulamos um envio de dados "Near Real Time" pro Pro Storage Account.

Anselmo, eu poderia ter mandado esses dados para um tópico Kafka e funcionaria? Sim, funcionaria, mas não com Autoloader, pois ele é baseado com "cloudfiles", daria pra fazer via Spark Streaming convencional, quando chegarmos no código dou uma explicada.

Autoloader no Databricks

Agora vamos consumir esses dados no ADLS usando o Autoloader, pra isso vou explicar mais ou menos o como está funcionando o código que estou usando.

Tentando explicar o que usei no Autoloader

Lembrando que existem muito mais parâmetros do que esses mas para um fim didático vou usar o mais simples possível, mas caso queira se aprofundar nas possibilidades segue o link da documentação do Autoloader Databricks.

As variáveis de entrada para a função acima seriam as seguintes:

Com isso só chamar a função criada com as variáveis:

query = autoload_twitter_sa(data_source = "abfss://dados@sarescue001.dfs.core.windows.net/pasta1/twitter/",
source_format = "json",
table_name = "rescue.bronze_twitter",
checkpoint_directory = "abfss://dados@sarescue001.dfs.core.windows.net/pasta1/checkpoint/")

Agora vou colocar pra rodar essa primeira leva de processamento:

Os primeiros 284 arquivos geraram 284 linhas, mas se liga que qdo inicio o cara tá la verdinho online

O start do autoloader não é um comando pontual e acaba (Até tenho como fazer isso com a option "trigger once=true") mas nesse cenário deixei lá ouvindo se chegam novos arquivos.

Iniciando o processo de geração de arquivos no Nifi

Agora vamos dar start no processor group do Nifi e monitorar os arquivos sendo atualizados na tabela "rescue.bronze_twitter".

Startei o serviço no Nifi e a atualização rolando na tabela do Databricks

Por mais que o Autoloader seja um processo que existe a algum tempo no Databricks, pra quem nunca viu isso é um lance brilhante, onde você perde a necessidade de controlar via código na mão o que você já importou usando o checkpoint tendo a possibilidade de recuperar um job que deu problema.

Outra vantagem, vindo de um cenário que eu já trabalhei, a tabela em sí no Hive não era atualizável antes do formato Delta (insert, update e delete), era só leitura, logo eu usava spark streaming pra gerar o dado no lake (no Hadoop que era meu caso) e apontava a tabela para o diretório tendo que rodar um refresh table toda vez que eu fosse usar a tabela (zuado). Nesse modelo do Delta o dado chegou, já era, você mesmo viu que é só rodar a query e boas.

Imagina em cenários onde tenho que fazer acompanhamento de desvio padrão, detecção de fraudes e outras paradas, esse seria o cenário ideal.

Notebook que usei nesse exemplo

O notebook que fiz tem 4 células apenas, mas caso queira importa-lo em seu ambiente, pode baixar nesse link aqui.

Pontos a se pensar e que podem ser úteis

  1. Pô Anselmo, preciso deixar um cluster rolando 24/7? Pois o Job fica no ar! - Não necessariamente, posso programar jobs periódicos que rodam no modelo de execução de autoloader único, ele roda, pega o que tem de novo e ja era.
  2. Mas e se a demanda de execução em processamento aumentar muito? Nesse cenário eu não tenho a necessidade de muito poder computacional, mas caso você precise pode criar um pool com auto scalling, e o cluster se auto ajusta conforme sua necessidade.
  3. Posso fazer meu ETL em tempo real, por exemplo posso colocar um autoloader atrelado a outro como um CTAS por exemplo (create table as select), a tabela que crio a partir desse comando ela também passa a ser uma streaming table, o que automaticamente pode transformar e tratar minha tabela pra silver conforme os dados chegam.

Dei apenas algumas ideias sobre o que pode ser feito, mas as possibilidades são várias, usei esse conteúdo de Autoloader pra no proximo falar de uma outra feature que pode trazer inovações em cima desse processo chamado Delta Live Tables.

Ficou curioso? Te aguardo no próximo então!

Abraço!
Anselmo Borges

--

--

Anselmo Borges
Rescue Point

Bigdata Engineer, Cloud Architect, Nerd, Alcoholic, Brazilian Jiujitsu Black belt and hide and seek World champion.