มารู้จักการ Replication Topic ใน Confluent Kafka กันเถอะ

Rungwaraporn Khu
Sirisoft
Published in
5 min readJun 2, 2023

Kafka คืออะไร ?

Kafka คือ Message queue ที่ใช้ในการกระจายข้อความ (message, event, logs etc.) เพื่อเป็นศูนย์กลางในการรับส่ง, สำเนา และสำรองข้อมูล โดยมีจุดเด่นคือความสามารถรองรับงานหนักได้ โดยยังคงความเร็วสูงไว้อยู่ โดยปัจจุบันเป็น open source ที่ถูกพัฒนาโดย Apache ในชื่อ Apache Kafka

Message queue คือ เป็นตัวกลางระหว่าง ผู้ส่ง (producer) และผู้รับ (consumer) ในการส่งต่อข้อมูล โดย Kafka มีความต่างจาก message queue ทั่วไปเพราะ message ที่ถูกเก็บไว้นั้น จะไม่ถูกลบทิ้งออกไปแม้ว่าจะมี consumer มา consume ไปใช้งานแล้ว นอกจากนั้น Kafka ยังเป็น message queue ประเภท publish/subscribe ที่แยก producer และ consumer อย่างชัดเจน ทำให้ทั้งสองฝั่งสามารถทำงานตามปกติได้โดยไม่ต้องคำนึงถึงอีกฝั่ง

Confluent Kafka คืออะไร ?

Confluent Kafka คือแพลตฟอร์มในการ Streaming Data Real-time และ Distribute Message Queue ที่ได้รับความนิยมอย่างมากในปัจจุบัน

Confluent Kafka ถูกพัฒนามากจาก Apache Kafka ที่เป็นระบบรองรับการส่งข้อมูลจากหลายๆแหล่งข้อมูลต้นทาง (Produce) จำนวนมหาศาล ทั้งแบบ Batch file และ Real-time Data พร้อมๆกัน และส่งไปยังปลายทาง (Consume) ทั้งฐานข้อมูลหรือแอปพลิเคชันต่างๆได้

Replication Topic อย่างไรได้บ้าง

การ replicate topic คือ การคัดลอกข้อความ, การตั้งค่า Topic, จำนวน Partition จากต้นทางไปยังปลายทาง สามารถทำได้ 3 แบบ ดังนี้

  • MirrorMaker2 (MM2)
  • Confluent Replicator
  • Cluster Linking.

MirrorMaker 2.0 (MM2)

MirrorMaker 2.0 เป็นการนำ Kafka Connect Framework มาใช้งานเพื่อคัดลอกข้อความ, การตั้งค่า Topic, จำนวน Partition แต่จะไม่มีการเก็บค่า offset ระหว่างต้นทางและปลายทางนั่นหมายความว่าหากต้องการ failover จากศูนย์ข้อมูลนึงไปยังศูนย์ข้อมูลอื่นจะต้องทำการ translate offset เองภายในแอปพลิเคชัน

โดยชื่อของ Topic จะถูกขึ้นต้นด้วยชื่อของ Cluster ต้นทางตามด้วยชื่อ topic เช่น B.heartbeats และ A.heartbeats เป็นต้น

การทำงานของ MirrorMaker 2

มาทดลองสร้าง MirrorMaker 2 กันค่ะ โดยที่เราจะมาลองกันจะเป็นการ replicate ข้อมูลทุก topic จาก A ไป B และจาก B ไป A

Step 1 : แก้ไขไฟล์ตั้งค่าของ MirrorMaker ที่ connect-mirror-maker.properties

# specify any number of cluster aliases
clusters = A, B

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = A:9092
B.bootstrap.servers = B:9092

# enable and configure individual replication flows
A->B.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
A->B.topics = .*

B->A.enabled = true
B->A.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=1

############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

Step 2: เริ่มการใช้งาน MirrorMaker2

$  ./bin/connect-mirror-maker etc/kafka/connect-mirror-maker.properties

Confluent Replicator

Replicator เป็นการนำ MirrorMaker 2.0 (MM2) มาปรับปรุงให้สามารถทำ auto offset translation สำหรับ java consumer และทำ metadata replication เมื่อมีการทำ mirror ที่ข้อมูลจริงแล้วจะสามารถ mirror metadata ได้อีกด้วย เช่น การตั้งค่า topic หรือจำนวน partition

โดยชื่อ topic ที่ได้จะเป็นชื่อ topic.replica เช่น test-topic.replica เป็นต้น โดยที่ต้นทางและปลายทางจะสามารถ produce, consume ข้อความได้และสามารถตั้งค่าเพิ่มเติมฝั่งปลายทางได้ผ่านทาง connector

ภาพการทำงานของ Confluent Replicator

มาทดลองสร้าง Confluent Replicator กันค่ะ โดยที่เราจะมาลองกันจะเป็นการ replicate ข้อมูลใน demo-whitelist จาก cluster A ไป cluster B

Step 1: สร้าง topic เพื่อ replicate (สร้างเป็น whitelist)

$ ./bin/kafka-topics --bootstrap-server A:9092 --create --topic demo-whitelist --replication-factor 1 --partitions 3

