Encrypt Streaming Data with Faust Streaming
สำหรับทุกธุรกิจแล้วการที่เราสามารถนำข้อมูลลูกค้ามาวิเคราะห์พฤติกรรมแล้วทำแคมเปญหรือออกโปรโมชั่นสักอย่างออกมาเพื่อชวนให้ลูกค้ามาซื้อของมากขึ้น เป็นเรื่องที่ขาดไม่ได้เลย แต่เราก็ไม่สามารถนำข้อมูลที่เต็มไปด้วยข้อมูลระบุตัวตนได้หรือ PII มาเก็บและนำไปใช้โดยไม่ป้องกันอะไรได้เลย ดังนั้นในบทความนี้เราจะมาพูดถึงการ Encrypt Streaming data ด้วย Faust Streaming กันค่ะ
**Table of Contents**
- Data Encryption Pipeline
- Streaming Processing
- Faust Streaming
- มาทดลองใช้ Faust Streaming กันเถอะ
- Encryption Process
- สรุป Envelope Encryption แบบเร็ว ๆ
- Encryption with Streaming Processing
- Key Rotation
- Conclusion
- Reference
Data Encryption Pipeline
สำหรับการนำข้อมูลเข้า Data Platform ของข้อมูลชัดนี้เราจะใช้เป็น Change Data Capture (CDC) ผ่าน Kafka เนื่องจากข้อมูลที่ได้จะเป็นข้อมูลที่ปัจจุบันที่สุดและข้อมูลครบถ้วนไม่มีปัญหาเรื่องของ timezone หรือ datetime ที่เป็นปัญหาคลาสสิคของการ query ข้อมูล รวมไปถึงยังทำให้ database ไม่จำเป็นต้องรับโหลดมากจากการ query อีกด้วย
ทีนี้ถ้าเป็น CDC แบบไม่มีการทำ streaming transforming ข้อมูลที่มี PII ก็จะถูกส่งผ่าน Source Connector ไปยัง Topic A แล้วจึง sink ไปลง Google Cloud Storage (GCS) ด้วย Sink Connector แต่ว่าเราไม่ต้องการเก็บข้อมูลที่มี PII ลง storage bucket เลยเพราะมีความเสี่ยงที่ใครก็ได้ที่มีสิทธิเข้าถึง project นี้จะเข้ามาเห็นและแก้ไขข้อมูลได้ ดังนั้นเราจึงจำเป็นต้อง encrypt ข้อมูลก่อนจะ sink ลง GCS เลย คือเมื่อข้อมูล record หนึ่งไหลผ่านเข้ามาที่ topic A จะเกิดการ encrypt เกิดขึ้นแล้วส่งออกไปยังอีก topic หนึ่งซึ่งก็คือ topic B จากนั้นจึง sink ข้อมูลที่ encrypt แล้วที่ topic B ลง GCS ทำให้ไม่มีข้อมูล PII เลยแม้แต่ที่ storage
Streaming Processing
Streaming Processing คือ การที่เราประมวลผลหรือแปลงข้อมูลทันทีที่มีข้อมูลไหลเข้ามาอย่างต่อเนื่อง แน่นอนว่า latency น้อยกว่าแบบ batch มากเพราะไม่ต้องรอให้ถึงช่วงเวลาที่ตั้งค่าไว้ถึงจะทำงาน ในปัจจุบันมีหลายเจ้ามากที่สามารถช่วย stream processing ได้ เช่น Kafka Streams, Apache Flink เป็นต้น แต่ขอยกตัวอย่างเทียบสามตัวหลักที่เราเห็นกันบ่อย ๆ ได้แก่ Kafka Streams, Apache Flink และ Faust Streaming สำหรับ spark streaming ขอไม่นำมาเทียบเนื่องจาก spark straming เป็น micro-batch ไม่ใช่ streaming processing (1)
เราเลือกใช้ Faust streaming เป็น library สำหรับ streaming processing ในครั้งนี้เพราะเหมือนกับ Kafka Stream เกือบทั้งหมดแต่เป็นฉบับ python ซึ่งเข้าใจง่ายกว่า นอกจากนี้ Faust Streaming ยังมี option ที่น่าสนใจอีกอย่างคือสามารถตั้ง cron job ในตัวเองได้ซึ่งเราจะสิ่งนี้แหละในการทำ key rotation กัน
Faust Streaming
อย่างที่ได้เล่าไปว่าเราเลือกใช้ Faust Streaming เป็น library ในการทำ streaming processing ตอนนี้เรามาทำความรู้จักกับ Faust ให้มากขึ้นกันดีกว่า
Faust Streaming หรือ Faust เป็น stream processing library โดยใช้แนวคิดเดียวกันกับ Kafka Streams แต่เป็น python แทน Java เริ่มต้นโดย Robinhood แต่ภายหลังทาง Robinhood ก็เลิก support project ไป ตอนนี้กลายเป็น community ที่ช่วยกัน contribute ให้กับ project แทน และในปัจจุบันก็มี feature ใหม่ ๆ และมีการแก้ปัญหาเดิมที่ถูกทิ้งไว้เนิ่นนานด้วย community นี้เอง
จุดสำคัญของ Faust เลยคือ ใช้งานง่าย รวดเร็ว และ ยืดหยุ่นมาก เพราะต้องการแค่ Kafka cluster และ python เท่านั้น ที่เหลือเราจะใช้ library อื่นอะไรก้ได้ที่มีใน python เช่น Numpy, TensorFlow
มาทดลองใช้ Faust Streaming กันเถอะ
สร้าง Docker Image ตาม Dockerfile นี้
ใน requirements.txt
faust-streaming==0.10.4
tink==1.6.1
protobuf==3.20.*
google-cloud-kms==2.15.0
crcmod==1.7
pendulum==2.1.2
จากนั้นเริ่มสร้าง Kafka cluster ง่าย ๆ ที่มีแค่ zookerper กับ broker บน local กัน run docker compose up -d
original docker-compose file from confluent
จากนั้นมาเริ่มเขียน faust app กัน โครงสร้างของ faust ง่ายมาก
# import faust library
import faust
# specific app name which need to be unique and specific kafka broker
app = faust.App("<APP_NAME>", broker="<KAFKA_BROKER>")
# specific which the app will consumed message from and specific value type
# see more serialization in : <https://faust.readthedocs.io/en/1.6/userguide/models.html#supported-codecs>
topic_in = app.topic("<TOPIC_IN>", value_type="<MESSAGE_VALUE_TYPE>", value_serializer="<MESSAGE_VALUE_SERIALIZER>")
# specific which topic the app will produce processed message to and specific value type
sink = app.topic("<TOPIC_OUT>", value_type="<MESSAGE_VALUE_TYPE>")
# the part where message is processed
@app.agent(topic_in)
async def process_demo(stream):
async for message in stream:
"<process function>"
await sink.send(value="<VALUE_TO_SEND>")
if __name__ == "__main__":
app.main()
จะเห็นว่าโครงสร้างสำหรับการเริ่มทำงานของ faust มีที่ต้องใส่ 5 ส่วน
1. import library
แน่นอนว่าที่ขาดไม่ได้เลยคือ faust นั่นเอง แต่ถ้าเราจำเป็นต้องทำงานร่วมกับ library อื่นหรือ script ทีเราเขียนแยกก็ import ในส่วนนี้ได้เลย เหมือนการเขียน python ทั่ว ๆ ไป
2. Specific app name
ใส่ชื่อ app และ Kafka broker url ในกรณีนี้เรา run app จากใน container หนึ่งและต้องการต่อกับ Kafka ที่อยู่ในอีก container หนึ่งเลยใส่เป็น "broker:9092"
สำหรับคำอธิบายเพิ่มเติมเกี่ยวกับการเชื่อมต่อ Kafka สามารถอ่านได้จากสามลิ้งนี้
- Connect to Kafka Running in Docker
- My Client Won’t Connect to My Apache Kafka Cluster in Docker/AWS/My Brother’s Laptop
- Kafka Listeners — Explained
3. Specific topic from which app will consume message
ใส่ชื่อ Kafka topic ที่เราต้องการจะให้ faust app อ่านข้อมูลมา ในที่นี้ขอเรียกว่า topic-in
และใส่ว่าข้อความที่ผ่าน topic มานั้นเป็นประเภทอะไร เช่น str, int ส่วนสุดท้ายที่ต้องใส่คือ message ของเรา serialize ด้วยรูปแบบอะไร เช่น raw หมายถึงไม่มีการ serialize, json หมายถึงรูปแบบเป็น json แล้วตัว faust จะแปลมาเป็น dictionary ให้อัตโนมัติ
4. Specific topic where app will produce or send message to (optional)
ใส่ชื่อ Kafka topic ที่เราต้องการให้ faust app ส่งข้อมูลที่ process แล้วไป ขอตั้งชื่อว่า topic-out
พร้อมระบุว่าข้อความที่จะส่งไปเป็นประเภทอะไร เช่น str, int
5. Specific process function
เขียนว่าเราต้องการให้ process ข้อมูลอย่างไรตามที่เราต้องการ ครั้งนี้เราจะใช้ส่วนนี้ในการ encrypt ข้อมูลนั่นเอง
เพียงเท่านี้เราก็ได้ faust streaming application แล้ว ตัวอย่างข้างล่างนี้คือลองส่งข้อมูลคะแนนของนักเรียนเป็น json ที่มีชื่อกับคะแนนไปแล้วให้แสดงผลว่านักเรียนชื่อนี้สอบได้คะแนนดีเยี่ยม สอบผ่าน หรือ สอบตก
import json
import os
import faust
app = faust.App("demo", broker=os.environ["KAKFA_BOOTSTRAP"])
topic_in = app.topic("topic-in", value_type=str, value_serializer=json)
# Consumer
@app.agent(topic_in)
async def greet(stream):
async for message in stream:
name = message["name"]
if message["score"] >= 80:
print(f"Well Done {name}!\\nYou scored more than 80 on the test.")
elif message["score"] >= 50:
print(f"Congratulations {name}!\\nYou pass the test.")
else:
print(f"To {name}:\\nSorry you failed the test.")
if __name__ == "__main__":
app.main()
จากนั้นถ้าต้องการ run faust app ให้ exec เข้าไปใน container
docker exec -it faust-app /bin/bash
จากนั้นรันคำสั่งนี้ใน container โดยที่เปลี่ยน “<FAUST_FILE_NAME>” เป็นชื่อไฟล์ที่เราเขียน faust app เช่น faust -A main worker -l info
faust -A "<FAUST_FILE_NAME>" worker -l info
จะเห็นว่ามีขึ้นชื่อ topic พร้อมบอก partition ที่รับข้อมูล และคำว่า ready หมายความว่าพร้อมทำงานแล้ว เราสามารถส่ง message เข้าไปได้เลย
เราจะมาทดลองส่งข้อมูลกัน โดยเราจะส่งผ่าน Kafka Producer
run คำสั่งนี้ใน terminal (นอก container)
docker exec --interactive --tty broker \\
kafka-console-producer --bootstrap-server broker:9092 \\
--topic topic-in # <- your TOPIC_IN name
จากนั้นก้ลองส่งข้อมูลเข้าไปใน topic-in เพื่อให้ app ของเราประมวลผลได้เลย
จะเห็นได้ว่าพอเราส่งข้อมูลเข้าไป ก็จะมีการส่งข้อมูลที่ประมวลผลออกมาเรียบร้อยแล้วให้เราแทบจะทันทีเลย
Tidbits
❓ ถ้าสังเกตจะเห็นว่าฟังก์ชันหลักของเรามีการรับ parameter หนึ่งที่ชื่อ stream คืออะไร?
💡 หมายถึง stream data หรือสายของข้อมูลที่ไหลเข้ามาอย่างต่อเนื่อง หรือก็คือ message ของเราไหลเข้ามาใน topic อย่างต่อเนื่อง แล้วมีการวนลูปให้ประมวลผลแต่ละ message ที่ไหลเข้ามานั่นเอง แต่จะใช้คำอื่นแทน stream กับ message ก็ได้เหมือนกัน ไม่มีข้อกำหนดตายตัวว่าควรจะใช้คำว่าอะไร
Encryption Process
หลังจากรู้จักกับ Faust ไปแล้วเรามาทำความรู้จักกับวิธีการที่เราจะใช้ในการ encrypt ข้อมูลกัน
การ encrypt ทุกคนน่าจะรู้จักกันดีอยู่แล้ว คือการเข้ารหัสข้อมูลไว้เพื่อป้องกันไม่ให้ข้อมูลถูกขโมยหรือถูกปลอมแปลง โดยการแปลงข้อมูลเป็นรหัสซึ่งสามารถถอดรหัสออกมาได้ผ่านกุญแจที่เข้ารหัสไว้เท่านั้น
แน่นอนว่า Google ก็มีบริการเรื่องของ Key ด้วยนั่นก็คือ Google Key Management Service (KMS) ที่ช่วยให้สามารถผู้ใช้สามารถสร้างและจัดการ Key ของตัวเองได้เลย อีกทั้งยังมีประเภทการ encrypt ที่หลากหลายและแน่นอนว่าสิ่งสำคัญอย่างหนึ่งของการ encrypt เลยคือต้องมี Key Rotation ซึ่ง KMS สามารถตั้งค่าได้เลยว่าอยากให้ Rotate ทุก ๆ กี่วัน
เมื่อถึงรอบวันที่กำหนด Key ใหม่จะถูกสร้างขึ้นมาโดยอัตโนมัติและจะถูกเซ็ตให้เป็น Primary พอจะ encrypt ข้อมูลใหม่ KMS จะใช้ Key ที่เป็น Primary ส่วน Key เก่าจะถูกใช้เพื่อ Decrypt ข้อมูลเก่าเท่านั้น
ดังนั้นถ้าเราทำลาย Key version เก่าจะทำให้ Decrypt ข้อมูลก่อนหน้าไม่ได้เลย ลองนึกดูว่าข้อมูลขนาดใหญ่ เช่น 10 GB หรือ 1 TB หรือมากกว่านั้น เราต้องมา Decrypt ข้อมูลทั้งหมด แล้ว Re-encrypt ข้อมูลด้วย Key ใหม่ในทุกครั้งที่ Rotate Key เป็นเรื่องสยองมากดังนั้นถ้าเราจะ Encrypt ข้อมูลด้วย KMS โดยตรง Key Rotation จะเป็นเรื่องยาก และ KMS ก้ไม่ได้ถูกออกแบบมาให้ใช้ Encrypt ข้อมูลขนาดใหญ่โดยตรง
ดังนั้นวิธีการที่เราจะใช้ก็คือ **Envelope Encryption**
นั่นเอง ถ้าใครอยากรู้แบบละเอียดสามารถตามไปอ่านที่บทความนี้ได้
สรุป Envelope Encryption แบบเร็ว ๆ
Envelope Encryption อธิบายแบบสั้น ๆ คือ
- สร้าง DEK แบบ Plaintext ขึ้นมา
- ใช้ Plaintext DEK ในการ encrypt ข้อมูลที่มี sensitive data
- Encrypt Plaintext DEK ด้วย KEK อีกทีได้เป็น Encrypted DEK
ด้วยวิธีการนี้ทำให้เมื่อ Rotate Key ไม่จำเป็นต้อง Re-encrypt ข้อมูลใหม่ทั้งหมด แค่ต้อง Re-encrypt DEK ด้วย Key ใหม่เท่านั้น
โดยเราจะใช้ Key จาก KMS เป็น KEK เพราะสามารถเก็บและจัดการ Key ได้ง่ายและปลอดภัย ส่วน DEK จะสร้างขึ้นและจัดการผ่าน cryptography library ซึ่งเราจะใช้ Tink นั่นเอง (สามารถอ่านแบบละเอียดและดูตัวอย่างการ Encrypt ได้ที่บทความ medium ก่อนหน้า)
Encryption with Streaming Processing
เรามารวม function การ encrypt เข้าไปใน faust กันเถอะ
เราเขียน function การสร้าง DEK ด้วย tink และใช้ DEK ในการ encrypt และ decrypt ข้อมูล (function การ encrypt/decrypt ทั้งหมดได้ดัดแปลงมาจาก code ในบทความนี้)
จากนั้นเราก็สร้าง main.py ของ faust application
จากนั้นเมื่อลองส่งข้อมูลเข้าไปใน topic-in และลองดูข้อมูลสุดท้ายที่ถูกส่งออกไปยัง topic-out จะเห็นว่าข้อมูลที่เป็นชื่อถูก encrypt เรียบร้อย
Tidbits
❕ จะสังเกตเห็นว่ามี @app.task() เพิ่มขึ้นมา คือหมายถึงให้ app ทำ function นี้เมื่อเริ่มหรือหลังขึ้น Worker Ready ครั้งเดียวเท่านั้น
Key Rotation
อย่างที่ได้กล่าวไปข้างต้นแล้วว่าเราต้องมี Key Rotation และวิธีการที่เราจะ Rotate Key คือตั้งค่าให้ KEK rotate ตามรอบวันที่เราต้องการโดยเราจะเซ็ตค่านี้ใน KMS ให้ทำอัตโนมัติ จากนั้นเราจะเขียน function ขึ้นมาโดยใช้ cron job ของ faust streaming เพื่อให้ faust app ทำ function ที่เราเขียนตามรอบวันที่กำหนด
❓ ก็ตั้งค่าให้ KEK ใน KMS rotate ก็จบแล้วนี่ ต้องทำอะไรเพิ่มอีกด้วยเหรอ
💡 คำตอบคืออย่าลืมว่าที่ถูก rotate ไปคือ KEK แต่ DEK ยังเป็นก้อนเดิม ใช้ KEK เวอร์ชั่นเก่า encrypt ไว้อยู่ ดังนั้นถ้าไม่ re-encrypt ก็เหมือนเราแค่สร้าง KEK เวอร์ชั่นใหม่ขึ้นมาโดยที่ไม่ได้ใช้งานอะไรเลย
ดังนั้นเราเขียน script สำหรับ Key Rotation กัน
แล้วก็เพิ่มส่วน @app.crontab(“* * * * *”) ลงไปใน main.py ทำให้ได้ main.py ทั้งหมดตามนี้
จะได้ผลตามนี้
Conclusion
ทั้งหมดนี้ก็คือการใช้ Faust Streaming รวมกับ KMS และ Tink Encryption เพื่อใช้ในการ Encrypt Sensitive Data สำหรับ Streaming Data ค่ะ ซึ่งการ encrypt ข้อมูลเป็นเพียงจุดเริ่มต้นของการป้องกัน sensitive data เท่านั้นยังมีอีกหลายเรื่องที่ต้องใช้เพื่อจะป้องกันให้แน่นหนามากขึ้นค่ะ เช่น การ authentication, authorization, การเปิด TLS ใน Kafka Cluster นอกจากนี้ในการ deploy ลงบน production environment ยังมีสิ่งที่ต้องเซ็ตเพิ่มอีกเช่น source connector หรือ sink connector สามารถไปตามอ่านได้ค่ะ
Reference
(1) https://azureops.org/articles/kafka-streaming-vs-spark-streaming/
- https://developer.confluent.io/quickstart/kafka-docker/
- https://cloud.google.com/learn/what-is-encryption
- https://faust.readthedocs.io/en/latest/userguide/tasks.html
- https://cloud.google.com/kms/docs/envelope-encryption
- https://life.wongnai.com/envelope-encryption-f93837e5309f
- https://cloud.google.com/kms/docs/encrypt-decrypt#kms-encrypt-symmetric-python
- https://github.com/faust-streaming/faust