ETL Batch Data Pipeline Menggunakan Apache Airflow, Docker, Google Cloud Storage dan BigQuery

Muhammad Saipul Rohman
Google Cloud Indonesia
9 min readAug 24, 2023

Di era Data is New Oil sekarang pada kebutuhan untuk mengolah data sehingga bisa menjadi insight sangat diperlukan di semua perusahaan untuk mengambil keputusan yang tepat agar bisa bersaing dengan para kompetitor. Untuk pengolahan data di dunia data engineering sendiri terdiri dari 3 jenis yaitu EL, ELT dan ETL. Yang sudah lama digunakan untuk pengolahan data yaitu ETL. Untuk menjalankan pengolahan data atau bisa disebut data pipeline itu sendiri terdiri dari 3 jenis yaitu Batch, Streaming/Real Time dan Hybrid.

Di artikel ini saya akan membahas bagaimana membuat ETL Batch Data Pipeline menggunakan Apache Airflow di docker, Google Cloud Storage dan BigQuery.

Untuk servis Google Cloud yang digunakan pada artikel ini yaitu :

  • Google BigQuery untuk penyimpanan data setelah di lakukan proses ETL
  • Google Cloud Storage untuk menyimpan file parquet dari local ke Google Cloud
  • IAM Service Accounts untuk memberikan akses agar apache airflow bisa mengakses BigQuery dan cloud storage
  • VPC Firewalls untuk membuka port 8080 web server apache airflow

Data Architecture

Untuk Data Architecture dari artikel ini yaitu :

Data Architecture Batch Data Pipeline Apache Airflow, GCS dan BigQuery

Prerequisite:

  • Sudah membuat IAM service account dengan role at least BigQuery Admin, BigQuery Job User dan Storage Admin dan download service account key nya ke local komputer
  • Sudah setup firewall di VPC dengan membuka port 8080 untuk web server airflow dan port 5432 untuk port postgresql database
  • Sudah membuat dataset di BigQuery untuk menyimpan data hasil ETL ke BigQuery
  • Sudah membuat bucket di Cloud Storage untuk menyimpan raw data dan data hasil transformasi
  • Sudah terinstall docker dan docker compose di local komputer

Setup Apache Airflow di Docker

Untuk Apache Airflow di artikel ini menggunakan docker compose default dari apache airflow dengan tambahan Dockerfile yang berisi instalasi Google Cloud SDK dan config yang lain. Untuk isi Dockerfile nya yaitu:

FROM apache/airflow:2.6.3

ENV AIRFLOW_HOME=/opt/airflow

USER root
RUN apt-get update -qq && apt-get install vim -qqq

SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"]

ARG CLOUD_SDK_VERSION=322.0.0
ENV GCLOUD_HOME=/home/google-cloud-sdk

ENV PATH="${GCLOUD_HOME}/bin/:${PATH}"

RUN DOWNLOAD_URL="https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz" \
&& TMP_DIR="$(mktemp -d)" \
&& curl -fL "${DOWNLOAD_URL}" --output "${TMP_DIR}/google-cloud-sdk.tar.gz" \
&& mkdir -p "${GCLOUD_HOME}" \
&& tar xzf "${TMP_DIR}/google-cloud-sdk.tar.gz" -C "${GCLOUD_HOME}" --strip-components=1 \
&& "${GCLOUD_HOME}/install.sh" \
--bash-completion=false \
--path-update=false \
--usage-reporting=false \
--quiet \
&& rm -rf "${TMP_DIR}" \
&& gcloud --version

WORKDIR $AIRFLOW_HOME

COPY scripts scripts
COPY dataset dataset

RUN chmod +x scripts

USER $AIRFLOW_UID

Untuk isi docker compose nya yaitu :

---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
build:
context: .
dockerfile: ./Dockerfile
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
GOOGLE_APPLICATION_CREDENTIALS: /.google/credentials/serious-music-345014-7adc51c1a770.json
AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: 'google-cloud-platform://?extra__google_cloud_platform__key_path=/.google/credentials/serious-music-345014-7adc51c1a770.json'

# TODO: Please change GCP_PROJECT_ID & GCP_GCS_BUCKET, as per your config
GCP_PROJECT_ID: 'serious-music-345014'
GCP_GCS_BUCKET: 'demo-de-class'

volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./credentials/serious-music-345014-7adc51c1a770.json:/.google/credentials/serious-music-345014-7adc51c1a770.json
- ./dataset/raw:/opt/airflow/dataset/raw
- ./dataset/transform:/opt/airflow/dataset/transform

user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy

services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always

redis:
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always

airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
user: "0:0"
volumes:
- .:/sources

airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow

flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

volumes:
postgres-db-volume:

Untuk isi docker compose ada yang perlu yaitu di komentar #TO DO di sesuaikan dengan GCP project ID dan GCP GCS Bucket masing — masing. Dan juga untuk folder credentials yang berisi service account key json file disesuaikan folder nya. Jangan lupa untuk membuat folder dataset/raw dan dataset/transform serta folder scripts dan di dalam folder scripts buat file entrypoint.sh di satu folder dengan Dockerfile dan docker compose.

