Airflow sync dags from GCS on k8s

Pongthep Vijite
DAMAGeek
Published in
5 min readMay 2, 2022

หมายเหตุ ผู้อ่านสามารถดู table of contents ของ Data Engineering from Noob to Newbie ได้ที่ http://bit.ly/2P7isEw

refer: https://en.wikipedia.org/wiki/Apache_Airflow

Motivation

ปัจจุบัน(01/05/2022) การใช้งาน data composer ของ GCP ที่โดย default แล้ว GCP จะทำการสร้าง airflow cluster บน GKE ให้เราโดยประกอบไปด้วย 3 nodes และตั้งค่า airflow’s executor เป็น CeleryExecutor นั้นหมายความว่า worker จะถูกกำหนดแบบตายตัวตั้งแต่ cluster ถูกสร้างและเรื่องการจัดการ dags file นั้น data composer มีรูปแบบการจัดเก็บ dags file ไว้ที่ GCS หากเราอยากจะ update dags file ก็ทำการ upload เข้า GCS ได้เลย ตัว data composer จะทำการ sync dags ต่างๆให้เราเอง ซึ่งรูปแบบการใช้งานดังกล่าวก็สะดวกมากสำหรับการใช้งานแบบเริ่มต้น

แต่ถ้าหากใครมี k8s cluster อยู่แล้วและต้องการใช้ executor เป็นแบบ KubernetesExecutor เพื่อเพิ่มความยืดหยุ่นในการ scale out รวมถึงการเลือก node ที่เป็น Preemptible สำหรับ airflow’s worker node เพื่อลด cost เราก็คงเหลือทางเลือกเป็นติดตั้ง airflow บน k8s ที่เรามีอยู่แล้วเพื่อสามารถกำหนด airflow’s executor และ node selector เองได้ (แต่ในอนาคตหาก data composer มีให้บริการแบบ serverless ก็อาจจะเพิ่มความน่าสนใจมากขึ้น) ซึ่งการติดตั้ง airflow บน k8s ก็ทำได้อย่างง่ายดายผ่าน helm install แต่ในส่วนการสร้าง dags นั้น airflow มี options การ config ให้ด้วยกัน 2 แบบนั้นคือ git sync และสร้าง volume บน k8s ซึ่งตรงนี้เราก็จะไม่สามารถวาง dag file ได้เหมือน data composer แต่หากทีมไหนมีการ impl ตัว airflow ด้วยใช้ git อยู่แล้วการเลือก git sync ดูเป็นทางเลือกที่สะดวกดี แค่กำหนด branch จากนั้นเวลามีใครในทีม commit airflow’s dag เข้า git repo ใน branch นั้น ตัว git sync ก็จะทำการ sync code นั้นมาให้เองเลย

แต่ถ้าหาก data engineer ต้อง provide airflow เพื่อให้ทีมต่างๆในองค์กรสามารถมาร่วมใช้งานแบบ share resource อย่าง Pinterests แล้วละก็ การทำ git sync อาจจะไม่สะดวกนักเพราะเท่ากับว่าทุกทีมต้องแชร์ git repo ร่วมกัน ทางเลือกก็อาจจะทำการ setup airflow ให้สามารถ sync dag file จาก lake แบบที่ Pinterests ทำ ซึ่งจากที่เราได้กล่าวมาแล้วนั้นไม่สามารถทำได้โดย default ของ helm chart จาก airflow จึงเป็นที่มาของบทความนี้ที่เราจะมาขยายความสามารถของ airflow เพื่อให้สามารถทำงานตอบโจทย์เราได้

แม้ว่าในบทความนี้เราจะกล่าวถึง airflow ดึง dag file จาก lake ที่เป็น GCS ของ GCP แต่ด้วยรูปแบบการ impl นั้นคาดว่าสามารถปรับใช้กับ lake ที่เป็น S3 ของ AWS ได้เช่นกัน โดยเปลี่ยนจาก gcpfuse เป็น s3fs-fuse ซึ่งเป็น lib ที่เราจะใช้ mount file กับ lake

Prepare

เริ่มต้นเราอาจจะต้องทำการ build airflow’s docker image ที่จะใช้งานขึ้นมาจากเองโดยการต่อยอดจาก official ของ airflow ซึ่งสาเหตุเพราะก่อนหน้านี้ผมได้ทำการลองสร้าง gcs sync ให้ทำงานเป็นแบบ sidecar เหมือนกับ git sync ที่ airflow มีมาให้ แต่เหมือนกับว่า container ของ airflow’s scheduler จะไม่สามารถมองเห็น file ที่ gcsfuse mount ไว้แม้ว่าจะการ mount path จะตรงกันก็ตาม จึงทำให้วิธีต่อมาในการทดลองคือการ pack gcsfuse เข้าไปกับตัว airflow’s docker image และเรียกใช้งานจาก scheduler container แทน ซึ่งเริ่มแรกเราต้องทำการสร้าง script เพื่อใช้ติดตั้ง gcsfuse กันก่อน

#!/bin/bashexport GCSFUSE_REPO=gcsfuse-`lsb_release -c -s`
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
apt-get update
apt-get install gcsfuse -y

จากนั้นก็ทำการ build docker image ของ airflow ที่มี gcsfuse อยู่ด้านใน

FROM apache/airflow:2.2.3-python3.8USER root
RUN apt install ca-certificates
COPY requirements.txt requirements.txt
RUN /usr/local/bin/python3 -m pip install -r requirements.txt
ADD gcsfuse_setup.sh gcsfuse_setup.sh
RUN bash gcsfuse_setup.sh

ถึงตรงนี้ docker image เราก็พร้อมใช้งานแล้ว ในขั้นตอนการ prepare ต่อมาจะเป็นการ pull airflow helm chart เพราะเราจะต้องแก้ไข chart ของ official โดยใช้คำสั่ง

helm pull apache-airflow/airflow

หลังจาก pull แล้วเราจะได้ file ที่ชื่อว่า airflow-1.5.0.tgz โดยเลข 1.5.0 เป็น version ของ chart ณ วันที่ผมทำการทดสอบ ขั้นตอนต่อมาเราก็จะใช้คำสั่งแตก file ใน tgz

tar zxvf  airflow-1.5.0.tgz

เท่านี้เราก็พร้อมจะทำการติดตั้งกันแล้ว ซึ่งรูปแบบการ setup พื้นฐานของ airflow บน k8s ใน GCP ผมจะขอข้ามไปและจาก focus เฉพาะการแก้ไข helm chart เท่านั้น โดยผู้อ่านที่สนใจรายละเอียดการติดตั้งสามารถดูขั้นตอนได้จากน้อง Mils เลยครับ https://mesodiar.com/setup-airflow-on-kubernetes-with-remote-logging-to-gcs-bf7869642b4c

Config

การ config gcsSync ของ values.yaml

จากรูปข้างต้นจะเห็นว่าส่วนการ config dags ของ values.yaml ใน airflow นั้นมีตัวเลือก 2 แบบอย่างที่เรากล่าวไปแล้ว ในส่วนที่เราจะเพิ่มขึ้นมาเป็น option ที่ 3 นั้นก็คือ gcsSync โดยมี 2 ตัวแปรที่เราจะต้อง set นั้นก็คือ enabled ที่ใช้กำหนดว่า option ไหนในการจัดเก็บ dag ที่เราจะเลือกใช้ในการติดตั้งนี้ และ bucket ที่เราจะใช้เพื่อกำหนด bucket บน GCS ที่เราจะทำการ sync

values.schema.json

โดยในจุดแรกที่เราต้องเพิ่มคือ values.schema.json ซึ่ง airflow ใช้เป็นเสมือน config verify โดยจุดที่เราจะเพิ่มอยู่ในจุดของ properties ของ dag config ดังรูปด้านล่าง

gcsSync config verify

scheduler-deployment.yaml

ขึ้นตอนต่อไปเราจะ config ให้ airflow’s scheduler เรา sync dag file จาก GCS ได้โดยเราจะแก้ไข file scheduler-deployment.yaml (airflow/template/scheduler/scheduler-deployment.yaml)

config lifecycle and privileged

สิ่งที่เราทำการกำหนดไปตามรูปข้างบนนี้คือหากมีการ set ว่าจะใช้ gcs sync เราจะเพิ่ม action หลังจากที่ scheduler container ทำการ start ไปแล้วโดยเราจะเรียกคำสั่ง mount folder กับ bucket ซึ่งชื่อของ bucket เราจะดึงมาจาก config ที่ได้แสดงไปก่อนหน้า อีกส่วนนึง securityContext ซึ่งโดย default แล้ว airflow’s scheduler ไม่ได้ config ในส่วนนี้เราเลยสามารถเอา if condition มาครอบไว้ได้ โดยที่เราจะ set ให้ scheduler container ทำงานแบบ privileged mode

ต่อมาในการเข้าถึง GCS เราจำเป็นต้องมีการใช้งาน service account’s json เพื่อใช้สร้าง env var ที่ชื่อ GOOGLE_APPLICATION_CREDENTIALS ดังนั้น scheduler container เราก็ต้องทำการ mount volume เพิ่มเติมเพื่อนำ k8s secret มาสร้างเป็น json file ดังกล่าว

config volumeMounts

ส่วนขั้นตอนการสร้าง env var นั้นอยู่ใน file values.yaml ที่เราจะใช้ทำการ install อยู่แล้ว

การสร้าง env var ใน values.yaml

มีการสร้าง volumeMount แล้วก็ต้องมีการ config ส่วนของ volume กัน ซึ่งจากรูปด้านล่างเราจะเรียกใช้ k8s secret ที่ชื่อ google-sa-key ในส่วนของ dag volume เนื่องจากเราไม่ได้จำเป็นต้องสร้าง volume แบบ emptyDir อย่างของ gitSync จึงไม่ต้องแก้ไข config ในส่วนนั้น

config volume

มาถึงตรงนี้หากเราทำการติดตั้งโดยใช้คำสั่งด้านล่างซึ่ง airflow_charts คือ folder เก็บ chart ที่เราทำการได้แก้ไขไป ตัว airflow cluster โดยเฉพาะ scheduler เราก็จะสามารถ sync dag files จาก GCS ได้แล้ว

helm int airflow-lake -n airflow -f values.yaml airflow_charts

แต่หลังจากที่วาง file ใน GCS แล้วตัว scheduler ใช้เวลาประมาณ 3–5 นาทีกว่าจะแสดง dag บนหน้า UI เราเลยจะเร่งเวลาในส่วนนี้ โดยการกำหนดให้ interval เหลือ 30 วิ โดย config ในส่วนนี้เราสามารถ config ได้ใน file values.yaml

กำหนด interval ในการที่ scheduler จะ scan dag

pod-template-file.kubernetes-helm-yaml

แม้ว่า airflow cluster เราจะ sync dag file จาก GCS ได้และแสดงผลหน้า UI เรียบร้อยแล้ว แต่ด้วยความที่เรากำหนด executor เป็นแบบ KubernetesExecutor ทำให้ cluster เราไม่มี worker ไว้ให้ airflow ประมวลผล task ใน dag แต่ทุก task ใน dag จะถูกสร้างออกมาเป็น pod แยกใน k8s ทุกครั้งที่ run แทน ซึ่งหากเราทำการ run dag ณ ตอนนี้จะมี error แสดงว่า airflow หา dag file ไม่พบ เนื่องจาก pod ที่สร้างนั้นไม่ได้มีการ sync dag file จาก GCS สิ่งที่เราจะทำคือแก้ไข file ที่ชื่อว่า pod-template-file.kubernetes-helm-yaml (files/pod-template-file.kubernetes-helm-yaml) ซึ่ง file นี้ airflow จะใช้เป็น template ที่สร้าง task pod ของทุก task ภายใน dags

init container step

จากรูปข้างต้นในจังหวะของขั้นตอนการ init container ของ task pod หากเป็นการใช้ gcsSync option เราจะทำการ mount bucket เข้ากับ gcs folder หลังจากนั้นจะทำการ copy files ทุกหมดไปไว้ใน dags folder เพราะว่าหลังจากที่ init step จบลงการ mount ดังกล่าวและ file ทั้งหมดก็จะหายไป มาถึงขั้นตอนนี้ airflow cluster บน k8s ของเราก็สามารถ sync dags จาก GCS และทำงานได้อย่างที่ต้องการแล้ว

สุดท้ายนี้ถ้าท่านใดมีข้อสงสัยหรือคำชี้แนะใดๆสามารถฝากข้อความได้ที่ https://www.facebook.com/coeffest/ นะครับ ขอบคุณมากครับที่ติดตาม

สำหรับคนที่สนใจ Data Engineer สามารถเข้ามาแชร์ข้อมูลกันที่ได้ที่ https://www.facebook.com/groups/dataengineerth

Refer:

--

--