Setting up Kafka with Kafka connect from SQL server to Cloud storage (GCS)

Burasakorn Sabyeying
Mils’ Blog
Published in
10 min readJan 8, 2023

Tutorial นี้เราจะพา setup Kafka cluster โดยใช้ Strimzi
รวมถึง setup Kafka connect เพื่อต่อไปยัง external source ต่างๆเพื่อดึงจาก SQL server เป็น source data ต้นทางโดยเก็บ event จาก Change Data Capture (CDC) แล้วนำไปเก็บที่ sink ปลายทางที่ Google Cloud Storage

บทความนี้จะเหมาะกับ

  • ผู้ต้องการศึกษา Kafka, Kafka connect
    และพอเข้าใจ concept เบื้องต้นของ Kafka บ้างแล้ว (เช่น topics, producer, consumer)
    เราแนะนำให้อ่านบทความนี้ก่อน https://mesodiar.com/intro-to-apache-kafka-universe-kafka-คืออะไร-45f652e8233f
  • ต้องการลอง practice ด้วยการ setup Kafka ด้วยตัวเอง

Table of contents:

- Pre-requisites
- ทำไมใช้ Strimzi ? Strimzi คืออะไร
- Difference between Strimzi and Confluent for Kubernetes

เริ่ม setup กัน
- Setup Kafka Cluster ?
- Kafka Connect คืออะไร ?
- สิ่งที่ต้องเตรียมสำหรับ project นี้ ก่อนสร้าง Kafka connect
- Cloud SQL for SQL server
- Setup Kafka Connect
- source connect
- sink connect
- Setup Kafka Connector
- source connector
- sink connector
- Install Kafka UI
- ทดสอบ CDC

Pre-requisites:

  • Docker
  • Kubernetes (เราจะใช้ GKE)
  • kubectl command
  • Kubernetes Desktop Client สักตัว เช่น Aptakube, Lens (ไม่ฟรี), OpenLens เพื่อการ monitor resource ต่างๆบน Kafka ง่ายๆ
  • GCP service อื่นๆอย่าง Cloud SQL และ Cloud storage

ทำไม Strimzi ? Strimzi คืออะไร?

ในการสร้าง Kafka Cluster นั้น เราสามารถสร้างได้หลายวิธีเนื่องจากมี provider มากมายที่ให้บริการในการสร้าง cluster โดยเราขอแบ่งเป็น 2–3 เจ้าสั้นๆ

  1. Strimzi or Strimzi operator
  2. Confluent for Kubernetes (CFK) or Confluent operator (จะเรียกสั้นๆว่า confluent)
  3. Koperator

เหตุผลที่เขาเรียกว่า operator เพราะว่าทั้งหมดนี้คือ Kubernetes operator ที่ install controller เพื่อ create, configure, และ secure ตัว Kafka cluster (เหมือน Kubernetes resource ต่างๆเช่น Pod, Deployment, ConfigMap)

ตัวที่นิยมในตลาดเป็น Strimzi และ Confluent เราจึงได้นำ 2 ตัวนี้มา compare กัน

Different between Strimzi and Confluent

จากในตาราง Strimzi นั้นเป็น Open source ที่มีคน back ให้เป็น community จริงๆ ในขณะที่ confluent จะเป็นทีมที่สร้าง Kafka มา

และในเรื่องของราคา Strimzi นั้นฟรีทั้งหมด ส่วนของ Confluent นั้นฟีเจอร์จะไม่เปิดใช้ทั้งหมดให้ฟรีและเป็น Enterprise grade

เพื่อความสะดวก เราจะใช้ Strimzi ในการ setup Kafka cluster กัน

คำเตือน ! ห้าม copy คำสั่งและโค้ดเพลิน

Setup Kafka Cluster

เราจะสร้าง namespace เพื่อ deploy Strimzi operator ในนี้

kubectl create namespace kafka-mils