Untuk isi entrypoint.sh yaitu :

#!/usr/bin/env bash
export GOOGLE_APPLICATION_CREDENTIALS=${GOOGLE_APPLICATION_CREDENTIALS}
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT=${AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT}

airflow db upgrade

airflow users create -r Admin -u admin -p admin -e admin@example.com -f admin -l airflow
# "$_AIRFLOW_WWW_USER_USERNAME" -p "$_AIRFLOW_WWW_USER_PASSWORD"

airflow webserver

Untuk struktur folder-nya yaitu :

Struktur folder Apache Airflow

Untuk file gambar data architecture, explore_data.ipynb bisa di abaikan saja karena tidak mempengaruhi kerja Apache Airflow. Untuk folder dags, logs dan plugins akan terbentuk sendiri setelah menjalankan file docker-compose.yml.

Data Pipeline di Apache Airflow

Untuk flow data pipeline di Apache Airflow pada artikel ini ada 2 yaitu flow data pipeline menggunakan task group dan flow data pipeline menggunakan task group yang sudah didetailkan

Flow data pipeline menggunakan task group
Flow data pipeline menggunakan task group yang sudah didetailkan

Source code dags airflow yang ada di artikel ini menggunakan feature di airflow yaitu TaskGroup. Apa itu TaskGroup? Secara simple TaskGroup adalah membuat group untuk mengelompokan beberapa task untuk mempermudah dalam pembuatan workflow di data pipeline. Untuk detailnya bisa refer ke artikel ini

Untuk Data Pipeline di Apache Airflow ada 3 bagian yaitu:

  • Raw_data : TaskGroup yang berisi 4 task yaitu:

— Download_dataset_task: task yang bertujuan untuk mendownload data berupa file parquet dari website NYC Taxi TLC di sini

— Upload_zone_map : Untuk data zone map sudah di download secara manual sehingga task ini bertujuan untuk upload data berupa csv file ini ke GCS bucket di folder raw

— Upload_yellow_taxi_trip : task ini bertujuan untuk upload data hasil dari task download_dataset_task ke GCS bucket di folder raw

— Load_zone_map_to_bigquery : task ini bertujuan untuk load data zone map ke BigQuery

source code dags TaskGroup raw_data

· Transform_data : TaskGroup yang berisi 2 task yaitu :

— Transform_raw_data : task ini bertujuan untuk memfilter data dimana trip_distance tidak boleh memiliki nilai 0

— Upload_transform_data_to_gcs : task ini bertujuan untuk upload data hasil transformasi ke GCS bucket di folder transforms

source code fungsi transform_data
source code dags TaskGroup transform_data

· Load_transform_data_to_bigquery

Task ini bertujuan untuk load hasil transform data ke BigQuery

source code task load_transform_data_to_bigquery

Untuk full source code nya bisa di lihat di https://github.com/saipulrx/ETL_Airflow_BigQuery/blob/main/airflow_bigquery/dags/etl_gcs_to_bq.py

Setelah selesai membuat python dags script jangan lupa di simpan di folder dags agar bisa tampil di web server apache airflow

Run Apache Airflow di docker

Untuk menjalankan apache airflow di docker dengan langkah — langkah sebagai berikut :

  • Build docker image terlebih dahulu(hanya pertama kali) menggunakan perintah
sudo docker compose build
  • Inisialisasi airflow scheduler, database dan config yang lain
docker compose up airflow-init
  • Menjalankan semua services yang ada di docker container
docker compose up
  • Cek status docker
docker ps
Status docker container
  • Buka browser favorit kalian lalu ketik localhost:8080 untuk membuka web server apache airflow. Jika di minta login maka ketik username airflow dan password airflow
Web Server Apache Airflow

Hasil ETL batch data Pipeline

Di GCS Bucket sudah otomatis dibuat 2 folder raw dan transform setelah menjalankan data pipeline di airflow

GCS Bucket

Di GCS bucket folder raw sudah otomatis ada file hasil dari task download dari web data source di airflow

GCS bucket folder raw

Di GCS bucket folder transform sudah ada file parquet hasil dari task transform data di airflow

GCS bucket folder transform

Di BigQuery sudah ada 2 table yaitu green_trip_data dan taxi_zone_lookup

BigQuery table green_trip_data
BigQuery table taxi_zone_lookup

What’s Next ?

  • Bisa di tambahkan dashboard untuk menampilkan data green_trip_data
  • Jika ingin lebih mudah di-maintan bisa menggunakan Google Cloud Managed Service Apache Airflow yaitu Google Cloud Composer
  • Bisa di tambahkan CI/CD agar sebelum di push ke git branch master, bisa di review terlebih dahulu oleh Data Engineer member yang lain

Referensi :

--

--

Muhammad Saipul Rohman
Google Cloud Indonesia

Data Engineering, Data Science and Cloud Computing Enthusiast.