Securing Kafka with TLS/SSL authentication

Burasakorn Sabyeying
Mils’ Blog
Published in
8 min readFeb 12, 2023

บทความนี้เราจะมาเล่าว่าเราสามารถทำให้ Kafka secure มากขึ้นได้อย่างไรบ้าง ทำไมต้องคิดเรื่อง security, มี security protocol แบบไหนบ้าง และปรับ Kafka cluster เราให้เป็นแบบ TLS จนไปถึงการทดสอบว่าสามารถ authen และ authorized ผ่านจริงๆ

Table of contents
- Authentication vs Authorization
- Why securing kafka?
- Securing in Kafka
- TLS ต่างจาก SSL ยังไง
- Establish secure access from external clients
- Kafka cluster
- Kafka user
- สรุปและ Next action

Authentication vs Authorization

cr. www.ssl2buy.com

Authentication บอกว่าเราเป็นใคร (identity)

Authorization บอกว่า เราถูกอนุญาติให้ทำอะไรได้บ้าง

Why security in Kafka?

สมมติเหตุการณ์ว่ามี producer, consumer และ broker

  • producer ส่ง message มายัง topic A
  • record นั้นจะส่งไปยัง broker broker ก็จะเขียน record นั้นลง local log file
  • หลังจากนั้น consumer ที่ได้ consume จาก topic A ก็ดึงข้อมูลจาก topic A ไป

จุด security ที่น่าสนใจคือ

  • เวลาที่ producer client เชื่อมต่อมายัง broker เราจะมั่นใจได้ยังไงว่า message มาจาก producer ตัวนั้นจริงๆ (Client authenticity)
  • รวมถึงว่า producer ต้องมั่นใจได้ว่าส่งไปหา broker ตัวนั้นจริงๆ (Server authencity)
  • เราจะมั่นใจได้ยังไงว่าภายใน kafka จะไม่มีคนมาดักฟังข้อมูล เนื้อข้อมูลที่ถูกส่งไปข้าม network จึงต้องมีการป้องกันด้วย message digest (Data Integrity)
  • producer จะมี authorized ที่เขียนถึง topic A จริงๆ ในขณะเดียวกัน consumer มี authorized ในการ read ข้อมูลจาก topic A จริงๆหรือไม่ (Access control)

Kafka brokers อาจมี listener หลาย endpoints ทั้ง internal (เข้าถึงได้แค่ authorized network) และ external (เข้าถึงจาก public internet) ซึ่งทั้ง 2 ตัวก็จะมี security requirements ที่แตกต่างกันออกไป

ทุกคนจึงเห็นได้ว่า ทั้งหมดคือสิ่งที่ต้องคำนึงถึงเรื่อง security ใน Kafka platform ในส่วนถัดไปเราจะเล่าว่า เราจะสร้าง guarantee เหล่านี้ได้อย่างไร

Securing in Kafka

Kafka support 4 security protocols โดยใช้ 2 standard technologies นั่นคือ

  1. Transport Layer Security (TLS)
  2. Simple Authentication and Security Layer (SASL)

💡 TLS ต่างจาก SSL ยังไง

ทุกครั้งที่เราพูดถึง TLS เราจะมักนึกถึง Secure Sockets Layer (SSL) เพราะแต่เดิม SSL เป็นบรรพบุรุษของ TLS

นั่นเพราะ SSL ได้หยุดพัฒนาโดย Netscape ตั้งแต่ปี 1996 และต่อมาในปี 1999 ทีมของ IETF ได้เข้ามาพัฒนาต่อ และด้วยเหตุผลเรื่องการเปลี่ยนมือนี้เลยเป็นเหตุผลว่า SSL จึงเปลี่ยนชื่อเป็น TLS

ทั้ง 2 มีไว้เพื่อ support การ encryption ทั้งฝั่ง client และ server เพียงแต่ TLS ถือเป็น up-to-date encryption protocol ถึงแม้อย่างนั้นก็ตาม คนก็นิยมเรียกสลับไปสลับมาว่ามันเป็นสิ่งเดียวกันอยู่ดี (TLS/SSL encryption) เช่น SSL certificate สำหรับ website ตามเทคนิคต้องเรียก TLS certificate (แต่นั่นก็ไม่ใช่เรื่องผิดนะ)

กลับมาที่ Kafka กัน

Security protocols ที่เรากล่าวไว้ด้านบน เกิดจากการรวม transport layer (PLAINTEXT หรือ SSL) ควบกับ authentication layer (SSL หรือ SASL) เลยออกเป็น 4 ตัวดังนี้:

  1. PLAINTEXT
  2. SSL
  3. SASL_PLAINTEXT
  4. SASL_SSL

