ติดตั้งและเชื่อมต่อ Kafka ด้วย Go แบบง่าย ๆ พร้อมเพิ่ม Security ขั้นพื้นฐานลงไปด้วย!

Poonkasem Kasemsri Na Ayutthaya
Sirisoft
Published in
7 min readApr 19, 2022

สวัสดีครับ วันนี้เราจะมาว่ากันด้วยเรื่องของ Kafka หรือ Apache Kafka ซึ่งเป็น Message queue ที่โดดเด่นด้านความเร็วในการทำงาน และ performance ที่สูงมาก ๆ ทำให้ Kafka ได้รับความนิยมเป็นอย่างสูงในปัจจุบัน โดยในบทความนี้จะครอบคลุมถึงทฤษฎี architecture พื้นฐาน, การติดตั้ง config, การเชื่อมต่อด้วย command line, การใช้ Go เพื่อเชื่อมต่อ รวมถึงการใส่ Security เพื่อเพิ่มความปลอดภัยนั่นเอง

Kafka คืออะไร? ทำงานยังไง? แล้วเราเอาไปใช้อะไรได้บ้าง?

Kafka คือ Message queue ประเภท Publish/Subscribe (Pub/Sub) ที่ถูกสร้างขึ้นโดย Apache สำหรับใช้ในการรับ — ส่งข้อมูล โดยมีจุดเด่นคือความสามารถรองรับงานหนัก ๆ ได้โดยยังคงความเร็วสูงไว้ได้อยู่

แล้ว Message queue คืออะไร? Message queue ทำหน้าที่เป็นตัวกลางระหว่าง ผู้ส่ง (producer) และผู้รับ (consumer) ในการส่งต่อข้อมูล โดย Kafka มีความต่างจาก message queue ทั่วไปเพราะ message ที่ถูกเก็บไว้นั้น จะไม่ถูกลบทิ้งออกไปแม้ว่าจะมี consumer มา consume ไปใช้งานแล้ว และตัว Producer กับ Consumer ที่มาต่อกับ Kafka นั้น ถูกแยกออกจากกันโดยสิ้นเชิง เหมือนกับว่าทั้งสองไม่รู้จักกัน

การทำงานแบบที่ Producer และ Consumer ไม่รู้จักกันนั้น เรียกว่าการทำงานแบบ Publish/Subscribe ซึ่งถึงแม้ว่าทั้งคู่ไม่ได้รู้จักกันโดยตรง แต่สามารถทำงานร่วมกันได้โดย (ณ ที่นี้) ใช้ Kafka เป็นตัวกลางนั่นเอง โดยฝั่ง producer จะผลิต (produce) ข้อมูลเข้ามาใน Kafka โดยเก็บไว้ใน topic หรือหมวดหมู่เรื่องนั้น ๆ โดยที่ไม่จำเป็นต้องรู้ว่า consumer มีใครบ้าง ในขณะเดียวกัน Consumer ก็จะดึงมาใช้ (consume) จาก topic ที่ต้องการใน Kafka โดยไม่จำเป็นต้องรู้ว่าใครเป็นคน produce เข้ามาเช่นกัน

Kafka มักจะถูกนำมาใช้ในงานที่ต้องการความเร็วสูง (แบบ real-time) ที่สามารถใช้ข้อมูลที่เก็บไว้ได้หลาย ๆ ครั้ง โดยที่การ down ของฝั่งใดฝั่งหนึ่งของการรับ — ส่งข้อมูลไม่ทำให้ทั้งระบบพัง เช่น นำมาใช้เก็บและส่งต่อ request ของระบบธนาคาร เป็นต้น

แล้วภายใน Kafka มันเป็นยังไงล่ะ?

Architecture คร่าว ๆ ของ Kafka ประกอบไปด้วย Kafka cluster, Zookeeper, Producer และ Consumer โดยมีrole และหน้าที่ที่แตกต่างกันไป

  • Kafka cluster คือแกนหลักของ Kafka ที่ประกอบขึ้นจาก Broker (ที่ทำหน้าที่เหมือนกับ Server ของ Kafka) เพื่อเก็บข้อมูลที่ได้รับและส่งต่อ โดยภายในจะมี topic ที่ทำงานเหมือนกับ slot หรือ channel ในการเก็บข้อมูล และ Partition ภายใน topic ที่เป็นเสมือนเลนย่อย ๆ ภายใน topic ที่แบ่งข้อมูลข้างในไว้ (โดยปกติจะใส่ข้อมูลเข้าไปข้างใน partition เป็น round robin และไม่มีผลกับ business flow) โดย broker สามารถมีได้มากกว่า 1 และสามารถทำ Replica (หรือการ copy ข้อมูลระหว่าง broker ด้วยกัน เช่นข้อมูลของ topic หรือ partition) เพื่อความปลอดภัยในกรณีที่มี broker ตัวใดตัวหนึ่งตายไป ตัวที่เหลือก็ยังสามารถทำงานได้ตามปกติ นอกจากนี้ ระหว่าง replica ยังมีการเลือก leader (แบบสุ่ม) เพื่อให้ broker ทุกตัวมีการแบ่ง load การทำงานระหว่าง replica อีกด้วย
  • Zookeeper เปรียบเสมือนเลขาของ Kafka Cluster ที่คอยจัดการการเปลี่ยนแปลงต่าง ๆ เช่น การเชื่อมต่อ broker ตัวใหม่, การสร้าง topic หรือ การ update ค่าระหว่าง Replica เป็นต้น
  • Producer คือผู้ผลิต ที่คอยส่งข้อมูลไปเก็บไว้ใน Kafka เพื่อให้ Consumer นำไปใช้ต่อไป โดย Producer จำเป็นต้องรู้ชื่อ topic ที่ถูกต้อง (รวมถึง Security หากมีการตั้งไว้) เพื่อเชื่อมต่อไปยัง Kafka Broker และ produce ข้อมูลเข้าไป โดย producer ที่ produce ไปยัง topic หนึ่ง ๆ นั้นสามารถมีได้หลายตัว และไม่จำเป็นต้องรู้จักกันเองระหว่าง producer ด้วยกัน (ค่า default ที่ Kafka จะ auto ลบข้อมูลใน topic ที่ producer ส่งข้อมูลเข้ามาคือ 7 วัน)
  • Consumer คือผู้รับที่รับข้อมูลจาก topic ใน Kafka ที่ producer ส่งเข้ามา โดย Consumer สามารถทำงานร่วมกันได้โดยใช้ Consumer group ที่เหมือนกับการรวมกลุ่มและช่วยกันทำงาน โดยจำนวน consumer ใน group สามารถมีได้มากที่สุด ตามจำนวน partition ของ topic นั้น ๆ ที่ตั้งไว้ (Max consumer ใน group ที่มาต่อ = Max partition ที่ตั้งไว้ใน topic นั้น ๆ) หากมี consumerใน group เกิน partition ตัวที่เกินมานั้นก็จะไม่ได้ทำงาน แต่ว่า ถ้าหากมี consumer ใน group อื่น ๆ มาเชื่อมกับ topic นั้น consumer ที่ไม่ได้อยู่ใน group เดียวกัน ก็จะทำเหมือนไม่รู้จักกันและจะ consume ข้อมูลใน topic โดยไม่สนใจว่า group อื่นจะ consume ไปถึงไหนแล้ว ซึ่งสามารถทำได้เนื่องจาก Kafka ไม่ลบ message ใน topic แม้ว่าจะถูก consume ออกไปแล้วนั่นเอง (โดยปกติแล้ว consumer จะรอรับ message ที่เข้ามาหลังจาก consumer เริ่มทำงานแล้วเท่านั้น และไม่ย้อนกลับไป consume ข้อมูลที่ถูก produce มาที่ topic ก่อนที่ consumer นี้จะเริ่มทำงาน แต่ก็สามารถใช้ command เพื่อให้ consumer consume ตั้งแต่แรกได้)

ทำไมถึงต้องใช้ Kafka?

สิ่งหนึ่งที่ทำให้ Kafka โดดเด่นจาก Message queue อื่น ๆ คือความเร็วของการทำงานนั่นเอง performance ของ Kafka นั้นสูงมาก ๆ สามารถรับ — ส่งข้อมูลเยอะ ๆ ได้สบาย นอกจากนี้ Kafka ยังทำงานแบบ Pub/Sub ซึ่งทำให้แม้ว่า producer หรือ consumer จะตายไปก็ไม่ส่งผลกับตัวอื่น ๆ เนื่องจาก producer และ consumer ไม่รู้จักกัน ส่วนข้อมูลก็ยังคงเก็บใน Kafka เหมือนเดิม และข้อมูลสามารถถูก consume ซ้ำ ๆ จาก consumer หลาย ๆ group ได้เนื่องจาก Kafka จะลบข้อมูลตามเวลา ไม่ใช่หลังถูก consume นั่นเอง

ติดตั้ง Apache Kafka

โหลด Kafka ที่ link นี้ https://kafka.apache.org/downloads โดยในตัวอย่างนี้จะใช้เป็น ver. 2.5.1 โดย load ตัว Binary downloads : Scala 2.12 - kafka_2.12-2.5.1.tgz โดยหน้าตาจะเป็นตามรูปด้านล่าง และจะใช้ Mac ในการแสดงตัวอย่าง แต่สำหรับ OS อื่น ๆ command ก็จะไม่ต่างกันมากนัก

Kafka จำเป็นต้องใช้ java ในการใช้งาน ให้ตรวจว่าว่ามี java ในเครื่องหรือไม่โดยเปิด terminal และใช้ command

java -version

โดยจะมี version ของ java แสดงขึ้นมา

หากไม่มี java ก็สามารถ install ได้โดยใช้ brew โดยใช้ command ต่อไปนี้เพื่อ install brew (อันแรก) และ java (อัน 2 และ 3)

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"brew tap adoptopenjdk/openjdkbrew install --cask adoptopenjdk8

หลังจากนั้น ให้เปิด terminal ของ folder นี้ (kafka_2.12–2.5.1) ขึ้นมา และใส่ command นี้ลงไป

bin/zookeeper-server-start.sh config/zookeeper.properties

แล้วกด enter เพื่อเป็นการเปิด zookeeper ตามค่า default ที่ Kafka ตั้งไว้ ก็จะมี log ของ Zookeeper ออกมาโดย command นี้จะไม่หยุดจนกว่า zookeeper จะพังหรือเรากดหยุดเอง (หน้าตาของ log อาจไม่เหมือน 100%)

หลังจากนี้ก็เปิด terminal อันใหม่ที่ folder เดิม แล้วใส่ command นี้ลงไป

bin/kafka-server-start.sh config/server.properties

หลังกด enter ก็จะมี log ออกมาเช่นกัน

โดยเราก็ปล่อยมันค้างไว้เหมือนเดิม (แต่ถ้าอยากปิดก็สามารถใช้ command + c หรือ control + c บน keyboard ในการหยุดได้) ซึ่งจะเป็น log Kafka Cluster ซึ่งเชื่อมต่อกับ zookeeper ที่เราเปิดออกมาตอนแรกอัตโนมัติแล้ว เราสามารถเช็คว่า Zookeeper กับ Kafka ทำงานหรือไม่ด้วย nc (netcat) ซึ่งหากไม่มีก็สามารถใช้ brew ในการ install ได้ โดยใช้ command ต่อไปนี้

brew install netcat

หลังจาก install แล้วก็ใช้คำสั่ง

nc -zv localhost 2181

เพื่อเช็คว่า port 2181 ที่เป็น port default ของ zookeeper ทำไงหรือไม่ และใช้ command

nc -zv localhost 9092

เพื่อเช็คว่า port 9092 ที่เป็น port default ของ Kafka ทำงานได้ปกติ

สร้าง topic และทดสอบการใช้งานโดยใช้ command line

เปิด terminal อันใหม่อีกอันที่ folder เดิม แล้วใช้ command นี้

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --topic test

โดยสามารถเปลี่ยนจำนวน partitions และชื่อ topic ตามที่ต้องการได้ แล้วกด enter ก็จะแสดงผลว่า create สำเร็จ