หมายเหตุ: namespace ทุกคนจะชื่อว่า kafka ก็ได้นะ แต่เราอยากจะตั้งว่า kafka-mils

ถัดไป เราจะ download ไฟล์ install จากลิ้งนี้ แล้วเราเอาไฟล์นี้มาเซฟไว้ ตั้งชื่อว่า strimzi-install-file.yaml

curl <https://strimzi.io/install/latest?namespace=kafka-mils>
--output strimzi-install-file.yaml

ถ้าทุกคนใช้ namespace kafka ก็ใช้ https://strimzi.io/install/latest?namespace=kafka น้า

สิ่งที่ไฟล์ strimzi-install-file.yaml ทำคือการ install พวก ClusterRoles, ClusterRoleBindings ต่างๆและพวก CRDS อย่าง Kafka, KafkaTopic ทั้งหลาย เพื่อเอามา manage Kafka cluster

หากทุกคนสังเกตจะเห็นว่า ในไฟล์จะมี namespace ที่เรากำหนดเรียบร้อยแล้ว

เพื่อจะสร้าง resource ต่างๆเราก็ต้อง apply ก่อน

kubectl apply -f strimzi-install-file.yaml

ทีนี่เรากลับมาดูที่ namespace ของเรา เราจะเจอว่ามี pod ตัว strimzi operator เกิดขึ้นแล้ว เย้

เราเปิดจาก Aptakube

หรือถ้าใครสาย cli จะใช้ kubectl get pod มาดูได้

kubectl get pod -n kafka-mils --watch
NAME                                       READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-f696c85f7-k6hgf 1/1 Running 0 3m40s

คราวนี้เราลง strimzi operator เสร็จแล้ว ถัดไปเราจะลง cluster

curl <https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml> --output kafka-persistent-single.yaml

ทุกคนจะเห็นว่าหน้าตาของไฟล์เป็นแบบนี้ แล้วเราก็เพิ่ม namespace เข้าไป เพื่อที่ว่าตอน apply เราจะได้ไม่ลั่นไปผิด namespace 55555

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka-mils ##<-- add namespace here
spec:
kafka:
version: 3.3.1
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.3"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}

รันด้วยคำสั่ง

kubectl apply -f kafka-persistent-single.yaml

หากเราดูจาก Aptakube จะเห็นว่ามี cluster ขึ้นแล้วชื่อ my-cluster

แล้วมาดูที่ pod จะเห็นว่ามี 3 pods เกิดขึ้นมาใหม่ เราจะรอแปบนึงจนกว่าทุกอย่าง Running เขียวหมด

pod ที่ขึ้นมาจะเป็น cluster entity, Kafka cluster และ ZooKeeper ของเรา

เราสามารถลองเทสว่า kafka cluster เราใช้ได้ไหมด้วยการลอง produce message เข้าไปดู

kubectl -n kafka-mils run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic

ทีนี้ลอง consume message ดู

kubectl -n kafka-mils run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

Result:

consumer

เราจะเห็นว่า consumer จะพ่น message ต่างๆที่เราใส่ไว้ใน producer
ทีนี้เราก็มั่นใจว่า Kafka cluster เราเรียบร้อยแล้ว

Kafka Connect คืออะไร

Kafka connect ด้านบนขวา

Kafka Connect คือหนึ่งในจักรวาลของ Kafka ในการทำ streaming data ระหว่าง Kafka brokers กับ external data source อื่นๆในโลกภายนอก เช่น ไปต่อกับ database

ใน Kafka Connect จะมี connector อยู่ เช่นถ้าเราต่อกับ mysql ก็จะเป็น mysql connector ที่จะต่อกับ mysql database เท่านั้น

โดย Connector มี 2 ประเภท:

  • source connector — push data ขาเข้า Kafka
  • sink connector — pull data ออกจาก Kafka
cr. Debezium

รูปนี้เป็นรูปจาก debezium ซึ่งใน tutorial นี้เราก็จะใช้ connector จาก Debezium ในการเซ็ต source connector สำหรับ SQL server เช่นกัน

