Integração de dados em tempo real do Postgres para o S3 com Debezium

Cícero Moura
Data Hackers
Published in
8 min readMar 2, 2021

Neste artigo iremos conversar sobre como fazer a integração de dados de um banco relacional para o Amazon S3 em tempo real, capturando cada alteração do banco e armazenando os dados em formato otimizado para processamento de Big Data.

O processo de ingestão de dados por si só é um grande desafio. E quando precisamos manter os dados na camada de análise atualizados com uma certa frequência e próximo do tempo real (Near Real Time) o desafio fica ainda maior.

Quando os requisitos de negócios exigem que os dados sejam sincronizados a partir da fonte relacional para o mundo do Big Data perto do tempo real, existe um conceito que nos auxilia nessa atividade, o famoso CDC.

O CDC ou Change Data Capture (captura de alteração dos dados) permite a sincronização de dados através da verificação de alterações como insert, update e delete na fonte de origem, permitindo assim sincronizar os dados com outro qualquer destino em tempo real (ou quase isso).

Uma ferramenta de CDC bem famosa e consequentemente muito poderosa é o Debezium, que será abordada em detalhes logo a seguir.

O repositório com todo o código que será demonstrado nesse artigo pode ser encontrado neste link do GitHub.

O Debezium

O Debezium é uma ferramenta proprietária da Red Hat que possui a versão community com código aberto, onde trabalhamos especificamente com stream de dados aplicando conceitos de CDC.

O Debezium tem por base o já conhecido Kafka e Kafka Connect, tornando assim uma ferramenta confiável, escalável e com alta performance.

Basicamente o Debezium é um Kafka Connect com conectores embutidos específicos para fazer captura de dados em bancos relacionais e não relacionais também.

Os conectores do Debezium irão trabalhar com os logs de alteração do banco de dados, realizando a captura das alterações como inserção, atualização, exclusão e todas as alterações de schema das tabelas.

Com os conceitos do Debezium em mente, vamos agora para a parte de infraestrutura.

Debezium, Postgres e mais

Os problemas que queremos resolver são os seguintes:

  • sincronizar tabelas que registram a venda de produtos de uma fábrica em tempo real;
  • sincronizando cada alteração nos dados para a camada de armazenamento do Data Lake que está no S3
  • salvar os dados em formato parquet e disponibilizá-los para processamento posterior.

Para resolver os problemas iremos seguir os seguintes passos:

  1. Configuração do ambiente;
  2. Configuração do WAL do Postgres;
  3. Configuração das bibliotecas do Kafka Connect;
  4. Configuração dos conectores;
  5. Sincronização dos dados.

A imagem abaixo mostra em detalhes como foi planejada e montada a arquitetura para este exemplo.

Configuração do Ambiente

O nosso ambiente para este artigo será montado baseado no Docker e Docker Compose, assim iremos ter basicamente quatro containers: Zookeeper, Kafka, Kafka Connect e o PostgreSQL (levando em consideração que ele já pode existir em um ambiente real).

As imagens do Docker que iremos utilizar para o ambiente do Kafka são as oficiais do Debezium.

Assim o Docker Compose para a stack do Kafka ficará da seguinte forma:

E o Docker Compose para o Postgres, caso ele não exista, será como o exemplo abaixo.

Configuração do WAL para o PostgreSQL

O WAL (Write Ahead Log) é um sistema de log que basicamente todos os Bancos de Dados Relacionais utilizam, onde todas as alterações são primeiro gravadas no arquivo de log (WAL) para depois serem persistidas em disco na pasta de armazenamento dos dados.

Para que o Debezium funcione com o Postgres é necessário realizar a alteração do nível do WAL para Logical e essa configuração deve ser feita no arquivo de configuração principal (postgresql.conf).

No caso do nosso container, onde montamos um volume para os arquivos de configurações, o caminho do arquivo é o seguinte: /home/docker/postgres/db/postgresql.conf.

A imagem abaixo mostra como ficará a configuração após a sua realização.

Configuração das bibliotecas do Kafka Connect

O Kafka Connect por padrão salva os arquivos no formato Avro em seu destino, porém neste projeto queremos ter os arquivos disponibilizados no formato parquet, assim será necessário configurar algumas bibliotecas adicionais. E também precisamos adicionar algumas para utilizar o S3.

Então para essa etapa teremos dois passos:

  1. O primeiro é realizar a instalação do connector para o S3.

É necessário realizar o download do arquivo que contém as bibliotecas para o S3, descompactar e adicionar na pasta raiz do Kafka Connect (destaque aqui, pois é o container do Kafka Connect com nome connect-debezium), como estamos lidando com container Docker, o mesmo deve ser adicionado descompactado no seguinte caminho: /kafka/connect/

O connector pode ser encontrado neste LINK.

A pasta com os connector do S3 adicionado ficará conforme a imagem abaixo:

Destaque para o o connect que instalamos, o confluentinc-kafka-connect-s3–5.5.3.

