Encrypt Streaming Data with Faust Streaming

Watcharee Skr
CJ Express Tech (TILDI)

--

สำหรับทุกธุรกิจแล้วการที่เราสามารถนำข้อมูลลูกค้ามาวิเคราะห์พฤติกรรมแล้วทำแคมเปญหรือออกโปรโมชั่นสักอย่างออกมาเพื่อชวนให้ลูกค้ามาซื้อของมากขึ้น เป็นเรื่องที่ขาดไม่ได้เลย แต่เราก็ไม่สามารถนำข้อมูลที่เต็มไปด้วยข้อมูลระบุตัวตนได้หรือ PII มาเก็บและนำไปใช้โดยไม่ป้องกันอะไรได้เลย ดังนั้นในบทความนี้เราจะมาพูดถึงการ Encrypt Streaming data ด้วย Faust Streaming กันค่ะ

**Table of Contents**

- Data Encryption Pipeline
- Streaming Processing
- Faust Streaming
- มาทดลองใช้ Faust Streaming กันเถอะ
- Encryption Process
- สรุป Envelope Encryption แบบเร็ว ๆ
- Encryption with Streaming Processing
- Key Rotation
- Conclusion
- Reference

Data Encryption Pipeline

สำหรับการนำข้อมูลเข้า Data Platform ของข้อมูลชัดนี้เราจะใช้เป็น Change Data Capture (CDC) ผ่าน Kafka เนื่องจากข้อมูลที่ได้จะเป็นข้อมูลที่ปัจจุบันที่สุดและข้อมูลครบถ้วนไม่มีปัญหาเรื่องของ timezone หรือ datetime ที่เป็นปัญหาคลาสสิคของการ query ข้อมูล รวมไปถึงยังทำให้ database ไม่จำเป็นต้องรับโหลดมากจากการ query อีกด้วย

ทีนี้ถ้าเป็น CDC แบบไม่มีการทำ streaming transforming ข้อมูลที่มี PII ก็จะถูกส่งผ่าน Source Connector ไปยัง Topic A แล้วจึง sink ไปลง Google Cloud Storage (GCS) ด้วย Sink Connector แต่ว่าเราไม่ต้องการเก็บข้อมูลที่มี PII ลง storage bucket เลยเพราะมีความเสี่ยงที่ใครก็ได้ที่มีสิทธิเข้าถึง project นี้จะเข้ามาเห็นและแก้ไขข้อมูลได้ ดังนั้นเราจึงจำเป็นต้อง encrypt ข้อมูลก่อนจะ sink ลง GCS เลย คือเมื่อข้อมูล record หนึ่งไหลผ่านเข้ามาที่ topic A จะเกิดการ encrypt เกิดขึ้นแล้วส่งออกไปยังอีก topic หนึ่งซึ่งก็คือ topic B จากนั้นจึง sink ข้อมูลที่ encrypt แล้วที่ topic B ลง GCS ทำให้ไม่มีข้อมูล PII เลยแม้แต่ที่ storage

Streaming Processing

Streaming Processing คือ การที่เราประมวลผลหรือแปลงข้อมูลทันทีที่มีข้อมูลไหลเข้ามาอย่างต่อเนื่อง แน่นอนว่า latency น้อยกว่าแบบ batch มากเพราะไม่ต้องรอให้ถึงช่วงเวลาที่ตั้งค่าไว้ถึงจะทำงาน ในปัจจุบันมีหลายเจ้ามากที่สามารถช่วย stream processing ได้ เช่น Kafka Streams, Apache Flink เป็นต้น แต่ขอยกตัวอย่างเทียบสามตัวหลักที่เราเห็นกันบ่อย ๆ ได้แก่ Kafka Streams, Apache Flink และ Faust Streaming สำหรับ spark streaming ขอไม่นำมาเทียบเนื่องจาก spark straming เป็น micro-batch ไม่ใช่ streaming processing (1)

table comparing streaming processing framework

เราเลือกใช้ Faust streaming เป็น library สำหรับ streaming processing ในครั้งนี้เพราะเหมือนกับ Kafka Stream เกือบทั้งหมดแต่เป็นฉบับ python ซึ่งเข้าใจง่ายกว่า นอกจากนี้ Faust Streaming ยังมี option ที่น่าสนใจอีกอย่างคือสามารถตั้ง cron job ในตัวเองได้ซึ่งเราจะสิ่งนี้แหละในการทำ key rotation กัน

Faust Streaming

อย่างที่ได้เล่าไปว่าเราเลือกใช้ Faust Streaming เป็น library ในการทำ streaming processing ตอนนี้เรามาทำความรู้จักกับ Faust ให้มากขึ้นกันดีกว่า

Faust Streaming หรือ Faust เป็น stream processing library โดยใช้แนวคิดเดียวกันกับ Kafka Streams แต่เป็น python แทน Java เริ่มต้นโดย Robinhood แต่ภายหลังทาง Robinhood ก็เลิก support project ไป ตอนนี้กลายเป็น community ที่ช่วยกัน contribute ให้กับ project แทน และในปัจจุบันก็มี feature ใหม่ ๆ และมีการแก้ปัญหาเดิมที่ถูกทิ้งไว้เนิ่นนานด้วย community นี้เอง

จุดสำคัญของ Faust เลยคือ ใช้งานง่าย รวดเร็ว และ ยืดหยุ่นมาก เพราะต้องการแค่ Kafka cluster และ python เท่านั้น ที่เหลือเราจะใช้ library อื่นอะไรก้ได้ที่มีใน python เช่น Numpy, TensorFlow

มาทดลองใช้ Faust Streaming กันเถอะ

สร้าง Docker Image ตาม Dockerfile นี้

ใน requirements.txt

faust-streaming==0.10.4
tink==1.6.1
protobuf==3.20.*
google-cloud-kms==2.15.0
crcmod==1.7
pendulum==2.1.2

จากนั้นเริ่มสร้าง Kafka cluster ง่าย ๆ ที่มีแค่ zookerper กับ broker บน local กัน run docker compose up -d

original docker-compose file from confluent

จากนั้นมาเริ่มเขียน faust app กัน โครงสร้างของ faust ง่ายมาก

# import faust library
import faust

# specific app name which need to be unique and specific kafka broker
app = faust.App("<APP_NAME>", broker="<KAFKA_BROKER>")

# specific which the app will consumed message from and specific value type
# see more serialization in : <https://faust.readthedocs.io/en/1.6/userguide/models.html#supported-codecs>
topic_in = app.topic("<TOPIC_IN>", value_type="<MESSAGE_VALUE_TYPE>", value_serializer="<MESSAGE_VALUE_SERIALIZER>")

# specific which topic the app will produce processed message to and specific value type
sink = app.topic("<TOPIC_OUT>", value_type="<MESSAGE_VALUE_TYPE>")

# the part where message is processed
@app.agent(topic_in)
async def process_demo(stream):
async for message in stream:

"<process function>"

await sink.send(value="<VALUE_TO_SEND>")


if __name__ == "__main__":
app.main()

จะเห็นว่าโครงสร้างสำหรับการเริ่มทำงานของ faust มีที่ต้องใส่ 5 ส่วน

1. import library

แน่นอนว่าที่ขาดไม่ได้เลยคือ faust นั่นเอง แต่ถ้าเราจำเป็นต้องทำงานร่วมกับ library อื่นหรือ script ทีเราเขียนแยกก็ import ในส่วนนี้ได้เลย เหมือนการเขียน python ทั่ว ๆ ไป

2. Specific app name

ใส่ชื่อ app และ Kafka broker url ในกรณีนี้เรา run app จากใน container หนึ่งและต้องการต่อกับ Kafka ที่อยู่ในอีก container หนึ่งเลยใส่เป็น "broker:9092" สำหรับคำอธิบายเพิ่มเติมเกี่ยวกับการเชื่อมต่อ Kafka สามารถอ่านได้จากสามลิ้งนี้

3. Specific topic from which app will consume message

ใส่ชื่อ Kafka topic ที่เราต้องการจะให้ faust app อ่านข้อมูลมา ในที่นี้ขอเรียกว่า topic-in และใส่ว่าข้อความที่ผ่าน topic มานั้นเป็นประเภทอะไร เช่น str, int ส่วนสุดท้ายที่ต้องใส่คือ message ของเรา serialize ด้วยรูปแบบอะไร เช่น raw หมายถึงไม่มีการ serialize, json หมายถึงรูปแบบเป็น json แล้วตัว faust จะแปลมาเป็น dictionary ให้อัตโนมัติ

4. Specific topic where app will produce or send message to (optional)

ใส่ชื่อ Kafka topic ที่เราต้องการให้ faust app ส่งข้อมูลที่ process แล้วไป ขอตั้งชื่อว่า topic-out พร้อมระบุว่าข้อความที่จะส่งไปเป็นประเภทอะไร เช่น str, int

5. Specific process function

เขียนว่าเราต้องการให้ process ข้อมูลอย่างไรตามที่เราต้องการ ครั้งนี้เราจะใช้ส่วนนี้ในการ encrypt ข้อมูลนั่นเอง

เพียงเท่านี้เราก็ได้ faust streaming application แล้ว ตัวอย่างข้างล่างนี้คือลองส่งข้อมูลคะแนนของนักเรียนเป็น json ที่มีชื่อกับคะแนนไปแล้วให้แสดงผลว่านักเรียนชื่อนี้สอบได้คะแนนดีเยี่ยม สอบผ่าน หรือ สอบตก

import json
import os
import faust

app = faust.App("demo", broker=os.environ["KAKFA_BOOTSTRAP"])
topic_in = app.topic("topic-in", value_type=str, value_serializer=json)

# Consumer
@app.agent(topic_in)
async def greet(stream):
async for message in stream:
name = message["name"]
if message["score"] >= 80:
print(f"Well Done {name}!\\nYou scored more than 80 on the test.")
elif message["score"] >= 50:
print(f"Congratulations {name}!\\nYou pass the test.")
else:
print(f"To {name}:\\nSorry you failed the test.")

if __name__ == "__main__":
app.main()

จากนั้นถ้าต้องการ run faust app ให้ exec เข้าไปใน container

docker exec -it faust-app /bin/bash

จากนั้นรันคำสั่งนี้ใน container โดยที่เปลี่ยน “<FAUST_FILE_NAME>” เป็นชื่อไฟล์ที่เราเขียน faust app เช่น faust -A main worker -l info

faust -A "<FAUST_FILE_NAME>" worker -l info

จะเห็นว่ามีขึ้นชื่อ topic พร้อมบอก partition ที่รับข้อมูล และคำว่า ready หมายความว่าพร้อมทำงานแล้ว เราสามารถส่ง message เข้าไปได้เลย

เราจะมาทดลองส่งข้อมูลกัน โดยเราจะส่งผ่าน Kafka Producer

run คำสั่งนี้ใน terminal (นอก container)

docker exec --interactive --tty broker \\
kafka-console-producer --bootstrap-server broker:9092 \\
--topic topic-in # <- your TOPIC_IN name

จากนั้นก้ลองส่งข้อมูลเข้าไปใน topic-in เพื่อให้ app ของเราประมวลผลได้เลย

จะเห็นได้ว่าพอเราส่งข้อมูลเข้าไป ก็จะมีการส่งข้อมูลที่ประมวลผลออกมาเรียบร้อยแล้วให้เราแทบจะทันทีเลย

Tidbits

ถ้าสังเกตจะเห็นว่าฟังก์ชันหลักของเรามีการรับ parameter หนึ่งที่ชื่อ stream คืออะไร?

💡 หมายถึง stream data หรือสายของข้อมูลที่ไหลเข้ามาอย่างต่อเนื่อง หรือก็คือ message ของเราไหลเข้ามาใน topic อย่างต่อเนื่อง แล้วมีการวนลูปให้ประมวลผลแต่ละ message ที่ไหลเข้ามานั่นเอง แต่จะใช้คำอื่นแทน stream กับ message ก็ได้เหมือนกัน ไม่มีข้อกำหนดตายตัวว่าควรจะใช้คำว่าอะไร

Encryption Process

หลังจากรู้จักกับ Faust ไปแล้วเรามาทำความรู้จักกับวิธีการที่เราจะใช้ในการ encrypt ข้อมูลกัน

การ encrypt ทุกคนน่าจะรู้จักกันดีอยู่แล้ว คือการเข้ารหัสข้อมูลไว้เพื่อป้องกันไม่ให้ข้อมูลถูกขโมยหรือถูกปลอมแปลง โดยการแปลงข้อมูลเป็นรหัสซึ่งสามารถถอดรหัสออกมาได้ผ่านกุญแจที่เข้ารหัสไว้เท่านั้น

แน่นอนว่า Google ก็มีบริการเรื่องของ Key ด้วยนั่นก็คือ Google Key Management Service (KMS) ที่ช่วยให้สามารถผู้ใช้สามารถสร้างและจัดการ Key ของตัวเองได้เลย อีกทั้งยังมีประเภทการ encrypt ที่หลากหลายและแน่นอนว่าสิ่งสำคัญอย่างหนึ่งของการ encrypt เลยคือต้องมี Key Rotation ซึ่ง KMS สามารถตั้งค่าได้เลยว่าอยากให้ Rotate ทุก ๆ กี่วัน

KMS key edit rotation period UI

เมื่อถึงรอบวันที่กำหนด Key ใหม่จะถูกสร้างขึ้นมาโดยอัตโนมัติและจะถูกเซ็ตให้เป็น Primary พอจะ encrypt ข้อมูลใหม่ KMS จะใช้ Key ที่เป็น Primary ส่วน Key เก่าจะถูกใช้เพื่อ Decrypt ข้อมูลเก่าเท่านั้น

Example of KMS key

ดังนั้นถ้าเราทำลาย Key version เก่าจะทำให้ Decrypt ข้อมูลก่อนหน้าไม่ได้เลย ลองนึกดูว่าข้อมูลขนาดใหญ่ เช่น 10 GB หรือ 1 TB หรือมากกว่านั้น เราต้องมา Decrypt ข้อมูลทั้งหมด แล้ว Re-encrypt ข้อมูลด้วย Key ใหม่ในทุกครั้งที่ Rotate Key เป็นเรื่องสยองมากดังนั้นถ้าเราจะ Encrypt ข้อมูลด้วย KMS โดยตรง Key Rotation จะเป็นเรื่องยาก และ KMS ก้ไม่ได้ถูกออกแบบมาให้ใช้ Encrypt ข้อมูลขนาดใหญ่โดยตรง

ดังนั้นวิธีการที่เราจะใช้ก็คือ **Envelope Encryption** นั่นเอง ถ้าใครอยากรู้แบบละเอียดสามารถตามไปอ่านที่บทความนี้ได้

สรุป Envelope Encryption แบบเร็ว ๆ

Envelope Encryption อธิบายแบบสั้น ๆ คือ

  1. สร้าง DEK แบบ Plaintext ขึ้นมา
  2. ใช้ Plaintext DEK ในการ encrypt ข้อมูลที่มี sensitive data
  3. Encrypt Plaintext DEK ด้วย KEK อีกทีได้เป็น Encrypted DEK
Envelope Encryption | Google Cloud

ด้วยวิธีการนี้ทำให้เมื่อ Rotate Key ไม่จำเป็นต้อง Re-encrypt ข้อมูลใหม่ทั้งหมด แค่ต้อง Re-encrypt DEK ด้วย Key ใหม่เท่านั้น

โดยเราจะใช้ Key จาก KMS เป็น KEK เพราะสามารถเก็บและจัดการ Key ได้ง่ายและปลอดภัย ส่วน DEK จะสร้างขึ้นและจัดการผ่าน cryptography library ซึ่งเราจะใช้ Tink นั่นเอง (สามารถอ่านแบบละเอียดและดูตัวอย่างการ Encrypt ได้ที่บทความ medium ก่อนหน้า)

Encryption with Streaming Processing

เรามารวม function การ encrypt เข้าไปใน faust กันเถอะ

เราเขียน function การสร้าง DEK ด้วย tink และใช้ DEK ในการ encrypt และ decrypt ข้อมูล (function การ encrypt/decrypt ทั้งหมดได้ดัดแปลงมาจาก code ในบทความนี้)

จากนั้นเราก็สร้าง main.py ของ faust application

จากนั้นเมื่อลองส่งข้อมูลเข้าไปใน topic-in และลองดูข้อมูลสุดท้ายที่ถูกส่งออกไปยัง topic-out จะเห็นว่าข้อมูลที่เป็นชื่อถูก encrypt เรียบร้อย

Tidbits

❕ จะสังเกตเห็นว่ามี @app.task() เพิ่มขึ้นมา คือหมายถึงให้ app ทำ function นี้เมื่อเริ่มหรือหลังขึ้น Worker Ready ครั้งเดียวเท่านั้น

Key Rotation

อย่างที่ได้กล่าวไปข้างต้นแล้วว่าเราต้องมี Key Rotation และวิธีการที่เราจะ Rotate Key คือตั้งค่าให้ KEK rotate ตามรอบวันที่เราต้องการโดยเราจะเซ็ตค่านี้ใน KMS ให้ทำอัตโนมัติ จากนั้นเราจะเขียน function ขึ้นมาโดยใช้ cron job ของ faust streaming เพื่อให้ faust app ทำ function ที่เราเขียนตามรอบวันที่กำหนด

❓ ก็ตั้งค่าให้ KEK ใน KMS rotate ก็จบแล้วนี่ ต้องทำอะไรเพิ่มอีกด้วยเหรอ

💡 คำตอบคืออย่าลืมว่าที่ถูก rotate ไปคือ KEK แต่ DEK ยังเป็นก้อนเดิม ใช้ KEK เวอร์ชั่นเก่า encrypt ไว้อยู่ ดังนั้นถ้าไม่ re-encrypt ก็เหมือนเราแค่สร้าง KEK เวอร์ชั่นใหม่ขึ้นมาโดยที่ไม่ได้ใช้งานอะไรเลย

ดังนั้นเราเขียน script สำหรับ Key Rotation กัน

แล้วก็เพิ่มส่วน @app.crontab(“* * * * *”) ลงไปใน main.py ทำให้ได้ main.py ทั้งหมดตามนี้

จะได้ผลตามนี้

Conclusion

ทั้งหมดนี้ก็คือการใช้ Faust Streaming รวมกับ KMS และ Tink Encryption เพื่อใช้ในการ Encrypt Sensitive Data สำหรับ Streaming Data ค่ะ ซึ่งการ encrypt ข้อมูลเป็นเพียงจุดเริ่มต้นของการป้องกัน sensitive data เท่านั้นยังมีอีกหลายเรื่องที่ต้องใช้เพื่อจะป้องกันให้แน่นหนามากขึ้นค่ะ เช่น การ authentication, authorization, การเปิด TLS ใน Kafka Cluster นอกจากนี้ในการ deploy ลงบน production environment ยังมีสิ่งที่ต้องเซ็ตเพิ่มอีกเช่น source connector หรือ sink connector สามารถไปตามอ่านได้ค่ะ

Reference

(1) https://azureops.org/articles/kafka-streaming-vs-spark-streaming/

  • https://developer.confluent.io/quickstart/kafka-docker/
  • https://cloud.google.com/learn/what-is-encryption
  • https://faust.readthedocs.io/en/latest/userguide/tasks.html
  • https://cloud.google.com/kms/docs/envelope-encryption
  • https://life.wongnai.com/envelope-encryption-f93837e5309f
  • https://cloud.google.com/kms/docs/encrypt-decrypt#kms-encrypt-symmetric-python
  • https://github.com/faust-streaming/faust

--

--