Streaming de dados (do PostgreSQL) utilizando Kafka|Debezium (v2)

Hey! Aqui estou novamente para falar sobre Kafka & Debezium, desta vez numa versão mais curta, mais direta ao ponto, somente instalação e configuração de Kafka (incluindo Zookeeper, Schema Registry, Rest API e Kafka Connect), e também a configuração de um PostgreSQL para utilização de Debezium com decoderbufs, e com isso, consumir os dados de um banco de dados PostgreSQL em tempo real num dashboard, data lake, databricks, enfim…, diversas possibilidades de consumo.

No story anterior, que pode ser visualizado aqui, foi utilizado o plugin wal2json para a conexão do Debezium, por dois motivos, no iFood lá em 2018, instancias PostgreSQL estavam em RDS(serviço gerenciado de banco de dados da AWS), e estava rolando uma PoC em RDS, que não permite um acesso digamos mais "baixo nível", para ser configurado o decoderbufs e todas as dependências deste plugin. Desta vez, é diferente \o , streaming de dados (a partir do PostgreSQL) já virou realidade, e está rolando em produção, um dos motivos é que os bancos de dados PostgreSQL no iFood estão em EC2 (aquele abraço Glauco).

Utilizei para a demonstração e os prints, o Google Cloud e suas VMs.

Para facilitar a utilização, as maquinas foram criadas com IP externo e configuração de acesso com chave ssh (compute engine > metadados > chaves ssh).


PostgreSQL 10

Para a configuração a seguir, esta sendo considerado uma nova instancia, mas caso já tenha uma em uso que prefira utilizar, desconsidere as instalações relacionadas a PostgreSQL, mas não deixe de instalar os pacotes referente ao PostGIS, devel e contrib.

yum update -y &&
yum clean all &&
yum install https://download.postgresql.org/pub/repos/yum/10/redhat/rhel-7-x86_64/pgdg-centos10-10-2.noarch.rpm -y &&
yum install postgresql10-server postgresql10-devel postgresql10-contrib postgis25_10 postgis25_10-devel -y &&
yum install wget autoconf automake curl file gcc gcc-c++ git libtool make pkgconfig libxml2-devel gdal-devel geos-devel unzip protobuf-c-devel.x86_64 -y &&
/usr/pgsql-10/bin/postgresql-10-setup initdb &&
systemctl enable postgresql-10 &&
systemctl start postgresql-10 &&
export PG_CONFIG=/usr/pgsql-10/bin/pg_config &&
cd / && wget http://download.osgeo.org/proj/proj-5.2.0.tar.gz &&
tar -xzvf proj-5.2.0.tar.gz &&
rm -rf /proj-5.2.0.tar.gz &&
cd /proj-5.2.0/ && ./configure && make && make install && cd / &&
wget https://github.com/protocolbuffers/protobuf/releases/download/v2.6.1/protobuf-2.6.1.tar.gz &&
tar -xzvf protobuf-2.6.1.tar.gz &&
rm -rf /protobuf-2.6.1.tar.gz &&
cd protobuf-2.6.1/ &&
./autogen.sh && ./configure && make && make install &&
export PG_CONFIG=/usr/pgsql-10/bin/pg_config &&
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig &&
cd / && wget https://codeload.github.com/protobuf-c/protobuf-c/zip/v1.2.1 &&
unzip v1.2.1 && cd protobuf-c-1.2.1/ &&
./autogen.sh && ./configure && make && make install &&
cd / && git clone https://github.com/debezium/postgres-decoderbufs &&
cd /postgres-decoderbufs/ && make && make install &&
echo “shared_preload_libraries = ‘decoderbufs’” > /var/lib/pgsql/10/data/postgresql.auto.conf &&
echo “wal_level = logical” >> /var/lib/pgsql/10/data/postgresql.auto.conf &&
echo “max_wal_senders = 8” >> /var/lib/pgsql/10/data/postgresql.auto.conf &&
echo “wal_keep_segments = 4” >> /var/lib/pgsql/10/data/postgresql.auto.conf &&
echo “wal_sender_timeout = 60s” >> /var/lib/pgsql/10/data/postgresql.auto.conf &&
echo “max_replication_slots = 4” >> /var/lib/pgsql/10/data/postgresql.auto.conf &&
echo “listen_addresses = ‘*’” >> /var/lib/pgsql/10/data/postgresql.auto.conf &&
echo “host all all 10.0.0.0/8 trust” >> /var/lib/pgsql/10/data/pg_hba.conf &&
echo “local all all peer” >> /var/lib/pgsql/10/data/pg_hba.conf &&
echo “host all all 127.0.0.1/32 ident” >> /var/lib/pgsql/10/data/pg_hba.conf &&
echo “host all all ::1/128 ident” >> /var/lib/pgsql/10/data/pg_hba.conf &&
echo “local replication all peer” >> /var/lib/pgsql/10/data/pg_hba.conf &&
echo “host replication all 127.0.0.1/32 ident” >> /var/lib/pgsql/10/data/pg_hba.conf &&
echo “host replication all ::1/128 ident” >> /var/lib/pgsql/10/data/pg_hba.conf &&
echo ‘/usr/local/lib/’ >> /etc/ld.so.conf.d/local.conf &&
ldconfig &&
echo ‘LANG=en_US.utf-8’ >> /etc/environment &&
echo ‘LC_ALL=en_US.utf-8’ >> /etc/environment &&
systemctl restart postgresql-10.service