ซึ่งบทความนี้จะเน้นไปที่ TLS/SSL จึงขอยก PLAINTEXT และ SSL มาเล่า

  1. PLAINTEXT
    คือชั้น transport ที่ไม่มี authentication และไม่มีการ encrypt ข้อมูลเลย ดังนั้นจะเหมาะกับเคสที่ Kafka ของเราอยู่ใน private network อยู่แล้ว
  2. SSL
    คือชั้น transport แบบ SSL ที่มี authentication แบบ SSL นั่นแปลว่ามีการทำ authentication และ encryption ระหว่าง client และ server จึงเหมาะกับเคส insecure network หรือต้องการความ secure มากกว่า PLAINTEXT

ซึ่งโดย default แล้ว Kafka จะใช้ PLAINTEXT ซึ่งหมายความว่า เวลาส่งข้อมูล ข้อมูลเป็นยังไงก็เป็นอย่างนั้น ไม่ได้ถูก encrypt ไว้ หากเราอยากไปท่า encrypt ก็ต้องปรับมาเป็น TLS/SSL เอง

Fact ที่น่าสนใจ

ในด้าน performance พอเราใช้ SSL ในการ encrypt และ decrypt แล้วจะทำให้เสีย overhead เรื่อง CPU usage มากขึ้น ซึ่งอาจสูงถึง 20–30% เลยทีเดียว (refer จากหนังสือ Kafka: The Definitive Guide)

Establish secure access from external clients

ต่อจากนี้เราจะลงมือทำให้ external clients สามารถต่อเข้า Kafka cluster ของเราจากนอก cloud เราได้ด้วยวิธี TLS/SSL

ซึ่งก่อนหน้านี้เราเปิดเฉพาะ PLAINTEXT เอาไว้ และไม่ได้เปิดให้ภายนอกเข้ามา

Prerequisites

  • ในการลงมือ configure Kafka ต่อจากนี้จะเป็น Kafka ที่ติดตั้งผ่าน Strimzi ที่เราเคยติดตั้งไปแล้วในบทความก่อน
    Setting up Kafka with Kafka connect from SQL server to Cloud storage (GCS) (อ่านจนถึงก่อน Kafka connect ก็เพียงพอ)
  • Load balancer ที่เซ็ตแยกไว้แล้ว

Now Hands on !

หากเราต้องการแก้ไขเรื่อง securing access เราต้อง configure เพิ่มในส่วน resource ดังนี้

  • Kafka เช่น listeners และ authorization
  • KafkaUser เพื่อบอกว่าใคร access Kafka brokers ได้บ้าง
    โดยการสร้าง KafkaUser จะเป็นการ enable authentication, authorization และ quotas control

1. แก้ Kafka cluster

เราสร้างไฟล์ kafka-persistent-single-tls.yamlจากไฟล์เดิมkafka-persistent-single.yaml ด้วยการแก้ไขตามด้านล่าง

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: kafka-cluster
namespace: kafka-mils
spec:
kafka:
version: 3.2.3
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
### new changes below
authentication: # <---- add authentication for internal
type: tls
- name: external
port: 9094
type: loadbalancer
tls: true
authentication: # <---- add authentication for external
type: tls
authorization: #<------ Authorization for whole cluster (all listeners)
type: simple
superUsers:
- CN=my-internal-user
...

จาก config ที่เปลี่ยนแปลงด้านบน จะเห็นได้ว่า Authentication สามารถเซ็ตได้แยกตาม listener แต่ Authorizartion จะครอบทั้ง Kafka Cluster เลย

Diagram from Strimzi

ความแตกต่างระหว่าง Internal listener กับ External listeners คือ

  • Internal จะ connect clients กับ Kafka brokers ภายใน K8s cluster นี้
  • External จะ connect ผ่านจากนอก K8s cluster ซึ่งมีหลาย connection mechanism มากๆ ซึ่งเราเลือกเป็นท่า loadbalancer

และเราได้เพิ่มให้ Kafka user ที่ชื่อ my-internal-user เป็น super users สำหรับ Kafka cluster นี้ (CN ย่อมาจาก common name จาก client certificate)

คราวนี้ apply ด้วยคำสั่ง

kubectl apply -f kafka-persistent-single-tls.yaml

รันคำสั่ง get pod เพื่อเช็คดู status ของ kafka เรื่อยๆ

kubectl get pod -n kafka-mils

และเราจะรอจนทุก pod ขึ้นครบและ ready ครบตามด้านล่าง

NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-67c47f7454-qsxwn 3/3 Running 0 33s
my-cluster-kafka-0 1/1 Running 0 84s
my-cluster-zookeeper-0 1/1 Running 0 3m9s
strimzi-cluster-operator-56d64c8584-vbpd5 1/1 Running 0 6m21s

