Capturing Real-Time Database Changes with Debezium and Kafka: A Comprehensive Guide

Furkan Özmen
Appcent
Published in
4 min readMar 26, 2024

--

Debezium ve Kafka ile Veritabanı Değişikliklerini Gerçek Zamanlı Yakalama: Change Data Capture (CDC)

Günümüzün veri odaklı dünyasında, veritabanlarında meydana gelen değişiklikleri anlık olarak yakalamak ve bu değişikliklere tepki vermek birçok uygulama için kritik bir öneme sahiptir. Veri tabanlarını güncel tutmak, real time analizler yapmak, uygulamalar arasında veri senkronizasyonu sağlamak ve mikro servisler mimarisinde consistency’i korumak gibi birçok farklı senaryoda, veritabanı değişikliklerini anlık olarak takip etmek büyük avantaj sağlar.

Change Data Capture (CDC) olarak adlandırılan bu işlem, Debezium ve Kafka gibi araçlar kullanılarak etkin bir şekilde gerçekleştirilebilir. Bu yazıda, Debezium ve Kafka ile CDC uygulamasının nasıl yapılacağını adım adım inceleyeceğiz ve gerçek hayattan örneklerle bu teknolojilerin nasıl kullanıldığını göstereceğiz.

1. Debezium Nedir?

Debezium, açık kaynaklı bir CDC platformudur. Veritabanınızdaki tablolara bağlanarak, ekleme, güncelleme ve silme işlemlerini realtime olarak izler ve bu değişiklikleri Kafka topic’lerine aktarır. Debezium, popüler birçok veritabanı yönetim sistemi (DBMS) ile uyumludur, örneğin:

  • MySQL
  • PostgreSQL
  • Oracle
  • SQL Server
  • MongoDB

Debezium’un sunduğu bazı önemli özellikler şunlardır:

  • Düşük gecikme süresi: Debezium, veritabanı değişikliklerini minimum gecikme ile Kafka’ya aktarır.
  • Kolay kurulum ve kullanım: Debezium’un kurulumu ve kullanımı oldukça basittir.
  • Ölçeklenebilirlik: Debezium, yüksek hacimli veri akışlarını işleyebilecek şekilde ölçeklenebilir.
  • Veri bütünlüğü: Debezium, veritabanı değişikliklerinin sırasını ve bütünlüğünü korur.

2. Kafka Nedir?

Apache Kafka, verilerin bir sistemden hızlı bir şekilde toplanıp diğer sistemlere hatasız bir şekilde transferini sağlamak için geliştirilen dağıtık bir veri akış mekanizmasıdır.

3. Kafka Connect Nedir?

Kafka Connect, Kafka ile external sistemler arasında veri aktarımı yapmak için kullanılan bir araçtır. Debezium, Kafka Connect ile entegre çalışarak veritabanı değişikliklerini Kafka topic’lerine aktarır.

Kafka Connect, veri kaynaklarından (source) Kafka’ya veri aktarımı yapan “source connector”lar ve Kafka’dan veri hedeflerine (sink) veri aktarımı yapan “sink connector”lar olmak üzere iki tür connector’ı destekler. Debezium, bir source connector olarak çalışır.

4. Kafka Connect Source Connect Yapılandırması Nasıl Olmalı ?

Gelin beraber örnek olarak Mysql source connector yapılandırmasını beraber yazalım ve adım adım her maddenin ne görevi üstlendiğini beraber açıklamaya çalışalım.

{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.99.100",
"database.port": "3306",
"database.user": "root",
"database.password": "rootpw",
"database.server.id": "184054",
"topic.prefix": "fullfillment",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.fullfillment",
"include.schema.changes": "true"
}
}

name: Connector’a verilen isim. Bu örnekte “inventory-connector” olarak belirttik.

connector.class: Kullanılacak connector sınıfının adı. Bu örnekte “io.debezium.connector.mysql.MySqlConnector” olarak belirttik, yani MySQL connector’ı kullanacağız.

database.hostname: Veritabanı sunucusunun IP adresi veya hostname’i. Bu örnekte “192.168.99.100” olarak belirttik.

database.port: Veritabanı sunucusunun port numarası. Bu örnekte “3306” olarak belirttik.

database.user: Veritabanına bağlanmak için kullanılacak kullanıcı adı. Bu örnekte “root” olarak belirttik.

database.password: Veritabanına bağlanmak için kullanılacak parola. Bu örnekte “rootpw” olarak belirttik.

database.server.id: Connector’a atanacak benzersiz bir ID. Bu ID, Debezium tarafından veritabanı değişikliklerini Kafka’ya aktarırken kullanılır.

topic.prefix: Kafka topic’lerinin önüne eklenecek bir önek. Bu örnekte “fullfillment” olarak belirttik.

database.include.list: İzlenecek veritabanlarının listesi. Bu örnekte sadece “inventory” veritabanı belirttik.

schema.history.internal.kafka.bootstrap.servers: Kafka sunucusunun adresi. Bu örnekte “kafka:9092” olarak belirttik.

schema.history.internal.kafka.topic: Veritabanı şemasındaki değişikliklerin kaydedileceği Kafka topic’inin adı. Bu örnekte “schemahistory.fullfillment” olarak belirttik.

include.schema.changes: Veritabanı şemasındaki değişikliklerin Kafka’ya aktarılıp aktarılmayacağını belirtir. Bu örnekte “true” olarak belirtilmiş, yani şema değişiklikleri de Kafka’ya aktarılacak.

Şimdi sıra geldi bunları debeziuma göndermeye. Bunun için bir docker compose yazıp bu instance’ları ayağa kaldırmak gerekiyor. Aşağıda vereceğim compose dosyasında ihtiyacınız olacak tüm şeyler var. Bazılarının image’ini kendiniz bulup eklemeniz gerekiyor. Çünkü çoğu yerde custom çözümler kullandım.

version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "59092:59092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:59092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: <your_topic_name>

zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZK_SERVER_HEAP: "-Xmx256M -Xms256M"
ports:
- 52181:2181

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "9091:8080"
restart: always
depends_on:
- kafka
- zookeeper
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181


debezium-ui:
image: debezium/debezium-ui:latest
restart: always
container_name: debezium-ui
hostname: debezium-ui
ports:
- '8080:8080'
environment:
KAFKA_CONNECT_URIS: <your_kafka_connect_url>

kafka-connect:
image: my-kafka-connect:latest
build:
context: .
dockerfile: Dockerfile
ports:
- "8083:8083"
depends_on:
- kafka
- zookeeper

Şimdi debezium ayağa kalktığına göre connector dosyasını execute edebiliriz. Aşağıdaki gibi bir sh dosyası yazıp çalıştırmanız mümkün.

curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -d \
'{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.99.100",
"database.port": "3306",
"database.user": "root",
"database.password": "rootpw",
"database.server.id": "184054",
"topic.prefix": "fullfillment",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.fullfillment",
"include.schema.changes": "true"
}
}'

Eğer validasyonlarda bir hata yaptıysanız bu dosyayı çalıştırdıktan sonra hata dönecektir. Eğer hata dönmediyse başarıyla bağlanmış demektir. Bundan sonra tek yapmanız gereken tablonuza kayıt eklemek ve eklenen kayıdın Kafka topiclerine düşüp düşmediğini gözlemlemek.

Şimdi sıra geldi Kafka topiclerini dinleyen consumer oluşturmaya. Onu da tek config ile yapabiliyoruz desem ne dersiniz :)

furkan özmen

Evet bu mümkün, sink connectorler ile bu işi yapabiliyoruz. Şimdi bir elasticsearch sink connector configi oluşturup execute edeceğiz.

curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -d \
'{
"name": "elastic-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "metro_search.metro.dbo.product",
"schema.ignore": "true",
"connection.url": "http://elastic:9200",
"type.name": "_doc",
"name": "elastic-sink",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"behavior.on.null.values": "delete"
}
}'

Artık bunu çalıştırdığımızda kafka-ui üzerinden de takip edip görmüş olacaksınız ki, verdiğiniz topiğe bir consumer assign olmuş. Artık gelen event otomatik olarak elasticsearch indexlerine aktarılacak.

Sonuna kadar okuduğunuz için teşekkür ederiz. Gitmeden önce:

  • Lütfen yazıyı alkışlamayı ve beni takip etmeyi unutmayın! 👏
  • Beni takip etmek isterseniz X | LinkedIn | Github

--

--