Construindo um Pipeline de Dados com NiFi, Kafka e S3 (Parte 1/2)

Cícero Moura
Data Hackers
Published in
7 min readNov 13, 2020

Construir e manter um pipeline de dados não é uma tarefa simples ou fácil, requer alguns pontos de atenção e necessita de escalabilidade, velocidade e monitoramento para que a sua execução seja bem sucedida. Neste artigo iremos discutir esses pontos e construir um pipeline de dados como exemplo que atenda esses requisitos.

O objetivo aqui é discutir alguns conceitos e de forma prática implementar um pipeline de dados que contempla a aquisição, processamento e armazenamento dos dados, com a preocupação em escalabilidade, velocidade e monitoramento.

Este artigo será dividido em duas partes e trataremos dos seguintes assuntos em cada uma delas:

Parte 1 (onde estamos):

  • Qual problemas ser resolvido;
  • Apresentação das ferramentas que iremos utilizar;
  • Construção do Pipeline de Dados para aquisição dos dados;

Parte 2:

  • Construção do Pipeline para processamento e armazenamento dos dados;
  • Possíveis melhorias;
  • Conclusão do projeto.

O repositório que contém todo o código utilizado neste artigo pode ser encontrado neste link.

Qual problema iremos resolver?

Umas das tarefas mais importantes no processo de Data Science é a aquisição, o processamento e o armazenamento dos dados (conhecido como ETL ou ELT), pois esta etapa garante uma maior qualidade no momento de analisar os dados e construir os modelos preditivos.

Neste artigo o objetivo é trabalhar com uma situação hipotética, conforme podemos ler abaixo:

Imaginemos uma base de dados onde temos uma grande lista contendo números de CEP’s de diversos locais do Brasil. Essa lista representa clientes, porém queremos ter o endereço completo desses clientes armazenados para a possibilidade de construir melhores análises para o negócio.

O que nós temos para resolver o problema por enquanto:

  • Base de dados com CEP’s: a listagem de todos os CEP’s está armazenada em uma banco de dados PostgreSQL;
  • Consulta dos dados: temos a API ViaCEP que nos possibilita através de um CEP recuperar o endereço completo de forma gratuita.

Obs: todos os CEP’s que serão tratados aqui são endereços relacionados a Institutos Federais de Educação, assim são dados públicos.

Ferramentas escolhidas para o projeto

Para o nosso projeto neste artigo serão utilizadas algumas ferramentas bem conhecidas no mundo dos dados.

Abaixo segue uma pequena descrição de cada uma.

  • NiFi: é uma plataforma de dados que coleta, transforma e entrega um grande volume de dados por streaming. Além de tudo isso ele faz toda a gestão do pipeline e possui diversos processadores, que conectam a diversas fontes e destinos de dados. Também realizam transformações das mais diversas. É um software open source mantido pela Apache.
  • Kafka: se trata de uma plataforma para processamento de dados por streaming. Permite a troca de mensagens entre aplicações e processamento de dados em tempo real. É um sistema distribuído e open source. Caso queira ver mais exemplos com sua utilização, confira este outro artigo neste link.
  • S3: é um object storage em nuvem fornecido pela AWS, que permite armazenar dados dos mais variados formatos. Caso queira conhecer um pouco mais confira mais este outro artigo neste link.

Arquitetura do Projeto

Com as ferramentas devidamente apresentadas, podemos analisar a arquitetura do nosso projeto.

Imagem com o fluxo de dados para coletar informações através do CEP

Como mencionado anteriormente a listagem dos CEP’s estão armazenadas em uma tabela do PostgreSQL, o primeiro passo é acessar os dados desta tabela e para isso será utilizado o NiFi.

Após a consulta dos dados é necessário uma preparação para consultar a API ViaCEP e no momento que obtermos o retorno, os dados serão publicados em um tópico do Kafka. Tudo isso estará no Grupo de Extração/Aquisição do NiFi.

O segundo passo é a construção do Grupo de Processamento, e será abordado na segunda parte deste artigo, porém vamos de spoiler: neste grupo serão consumidos os dados no tópico do Kafka, realizado algumas transformações e depois os dados serão armazenados em um bucket no S3 e disponibilizados para futuras análises.

Agora mão no código!

Preparando o nosso ambiente

O ambiente para este exemplo será baseado no Docker e Docker Compose, então iremos utilizar três arquivos que construirá os nossos containers:

  1. NiFi : para que o NiFi funcione é necessário que façamos a sua conexão com o Zookeeper, então a stack abaixo irá realizar a execução dos dois aplicativos:

2. Kafka: no ambiente do Kafka também é necessário uma instância do Zookeeper e faremos da mesma forma que a anterior:

3. PostgreSQL: como os nossos dados de origem irão estar armazenados no postgres, é necessário subir uma imagem dele para que assim o ambiente possa ser executado por completo:

Importante destacar que todos os containers estão na mesma rede e configurados no modo Bridge, assim eles conseguem realizar a comunicação entre si, possibilitando utilizar o nome do container para comunicação interna.

Construção do Pipeline de Aquisição dos Dados

Como mencionado anteriormente, construiremos dois grupos de fluxos no NiFi, um para Extração e outro para Processamento. A imagem logo abaixo mostra a visão geral deles.

No grupo de Extração dos dados temos o seguinte fluxo.

Neste grupo são utilizados seis processadores (processor), a seguir será detalhado cada um deles pela ordem de execução:

  1. Consulta Base de CEP’s

O primeiro processor utilizado é o ExecuteSQL, nele precisamos criar um serviço que faz o gerenciamento e conexão com o Banco de Dados, que no nosso caso é o Postgres.

A configuração básica neste processor resume em definir qual banco de dados é necessário se conectar e qual a consulta SQL será executada. O nome da nossa base de dados neste exemplo é base_ceps_brasil.

As configurações do serviço de conexão pode ser observado na imagem a seguir:

Obs: é importante não esquecer o mapeamento do driver do Postgres que deve estar dentro do container caso esteja utilizando o Docker, por isso mapeamos uma unidade para o NiFI no Docker Compose.

2. Converte Avro para JSON

Aqui será utilizado o processor ConvertAvroToJSON, pois os dados consultados pelo NiFi no banco de dados são retornados no formato Avro e queremos manipular eles em JSON para facilitar nossa vida. Este processor ficará da seguinte forma:

Ao final desta etapa os nossos dados estarão no seguinte formato:

3. Faz Split no JSON

Neste processor do tipo SplitJson devemos separar o array de JSON construído no passo anterior em registros individuais para que sejam extraídos os dados de cada um, de forma separada.

A expressão regular $.* utilizada no campo da propriedade JsonPath Expression garante que o array de JSON será separado corretamente.

Ao final desta etapa teremos o seguinte modelo de dados:

4. Separa o Campo CEP do JSON

Quando os dados chegarem nesta etapa, serão processados um registro por vez, então faz necessário extrair o valor do campo CEP utilizando o processor do tipo EvaluateJsonPath.

Os JSON neste momento terão apenas um campo e o seu valor será extraído neste processor e passado para o próximo com o valor do tipo texto.

5. Consulta API ViaCEP

Agora é o momento de consultar a API para recuperar os dados do endereço, aqui é utilizado o processor InvokeHTTP.

O método HTTP será do tipo GET e a URL será a seguinte: https://viacep.com.br/ws/${cep}/json. A expressão ${cep} será alterada em tempo de execução para cada valor que será passado do processor anterior para este.

Ao final desta etapa teremos o seguinte modelo de dados com o retorno da API:

Obs: pela limitação de requisições por segundo que a API ViaCEP possui, talvez seja interessante configurar um intervalo de período para as consultas, assim você evita que o seu IP seja bloqueado na plataforma.

6. Publicar Endereço no Kafka

Por fim chegamos à última etapa, publicar os dados que colhemos do endereço em um tópico do Kafka.

Para isso é necessário utilizar o processor do tipo PublishKafka_2_0. As configurações são básicas: tipo de escrita e leitura dos dados que será JSON, URL dos Brokers do Kafka (que neste exemplo será apenas um), nome do tópico e por último vamos adicionar compressão aos dados do tipo snappy.

Primeira parte concluída!

Depois de todo este trabalho executado, concluímos com sucesso a primeira parte do pipeline de dados.

Se este grupo do NiFi que acabamos de construir for executado já teremos os dados dos endereços publicados em um tópico do Kafka.

Algumas vantagens observadas neste modelo:

  1. pode ser executada sem interrupção, caso tenha novos dados no banco de dados, conseguimos resgatá-los, podendo ser adicionado controles para não consultar dados duplicados.
  2. escalável a ponto de ser executado em cluster, basta apenas que o NiFi esteja configurado desta forma.
  3. desacoplada pelo fato de utilizar um midleware que no caso é o Kafka, assim a nossa aquisição não é interrompida para o processamento e análise dos dados.

Próximos passos…

Os próximos passos serão consumir os dados que estão sendo publicados no Kafka, processar, transformar e armazenar para que sejam utilizados na fase de análise de dados.

Para reforçar: você pode encontrar os arquivos do Docker, templates do NiFi e a base de dados neste link.

A parte 2 do artigo está disponível no seguinte link.

--

--

Cícero Moura
Data Hackers

Arquiteto de Dados, pós-graduado em Big Data e Machine Learning. Palestrante em Big Data. Também sou AWS Community Builder e AWS Community Leader.