สิ่งที่พิเศษเพิ่มเติมคือ หากเรา apply คำสั่งแล้ว Cluster Operator จะสร้าง listeners และสร้าง cluster และ client certificate authority (CA) certificates เพื่อ enable authentication

CA certificates หรือ certificate authority certificates จะถูกสร้างโดย Cluster Operator เพื่อยืนยันการเข้าถึงของแต่ละ components ใน Kafka และฝั่ง client เอง (cluster CA กับ client CA)

เราสามารถดูได้จาก secrets

kubectl get secrets -n kafka-mils

result:

NAME                                     TYPE     DATA   AGE
my-cluster-clients-ca Opaque 1 12m
my-cluster-clients-ca-cert Opaque 3 12m
my-cluster-cluster-ca Opaque 1 12m
my-cluster-cluster-ca-cert Opaque 3 12m
my-cluster-cluster-operator-certs Opaque 4 12m
my-cluster-entity-topic-operator-certs Opaque 4 9m51s
my-cluster-entity-user-operator-certs Opaque 4 9m50s
my-cluster-kafka-brokers Opaque 4 10m
my-cluster-zookeeper-nodes Opaque 4 12m

จะเห็นได้ว่าจะมี secret ที่ถูกสร้างโดย Cluster Operator

  • <cluster_name>-cluster-ca จะมี private key ของ cluster CA เพื่อที่ว่า Strimzi และ Kafka components ใช้ private key นี้สำหรับ sign server certificates
  • <cluster_name>-cluster-ca-cert จะเก็บ public key ของ cluster CA เพื่อ identify ตัว kafka broker ตอนเชื่อมต่อแบบ TLS connection
  • <cluster_name>-clients-ca จะมี private keyของ clients CA เพื่อ sign เวลามี user เกิดใหม่ ในการเชื่อม TLS ไปหา brokers
  • <cluster_name>-clients-ca-cert จะมี public key ของ client CA เพื่อ verify identity ของ client ในการ access broker

สามารถอ่านเพิ่มเติมเรื่อง cluster CA กับ client CA ได้ที่ official doc

ต่อมาเมื่อเราสร้าง external listeners ด้วย load balancer แล้ว จะมี my-cluster-kafka-external-bootstrap ขึ้นมาใน services ด้วย เพื่อที่ client จะ connect ผ่าน external ip นี้

⚡  kubectl get services -n kafka-mils
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-0 LoadBalancer 10.109.xx.xxx 34.142.xx.xx 9094:32116/TCP 50m
my-cluster-kafka-bootstrap ClusterIP 10.109.x.xx <none> 9091/TCP,9092/TCP,9093/TCP 61m
my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 61m
my-cluster-kafka-external-bootstrap LoadBalancer 10.109.x.xxxx 35.240.xxx.xx 9094:31000/TCP 50m
my-cluster-zookeeper-client ClusterIP 10.109.xx.xxx <none> 2181/TCP 62m
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 62m

2. เพิ่ม Kafka user

เราสร้างไฟล์ใหม่ชื่อ my-internal-user.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-internal-user
namespace: kafka-mils
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
- resource:
type: topic
name: "*"
patternType: literal
operation: All
- resource:
type: group
name: "*"
patternType: literal
operation: All
- resource:
type: cluster
operation: All
kubectl apply -f my-internal-user.yaml

พอเรา apply เสร็จแล้วก็เช็คว่าใน resource KafkaUser ก็จะมี my-internal-user ขึ้นมา

เราเปิดจาก OpenLens

KeyStore and TrustStore

Step ถัดมา เราจะดึง TrustStore กับ KeyStore ออกมาจาก Kubernetes Secrets เพื่อที่ว่าเราจะนำไปเทสว่า user นี้สามารถเข้าถึง topic ใน cluster ได้หรือไม่

คำเตือน: อย่า copy เพลิน อย่าลืมเปลี่ยนชื่อ namespace

# get TrustStore from cluster
kubectl get secret my-cluster-cluster-ca-cert -n kafka-mils -o jsonpath='{.data.ca\\.p12}' | base64 -d > secret/my-cluster-ca.p12

# get TrustStore password
kubectl get secret my-cluster-cluster-ca-cert -n kafka-mils -o jsonpath='{.data.ca\\.password}' | base64 -d > secret/truststore_pass

# get KeyStore from Kafka user for authentication
kubectl get secret my-internal-user -n kafka-mils -o jsonpath='{.data.user\\.p12}' | base64 -d > secret/my-internal-user.p12

# get KeyStore password
kubectl get secret my-internal-user -n kafka-mils -o jsonpath='{.data.user\\.password}' | base64 -d > secret/keystore_pass

คำสั่งด้านบนจะดึงค่าจาก Kubernetes Secret มาใส่ในโฟลเดอร์ secret