Após o script executado, o PostgreSQL estará com o serviço iniciado e aceitando conexões. Mais a frente, será necessário um nome de "tópico" do Kafka para configuração de Debezium e GCS Connector, para isso, foi criado a tabela abaixo para ter um tópico de nome postgres_server.postgres_schema.postgres_table.

Para esta configuração, foi utilizada a seguinte tabela:

create schema postgres_schema;
create table postgres_schema.postgres_table (
id int,
texto varchar,
data timestamptz
);
alter table postgres_schema.postgres_table add primary key (id);

Zookeeper

Esse é o serviço que monitora os nós do cluster do Kafka (broker) e coordena a distribuição dos processos entre nós.

Config minima do nó: 1 vCPUj, 1 GB de memória, 10GB disco com CentOS 7.

Após a criação da vm, é hora de configurar o repositório do Confluent, instalar o confluent-community-2.12, java e nc. Para facilitar na sua configuração, basta copiar o "script" abaixo e colar no terminal da sua vm com o root:

rpm — import https://packages.confluent.io/rpm/5.2/archive.key &&
echo ‘[Confluent.dist]’ > /etc/yum.repos.d/confluent.repo &&
echo ‘name=Confluent repository (dist)’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘baseurl=https://packages.confluent.io/rpm/5.2/7' >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgcheck=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgkey=https://packages.confluent.io/rpm/5.2/archive.key' >> /etc/yum.repos.d/confluent.repo &&
echo ‘enabled=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘[Confluent]’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘name=Confluent repository’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘baseurl=https://packages.confluent.io/rpm/5.2' >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgcheck=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgkey=https://packages.confluent.io/rpm/5.2/archive.key' >> /etc/yum.repos.d/confluent.repo &&
echo ‘enabled=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘vm.swappiness=0’ | sudo tee — append /etc/sysctl.conf &&
yum clean all &&
yum update -y &&
yum install -y confluent-community-2.12 nc java-1.8.0-openjdk jq &&
echo ‘LANG=en_US.utf-8’ >> /etc/environment &&
echo ‘LC_ALL=en_US.utf-8’ >> /etc/environment &&
hostname | sed ‘s@^[⁰-9]*\([0–9]\+\).*@\1@’ > /data/zookeeper/myid &&
chown cp-kafka:confluent /var/log/confluent &&
chmod u+wx,g+wx,o= /var/log/confluent &&
mkdir -p -m 777 /data/zookeeper &&
cp /etc/kafka/zookeeper.properties /etc/kafka/zookeeper.properties.bkp &&
echo ‘dataDir=/data/zookeeper’ > /etc/kafka/zookeeper.properties &&
echo ‘clientPort=2181’ >> /etc/kafka/zookeeper.properties &&
echo ‘maxClientCnxns=15’ >> /etc/kafka/zookeeper.properties &&
echo ‘tickTime=2000’ >> /etc/kafka/zookeeper.properties &&
echo ‘initLimit=5’ >> /etc/kafka/zookeeper.properties &&
echo ‘syncLimit=2’ >> /etc/kafka/zookeeper.properties &&
echo ‘autopurge.snapRetainCount=3’ >> /etc/kafka/zookeeper.properties &&
echo ‘autopurge.purgeInterval=48’ >> /etc/kafka/zookeeper.properties &&
echo ‘server.1=0.0.0.0:2888:3888’ >> /etc/kafka/zookeeper.properties &&
sed -i 's/-Xmx512M -Xms512M/-Xmx512M -Xms512M/g' /usr/bin/zookeeper-server-start