หลังจากนั้นก็ใช้ command นี้เพื่อทดสอบ produce ข้อมูลเข้าไปใน topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

โดยหลังจาก enter command ก็จะมีช่องให้ใส่ข้อมูลที่ต้องการ produce ลงไป ณ ที่นี้ก็จะลองใส่ test1 ลงไป

หลังจากลอง produce เรียบร้อยก็กด control + c หรือ command + c เพื่อปิดหน้า produce และใช้ command ต่อไปนี้เพื่อ consume ค่าออกมาได้

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

โดย --from-beginning ใช้สำหรับการบอกให้ consumer เริ่ม consume ตั้งแต่แรกของ topic และกด command + c หรือ ctrl + c เพื่อหยุด

จะเห็นว่า message ที่ produce เข้ามานั้นถูก consume ออกมาแล้ว ซึ่งหากกด command consume อีกครั้ง test1 ก็จะยังคงถูก consume ออกมาเช่นเดิมเนื่องจาก --from-beginning จะทำให้ consumer เริ่ม consume ตั้งแต่แรกเสมอนั่นเอง

ทดลองเชื่อมต่อโดยใช้ Go

สร้าง folder เปล่าขึ้นมา ชื่อ producer แล้วเปิด terminal ที่ folder นั้นพร้อมใช้ command นี้เพื่อ init go

go mod init kafka

โดยชื่อ Kafka สามารถเปลี่ยนเป็นชื่อ module ตามที่ต้องการได้

หลังจากนั้นก็สร้างไฟล์ที่ชื่อ main.go พร้อม source code ตามนี้ได้เลย

TestMessage (ข้อความที่กำลังจะ produce) สามารถถูกเปลี่ยนเป็นข้อความอื่น ๆ ได้ โดยชื่อ topic test ก็สามารถถูกเปลี่ยนเป็น topic อื่น ๆ ที่ถูกสร้างไว้ได้เช่นกัน โดยหลังจากนั้น ใช้ command ต่อไปนี้ใน terminal ของ folder นี้ เพื่อโหลด Sarama (library สำหรับเชื่อมต่อ Kafka โดยใช้ Go) และ library อื่น ๆ ที่เกี่ยวข้อง

go mod tidygo mod vendor

หลังจากนั้น ก็ใช้ command นี้เพื่อ run

go run main.go

โดยจะมีข้อความแสดงว่า produce สำเร็จ

หลังจากนั้นก็ copy folder ของ producer แล้วเปลี่ยนชื่อเป็น consumer พร้อมเปลี่ยน source code ของ main.go เป็น code ตามด้านล่างนี้แทน

โดย sarama.OffsetOldest เปรียบได้กับ --from-beginning หากต้องการให้ consume เฉพาะ message ใหม่เท่านั้น ก็ให้เปลี่ยนเป็น sarama.OffsetNewest โดยใช้ run command เดียวกับของ producer

โดย consumer จะ print message ที่ consume ออกมาโดยจะ run ไปเรื่อย ๆ จนกว่าจะกด ctrl + c หรือ command + c เพื่อหยุด

เพิ่ม Security ด้วย SSL

ให้เรา down kafka และ zookeeper (ควร down kafka ก่อน zookeeper เสมอ) ด้านใน folder ของ kafka_2.12–2.5.1 สร้าง folder ใหม่ชื่อ secrets ขึ้นมา สร้างไฟล์ certs.sh ด้วย code ตามนี้ โดยเปลี่ยน <password> ให้เป็น password ที่ตั้งขึ้นมาเอง

และสร้างไฟล์ alt.ext ด้วยเนื้อในตามนี้

โดยเปลี่ยน <IP> เป็น IP address ของเครื่องตัวเอง โดยสามารถเช็คได้โดยใช้ command

ifconfig | grep "inet " | grep -v 127.0.0.1 | cut -d\  -f2

หลังจากนั้นก็เปิด terminal ของ folder secrets แล้วใช้ command นี้เพื่อ generate ไฟล์ security ออกมา

./certs.sh

เสร็จแล้วไปที่ folder kafka_2.12–2.5.1 แล้วเข้าไปที่ folder config แล้วแก้ไขไฟล์ server.properties ตามนี้