สิ่งที่ต้องเตรียมสำหรับ project นี้ ก่อนสร้าง Kafka connect

  1. SQL server เป็นตัว database ต้นทาง โดยเราจะใช้ Cloud SQL แบบ SQL server
  2. Cloud storage bucket

Cloud SQL for SQL server

ในการสร้าง Cloud SQL ให้ไปที่ https://console.cloud.google.com/ และเข้าไป service ที่ชื่อว่า Cloud SQL

จากนั้นก็กดสร้าง Create instance โดยเลือกเป็น SQL server

เราจะเซ็ตให้เครื่องอยู่ Singapore, DB Version เป็น SQL Server 2019 Standard

และขนาดเครื่องเล็กที่สุดเพื่อใช้สำหรับ tutorial นี้เท่านั้น

สำคัญที่สุดคือเราจะสร้างเป็น Public IP และใส่ authorized network 2 ตัว

  1. เครื่อง IP ของเรา เพื่อที่ว่าเครื่องเราจะเข้า database นี้ได้เพื่อไปสร้าง table โดยเอา IP จากการเสิร์ช google ว่า “what’s my ip” ได้
  2. IP ของ Kubernetes เพื่อที่ว่า Kafka บน Kubernetes เราต่อไปยัง database ได้

เราเลือกแบบไม่ใช่ private IP เพราะไม่อยากให้มันยุ่งยากเกินไป

คราวนี้เมื่อสร้างเสร็จแล้ว อาจจะต้องรอสักครู่ใหญ่ๆ

เมื่อสร้างเสร็จแล้วจะมีเครื่องหมายติ๊กถูกข้างหน้า instance

คราวนี้เราจะต้อง access เข้าไปใน database เพื่อสร้าง table กัน

เราจะใช้ DBeaver นะ ก็กรอก connection พวก hostname, username, password ตามรูปได้เลย

