BigQuery De-Duplication — Window Functions vs GROUP BY for Stitch

Bhuvanesh
Sep 25 · 4 min read

If you are here, chances are you are using Stitch Data for ETL and replicating an OLTP database into BigQuery, Redshift, or some other data warehouse. It's a nice ETL tool with NoOps for a Data Engineer: there is no infrastructure to manage for the ETL. De-duplication is one of the key components in data analytics, and you can usually do it during the ETL process. But Stitch works in a different way: if you are using CDC, it performs append-only operations.

We have a data pipeline that syncs MySQL tables to BigQuery in near real time. Meanwhile, the source database receives a lot of updates, so each sync introduces a lot of duplicates (each id will have multiple versions). Stitch has a solution for this: it adds some additional columns to your target table to identify the most recent row.

_sdc_batched_at TIMESTAMP

Indicating when Stitch loaded the batch the record was a part of into the data warehouse.
Example data: 2019-08-08 14:55:08.82+00

_sdc_sequence INTEGER

A Unix epoch (in milliseconds) that indicates the order in which data points were considered for loading.
Example data: 1565276078922000095

_sdc_primary_key STRING

Applicable only if Stitch doesn’t detect a Primary Key in the source table. Stitch will use this column to de-dupe data.
Example data: 5d8b9a05-33cc-4d5f-8163-4474814b46c6
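
Before de-duplicating, it's worth a quick sanity check of how many versions each id has accumulated. A minimal sketch, using the same table as the rest of this post:

# Standard SQL: a sketch that counts how many versions each id has
SELECT
  id,
  COUNT(*) AS versions
FROM
  `bhuvi-project.sanbox.bhuvi_test`
GROUP BY
  id
HAVING
  COUNT(*) > 1
ORDER BY
  versions DESC
LIMIT
  10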

Based on the above columns, we can perform the de-duplication. Stitch provides the following statement for this.

# Standard SQL
SELECT
  DISTINCT o.*
FROM
  `bhuvi-project.sanbox.bhuvi_test` o
INNER JOIN (
  SELECT
    id,
    MAX(_sdc_sequence) AS seq,
    MAX(_sdc_batched_at) AS batch
  FROM
    `bhuvi-project.sanbox.bhuvi_test`
  GROUP BY
    id) oo
ON
  o.id = oo.id
  AND o._sdc_sequence = oo.seq
  AND o._sdc_batched_at = oo.batch

I have tested the query on my dataset.

  • Total rows: 23,806,735
  • Total size: 2.44GB

The above query looks like a statement we would run on a normal transactional database: it aggregates in a subquery and then joins the table back to itself.

Let's scan the complete data with the above query. I used https://bqvisualiser.appspot.com/ to visualize the query plan.

Total Execution time: 56.483 sec

A better way of doing this:

BigQuery is an analytic database, so it handles the GROUP BY and MAX() efficiently enough, but let's try the analytical way with window functions and get rid of the old-school approach.

# Standard SQL
WITH
  cte AS (
  SELECT
    ROW_NUMBER() OVER(PARTITION BY id ORDER BY _sdc_batched_at DESC, _sdc_sequence DESC) AS rownum,
    *
  FROM
    `bhuvi-project.sanbox.bhuvi_test`)
SELECT
  *
FROM
  cte
WHERE
  rownum = 1

Total execution time: 37.248 sec

This shows that the window-function version skips the aggregation and the self-join: it reads the data once, sorts it within each partition, and filters on the row number. So use window functions in your BI queries as much as you can.
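
As a side note, newer BigQuery releases also support a QUALIFY clause that filters on a window function without the CTE. A minimal sketch, assuming the same table and columns:

# Standard SQL: a sketch using QUALIFY (same table and columns as above)
SELECT
  *
FROM
  `bhuvi-project.sanbox.bhuvi_test`
WHERE
  TRUE -- BigQuery requires a WHERE, GROUP BY, or HAVING clause alongside QUALIFY
QUALIFY
  ROW_NUMBER() OVER(PARTITION BY id ORDER BY _sdc_batched_at DESC, _sdc_sequence DESC) = 1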

What else can we get?

A BigQuery slot is a unit of computational capacity required to execute SQL queries. BigQuery automatically calculates how many slots are required by each query, depending on query size and complexity.

By default, 2,000 slots are allocated per project for on-demand users. If you are running massive analytical workloads and many data analysts/scientists are generating a huge number of reports, you may end up short of slots. The window-function approach dramatically reduces slot usage.

Aggregation:

"elapsedMs"       : "56,483",
"estd. slots used": "22"

Window Functions:

"elapsedMs"       : "37,248",
"estd. slots used": "16"

Deletes on BQ?

I know it's not a good practice to run DELETE queries on a data warehouse, but it depends. In my case, a single table collects about 1 million duplicates. Window functions take care of the de-dup at query time, but after some days the table would end up with billions of duplicate rows; we'd pay for that storage, and the window functions would take more time. Still, don't run the delete query on every data load; run it once a day.

DELETE
FROM
  `bhuvi-project.sanbox.bhuvi_test`
WHERE
  STRUCT(id,
    _sdc_batched_at,
    _sdc_sequence) NOT IN (
  SELECT
    AS STRUCT id,
    MAX(_sdc_batched_at),
    MAX(_sdc_sequence)
  FROM
    `bhuvi-project.sanbox.bhuvi_test`
  GROUP BY
    id)

Query complete (1 min 13 sec elapsed, 2.4 GB processed)
This statement removed 10,590 rows

Unfortunately, the window function didn't help me with the deletes. If somebody can do this with window functions, that would be super awesome.
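
For what it's worth, one untested direction (a sketch, not a verified solution) is to push the ROW_NUMBER() into a subquery and delete everything except each id's top-ranked version:

# Standard SQL: an untested sketch; the inner subquery ranks the versions per id
DELETE
FROM
  `bhuvi-project.sanbox.bhuvi_test`
WHERE
  STRUCT(id,
    _sdc_batched_at,
    _sdc_sequence) NOT IN (
  SELECT
    AS STRUCT id,
    _sdc_batched_at,
    _sdc_sequence
  FROM (
    SELECT
      id,
      _sdc_batched_at,
      _sdc_sequence,
      ROW_NUMBER() OVER(PARTITION BY id ORDER BY _sdc_batched_at DESC, _sdc_sequence DESC) AS rownum
    FROM
      `bhuvi-project.sanbox.bhuvi_test`)
  WHERE
    rownum = 1)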

Final approach: use the window-function query for de-duplication at read time, and run the DELETE statement once a day to keep the duplicate rows (and the storage cost) under control.
