ETL Batch pipeline with Cloud Storage, Cloud Run and BigQuery orchestrated by Airflow/Composer

Mazlum Tosun
Google Cloud - Community
Jul 11, 2023

1. Explanation of the use case presented in this article

This article shows a complete use case with an ETL batch pipeline on Google Cloud:

  • Extract -> Cloud Storage
  • Transform -> Cloud Run
  • Load -> BigQuery

Everything is orchestrated by Apache Airflow and Cloud Composer.

The steps of the orchestration part are:

  • The orchestrator runs a Cloud Run Service
  • The service reads a newline-delimited JSON file from a hot folder in Cloud Storage (a folder is simulated with an object name prefix in Cloud Storage)
  • The service applies the needed transformations and business rules
  • The service stores the result in a BigQuery table
  • The orchestrator backs up the processed file by moving it to a cold folder

Here is the diagram of this use case:

I also created a video on this topic on my GCP YouTube channel; please subscribe to the channel to support my work for the Google Cloud community:

English version

French version

An example of the raw data in JSON format:

{
  "teamName": "PSG",
  "teamScore": 30,
  "scorers": [
    {
      "scorerFirstName": "Kylian",
      "scorerLastName": "Mbappe",
      "goals": 15,
      "goalAssists": 6,
      "games": 13
    },
    {
      "scorerFirstName": "Da Silva",
      "scorerLastName": "Neymar",
      "goals": 11,
      "goalAssists": 7,
      "games": 12
    },
    {
      "scorerFirstName": "Angel",
      "scorerLastName": "Di Maria",
      "goals": 7,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Lionel",
      "scorerLastName": "Messi",
      "goals": 12,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Marco",
      "scorerLastName": "Verrati",
      "goals": 3,
      "goalAssists": 10,
      "games": 13
    }
  ]
}

The corresponding computed domain data:

{
  "teamName": "PSG",
  "teamScore": 30,
  "teamTotalGoals": 48,
  "teamSlogan": "Paris est magique",
  "topScorerStats": {
    "firstName": "Kylian",
    "lastName": "Mbappe",
    "goals": 15,
    "games": 13
  },
  "bestPasserStats": {
    "firstName": "Marco",
    "lastName": "Verrati",
    "goalAssists": 10,
    "games": 13
  }
}

The goal is to compute the following (a minimal sketch is shown after the list):

  • The total goals per team
  • The top scorer node
  • The best passer node
  • The slogan per team
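
To make these business rules concrete, here is a hedged Python sketch of the transformation. The field names follow the JSON examples above, the team_slogans mapping comes from the DAG configuration shown later, and compute_team_stats is an illustrative name rather than the exact function used in the repository:

from typing import Any, Dict, List


def compute_team_stats(raw_team: Dict[str, Any], team_slogans: Dict[str, str]) -> Dict[str, Any]:
    """Computes the domain data of a team from its raw stats."""
    scorers: List[Dict[str, Any]] = raw_team["scorers"]

    # The top scorer is the scorer with the most goals,
    # the best passer is the scorer with the most goal assists.
    top_scorer = max(scorers, key=lambda s: s["goals"])
    best_passer = max(scorers, key=lambda s: s["goalAssists"])

    return {
        "teamName": raw_team["teamName"],
        "teamScore": raw_team["teamScore"],
        # Total goals per team: sum of the goals of all its scorers.
        "teamTotalGoals": sum(s["goals"] for s in scorers),
        # The slogan is set from the team_slogans mapping sent to the service.
        "teamSlogan": team_slogans.get(raw_team["teamName"], ""),
        "topScorerStats": {
            "firstName": top_scorer["scorerFirstName"],
            "lastName": top_scorer["scorerLastName"],
            "goals": top_scorer["goals"],
            "games": top_scorer["games"],
        },
        "bestPasserStats": {
            "firstName": best_passer["scorerFirstName"],
            "lastName": best_passer["scorerLastName"],
            "goalAssists": best_passer["goalAssists"],
            "games": best_passer["games"],
        },
    }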

2. Structure of the project

2.1 Environment variables

Set the following environment variables:

#!/bin/bash

# Common
export PROJECT_ID={{your_gcp_project_id}}
export LOCATION={{your_region}}

# Cloud Run
export SERVICE_NAME=teams-stats-service
export DOCKER_FILE_PATH=team_stats_domain_service/Dockerfile
export REPO_NAME=internal-images
export IMAGE_TAG="latest"
export OUTPUT_DATASET="mazlum_test"
export OUTPUT_TABLE="team_stat"
export INPUT_BUCKET="mazlum_dev"
export INPUT_OBJECT="hot/etl/cloud_run/input_teams_stats_raw.json"

# Composer
export DAG_FOLDER=team_league_etl_cloud_run_dag
export COMPOSER_ENVIRONMENT=dev-composer-env
export CONFIG_FOLDER_NAME=config
export ENV=dev

2.2 Python local environment

The local Python environment uses Pipenv as a package manager and to automate the creation of the virtual environment.

You can check this video from my GCP YouTube channel, which shows:

  • How to set up a comfortable local Python environment with pyenv, Pipenv, direnv and IntelliJ IDEA, and how to navigate across all the files, classes and methods
  • How to automate the creation of the virtual environment for our Python project

2.3 The Cloud Run Service part

The code logic of the Cloud Run Service is the same as the one used in this article; for a deep explanation of the code, don't hesitate to check it:

All the elements concerning the service are placed in the team_stats_domain_service folder.

FastAPI and Uvicorn serve our service, and the Dockerfile contains:

FROM python:3.10-slim

# Send Python output straight to the logs without buffering
ENV PYTHONUNBUFFERED True

# Install the service dependencies
COPY team_stats_domain_service/requirements.txt ./
RUN pip install -r requirements.txt

# Copy the service code into the image
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY team_stats_domain_service $APP_HOME/team_stats_domain_service

# Start the FastAPI app with Uvicorn on port 8080, the port used by Cloud Run
CMD ["uvicorn", "team_stats_domain_service.main:app", "--host", "0.0.0.0", "--port", "8080"]
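
To make the service part more concrete, here is a minimal, hedged sketch of what the team_stats_domain_service/main.py entry point could look like. It assumes the deployment passes INPUT_BUCKET, INPUT_OBJECT, OUTPUT_DATASET and OUTPUT_TABLE as environment variables to the service, exposes the /teams/statistics endpoint with the team_slogans request body defined in the DAG configuration below, and imports the compute_team_stats sketch from section 1 through a hypothetical team_stats module; the real implementation in the repository is more complete:

import json
import os
from typing import Dict

from fastapi import FastAPI
from google.cloud import bigquery, storage
from pydantic import BaseModel

# Hypothetical location of the transformation sketched in section 1.
from team_stats_domain_service.team_stats import compute_team_stats

app = FastAPI()


class TeamSlogansBody(BaseModel):
    team_slogans: Dict[str, str]


@app.post("/teams/statistics")
def compute_teams_statistics(body: TeamSlogansBody):
    # Extract: read the NDJSON file from the hot folder in Cloud Storage.
    blob = storage.Client() \
        .bucket(os.environ["INPUT_BUCKET"]) \
        .blob(os.environ["INPUT_OBJECT"])
    raw_teams = [json.loads(line) for line in blob.download_as_text().splitlines() if line.strip()]

    # Transform: apply the business rules on each raw team stats record.
    team_stats = [compute_team_stats(team, body.team_slogans) for team in raw_teams]

    # Load: append the computed domain data to the BigQuery output table.
    bq_client = bigquery.Client()
    table_id = f"{bq_client.project}.{os.environ['OUTPUT_DATASET']}.{os.environ['OUTPUT_TABLE']}"
    errors = bq_client.insert_rows_json(table_id, team_stats)
    if errors:
        raise RuntimeError(f"BigQuery insert errors: {errors}")

    return {"inserted_rows": len(team_stats)}

Locally, the service can be started exactly as in the Dockerfile CMD, with uvicorn team_stats_domain_service.main:app --port 8080.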

2.4 The DAG part

The DAG configuration is placed in the config folder, in a JSON file called variables.json:

{
  "team_league_etl_cloud_run_dag": {
    "dag_folder": "team_league_etl_cloud_run_dag",
    "team_stats_source_bucket": "mazlum_dev",
    "team_stats_source_object": "hot/etl/cloud_run/input_teams_stats_raw.json",
    "team_stats_dest_bucket": "mazlum_dev",
    "team_stats_dest_object": "cold/etl/cloud_run/",
    "team_stats_service_url": "https://teams-stats-service-xxxxxxxxx.a.run.app",
    "team_stats_service_post_endpoint": "/teams/statistics",
    "team_stats_service_slogans_request_body": {
      "team_slogans": {
        "PSG": "Paris est magique",
        "Real": "Hala Madrid"
      }
    }
  }
}

The configuration contains:

  • The source bucket and object to read the input file (hot folder)
  • The output bucket and object to move the processed file (cold folder)
  • The Cloud Run Service URL
  • The Cloud Run Service Endpoint
  • The Cloud Run Service Post Request Body

The CI/CD part will deploy this JSON file as Airflow variables.

You can check this article for a detailed explanation of the Airflow configuration:

The team_league_etl_cloud_run_dag folder contains the DAG file and all the surrounding logic.

The config and JSON variables are loaded in the settings.py file:

import os
from dataclasses import dataclass
from datetime import timedelta

from airflow.models import Variable
from airflow.utils.dates import days_ago

_variables = Variable.get("team_league_etl_cloud_run_dag", deserialize_json=True)
_current_dag_folder = _variables["dag_folder"]


@dataclass
class Settings:
    dag_default_args = {
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
        "start_date": days_ago(1)
    }
    project_id = os.getenv("GCP_PROJECT")

    team_stats_service_url = _variables["team_stats_service_url"]
    team_stats_service_post_endpoint = _variables["team_stats_service_post_endpoint"]
    team_stats_service_slogans_request_body = _variables["team_stats_service_slogans_request_body"]

    team_stats_source_bucket = _variables["team_stats_source_bucket"]
    team_stats_source_object = _variables["team_stats_source_object"]
    team_stats_dest_bucket = _variables["team_stats_dest_bucket"]
    team_stats_dest_object = _variables["team_stats_dest_object"]

    variables = _variables

The DAG file:

import json

import airflow
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

from team_league_etl_cloud_run_dag.settings import Settings

settings = Settings()

with airflow.DAG(
        "team_league_etl_cloud_run_dag",
        default_args=settings.dag_default_args,
        schedule_interval=None) as dag:
    print_token = BashOperator(
        task_id="print_token",
        bash_command=f'gcloud auth print-identity-token "--audiences={settings.team_stats_service_url}"'
    )

    token = "{{ task_instance.xcom_pull(task_ids='print_token') }}"

    run_cloud_run_service = SimpleHttpOperator(
        task_id="run_cloud_run_service",
        method="POST",
        http_conn_id="team_stats_service",
        endpoint=settings.team_stats_service_post_endpoint,
        data=json.dumps(settings.team_stats_service_slogans_request_body),
        headers={"Authorization": "Bearer " + token},
    )

    move_file_to_cold = GCSToGCSOperator(
        task_id="move_file_to_cold",
        source_bucket=settings.team_stats_source_bucket,
        source_object=settings.team_stats_source_object,
        destination_bucket=settings.team_stats_dest_bucket,
        destination_object=settings.team_stats_dest_object,
        move_object=True
    )

    print_token >> run_cloud_run_service >> move_file_to_cold

The with airflow.DAG statement instantiates a DAG with an ID and a schedule interval. There is no cron expression in this case, and the DAG is triggered manually.

There is no native operator among the official Google Cloud operators to invoke a Cloud Run Service, but invoking the service is simply an HTTP call, so we can use a SimpleHttpOperator.

Our Cloud Run Service is secured and requires authentication to be invoked. One possible solution is to print the identity token of the default service account used by Cloud Composer with a BashOperator, and pass it in the Authorization header of the HTTP request:

print_token = BashOperator(
    task_id="print_token",
    bash_command=f'gcloud auth print-identity-token "--audiences={settings.team_stats_service_url}"'
)

token = "{{ task_instance.xcom_pull(task_ids='print_token') }}"

run_cloud_run_service = SimpleHttpOperator(
    task_id="run_cloud_run_service",
    method="POST",
    http_conn_id="team_stats_service",
    endpoint=settings.team_stats_service_post_endpoint,
    data=json.dumps(settings.team_stats_service_slogans_request_body),
    headers={"Authorization": "Bearer " + token},
)
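
As an alternative to the BashOperator, the identity token could also be fetched directly in Python, for example from a PythonOperator, with the google-auth library available in Composer. Here is a minimal, hedged sketch; the fetch_token task and the fetch_identity_token helper are illustrative and not part of the original DAG:

import google.auth.transport.requests
import google.oauth2.id_token
from airflow.operators.python import PythonOperator


def fetch_identity_token(audience: str) -> str:
    # Fetch an identity token for the given audience (the Cloud Run Service URL)
    # with the credentials of the service account used by the Composer environment.
    auth_request = google.auth.transport.requests.Request()
    return google.oauth2.id_token.fetch_id_token(auth_request, audience)


fetch_token = PythonOperator(
    task_id="fetch_token",
    python_callable=fetch_identity_token,
    op_kwargs={"audience": settings.team_stats_service_url},
)

The returned value is pushed to XCom automatically, so it could be pulled exactly like the output of the print_token task.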

The Cloud Run Service call corresponds to an HTTP POST request, and we pass a request body coming from our configuration (Airflow variables):

"team_stats_service_slogans_request_body": {
  "team_slogans": {
    "PSG": "Paris est magique",
    "Real": "Hala Madrid"
  }
}

We retrieve this node as a dict in the settings.py file:

team_stats_service_slogans_request_body = _variables["team_stats_service_slogans_request_body"]

We serialize it as a string when passing it to the operator:

data=json.dumps(settings.team_stats_service_slogans_request_body),

The DAG waits for the service to finish and then runs an operator to move the processed file from the hot folder to the cold one. It's a usual pattern: once a file has been processed, we move it and back it up in another place:

move_file_to_cold = GCSToGCSOperator(
    task_id="move_file_to_cold",
    source_bucket=settings.team_stats_source_bucket,
    source_object=settings.team_stats_source_object,
    destination_bucket=settings.team_stats_dest_bucket,
    destination_object=settings.team_stats_dest_object,
    move_object=True
)

The sequencing of the operators is done with this syntax:

print_token >> run_cloud_run_service >> move_file_to_cold

We could also have used a PythonOperator instead of the Cloud Run Service and kept the business logic in a Python callable, but we prefer to delegate this responsibility outside of Airflow and keep Airflow responsible only for orchestration.

In this use case, the data processing and the business logic live in the Cloud Run Service.
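
For comparison, here is a minimal, hedged sketch of what that rejected alternative could look like; process_team_stats is a hypothetical callable that would keep the whole Extract/Transform/Load inside the Composer workers:

from airflow.operators.python import PythonOperator


def process_team_stats():
    # Hypothetical callable: read the raw NDJSON file from Cloud Storage,
    # apply the business rules and load the results into BigQuery,
    # all inside the Composer workers instead of a Cloud Run Service.
    ...


process_team_stats_task = PythonOperator(
    task_id="process_team_stats",
    python_callable=process_team_stats,
)

The drawback is that the business logic would then run on, and be coupled to, the Composer environment, which is exactly the responsibility we prefer to delegate to the Cloud Run Service.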

3. The CI/CD part

The CI/CD part is done with Cloud Build and is based on YAML files at the root of the project:

  • deploy-cloud-run-service.yaml -> deploy the Cloud Run Service
  • deploy-airflow-dag.yaml -> deploy the Airflow DAG in Cloud Composer

The deploy-cloud-run-service.yaml file:

steps:
- name: 'gcr.io/cloud-builders/docker'
  script: |
    docker build -f $DOCKER_FILE_PATH -t $SERVICE_NAME .
    docker tag $SERVICE_NAME $LOCATION-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/$SERVICE_NAME:$IMAGE_TAG
    docker push $LOCATION-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/$SERVICE_NAME:$IMAGE_TAG
  env:
  - 'PROJECT_ID=$PROJECT_ID'
  - 'LOCATION=$LOCATION'
  - 'REPO_NAME=$_REPO_NAME'
  - 'SERVICE_NAME=$_SERVICE_NAME'
  - 'DOCKER_FILE_PATH=$_DOCKER_FILE_PATH'
  - 'IMAGE_TAG=$_IMAGE_TAG'
- name: google/cloud-sdk:429.0.0
  args: [ './scripts/deploy_cloud_run_service.sh' ]
  env:
  - 'PROJECT_ID=$PROJECT_ID'
  - 'LOCATION=$LOCATION'
  - 'REPO_NAME=$_REPO_NAME'
  - 'SERVICE_NAME=$_SERVICE_NAME'
  - 'IMAGE_TAG=$_IMAGE_TAG'
  - 'OUTPUT_DATASET=$_OUTPUT_DATASET'
  - 'OUTPUT_TABLE=$_OUTPUT_TABLE'
  - 'INPUT_BUCKET=$_INPUT_BUCKET'
  - 'INPUT_OBJECT=$_INPUT_OBJECT'

The command line to execute the Cloud Build job from our local machine (check the README.md file):

gcloud builds submit \
--project=$PROJECT_ID \
--region=$LOCATION \
--config deploy-cloud-run-service.yaml \
--substitutions _REPO_NAME="$REPO_NAME",_SERVICE_NAME="$SERVICE_NAME",_DOCKER_FILE_PATH="$DOCKER_FILE_PATH",_IMAGE_TAG="$IMAGE_TAG",_OUTPUT_DATASET="$OUTPUT_DATASET",_OUTPUT_TABLE="$OUTPUT_TABLE",_INPUT_BUCKET="$INPUT_BUCKET",_INPUT_OBJECT="$INPUT_OBJECT" \
--verbosity="debug" .

The logic is explained in this article:

The deploy-airflow-dag.yaml file:

steps:
- name: google/cloud-sdk:429.0.0
  entrypoint: 'bash'
  args:
  - '-c'
  - |
    ./scripts/deploy_dag_setup.sh \
    && ./scripts/deploy_dag_config.sh \
    && ./scripts/deploy_dag_folder.sh
  env:
  - 'PROJECT_ID=$PROJECT_ID'
  - 'LOCATION=$LOCATION'
  - 'DAG_FOLDER=$_DAG_FOLDER'
  - 'COMPOSER_ENVIRONMENT=$_COMPOSER_ENVIRONMENT'
  - 'CONFIG_FOLDER_NAME=$_CONFIG_FOLDER_NAME'
  - 'ENV=$_ENV'

This YAML file executes different Bash scripts:

  • Deploy a setup.py file in Composer
  • Deploy the DAG config variables.json in Composer
  • Deploy the DAG folder in Composer

The command line to execute the Cloud Build job from our local machine (check the README.md file):

gcloud builds submit \
--project=$PROJECT_ID \
--region=$LOCATION \
--config deploy-airflow-dag.yaml \
--substitutions _DAG_FOLDER="$DAG_FOLDER",_COMPOSER_ENVIRONMENT="$COMPOSER_ENVIRONMENT",_CONFIG_FOLDER_NAME="$CONFIG_FOLDER_NAME",_ENV="$ENV" \
--verbosity="debug" .

For a deep explanation of the CI/CD and deployment part of the DAG, you can check this article:

4. Execution of the Airflow DAG from Cloud Composer

Run the DAG from the Airflow webserver:

We can then access the DAG graph:

Click on the run_cloud_run_service task and then on the Rendered button:

The Rendered view contains the resolved parameters of the operator that called the service:

Conclusion

This article showed a complete use case with an ETL Batch Pipeline orchestrated by Apache Airflow and Cloud Composer.

This pattern is often used for batch pipelines and data projects; most of the time we need a pipeline orchestrator to manage the executions and the sequencing of tasks.

Data teams then need to choose the best tool for the Transform part, depending on the use case.

The best approach is to delegate the data processing responsibility outside of the orchestrator and let the orchestrator manage only the sequencing of tasks.

This use case uses a Python Cloud Run Service, but we could have used other tools as well, depending on the data volume and the processing time.

Cloud Run presents some advantages:

  • Flexibility with Docker containers
  • Serverless
  • Can scale down to 0
  • You pay only for what you consume

Cloud Run can be less suitable for pipelines with very high volumes, long-running jobs and distributed data processing. Other solutions like Cloud Dataflow may be a better fit in those cases.

In upcoming articles, we will cover the same use case with different tools:

  • The Transform part with a Cloud Function, if developers want to focus only on the code and the language, and don't want to deal with Docker containers.
  • The Transform part with Cloud Dataflow, if there is a high data volume and a need for more data processing power (distributed processing across VMs).

All the code shared in this article is accessible from my GitHub repository:

If you like my articles and videos and want to see my posts, follow me on:
