Remove duplicates in a BigQuery batch pipeline with Airflow and Dataflow

Mazlum Tosun
Google Cloud - Community
Sep 26, 2022 · 7 min read

The purpose of this article is to show an application with a batch pipeline, orchestrated with Cloud Composer 2/Airflow and an Apache Beam/Cloud Dataflow job, that ingests data into BigQuery without duplicates.

One of the most common problems in a data warehouse is the handling of duplicates; very often we want to keep only the last state of an object.

The versions used for this example are:

  • Cloud Composer: composer-2.0.27-airflow-2.2.5
  • Apache Airflow: 2.2.5
  • Airflow provider apache-airflow-providers-apache-beam: 4.0.0
  • Apache Beam: 2.41.0

1. Local environment

In this example the Pipenv tool is used; here you can see the Pipfile used for this project:

We create the subshell and virtual env, and install all the packages inside it with the following commands:

pipenv shell
pipenv install --dev

I am going to write a dedicated article showing how to use Pipenv with IntelliJ and have the most comfortable environment to work on a local machine.

2. Goal of the application used for these examples

This article shows an example application whose goal is the handling of a football team league. This application uses the following steps:

  • Read a JSON file containing the team league stats raw data from Cloud Storage
  • Transform and apply business rules to compute the expected team league stats domain data
  • Write the result to a BigQuery table

For this application we want to deal with duplicates and update rows when a new file brings changes.

The raw data is:

The domain data, result of the business transformations, is:

The unicity of a team stat is represented by the teamName field.

3. Creation of BigQuery tables

Two tables are created for this example:

  • team_stat_staging
  • team_stat

The BigQuery schema of these 2 tables is the same:

We use the following shell script to create the tables; here is the example for the team_stat table:
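A minimal sketch of such a script with the bq CLI, assuming a dataset named team_league and keeping only the two columns mentioned in this article (the real schema also contains the team stat columns):

# Illustrative creation of the team_stat table (team_stat_staging is created the same way).
# The dataset name and the reduced schema are assumptions, not the exact project script.
bq mk --table \
  --time_partitioning_field ingestionDate \
  --time_partitioning_type DAY \
  "${PROJECT_ID}:team_league.team_stat" \
  teamName:STRING,ingestionDate:TIMESTAMP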

A DAY partition is used on the ingestionDate field.

4. Airflow DAG and Dataflow job

4.1 DAG not keeping all the events in the staging table

In this first example, we show the case where we do not keep all the events in the staging table.

Some explanations:

  • Creation of 2 tables: team_stat_staging and team_stat
  • team_stat_staging is a temporary table containing the result of the Dataflow job
  • team_stat is the final table containing the last state of team stats, without duplicates
  • Airflow is used as the DAG orchestrator in this example, in a Cloud Composer 2 cluster
  • At the beginning, the DAG truncates the staging table, because in this example we don't want to keep all the events
  • The DAG launches a Dataflow job, which has the responsibility to apply transformations and business rules
  • The Dataflow job writes the resulting team stats domain data to the staging table in BigQuery
  • The DAG runs a merge query between the staging and the final tables. If a row doesn't exist it is inserted, otherwise it is updated (upsert)

The merge query at the end of the DAG helps to avoid duplicates and keep the latest state of team stats data.

4.2 DAG diagram in Airflow

4.3 Structure and code of the DAG

The root folder of the project is called team_league; under this folder there are 2 main folders:

  • dag containing the Airflow DAG
  • job containing the Beam Dataflow job

Here you can see the code of the DAG:
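The complete DAG is available in the GitHub repository linked at the end of this article; the condensed sketch below only shows the shape of the three tasks (task ids, file paths and parameter values are illustrative, not the exact project code):

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

DAGS_FOLDER = os.getenv("DAGS_FOLDER", ".")

# Placeholders: the truncate and merge queries are discussed in the next sections,
# templated with the {{ params.* }} values declared below.
TRUNCATE_STAGING_QUERY = "..."
MERGE_TEAM_STAT_QUERY = "..."

with DAG(
        "team_league_dag",
        start_date=datetime(2022, 9, 1),
        schedule_interval=None,
        catchup=False,
        params={
            "project_id": "my-gcp-project",      # placeholder
            "dataset": "team_league",            # placeholder
            "team_stat_staging_table": "team_stat_staging",
            "team_stat_table": "team_stat",
        },
) as dag:

    # 1. Empty the staging table (this task disappears in the variant of section 4.4).
    truncate_team_stat_staging = BigQueryInsertJobOperator(
        task_id="truncate_team_stat_staging",
        configuration={"query": {"query": TRUNCATE_STAGING_QUERY, "useLegacySql": False}},
    )

    # 2. Launch the Beam job on Dataflow; the result lands in the staging table.
    team_league_dataflow_job = BeamRunPythonPipelineOperator(
        task_id="team_league_dataflow_job",
        runner="DataflowRunner",
        py_file=f"{DAGS_FOLDER}/team_league/job/main.py",           # illustrative path
        pipeline_options={"temp_location": "gs://my-bucket/temp"},  # see section 4.5
        dataflow_config=DataflowConfiguration(
            job_name="team-league-job",
            location="europe-west1",                                # placeholder region
        ),
    )

    # 3. Merge the staging table into the final table (upsert, no duplicates).
    merge_team_stat = BigQueryInsertJobOperator(
        task_id="merge_team_stat",
        configuration={"query": {"query": MERGE_TEAM_STAT_QUERY, "useLegacySql": False}},
    )

    truncate_team_stat_staging >> team_league_dataflow_job >> merge_team_stat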

Truncate query task

The first task uses a BigQueryInsertJobOperator executing the truncate query on the staging table.

The goal is to empty the team_stat_staging table with this truncate query:
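A sketch of that query, assuming the parameters are exposed as Jinja2 params on the DAG (the exact templating in the project may differ):

TRUNCATE TABLE `{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_staging_table }}`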

The params project_id, dataset and team_stat_staging_table are passed by the DAG with a Jinja2 template.

The truncate will empty the whole table because in this case there is no need to keep all the events, but only the last states of the data.

Launch Beam/Dataflow job task

The second task uses a BeamRunPythonPipelineOperator executing the Beam job with the Dataflow runner. Pipeline options are passed as a Dict to the operator. The result is ingested into the staging table.

Merge query task

The last task also uses a BigQueryInsertJobOperator, executing the final merge query between the staging and the final table.

A matching is done between the staging and the final table:

  • if the row from the staging table exists in the final table, it is updated: WHEN MATCHED THEN clause
  • otherwise the row is inserted: WHEN NOT MATCHED THEN clause
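A simplified version of such a merge query; only the teamName key and the ingestionDate column appear explicitly here, any other stat columns would be updated the same way (the real query in the repository may differ):

MERGE `{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_table }}` T
USING `{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_staging_table }}` S
ON T.teamName = S.teamName
WHEN MATCHED THEN
  UPDATE SET
    ingestionDate = S.ingestionDate
    -- ... the other stat columns are updated the same way
WHEN NOT MATCHED THEN
  INSERT ROW

INSERT ROW works here because the staging and final tables share the same schema.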

⚠️ The matching criterion ON T.teamName = S.teamName must match at most one row in each of the 2 tables. The final table cannot contain duplicates because we prevent this kind of situation, but if the input files and the staging table contain duplicates or multiple rows with the same teamName, the query will fail with the following error:

google.api_core.exceptions.BadRequest: 400 UPDATE/MERGE must match at most one source row for each target row

We will adapt our merge query to remove duplicates from the staging table:
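A sketch of the adapted query, where the USING clause keeps only the most recent row per teamName:

MERGE `{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_table }}` T
USING (
  SELECT * EXCEPT (rn)
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY teamName ORDER BY ingestionDate DESC) AS rn
    FROM `{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_staging_table }}`
  )
  WHERE rn = 1
) S
ON T.teamName = S.teamName
WHEN MATCHED THEN
  UPDATE SET
    ingestionDate = S.ingestionDate
    -- ... the other stat columns
WHEN NOT MATCHED THEN
  INSERT ROW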

Via ROW_NUMBER() OVER (PARTITION BY teamName ORDER BY ingestionDate DESC) AS rn and a subquery filtering on rn = 1, we remove duplicates from the staging table and keep only the most recent row per teamName.

So there is no risk of the merge query failing in this case.

You can check the official documentation for more information on the MERGE query:

4.4 Same DAG but keeping all the events in the staging table

In this second example, we show the case where we keep all the events in the staging table.

The truncate task is removed because we don't want to clean the staging table.

We also have to adapt the merge query.

In this case the staging table can contain a high volume of data over time, and we want to avoid a merge query that processes the whole table and has a high cost. The solution is to use the partition on our table, represented by the ingestionDate column:
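A sketch of the adapted query; compared to the previous one, only the USING subquery changes, with a filter on the partition column:

MERGE `{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_table }}` T
USING (
  SELECT * EXCEPT (rn)
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY teamName ORDER BY ingestionDate DESC) AS rn
    FROM `{{ params.project_id }}.{{ params.dataset }}.{{ params.team_stat_staging_table }}`
    -- Read only the current DAY partition of the staging table:
    WHERE date(ingestionDate) = CURRENT_DATE()
  )
  WHERE rn = 1
) S
ON T.teamName = S.teamName
WHEN MATCHED THEN
  UPDATE SET
    ingestionDate = S.ingestionDate
    -- ... the other stat columns
WHEN NOT MATCHED THEN
  INSERT ROW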

The clause WHERE date(ingestionDate) = CURRENT_DATE() uses the DAY partition, so only the current day's partition of the staging table is scanned.

The rest of the DAG is unchanged.

It's worth noting that this technique reduces the cost on the staging table side, but unfortunately there is no way to use the partitioning of the final table.
BigQuery needs to process the entire final table to check whether a teamName matches between the 2 tables.

Clustering on teamName can improve search performance and reduce cost because the data is colocated.
We are not going to dwell on this concept here because it is not the subject of this article.

4.5 Structure and code of the Beam/Dataflow job

The Beam code follows the same architecture I presented at the Beam Summit conference in Austin.

You can access the slides and the video of the talk:

The architecture is based on Hexagonal Architecture, Domain Driven Design and dependency injection with Python:

  • domain folder contains all the business rules, written in pure Python code
  • domain_ptransform contains connector interfaces for IOs and a business PTransform that composes all the business transformations in the Beam world
  • infrastructure contains all the adapters and technical code
  • injection centralizes and configures all the dependency injection part
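The real job code is in the repository; as a rough idea of the wiring, here is a heavily simplified sketch where the option names, the transform labels and the single map step standing in for the business PTransform are all illustrative:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class TeamLeagueOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Hypothetical option names for this sketch.
        parser.add_argument("--input_json_file", required=True)
        parser.add_argument("--team_stat_staging_table", required=True)


def to_team_stats_domain(raw_team_stats: dict) -> dict:
    # Stand-in for the business rules kept as pure Python code in the domain folder
    # (the real job computes the team stats domain fields, including ingestionDate).
    return raw_team_stats


def run():
    options = TeamLeagueOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Read raw team stats" >> beam.io.ReadFromText(options.input_json_file)
            | "Deserialize Json" >> beam.Map(json.loads)
            | "Apply business rules" >> beam.Map(to_team_stats_domain)
            | "Write to staging table" >> beam.io.WriteToBigQuery(
                options.team_stat_staging_table,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            )
        )


if __name__ == "__main__":
    run()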

The launch of a Dataflow Python job has to respect the following conditions:

  • The runner will instantiate the Dataflow job. In this case, the runner is Cloud Composer and it must contain all the Python packages needed to instantiate the job.
  • In the execution phase, the Dataflow workers use a given setup.py file or a custom Docker image. The setup.py file or Docker image must contain the same Python packages as the runner.
  • You can use the setup.py file present in the Composer bucket: gs://{composer_bucket}/dags/setup.py

In this example the Dataflow job uses a custom Docker image in the pipeline options. I am going to write a dedicated article on this topic.

The advantage of using a custom image versus a setup.py file is the possibility to download internal Python packages from GCP Artifact Registry.

  • Dataflow options for setup.py:

--setup_file: {composer_dag_folder}/setup.py

The DAG folder path can be retrieved from the Airflow Python code with an environment variable:

DAGS_FOLDER = os.getenv("DAGS_FOLDER")

  • Dataflow options for the custom Docker image:

--experiments: "use_runner_v2"
--sdk_container_image: eu.gcr.io/{my-gcp-project}/beam_custom:2.41.0
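These options end up in the pipeline_options Dict passed to BeamRunPythonPipelineOperator, roughly like this (bucket and project names are placeholders):

# Illustrative Dict passed to the operator; values are placeholders.
dataflow_pipeline_options = {
    "temp_location": "gs://my-bucket/dataflow/temp",
    "staging_location": "gs://my-bucket/dataflow/staging",
    # setup.py variant:
    # "setup_file": f"{DAGS_FOLDER}/setup.py",
    # custom Docker image variant:
    "experiments": ["use_runner_v2"],
    "sdk_container_image": "eu.gcr.io/my-gcp-project/beam_custom:2.41.0",
}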

You can also check the official documentation for this part:

https://cloud.google.com/dataflow/docs/guides/using-custom-containers?hl=en

5. Deployment of DAG and job in Google Cloud

The Airflow DAG and Dataflow job can be deployed to the Cloud Composer bucket with the following command:
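A sketch of that command with gcloud (the environment, location and project values are placeholders for your own Composer setup):

gcloud composer environments storage dags import \
  --source team_league \
  --environment "$ENVIRONMENT" \
  --location "$LOCATION" \
  --project "$PROJECT"

The import copies the team_league folder into the dags folder of the Composer bucket.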

  • team_league is the Python root folder in the project containing the DAG and job
  • environment is the Composer instance name
  • location is the location of the Composer cluster
  • project is the GCP project

All the code examples and the project are accessible in my personal GitHub repository:

I hope you learned something and enjoyed reading this article.

If you like my articles and want to see my posts, follow me on:

- Medium
- Twitter
- LinkedIn