Step 2: แก้ไขไฟล์ตั้งค่า consumer.properties สำหรับกำหนด source cluster

# Origin cluster connection configuration
bootstrap.servers=A:9092

Step 3: แก้ไขไฟล์ตั้งค่า producer.properties สำหรับกำหนด destination cluster

# Destination cluster connection configuration
bootstrap.servers=B:9092

Step 4: แก้ไขไฟล์ตั้งค่า replication.properties

# Replication configuration
topic.rename.format=${topic}.replica
replication.factor=1
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
confluent.topic.replication.factor=1

Step 5: run command เพื่อเริ่มต้นใช้งาน confluent replicator

$ ./bin/replicator --cluster.id replicator --consumer.config /etc/kafka/consumer.properties --producer.config /etc/kafka/producer.properties --replication.config /etc/kafka/replication.properties --whitelist 'demo-whitelist'

Cluster Linking

Cluster Linking เป็นการนำ MirrorMaker 2.0 (MM2) และ Confluent Replicator มาปรับปรุงให้ไม่มีการแบ่ง process ระหว่างต้นทางกับปลายทาง มีประสิทธิภาพมากขึ้น และ Preserve Offset (จะ mirror ข้อมูลโดยมี partition และ offset เดียวกัน โดยไม่ต้อง translate offset)

Cluster linking จะทำการเชื่อมต่อ cluster และสร้าง mirror topic จากต้นทางไปยังปลายทางได้โดยไม่ต้องผ่าน connector, มีความปลอดภัย, รองรับ network latency และสามารถสร้างที่ Confluent Server, Confluent Cloud ได้ อีกทั้งยังรองรับ multi-datacenter, multi-region และ hybrid cloud

โดยชื่อ topic ที่ได้จะเป็นชื่อเดียวกัน โดยฝั่งต้นทางจะสามารถ produce และ consume ข้อความได้ แต่ฝั่งปลายทางจะสามารถ consume ข้อความได้อย่างเดียว หากต้องการ produce ข้อความ ต้องยกเลิก cluster link ด้วยการ promote topic

ภาพการทำงานของ Cluster Linking

มาทดลองสร้าง Cluster Linking กันค่ะ โดยที่เราจะมาลองกันจะเป็นการ link topic demo ใน cluster A กับ demo-link ใน cluster B

Step 1: สร้าง topic demo ที่ source cluster

$ ./bin/kafka-topics --bootstrap-server A:9092 --create --topic demo --replication-factor 1 --partitions 3

Step 2: สร้าง cluster link ที่ destination cluster

$ ./bin/kafka-cluster-links --bootstrap-server B:9092 --create --link demo-link --config bootstrap.servers=A:9092

Step 3: Initialize mirror topic

$ ./bin/kafka-mirrors --create --mirror-topic demo --link demo-link --bootstrap-server B:9092

Step 4 : ตรวจสอบ replica ที่ destination cluster

$ ./bin/kafka-replica-status --topics demo --include-linked --bootstrap-server B:9092

Summary

  • MirrorMaker2 เป็นการนำ Kafka Connect Framework มาใช้งานเพื่อคัดลอก topic จากต้นทางไปยังปลายทาง จะไม่มีการเก็บค่า offset ทำให้ต้องมีการทำ translate offset เองที่แอปพลิเคชัน
  • Replicator นำ MirrorMaker2 มาปรับปรุงทำให้จะสามารถทำ metadata replication, auto offset translation สำหรับ java consumer
  • Cluster linking นำ MirrorMaker2 และ Replicator มาปรับปรุงเพื่อ mirror ข้อมูลโดยมี partition และ offset เดียวกัน โดยไม่ต้อง translate offset, มีความปลอดภัยสูง, รองรับ network latency

ท้ายที่สุดนี้หวังว่า Blog นี้จะเป็นประโยชน์สำหรับผู้ที่กำลังอ่านอยู่หรือผู้ที่สนใจเกี่ยวกับการ replicate topic ถ้ามีเนื้อหาอะไรที่ตกหล่นหรือผิดพลาดประการใดต้องขออภัยด้วยค่ะ

ยังไม่หมด สำหรับใครที่สนใจอยากจะเอา kafka ไปใช้ภายในองค์กร แต่ยังไม่มีความเชี่ยวชาญ ก็สามารถติดต่อมาที่ Sirisoft ได้ค่ะ ทางเรายินดีให้บริการ และตอนนี้ทางบริษัทยังเปิดรับสมาชิกเพิ่มอยู่นะคะเพื่อนๆคนในสนใจก็ติดต่อมาได้ที่ https://lin.ee/ms8vit4 เลยค่ะ

👉🏻ติดตามข่าวสารอื่นๆของบริษัทได้ที่ช่องทางด้านล่างนี้เลยค่ะ

Instagram: Sirisoft_official

Tiktok: https://www.tiktok.com/@sirisoft

Youtube: https://www.youtube.com/@sirisoftofficial1698

Facebook: https://www.facebook.com/sirisoft/

Website: https://www.sirisoft.co.th/

--

--