ETL Batch pipeline with Cloud Storage, Dataflow and BigQuery orchestrated by Airflow/Composer
1. Explanation of the use case presented in this article
I previously wrote an article presenting the same use case, but with a Cloud Run Service instead of a Dataflow job:
When the data processing step involves a large volume of data, a tool like Dataflow can be better suited, because it is built to process large amounts of data.
Dataflow is the Google Cloud runner for the Apache Beam model. Beam unifies batch and streaming with the same code base.
Dataflow is serverless and fully managed in Google Cloud, and it offers several advantages:
- It provides extensive support for observability
- It offers horizontal autoscaling and creates new Compute Engine VMs on the fly if needed
- A more powerful engine, Dataflow Prime, can be used: it offers vertical autoscaling within a VM (for example, memory) and other features
In this case, the ETL batch pipeline is made up of the following services:
- Extract -> Cloud Storage
- Transform -> Dataflow
- Load -> BigQuery
Everything is orchestrated by Apache Airflow and Cloud Composer.
The orchestration steps are:
- The orchestrator runs a Dataflow job written in Python via a Flex Template
- The job reads a Newline Delimited JSON file from a hot folder in Cloud Storage (in a standard Cloud Storage bucket, a folder is simply a prefix of the object name)
- The job applies the needed transformations and business rules
- The job writes the result to a BigQuery table
- The orchestrator backs up the processed file by moving it to a cold folder
The following diagram shows this use case:
I also created a video on this topic on my GCP YouTube channel. Please subscribe to the channel to support my work for the Google Cloud community:
English version
French version
An example of raw data in JSON format:
```json
{
  "teamName": "PSG",
  "teamScore": 30,
  "scorers": [
    {
      "scorerFirstName": "Kylian",
      "scorerLastName": "Mbappe",
      "goals": 15,
      "goalAssists": 6,
      "games": 13
    },
    {
      "scorerFirstName": "Da Silva",
      "scorerLastName": "Neymar",
      "goals": 11,
      "goalAssists": 7,
      "games": 12
    },
    {
      "scorerFirstName": "Angel",
      "scorerLastName": "Di Maria",
      "goals": 7,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Lionel",
      "scorerLastName": "Messi",
      "goals": 12,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Marco",
      "scorerLastName": "Verrati",
      "goals": 3,
      "goalAssists": 10,
      "games": 13
    }
  ]
}
```
The corresponding computed domain data:
```json
{
  "teamName": "PSG",
  "teamScore": 30,
  "teamTotalGoals": 48,
  "teamSlogan": "Paris est magique",
  "topScorerStats": {
    "firstName": "Kylian",
    "lastName": "Mbappe",
    "goals": 15,
    "games": 13
  },
  "bestPasserStats": {
    "firstName": "Marco",
    "lastName": "Verrati",
    "goalAssists": 10,
    "games": 13
  }
}
```
The goal is to:
- Calculate the total goals per team
- Compute the top scorer node
- Compute the best passer node
- Set the slogan per team
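The domain code (TeamStats.compute_team_stats and add_slogan_to_stats) is not shown in this article. As an illustration of the rules above, here is a minimal stdlib sketch working on plain dicts; the function names and the TEAM_SLOGANS table are hypothetical, and only the PSG slogan comes from the example. Applied to the raw PSG record above, compute_team_stats followed by add_slogan returns the domain data shown earlier.

```python
from typing import Any, Dict, List

# Hypothetical slogan lookup: only the PSG entry comes from the article's example.
TEAM_SLOGANS: Dict[str, str] = {"PSG": "Paris est magique"}


def compute_team_stats(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Computes the domain view of one raw team record (sketch, not the author's code)."""
    scorers: List[Dict[str, Any]] = raw["scorers"]

    # Total goals = sum of the goals of every scorer of the team.
    team_total_goals = sum(s["goals"] for s in scorers)

    # max() returns the first scorer with the highest value in case of a tie.
    top_scorer = max(scorers, key=lambda s: s["goals"])
    best_passer = max(scorers, key=lambda s: s["goalAssists"])

    return {
        "teamName": raw["teamName"],
        "teamScore": raw["teamScore"],
        "teamTotalGoals": team_total_goals,
        "topScorerStats": {
            "firstName": top_scorer["scorerFirstName"],
            "lastName": top_scorer["scorerLastName"],
            "goals": top_scorer["goals"],
            "games": top_scorer["games"],
        },
        "bestPasserStats": {
            "firstName": best_passer["scorerFirstName"],
            "lastName": best_passer["scorerLastName"],
            "goalAssists": best_passer["goalAssists"],
            "games": best_passer["games"],
        },
    }


def add_slogan(team_stats: Dict[str, Any]) -> Dict[str, Any]:
    """Returns a copy of the stats with the team slogan (raises KeyError for an unknown team)."""
    return {**team_stats, "teamSlogan": TEAM_SLOGANS[team_stats["teamName"]]}
```

The article does not say how the real job breaks ties between scorers; this sketch simply keeps the first one in the input order.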
2. Structure of the project
2.1 Environment variables
Set the following environment variables:
```bash
# Common
export PROJECT_ID={{project_id}}
export LOCATION={{region}}

# Dataflow (deployment Flex Template)
export REPO_NAME=internal-images
export IMAGE_NAME="dataflow/team-league-elt-dataflow-python"
export IMAGE_TAG=latest
export METADATA_FILE="config/dataflow_template_metadata.json"
export METADATA_TEMPLATE_FILE_PATH="gs://mazlum_dev/dataflow/templates/team_league/python/team-league-elt-dataflow-python.json"
export SDK_LANGUAGE=PYTHON

# Composer (deployment DAG)
export DAG_FOLDER=team_league_etl_dataflow_dag
export COMPOSER_ENVIRONMENT=dev-composer-env
export CONFIG_FOLDER_NAME=config
export ENV=dev
```
2.2 Python local environment
The local Python environment uses Pipenv as the package manager and to automate the creation of the virtual environment.
You can check this video from my GCP YouTube channel that shows:
- How to set up a comfortable local Python environment with pyenv, Pipenv, direnv and IntelliJ IDEA, and navigate through all the files, classes and methods
- How to automate the creation of the virtual environment for our Python project
2.3 The Dataflow Job part
All the elements related to the Dataflow job are in the team_league_dataflow_job folder (the root of the Python module).
I created a video that explains the code logic of the job in detail:
English version
French version
There are 2 folders:
- domain: contains all the business rules, typed objects (dataclasses) and transformations
- application: for simplicity, the whole Beam pipeline and the composition of transformations are in the team_league_app.py file
```python
import json
import logging
from typing import Dict

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions

from team_league_dataflow_job.application.team_league_options import TeamLeagueOptions
from team_league_dataflow_job.application.team_stats_mapper import deserialize, to_team_stats_bq
from team_league_dataflow_job.domain.team_stats import TeamStats


def to_dict(team_stats_raw_as_str: str) -> Dict:
    # Each line of the Newline Delimited JSON file is one team record.
    return json.loads(team_stats_raw_as_str)


def main() -> None:
    logging.getLogger().setLevel(logging.INFO)

    # Custom options (input file, BigQuery dataset and table) passed as Flex Template parameters.
    team_league_options = PipelineOptions().view_as(TeamLeagueOptions)
    pipeline_options = PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read Json file' >> ReadFromText(team_league_options.input_json_file)
         | 'Map str message to Dict' >> beam.Map(to_dict)
         | 'Deserialize to domain dataclass' >> beam.Map(deserialize)
         | 'Validate raw fields' >> beam.Map(lambda t_raw: t_raw.validate_fields())
         | 'Compute team stats' >> beam.Map(TeamStats.compute_team_stats)
         | 'Add slogan to team stats' >> beam.Map(lambda t_stats: t_stats.add_slogan_to_stats())
         | 'Map to team stats bq dicts' >> beam.Map(to_team_stats_bq)
         # FILE_LOADS writes with BigQuery load jobs; the table must already exist (CREATE_NEVER).
         | 'Write team stats to BQ' >> beam.io.WriteToBigQuery(
                    project=team_league_options.project_id,
                    dataset=team_league_options.team_league_dataset,
                    table=team_league_options.team_stats_table,
                    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))


if __name__ == "__main__":
    main()
```
The pipeline steps are:
- Read the team stats raw data as strings from a Newline Delimited JSON file in Cloud Storage (input connector)
- Deserialize each String line to a TeamStatRaw typed object
- Validate the input fields of the TeamStatRaw object
- Transform the TeamStatRaw object into TeamStats domain data, applying the needed business rules
- Assign the slogan to the current team in the TeamStats object
- Serialize the TeamStats object to a Dict, because WriteToBigQuery needs a PCollection of Dict to store the result in BigQuery
- Write the result to the output BigQuery table (output connector)
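The first steps of this list are plain Python functions applied through beam.Map, so they can be unit tested without a runner. Here is a minimal sketch of the chain from string line to Dict, then to a typed object, then validation; the dataclass fields and the validation rules are assumptions (the article names the raw type TeamStatRaw but does not show its code). Note that the pipeline maps each element to the return value of validate_fields(), so the method has to return the object itself rather than None.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class TeamScorerRaw:
    first_name: str
    last_name: str
    goals: int
    goal_assists: int
    games: int


@dataclass(frozen=True)
class TeamStatRaw:
    team_name: str
    team_score: int
    scorers: List[TeamScorerRaw]

    def validate_fields(self) -> "TeamStatRaw":
        # Hypothetical rules: a team needs a name and at least one scorer.
        if not self.team_name:
            raise ValueError("teamName must not be empty")
        if not self.scorers:
            raise ValueError(f"Team {self.team_name} has no scorers")
        # Returning self keeps the element flowing through beam.Map.
        return self


def to_dict(team_stats_raw_as_str: str) -> Dict[str, Any]:
    return json.loads(team_stats_raw_as_str)


def deserialize(raw: Dict[str, Any]) -> TeamStatRaw:
    # Maps the camelCase JSON keys to snake_case dataclass fields.
    scorers = [
        TeamScorerRaw(
            first_name=s["scorerFirstName"],
            last_name=s["scorerLastName"],
            goals=s["goals"],
            goal_assists=s["goalAssists"],
            games=s["games"],
        )
        for s in raw["scorers"]
    ]
    return TeamStatRaw(team_name=raw["teamName"], team_score=raw["teamScore"], scorers=scorers)
```

Raising an exception in validate_fields() fails the element; in a real job you might prefer routing invalid records to a dead letter output, which this sketch does not cover.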
2.4 The DAG part
The DAG configuration is stored in the config folder, in a JSON file called variables.json:
```json
{
  "team_league_etl_dataflow_dag": {
    "dag_folder": "team_league_etl_dataflow_dag",
    "team_stats_source_bucket": "mazlum_dev",
    "team_stats_source_object": "hot/etl/dataflow/input_teams_stats_raw.json",
    "team_stats_dest_bucket": "mazlum_dev",
    "team_stats_dest_object": "cold/etl/dataflow/",
    "team_stats_job_config": {
      "launchParameter": {
        "containerSpecGcsPath": "gs://mazlum_dev/dataflow/templates/team_league/python/team-league-elt-dataflow-python.json",
        "jobName": "team-league-dataflow-etl-python",
        "parameters": {
          "project_id": "gb-poc-373711",
          "input_json_file": "gs://mazlum_dev/hot/etl/dataflow/input_teams_stats_raw.json",
          "team_league_dataset": "mazlum_test",
          "team_stats_table": "team_stat"
        },
        "environment": {
          "serviceAccountEmail": "sa-dataflow-dev@gb-poc-373711.iam.gserviceaccount.com",
          "tempLocation": "gs://mazlum_dev/dataflow/temp",
          "stagingLocation": "gs://mazlum_dev/dataflow/staging"
        }
      }
    },
    "team_stats_job_location": "europe-west1"
  }
}
```
The configuration contains:
- The source bucket and object of the input file (hot folder)
- The destination bucket and object where the processed file is moved (cold folder)
- The Dataflow job config: team_stats_job_config
- The Dataflow job location: team_stats_job_location
The Dataflow job config passed to the DataflowStartFlexTemplateOperator follows the request body convention of the Flex Template REST API (projects.locations.flexTemplates.launch).
Note that the job-specific parameters are placed in the parameters node.
The settings shared by all Dataflow jobs, like serviceAccountEmail or stagingLocation, are placed in the environment node.
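To make the split between the parameters and environment nodes concrete, here is a small stdlib sketch that builds the same request body from Python values. The helper build_flex_template_body is hypothetical; the field names are the ones used in variables.json. The Flex Template REST API types parameters as a map of strings, so the sketch converts each job parameter to a string, while the environment settings are passed through unchanged.

```python
from typing import Any, Dict, Mapping


def build_flex_template_body(
    container_spec_gcs_path: str,
    job_name: str,
    job_parameters: Mapping[str, Any],
    environment: Mapping[str, Any],
) -> Dict[str, Any]:
    """Builds a launch body shaped like team_stats_job_config (hypothetical helper)."""
    return {
        "launchParameter": {
            "containerSpecGcsPath": container_spec_gcs_path,
            "jobName": job_name,
            # Job-specific values, received by the Beam program as --key=value flags.
            "parameters": {key: str(value) for key, value in job_parameters.items()},
            # Runtime settings shared by all jobs (service account, temp and staging locations).
            "environment": dict(environment),
        }
    }
```

Generating the body this way is optional: keeping it as static JSON in variables.json, as the article does, has the advantage that the Airflow variable can be changed without redeploying the DAG code.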
The CI/CD part deploys this JSON file as Airflow variables.
You can check this article for a detailed explanation of the Airflow configuration:
The team_league_etl_dataflow_dag folder contains the DAG file and all the related logic.
The config and JSON variables are loaded in the settings.py file:
```python
import os
from dataclasses import dataclass
from datetime import timedelta

from airflow.models import Variable
from airflow.utils.dates import days_ago

# The whole DAG configuration is stored as a single JSON Airflow variable.
_variables = Variable.get("team_league_etl_dataflow_dag", deserialize_json=True)
_current_dag_folder = _variables["dag_folder"]


@dataclass
class Settings:
    dag_default_args = {
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
        "start_date": days_ago(1)
    }
    # Project ID read from the GCP_PROJECT environment variable of the Composer environment.
    project_id = os.getenv("GCP_PROJECT")

    team_stats_job_config = _variables["team_stats_job_config"]
    team_stats_job_location = _variables["team_stats_job_location"]
    team_stats_source_bucket = _variables["team_stats_source_bucket"]
    team_stats_source_object = _variables["team_stats_source_object"]
    team_stats_dest_bucket = _variables["team_stats_dest_bucket"]
    team_stats_dest_object = _variables["team_stats_dest_object"]

    variables = _variables
```
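Settings reads the variable when the DAG file is parsed, so a missing key only shows up as a broken DAG (import error) in Airflow. A hypothetical pre-deployment check, runnable locally or in the CI/CD pipeline before the config is deployed, could parse variables.json and fail fast. The function name and the required-key list below are our own, derived from the keys that settings.py reads.

```python
import json
from typing import Any, Dict

DAG_VARIABLE_KEY = "team_league_etl_dataflow_dag"

# Keys read by the Settings class in settings.py.
REQUIRED_KEYS = (
    "dag_folder",
    "team_stats_job_config",
    "team_stats_job_location",
    "team_stats_source_bucket",
    "team_stats_source_object",
    "team_stats_dest_bucket",
    "team_stats_dest_object",
)


def validate_dag_variables(raw_json: str) -> Dict[str, Any]:
    """Parses variables.json content and returns the DAG node, failing on missing keys (hypothetical helper)."""
    document = json.loads(raw_json)
    if DAG_VARIABLE_KEY not in document:
        raise ValueError(f"Missing top-level key: {DAG_VARIABLE_KEY}")
    variables = document[DAG_VARIABLE_KEY]
    missing = [key for key in REQUIRED_KEYS if key not in variables]
    if missing:
        raise ValueError(f"Missing keys in {DAG_VARIABLE_KEY}: {', '.join(missing)}")
    return variables
```

It could be called with something like validate_dag_variables(Path("config/variables.json").read_text(encoding="utf-8")), using pathlib.Path.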
The DAG file:
```python
import airflow
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

from team_league_etl_dataflow_dag.settings import Settings

settings = Settings()

with airflow.DAG(
        "team_league_etl_dataflow_dag",
        default_args=settings.dag_default_args,
        schedule_interval=None) as dag:
    # Launches the Dataflow job from the Flex Template and waits for it to finish.
    start_dataflow_flex_template = DataflowStartFlexTemplateOperator(
        task_id="start_dataflow_flex_template",
        project_id=settings.project_id,
        body=settings.team_stats_job_config,
        location=settings.team_stats_job_location
    )

    # move_object=True moves the processed file (copy, then delete the source) to the cold folder.
    move_file_to_cold = GCSToGCSOperator(
        task_id="move_file_to_cold",
        source_bucket=settings.team_stats_source_bucket,
        source_object=settings.team_stats_source_object,
        destination_bucket=settings.team_stats_dest_bucket,
        destination_object=settings.team_stats_dest_object,
        move_object=True
    )

    start_dataflow_flex_template >> move_file_to_cold
```
The with airflow.DAG block instantiates a DAG with an ID and a schedule interval. There is no cron expression in this case (schedule_interval=None), so the DAG is triggered manually.
The DataflowStartFlexTemplateOperator starts the Dataflow job from the Flex Template.
A Flex Template is based on a Docker image and a JSON spec file. The Docker image brings flexibility and standardization: Dataflow jobs are deployed and started the same way, whatever the language and SDK.
For more details on Dataflow Flex Templates, you can check this article:
We pass the following elements to this operator:
- The current GCP project ID (read from the GCP_PROJECT environment variable)
- The Flex Template launch body (team_stats_job_config, from the Airflow configuration)
- The Dataflow job location (team_stats_job_location, from the Airflow configuration)
The DAG waits for the job to finish, then runs an operator that moves the processed file from the hot folder to the cold folder. This is a common pattern: once a file is processed, it is moved to another location as a backup:
```python
move_file_to_cold = GCSToGCSOperator(
    task_id="move_file_to_cold",
    source_bucket=settings.team_stats_source_bucket,
    source_object=settings.team_stats_source_object,
    destination_bucket=settings.team_stats_dest_bucket,
    destination_object=settings.team_stats_dest_object,
    move_object=True
)
```
The operators are chained with this syntax:
```python
start_dataflow_flex_template >> move_file_to_cold
```
We could also have used a PythonOperator instead of the Dataflow job, to hold the business logic in a Python program, but we prefer to delegate this responsibility to a dedicated service and keep Airflow responsible only for orchestration.
In this case, the data processing and the business logic live in the Dataflow job.
3. The CI/CD part
The CI/CD part is done with Cloud Build and is based on YAML files at the root of the project:
- deploy-dataflow-flex-template.yaml -> deploy the Dataflow Flex Template
- deploy-airflow-dag.yaml -> deploy the Airflow DAG in Cloud Composer
The deploy-dataflow-flex-template.yaml file:
```yaml
steps:
  - name: google/cloud-sdk:420.0.0-slim
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        ./scripts/build_image_with_dockerfile.sh \
        && ./scripts/create_flex_template_spec_file_gcs.sh
    env:
      - 'PROJECT_ID=$PROJECT_ID'
      - 'LOCATION=$LOCATION'
      - 'REPO_NAME=$_REPO_NAME'
      - 'IMAGE_NAME=$_IMAGE_NAME'
      - 'IMAGE_TAG=$_IMAGE_TAG'
      - 'METADATA_TEMPLATE_FILE_PATH=$_METADATA_TEMPLATE_FILE_PATH'
      - 'SDK_LANGUAGE=$_SDK_LANGUAGE'
      - 'METADATA_FILE=$_METADATA_FILE'
```
The command line to run the Cloud Build job from our local machine (see the README.md file):
```bash
gcloud builds submit \
    --project=$PROJECT_ID \
    --region=$LOCATION \
    --config deploy-dataflow-flex-template.yaml \
    --substitutions _REPO_NAME="$REPO_NAME",_IMAGE_NAME="$IMAGE_NAME",_IMAGE_TAG="$IMAGE_TAG",_METADATA_TEMPLATE_FILE_PATH="$METADATA_TEMPLATE_FILE_PATH",_SDK_LANGUAGE="$SDK_LANGUAGE",_METADATA_FILE="$METADATA_FILE" \
    --verbosity="debug" .
```
The Dockerfile:
```dockerfile
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY . ${WORKDIR}/

# The setup.py file need to be copied at the same level of Dataflow root folder team_league_dataflow_job.
COPY team_league_dataflow_job/setup.py ${WORKDIR}/setup.py

RUN ls -R ${WORKDIR}

ENV PYTHONPATH ${WORKDIR}
ENV REQUIREMENTS_FILE="${WORKDIR}/team_league_dataflow_job/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/team_league_dataflow_job/application/team_league_app.py"

RUN apt-get update \
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r $REQUIREMENTS_FILE

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
```
Some explanations about the Docker image:
An important detail: to launch a Beam and Dataflow job as a Python module without issues, the runner needs a setup.py file at the same level as the Python root folder:
```
team_league_dataflow_job
setup.py
```
For better readability, the setup.py file is initially placed inside the root folder:
```
team_league_dataflow_job
└── setup.py
```
That's why the Dockerfile contains a command that copies the setup.py file from team_league_dataflow_job/setup.py to the root directory of the container:
```dockerfile
# The setup.py file need to be copied at the same level of Dataflow root folder team_league_dataflow_job.
COPY team_league_dataflow_job/setup.py ${WORKDIR}/setup.py
```
For the rest of the logic, all the needed Python packages are installed in the container:
```dockerfile
RUN apt-get update \
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r $REQUIREMENTS_FILE
```
Two environment variables are set for the Python Flex Template launcher: the path of the setup.py file and the entrypoint of the Beam application, team_league_app.py:
```dockerfile
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/team_league_dataflow_job/application/team_league_app.py"
```
For a more detailed explanation of how Flex Templates are deployed, you can check this article:
The deploy-airflow-dag.yaml file:
```yaml
steps:
  - name: google/cloud-sdk:429.0.0
    entrypoint: 'bash'
    args:
      - '-c'
      - |
        ./scripts/deploy_dag_setup.sh \
        && ./scripts/deploy_dag_config.sh \
        && ./scripts/deploy_dag_folder.sh
    env:
      - 'PROJECT_ID=$PROJECT_ID'
      - 'LOCATION=$LOCATION'
      - 'DAG_FOLDER=$_DAG_FOLDER'
      - 'COMPOSER_ENVIRONMENT=$_COMPOSER_ENVIRONMENT'
      - 'CONFIG_FOLDER_NAME=$_CONFIG_FOLDER_NAME'
      - 'ENV=$_ENV'
```
This YAML file executes several Bash scripts:
- Deploy a setup.py file in Composer
- Deploy the DAG config variables.json in Composer
- Deploy the DAG folder in Composer
The command line to run the Cloud Build job from our local machine (see the README.md file):
```bash
gcloud builds submit \
    --project=$PROJECT_ID \
    --region=$LOCATION \
    --config deploy-airflow-dag.yaml \
    --substitutions _DAG_FOLDER="$DAG_FOLDER",_COMPOSER_ENVIRONMENT="$COMPOSER_ENVIRONMENT",_CONFIG_FOLDER_NAME="$CONFIG_FOLDER_NAME",_ENV="$ENV" \
    --verbosity="debug" .
```
For an in-depth explanation of the CI/CD and deployment of the DAG, you can check this article:
4. Execution of the Airflow DAG from Cloud Composer
Run the DAG from the Airflow webserver:
We can then access the DAG graph:
Click on the start_dataflow_flex_template task, then on the Rendered button:
The Rendered menu shows the details of the operator that starts the Dataflow Flex Template:
Conclusion
Previously, we presented this ETL pattern with a Cloud Run Service for the transform part.
For high volumes of data, Dataflow is powerful and better suited to the data processing part. Dataflow is based on the open source Beam model, which offers very stable SDKs and complete documentation.
Flex Templates standardize how Dataflow jobs are deployed and started, and Airflow provides a native operator to start them.
Beam has a learning curve, but in return it offers a very complete tool for Big Data use cases.
While a job is running, Dataflow keeps at least one Compute Engine VM (it does not scale to zero), but the cost remains low for batch pipelines.
Developers should be careful and check the metrics used by the job in the Dataflow job graph and UI.
There is also a cost calculator for jobs.
All the code shared in this article is available in my GitHub repository:
If you like my articles and videos and want to see my posts, follow me on: