Apache Airflow — ExternalTaskSensor ile DAG Bağımlılığı Oluşturmak

Onur Taşhan
Trendyol Tech
Published in
2 min readJul 18, 2019
Apache Airflow

Selamlar,

Daha önceki yazımda Apache Airflow’u Docker üzerinde çalıştırma ve basit bir task oluşturma yazısı yazmıştım. Daha önce Airflow kullanmadıysanız ve kurmak istiyorsanız; önceki yazıma buradan ulaşabilirsiniz.

Bu yazımda ise Airflow DAG’larınızı birbirine bağımlı hale getirmek istiyorsanız ne yapmanız gerektiğinden bahsedeceğim. :)

Kodlarınızı schedule etmek için Airflow tercih ettiyseniz birden fazla DAG yazmak durumunda kalmış olabilirsiniz. Bazı durumlarda ise bu DAG’ların birbirini beklemesi gerekmiş olabilir. Örneğin;

Verileriniz, saatte bir Hadoop’a gönderiyorsunuz ve bunu bir DAG olarak tanımladınız adını da TO_HADOOP olarak belirlediniz.

Ancak gelen bir talep üzerine; gelen bu verilerden sonra Hadoop üzerinde bazı işlemler yapmanız gerekiyor ve bu işlemleri TO_HADOOP DAG’ında yapmak istemiyorsunuz. Yeni bir DAG tanımladınız adını da CHECK_HADOOP olarak belirlediniz.

Python kodlarımızı da aşağıdaki şekilde olduğunu düşünelim;

Airflow DAG Bağımlılığı için ExternalTaskSensor adlı bir fonksiyona sahip. Bu fonksiyon sayesinde bir DAG’ımıza task olarak, x adlı DAG’ın y adlı task’ının tamamlanmasını bir task olarak belirleyebiliriz.

O zaman örnek durumuz için düşünürsek. TO_HADOOP adlı DAG’ımızın TRANSFER_DONE id’li final taskını CHECK_HADOOP DAG’ında ExternalTaskSensor fonksiyonunu kullanarak bir task olarak oluşturabiliriz. Bu oluşturduğumuz yeni task’ı da başlangıç task’ı olarak CHECK_HADOOP DAG’ına verirsek. İki DAG’ı birbirine bağımlı hale getirmiş oluruz.

Hadi bunu kod üzerinde gösterelim;

check_hadoop.py kodumuza w_t adında bir değişken oluşturduk ve bu değişken; ExternalTaskSensor fonksiyonu ile bir task haline geldi. Bu task parametre olarak task_id, external_dag_id, external_task_id, dag parametrelerini aldı.

  • task_id: w_t adlı değişkenin task adı.
  • external_dag_id: beklemek istediğiniz dag’ın adı.
  • external_dag_id: beklemek istediğiniz dag’ın içindeki beklemek istediğiniz task adı.
  • dag: bulunduğu python kodundaki dag’ın adı.

w_t >> t_c >> t_d

Olacak şekilde w_t adlı task’ı başlangıç task’ı olarak belirledik.

İşlemimiz bu kadar. :)

Artık iki DAG arasında bağımlılık yapısını kurmuş olduk. Her saat başı TO_HADOOP adlı DAG çalışıp tamamlandığında CHECK_HADOOP adlı DAG w_t adlı task’ı complete’e çekip process tasklarını başlatacak.

Buradaki dikkat edilmesi gereken nokta, schedule_interval değerlerinin mantıklı bir yapı içinde kurulması. Ve belirtilen talebe göre ayarlanması. Bizim örneğimizde saatte bir çalıştığı için to_hadoop işimiz, check_hadoop işimizide saatte bir çalışacak şekilde ayarladık.

Sorularınız olursa sormaktan çekinmeyiniz. İyi kodlamalar :)

--

--