Microsoft Azure Pipeline vs Apache Airflow

Özgür Akdeniz
Bentego Teknoloji
Published in
5 min read6 days ago

Microsoft Azure Tanıtımı ve Portal Simgeleri

Microsoft Azure, Microsoft’un geliştirdiği bulut bilişim hizmetleri sunan bir platformdur. Kullanıcılar, veri depolama, sanal makineler, yapay zeka, veritabanları ve daha birçok hizmeti kullanarak ölçeklenebilir ve güvenilir çözümler geliştirebilirler. Bu yazımda Apache Airflow’da geliştirdiğimiz airflow yapılarını Azure servisleri üzerinde nasıl kurgulayabiliriz konusuna değineceğim.

Azure Portal Simgeleri ve İşlevleri:

Önce Azure Portal giriş sayfasını birlikte inceleyelim. Azure Portala giriş yaptığımızda aşağıdaki görsel bizi karşılayacaktır. Bu ekranda bizi karşılayan simgeleri aşağıda kısaca özetlenmiş halini bulabilirsiniz.

1. Dashboard: Özelleştirilebilir bir ana sayfa, hızlı erişim sağlamak için çeşitli kaynaklar ve grafiklerle donatılmıştır.

Edit seçeneğine tıklayarak dashboardumuzun görünümünü kendi isteğimize göre düzenleyebiliriz.

2. All Resources: Tüm Azure kaynaklarına erişim sağlar.

3. Resource Groups: Kaynakları mantıksal gruplar halinde organize eder.

4. Virtual Machines: Sanal makineleri yönetir ve izler.

5. Storage Accounts: Depolama hesaplarına erişim sağlar.

6. Azure SQL Databases: SQL veritabanlarını yönetir.

7. Azure Functions: Sunucusuz işlevler oluşturur ve yönetir.

8. Monitor: Performans izleme ve analiz araçları sunar.

9. Marketplace: Üçüncü taraf uygulamalar ve hizmetler satın alınabilir.

10. Cost Management + Billing: Maliyet yönetimi ve faturalandırma bilgileri.

Azure Pipeline ve Apache Airflow Karşılaştırması

Azure Pipeline Microsoft Azure’un Azure Data Factory (ADF) servisi üzerinden programlanır, aşağıda kısaca bu sistem ile Apache Airflow karşılaştırmasını bulabilirsiniz

1. İş Akışı Tanımı:

- ADF: Data Factory, iş akışlarını tanımlamak için görsel bir arayüz sunar ve JSON tabanlı ARM şablonlarını kullanır.

- Airflow: Python tabanlı DAG (Directed Acyclic Graph) tanımları kullanılır.

2. Zamanlama ve Tetikleme:

- ADF: Zamanlanmış tetikleyiciler veya etkinlik bazlı tetikleyiciler kullanır.

- Airflow: Cron tabanlı zamanlayıcılar ve sensörler kullanır.

3. Hata Yönetimi ve Bildirimler:

- ADF: Hata yönetimi için yerleşik politikalar ve bildirim mekanizmaları sunar.

- Airflow: Özel hata yönetimi ve bildirimler Python operatörleri ile kolayca yapılandırılabilir.

4. Kullanım Kolaylığı:

- ADF: Kod yazmadan görsel olarak iş akışları oluşturulabilir.

- Airflow: Daha fazla kodlama bilgisi gerektirir, esneklik sağlar ancak öğrenme eğrisi daha yüksektir.

5. Entegrasyonlar:

- ADF: Azure ekosistemiyle derin entegrasyon, özellikle Azure SQL Database, Blob Storage, Cosmos DB gibi hizmetlerle.

- Airflow: Geniş yelpazede operatörler ve hook’lar ile birçok üçüncü parti hizmete entegrasyon sağlar.

Örnek Çalışma:

CSV Dosyasından Oracle Database’e Veri Aktarımı

Azure Data Factory Kullanarak:

1. Azure Data Factory Portalına Giriş ve Yeni Data Factory Oluşturma

1.1 Azure Portal’a Giriş:

1.2. Data Factory Oluşturma:

  • Sol menüde bulunan “Create a resource” butonuna tıklayın.
  • Arama çubuğuna “Data Factory” yazın ve arama sonuçlarından “Data Factory” seçeneğine tıklayın.
  • Açılan pencerede “Create” butonuna tıklayın.
  • Resource Group, Data Factory Name, Region gibi gerekli bilgileri girin ve “Review + Create” butonuna tıklayın.
  • Tüm bilgileri kontrol ettikten sonra “Create” butonuna tıklayarak Data Factory’yi oluşturun.

2. Data Factory İçerisinde Pipeline Oluşturma

2.1. Data Factory’yi Açma:

  • Azure Portal ana sayfasına geri dönün ve sol menüde “All Resources” sekmesinden oluşturduğunuz Data Factory’yi bulun ve açın.
  • Açılan sayfada “Author & Monitor” butonuna tıklayarak Data Factory çalışma alanına girin.

2.2. Pipeline Oluşturma:

  • Sol menüde “Author” (kalem simgesi) ikonuna tıklayın.
  • “+” simgesine tıklayarak “Pipeline” seçeneğini seçin.

3. Pipeline Parametresi Oluşturma

3.1. Pipeline Parametreleri:

  • Pipeline’ınızı açın ve üst menüdeki “Parameters” sekmesine tıklayın.
  • Yeni bir parametre oluşturun. Örneğin, parametrenin adı FileDate olsun ve değeri @formatDateTime(utcnow(), ‘yyyyMMdd’) olarak ayarlayın.

4. Get Metadata Aktivitesini Ekleme