Após a finalização da execução acima, é só iniciar o serviço (systemctl start confluent-zookeeper), e verificar nos logs se está tudo certo, a mensagem de "ok" será dessa forma:

Para verificar o log do zookeeper: tail -f /var/log/kafka/server.log

"INFO binding to port 0.0.0.0/0.0.0.0:2181…"

Kafka 2.2 (Confluent Community 5.2.1)

Também conhecido como broker, é este o serviço que faz o envio distribuído de mensagens em tempo real.

Config minima do nó: 1 vCPUj, 4GB de memória, 10GB disco com CentOS 7.

Com a vm criada, é copiar/colar o "script" abaixo:

rpm — import https://packages.confluent.io/rpm/5.2/archive.key &&
echo ‘[Confluent.dist]’ > /etc/yum.repos.d/confluent.repo &&
echo ‘name=Confluent repository (dist)’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘baseurl=https://packages.confluent.io/rpm/5.2/7' >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgcheck=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgkey=https://packages.confluent.io/rpm/5.2/archive.key' >> /etc/yum.repos.d/confluent.repo &&
echo ‘enabled=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘[Confluent]’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘name=Confluent repository’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘baseurl=https://packages.confluent.io/rpm/5.2' >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgcheck=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgkey=https://packages.confluent.io/rpm/5.2/archive.key' >> /etc/yum.repos.d/confluent.repo &&
echo ‘enabled=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘vm.swappiness=0’ | sudo tee — append /etc/sysctl.conf &&
yum clean all &&
yum update -y &&
yum install -y confluent-community-2.12 nc java-1.8.0-openjdk jq &&
echo ‘LANG=en_US.utf-8’ >> /etc/environment &&
echo ‘LC_ALL=en_US.utf-8’ >> /etc/environment &&
chown cp-kafka:confluent /var/log/confluent &&
chmod u+wx,g+wx,o= /var/log/confluent &&
mkdir -p -m 777 /data/kafka &&
cp /etc/kafka/server.properties /etc/kafka/server.properties.bkp &&
echo ‘zookeeper.connect=zookeeper:2181’ > /etc/kafka/server.properties &&
echo ‘broker.id.generation.enable=true’ >> /etc/kafka/server.properties &&
echo ‘group.initial.rebalance.delay.ms=10000’ >> /etc/kafka/server.properties &&
echo ‘log.dirs=/data/kafka’ >> /etc/kafka/server.properties &&
echo ‘log.retention.hours=672’ >> /etc/kafka/server.properties &&
echo ‘log.segment.bytes=1073741824’ >> /etc/kafka/server.properties &&
echo ‘log.retention.check.interval.ms=300000’ >> /etc/kafka/server.properties &&
echo ‘confluent.support.metrics.enable=false’ >> /etc/kafka/server.properties &&
echo ‘delete.topic.enable=true’ >> /etc/kafka/server.properties &&
echo ‘num.partitions=1’ >> /etc/kafka/server.properties &&
echo ‘default.replication.factor=1’ >> /etc/kafka/server.properties &&
echo ‘min.insync.replicas=1’ >> /etc/kafka/server.properties &&
echo ‘offsets.topic.replication.factor=1’ >> /etc/kafka/server.properties &&
echo ‘zookeeper.connection.timeout.ms=10000’ >> /etc/kafka/server.properties &&
echo ‘auto.create.topics.enable=true’ >> /etc/kafka/server.properties &&
echo ‘transaction.state.log.replication.factor=1’ >> /etc/kafka/server.properties &&
echo ‘transaction.state.log.min.isr=1’ >> /etc/kafka/server.properties &&
echo ‘group.initial.rebalance.delay.ms=11000’ >> /etc/kafka/server.properties &&
sed -i 's/-Xmx1G -Xms4G/-Xmx2G -Xms3G/g' /usr/bin/kafka-server-start

