Setting up Kafka with Kafka connect from SQL server to Cloud storage (GCS)
Tutorial นี้เราจะพา setup Kafka cluster โดยใช้ Strimzi
รวมถึง setup Kafka connect เพื่อต่อไปยัง external source ต่างๆเพื่อดึงจาก SQL server เป็น source data ต้นทางโดยเก็บ event จาก Change Data Capture (CDC) แล้วนำไปเก็บที่ sink ปลายทางที่ Google Cloud Storage
บทความนี้จะเหมาะกับ
- ผู้ต้องการศึกษา Kafka, Kafka connect
และพอเข้าใจ concept เบื้องต้นของ Kafka บ้างแล้ว (เช่น topics, producer, consumer)
เราแนะนำให้อ่านบทความนี้ก่อน https://mesodiar.com/intro-to-apache-kafka-universe-kafka-คืออะไร-45f652e8233f - ต้องการลอง practice ด้วยการ setup Kafka ด้วยตัวเอง
Table of contents:
- Pre-requisites
- ทำไมใช้ Strimzi ? Strimzi คืออะไร
- Difference between Strimzi and Confluent for Kubernetes
เริ่ม setup กัน
- Setup Kafka Cluster ?
- Kafka Connect คืออะไร ?
- สิ่งที่ต้องเตรียมสำหรับ project นี้ ก่อนสร้าง Kafka connect
- Cloud SQL for SQL server
- Setup Kafka Connect
- source connect
- sink connect
- Setup Kafka Connector
- source connector
- sink connector
- Install Kafka UI
- ทดสอบ CDC
Pre-requisites:
- Docker
- Kubernetes (เราจะใช้ GKE)
kubectl
command- Kubernetes Desktop Client สักตัว เช่น Aptakube, Lens (ไม่ฟรี), OpenLens เพื่อการ monitor resource ต่างๆบน Kafka ง่ายๆ
- GCP service อื่นๆอย่าง Cloud SQL และ Cloud storage
ทำไม Strimzi ? Strimzi คืออะไร?
ในการสร้าง Kafka Cluster นั้น เราสามารถสร้างได้หลายวิธีเนื่องจากมี provider มากมายที่ให้บริการในการสร้าง cluster โดยเราขอแบ่งเป็น 2–3 เจ้าสั้นๆ
- Strimzi or Strimzi operator
- Confluent for Kubernetes (CFK) or Confluent operator (จะเรียกสั้นๆว่า confluent)
- Koperator
เหตุผลที่เขาเรียกว่า operator เพราะว่าทั้งหมดนี้คือ Kubernetes operator
ที่ install controller เพื่อ create, configure, และ secure ตัว Kafka cluster (เหมือน Kubernetes resource ต่างๆเช่น Pod, Deployment, ConfigMap)
ตัวที่นิยมในตลาดเป็น Strimzi และ Confluent เราจึงได้นำ 2 ตัวนี้มา compare กัน
Different between Strimzi and Confluent
จากในตาราง Strimzi นั้นเป็น Open source ที่มีคน back ให้เป็น community จริงๆ ในขณะที่ confluent จะเป็นทีมที่สร้าง Kafka มา
และในเรื่องของราคา Strimzi นั้นฟรีทั้งหมด ส่วนของ Confluent นั้นฟีเจอร์จะไม่เปิดใช้ทั้งหมดให้ฟรีและเป็น Enterprise grade
เพื่อความสะดวก เราจะใช้ Strimzi ในการ setup Kafka cluster กัน
คำเตือน ! ห้าม copy คำสั่งและโค้ดเพลิน
Setup Kafka Cluster
เราจะสร้าง namespace เพื่อ deploy Strimzi operator ในนี้
kubectl create namespace kafka-mils
หมายเหตุ: namespace ทุกคนจะชื่อว่า kafka ก็ได้นะ แต่เราอยากจะตั้งว่า kafka-mils
ถัดไป เราจะ download ไฟล์ install จากลิ้งนี้ แล้วเราเอาไฟล์นี้มาเซฟไว้ ตั้งชื่อว่า strimzi-install-file.yaml
curl <https://strimzi.io/install/latest?namespace=kafka-mils>
--output strimzi-install-file.yaml
ถ้าทุกคนใช้ namespace kafka ก็ใช้ https://strimzi.io/install/latest?namespace=kafka
น้า
สิ่งที่ไฟล์ strimzi-install-file.yaml
ทำคือการ install พวก ClusterRoles, ClusterRoleBindings ต่างๆและพวก CRDS อย่าง Kafka, KafkaTopic ทั้งหลาย เพื่อเอามา manage Kafka cluster
หากทุกคนสังเกตจะเห็นว่า ในไฟล์จะมี namespace ที่เรากำหนดเรียบร้อยแล้ว
เพื่อจะสร้าง resource ต่างๆเราก็ต้อง apply ก่อน
kubectl apply -f strimzi-install-file.yaml
ทีนี่เรากลับมาดูที่ namespace ของเรา เราจะเจอว่ามี pod ตัว strimzi operator เกิดขึ้นแล้ว เย้
หรือถ้าใครสาย cli จะใช้ kubectl get pod มาดูได้
kubectl get pod -n kafka-mils --watch
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-f696c85f7-k6hgf 1/1 Running 0 3m40s
คราวนี้เราลง strimzi operator เสร็จแล้ว ถัดไปเราจะลง cluster
curl <https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml> --output kafka-persistent-single.yaml
ทุกคนจะเห็นว่าหน้าตาของไฟล์เป็นแบบนี้ แล้วเราก็เพิ่ม namespace เข้าไป เพื่อที่ว่าตอน apply เราจะได้ไม่ลั่นไปผิด namespace 55555
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka-mils ##<-- add namespace here
spec:
kafka:
version: 3.3.1
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.3"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
รันด้วยคำสั่ง
kubectl apply -f kafka-persistent-single.yaml
หากเราดูจาก Aptakube จะเห็นว่ามี cluster ขึ้นแล้วชื่อ my-cluster
แล้วมาดูที่ pod จะเห็นว่ามี 3 pods เกิดขึ้นมาใหม่ เราจะรอแปบนึงจนกว่าทุกอย่าง Running เขียวหมด
pod ที่ขึ้นมาจะเป็น cluster entity, Kafka cluster และ ZooKeeper ของเรา
เราสามารถลองเทสว่า kafka cluster เราใช้ได้ไหมด้วยการลอง produce message เข้าไปดู
kubectl -n kafka-mils run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
ทีนี้ลอง consume message ดู
kubectl -n kafka-mils run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
Result:
เราจะเห็นว่า consumer จะพ่น message ต่างๆที่เราใส่ไว้ใน producer
ทีนี้เราก็มั่นใจว่า Kafka cluster เราเรียบร้อยแล้ว
Kafka Connect คืออะไร
Kafka Connect คือหนึ่งในจักรวาลของ Kafka ในการทำ streaming data ระหว่าง Kafka brokers กับ external data source อื่นๆในโลกภายนอก เช่น ไปต่อกับ database
ใน Kafka Connect จะมี connector อยู่ เช่นถ้าเราต่อกับ mysql ก็จะเป็น mysql connector ที่จะต่อกับ mysql database เท่านั้น
โดย Connector มี 2 ประเภท:
- source connector — push data ขาเข้า Kafka
- sink connector — pull data ออกจาก Kafka
รูปนี้เป็นรูปจาก debezium ซึ่งใน tutorial นี้เราก็จะใช้ connector จาก Debezium ในการเซ็ต source connector สำหรับ SQL server เช่นกัน
สิ่งที่ต้องเตรียมสำหรับ project นี้ ก่อนสร้าง Kafka connect
- SQL server เป็นตัว database ต้นทาง โดยเราจะใช้ Cloud SQL แบบ SQL server
- Cloud storage bucket
Cloud SQL for SQL server
ในการสร้าง Cloud SQL ให้ไปที่ https://console.cloud.google.com/ และเข้าไป service ที่ชื่อว่า Cloud SQL
จากนั้นก็กดสร้าง Create instance โดยเลือกเป็น SQL server
เราจะเซ็ตให้เครื่องอยู่ Singapore, DB Version เป็น SQL Server 2019 Standard
และขนาดเครื่องเล็กที่สุดเพื่อใช้สำหรับ tutorial นี้เท่านั้น
สำคัญที่สุดคือเราจะสร้างเป็น Public IP และใส่ authorized network 2 ตัว
- เครื่อง IP ของเรา เพื่อที่ว่าเครื่องเราจะเข้า database นี้ได้เพื่อไปสร้าง table โดยเอา IP จากการเสิร์ช google ว่า “what’s my ip” ได้
- IP ของ Kubernetes เพื่อที่ว่า Kafka บน Kubernetes เราต่อไปยัง database ได้
เราเลือกแบบไม่ใช่ private IP เพราะไม่อยากให้มันยุ่งยากเกินไป
คราวนี้เมื่อสร้างเสร็จแล้ว อาจจะต้องรอสักครู่ใหญ่ๆ
เมื่อสร้างเสร็จแล้วจะมีเครื่องหมายติ๊กถูกข้างหน้า instance
คราวนี้เราจะต้อง access เข้าไปใน database เพื่อสร้าง table กัน
เราจะใช้ DBeaver นะ ก็กรอก connection พวก hostname, username, password ตามรูปได้เลย
จากนั้นก็สร้าง database และสร้าง table ชื่อ employee (เอา mock data จาก https://www.mockaroo.com/)
เมื่อเราสร้าง table เสร็จก็เท่ากับว่าเราเตรียมความพร้อมสำหรับ database ต้นทางแล้ว(ชื่อว่า yaki) แต่อีกจุดที่สำคัญคือ เนื่องจาก Kafka เราจะ capture event ผ่าน CDC
Change data capture (CDC) คืออะไร?
Change data capture คือ process ที่บอกว่าตอนนี้มีใครทำอะไรกับ database บ้าง เช่น มี update, delete transaction ใน database นะ โดยสิ่งนี้จะ deliver event แบบ real-time ให้ อย่างที่เราตั้งใจว่าจะให้วิ่งไปหา Kafka
สำหรับ SQL server สามารถเปิด feature CDC ได้เลย โดยรัน command ผ่าน DBeaver ด้านล่าง
## Turn CDC on
EXEC msdb.dbo.gcloudsql_cdc_enable_db 'yaki'
### Track changes in a table
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'employee',
@role_name = N'CDC'
## Check if CDC is enabled on a table
EXECUTE sys.sp_cdc_help_change_data_capture
@source_schema = N'dbo',
@source_name = N'employee'
To see change in table
SELECT * FROM cdc.dbo_employee_CT
เท่านี้เราก็เซ็ตต้นทางสำหรับ source data เสร็จแล้ว !
ส่วน Cloud storage ปลายทาง ก็สร้าง bucket ได้เลยตามปกติจ้า อันนี้ไม่ยาก
Setup Kafka Connect
วิธีการสร้าง Kafka Connect เราต้อง build container image ที่มี connector plugins ที่จำเป็นสำหรับ connection
Source Connect
source.Dockerfile
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins
ADD <https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz> /opt/kafka/plugins
RUN cd /opt/kafka/plugins && tar -xzf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz && rm -rf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz**
USER 1001
จะเห็นว่า เราจะใช้ base image จาก strimzi แล้วสิ่งที่ทำเพิ่มคือลง plugin ตัว sqlserver จาก debezium เพิ่ม
แล้วเราก็ build เป็น image ไว้
docker buildx build --platform=linux/amd64 -t mesodiar/kafka-connect-mssql2019:v1.0.0 -f source.Dockerfile --load .
ทีนี้เราจะสร้างไฟล์ Kafka connect กัน
ตั้งชื่อว่า source-kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-mssql-2019-source-cluster
namespace: kafka-mils
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: mesodiar/kafka-connect-mssql2019:v1.0.0
version: 3.2.3
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: source-my-connect-cluster
offset.storage.topic: source-my-connect-cluster-offsets
config.storage.topic: source-my-connect-cluster-configs
status.storage.topic: source-my-connect-cluster-status
key.converter.schemas.enable: false
value.converter.schemas.enable: false
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
จะเห็นว่า bootstrap server คือ my-cluster-kafka-bootstrap:9092 โดยคุยผ่าน plaintext ธรรมดา (9092) ซึ่งเราสามารถดูชื่อ service ของ bootstrap ได้ที่แถบ services
ต่อไป รันเพื่อสร้าง source connect
kubectl apply -f source-kafka-connect.yaml
Sink connect
sink.Dockerfile
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins
ADD <https://github.com/aiven/gcs-connector-for-apache-kafka/releases/download/v0.9.0/aiven-kafka-connect-gcs-0.9.0.tar> /opt/kafka/plugins
RUN cd /opt/kafka/plugins && tar -xf aiven-kafka-connect-gcs-0.9.0.tar && rm -rf aiven-kafka-connect-gcs-0.9.0.tar
USER 1001
จะเห็นว่า เราจะใช้ base image จาก strimzi แล้วสิ่งที่ทำเพิ่มคือลง plugin ตัว gcs จาก Aiven
ถัดมา เราก็ build image
docker buildx build --platform=linux/amd64 -t mesodiar/kafka-connect-gcs:v1.0.0 -f source.Dockerfile --load .
Sink connect file
## sink-kafka-connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: kafka-connect-gcs-sink-cluster
namespace: kafka-mils
annotations:
strimzi.io/use-connector-resources: "true"
spec:
image: mesodiar/kafka-connect-gcs:v1.0.0
version: 3.2.3
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: sink-my-connect-cluster
offset.storage.topic: sink-my-connect-cluster-offsets
config.storage.topic: sink-my-connect-cluster-configs
status.storage.topic: sink-my-connect-cluster-status
key.converter.schemas.enable: false
value.converter.schemas.enable: false
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"
ssl.enabled.protocols: "TLSv1.2"
ssl.protocol: "TLSv1.2"
ssl.endpoint.identification.algorithm: HTTPS
kubectl apply -f sink-kafka-connect.yaml
ผลลัพท์ที่ได้ก็จะมี 2 pods ที่เป็นทั้ง source และ sink connect
Create Kafka Connectors
Source connector
source_sql_server_connector.yaml
# To use the KafkaConnector resource you have to first enable the connector operator using
# the strimzi.io/use-connector-resources annotation on the KafkaConnect custom resource.
# From Apache Kafka 3.1.1 and 3.2.0 you also have to add the FileStreamSourceConnector
# connector to the container image. You can do that using the kafka-connect-build.yaml example.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: source-sql-server-connector
namespace: kafka
labels:
# The strimzi.io/cluster label identifies the KafkaConnect instance
# in which to create this connector. That KafkaConnect instance
# must have the strimzi.io/use-connector-resources annotation
# set to true.
strimzi.io/cluster: kafka-connect-mssql-2019-source-cluster
spec:
class: io.debezium.connector.sqlserver.SqlServerConnector
tasksMax: 1
config:
topic: cloudSQL.dbo.employee
connector.class: io.debezium.connector.sqlserver.SqlServerConnector
database.hostname: <instance_ip>
database.port: 1433
database.dbname: yaki
database.user: sqlserver
database.password: '<password>'
database.server.name: cloudSQL
database.encrypt: true
database.ssl: true
database.trustServerCertificate: true
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: booboo.history.employee-topic
key.converter.schemas.enable: false
value.converter.schemas.enable: false
table.whitelist: dbo.employee
offset.flush.interval.ms: 15000
errors.deadletterqueue.topic.name: cdc_employee-topic
poll.interval.ms: 1000
errors.tolerance: all
snapshot.mode: initial
จะเห็นได้ว่า config เราจะระบุว่าจะต่อไปยัง database ด้วย connection อะไร โดยต้องระบุทั้ง hostname, port, username, password, และบอกว่า table อะไรที่เราสนใจอยู่ (table.whitelist) และที่สำคัญคือ topic name cloudSQL.dbo.employee
ต่อไปเราก็สร้าง source connector ด้วยคำสั่ง
kubectl apply -f source_sql_server_connector.yaml
Sink connector
sink-gcs-connector.yaml
# To use the KafkaConnector resource you have to first enable the connector operator using
# the strimzi.io/use-connector-resources annotation on the KafkaConnect custom resource.
# From Apache Kafka 3.1.1 and 3.2.0 you also have to add the FileStreamSourceConnector
# connector to the container image. You can do that using the kafka-connect-build.yaml example.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: sink-gcs-connector
namespace: kafka-mils
labels:
# The strimzi.io/cluster label identifies the KafkaConnect instance
# in which to create this connector. That KafkaConnect instance
# must have the strimzi.io/use-connector-resources annotation
# set to true.
strimzi.io/cluster: kafka-connect-gcs-sink-cluster
spec:
class: io.aiven.kafka.connect.gcs.GcsSinkConnector
tasksMax: 2
config:
file: /opt/kafka/LICENSE
topics: cloudSQL.dbo.employee
tasks.max: 1
format.output.type: "jsonl"
key.converter.schemas.enable: false
value.converter.schemas.enable: false
gcs.bucket.name: "<bucket_name>"
file.name.prefix: "landing/<folder_name>/"
file.compression.type: "gzip"
format.output.fields: "key,value,offset,timestamp"
file.name.template: "{{timestamp:unit=yyyy}}/{{timestamp:unit=MM}}/{{timestamp:unit=dd}}/{{topic}}-{{partition}}-{{start_offset}}.gz"
gcs.credentials.json: |
{
"type": "service_account",
"project_id": "XXXXX",
"private_key_id": "xxxxx",
"private_key": "XXXX\\n",
"client_email": "XXX.iam.gserviceaccount.com",
"client_id": "XXXX",
"auth_uri": "<https://accounts.google.com/o/oauth2/auth>",
"token_uri": "<https://oauth2.googleapis.com/token>",
"auth_provider_x509_cert_url": "XXXs",
"client_x509_cert_url": "XXXXm"
}
จะเห็นว่า config เราระบุว่าปลายทางต้องใช้ config อะไรบ้าง เช่น bucket name, path ใน bucket, file output, และ service account secret key เพื่อยืนยันว่าเราสามารถเข้าถึง bucket ได้
สิ่งที่สำคัญใน config นี้คือ topic name ต้องตรงกับที่เราใช้ คือcloudSQL.dbo.employee
นั่นเอง เพื่อให้ connector มาดูจาก topic นี้
แล้วเราก็รันสร้าง sink connector ด้วยคำสั่ง
kubectl apply -f sink_gcs_connector.yaml
เมื่อเราสร้าง Kafka connectors ทั้ง 2 ตัวแล้ว ในหน้า aptakube ก็จะแสดงตามรูป
Done !
หาก connectors เราสามารถใช้ได้แล้ว เมื่อเราไปดู GCS จะพบว่ามีข้อมูล initial ไหลเข้ามาเรียบร้อย
Install Kafka ui
เราจะ install Kafka UI เพื่อให้เราได้ debug ง่ายขึ้น โดยเราใช้ของ provectus
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-ui
namespace: kafka-mils
labels:
app: kafka-ui
spec:
replicas: 1
selector:
matchLabels:
app: kafka-ui
template:
metadata:
labels:
app: kafka-ui
spec:
containers:
- name: ui
image: provectuslabs/kafka-ui:latest
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 8080
protocol: TCP
resources:
limits:
cpu: 500m
memory: 256Mi
requests:
cpu: 100m
memory: 128Mi
env:
- name: KAFKA_CLUSTERS_0_NAME
value: local
- name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
value: "my-cluster-kafka-bootstrap:9092"
- name: KAFKA_CLUSTERS_0_SCHEMAREGISTRY
value: "http://schema-registry-cp-schema-registry:8081"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME
value: "kafka_connect_source"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS
value: "<http://kafka-connect-mssql-2019-source-cluster-connect-api:8083>"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_1_NAME
value: "kafka_connect_sink"
- name: KAFKA_CLUSTERS_0_KAFKACONNECT_1_ADDRESS
value: "<http://kafka-connect-gcs-sink-cluster-connect-api:8083>"
---
apiVersion: v1
kind: Service
metadata:
name: kafka-ui
namespace: kafka-mils
labels:
app: kafka-ui
spec:
ports:
- port: 80
protocol: TCP
targetPort: 8080
selector:
app: kafka-ui
kubectl apply -f provectus_kafka_ui.yaml
แล้ว port forward จาก pod ตัว kafka-ui ได้เลย
จะเห็นว่าในแถบของ Kafka connect จะเห็น 2 connectors ที่เราลงไปแล้ว
วิธีสังเกตว่า connectors เราใช้ได้แล้วหรือไม่ ให้ดูตรง topics กับ status ว่าผูกกับ topic และ status เป็น ready แล้วหรือยัง
ทดสอบ CDC
เราจะเข้าไปใน DBeaver เพื่อเพิ่ม record 1 row
INSERT INTO yaki.dbo.employee
(id, first_name, last_name, email, gender, ip_address, new_address)
VALUES(125, 'burasakorn', 'sabyeying', 'burasakorn.sab@cjexpress.co.th', 'female', '99.99.99.99', 'thailand');
พอเรากลับมาที่ Cloud storage จะเห็นว่ามีไฟล์เพิ่มเข้ามาแล้ว เย้ !
จะเห็นว่าไฟล์ใหม่ที่เข้ามาชื่อ cloudSQL.dbo.employee-0-134.gz
ซึ่ง 134 คือเลข offset
และหากเราเปิดใน Kafka UI เข้าไปดูใน topic จะเห็นเลข next offset ด้วย คือ 135
ซึ่งตรงกับ db ของเราที่มีจำนวน 135 rows ด้วย
ส่งท้าย
เท่านี้เราก็สามารถทดลองสร้าง Kafka โดยใช้ Kafka connect ได้แล้ว เย้
ใน tutorial นี้ เรา skip การทำ schema registry ไป เพื่อไม่ให้บทความเนื้อหาเยอะเกิน
เราหวังว่าบทความนี้จะเป็นประโยชน์กับใครหลายๆคนที่กำลังฝึกเล่นเจ้า Kafka อยู่ ถ้าชอบบทความ, เห็นว่าบทความนี้เป็นประโยชน์ หรือติชม ก็สามารถทักไปบอกที่เพจได้นะคะ https://www.facebook.com/mesodiar/
ขอบคุณที่อ่านถึงตรงนี้ค่ะ !