Obs: as credenciais da AWS, o aws_access_key_id e aws_secret_access_key devem estar dentro do container do Kafka Connect no caminho padrão de configuração da AWS: ~/.aws/credentials. Ou ainda podem ser exportados como variáveis de ambiente. Nos dois casos as credenciais serão reconhecidas automaticamente.

2. O segundo passo é adicionar as bibliotecas para gravar os dados em parquet no S3.

Para essa etapa precisamos adicionar três novas lib’s no formato JAR, que são as seguintes:

As lib’s devem ser adicionadas na pasta padrão de bibliotecas do Kafka Connect no seguinte caminho: /kafka/libs

Após as configurações realizadas acima, precisamos reiniciar o container, que pode ser feito com o seguinte comando:

docker restart connect-debezium

Essa parte está concluída!

Configuração dos conectores

Depois que todo o ambiente estiver configurado, agora as coisas ficam mais simples, pois precisamos apenas adicionar as configurações no Kafka Connect, e isso pode ser feito através da API Rest do mesmo.

O primeiro connector que iremos configurar é do tipo source, que irá permitir a extração dos dados do Postgres e enviar para um tópico no Kafka. O tópico será criado automaticamente com o nome da tabela e o nome do banco de dados.

A configuração do connector do Postgres ficará da seguinte forma, conforme este exemplo:

O segundo connector é do tipo sink, que permite enviar os dados de um tópico do Kafka para algum destino, que no nosso caso será o S3 e em formato parquet.

A configuração do connector ficará da seguinte forma:

As requisições para criar um connector deve ser feita como POST no path /connector, ao executado em ambiente local fica assim: http://localhost:8083/connectors.

A imagem logo abaixo mostra como ficará a requisição para criar o connector com o Postgres.

E logo a seguir a imagem mostra como ficou a configuração do connector para o S3.

Obs: como estamos trabalhando com duas tabelas, produtos e item_vendas, adicionamos os dois tópicos criados no Kafka Connect juntamente na mesma configuração, como pode ser observado logo acima.

Outros endpoints importantes:

  • GET /connectors -> listar todos os connectors criados;
  • DELETE /connectors/<nome_do_connect> -> deleta um connector pelo nome.
  • PUT /connectors/<nome_do_connect> ->atualizar um connector pelo nome.

Ao consultar o endpoint GET /connectors, temos os dois connectors criados, conforme a imagem abaixo:

Sincronização dos dados

Após realizar todas as configurações anteriores, temos os conectores funcionando e os dados sendo sincronizados.

É importante destacar que o Debezium por padrão faz o carregamento de todos os dados (full load) caso não seja passado um parâmetro para que isso não ocorra e depois disso ele começa a monitorar todas as transações do banco e sincronizar com o destino dos dados configurados no Kafka Connect.

Para o nosso exemplo podemos perceber que automaticamente foram criadas partições para os dados em pastas organizadas no S3 e os dados armazenados no formato parquet e já comprimidos com o snappy.

A imagem abaixo mostra as pastas criadas com o nome dos tópicos.

A outra imagem a seguir mostra um exemplo de dado que foi armazenado dentro do bucket no S3.

Alguns pontos a mais

  • Full load: como mencionado, por padrão o Debezium faz a carga inicial do banco de dados para o destino, porém isso pode ser configurado com a tag snapshot.mode=never adicionada a configuração do conector do tipo source para Postgres.
  • Mini batch: podemos configurar algo parecido como mini batch para carregar os dados no S3, no nosso exemplo adicionamos a tag flush.size=3, onde indica que os dados serão gravados no S3 a cada três entradas de registro, isso pode ser aumentado ou diminuído.
  • Particionamento: Em caso de gravação dos dados no S3 não adicionamos um padrão para o particionamento dos dados no Bucket, assim é criado um particionamento padrão, até porque neste exemplo não temos campo de data, para adicionar um particionamento, as tags podem ser utilizadas: partitioner.class e partitioner.field.name;
  • Schema Registry: uma boa opção de solução com o Debezium é a utilização do Schema Registry para padronizar e configurar cada tipo de dado que entrará no Kafka, isso pode evitar problemas posteriores no processamento dos dados.
  • Slot Replication: o Debezium se utiliza de slot replication para controlar a captura dos dados de um banco de dados, no caso do Postgres podemos visualizar o slot_replications com a seguinte consulta: SELECT * FROM pg_replication_slots;

Conclusão

Assim concluímos esse assunto muito interessante que é a utilização do CDC com o Debezium.

Muitas soluções podem ser construídas com o Debezium, em diferentes arquiteturas e propósitos, mas sempre devemos pensar na sua real aplicação e complexidade de implementação e também se naquele momento é a solução ideal para resolver o problema de negócio, caso contrário, outras ferramentas e arquiteturas podem ser escolhidas.

O repositório com todo o código demonstrado nesse artigo pode ser encontrado neste repositório do GitHub (que ainda contém um super extra, que são os códigos com o Terraform para criação do ambiente na AWS).

--

--

Cícero Moura
Data Hackers

Arquiteto de Dados, pós-graduado em Big Data e Machine Learning. Palestrante em Big Data. Também sou AWS Community Builder e AWS Community Leader.