Análise de Tweets com Nifi, Kafka, Postgres e Metabase no Google Cloud (Parte 2/3)

Savio Teles
Data Hackers
Published in
8 min readJan 29, 2021

Esta é a segunda parte do artigo que tem como objetivo criar um pipeline de ingestão de dados do Twitter utilizando Apache Nifi e Apache Kafka. O pipeline obtém os tweets da API do Twitter e carrega no PostgreSQL para serem visualizados com o Metabase, utilizando o serviço de nuvem do Google.

Na etapa anterior (link aqui), instalamos e configuramos todas as ferramentas em uma máquina no Google Cloud.

Objetivo desta segunda etapa

Nesta segunda etapa iremos construir nosso fluxo de dados com o Apache Nifi e Apache Kafka afim de ler os dados do Twitter e escrevê-los no PostgreSQL, em um formato que depois seja possível construir relatórios no BI.

Criação do tópico no Kafka

Iremos utilizar o projeto Kafdrop para criar o tópico no Kafka. Você consegue acessar o Kafdrop no endereço da máquina em que ele foi instalado, na porta 19000. No Google Cloud, por exemplo, se você configurou um IP estático para a sua máquina, basta acessar o Kafdrop neste IP e na porta 19000. Para criar um novo tópico no Kafka, você deve primeiro clicar no botão New (imagem abaixo à esquerda) e, na próxima página, informar as configurações do tópico. Neste tutorial vamos ler tweets a respeito da COVID-19 e, por isso, criamos um tópico com nome ‘covid’.

Criação da tabela no PostgreSQL

Os dados lidos do Twitter e processados pelo Nifi e Kafka serão armazenados em uma tabela do PostgreSQL. Para criar a tabela do nosso tutorial, basta entrar os comandos abaixo na linha de comando da sua máquina:

$ docker exec -it postgres psql -U metabase
$ create table covid (tweet_date bigint, tweet text, reply_count integer, retweet_count integer, favorite_count integer, user_name text, user_screen_name text, user_location text, user_followers_count integer, user_friends_count integer, user_listed_count integer, user_favourites_count integer, user_statuses_count integer);

Escrita dos tweets com Nifi e Kafka

Chegou a hora de construir o pipeline de dados do Twitter! No primeiro momento, vamos receber os tweets e escrever no tópico do Kafka, aqui denominado ‘covid’. Acesse o Nifi no endereço IP da máquina que ele está rodando e na porta 8080.

Dentro do Nifi, você deve adicionar o Processor ‘GetTwitter’ ao pipeline. Caso você tenha dúvidas de como adicionar um Processor no Nifi, basta consultar a documentação aqui ou aqui. A imagem abaixo mostra a tela de configuração do processador ‘GetTwitter’. Nela alteramos a propriedade ‘Twitter Endpoint’ para ‘Filter Endpoint’ para ser possível passar filtros à API do Twitter. Estes filtros são configurados em “Terms to Filter On” com a lista de palavras chaves a respeito da Covid-19 (covid,covid19,coronavírus,corona,pandemia), além da configuração da propriedade “Languages” para “pt” com o objetivo de retornar apenas tweets na Língua Portuguesa.

O próximo Processor a ser adicionado é o ‘PublishKafka_2_6’ que tem como objetivo escrever os tweets no tópico ‘covid’ no Kafka. A imagem abaixo mostra a configuração deste processador, com a propriedade ‘Kafka Brokers’ sendo preenchida com endereço dos Brokers do Kafka (aqui ‘kafka:29092’). Outras duas configurações que vamos alterar é “Use Transactions” para false e “Topic Name” para covid (tópico que iremos escrever no Kafka).

Agora conecte os dois componentes no Nifi, onde a saída de sucesso do processador GetTwitter irá para o processador PublishKafka_2_6. As saídas de sucesso e falhas do componente PublishKafka_2_6 ficarão nele mesmo.

Pronto! Conseguimos, até este momento, ler os dados do Twitter e publicar no tópico do Kafka.

Consumo dos dados do Kafka e escrita no Postgres

A partir de agora você pode criar quantos pipelines quiser para ler os dados do tópico do Kafka. Com esta arquitetura desacoplada fica fácil adicionar novos serviços e análises em tempo real sobre os dados do Covid. Neste tutorial vamos criar um pipeline que lê os tweets do tópico e escreve no PostgreSQL.

Consumo de Mensagens do Kafka

Primeiro você deve adicionar o Processor ‘ConsumeKafkaRecord_2_6’ ao pipeline de dados para ler os dados do tópico do Kafka. Neste processador alteramos as seguintes propriedades (conforme imagem abaixo): 1) ‘Kafka Brokers’, 2) ‘Topic Name(s)’, 3) ‘Record Reader’, 4) ‘Record Writer’, 5) ‘Group ID’ e 6) ‘Offset Reset’. As propriedades ‘Record Reader’ e ‘Record Writer’ indicam, na imagem abaixo, que este consumidor espera mensagens do tópico ‘covid’ no formato JSON, já que a API do Twitter manda os dados no formato JSON. A propriedade ‘Offset Reset’ foi alterada para ‘earliest’ para obter todos os tweets já publicados no tópico do Kafka (o padrão é ‘latest’ que pega somente as últimas mensagens publicados no tópico).

Split do array de JSON

Os dados de tweets são lidos do Kafka em lote, ou seja, como um array de objetos JSON. Por isso, adicionamos o Processor ‘SplitJson’ (conforme imagem abaixo) ao pipeline para “quebrar” o array em vários objetos JSON. Você deve alterar a propriedade “JsonPath Expression” para o valor ‘$.*’, conforme figura abaixo. Para finalizar, você precisa conectar o Processor anterior (‘ConsumeKafkaRecord_2_6’) ao Processor ‘SplitJson’.

Transformação do JSON

Cada tweet contém muitas informações, mas serão selecionados alguns atributos e adequar as colunas para que possam ser inseridas no banco de dados. Para este fim, será adicionado o Processor ‘JoltTransformJSON’ ao pipeline.

Conforme imagem acima, o Processor é configurado clicando no botão ‘Advanced’ na aba ‘Properties’. No campo “Jolt Specification” inserimos o conteúdo do JSON abaixo. Iremos pegar informações sobre o tweet (como data de publicação em milissegundos e o texto), além de várias informações sobre o usuário que publicou o tweet. Clique no botão ‘Save’ para salvar esta especificação e conecte o Processor ‘SplitJson’ a este Processor ‘JoltTransformJSON’.

Transformação do JSON para SQL

A informação do tweet está organizada como gostaríamos, mas ainda falta converter o JSON para um formato que seja possível inserir no PostgreSQL. Por isso, será adicionado o Processor ‘ConvertJSONToSQL’ ao pipeline para converter o JSON para SQL. Neste Processor, iremos alterar basicamente as três propriedades da imagem abaixo. A propriedade ‘Statement Type’ foi alterada para ‘INSERT’ para indicar que é uma operação de inserção e a propriedade ‘Table Name’ foi alterada para ‘covid’ para indicar em qual tabela do PostgreSQL esta inserção irá ocorrer.

A propriedade ‘JDBC Connection Pool’ deve ser alterada para configurar a conexão com o banco de dados PostgreSQL. Você deve clicar no valor da propriedade para abrir uma tela para configurar um ‘DBCPConnectionPool’, semelhante a imagem abaixo, mas sem a opção ‘Postgres’, já que você ainda não configurou nenhuma conexão. Quando abrir a tela abaixo, clique em ‘Create new service…’.

Irá abrir uma tela semelhante a imagem abaixo. Altere o valor do campo ‘Controller Service Name’ para ‘Postgres’ para depois facilitar a identificação deste serviço. Depois clique em ‘Create’.

Você será direcionado de volta a tela de configuração do Processor. Agora clique no ícone da seta ao lado do valor da propriedade ‘JDBC Connection Pool’.

Você será direcionado a uma tela semelhante à imagem abaixo. Agora você deve clicar no ícone ‘Engine’ destacado na imagem abaixo para configurar a conexão com o PostgreSQL.

Será aberta uma tela semelhante ao da imagem abaixo. Nela tem um exemplo de como configurar a conexão com o PostgreSQL no host ‘postgres’ e porta 5432. Você também deve informar o usuário ‘metabase’ em ‘Database User’ e a senha ‘postgres’ em ‘Password’.

Na propriedade ‘Database Connection URL’ coloque o valor ‘jdbc:postgresql://postgres:5432/metabase’ para conseguir informar a URL de acesso ao PostgreSQL. A propriedade ‘Database Driver Class Name’ deve ser alterada para ‘org.postgresql.Driver’ e a propriedade ‘Database Driver Location’ para ‘/home/nifi/postgresql-42.2.18.jar’.

Na imagem acima você percebe que o driver do PostgreSQL está localizado na pasta ‘/home/nifi’. Ele não vem, por padrão, dentro do container do Nifi. Para baixar o driver do Postgres, dentro da pasta ‘/home/nifi’, digite os comandos abaixo na máquina que o Nifi está rodando:

$ docker exec -it nifi bash
$ cd /home/nifi
$ wget https://jdbc.postgresql.org/download/postgresql-42.2.18.jar

Conecte então o Processor anterior (‘JoltTransformJSON’) a este processor ‘ConvertJSONToSQL’ para que os dados em JSON sejam convertidos para o formato SQL.

Inserir os dados no Postgres

A última etapa é inserir os dados, agora no formato SQL, dentro do PostgreSQL. Por isso, você deve adicionar o Processor ‘PutSQL’ ao pipeline, utilizando o ‘JDBC Connection Pool’ criada no Processor anterior ‘ConvertJSONToSQL’. Depois, conecte o Processor anterior (‘ConvertJSONToSQL’) a este processor para que os dados em SQL seja inseridos no banco de dados.

Habilitar todos os serviços

A última etapa é habilitar todos os serviços criados. Você deve primeiro clicar no ícone destacado em vermelho na imagem abaixo, dentro do painel ‘Operate’ do Nifi, que fica do lado esquerdo.

Irá abrir a tela apresentada na imagem abaixo, onde você deve clicar em cada ícone semelhante a um ‘raio’ do lado direito para habilitar os serviços ‘JsonRecordSetWriter’, ‘JsonTreeReader’ e ‘Postgres’. O serviço ‘JsonTreeReader’ é utilizado para consumir os tweets do Kafka no formato JSON. O serviço ‘JsonRecordSetWriter’ é utilizado para enviar a mensagem, no formato JSON, para a próxima etapa do Pipeline. Já o serviço ‘Postgres’ foi o nome que foi dado ao serviço que faz a conexão com o PostgreSQL. No final, o status de todos os serviços deve estar com valor ‘Enabled’.

Pipeline Completo

A imagem abaixo apresenta o pipeline completo desde a coleta dos tweets até a escrita final no PostgreSQL. Agora você pode mandar executar o pipeline!

Próximos Passos

Agora que o pipeline de dados está escrevendo os tweets dentro do PostgreSQL, iremos realizar algumas análises destes dados no Metabase, integrado ao PostgreSQL. Isto será feito na terceira e última etapa.

O pipeline completo do Nifi está disponível no meu github no seguinte link. Você pode importar este pipeline no Apache Nifi.

Você pode acessar a próxima etapa no seguinte link.

Referências

--

--

Savio Teles
Data Hackers

Doutor em Ciência da Computação. Pesquisador e desenvolvedor na área de Big Data & Machine Learning há mais de 12 anos.