โดยเปลี่ยน path ของ ssl.keystore.location, ssl.truststore.location เป็น path ไฟล์ของ kafka.broker.keystore.jks และ kafka.broker.truststore.jks ที่ถูก generate ใน folder secrets และ <password> เป็น password ที่ใส่ไว้ใน certs.sh และเปลี่ยน log.dirs เป็น folder ใหม่ ที่สร้างขึ้นมารับ log เมื่อเรียบร้อยแล้วก็เปลี่ยน IP ของ listeners และ zookeeper.connect เป็น IP ปัจจุบัน พร้อมกับ ใช้ terminal ในการ เข้าไป map host สำหรับเตรียมการเชื่อมต่อ

sudo vi /etc/hosts

แล้วเพิ่ม IP เครื่องของเราไปด้วยชื่อ host kafka

<IP>    kafka

หลังจากนั้นก็สามารถ start Kafka และ zookeeper ได้ตามปกติเลย

ทดสอบการใช้ security

การเชื่อมต่อ SSL จำเป็นต้องใช้ไฟล์ client.properties (ตั้งชื่อไฟล์ได้) เพื่อเชื่อมต่อ โดยสร้างไฟล์นี้ขึ้นมา และใช่ข้อมูลตามนี้ โดยใช้ <password> ตามที่ใส่ใน certs.sh

และลองสร้าง topic เพื่อ test security โดยใช้ command ที่เพิ่ม path ของ client.properties จะสามารถสร้าง topic ได้ แต่หากลบ --command-config และ path ออกไป จะไม่สามารถต่อได้

bin/kafka-topics.sh --create --bootstrap-server 192.168.0.85:9092 --partitions 3 --topic testSSL --command-config <path to client.properties>

สามารถ test security ต่อไปได้อีกโดยลอง produce และ consume ด้วย command ดังนี้

bin/kafka-console-producer.sh --broker-list 192.168.0.85:9092 --topic testSSL --producer.config <path to client.properties>bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.85:9092 --topic testSSL --from-beginning --consumer.config <path to client.properties>

โดย command อื่น ๆ ต้องใช้ --command-config แต่มีเฉพาะ producer และ consumer ที่ใช้ไม่เหมือน command อื่น

ต่อ Security โดยใช้ Go

เปิด terminal ของ folder secret และ ใส่ 5 command ต่อไปนี้ โดย password ทั้งหมดคือ <password> ที่เคยใส่ไว้ใน certs.sh

keytool -importkeystore -srckeystore kafka.client.truststore.jks -destkeystore trust.p12 -deststoretype PKCS12keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore key.p12 -deststoretype PKCS12openssl pkcs12 -in trust.p12 -nokeys -out trust.cer.pemopenssl pkcs12 -in key.p12 -nokeys -out key.cer.pemopenssl pkcs12 -in key.p12 -nodes -nocerts -out key.key.pem

หลังจากนั้น เปลี่ยน main.go ใน folder producer เป็นตามนี้ โดยใส่ path ของ pem 3 files ที่ generate ออกมาจาก command ก่อนหน้านี้ และเปลี่ยน <IP> เป็น IP ของเครื่อง

และเปลี่ยนของ consumer เป็นตามนี้ โดยเปลี่ยน path ของ PEM และ <IP> เช่นกัน

และลองทดสอบ run ได้ตามปกติเลย

และนี่คือทั้งหมดของการสร้าง, ทดสอบ และ config Kafka เพื่อเชื่อมต่อกับ Go ทั้งแบบไม่มี security และระบบ security แบบ SSL ครับ หวังว่าบทความนี้จะมีประโยชน์กับผู้ต้องการศึกษา Kafka และการเชื่อมต่อไปที่ Go นะครับ

สามารถติดตามบทความและข่าวสารอื่นๆได้ที่ https://www.facebook.com/sirisoft และเพื่อนๆคนไหนสนใจอยากมา Dev ด้วยกันสามารถสมัครและทักสอบถาม HR กันมาได้นะครับ https://lin.ee/ms8vit4 ขอบคุณครับ

--

--