ต่อไปเราจะสร้างไฟล์ config.properties

security.protocol=SSL
ssl.truststore.location=/tmp/my-cluster-ca.p12
ssl.truststore.password=<pass from truststore_pass file>
ssl.keystore.location=/tmp/my-internal-user.p12
ssl.keystore.password=<pass from keystore_pass file>

เรา copy file ขึ้นไปใน pod แล้วลองเทสสร้าง message ดู

โดยเราจะใช้ 3 files: my-cluster-ca.p12 , my-internal-user.p12 และ config.properties

$ kubectl cp secret/my-cluster-ca.p12 my-cluster-kafka-0:/tmp/my-cluster-ca.p12 -n kafka-mils
$ kubectl cp secret/my-internal-user.p12 my-cluster-kafka-0:/tmp/my-internal-user.p12 -n kafka-mils
$ kubectl cp secret/config.properties my-cluster-kafka-0:/tmp/config.properties -n kafka-mils

แล้วเรา shell เข้าไปยัง pod แล้วทดลองตัวเองเป็น producer ดู โดยทดสอบเข้าผ่าน external IP

$ sh bin/kafka-console-producer.sh --topic internal-user-topic-test --bootstrap-server 35.240.xxx.xx:9094 --producer.config=/tmp/config.properties

result:

ส่ง message ผ่าน producer

คราวนี้เทสว่าเราสามารถทดลองตัวเองเป็น consumer ได้ไหม

sh bin/kafka-console-consumer.sh --bootstrap-server 35.240.xxx.xx:9094 --topic internal-user-topic-test --from-beginning --consumer.config=/tmp/config.properties
ทดสอบเป็น consumer ก็ได้ message ครบถ้วน

ทดลองแบบ internal แบบ TLS บ้าง (9093)

คราวนี้เราลองสร้างอีก user ชื่อ restricted-user แบบให้เข้าถึงแค่ topic เดียวที่ชื่อว่า my-topic-na-ja

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: restricted-user
namespace: kafka-mils
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
- resource:
type: topic
name: my-topic-na-ja
patternType: literal
operation: All
- resource:
type: group
name: de-group
patternType: literal
operations:
- Read
- resource:
type: cluster
name: "*"
name: my-cluster
patternType: literal
operation: All

ทำคล้ายๆ step ด้านบน แล้วทดลองเป็น producer ดู

จะเห็นว่า restricted-user สามารถใช้ได้แค่ topic ที่กำหนดเท่านั้น หากไปใช้ topic อื่นจะขึ้น error ว่า “Topic authorization failed”

นั่นแสดงว่าเราสามารถจำกัดสิทธิต่างๆผ่านตัว acls ได้ ซึ่งนอกจากนี้ เราสามารถกำหนด operations ให้ย่อยลงได้ เช่นระบุเป็น Create, Read, Write เฉพาะ resource นั้นๆ

สรุป

ในบทความนี้เราพูดถึงการทำให้ Kafka secure มากด้วย security protocols ต่างๆโดยเราใช้ผ่าน TLS/SSL encryption แบบ step by step ในการ apply กับตัว Kafka cluster และ Kafka user

โดยเราได้ config Kafka user และทดลองตัวเองเป็น external client ในการเข้าถึง topic ใน cluster

Next action

เราได้ config เพียงแค่ Kafka cluster เท่านั้น หากจำเป็นต้องใช้ service อื่นๆภายใน network เช่น Kafka connect, Kakfa UI, Schema registry ก็จำเป็นต้องมีการแนบ key นี้ไปด้วย ไม่งั้นจะติดเรื่อง authorization

บทความนี้ก็น่าสนใจมาก ในการเซ็ต kafka connect และ connecto

มีอีกวิธีในการที่ไม่ต้องแนบ key ตลอด คือการเปิด ANONYMOUS เป็น superuser เพื่อให้ service ภายในสามารถคุยหากันเองได้ แต่จะไม่ปลอดภัยเท่า ทั้งนี้ต้องขึ้นอยู่กับความ secure ของ Kafka และ Kubernetes เราเองด้วย

code ทั้งหมดสามารถดูได้ที่

Ref:

  • Kafka: The Definitive Guide, 2nd Edition
  • https://www.cloudflare.com/learning/ssl/what-is-ssl/
  • https://strimzi.io/docs/operators/latest/configuring.html#assembly-securing-kafka-str
  • https://strimzi.io/docs/operators/latest/configuring.html#configuring-external-clients-to-trust-cluster-ca-str

--

--

Burasakorn Sabyeying
Mils’ Blog

Data engineer at CJ Express. Women Techmakers Ambassador. GDG Cloud Bangkok team. Moved to Mesodiar.com