Após o script, systemctl start confluent-kafka sobe o serviço e a mágica acontece:

"INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)"

Schema Registry

Responsável por armazenar/versionar metadados de schemas Avro que serão enviados pelo Kafka Connect/Debezium aos tópicos do Kafka.

Config minima do nó: Foi utilizada a mesma vm do broker.

Executando o script abaixo, estará configurado:

cp /etc/schema-registry/schema-registry.properties /etc/schema-registry/schema-registry.properties.bkp &&
echo ‘listeners=http://0.0.0.0:8081 ‘ > /etc/schema-registry/schema-registry.properties &&
echo ‘kafkastore.topic=schemas’ >> /etc/schema-registry/schema-registry.properties &&
echo ‘debug=false’ >> /etc/schema-registry/schema-registry.properties &&
echo ‘kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092’ >> /etc/schema-registry/schema-registry.properties

Com o systemctl start confluent-schema-registry você inicia o serviço e tail -f /var/log/confluent/schema-registry/schema-registry.log permite visualizar o log em tempo real, como no print abaixo, que contém a mensagem "INFO Server started, listening for requests…", que significa que deu tudo certo.

"INFO Server started, listening for requests…"

Kafka Rest API

Outro exemplo de que não valeria a pena (neste contexto do iFood e para este contexto em especifico do Medium) criar uma vm a parte, portando foi utilizado o mesmo nó que contém o Kafka broker e Schema Registry.

Config minima do nó: Foi utilizada a mesma vm do broker.

O Rest API é utilizada para gerenciado do cluster, consumo e produção de dados, via API (pareceu meio óbvio o API, mas tá dito mesmo assim).

cp /etc/kafka-rest/kafka-rest.properties /etc/kafka-rest/kafka-rest.properties.bkp &&
echo ‘schema.registry.url=http://localhost:8081' > /etc/kafka-rest/kafka-rest.properties &&
echo ‘bootstrap.servers=PLAINTEXT://localhost:9092’ >> /etc/kafka-rest/kafka-rest.properties &&
echo ‘id=kafka-rest-test-server1’ >> /etc/kafka-rest/kafka-rest.properties &&
echo ‘access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS’ >> /etc/kafka-rest/kafka-rest.properties &&
echo ‘access.control.allow.origin=*’ >> /etc/kafka-rest/kafka-rest.properties

Iniciando o serviço confluent-kafka-rest, e usando o tail -f, é possivel visualizar a seguinte tela:

"INFO Server started, listening for requests…"

Kafka Connect + Debezium + GCS (Google Cloud Storage)

Framework para realização de streaming de dados para dentro ou para fora do Kafka (broker). Os conectores podem ser de 2 tipos, para enviar dados ao Kafka (source connector) e ser armazenado no tópico, ou para extrair mensagens dos tópicos (sink connector) e enviar para algum lugar. No site hub do confluent tem vários conectores.

