Apache Airflow Is Fun For Data Engineer!

Salim Masagus
Tunaiku Tech
Published in
4 min readJan 3, 2020

Apache Airflow implementation in Tunaiku

Pada tulisan pertama saya di medium ini, saya akan bercerita tentang pengalaman saya menggunakan Airflow sebagai Data Engineer di Tunaiku. Sekilas tentang Airflow, Data Engineer Tunaiku menggunakan Airflow terkhusus dikarenakan kebutuhan schedulling task, dan monitoring workflow yang telah dirancang untuk kebutuhan data di Tunaiku, dengan menggunakan Airflow semua proses automasi dan monitoring akan terhandle!!! . untuk lebih jelasnya anda dapat melihat dokumentasinya disini , dan untuk memahami etential tentang Data Engineer, anda dapat membaca blog creator Airflow maxime beauchemin disini

.

To a modern Data Engineer, traditional ETL tools are largely obsolete because logic cannot be expressed using code. As a result, the abstractions needed cannot be expressed intuitively in those tools. Now knowing that the Data Engineer’s role consist largely of defining ETL, and knowing that a completely new set of tools and methodology is needed, one can argue that this forces the discipline to rebuild itself from the ground up. New stack, new tools, a new set of constraints, and in many cases, a new generation of individuals. — Maxime Beauchemin

.

Pada case kali ini saya menggunakan case membuat automate report yang datanya berasal dari BigQuery. Sekilas tentang BigQuery, BigQuery merupakan sebuah analytic data-warehouse yang termasuk ke dalam produk Google Cloud Platform (GCP), untuk lebih lengkapnya anda dapat melihat dokumentasinya disini

Konfigurasi Airflow

berikut konfigurasi saya ketika membuat Airflow :

args = {
'owner': 'muhammad.nursalim',
'depends_on_past': False,
'start_date': datetime.strptime('2019-1-1','%Y-%m-%d'),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'conccurency': 1,
'max_active_runs': 1
}

Get Data From BigQuery

Pada proses pengambilan data dari beberapa table sesuai dengan kebutuhan report, berikut contoh script pengambilan data di BigQuery :

from google.cloud import bigquery
import json,ndjson
with open("dummy_path/dummy_file.json", "r") as read_file:
CONFIG = json.loads(read_file)['dummy_config']
client = bigquery.Client(project=CONFIG['project'])query = '''
select a,b,c,d from '{project}.{dataset}.{table}' where date(_PARTITIONTIME) == '2019-01-01'
'''.format(project=CONFIG['project'],dataset=CONFIG['dataset'],table=CONFIG['table'])
df = client.query(query).to_dataframe()datas = df.to_json(orient='records')
json_data = json.loads(datas)
with open('dummy_path/dummy_{date_nodash}.json'.format(date_nodash='20190101'), 'w') as f:
ndjso.dump(json_data,f)

pada script diatas, saya menuliskan query yang mana menuliskan semua kolom yang ingin saya ambil, dibanding menggunakan ‘*’, kenapa saya memanggil semua kolom ? , anda dapat melihat disini untuk penjelasannya. pada kondisi where date(_PARTITIONTIME) ini merupakan sebuah partition pada table di BigQuery agar menghemat cost dan mempercepat proses query.

setelah penulisan query, hasilnya akan saya simpan kedalam dataFrame, setelah itu akan saya loads ke ndjson (NewDelimitedJson) untuk menyesuaikan spesifikasi yang diperlukan BigQuery dalam Load Data to BigQuery using File.

Load Data to BigQuery

Berikut contoh script mengirimkan data ke BigQuery :

bigquery_client = bigquery.Client('dummy_project')
destination_dataset_ref = bigquery_client.dataset('dummy_dataset')
destination_table_ref = destination_dataset_ref.table('table'+'$'+''20190101)
job_config = bigquery.LoadJobConfig()
job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATEjob_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON#using partition by ingestion Timejob_config.time_partitioning = bigquery.TimePartitioning(field=partition_field)with open(path,'rb') as f:
job = bigquery_client.load_table_from_file(f,destination_table_ref,
job_config=job_config)
job.result()

Pada proses ini, saya load data ke sebuah table yang sudah di buat partition menggunakan partition_field, yang mana partition nya menggunakan sebuah kolom, dan pada destinasion_dataset_ref, saya mendefine table diikuti oleh lambang ‘$’ dan tanggal tanpa ‘-’, itu merupakan cara define table menggunakan partition.

.

Dikarenakan tim Data Engineer Tunaiku telah sepakat menggunakan script dari pada operator Airflow untuk memproses data, maka dari itu, saya menggunakan bash operator Airflow untuk menjalankan script saya, bash operator sama seperti bash pada terminal. berikut contohnya :

dummy_execute = "python dummy_path/main.py {{ ds }}"
execute_script = BashOperator(
task_id='dummy_execute_bq',
bash_command=dummy_execute,
dag=dag)

Terdapat ds, itu merupakan macros variable untuk mendapat kan tanggal dengan format YYYY-MM-DD , untuk lebih lengkapnya dapat melihat disini

Result

Dokumentasi Pribadi

Hasilnya, ketika seorang Data Engineer melihat dags nya memperlihatkan warna hijau hijau, harusnya sudah dapat tersenyum karena proses yang dibuatnya telah berhasil! , dan akan automatis sesual dengan setting schedule nya.

Not Important!

No man is so wise that he can afford to wholly ignore the advice of others.” — James Lendall Basford (1845–1915), Sparks from the Philosopher’s Stone, 1882

Dengan masuknya kita ke not important section! berakhir sudah sharing knowledge saya kali ini, saya harap ini adalah awal dari lanjutan tulisan saya di medium, dan ini jauh dari kata sempurna, akan tetapi ingatlah Mark Zuckerberg tidak membuat Facebook dalam percobaan pertama hehehe :D

Saya harap artikel yang saya tulis ini dapat membantu semua orang, terkhusus bidang Data Engineer. Jika Anda memiliki pertanyaan atau diskusi lebih lanjut, Anda dapat menuliskan komentar, atau menghubungi saya melalui email. Terima kasih!

--

--

Salim Masagus
Tunaiku Tech

A man with fast learner, enthusiastic, high achiever, collaborative, breaker the limit, and growth mindset.