Google BigQuery, Large Table Joins and How Nested, Repeated Values and the Capacitor Storage Format (and Looker) Saves the Day

I’m flying over to Hungary early next week to meet my good friend Bence Arato, and to speak at the Budapest Data Forum about analytics and big data on Google’s Cloud Platform using BI tools such as Looker, both of which I’ve written about on this blog over the past few months. BigQuery lets you store gigabytes and petabytes of data in a distributed, cloud-based query and compute layer that automatically scales up as you load more data into it, and presents data in familiar tables and columns, datasets (schemas) and projects (databases) that would be immediately recognisable to anyone coming into this new technology from the old world of relational databases and SQL queries.

But that familiarity can be deceiving, particularly when you start to do things at scale. Take, for example, two of the sample datasets that Google make available on the Google BigQuery Public Datasets page: “Bay Area Bike Share Trips”, which contains records of all public bike hire trips in San Francisco over a number of years, and another that contains records of all the San Francisco 311 (non-emergency) calls made since 2008, including the reason for the call, time and caller location. Both are ready for analysis and can be queried immediately using BigQuery’s ANSI-standard SQL query language.

The bike rides dataset contains just under a million records stored in a 120MB table and returns answers to queries in just a few seconds with none of the indexes, summary tables or in-memory caches that traditional databases and OLAP servers required you to populate and manage to keep query response times within an acceptable limit.

In fact, combining these two datasets together so I can see which bikeshare locations had the most traffic accidents associated with them would be an interesting exercise, and BigQuery supports joins between tables using the ANSI join syntax, like this:

-- Count trips and traffic-collision calls per zip code by joining the two fact tables
SELECT zip_code, count(trip_id) as total_trips, call_type, count(call_type) as total_calls
FROM `bigquery-public-data.san_francisco.bikeshare_trips` b
LEFT JOIN `bigquery-public-data.san_francisco.sffd_service_calls` s
ON b.zip_code = s.zipcode_of_incident
WHERE call_type = 'Traffic Collision'
GROUP BY 1, 3

Joining large fact table-style tables together with smaller dimension-style tables works fine with BigQuery, but joining these two large denormalized fact tables together gives us a resources exceeded error after the query runs for over fifteen minutes.

What we’re seeing here is the trade-off we get when running data warehouse workloads on a distributed query platform like BigQuery. The size of the datasets we can work with becomes huge, but joins between those datasets become relatively expensive due to the need to sort, co-ordinate and send intermediate results between thousands of server nodes in the cluster.
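One way to get a feel for why a join like this gets expensive is to count the rows it has to produce. Neither table is unique on zip code, so every trip in a zip code is paired with every matching call in that zip code before any aggregation happens. The sketch below is only an illustration: the row counts are made up rather than taken from the public datasets, and `join_fanout` is a hypothetical helper that models an inner join on a shared key.

```python
from collections import Counter

def join_fanout(left_keys, right_keys):
    """Return the number of rows an inner join on a shared key would emit."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # Each key contributes (rows on the left) x (rows on the right).
    return sum(n * right_counts[k] for k, n in left_counts.items())

# Hypothetical per-zip-code row counts, for illustration only.
trips = ["94107"] * 1000 + ["94105"] * 500
calls = ["94107"] * 200 + ["94105"] * 300

print(join_fanout(trips, calls))  # 1000*200 + 500*300 = 350000
```

With only 1,500 rows on one side and 500 on the other, the intermediate result is already 350,000 rows, and all of them have to be moved between nodes before they can be grouped.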

The usual answer is to denormalize even further, creating one single large table containing all the columns from both input tables, with multiple rows for each base fact if you’ve got a one-to-many (or many-to-many) relationship between the joined tables. Storage is cheap on cloud platforms from Google and Amazon, but BigQuery offers another solution, one that relies on Colossus, the distributed file system with which Google replaced the Google File System (the inspiration for HDFS), and in particular on the Capacitor columnar storage format that BigQuery uses for its fast, column-store table storage.

Google BigQuery supports several input formats for data you load into tables (CSV files, JSON files, Avro files and Datastore backups), but under the covers BigQuery uses a columnar storage format developed by Google called Capacitor (the successor to ColumnIO), stored on the Colossus distributed filesystem, Google’s replacement for GFS. Like Parquet and other columnar storage formats that were inspired by ColumnIO, it stores data in compression-friendly columns of nested, repeating values that suit BI-style applications that filter and aggregate big, wide tables of data rather than process transactions using whole rows of data.
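As a rough mental model, the difference between row and column layouts can be sketched in a few lines of Python. This is a toy illustration and not how Capacitor is actually implemented; the table contents and the `to_columns` helper are made up for the example.

```python
def to_columns(rows):
    """Pivot a list of row dicts into a dict of column lists."""
    columns = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns

# Made-up rows, for illustration only.
rows = [
    {"trip_id": 1, "zip_code": "94107", "duration_sec": 600},
    {"trip_id": 2, "zip_code": "94107", "duration_sec": 300},
    {"trip_id": 3, "zip_code": "94105", "duration_sec": 900},
]

columns = to_columns(rows)

# An aggregate over one column touches only that column's values;
# trip_id and zip_code are never read.
avg_duration = sum(columns["duration_sec"]) / len(columns["duration_sec"])
print(avg_duration)  # 600.0
```

Keeping each column's values together is also what makes them compression-friendly: the `zip_code` list above is mostly repeats of the same value.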

What this means for BigQuery and our two tables of data is that we’d be better off storing the joined table’s values as nested repeating field types in the first table, avoiding the join altogether and storing our data in the way best suited to BigQuery’s storage format. To take an example, if we took the aggregated bike trips values and wrote them out to a JSON format input file like this:

"trips":[{"start_station_name":"Embarcadero at Vallejo",
{"start_station_name":"Townsend at 7th"

that input file could then be used to populate a table that stored each of the trips for a given zipcode as nested repeated values in a single column rather than one row per trip, and with BigQuery’s columnar storage those trips would only be retrieved from storage if that particular column was included in the query, like this:

SELECT zipcode, trips.trip_id, trips.duration_sec
FROM personal_metrics.bike_trips_nested,
UNNEST (trips) as trips
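To make the UNNEST step concrete, here is a small Python sketch of what the comma join against UNNEST(trips) does logically: each parent row is paired with each element of its own array, and a parent with an empty array produces no output rows. The sample records and the `unnest` helper are invented for the example; this models the result, not how BigQuery executes it.

```python
def unnest(rows, array_field):
    """Mimic `FROM t, UNNEST(t.array_field)`: one output row per array element."""
    for row in rows:
        for element in row[array_field]:
            # Parent columns (minus the array) are repeated for each element.
            parent = {k: v for k, v in row.items() if k != array_field}
            yield {**parent, **element}

# Invented nested records, shaped like the table described above.
nested = [
    {"zipcode": "94107", "trips": [
        {"trip_id": 1, "duration_sec": 600},
        {"trip_id": 2, "duration_sec": 300},
    ]},
    {"zipcode": "94105", "trips": []},  # no trips: dropped by the comma join
]

flat = list(unnest(nested, "trips"))
for row in flat:
    print(row)
```

The pairing only ever happens within a single stored row, which is why it avoids the cross-node shuffle of a table-to-table join.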

If I’ve not got input files handy in this nested repeated JSON format, I could use BigQuery Standard SQL to output nested fields using the ARRAY_AGG and STRUCT functions, like this:

-- One output row per zip code, with that zip code's trips nested in a repeated column
SELECT zip_code AS zipcode,
  ARRAY_AGG(STRUCT(trip_id, duration_sec, start_date,
    start_station_name, start_station_id, end_date,
    end_station_id, end_station_name, bike_number)) AS trips
FROM `aerial-vehicle-148023.personal_metrics.bikeshare_trips`
GROUP BY 1
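If you would rather build a nested input file outside BigQuery, the same grouping can be sketched in plain Python: collect the rows for each zip code into a list and write one JSON object per line, which is the newline-delimited layout BigQuery's JSON loader expects. The sample rows and the `nest_by_zipcode` helper are assumptions for illustration, and only a few of the trip columns are included.

```python
import json
from collections import defaultdict

def nest_by_zipcode(rows):
    """Group flat trip rows into one record per zip code with a repeated 'trips' field."""
    grouped = defaultdict(list)
    for row in rows:
        trip = {k: v for k, v in row.items() if k != "zip_code"}
        grouped[row["zip_code"]].append(trip)
    return [{"zipcode": z, "trips": t} for z, t in grouped.items()]

# Invented flat rows standing in for the bikeshare_trips table.
flat_rows = [
    {"zip_code": "94107", "trip_id": 1, "duration_sec": 600},
    {"zip_code": "94107", "trip_id": 2, "duration_sec": 300},
    {"zip_code": "94105", "trip_id": 3, "duration_sec": 900},
]

records = nest_by_zipcode(flat_rows)
# One JSON object per line (newline-delimited JSON).
ndjson = "\n".join(json.dumps(r) for r in records)
print(ndjson)
```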

To store the data in this nested format, make sure you write the query results to a destination table and, when using the Web UI, check the Allow Large Results checkbox and deselect the Flatten Results checkbox.

You can then either export that table’s contents into a JSON format output file with nested repeated JSON elements, or query the table in place for more efficient lookups from the repeated dimension values. Taking this to the logical conclusion, you could do this for both of the tables we’re looking to join together, as they both have these nested records (bike trips per zipcode for one, police incidents per zipcode in the other), and create one big SELECT statement that nests both of these and outputs one big denormalized table with nested repeated column values for each zipcode’s bike trips and incidents records, like this:

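WITH bike_trips AS (
  -- Pre-aggregate trips per zip code and start station
  SELECT
    zip_code AS zipcode,
    start_station_name,
    AVG(duration_sec) AS avg_duration_sec,
    COUNT(trip_id) AS trip_count
  FROM personal_metrics.bikeshare_trips b
  GROUP BY 1,2),
incidents_temp AS (
  -- Pre-aggregate calls per zip code and call type
  SELECT zipcode_of_incident AS zipcode,
    call_type,
    COUNT(call_type) AS call_type_count
  FROM personal_metrics.sffd_service_calls
  GROUP BY 1,2)
-- Nest both aggregates as repeated columns, one row per zip code
SELECT bt.zipcode,
  ARRAY_AGG(STRUCT(bt.start_station_name,
    bt.avg_duration_sec,
    bt.trip_count)) trips,
  ARRAY_AGG(STRUCT(ic.call_type,
    ic.call_type_count)) incidents
FROM bike_trips bt
LEFT JOIN incidents_temp ic
ON bt.zipcode = ic.zipcode
GROUP BY 1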

and then the equivalent to the two-table join query right at the start would look like this:

SELECT zipcode, count(zipcode_trips.trip_count) as total_trips, zipcode_incidents.call_type, count(zipcode_incidents.call_type) as total_calls
FROM personal_metrics.sf_biketrips_incidents_nested
LEFT JOIN UNNEST (trips) as zipcode_trips
LEFT JOIN UNNEST (incidents) AS zipcode_incidents
WHERE zipcode_incidents.call_type = 'Traffic Collision'
GROUP BY 1, 3
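It can help to see what the two LEFT JOIN UNNEST clauses do to a single row. As a sketch (the data is invented, and the hypothetical `left_join_unnest` helper only models the logical result rather than how BigQuery executes it), each zip code's row is expanded to one row per trips element per incidents element, and an empty array contributes a single NULL element instead of dropping the row:

```python
def left_join_unnest(rows, array_field, alias):
    """Mimic `LEFT JOIN UNNEST(array_field) AS alias` on a list of row dicts."""
    out = []
    for row in rows:
        # An empty or missing array still yields one row, with alias set to None.
        elements = row.get(array_field) or [None]
        for element in elements:
            out.append({**row, alias: element})
    return out

# Invented records shaped like the nested table built above.
table = [
    {"zipcode": "94107",
     "trips": [{"start_station_name": "Townsend at 7th", "trip_count": 10},
               {"start_station_name": "Embarcadero at Vallejo", "trip_count": 4}],
     "incidents": [{"call_type": "Traffic Collision", "call_type_count": 7},
                   {"call_type": "Alarms", "call_type_count": 3},
                   {"call_type": "Medical Incident", "call_type_count": 9}]},
    {"zipcode": "94105", "trips": [], "incidents": []},
]

expanded = left_join_unnest(left_join_unnest(table, "trips", "zipcode_trips"),
                            "incidents", "zipcode_incidents")
print(len(expanded))  # 2 trips x 3 incidents + 1 preserved empty row = 7
```

Because aggregates run over these expanded rows, it is worth checking whether a COUNT over them gives the total you intend, or whether a SUM over the pre-aggregated counts is closer to what you want.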

and then, most importantly, this new query where we’ve eliminated the large table-to-large table join and nested the dimension values in a format that aligns with BigQuery’s storage format, returns results in … 7 seconds.

Not all ODBC drivers and BI tools support BigQuery’s nested repeated column format, but Looker, the BI tool I’m working with day-to-day on BigQuery right now, does, supporting nested joins in LookML like this:

explore: sf_biketrips_incidents_nested {
  join: sf_biketrips_incidents_nested__incidents {
    view_label: "Incidents"
    sql: LEFT JOIN UNNEST(${sf_biketrips_incidents_nested.incidents}) as sf_biketrips_incidents_nested__incidents ;;
    relationship: many_to_one
  }
  join: sf_biketrips_incidents_nested__trips {
    view_label: "Trips"
    sql: LEFT JOIN UNNEST(${sf_biketrips_incidents_nested.trips}) as sf_biketrips_incidents_nested__trips ;;
    relationship: many_to_one
  }
}

view: sf_biketrips_incidents_nested {
  sql_table_name: personal_metrics.sf_biketrips_incidents_nested ;;
  dimension: incidents {
    hidden: yes
    sql: ${TABLE}.incidents ;;
  }
  dimension: trips {
    hidden: yes
    sql: ${TABLE}.trips ;;
  }
  dimension: zipcode {
    type: zipcode
    sql: ${TABLE}.zipcode ;;
  }
  measure: count {
    type: count
    drill_fields: []
  }
}

view: sf_biketrips_incidents_nested__trips {
  dimension: id {
    primary_key: yes
    sql: CONCAT(sf_biketrips_incidents_nested.zipcode,${TABLE}.start_station_name) ;;
  }
  measure: avg_duration_sec {
    type: average
    sql: ${TABLE}.avg_duration_sec ;;
  }
  dimension: start_station_name {
    type: string
    sql: ${TABLE}.start_station_name ;;
  }
  measure: trip_count {
    type: sum
    sql: ${TABLE}.trip_count ;;
  }
}

view: sf_biketrips_incidents_nested__incidents {
  dimension: id {
    primary_key: yes
    sql: CONCAT(sf_biketrips_incidents_nested.zipcode, ${TABLE}.call_type) ;;
  }
  dimension: call_type {
    type: string
    sql: ${TABLE}.call_type ;;
  }
  measure: call_type_count {
    type: sum
    sql: ${TABLE}.call_type_count ;;
  }
}

and displaying the two nested columns as regular Looker views in the dashboard, so users don’t even need to be aware that you’ve denormalized and optimized the storage of those two tables and details of bike trips and police incidents are now stored in nested repeated columns.

Like anything in database design, or IT in general, nested columns aren’t a silver bullet that solves every large table join or query optimization problem, and they will hit the same sort of resource limit issues if you try to store thousands or millions of dimension values in this nested repeating format.

But most star-schema style queries across sets of denormalized fact and dimension tables work better when you store data in this format, and using the ARRAY_AGG and STRUCT functions you can even generate test data in this more optimized format without having to write your own JSON data exporter to nest the dimension values properly.