4.1. Get Metadata Aktivitesini Ekleme:

  • Sol menüdeki “Activities” panelinden “Move & Transform” sekmesine tıklayın.
  • “Get Metadata” aktivitesini sürükleyerek çalışma alanına bırakın.
  • “Dataset” sekmesinde, Blob Storage’daki klasörü seçin. (Azure Blob Storage CSV dosyaları gibi veri dosyalarını depolamak için idealdir.)
  • “Field list” kısmında “Child Items” öğesini seçin.

4.2. Kaynak ve Hedef Ayarları:

  • Source (Kaynak) Ayarları:
  • Get Metadata aktivitesine tıkladığınızda sağ tarafta açılan panelde “Source” sekmesine geçin.
  • “New” butonuna tıklayarak yeni bir kaynak veri seti oluşturun.
  • “Azure Blob Storage” seçeneğini seçin ve “Continue” butonuna tıklayın.
  • Dosya formatı olarak “DelimitedText (CSV)” seçeneğini seçin ve “Continue” butonuna tıklayın.
  • Dosyanın adını ve yerini belirtin (örn: @concat(pipeline().parameters.FileDate, ‘.csv’)).

5. If Condition Aktivitesini Ekleme

5.1. If Condition Aktivitesini Ekleme:

  • Sol menüdeki “Activities” panelinden “Iteration & Condition” sekmesine tıklayın.
  • “If Condition” aktivitesini sürükleyerek çalışma alanına bırakın.
  • “Expression” kısmında şu ifadeyi yazın:
    @contains(activity(‘Get Metadata’).output.childItems, concat(pipeline().parameters.FileDate, ‘.csv’))

5.2. True ve False Kısımlarını Yapılandırma:

  • True durumunda “Copy Data” aktivitesini ekleyin.
  • False durumunda “Web” aktivitesi ile email gönderimini yapılandırın.

6. Copy Data Aktivitesini Ekleme

6.1. Copy Data Aktivitesini Ekleme:

  • Sol menüdeki “Activities” panelinden “Move & Transform” sekmesine tıklayın.
  • “Copy Data” aktivitesini sürükleyerek If Condition aktivitesinin True kısmına bırakın.

6.2. Kaynak ve Hedef Ayarları:

  • Source (Kaynak) Ayarları:
  • Copy Data aktivitesine tıkladığınızda sağ tarafta açılan panelde “Source” sekmesine geçin.
  • Daha önce oluşturduğunuz Blob Storage veri setini seçin.
  • Sink (Hedef) Ayarları:
  • “Sink” sekmesine geçin.
  • “New” butonuna tıklayarak yeni bir hedef veri seti oluşturun.
  • “Oracle” seçeneğini seçin ve “Continue” butonuna tıklayın.
  • Hedef tablonun adını ve bağlantı bilgilerini girin.

7. Web Aktivitesini Ekleme (Email Bildirimi)

7.1. Web Aktivitesini Ekleme:

  • Sol menüdeki “Activities” panelinden “General” sekmesine tıklayın.
  • “Web” aktivitesini sürükleyerek If Condition aktivitesinin False kısmına bırakın.

7.2. Web Aktivitesi Ayarları:

  • Web aktivitesine tıkladığınızda sağ tarafta açılan panelde URL ve diğer ayarları yapılandırın.
  • Email hizmeti kullanarak email göndermek için bir HTTP POST isteği yapılandırın. Örneğin, Logic Apps veya başka bir email hizmeti kullanabilirsiniz.

8. Pipeline’ı Kaydetme ve Çalıştırma

8.1. Pipeline’ı Kaydetme:

  • Sol üst köşedeki “Save” butonuna tıklayarak pipeline’ınızı kaydedin.

8.2. Pipeline’ı Çalıştırma:

  • Sağ üst köşedeki “Trigger Now” butonuna tıklayarak pipeline’ınızı manuel olarak çalıştırın.

Apache Airflow Kullanarak:

1. DAG Tanımı:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.providers.oracle.hooks.oracle import OracleHook
from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalToAzureDataLakeStorageOperator
from datetime import datetime
import pandas as pd
import os
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 7, 17),
'email_on_failure': False,
'email_on_retry': False,
}
def check_file_exists():
file_path = '/path/to/20240717.csv'
if not os.path.isfile(file_path):
raise FileNotFoundError(f"{file_path} not found")
def load_csv_to_oracle():
file_path = '/path/to/20240717.csv'
df = pd.read_csv(file_path)
oracle_hook = OracleHook(oracle_conn_id='oracle_default')
oracle_hook.bulk_insert_rows('oracle_table', df.values.tolist())
dag = DAG(
'csv_to_oracle',
default_args=default_args,
description='Load data from CSV to Oracle and send email if file is missing',
schedule_interval='@daily',
)
check_file = PythonOperator(
task_id='check_file_exists',
python_callable=check_file_exists,
dag=dag,
)
load_to_oracle = PythonOperator(
task_id='load_csv_to_oracle',
python_callable=load_csv_to_oracle,
dag=dag,
)
send_email = EmailOperator(
task_id='send_email',
to='recipient@example.com',
subject='File Missing',
html_content='The file 20240717.csv is missing.',
dag=dag,
)
check_file >> load_to_oracle
check_file >> send_email

2. Airflow pathine ilgili py dosyasının yüklenmesi ve airflow üzerinden çalıştırılması

Bu blog yazısında, Azure Data Factory ve Apache Airflow ile veri işleme ve aktarma iş akışlarının nasıl oluşturulacağını karşılaştırmalı olarak ele alarak, örnek iş akışı ile her iki platformda da benzer görevleri nasıl gerçekleştirebileceğiniz gösterildi.

Umarım faydalı bir yazı olmuştur. Herhangi bir sorunuz, öneriniz veya sorunuz varsa benimle iletişime geçebilirsiniz. Teşekkürler

--

--