Sobre o Debezium, (Matheus Oliveira que foi até o fim da internet e na volta encontrou este cara que resolveria nosso problema no iFood. 👏 👏 pra vc) é do tipo "source connector", que envia dados ao Kafka, desenvolvido para Red Hat, e pode ser plugado em MySQL, PostgreSQL, Oracle, e outros mais.

S3 Connector, é o plugin do tipo "sink" que vai levar os dados dos tópicos do Kafka para o S3. Ele já vem nas libs do Kafka Connect quando instalado a plataforma da Confluent, um dos principais motivos por usar este plugin.

Agora que já estamos na mesma página, para a instalação, vamos naquele mesmo padrão, copiar/colar e aprender depois o que você acabou de fazer. 😬

Configuração do Kafka Connect:

rpm — import https://packages.confluent.io/rpm/5.2/archive.key &&
echo ‘[Confluent.dist]’ > /etc/yum.repos.d/confluent.repo &&
echo ‘name=Confluent repository (dist)’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘baseurl=https://packages.confluent.io/rpm/5.2/7' >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgcheck=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgkey=https://packages.confluent.io/rpm/5.2/archive.key' >> /etc/yum.repos.d/confluent.repo &&
echo ‘enabled=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘[Confluent]’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘name=Confluent repository’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘baseurl=https://packages.confluent.io/rpm/5.2' >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgcheck=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘gpgkey=https://packages.confluent.io/rpm/5.2/archive.key' >> /etc/yum.repos.d/confluent.repo &&
echo ‘enabled=1’ >> /etc/yum.repos.d/confluent.repo &&
echo ‘vm.swappiness=0’ | sudo tee — append /etc/sysctl.conf &&
yum clean all &&
yum update -y &&
yum install -y unzip confluent-community-2.12 nc java-1.8.0-openjdk jq wget &&
echo ‘LANG=en_US.utf-8’ >> /etc/environment &&
echo ‘LC_ALL=en_US.utf-8’ >> /etc/environment &&
chown cp-kafka:confluent /var/log/confluent &&
chmod u+wx,g+wx,o= /var/log/confluent &&
chmod g+w /var/log/kafka &&
cp /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed.properties.bkp &&
echo ‘bootstrap.servers=kafka:9092’ > /etc/kafka/connect-distributed.properties &&
echo ‘group.id=connect-cluster’ >> /etc/kafka/connect-distributed.properties &&
echo ‘key.converter=io.confluent.connect.avro.AvroConverter’ >> /etc/kafka/connect-distributed.properties &&
echo ‘key.converter.schema.registry.url=http://kafka:8081' >> /etc/kafka/connect-distributed.properties &&
echo ‘value.converter.schema.registry.url=http://kafka:8081' >> /etc/kafka/connect-distributed.properties &&
echo ‘value.converter=io.confluent.connect.avro.AvroConverter’ >> /etc/kafka/connect-distributed.properties &&
echo ‘config.storage.topic=connect-configs’ >> /etc/kafka/connect-distributed.properties &&
echo ‘offset.storage.topic=connect-offsets’ >> /etc/kafka/connect-distributed.properties &&
echo ‘status.storage.topic=connect-statuses’ >> /etc/kafka/connect-distributed.properties &&
echo ‘config.storage.replication.factor=1’ >> /etc/kafka/connect-distributed.properties &&
echo ‘offset.storage.replication.factor=1’ >> /etc/kafka/connect-distributed.properties &&
echo ‘status.storage.replication.factor=1’ >> /etc/kafka/connect-distributed.properties &&
echo ‘internal.key.converter=org.apache.kafka.connect.json.JsonConverter’ >> /etc/kafka/connect-distributed.properties &&
echo ‘internal.value.converter=org.apache.kafka.connect.json.JsonConverter’ >> /etc/kafka/connect-distributed.properties &&
echo ‘internal.key.converter.schemas.enable=false’ >> /etc/kafka/connect-distributed.properties &&
echo ‘internal.value.converter.schemas.enable=false’ >> /etc/kafka/connect-distributed.properties &&
echo ‘plugin.path=/usr/share/java’ >> /etc/kafka/connect-distributed.properties &&
echo ‘offset.flush.interval.ms=20000’ >> /etc/kafka/connect-distributed.properties &&
echo ‘buffer.memory=16777216’ >> /etc/kafka/connect-distributed.properties &&
echo ‘offset.flush.timeout.ms=100000’ >> /etc/kafka/connect-distributed.properties &&
sed -i ‘s/-Xms256M -Xmx2G/-Xms256M -Xmx3G/g’ /usr/bin/connect-distributed &&
cp /etc/kafka/connect-log4j.properties /etc/kafka/connect-log4j.properties.bkp &&
echo ‘log4j.appender.FILE=org.apache.log4j.DailyRollingFileAppender’ >> /etc/kafka/connect-log4j.properties &&
echo ‘log4j.appender.FILE.DatePattern=’.’yyyy-MM-dd’ >> /etc/kafka/connect-log4j.properties &&
echo ‘log4j.appender.FILE.File=/var/log/kafka/connect.log’ >> /etc/kafka/connect-log4j.properties &&
echo ‘log4j.appender.FILE.layout=org.apache.log4j.PatternLayout’ >> /etc/kafka/connect-log4j.properties &&
echo ‘log4j.appender.FILE.layout.ConversionPattern=[%d] %p %m (%c)%n’ >> /etc/kafka/connect-log4j.properties &&
sed -i ‘s/log4j.rootLogger=INFO, stdout/log4j.rootLogger=INFO, stdout, FILE/g’ /etc/kafka/connect-log4j.properties

