ETL Batch pipeline with Cloud Storage, Dataflow and BigQuery orchestrated by Airflow/Composer

Mazlum Tosun
Google Cloud - Community
10 min read · Jul 22, 2023

1. Explanation of the use case presented in this article

I previously wrote an article with the same use case presented here, but with a Cloud Run service instead of a Dataflow job:

When the data processing step involves a large volume of data, a tool like Dataflow can be a better fit, because it is built to handle large amounts of data.

Dataflow is the Google Cloud runner for the Apache Beam model. Beam unifies batch and streaming with the same code base.

Dataflow is serverless and fully managed in Google Cloud, and offers several advantages:

  • Rich support for observability
  • Horizontal autoscaling, creating new Compute Engine VMs on the fly if needed
  • An optional engine called Dataflow Prime, which adds vertical autoscaling within a VM (memory…) and other features

In this case, the ETL batch pipeline is represented by the following services:

  • Extract -> Cloud Storage
  • Transform -> Dataflow
  • Load -> BigQuery

Everything is orchestrated by Apache Airflow and Cloud Composer.

The orchestration steps are:

  • The orchestrator runs a Dataflow job written in Python via a Flex Template
  • The job reads a newline-delimited JSON file from a hot folder in Cloud Storage (a folder is an object in Cloud Storage)
  • The job applies the needed transformations and business rules
  • The job writes the result to a BigQuery table
  • The orchestrator backs up the processed file by moving it to a cold folder

Here you can see the diagram of this use case :

I also created a video on this topic on my GCP YouTube channel; please subscribe to the channel to support my work for the Google Cloud community:

English version

French version

An example of the raw data in JSON format:

{
    "teamName": "PSG",
    "teamScore": 30,
    "scorers": [
        {
            "scorerFirstName": "Kylian",
            "scorerLastName": "Mbappe",
            "goals": 15,
            "goalAssists": 6,
            "games": 13
        },
        {
            "scorerFirstName": "Da Silva",
            "scorerLastName": "Neymar",
            "goals": 11,
            "goalAssists": 7,
            "games": 12
        },
        {
            "scorerFirstName": "Angel",
            "scorerLastName": "Di Maria",
            "goals": 7,
            "goalAssists": 8,
            "games": 13
        },
        {
            "scorerFirstName": "Lionel",
            "scorerLastName": "Messi",
            "goals": 12,
            "goalAssists": 8,
            "games": 13
        },
        {
            "scorerFirstName": "Marco",
            "scorerLastName": "Verrati",
            "goals": 3,
            "goalAssists": 10,
            "games": 13
        }
    ]
}

The corresponding computed domain data :

{
    "teamName": "PSG",
    "teamScore": 30,
    "teamTotalGoals": 48,
    "teamSlogan": "Paris est magique",
    "topScorerStats": {
        "firstName": "Kylian",
        "lastName": "Mbappe",
        "goals": 15,
        "games": 13
    },
    "bestPasserStats": {
        "firstName": "Marco",
        "lastName": "Verrati",
        "goalAssists": 10,
        "games": 13
    }
}

The goal is to compute the following (a standalone sketch is shown after the list):

  • The total goals per team
  • The top scorer node
  • The best passer node
  • The slogan per team
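
To make these rules concrete, here is a minimal, standalone Python sketch of the computation, assuming the field names shown above. The slogan mapping is a hypothetical example; the real business rules live in the domain package of the repository:

from typing import Dict

# Hypothetical slogan mapping; the real values live in the project's domain code.
TEAM_SLOGANS = {"PSG": "Paris est magique"}


def compute_team_stats(team_raw: Dict) -> Dict:
    """Derive the computed domain fields from a raw team stats dict (sketch only)."""
    scorers = team_raw["scorers"]
    top_scorer = max(scorers, key=lambda s: s["goals"])
    best_passer = max(scorers, key=lambda s: s["goalAssists"])

    return {
        "teamName": team_raw["teamName"],
        "teamScore": team_raw["teamScore"],
        "teamTotalGoals": sum(s["goals"] for s in scorers),
        "teamSlogan": TEAM_SLOGANS.get(team_raw["teamName"], ""),
        "topScorerStats": {
            "firstName": top_scorer["scorerFirstName"],
            "lastName": top_scorer["scorerLastName"],
            "goals": top_scorer["goals"],
            "games": top_scorer["games"],
        },
        "bestPasserStats": {
            "firstName": best_passer["scorerFirstName"],
            "lastName": best_passer["scorerLastName"],
            "goalAssists": best_passer["goalAssists"],
            "games": best_passer["games"],
        },
    }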

2. Structure of the project

2.1 Environment variables

Set the following environment variables :

# Common
export PROJECT_ID={{project_id}}
export LOCATION={{region}}

# Dataflow (deployment Flex Template)
export REPO_NAME=internal-images
export IMAGE_NAME="dataflow/team-league-elt-dataflow-python"
export IMAGE_TAG=latest
export METADATA_FILE="config/dataflow_template_metadata.json"
export METADATA_TEMPLATE_FILE_PATH="gs://mazlum_dev/dataflow/templates/team_league/python/team-league-elt-dataflow-python.json"
export SDK_LANGUAGE=PYTHON

# Composer (deployment DAG)
export DAG_FOLDER=team_league_etl_dataflow_dag
export COMPOSER_ENVIRONMENT=dev-composer-env
export CONFIG_FOLDER_NAME=config
export ENV=dev

2.2 Python local environment

The Python local environment uses Pipenv as the package manager and to automate the creation of the virtual environment.

You can check this video from my GCP YouTube channel, which shows:

  • How to set up a comfortable Python local environment with pyenv, Pipenv, direnv and IntelliJ IDEA, and how to navigate through all the files, classes and methods
  • How to automate the creation of the virtual environment for our Python project

2.3 The Dataflow Job part

All the elements concerning the Dataflow job are put in the team_league_dataflow_job folder (root of the Python module).

I created a video to explain in detail the code logic for the job :

English version

French version

There are 2 folders:

  • domain: contains all the business rules, typed objects (dataclasses) and transformations
  • application: for simplicity, the whole Beam pipeline and the composition of transformations are put in the team_league_app.py file

import json
import logging
from typing import Dict

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

from team_league_dataflow_job.application.team_league_options import TeamLeagueOptions
from team_league_dataflow_job.application.team_stats_mapper import deserialize, to_team_stats_bq
from team_league_dataflow_job.domain.team_stats import TeamStats


def to_dict(team_stats_raw_as_str: str) -> Dict:
    return json.loads(team_stats_raw_as_str)


def main() -> None:
    logging.getLogger().setLevel(logging.INFO)

    team_league_options = PipelineOptions().view_as(TeamLeagueOptions)
    pipeline_options = PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read Json file' >> ReadFromText(team_league_options.input_json_file)
         | 'Map str message to Dict' >> beam.Map(to_dict)
         | 'Deserialize to domain dataclass' >> beam.Map(deserialize)
         | 'Validate raw fields' >> beam.Map(lambda t_raw: t_raw.validate_fields())
         | 'Compute team stats' >> beam.Map(TeamStats.compute_team_stats)
         | 'Add slogan to team stats' >> beam.Map(lambda t_stats: t_stats.add_slogan_to_stats())
         | 'Map to team stats bq dicts' >> beam.Map(to_team_stats_bq)
         | 'Write team stats to BQ' >> beam.io.WriteToBigQuery(
                    project=team_league_options.project_id,
                    dataset=team_league_options.team_league_dataset,
                    table=team_league_options.team_stats_table,
                    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))


if __name__ == "__main__":
    main()
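
The pipeline reads its parameters through the custom TeamLeagueOptions class, which is not reproduced in this article. As a minimal sketch, assuming the option names used in the Flex Template parameters shown later (project_id, input_json_file, team_league_dataset, team_stats_table), it could look like this:

from apache_beam.options.pipeline_options import PipelineOptions


class TeamLeagueOptions(PipelineOptions):
    """Custom pipeline options for the team league job (sketch only)."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # The option names match the "parameters" node of the Flex Template config.
        parser.add_argument('--project_id', required=True, help='GCP project ID')
        parser.add_argument('--input_json_file', required=True,
                            help='GCS path of the input newline-delimited JSON file')
        parser.add_argument('--team_league_dataset', required=True,
                            help='BigQuery dataset containing the team stats table')
        parser.add_argument('--team_stats_table', required=True,
                            help='BigQuery table for the computed team stats')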

The pipeline steps are the following (a sketch of the domain side is shown after the list):

  • Read the raw team stats as strings from the newline-delimited JSON file in Cloud Storage (input connector)
  • Deserialize each string line to a TeamStatRaw typed object
  • Validate the input fields of the TeamStatRaw object
  • Transform the TeamStatRaw into TeamStats domain data, applying the needed business rules
  • Assign the slogan to the current team in the TeamStats object
  • Serialize the TeamStats object to a Dict, because WriteToBigQuery needs a PCollection of Dicts
  • Write the result to the output BigQuery table (output connector)
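
The domain and mapper code (deserialize, validate_fields, to_team_stats_bq) lives in the repository and is not reproduced here. A minimal sketch, assuming dataclasses and hypothetical field names, could look like this:

from dataclasses import asdict, dataclass
from typing import Dict, List


@dataclass
class TeamStatRaw:
    """Raw team stats as read from the input file (sketch with assumed fields)."""
    team_name: str
    team_score: int
    scorers: List[Dict]

    def validate_fields(self) -> "TeamStatRaw":
        # The element is returned so the validation step can be used inside beam.Map.
        if not self.team_name:
            raise ValueError("teamName is mandatory")
        if not self.scorers:
            raise ValueError("scorers must not be empty")
        return self


def deserialize(team_raw_as_dict: Dict) -> TeamStatRaw:
    """Map the raw Dict to the typed raw object."""
    return TeamStatRaw(
        team_name=team_raw_as_dict["teamName"],
        team_score=team_raw_as_dict["teamScore"],
        scorers=team_raw_as_dict["scorers"],
    )


def to_team_stats_bq(team_stats) -> Dict:
    """Serialize the TeamStats domain object (assumed to be a dataclass) to a Dict for WriteToBigQuery."""
    return asdict(team_stats)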

2.4 The DAG part

The DAG configuration lives in the config folder, in a JSON file called variables.json:

{
    "team_league_etl_dataflow_dag": {
        "dag_folder": "team_league_etl_dataflow_dag",
        "team_stats_source_bucket": "mazlum_dev",
        "team_stats_source_object": "hot/etl/dataflow/input_teams_stats_raw.json",
        "team_stats_dest_bucket": "mazlum_dev",
        "team_stats_dest_object": "cold/etl/dataflow/",
        "team_stats_job_config": {
            "launchParameter": {
                "containerSpecGcsPath": "gs://mazlum_dev/dataflow/templates/team_league/python/team-league-elt-dataflow-python.json",
                "jobName": "team-league-dataflow-etl-python",
                "parameters": {
                    "project_id": "gb-poc-373711",
                    "input_json_file": "gs://mazlum_dev/hot/etl/dataflow/input_teams_stats_raw.json",
                    "team_league_dataset": "mazlum_test",
                    "team_stats_table": "team_stat"
                },
                "environment": {
                    "serviceAccountEmail": "sa-dataflow-dev@gb-poc-373711.iam.gserviceaccount.com",
                    "tempLocation": "gs://mazlum_dev/dataflow/temp",
                    "stagingLocation": "gs://mazlum_dev/dataflow/staging"
                }
            }
        },
        "team_stats_job_location": "europe-west1"
    }
}

The configuration contains :

  • The source bucket and object to read the input file (hot folder)
  • The output bucket and object to move the processed file (cold folder)
  • The Dataflow job config : team_stats_job_config
  • The Dataflow job location : team_stats_job_location

The Dataflow job config, passed to the DataflowStartFlexTemplateOperator, follows the standard convention used by the Flex Template REST API.

It is worth noting that the job-specific parameters are put in the parameters node.

The configuration that is common to all the Dataflow jobs, like serviceAccountEmail or stagingLocation, is put in the environment node.

The CI/CD part will deploy this JSON file as Airflow variables.

You can check this article to have a detailed explanation on Airflow config :

The team_league_etl_dataflow_dag folder contains the DAG file and all the surrounding logic.

The config and JSON variables are loaded in the settings.py file:

import os
from dataclasses import dataclass
from datetime import timedelta

from airflow.models import Variable
from airflow.utils.dates import days_ago

_variables = Variable.get("team_league_etl_dataflow_dag", deserialize_json=True)
_current_dag_folder = _variables["dag_folder"]


@dataclass
class Settings:
    dag_default_args = {
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
        "start_date": days_ago(1)
    }
    project_id = os.getenv("GCP_PROJECT")

    team_stats_job_config = _variables["team_stats_job_config"]
    team_stats_job_location = _variables["team_stats_job_location"]

    team_stats_source_bucket = _variables["team_stats_source_bucket"]
    team_stats_source_object = _variables["team_stats_source_object"]
    team_stats_dest_bucket = _variables["team_stats_dest_bucket"]
    team_stats_dest_object = _variables["team_stats_dest_object"]

    variables = _variables

The DAG file :

import airflow
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

from team_league_etl_dataflow_dag.settings import Settings

settings = Settings()

with airflow.DAG(
        "team_league_etl_dataflow_dag",
        default_args=settings.dag_default_args,
        schedule_interval=None) as dag:
    start_dataflow_flex_template = DataflowStartFlexTemplateOperator(
        task_id="start_dataflow_flex_template",
        project_id=settings.project_id,
        body=settings.team_stats_job_config,
        location=settings.team_stats_job_location
    )

    move_file_to_cold = GCSToGCSOperator(
        task_id="move_file_to_cold",
        source_bucket=settings.team_stats_source_bucket,
        source_object=settings.team_stats_source_object,
        destination_bucket=settings.team_stats_dest_bucket,
        destination_object=settings.team_stats_dest_object,
        move_object=True
    )

    start_dataflow_flex_template >> move_file_to_cold

The with airflow.DAG block instantiates a DAG with an ID and a schedule interval. There is no cron schedule in this case, so the DAG is triggered manually.
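
For reference, scheduling the DAG would only require replacing schedule_interval=None with a cron expression, for example (a hypothetical daily schedule):

with airflow.DAG(
        "team_league_etl_dataflow_dag",
        default_args=settings.dag_default_args,
        schedule_interval="0 6 * * *") as dag:  # hypothetical run every day at 06:00 UTC
    ...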

The DataflowStartFlexTemplateOperator starts the Flex Template and therefore the Dataflow job.

A Flex Template is based on a Docker image and a JSON spec file. The Docker image offers flexibility, standardization and a single way to deploy and start a Dataflow job, whatever the language and SDK.

To have more details on Dataflow Flex Template, you can check this article :

We pass some elements to this operator, from the Airflow configuration :

  • The current GCP project ID
  • The Flex Template parameters
  • The Flex Template location

The DAG waits for the job to finish, then runs an operator that moves the processed file from the hot folder to the cold one. It is a common pattern: once a file has been processed, it is moved and backed up in another place:

move_file_to_cold = GCSToGCSOperator(
    task_id="move_file_to_cold",
    source_bucket=settings.team_stats_source_bucket,
    source_object=settings.team_stats_source_object,
    destination_bucket=settings.team_stats_dest_bucket,
    destination_object=settings.team_stats_dest_object,
    move_object=True
)

The sequencing of the operators is done with this syntax:

start_dataflow_flex_template >> move_file_to_cold

We could also have used a PythonOperator instead of the Dataflow job to hold the business logic in a Python program, but we prefer to delegate this responsibility elsewhere and let Airflow be responsible only for orchestration.

In this case, the data processing and the business logic live in the Dataflow job.
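
For illustration only, the alternative we chose not to use would look roughly like the sketch below, where a hypothetical process_team_stats callable holds the business logic inside the DAG itself:

from airflow.operators.python import PythonOperator


def process_team_stats() -> None:
    # Hypothetical callable: read the file from GCS, apply the business rules
    # and write the result to BigQuery directly from the Airflow worker.
    ...


process_team_stats_task = PythonOperator(
    task_id="process_team_stats",
    python_callable=process_team_stats,
)

Running the transformation on the Composer workers would couple the business logic to the orchestrator, which is exactly what we want to avoid for large volumes.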

3. The CI/CD part

The CI/CD part is done with Cloud Build and is based on YAML files at the root of the project:

  • deploy-dataflow-flex-template.yaml -> deploy the Dataflow Flex Template
  • deploy-airflow-dag.yaml -> deploy the Airflow DAG in Cloud Composer

The deploy-dataflow-flex-template.yaml file :

steps:
  - name: google/cloud-sdk:420.0.0-slim
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        ./scripts/build_image_with_dockerfile.sh \
        && ./scripts/create_flex_template_spec_file_gcs.sh
    env:
      - 'PROJECT_ID=$PROJECT_ID'
      - 'LOCATION=$LOCATION'
      - 'REPO_NAME=$_REPO_NAME'
      - 'IMAGE_NAME=$_IMAGE_NAME'
      - 'IMAGE_TAG=$_IMAGE_TAG'
      - 'METADATA_TEMPLATE_FILE_PATH=$_METADATA_TEMPLATE_FILE_PATH'
      - 'SDK_LANGUAGE=$_SDK_LANGUAGE'
      - 'METADATA_FILE=$_METADATA_FILE'

The command line to execute the Cloud Build job from our local machine (check the README.md file) :

gcloud builds submit \
--project=$PROJECT_ID \
--region=$LOCATION \
--config deploy-dataflow-flex-template.yaml \
--substitutions _REPO_NAME="$REPO_NAME",_IMAGE_NAME="$IMAGE_NAME",_IMAGE_TAG="$IMAGE_TAG",_METADATA_TEMPLATE_FILE_PATH="$METADATA_TEMPLATE_FILE_PATH",_SDK_LANGUAGE="$SDK_LANGUAGE",_METADATA_FILE="$METADATA_FILE" \
--verbosity="debug" .

The Dockerfile :

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY . ${WORKDIR}/

# The setup.py file needs to be copied at the same level as the Dataflow root folder team_league_dataflow_job.
COPY team_league_dataflow_job/setup.py ${WORKDIR}/setup.py

RUN ls -R ${WORKDIR}

ENV PYTHONPATH ${WORKDIR}

ENV REQUIREMENTS_FILE="${WORKDIR}/team_league_dataflow_job/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/team_league_dataflow_job/application/team_league_app.py"

RUN apt-get update \
&& pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir -r $REQUIREMENTS_FILE

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]

Some explanations about the Docker image :

An important detail: to launch a Beam/Dataflow job as a Python module without issues, the runner needs a setup.py file at the same level as the Python root folder:

team_league_dataflow_job
setup.py

For better readability, we initially placed the setup.py file inside the root folder:

team_league_dataflow_job
-----setup.py

That’s why we used a command in the Dockerfile to copy the setup.py file from team_league_dataflow_job/setup.py to the root directory in the container :

# The setup.py file needs to be copied at the same level as the Dataflow root folder team_league_dataflow_job.
COPY team_league_dataflow_job/setup.py ${WORKDIR}/setup.py
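
For completeness, the setup.py only has to describe the job as an installable Python package so the Dataflow runner can stage it. A minimal sketch, assuming setuptools and a package name matching the team_league_dataflow_job folder, could be:

import setuptools

setuptools.setup(
    name="team_league_dataflow_job",  # assumed name, matching the module folder
    version="0.1.0",
    install_requires=[],  # runtime dependencies come from requirements.txt
    packages=setuptools.find_packages(),
)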

For the rest of the logic, all the needed Python packages are installed in the container :

RUN apt-get update \
&& pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir -r $REQUIREMENTS_FILE

Two required environment variables are set for the Python Flex Template: the path of the setup.py file and the entry point of the Beam application team_league_app.py:

ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/team_league_dataflow_job/application/team_league_app.py"

To have a more detailed explanation of the deployment of Flex Template, you can check this article :

The deploy-airflow-dag.yaml file :

steps:
  - name: google/cloud-sdk:429.0.0
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        ./scripts/deploy_dag_setup.sh \
        && ./scripts/deploy_dag_config.sh \
        && ./scripts/deploy_dag_folder.sh
    env:
      - 'PROJECT_ID=$PROJECT_ID'
      - 'LOCATION=$LOCATION'
      - 'DAG_FOLDER=$_DAG_FOLDER'
      - 'COMPOSER_ENVIRONMENT=$_COMPOSER_ENVIRONMENT'
      - 'CONFIG_FOLDER_NAME=$_CONFIG_FOLDER_NAME'
      - 'ENV=$_ENV'

This YAML file executes different Bash scripts:

  • Deploy a setup.py file in Composer
  • Deploy the DAG config variables.json in Composer
  • Deploy the DAG folder in Composer

The command line to execute the Cloud Build job from our local machine (check the README.md file) :

gcloud builds submit \
--project=$PROJECT_ID \
--region=$LOCATION \
--config deploy-airflow-dag.yaml \
--substitutions _DAG_FOLDER="$DAG_FOLDER",_COMPOSER_ENVIRONMENT="$COMPOSER_ENVIRONMENT",_CONFIG_FOLDER_NAME="$CONFIG_FOLDER_NAME",_ENV="$ENV" \
--verbosity="debug" .

For a detailed explanation of the CI/CD and deployment part of the DAG, you can check this article:

4. Execution of the Airflow DAG from Cloud Composer

Run the DAG from the Airflow Webserver :

We can then access the DAG graph:

Click on the start_dataflow_flex_template task and then on the rendered button :

The rendered menu contains the info for the operator that starts the Dataflow Flex Template:

Conclusion

Previously, we presented this ETL pattern with a Cloud Run service for the transform part.

For high volumes and large amounts of data, Dataflow is powerful and better suited to the data processing part. Dataflow is based on the open source Beam model, which offers very stable SDKs and complete documentation.

Flex Templates bring a standard way to deploy and start Dataflow jobs, and Airflow offers a native operator to launch them.

There is a learning curve for Beam, but behind it lies a really complete tool for Big Data use cases.

Dataflow keeps at least one Compute Engine VM running (it does not scale to zero), but the cost remains low for batch pipelines.

Developers have to be careful and check the job's metrics from the Dataflow DAG and UI.

There is also a cost calculator for jobs.

All the code shared in this article is accessible from my GitHub repository:

If you like my articles and videos and want to see my posts, follow me on:
