İlişkisel Veri Tabanından Kafka’ya Veri Aktarımı

İbrahim Baran
Bankalararası Kart Merkezi
4 min readMay 2, 2019

Kafka; büyük veriler ile çalışabilen, dağıtık sistemlere uygun, anlık toplanan veriyi hızla ve hatasız bir biçimde aktarabilen bir mesajlaşma sistemidir. Özellikle büyük boyutta loglama yapılan sistemlerde kullanım kolaylığı sağlıyor.

Bu yazımda ilişkisel veri tabanından (Oracle) Kafka’ya veriyi nasıl aktarabileceğimizden bahsedeceğim.

Apache Kafka’nın bakım ve geliştirmeleri Confluent şirketi tarafından yapılmaktadır. Kafka Connect API’yı kullanarak aşağıdaki şemadaki gibi ilişkisel veri tabanından Kafka Topic’lere veriyi nasıl kaydedeceğimizi örnek üzerinden anlatmaya çalışacağım.

1. Kafka ve Oracle için Gerekli Ortamların Hazırlanması

Docker ‘ın ilgili imajları indirilip docker-compose komutuyla aynı anda birden fazla imajın birlikte çalışması sağlanabilir. İhtiyacımız olan docker imajlarını aşağıdaki gibi listeleyebiliriz.

confluentinc/cp-kafka-connect:5.0.0

confluentinc/cp-ksql-cli:latest

confluentinc/cp-ksql-server:latest

confluentinc/cp-schema-registry:5.1.0

confluentinc/cp-enterprise-kafka:5.1.0

confluentinc/cp-zookeeper:5.1.0

oracle/database:12.2.0.1-ee

Docker pull komutuyla bu imajlar ilgili makineye indirilir. Bu imajların birlikte çalıştırılabilmesi için yml uzantılı dosya yaratılarak ilgili konfigurasyonlar bu dosyaya eklenir.

Burada oracle 12.2 versiyonu için ojdbc8–12.2.0.1.jar versiyonu cp-kafka-connect:5.0.0 imajı içinde bulunan aşağıdaki dizinlere kopyalanır. Bu kopyalama komutu yml dosyasının içine eklenir.

cp /db-leach/ojdbc8–12.2.0.1.jar /usr/share/java/kafka-connect-jdbc/

cp /db-leach/ojdbc8–12.2.0.1.jar /usr/share/java/

cp /db-leach/ojdbc8–12.2.0.1.jar /etc/kafka-connect/jars/

docker-compose -f docker-compose_last.yml up komutu çalıştırılarak 7 adet imajın çalışması sağlanır. Ekran görüntüsü aşağıdaki gibidir.

docker ps –a komutu ile çalışan imajların statüleri listelenir.

2. Oracle Veri Tabanında Kullanıcı ve Tablo Yaratmak

İlgili Oracle veri tabanına sysdba ile bağlanılıp connect_user adında kullanıcı yaratılıp şifresi verilir. Sonrasında connect_user kullanıcısı ile bağlanılıp ACCOUNTS tablosu yaratılır. Bu tabloda trigger oluşturularak UPDATE_TS kolonuna sysdate değeri verilir. ACCOUNTS tablosuna insert ya da update geldiğinde UPDATE_TS kolonu sysdate olarak güncellenecektir. Kafka Connect, tabloda oluşturduğumuz UPDATE_TS kolonu üzerinden çalışacaktır.

database:12.2.0.1-ee imajının komut satırına girilir.

sqlplus connect_user/asgard@//localhost:1521/ORCLPDB1 komutuyla oluşturduğumuz kullanıcıyı ve tabloyu kontrol edebiliriz. Aşağıda ekran görüntüsüde ACCOUNTS tablosunda 23 kayıt görünmektedir.

3. Kafka Connect ile İlişkisel Veri Tabanından Kafka’ya Veri Yazma

docker exec -it eb4d52a40186 /bin/bash komutu ile cp-kafka-connect:5.0.0 imajının komut satırına girilir.

curl -s http://localhost:8083/connectors komutu ile mevcut bağlantılar listelenir. Mevcut durumda yeni bir bağlantı yaratmadığımız için herhangi bir kayıt dönmediğini aşağıdaki gibi görebiliyoruz.

Aşağıdaki komut Kafka Connect üzerinde yeni bir bağlantı yaratır.

curl -X POST http://localhost:8083/connectors -H “Content-Type: application/json” -d ‘{

“name”: “jdbc_source_oracle_03”,

“config”: {

“connector.class”: “io.confluent.connect.jdbc.JdbcSourceConnector”,

“connection.url”: “jdbc:oracle:thin:@oracle:1521/ORCLPDB1”,

“connection.user”: “connect_user”,

“connection.password”: “asgard”,

“topic.prefix”: “oracle-03-”,

“mode”:”timestamp”,

“table.whitelist” : “ACCOUNTS”,

“timestamp.column.name”: “UPDATE_TS”,

“validate.non.null”: false

}

}’

Çalıştırdıktan sonraki ekran görüntüsü aşağıdaki gibidir.

Artık Oracle veri tabanında yarattığımız ACCOUNTS tablosundaki UPDATE_TS kolonunu üzerinden Kafka Connect sayesinde insert update komutları geldikçe Kafka Topic’e yazılacak.

4. Oracle Veri Tabanında Test Amaçlı Kayıt Oluşturulup Kafka Tarafında Kontrol Etme

Oracle veri tabanına bağlanıp aşağıdaki gibi test amaçlı kaydı ekleyebiliriz.

insert into accounts (id, first_name, last_name, username, company, created_date) values (24, ‘Test4’, ‘Test4’, ‘Test4’, ‘Test4’, ‘2019–04–22’);

commit;

Kafka Topic ‘den bu kaydı görebilmek için aşağıdaki komut çalıştırılır. Ksql komutuları ile ilgili Topic’i sorgulayıp listeleyebiliyoruz.

docker-compose exec ksql-cli ksql http://ksql-server:8088

Sonrasında LIST TOPICS; komutu ile Kafka Topic’ler listelenir. Kafka Connect kısmında oracle-03- verdiğimiz için bu Topic üzerinden listeleyebiliriz.

PRINT ‘oracle-03-ACCOUNTS’ FROM BEGINNING; Komutu ile oracle-03 ile başlayan Topic içerisindeki verileri listeleyebiliriz.

Ekranın en altında görüldüğü gibi Test4 olarak oracle tablosuna insert edilen kayıt Kafka üzerinde görüntülenebilir.

Böylelikle Kafka Connect ile yarattığımız bağlantı aktif olduğu sürece ACCOUNTS tablosuna gelen insert ,update değerlerini Kafka Topic ‘de listeleyebiliyoruz.

Kaynaklar :

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector
https://github.com/confluentinc/demo-scene/tree/master/connect-jdbc
https://github.com/oracle/docker-images/blob/master/OracleDatabase/SingleInstance/

--

--