Instalação do plugin do Debezium:

mkdir -p -m 777 /usr/share/java/kafka-connect-postgresql /etc/kafka-connect-postgresql &&
cd /usr/share/java/kafka-connect-postgresql &&
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/0.9.4.Final/debezium-connector-postgres-0.9.4.Final-plugin.tar.gz &&
tar -xzvf debezium-connector-postgres-0.9.4.Final-plugin.tar.gz

Configuração do Debezium (arquivo properties):

echo ‘{‘ > /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “name”: “postgres”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “config”: {‘ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “database.hostname”: “postgres”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “database.port”: “5432”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “database.user”: “postgres”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “database.password”: “postgres”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “database.dbname” : “postgres”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “database.server.name”: “postgres_server”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “schema.whitelist”: “postgres_schema”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “plugin.name”: “decoderbufs”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “snapshot.mode”: “initial”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “snapshot.lock.timeout.ms”: “10000”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “slot.name”: “postgres_slot”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “decimal.handling.mode”: “double”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “time.precision.mode”: “adaptive”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “rows.fetch.size”: “13312”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “max.queue.size”: “26312”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “max.batch.size”: “13312”,’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ “poll.interval.ms”: “1000”’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘ }’ >> /etc/kafka-connect-postgresql/postgres.json &&
echo ‘}’ >> /etc/kafka-connect-postgresql/postgres.json

Instalação do plugin do GCS (Google Cloud Storage):

mkdir -p -m 777 /usr/share/java/kafka-connect-gcs /etc/kafka-connect-gcs &&
cd /usr/share/java/kafka-connect-gcs &&
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-gcs/versions/5.0.1/confluentinc-kafka-connect-gcs-5.0.1.zip &&
unzip confluentinc-kafka-connect-gcs-5.0.1.zip

Configuração do GCS (arquivo properties):

