Breaking up very large BigQuery jobs

Using integer range partitions with BigQuery scripting

I recently worked on a BigQuery use case that required a very large, skewed, many-to-many join, resulting in a fan-out of over 180 billion records. On-demand projects, limited to 2,000 slots, would frequently time out before the query could complete. The query plan pointed to the problem: bytes spilled to disk and pipelined repartition stages, where the output of one repartition stage becomes the input to another.

A pipelined repartition: Stage 0A reads the output of Stage 09

Here is a quick tutorial on how to work around these issues using BigQuery’s scripting and integer range partitioning features to break down large queries efficiently.

The Setup

BigQuery’s OpenStreetMap (OSM) public dataset offers a convenient example for expensive joins with high fan-out, e.g. a join on geospatial distance. To make the dataset a little more manageable, I am filtering to nodes in Denmark (country code: DK), which has a high concentration of OSM node entries.

CREATE TABLE `mydataset.osm_nodes_dk`
AS (
  SELECT *
  FROM `bigquery-public-data.geo_openstreetmap.planet_nodes`
  WHERE EXISTS (
    SELECT 1
    FROM UNNEST(all_tags)
    WHERE key = 'addr:country'
      AND value = 'DK'
  )
)

The goal is then to join each entity to all related entities in the dataset, so that a downstream service can compare the content of each matched pair. The scale of this each-against-each comparison grows quadratically as the table size increases.

Author’s note: Similarity searches, and the inherent cross joins they create, are not unique to geospatial data; they also appear in domains like retail, e.g. matching sales by related product codes or fuzzy text matching.

SELECT
  a.id,
  a.geometry,
  a.all_tags,
  b.id AS comp_id,
  b.geometry AS comp_geo,
  b.all_tags AS comp_tags
FROM `mydataset.osm_nodes_dk` a
INNER JOIN `mydataset.osm_nodes_dk` b
  ON ST_DISTANCE(a.geometry, b.geometry) <= 5000
54 minutes and 24 TB of spill

This basic query, suitable for a couple of million records, will fail when the table grows to tens or hundreds of millions of records, as happens when querying the full bigquery-public-data.geo_openstreetmap.planet_nodes table this way.

Splitting up Large Query Jobs

To prevent disk spills, it would be ideal to efficiently address one chunk of the table at a time as it is compared to the whole set. Integer range partitions are easy to address in a script, and filtering on the partitioning column in the query predicate prunes records instantly.

BigQuery tables are limited to 4,000 partitions; this DDL statement creates a partitioned version of the table. The added range_id column is a deterministic value between 0 and 3999 and should be as evenly distributed as the data allows. The technique used below borrows from this blog.

CREATE TABLE `mydataset.osm_dk_irp`
PARTITION BY RANGE_BUCKET(range_id, GENERATE_ARRAY(0, 3999, 1))
AS (
  SELECT
    *,
    ABS(MOD(FARM_FINGERPRINT(CAST(id AS STRING)), 4000)) AS range_id
  FROM `mydataset.osm_nodes_dk`
)
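With the partitioned table in place, a filter on range_id prunes the unneeded partitions before any data is read. As a quick sketch of how to confirm this, run a filtered query as a dry run (or watch the bytes-processed estimate in the console) and compare it to a full scan:

```sql
-- Touches only the partitions for range_id 0-49; the bytes-processed
-- estimate should be roughly 50/4000ths of a full table scan.
SELECT id, geometry
FROM `mydataset.osm_dk_irp`
WHERE range_id BETWEEN 0 AND 49;
```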

The job code assembles the full result in a WHILE loop, where each iteration filters on a block of partitions sized by the increment variable. Before executing the full job, test the increment size: it should be as large as possible while still avoiding spills and deeply pipelined repartitions.

DECLARE max_partition INT64 DEFAULT 3999;
DECLARE increment INT64 DEFAULT 50;
DECLARE start INT64 DEFAULT 0;

CREATE OR REPLACE TABLE `mydataset.osm_irp_join`
(
  id INT64,
  geometry GEOGRAPHY,
  all_tags ARRAY<STRUCT<key STRING, value STRING>>,
  comp_id INT64,
  comp_geo GEOGRAPHY,
  comp_tags ARRAY<STRUCT<key STRING, value STRING>>
);

WHILE start <= max_partition DO
  INSERT INTO `mydataset.osm_irp_join`
  SELECT
    a.id,
    a.geometry,
    a.all_tags,
    b.id AS comp_id,
    b.geometry AS comp_geo,
    b.all_tags AS comp_tags
  FROM `mydataset.osm_dk_irp` a
  INNER JOIN `mydataset.osm_dk_irp` b
    ON ST_DISTANCE(a.geometry, b.geometry) <= 5000
  WHERE a.range_id BETWEEN start AND start + increment - 1;

  SET start = start + increment;
END WHILE;

-- SELECT * FROM `mydataset.osm_irp_join`
Scripted job runs in under 14 minutes, 70% faster

A close look at one of the child jobs shows zero bytes spilled and only a single repartition stage: the job reshuffles the data once, and does so entirely in memory.
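Rather than opening each child job in the console, you can check the spill counters across all of them at once. As a sketch (the `region-us` qualifier is an assumption; substitute your dataset’s region), the INFORMATION_SCHEMA jobs view exposes per-stage statistics, including bytes spilled to disk:

```sql
-- Sketch: spill per stage for the child jobs spawned by the script
-- over the last day. Region qualifier is an assumption.
SELECT
  job_id,
  stage.name AS stage_name,
  stage.shuffle_output_bytes_spilled AS bytes_spilled
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT,
  UNNEST(job_stages) AS stage
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND parent_job_id IS NOT NULL  -- only children of a scripted parent job
ORDER BY bytes_spilled DESC;
```

Any nonzero rows at the top are a signal that the increment is too large.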

Choosing Increment Sizes

I recommend running a few tests with different increment sizes to get a sense of how the increment affects performance. These screenshots compare single queries with partition ranges of 50, 250, and 500 for this job. Total throughput should rise as the increment size goes up, but it can drop substantially once bytes spill to disk.

Pro tip: BigQuery on-demand is not a clean room, and unpredictable factors like wait time and bursting can influence your results.

  • 50: 36 seconds (1.38 partitions/s), 0 bytes spilled
  • 250: 2 minutes (2.08 partitions/s), 0 bytes spilled
  • 500: 7 minutes (1.19 partitions/s), 4.04 TB spilled
Increment of 500 (7 minutes)
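Each test is simply the loop body run once against a fixed range, so no scripting is needed; for example, an increment of 250 against the tables created above looks like:

```sql
-- One-off timing test for an increment of 250 partitions.
-- After it finishes, check the execution details for bytes
-- spilled and the number of repartition stages.
SELECT
  a.id,
  b.id AS comp_id
FROM `mydataset.osm_dk_irp` a
INNER JOIN `mydataset.osm_dk_irp` b
  ON ST_DISTANCE(a.geometry, b.geometry) <= 5000
WHERE a.range_id BETWEEN 0 AND 249;
```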

Final Thoughts

BigQuery’s ability to dynamically redistribute data allows it to complete very complex queries without any extra user effort. However, there are cases where the amount of shuffled data can overwhelm the available slots, leading to spills, successive repartitions, and slower query times.

The techniques described here should be considered alongside rewriting large queries to use features like analytic functions and invoking the Reservations API to allocate an appropriate number of dedicated slots.