จากนั้นก็สร้าง database และสร้าง table ชื่อ employee (เอา mock data จาก https://www.mockaroo.com/)

เมื่อเราสร้าง table เสร็จก็เท่ากับว่าเราเตรียมความพร้อมสำหรับ database ต้นทางแล้ว(ชื่อว่า yaki) แต่อีกจุดที่สำคัญคือ เนื่องจาก Kafka เราจะ capture event ผ่าน CDC

Change data capture (CDC) คืออะไร?

Change data capture คือ process ที่บอกว่าตอนนี้มีใครทำอะไรกับ database บ้าง เช่น มี update, delete transaction ใน database นะ โดยสิ่งนี้จะ deliver event แบบ real-time ให้ อย่างที่เราตั้งใจว่าจะให้วิ่งไปหา Kafka

สำหรับ SQL server สามารถเปิด feature CDC ได้เลย โดยรัน command ผ่าน DBeaver ด้านล่าง

## Turn CDC on
EXEC msdb.dbo.gcloudsql_cdc_enable_db 'yaki'

### Track changes in a table
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'employee',
@role_name = N'CDC'

## Check if CDC is enabled on a table
EXECUTE sys.sp_cdc_help_change_data_capture
@source_schema = N'dbo',
@source_name = N'employee'

To see change in table

SELECT * FROM cdc.dbo_employee_CT

เท่านี้เราก็เซ็ตต้นทางสำหรับ source data เสร็จแล้ว !

ส่วน Cloud storage ปลายทาง ก็สร้าง bucket ได้เลยตามปกติจ้า อันนี้ไม่ยาก

Setup Kafka Connect

วิธีการสร้าง Kafka Connect เราต้อง build container image ที่มี connector plugins ที่จำเป็นสำหรับ connection

Source Connect

source.Dockerfile

FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root

RUN mkdir -p /opt/kafka/plugins
ADD <https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz> /opt/kafka/plugins
RUN cd /opt/kafka/plugins && tar -xzf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz && rm -rf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz**
USER 1001

จะเห็นว่า เราจะใช้ base image จาก strimzi แล้วสิ่งที่ทำเพิ่มคือลง plugin ตัว sqlserver จาก debezium เพิ่ม

แล้วเราก็ build เป็น image ไว้

docker buildx build --platform=linux/amd64  -t mesodiar/kafka-connect-mssql2019:v1.0.0 -f source.Dockerfile --load .

ทีนี้เราจะสร้างไฟล์ Kafka connect กัน

ตั้งชื่อว่า source-kafka-connect.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-mssql-2019-source-cluster
namespace: kafka-mils
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: mesodiar/kafka-connect-mssql2019:v1.0.0
version: 3.2.3
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: source-my-connect-cluster
offset.storage.topic: source-my-connect-cluster-offsets
config.storage.topic: source-my-connect-cluster-configs
status.storage.topic: source-my-connect-cluster-status
key.converter.schemas.enable: false
value.converter.schemas.enable: false
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1

จะเห็นว่า bootstrap server คือ my-cluster-kafka-bootstrap:9092 โดยคุยผ่าน plaintext ธรรมดา (9092) ซึ่งเราสามารถดูชื่อ service ของ bootstrap ได้ที่แถบ services

ต่อไป รันเพื่อสร้าง source connect

kubectl apply -f source-kafka-connect.yaml

Sink connect

sink.Dockerfile

FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root

RUN mkdir -p /opt/kafka/plugins
ADD <https://github.com/aiven/gcs-connector-for-apache-kafka/releases/download/v0.9.0/aiven-kafka-connect-gcs-0.9.0.tar> /opt/kafka/plugins
RUN cd /opt/kafka/plugins && tar -xf aiven-kafka-connect-gcs-0.9.0.tar && rm -rf aiven-kafka-connect-gcs-0.9.0.tar
USER 1001

จะเห็นว่า เราจะใช้ base image จาก strimzi แล้วสิ่งที่ทำเพิ่มคือลง plugin ตัว gcs จาก Aiven

ถัดมา เราก็ build image

docker buildx build --platform=linux/amd64  -t mesodiar/kafka-connect-gcs:v1.0.0 -f source.Dockerfile --load .

Sink connect file

## sink-kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-gcs-sink-cluster
namespace: kafka-mils
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: mesodiar/kafka-connect-gcs:v1.0.0
version: 3.2.3
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: sink-my-connect-cluster
offset.storage.topic: sink-my-connect-cluster-offsets
config.storage.topic: sink-my-connect-cluster-configs
status.storage.topic: sink-my-connect-cluster-status
key.converter.schemas.enable: false
value.converter.schemas.enable: false
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
ssl.enabled.protocols: "TLSv1.2"
ssl.protocol: "TLSv1.2"
ssl.endpoint.identification.algorithm: HTTPS
kubectl apply -f sink-kafka-connect.yaml

ผลลัพท์ที่ได้ก็จะมี 2 pods ที่เป็นทั้ง source และ sink connect

2 pods ด้านบยคือ sink และ source pod ตามลำดับ

Create Kafka Connectors

Source connector

source_sql_server_connector.yaml

# To use the KafkaConnector resource you have to first enable the connector operator using
# the strimzi.io/use-connector-resources annotation on the KafkaConnect custom resource.
# From Apache Kafka 3.1.1 and 3.2.0 you also have to add the FileStreamSourceConnector
# connector to the container image. You can do that using the kafka-connect-build.yaml example.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: source-sql-server-connector
namespace: kafka
labels:
# The strimzi.io/cluster label identifies the KafkaConnect instance
# in which to create this connector. That KafkaConnect instance
# must have the strimzi.io/use-connector-resources annotation
# set to true.
strimzi.io/cluster: kafka-connect-mssql-2019-source-cluster
spec:
class: io.debezium.connector.sqlserver.SqlServerConnector
tasksMax: 1
config:
topic: cloudSQL.dbo.employee
connector.class: io.debezium.connector.sqlserver.SqlServerConnector
database.hostname: <instance_ip>
database.port: 1433
database.dbname: yaki
database.user: sqlserver
database.password: '<password>'
database.server.name: cloudSQL
database.encrypt: true
database.ssl: true
database.trustServerCertificate: true
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: booboo.history.employee-topic
key.converter.schemas.enable: false
value.converter.schemas.enable: false
table.whitelist: dbo.employee
offset.flush.interval.ms: 15000
errors.deadletterqueue.topic.name: cdc_employee-topic
poll.interval.ms: 1000
errors.tolerance: all
snapshot.mode: initial

จะเห็นได้ว่า config เราจะระบุว่าจะต่อไปยัง database ด้วย connection อะไร โดยต้องระบุทั้ง hostname, port, username, password, และบอกว่า table อะไรที่เราสนใจอยู่ (table.whitelist) และที่สำคัญคือ topic name cloudSQL.dbo.employee

ต่อไปเราก็สร้าง source connector ด้วยคำสั่ง

kubectl apply -f source_sql_server_connector.yaml

Sink connector

sink-gcs-connector.yaml

# To use the KafkaConnector resource you have to first enable the connector operator using
# the strimzi.io/use-connector-resources annotation on the KafkaConnect custom resource.
# From Apache Kafka 3.1.1 and 3.2.0 you also have to add the FileStreamSourceConnector
# connector to the container image. You can do that using the kafka-connect-build.yaml example.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: sink-gcs-connector
namespace: kafka-mils
labels:
# The strimzi.io/cluster label identifies the KafkaConnect instance
# in which to create this connector. That KafkaConnect instance
# must have the strimzi.io/use-connector-resources annotation
# set to true.
strimzi.io/cluster: kafka-connect-gcs-sink-cluster
spec:
class: io.aiven.kafka.connect.gcs.GcsSinkConnector
tasksMax: 2
config:
file: /opt/kafka/LICENSE
topics: cloudSQL.dbo.employee
tasks.max: 1
format.output.type: "jsonl"
key.converter.schemas.enable: false
value.converter.schemas.enable: false
gcs.bucket.name: "<bucket_name>"
file.name.prefix: "landing/<folder_name>/"
file.compression.type: "gzip"
format.output.fields: "key,value,offset,timestamp"
file.name.template: "{{timestamp:unit=yyyy}}/{{timestamp:unit=MM}}/{{timestamp:unit=dd}}/{{topic}}-{{partition}}-{{start_offset}}.gz"
gcs.credentials.json: |
{
"type": "service_account",
"project_id": "XXXXX",
"private_key_id": "xxxxx",
"private_key": "XXXX\\n",
"client_email": "XXX.iam.gserviceaccount.com",
"client_id": "XXXX",
"auth_uri": "<https://accounts.google.com/o/oauth2/auth>",
"token_uri": "<https://oauth2.googleapis.com/token>",
"auth_provider_x509_cert_url": "XXXs",
"client_x509_cert_url": "XXXXm"
}

จะเห็นว่า config เราระบุว่าปลายทางต้องใช้ config อะไรบ้าง เช่น bucket name, path ใน bucket, file output, และ service account secret key เพื่อยืนยันว่าเราสามารถเข้าถึง bucket ได้

สิ่งที่สำคัญใน config นี้คือ topic name ต้องตรงกับที่เราใช้ คือcloudSQL.dbo.employee นั่นเอง เพื่อให้ connector มาดูจาก topic นี้

แล้วเราก็รันสร้าง sink connector ด้วยคำสั่ง

kubectl apply -f sink_gcs_connector.yaml

เมื่อเราสร้าง Kafka connectors ทั้ง 2 ตัวแล้ว ในหน้า aptakube ก็จะแสดงตามรูป

Done !

หาก connectors เราสามารถใช้ได้แล้ว เมื่อเราไปดู GCS จะพบว่ามีข้อมูล initial ไหลเข้ามาเรียบร้อย

Install Kafka ui

เราจะ install Kafka UI เพื่อให้เราได้ debug ง่ายขึ้น โดยเราใช้ของ provectus

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-ui
namespace: kafka-mils
labels:
app: kafka-ui
spec:
replicas: 1
selector:
matchLabels:
app: kafka-ui
template:
metadata:
labels:
app: kafka-ui
spec:
containers:
- name: ui
image: provectuslabs/kafka-ui:latest
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 8080
protocol: TCP
resources:
limits:
cpu: 500m
memory: 256Mi
requests:
cpu: 100m
memory: 128Mi
env:
- name: KAFKA_CLUSTERS_0_NAME
value: local
- name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
value: "my-cluster-kafka-bootstrap:9092"
- name: KAFKA_CLUSTERS_0_SCHEMAREGISTRY
value: "http://schema-registry-cp-schema-registry:8081"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME
value: "kafka_connect_source"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS
value: "<http://kafka-connect-mssql-2019-source-cluster-connect-api:8083>"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_1_NAME
value: "kafka_connect_sink"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_1_ADDRESS
value: "<http://kafka-connect-gcs-sink-cluster-connect-api:8083>"

---
apiVersion: v1
kind: Service
metadata:
name: kafka-ui
namespace: kafka-mils
labels:
app: kafka-ui
spec:
ports:
- port: 80
protocol: TCP
targetPort: 8080
selector:
app: kafka-ui
kubectl apply -f provectus_kafka_ui.yaml

แล้ว port forward จาก pod ตัว kafka-ui ได้เลย

จะเห็นว่าในแถบของ Kafka connect จะเห็น 2 connectors ที่เราลงไปแล้ว

วิธีสังเกตว่า connectors เราใช้ได้แล้วหรือไม่ ให้ดูตรง topics กับ status ว่าผูกกับ topic และ status เป็น ready แล้วหรือยัง

ทดสอบ CDC

เราจะเข้าไปใน DBeaver เพื่อเพิ่ม record 1 row

INSERT INTO yaki.dbo.employee
(id, first_name, last_name, email, gender, ip_address, new_address)
VALUES(125, 'burasakorn', 'sabyeying', 'burasakorn.sab@cjexpress.co.th', 'female', '99.99.99.99', 'thailand');

พอเรากลับมาที่ Cloud storage จะเห็นว่ามีไฟล์เพิ่มเข้ามาแล้ว เย้ !

จะเห็นว่าไฟล์ใหม่ที่เข้ามาชื่อ cloudSQL.dbo.employee-0-134.gz ซึ่ง 134 คือเลข offset

และหากเราเปิดใน Kafka UI เข้าไปดูใน topic จะเห็นเลข next offset ด้วย คือ 135

ซึ่งตรงกับ db ของเราที่มีจำนวน 135 rows ด้วย

ส่งท้าย

เท่านี้เราก็สามารถทดลองสร้าง Kafka โดยใช้ Kafka connect ได้แล้ว เย้

ใน tutorial นี้ เรา skip การทำ schema registry ไป เพื่อไม่ให้บทความเนื้อหาเยอะเกิน

เราหวังว่าบทความนี้จะเป็นประโยชน์กับใครหลายๆคนที่กำลังฝึกเล่นเจ้า Kafka อยู่ ถ้าชอบบทความ, เห็นว่าบทความนี้เป็นประโยชน์ หรือติชม ก็สามารถทักไปบอกที่เพจได้นะคะ https://www.facebook.com/mesodiar/

ขอบคุณที่อ่านถึงตรงนี้ค่ะ !

--

--

Burasakorn Sabyeying
Mils’ Blog

Data engineer at CJ Express. GDE in Cloud. Women Techmakers Ambassador. Co-lead GDG Cloud Bangkok. Other channel > Mesodiar.com