Construindo um Pipeline de Dados com NiFi, Kafka e S3 (Parte 2/2)

A etapa de processar e armazenar os dados é o momento onde devemos garantir que os mesmos estejam disponíveis para análise com consistência, escalabilidade e integridade.

Cícero Moura
Data Hackers
6 min readNov 25, 2020

--

A primeira parte deste artigo pode ser acessada neste link.

Neste artigo vamos tratar do assunto de processamento de dados, que na etapa de ETL são as letras “T” (Transform) e “L” (Load).

Porém vamos trabalhar com o conceito de Data Lake e de ELT, (Extract, Load, Transform) o que torna a nossa vida um pouco mais fácil, pois não precisamos trabalhar neste momento com limpeza, agregações ou sumarização dos dados, essas atividades ficam para a próxima etapa.

Pensando nisso os assuntos que serão tratados neste artigo são os seguintes:

  • Construção do Pipeline de Dados para processamento e armazenamento;
  • Possíveis melhorias;
  • Conclusão do projeto.

O repositório que contém os códigos utilizados neste artigo e na primeira parte podem ser encontrados neste link.

Relembrando o nosso projeto

O nosso objetivo neste projeto é extrair informações de endereços através dos números de CEP’s. Para isto estamos utilizando a API Rest ViaCEP.

A nossa base de CEP’s está cadastrada em uma tabela no Postgres e estamos utilizando o NiFi para consultar essas informações no banco de dados e posteriormente na API. O retorno da API que contém os dados do endereço completo são publicamos em um tópico do Kafka.

Tudo que descrevemos anteriormente foi feito no artigo anterior e trata-se do Grupo de Extração/Aquisição do NiFi.

Neste artigo vamos trabalhar no Grupo de Processamento, onde o objetivo é recuperar os dados no tópico do Kafka, adicionar uma data de criação para cada registro e depois salvar em um bucket no S3.

A imagem a seguir exibe detalhes da nossa arquitetura de projeto que foi descrita:

Imagem com o fluxo de dados para coletar informações através do CEP

Obs: todos os CEP’s tratados aqui tratam-se de endereços relacionados a Institutos Federais de Educação, assim são dados públicos.

Construção do Pipeline de Aquisição dos Dados

A imagem abaixo mostra em detalhe os dois grupos criados no NiFi: Extração e Processamento.

Neste momento vamos entrar em detalhe no Grupo de Processamento, onde teremos o seguinte fluxo (NiFi Flow):

Nota-se que são utilizados quatro processors do NiFi, a seguir compreenderemos o que cada um realiza e como estão configurados.

  1. Recupera Endereço do Kafka

O primeiro passo a ser feito é a recuperação dos dados que foram publicados no Kafka, então vamos utilizar um processor do tipo ConsumeKafka_2_0, assim conseguimos consumir os dados que foram publicados em um tópico do Kafka.

As configurações são básicas, é necessário apenas definir o endereço do nosso broker, o nome do tópico (requer atenção para ser o mesmo que foi publicado os dados) e adicionar um identificador para o consumidor (consumer), para que assim ele não consuma dados duplicados.

Como o nosso broker não requer autenticação, os demais campos ficam em branco.

2. Adiciona Data de Criação ao JSON

Aqui será utilizado o processor UpdateRecord que consegue alterar o conteúdo dos dados que estão no nosso fluxo e assim iremos adicionar a data de criação para cada registro. É uma boa estratégia para controlar futuramente a atualização dos dados.

Neste processor temos uma propriedade personalizada com a chave /dataCriacao e o valor ${now():format(“yyyy-MM-dd HH:mm:ss.SSS”)} que é responsável por capturar a data e hora daquele instante com precisão de milissegundos.

3. Renomeia o Arquivo

Neste momento iremos utilizar o processor UpdateAttribute apenas para renomear o nosso arquivo e adicionar uma extensão que será do tipo JSON. O NiFi gera um nome para o arquivo de forma automática, assim é necessário apenas aproveitar esse nome e colocar uma extensão de arquivo desejada.

Para isso precisamos configurar a propriedade com a chave filename e o valor ${filename}.json, conforme a imagem abaixo:

4. Salva JSON no S3

O último processor do nosso fluxo será do tipo PutS3Object, onde o objetivo é extrair os registros que estão em memória no NiFi, gerar um arquivo e grava-lo em um bucket do S3.

Neste processor é necessário configurar o nome do arquivo (Object Key) que será herdado do processor anterior, o nome do bucket, que neste exemplo chama-se base-enderecos, a autenticação na AWS que pode ser configurada com a chave de acesso (Access Key ID) e a chave de acesso secreta (Secret Access Key) e por último a Região que aqui será a us-east (Ohio).

A imagem abaixo mostra os detalhes da configuração:

Resultado do Pipeline

Depois de finalizado o fluxo construído ao longo dos dois artigos e após executar os dois grupos do NiFi, devemos obter como resultado os arquivos no formato JSON gravados no bucket do S3 conforme a imagem abaixo:

O modelo final de dados que cada arquivo contém pode ser observado logo a seguir:

Possíveis melhorias

Depois de concluir o nosso objetivo com a construção do fluxo no NiFi, podemos ainda observar alguns pontos de melhorias:

  • Adicionar mais de um broker do Kafka para balancear a carga e conseguir ganhar disponibilidade do pipeline;
  • Caso o fluxo de dados seja muito grande e como a resposta da API é bem rápida, pode ser possível adicionar mais de um consumer para esses dados, distribuindo assim a carga e aumentando a velocidade de consumo;
  • Adicionar um processor para fazer o merge dos dados e escrever vários registros em um único arquivo, assim fazemos a geração de um arquivo com tamanho maior e contendo vários registros dentro dele.

Conclusão

O nosso objetivo final neste projeto foi concluído, no decorrer do desenvolvimento conseguimos através de uma fonte de dados, consultar CEP’s e extrair novos dados, que são os endereços, para agregar na análise de negócio.

Além de atingir o objetivo de disponibilizar os dados, com o NiFi, Kafka e S3 conseguimos construir um pipeline de dados com escalabilidade, disponibilidade e confiabilidade, onde é possível resiliência e tratamento de erros caso eles aconteçam.

O projeto é fictício e pode não agregar no seu problema da vida real, mas o principal objetivo dessa série de artigos era construir uma pipeline de dados que sirva de arquitetura para outras que você possa precisar criar.

--

--

Cícero Moura
Data Hackers

Arquiteto de Dados, pós-graduado em Big Data e Machine Learning. Palestrante em Big Data. Também sou AWS Community Builder e AWS Community Leader.