Securing Kafka with TLS/SSL authentication
บทความนี้เราจะมาเล่าว่าเราสามารถทำให้ Kafka secure มากขึ้นได้อย่างไรบ้าง ทำไมต้องคิดเรื่อง security, มี security protocol แบบไหนบ้าง และปรับ Kafka cluster เราให้เป็นแบบ TLS จนไปถึงการทดสอบว่าสามารถ authen และ authorized ผ่านจริงๆ
Table of contents
- Authentication vs Authorization
- Why securing kafka?
- Securing in Kafka
- TLS ต่างจาก SSL ยังไง
- Establish secure access from external clients
- Kafka cluster
- Kafka user
- สรุปและ Next action
Authentication vs Authorization
Authentication บอกว่าเราเป็นใคร (identity)
Authorization บอกว่า เราถูกอนุญาติให้ทำอะไรได้บ้าง
Why security in Kafka?
สมมติเหตุการณ์ว่ามี producer, consumer และ broker
- producer ส่ง message มายัง topic A
- record นั้นจะส่งไปยัง broker broker ก็จะเขียน record นั้นลง local log file
- หลังจากนั้น consumer ที่ได้ consume จาก topic A ก็ดึงข้อมูลจาก topic A ไป
จุด security ที่น่าสนใจคือ
- เวลาที่ producer client เชื่อมต่อมายัง broker เราจะมั่นใจได้ยังไงว่า message มาจาก producer ตัวนั้นจริงๆ (Client authenticity)
- รวมถึงว่า producer ต้องมั่นใจได้ว่าส่งไปหา broker ตัวนั้นจริงๆ (Server authencity)
- เราจะมั่นใจได้ยังไงว่าภายใน kafka จะไม่มีคนมาดักฟังข้อมูล เนื้อข้อมูลที่ถูกส่งไปข้าม network จึงต้องมีการป้องกันด้วย message digest (Data Integrity)
- producer จะมี authorized ที่เขียนถึง topic A จริงๆ ในขณะเดียวกัน consumer มี authorized ในการ read ข้อมูลจาก topic A จริงๆหรือไม่ (Access control)
Kafka brokers อาจมี listener หลาย endpoints ทั้ง internal (เข้าถึงได้แค่ authorized network) และ external (เข้าถึงจาก public internet) ซึ่งทั้ง 2 ตัวก็จะมี security requirements ที่แตกต่างกันออกไป
ทุกคนจึงเห็นได้ว่า ทั้งหมดคือสิ่งที่ต้องคำนึงถึงเรื่อง security ใน Kafka platform ในส่วนถัดไปเราจะเล่าว่า เราจะสร้าง guarantee เหล่านี้ได้อย่างไร
Securing in Kafka
Kafka support 4 security protocols โดยใช้ 2 standard technologies นั่นคือ
- Transport Layer Security (TLS)
- Simple Authentication and Security Layer (SASL)
💡 TLS ต่างจาก SSL ยังไง
ทุกครั้งที่เราพูดถึง TLS เราจะมักนึกถึง Secure Sockets Layer (SSL) เพราะแต่เดิม SSL เป็นบรรพบุรุษของ TLS
นั่นเพราะ SSL ได้หยุดพัฒนาโดย Netscape ตั้งแต่ปี 1996 และต่อมาในปี 1999 ทีมของ IETF ได้เข้ามาพัฒนาต่อ และด้วยเหตุผลเรื่องการเปลี่ยนมือนี้เลยเป็นเหตุผลว่า SSL จึงเปลี่ยนชื่อเป็น TLS
ทั้ง 2 มีไว้เพื่อ support การ encryption ทั้งฝั่ง client และ server เพียงแต่ TLS ถือเป็น up-to-date encryption protocol ถึงแม้อย่างนั้นก็ตาม คนก็นิยมเรียกสลับไปสลับมาว่ามันเป็นสิ่งเดียวกันอยู่ดี (TLS/SSL encryption) เช่น SSL certificate สำหรับ website ตามเทคนิคต้องเรียก TLS certificate (แต่นั่นก็ไม่ใช่เรื่องผิดนะ)
กลับมาที่ Kafka กัน
Security protocols ที่เรากล่าวไว้ด้านบน เกิดจากการรวม transport layer (PLAINTEXT หรือ SSL) ควบกับ authentication layer (SSL หรือ SASL) เลยออกเป็น 4 ตัวดังนี้:
- PLAINTEXT
- SSL
- SASL_PLAINTEXT
- SASL_SSL
ซึ่งบทความนี้จะเน้นไปที่ TLS/SSL จึงขอยก PLAINTEXT และ SSL มาเล่า
- PLAINTEXT
คือชั้น transport ที่ไม่มี authentication และไม่มีการ encrypt ข้อมูลเลย ดังนั้นจะเหมาะกับเคสที่ Kafka ของเราอยู่ใน private network อยู่แล้ว - SSL
คือชั้น transport แบบ SSL ที่มี authentication แบบ SSL นั่นแปลว่ามีการทำ authentication และ encryption ระหว่าง client และ server จึงเหมาะกับเคส insecure network หรือต้องการความ secure มากกว่า PLAINTEXT
ซึ่งโดย default แล้ว Kafka จะใช้ PLAINTEXT ซึ่งหมายความว่า เวลาส่งข้อมูล ข้อมูลเป็นยังไงก็เป็นอย่างนั้น ไม่ได้ถูก encrypt ไว้ หากเราอยากไปท่า encrypt ก็ต้องปรับมาเป็น TLS/SSL เอง
Fact ที่น่าสนใจ
ในด้าน performance พอเราใช้ SSL ในการ encrypt และ decrypt แล้วจะทำให้เสีย overhead เรื่อง CPU usage มากขึ้น ซึ่งอาจสูงถึง 20–30% เลยทีเดียว (refer จากหนังสือ Kafka: The Definitive Guide)
Establish secure access from external clients
ต่อจากนี้เราจะลงมือทำให้ external clients สามารถต่อเข้า Kafka cluster ของเราจากนอก cloud เราได้ด้วยวิธี TLS/SSL
ซึ่งก่อนหน้านี้เราเปิดเฉพาะ PLAINTEXT เอาไว้ และไม่ได้เปิดให้ภายนอกเข้ามา
Prerequisites
- ในการลงมือ configure Kafka ต่อจากนี้จะเป็น Kafka ที่ติดตั้งผ่าน Strimzi ที่เราเคยติดตั้งไปแล้วในบทความก่อน
Setting up Kafka with Kafka connect from SQL server to Cloud storage (GCS) (อ่านจนถึงก่อน Kafka connect ก็เพียงพอ) - Load balancer ที่เซ็ตแยกไว้แล้ว
Now Hands on !
หากเราต้องการแก้ไขเรื่อง securing access เราต้อง configure เพิ่มในส่วน resource ดังนี้
Kafka
เช่น listeners และ authorizationKafkaUser
เพื่อบอกว่าใคร access Kafka brokers ได้บ้าง
โดยการสร้าง KafkaUser จะเป็นการ enable authentication, authorization และ quotas control
1. แก้ Kafka cluster
เราสร้างไฟล์ kafka-persistent-single-tls.yaml
จากไฟล์เดิมkafka-persistent-single.yaml
ด้วยการแก้ไขตามด้านล่าง
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: kafka-cluster
namespace: kafka-mils
spec:
kafka:
version: 3.2.3
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
### new changes below
authentication: # <---- add authentication for internal
type: tls
- name: external
port: 9094
type: loadbalancer
tls: true
authentication: # <---- add authentication for external
type: tls
authorization: #<------ Authorization for whole cluster (all listeners)
type: simple
superUsers:
- CN=my-internal-user
...
จาก config ที่เปลี่ยนแปลงด้านบน จะเห็นได้ว่า Authentication สามารถเซ็ตได้แยกตาม listener แต่ Authorizartion จะครอบทั้ง Kafka Cluster เลย
ความแตกต่างระหว่าง Internal listener กับ External listeners คือ
- Internal จะ connect clients กับ Kafka brokers ภายใน K8s cluster นี้
- External จะ connect ผ่านจากนอก K8s cluster ซึ่งมีหลาย connection mechanism มากๆ ซึ่งเราเลือกเป็นท่า loadbalancer
และเราได้เพิ่มให้ Kafka user ที่ชื่อ my-internal-user
เป็น super users สำหรับ Kafka cluster นี้ (CN ย่อมาจาก common name จาก client certificate)
คราวนี้ apply ด้วยคำสั่ง
kubectl apply -f kafka-persistent-single-tls.yaml
รันคำสั่ง get pod เพื่อเช็คดู status ของ kafka เรื่อยๆ
kubectl get pod -n kafka-mils
และเราจะรอจนทุก pod ขึ้นครบและ ready ครบตามด้านล่าง
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-67c47f7454-qsxwn 3/3 Running 0 33s
my-cluster-kafka-0 1/1 Running 0 84s
my-cluster-zookeeper-0 1/1 Running 0 3m9s
strimzi-cluster-operator-56d64c8584-vbpd5 1/1 Running 0 6m21s
สิ่งที่พิเศษเพิ่มเติมคือ หากเรา apply คำสั่งแล้ว Cluster Operator จะสร้าง listeners และสร้าง cluster และ client certificate authority (CA) certificates เพื่อ enable authentication
CA certificates หรือ certificate authority certificates จะถูกสร้างโดย Cluster Operator เพื่อยืนยันการเข้าถึงของแต่ละ components ใน Kafka และฝั่ง client เอง (cluster CA กับ client CA)
เราสามารถดูได้จาก secrets
kubectl get secrets -n kafka-mils
result:
NAME TYPE DATA AGE
my-cluster-clients-ca Opaque 1 12m
my-cluster-clients-ca-cert Opaque 3 12m
my-cluster-cluster-ca Opaque 1 12m
my-cluster-cluster-ca-cert Opaque 3 12m
my-cluster-cluster-operator-certs Opaque 4 12m
my-cluster-entity-topic-operator-certs Opaque 4 9m51s
my-cluster-entity-user-operator-certs Opaque 4 9m50s
my-cluster-kafka-brokers Opaque 4 10m
my-cluster-zookeeper-nodes Opaque 4 12m
จะเห็นได้ว่าจะมี secret ที่ถูกสร้างโดย Cluster Operator
<cluster_name>-cluster-ca
จะมี private key ของ cluster CA เพื่อที่ว่า Strimzi และ Kafka components ใช้ private key นี้สำหรับ sign server certificates<cluster_name>-cluster-ca-cert
จะเก็บ public key ของ cluster CA เพื่อ identify ตัว kafka broker ตอนเชื่อมต่อแบบ TLS connection<cluster_name>-clients-ca
จะมี private keyของ clients CA เพื่อ sign เวลามี user เกิดใหม่ ในการเชื่อม TLS ไปหา brokers<cluster_name>-clients-ca-cert
จะมี public key ของ client CA เพื่อ verify identity ของ client ในการ access broker
สามารถอ่านเพิ่มเติมเรื่อง cluster CA กับ client CA ได้ที่ official doc
…
ต่อมาเมื่อเราสร้าง external listeners ด้วย load balancer แล้ว จะมี my-cluster-kafka-external-bootstrap
ขึ้นมาใน services ด้วย เพื่อที่ client จะ connect ผ่าน external ip นี้
⚡ kubectl get services -n kafka-mils
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-0 LoadBalancer 10.109.xx.xxx 34.142.xx.xx 9094:32116/TCP 50m
my-cluster-kafka-bootstrap ClusterIP 10.109.x.xx <none> 9091/TCP,9092/TCP,9093/TCP 61m
my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 61m
my-cluster-kafka-external-bootstrap LoadBalancer 10.109.x.xxxx 35.240.xxx.xx 9094:31000/TCP 50m
my-cluster-zookeeper-client ClusterIP 10.109.xx.xxx <none> 2181/TCP 62m
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 62m
2. เพิ่ม Kafka user
เราสร้างไฟล์ใหม่ชื่อ my-internal-user.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-internal-user
namespace: kafka-mils
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
- resource:
type: topic
name: "*"
patternType: literal
operation: All
- resource:
type: group
name: "*"
patternType: literal
operation: All
- resource:
type: cluster
operation: All
kubectl apply -f my-internal-user.yaml
พอเรา apply เสร็จแล้วก็เช็คว่าใน resource KafkaUser
ก็จะมี my-internal-user ขึ้นมา
KeyStore and TrustStore
Step ถัดมา เราจะดึง TrustStore กับ KeyStore ออกมาจาก Kubernetes Secrets เพื่อที่ว่าเราจะนำไปเทสว่า user นี้สามารถเข้าถึง topic ใน cluster ได้หรือไม่
คำเตือน: อย่า copy เพลิน อย่าลืมเปลี่ยนชื่อ namespace
# get TrustStore from cluster
kubectl get secret my-cluster-cluster-ca-cert -n kafka-mils -o jsonpath='{.data.ca\\.p12}' | base64 -d > secret/my-cluster-ca.p12
# get TrustStore password
kubectl get secret my-cluster-cluster-ca-cert -n kafka-mils -o jsonpath='{.data.ca\\.password}' | base64 -d > secret/truststore_pass
# get KeyStore from Kafka user for authentication
kubectl get secret my-internal-user -n kafka-mils -o jsonpath='{.data.user\\.p12}' | base64 -d > secret/my-internal-user.p12
# get KeyStore password
kubectl get secret my-internal-user -n kafka-mils -o jsonpath='{.data.user\\.password}' | base64 -d > secret/keystore_pass
คำสั่งด้านบนจะดึงค่าจาก Kubernetes Secret มาใส่ในโฟลเดอร์ secret
ต่อไปเราจะสร้างไฟล์ config.properties
security.protocol=SSL
ssl.truststore.location=/tmp/my-cluster-ca.p12
ssl.truststore.password=<pass from truststore_pass file>
ssl.keystore.location=/tmp/my-internal-user.p12
ssl.keystore.password=<pass from keystore_pass file>
เรา copy file ขึ้นไปใน pod แล้วลองเทสสร้าง message ดู
โดยเราจะใช้ 3 files: my-cluster-ca.p12
, my-internal-user.p12
และ config.properties
$ kubectl cp secret/my-cluster-ca.p12 my-cluster-kafka-0:/tmp/my-cluster-ca.p12 -n kafka-mils
$ kubectl cp secret/my-internal-user.p12 my-cluster-kafka-0:/tmp/my-internal-user.p12 -n kafka-mils
$ kubectl cp secret/config.properties my-cluster-kafka-0:/tmp/config.properties -n kafka-mils
แล้วเรา shell เข้าไปยัง pod แล้วทดลองตัวเองเป็น producer ดู โดยทดสอบเข้าผ่าน external IP
$ sh bin/kafka-console-producer.sh --topic internal-user-topic-test --bootstrap-server 35.240.xxx.xx:9094 --producer.config=/tmp/config.properties
result:
คราวนี้เทสว่าเราสามารถทดลองตัวเองเป็น consumer ได้ไหม
sh bin/kafka-console-consumer.sh --bootstrap-server 35.240.xxx.xx:9094 --topic internal-user-topic-test --from-beginning --consumer.config=/tmp/config.properties
ทดลองแบบ internal แบบ TLS บ้าง (9093)
คราวนี้เราลองสร้างอีก user ชื่อ restricted-user
แบบให้เข้าถึงแค่ topic เดียวที่ชื่อว่า my-topic-na-ja
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: restricted-user
namespace: kafka-mils
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
- resource:
type: topic
name: my-topic-na-ja
patternType: literal
operation: All
- resource:
type: group
name: de-group
patternType: literal
operations:
- Read
- resource:
type: cluster
name: "*"
name: my-cluster
patternType: literal
operation: All
ทำคล้ายๆ step ด้านบน แล้วทดลองเป็น producer ดู
จะเห็นว่า restricted-user
สามารถใช้ได้แค่ topic ที่กำหนดเท่านั้น หากไปใช้ topic อื่นจะขึ้น error ว่า “Topic authorization failed”
นั่นแสดงว่าเราสามารถจำกัดสิทธิต่างๆผ่านตัว acls ได้ ซึ่งนอกจากนี้ เราสามารถกำหนด operations ให้ย่อยลงได้ เช่นระบุเป็น Create, Read, Write เฉพาะ resource นั้นๆ
สรุป
ในบทความนี้เราพูดถึงการทำให้ Kafka secure มากด้วย security protocols ต่างๆโดยเราใช้ผ่าน TLS/SSL encryption แบบ step by step ในการ apply กับตัว Kafka cluster และ Kafka user
โดยเราได้ config Kafka user และทดลองตัวเองเป็น external client ในการเข้าถึง topic ใน cluster
Next action
เราได้ config เพียงแค่ Kafka cluster เท่านั้น หากจำเป็นต้องใช้ service อื่นๆภายใน network เช่น Kafka connect, Kakfa UI, Schema registry ก็จำเป็นต้องมีการแนบ key นี้ไปด้วย ไม่งั้นจะติดเรื่อง authorization
บทความนี้ก็น่าสนใจมาก ในการเซ็ต kafka connect และ connecto
มีอีกวิธีในการที่ไม่ต้องแนบ key ตลอด คือการเปิด ANONYMOUS เป็น superuser เพื่อให้ service ภายในสามารถคุยหากันเองได้ แต่จะไม่ปลอดภัยเท่า ทั้งนี้ต้องขึ้นอยู่กับความ secure ของ Kafka และ Kubernetes เราเองด้วย
code ทั้งหมดสามารถดูได้ที่
Ref:
- Kafka: The Definitive Guide, 2nd Edition
- https://www.cloudflare.com/learning/ssl/what-is-ssl/
- https://strimzi.io/docs/operators/latest/configuring.html#assembly-securing-kafka-str
- https://strimzi.io/docs/operators/latest/configuring.html#configuring-external-clients-to-trust-cluster-ca-str