Remove duplicates in a BigQuery batch pipeline with Airflow and Dataflow
The purpose of this article is to show an application with a batch pipeline, orchestrated with Cloud Composer 2/Airflow and an Apache Beam/Cloud Dataflow job, that ingests data into BigQuery without duplicates.
A common problem in a data warehouse is the handling of duplicates, and many times we want to keep only the last state of an object.
The versions used for this example are:
- Cloud Composer: composer-2.0.27-airflow-2.2.5
- Apache Airflow: 2.2.5
- Apache Airflow providers for Apache Beam: 4.0.0
- Apache Beam: 2.41.0
1. Local environment
In this example the Pipenv tool is used; here you can see the Pipfile used for this project.
We create the subshell and virtual env, and install all the packages inside it with the following commands:
pipenv shell
pipenv install --dev
I am going to write a dedicated article showing how to use pipenv with IntelliJ and have the most comfortable environment to work on a local machine.
2. Goal of the application used for these examples
This article shows an example application whose goal is the handling of a football team league. This application uses the following steps:
- Read a JSON file containing team league stats raw data from Cloud Storage
- Transform and apply business rules to compute the expected team league stats domain data
- Write the result to a BigQuery table
For this application we want to deal with duplicates and update rows when a new file brings changes.
The raw data and the domain data resulting from the business transformations are illustrated by the sketch below.
The uniqueness of a team stat is represented by the teamName field.
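As an illustration, a raw element and its corresponding domain element could look like this; only the teamName and ingestionDate fields come from the article, the other fields and values are assumptions:

# Hypothetical example of a raw input element read from Cloud Storage.
team_stats_raw = {
    "teamName": "PSG",
    "wins": 10,
    "draws": 3,
    "losses": 2,
}

# Domain data after the business transformations, ready to be written to BigQuery.
team_stats_domain = {
    "teamName": "PSG",
    "teamScore": 33,  # e.g. 3 * wins + draws
    "ingestionDate": "2022-09-15T10:00:00Z",
}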
3. Creation of the BigQuery tables
Two tables are created for this example:
- team_stat_staging
- team_stat
The BigQuery schema of these 2 tables is the same.
We use a shell script to create the tables, for example for the team_stat table. A partition is used on the ingestionDate field with DAY granularity.
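As a minimal sketch of the same creation with the BigQuery Python client (the original project uses a shell script; the project, dataset and field names other than teamName and ingestionDate are assumptions):

from google.cloud import bigquery

# Create the team_stat table with a DAY partition on the ingestionDate field.
client = bigquery.Client(project="my-gcp-project")

schema = [
    bigquery.SchemaField("teamName", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("teamScore", "INTEGER"),
    bigquery.SchemaField("ingestionDate", "TIMESTAMP", mode="REQUIRED"),
]

table = bigquery.Table("my-gcp-project.team_league.team_stat", schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="ingestionDate",
)

client.create_table(table, exists_ok=True)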
4. Airflow DAG and Dataflow job
4.1 DAG not keeping all the events in the staging table
In this first example, we show the case where we do not keep all the events in the staging table.
Some explanations :
- Creation of 2 tables: team_stat_staging and team_stat
- team_stat_staging is a temporary table containing the result of the Dataflow job
- team_stat is the final table containing the last state of team stats, without duplicates
- Airflow is used as DAG orchestrator in this example, in a Cloud Composer 2 cluster
- The DAG at the beginning truncates the staging table, because in this example we don't want to keep all the events
- The DAG launches a Dataflow job, which has the responsibility to apply transformations and business rules
- The Dataflow job writes the result, the team stats domain data, to the staging table in BigQuery
- The DAG runs a merge query between the staging and final tables. If a line doesn't exist it will be inserted, otherwise updated (upsert)
The merge query at the end of the DAG helps to avoid duplicates and keep the latest state of the team stats data.
4.2 DAG diagram in Airflow
4.3 Structure and code of the DAG
We have a root folder of the project called team_league; under this folder there are 2 main folders:
- dag, containing the Airflow DAG
- job, containing the Beam/Dataflow job
Here you can see the code of the DAG:
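A minimal sketch of such a DAG, assuming hypothetical project, dataset, bucket and file names; the queries and pipeline options are imported here from a hypothetical settings module and detailed in the next sections:

from datetime import datetime

from airflow import models
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# Hypothetical module holding the queries and the Dataflow options shown below.
from team_league.dag.settings import (
    DATAFLOW_PIPELINE_OPTIONS,
    MERGE_QUERY,
    TRUNCATE_STAGING_QUERY,
)

with models.DAG(
    "team_league_dag",
    schedule_interval=None,
    start_date=datetime(2022, 9, 1),
    catchup=False,
    # These params are injected into the queries with the Jinja2 template.
    params={
        "project_id": "my-gcp-project",
        "dataset": "team_league",
        "team_stat_staging_table": "team_stat_staging",
        "team_stat_table": "team_stat",
    },
) as dag:
    # 1. Empty the staging table.
    truncate_staging = BigQueryInsertJobOperator(
        task_id="truncate_team_stat_staging",
        configuration={"query": {"query": TRUNCATE_STAGING_QUERY, "useLegacySql": False}},
    )

    # 2. Launch the Beam job with the Dataflow runner; it writes the
    #    team stats domain data to the staging table.
    dataflow_job = BeamRunPythonPipelineOperator(
        task_id="team_league_dataflow_job",
        runner=BeamRunnerType.DataflowRunner,
        py_file="gs://my-composer-bucket/dags/team_league/job/main.py",
        pipeline_options=DATAFLOW_PIPELINE_OPTIONS,
        dataflow_config=DataflowConfiguration(location="europe-west1"),
    )

    # 3. Merge the staging table into the final table (upsert).
    merge_to_final = BigQueryInsertJobOperator(
        task_id="merge_team_stat",
        configuration={"query": {"query": MERGE_QUERY, "useLegacySql": False}},
    )

    truncate_staging >> dataflow_job >> merge_to_final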
Truncate query task
The first task uses BigQueryInsertJobOperator, executing the truncate query on the staging table.
The goal is to empty the team_stat_staging table with this truncate query:
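A minimal sketch of such a query, as it could appear in the hypothetical settings module imported by the DAG sketch above:

# The project, dataset and table names are rendered by the Jinja2 template from the DAG params.
TRUNCATE_STAGING_QUERY = (
    "TRUNCATE TABLE "
    "`{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_staging_table }}`"
)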
The params project_id, dataset and team_stat_staging_table are passed by the DAG with the Jinja2 template.
The truncate will empty the whole table, because in this case there is no need to keep all the events, only the last states of the data.
Launch Beam/Dataflow job task
The second task uses BeamRunPythonPipelineOperator, executing the Beam job with the Dataflow runner. The pipeline options are passed as a Dict to the operator. The result is ingested into the staging table.
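The exact options depend on the project; a hypothetical Dict could look like this (project, region, buckets and paths are assumptions):

# Hypothetical Dataflow pipeline options passed to BeamRunPythonPipelineOperator.
DATAFLOW_PIPELINE_OPTIONS = {
    "project": "my-gcp-project",
    "region": "europe-west1",
    "temp_location": "gs://my-dataflow-bucket/temp",
    "staging_location": "gs://my-dataflow-bucket/staging",
    # Custom options read by the Beam job itself.
    "input_file": "gs://my-input-bucket/team_league/team_stats_raw.json",
    "team_stat_staging_table": "team_league.team_stat_staging",
}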
Merge query task
The last task also uses a BigQueryInsertJobOperator, executing the final merge query between the staging and final tables.
A matching is done between the staging and final tables, as shown in the sketch after this list:
- if the row from the staging table exists in the final table, it is updated: WHEN MATCHED THEN clause
- otherwise the row is inserted: WHEN NOT MATCHED THEN clause
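A minimal sketch of this merge query (the non-key columns are assumptions):

# Upsert from the staging table into the final table, matching on teamName.
MERGE_QUERY = """
MERGE `{{ params.project_id }}.{{ params.dataset }}.team_stat` T
USING `{{ params.project_id }}.{{ params.dataset }}.team_stat_staging` S
ON T.teamName = S.teamName
WHEN MATCHED THEN
  UPDATE SET
    teamScore = S.teamScore,
    ingestionDate = S.ingestionDate
WHEN NOT MATCHED THEN
  INSERT (teamName, teamScore, ingestionDate)
  VALUES (S.teamName, S.teamScore, S.ingestionDate)
"""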
⚠️ The matching criterion ON T.teamName = S.teamName must match only one row in the 2 tables. The final table can't contain duplicates because we prevent this kind of situation, but if the input files and the staging table contain duplicates or multiple lines with the same teamName, the query will fail with the following error:
google.api_core.exceptions.BadRequest: 400 UPDATE/MERGE must match at most one source row for each target row
We will adapt our merge query to remove duplicates from the staging table:
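A sketch of the adapted query, with the same hypothetical columns as above:

# Deduplicate the staging table with ROW_NUMBER() before merging: only the most
# recently ingested row per teamName is used as the source of the merge.
MERGE_QUERY = """
MERGE `{{ params.project_id }}.{{ params.dataset }}.team_stat` T
USING (
  SELECT * EXCEPT (rn)
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY teamName ORDER BY ingestionDate DESC) AS rn
    FROM `{{ params.project_id }}.{{ params.dataset }}.team_stat_staging`
  )
  WHERE rn = 1
) S
ON T.teamName = S.teamName
WHEN MATCHED THEN
  UPDATE SET
    teamScore = S.teamScore,
    ingestionDate = S.ingestionDate
WHEN NOT MATCHED THEN
  INSERT (teamName, teamScore, ingestionDate)
  VALUES (S.teamName, S.teamScore, S.ingestionDate)
"""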
Via ROW_NUMBER() OVER (PARTITION BY teamName ORDER BY ingestionDate DESC) AS rn and a subquery, we remove the duplicates from the staging table, so there is no risk of failing the merge query in this case.
You can check the official documentation for more information on the merge query:
4.4 Same DAG but keeping all the events in the staging table
In this second example, we show the case where we keep all the events in the staging table.
The truncate task is removed because we don't want to clean the staging table.
We also have to adapt the merge query.
In this case the staging table can contain a high volume of data over time, and we want to avoid a merge query that processes the whole table at a high cost. The solution is to use the partition on our table, represented by the ingestionDate column:
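A sketch of the adapted source part of the merge query; the rest of the query stays the same as in the previous section (the constant name is hypothetical):

# Only the partition of the current day is read from the staging table, so the merge
# no longer scans the whole staging table. This snippet replaces the USING (...) subquery.
MERGE_STAGING_SOURCE = """
  SELECT * EXCEPT (rn)
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY teamName ORDER BY ingestionDate DESC) AS rn
    FROM `{{ params.project_id }}.{{ params.dataset }}.team_stat_staging`
    WHERE date(ingestionDate) = CURRENT_DATE()
  )
  WHERE rn = 1
"""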
The clause WHERE date(ingestionDate) = CURRENT_DATE() uses the DAY partition.
The rest of the DAG is unchanged.
It's worth noting that this technique reduces the cost for the staging table, but unfortunately there is no way to use the partition of the final table: BigQuery needs to process the entire final table to find whether a teamName matches between the 2 tables.
Clustering on teamName can improve search performance as well as cost, because the data is collocated. We are not going to dwell on this concept here because it is not the subject of this article.
4.5 Structure and code of the Beam/Dataflow job
The code of the Beam job follows the same architecture I presented at the Beam Summit conference in Austin.
You can access the slides and the video of the talk:
The architecture is based on Hexagonal Architecture, Domain Driven Design and dependency injection with Python:
- the domain folder contains all the business rules, written in pure Python code
- domain_ptransform contains the connector interfaces for the IOs and a business PTransform that composes all the business transformations in the Beam world
- infrastructure contains all the adapters and the technical code
- injection centralizes and configures all the dependency injection part
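To illustrate the idea of the business PTransform, here is a minimal sketch of a composite PTransform; the function, field names (other than teamName and ingestionDate) and business rule are assumptions, not the real project code:

from datetime import datetime, timezone

import apache_beam as beam


def to_team_stats_domain(team_stats_raw: dict) -> dict:
    # Hypothetical business rule: compute the team score from the raw results.
    return {
        "teamName": team_stats_raw["teamName"],
        "teamScore": 3 * team_stats_raw["wins"] + team_stats_raw["draws"],
        "ingestionDate": datetime.now(timezone.utc).isoformat(),
    }


class TeamStatsTransform(beam.PTransform):
    """Composite PTransform chaining the business transformations in the Beam world."""

    def expand(self, team_stats_raw):
        return team_stats_raw | "To domain stats" >> beam.Map(to_team_stats_domain)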
The launch of a Dataflow Python job has to respect the following conditions:
- The runner will instantiate the Dataflow job. In this case, the runner is Cloud Composer and it must contain all the needed Python packages to instantiate the job.
- In the execution phase, the Dataflow workers use a given setup.py file or a custom Docker image. The setup.py file or Docker image must contain the same Python packages as the runner.
- You can use the setup.py file present in the Composer bucket: gs://{composer_bucket}/dags/setup.py
In this example the Dataflow job uses a custom Docker image in the pipeline options. I am going to write a dedicated article on this topic.
The advantage of using a custom image versus a setup.py file is the possibility to download internal Python packages from GCP Artifact Registry.
Dataflow options for setup.py:
--setup_file: {composer_dag_folder}/setup.py
The DAG folder path can be retrieved from the Airflow Python code with an environment variable:
DAGS_FOLDER = os.getenv("DAGS_FOLDER")
Dataflow options for a custom Docker image:
--experiments: "use_runner_v2" \
--sdk_container_image: eu.gcr.io/{my-gcp-project}/beam_custom:2.41.0
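From the DAG side, these options end up in the pipeline options Dict passed to the operator; a hypothetical sketch for the two packaging modes:

import os

DAGS_FOLDER = os.getenv("DAGS_FOLDER")

# Packaging with the setup.py file present in the Composer bucket.
setup_py_options = {
    "setup_file": f"{DAGS_FOLDER}/setup.py",
}

# Packaging with a custom Docker image (Dataflow Runner v2 is required).
custom_image_options = {
    "experiments": "use_runner_v2",
    "sdk_container_image": "eu.gcr.io/my-gcp-project/beam_custom:2.41.0",
}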
You can also check the official documentation for this part:
https://cloud.google.com/dataflow/docs/guides/using-custom-containers?hl=en
5. Deployment of DAG and job in Google Cloud
The Airflow DAG and the Dataflow job can be deployed in the Cloud Composer bucket with the following command:
- team_league is the Python root folder in the project containing the DAG and the job
- environment is the Composer instance name
- location is the location of the Composer cluster
- project is the GCP project
All the code examples and the project are accessible in my personal GitHub repository:
I hope you learned something and enjoyed reading this article.
If you like my articles and want to see my posts, follow me on :