Apache Airflow — ExternalTaskSensor ile DAG Bağımlılığı Oluşturmak
Selamlar,
Daha önceki yazımda Apache Airflow’u Docker üzerinde çalıştırma ve basit bir task oluşturma yazısı yazmıştım. Daha önce Airflow kullanmadıysanız ve kurmak istiyorsanız; önceki yazıma buradan ulaşabilirsiniz.
Bu yazımda ise Airflow DAG’larınızı birbirine bağımlı hale getirmek istiyorsanız ne yapmanız gerektiğinden bahsedeceğim. :)
Kodlarınızı schedule etmek için Airflow tercih ettiyseniz birden fazla DAG yazmak durumunda kalmış olabilirsiniz. Bazı durumlarda ise bu DAG’ların birbirini beklemesi gerekmiş olabilir. Örneğin;
Verileriniz, saatte bir Hadoop’a gönderiyorsunuz ve bunu bir DAG olarak tanımladınız adını da TO_HADOOP olarak belirlediniz.
Ancak gelen bir talep üzerine; gelen bu verilerden sonra Hadoop üzerinde bazı işlemler yapmanız gerekiyor ve bu işlemleri TO_HADOOP DAG’ında yapmak istemiyorsunuz. Yeni bir DAG tanımladınız adını da CHECK_HADOOP olarak belirlediniz.
Python kodlarımızı da aşağıdaki şekilde olduğunu düşünelim;
Airflow DAG Bağımlılığı için ExternalTaskSensor adlı bir fonksiyona sahip. Bu fonksiyon sayesinde bir DAG’ımıza task olarak, x adlı DAG’ın y adlı task’ının tamamlanmasını bir task olarak belirleyebiliriz.
O zaman örnek durumuz için düşünürsek. TO_HADOOP adlı DAG’ımızın TRANSFER_DONE id’li final taskını CHECK_HADOOP DAG’ında ExternalTaskSensor fonksiyonunu kullanarak bir task olarak oluşturabiliriz. Bu oluşturduğumuz yeni task’ı da başlangıç task’ı olarak CHECK_HADOOP DAG’ına verirsek. İki DAG’ı birbirine bağımlı hale getirmiş oluruz.
Hadi bunu kod üzerinde gösterelim;
check_hadoop.py kodumuza w_t adında bir değişken oluşturduk ve bu değişken; ExternalTaskSensor fonksiyonu ile bir task haline geldi. Bu task parametre olarak task_id, external_dag_id, external_task_id, dag parametrelerini aldı.
- task_id: w_t adlı değişkenin task adı.
- external_dag_id: beklemek istediğiniz dag’ın adı.
- external_dag_id: beklemek istediğiniz dag’ın içindeki beklemek istediğiniz task adı.
- dag: bulunduğu python kodundaki dag’ın adı.
w_t >> t_c >> t_d
Olacak şekilde w_t adlı task’ı başlangıç task’ı olarak belirledik.
İşlemimiz bu kadar. :)
Artık iki DAG arasında bağımlılık yapısını kurmuş olduk. Her saat başı TO_HADOOP adlı DAG çalışıp tamamlandığında CHECK_HADOOP adlı DAG w_t adlı task’ı complete’e çekip process tasklarını başlatacak.
Buradaki dikkat edilmesi gereken nokta, schedule_interval değerlerinin mantıklı bir yapı içinde kurulması. Ve belirtilen talebe göre ayarlanması. Bizim örneğimizde saatte bir çalıştığı için to_hadoop işimiz, check_hadoop işimizide saatte bir çalışacak şekilde ayarladık.
Sorularınız olursa sormaktan çekinmeyiniz. İyi kodlamalar :)