echo ‘{‘ > /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “name”: “gcs_sink_postgres”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “config”: {‘ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “connector.class”:”io.confluent.connect.gcs.GcsSinkConnector”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “tasks.max”:”1",’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “topics”:”postgres_server.postgres_schema.postgres_table”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “gcs.bucket.name”:”storage-kafka-debezium”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “gcs.part.size”:”5242880",’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “flush.size”:”1",’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “gcs.credentials.path=/etc/kafka-connect-gcs/key.json,’ >> /etc/kafka-connect-gcs/gcs-sink-postgres.json &&
echo ‘ “storage.class”:”io.confluent.connect.gcs.storage.GcsStorage”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “format.class”:”io.confluent.connect.gcs.format.avro.AvroFormat”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “partitioner.class”:”io.confluent.connect.storage.partitioner.DefaultPartitioner”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “schema.compatibility”:”NONE”,’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “confluent.topic.bootstrap.servers”:”kafka:9092",’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ “confluent.topic.replication.factor”:”1"’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘ }’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json &&
echo ‘}’ >> /etc/kafka-connect-gcs/gcs_sink_postgres.json

Após a configuração, vamos iniciar o Kafka Connect, em seguida, adicionar a config em formato json do Debezium, para se conectar ao banco de dados, e em seguida, adicionar a configuração do GCS para se conectar ao tópico que o Debezium gerou, e então enviar os dados ao storage do Google.

Iniciar com systemctl start confluent-kafka-connect e visualizar os logs com tail -f /var/log/kafka/connect.log

Para inserir as configurações do Debezium e GCS no Kafka Connect, usaremos curl se conectando no Kafka Connect na porta 8083, começando com Debezium:

curl -X POST -H "Content-Type: application/json" --data @/etc/kafka-connect-postgresql/postgres.json localhost:8083/connectors  | jq -r

O retorno do curl (que fica mais "bonitinho" com o "| jq -r") informa todas as configs que foram inseridas, e utilizando curl novamente podemos verificar o status do plugin, se está "running" e se há tasks sendo executadas:

E por último, com o Debezium de pé, e executando, se você executar a partir de algum servidor do cluster de kafka o comando abaixo, poderá verificar que foi criado um tópico com os dados da tabela do PostgreSQL que foi configurado lá em cima no inicio deste story (detalhe, se lá naquele passo você não inseriu nenhuma linha, nenhum tópico será criado).

kafka-topics — zookeeper zookeeper:2181/kafka — list

Finalizando agora a inserção do json do plugin GCS Connector:

curl -X POST -H “Content-Type: application/json” — data @/etc/kafka-connect-gcs/gcs_sink_postgres.json localhost:8083/connectors | jq -r

Verificando o status do plugin com o comando abaixo, que retorna o seguinte:

curl -X POST -H “Content-Type: application/json” — data @/etc/kafka-connect-postgresql/postgres.json localhost:8083/connectors | jq -r

GCS

Após toda essa configuração, vamos lá no Google Cloud Storage e olha quem legal que tem lá:

São os arquivos que foram gerados e enviados pelo GCS Connector. Acessando um destes arquivos em formato Avro, temos o seguinte:

O insert realizado no PostgreSQL para o exemplo acima foi o seguinte:

postgres=# insert into postgres_schema.postgres_table values (39, ‘hey ho lets go’,now());
INSERT 0 1

That's it, have fun. Este story faz parte da palestra do dia 4 de maio de 2019 no DBA Brasil 4, que será mostrado o caso de uso do iFood com Kafka e Debezium. Para você que já testou, deu tudo certo, e pretende montar um cluster (isso quer dizer, um ambiente de produção de verdade), vou disponibilizar no meu github.com/singaretti o passo a passo em arquivos de Standalone Mode e Distributed Mode (incluindo também Kafka Broker, Zookeeper, Rest API e Schema Registry em cluster).

Outro ponto sobre o cluster, caso esteja pensando em produção, pense também em automatização (Subir Kafka broker na mão é sofrido), e pense também em controle de acesso, nos exemplos mostrados aqui é tudo bruto, com root, mas na vida real pode ser um pouco arriscado, enfim, não é legal.

E para já deixar um gancho para o próximo story, o Lucas Viecelli e eu vamos conseguir plugar aplicações no Kafka diretamente, sem intermediários

🇧🇷