BigQuery partitioning with Beam streams

Alex Van Boxel
Google Cloud - Community
4 min read · Jun 1, 2017


If you’ve been using Google BigQuery you know how fast it can crunch through massive amounts of data in record time. It’s one of GCP’s most mature products and a window into your business data, for engineers as well as for less technical people. If you know SQL, you can work with BigQuery.

This article assumes that you are knowledgeable about BigQuery and Apache Beam (Cloud Dataflow).

But what about the costs? Well, you only pay for the amount of data queried (around $5 per terabyte at the time of this writing). In general you won’t spend a lot of money doing ad hoc analysis. The caveat is when you start to integrate BigQuery with a data visualisation tool like Tableau that provides real-time dashboards. I remember a case where some queries behind a real-time dashboard were fired thousands of times a day, resulting in a substantial rise in costs.

Luckily there are some techniques to limit the amount of data needed for a query. First of all, select only the columns that you need, so never use star (*). Because BigQuery uses a columnar storage system, it only traverses the columns that you select or filter on, and only those columns are billed.
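
As a quick illustration, this is what a narrow query can look like through the BigQuery Java client; the project, dataset and column names are made up for the example:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class NarrowQueryExample {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Only the two referenced columns are scanned and billed, never the whole row.
    QueryJobConfiguration query = QueryJobConfiguration.newBuilder(
            "SELECT user_id, event_ts FROM `my-project.my_dataset.events`")
        .setUseLegacySql(false)
        .build();

    TableResult result = bigquery.query(query);
    result.iterateAll().forEach(row ->
        System.out.println(row.get("user_id").getStringValue()));
  }
}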

Partitioned table

But most of the time you know you only need to search part of your table, certainly if your data has a date component: you don’t need to scan the complete dataset if you are only interested in the last two weeks. For this, BigQuery has introduced partitioned tables.

Partitioned tables are tables that are divided into shards, which makes the data easier to manage. For now BigQuery only offers day-partitioned tables, meaning that the data is split into one partition per day.

These tables have a pseudo column (_PARTITIONTIME) that contains a day value. That value represents the day partition the row lives in and makes it easy to limit the set of partitions that you include and traverse. You can use it in a standard WHERE clause and BigQuery will automatically include only the relevant partitions. Look at the following WHERE clause:

WHERE _PARTITIONTIME
  BETWEEN TIMESTAMP("2017-06-01")
  AND TIMESTAMP("2017-06-14")

This will scan only the relevant partitions, costing a fraction of a query that traverses the complete dataset. Look at the documentation for more information.

Inserting Data

Having control over the partition where your data ends up can be a bit tricky though. You can’t simply insert a value into _PARTITIONTIME to steer rows into the right partition, because it is a read-only pseudo column. And if you don’t do anything, the rows will end up in the partition of the current day. In a few streaming use cases this may be adequate, but for a control freak like me it is not good enough.

The way to control the destination partition is by using a table decorator. So if you want to get your data into the New Year’s partition, your destination should be tablename$20170101. The documentation has a few examples of doing that in a load job, but how do you do it in your streaming Beam job?
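
Outside of Beam, the decorator already works with plain streaming inserts, for example with the BigQuery Java client. The table and field names below are only an illustration, and I’m assuming the partitioned table already exists:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.Collections;

public class DecoratorInsertExample {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // The $20170101 decorator routes the row straight into the New Year's partition.
    TableId newYears = TableId.of("my_dataset", "events$20170101");

    InsertAllResponse response = bigquery.insertAll(
        InsertAllRequest.newBuilder(newYears)
            .addRow(Collections.singletonMap("payload", "happy new year"))
            .build());

    if (response.hasErrors()) {
      System.err.println("Insert errors: " + response.getInsertErrors());
    }
  }
}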

Auto Partitioned Streaming

The trick (and a well-kept secret) is using a TableDestination function. This is a function that you supply to BigQueryIO.Write through the .to method. It’s not really well known because all the examples in the wild just supply a fixed output table.

The function’s purpose is to map elements to tables. As soon as the Apache Beam pipeline encounters a new window, it will call the function to see what the table name should be.

Get a fixed window, and supply the TableDestination function to the BigQuery writer.

So when you build your Beam pipeline, make sure your windowing never crosses a day boundary. My favorite is a 1-minute fixed window; that way we don’t have to wait too long for the data to reach BigQuery. When we look inside the function you’ll see why you need a window. Then supply the function to the .to method, as sketched below.
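
Here is a minimal sketch of that wiring, assuming messages arrive on Pub/Sub and end up as TableRows; the project, dataset, topic and the DayPartitionFn name are my own choices (the function itself is sketched a bit further down):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

public class PartitionedStreamPipeline {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    pipeline
        // Read raw messages; the topic is just an example name.
        .apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"))
        // Wrap each message in a TableRow (kept trivial for the sketch).
        .apply(MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(payload -> new TableRow().set("payload", payload)))
        // 1-minute fixed windows: small enough to never cross a day boundary.
        .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(BigQueryIO.writeTableRows()
            // Map each windowed element to a day-partitioned destination (see below).
            .to(new DayPartitionFn("my-project", "my_dataset", "events"))
            // The partitioned table is created up front, so Beam never creates it.
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    pipeline.run();
  }
}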

The function does a transformation from a ValueInSingleWindow to a TableDestination. It’s here that the magic happens: for each window this function is called, and you can construct the correct table name.

https://gist.github.com/alexvanboxel/0f38ceb5ccebbccebcd759576be428c2

BoundedWindow has one interesting method, and that’s maxTimestamp. If you take that and truncate it to a day, you have your target partition. And don’t worry, the max timestamp really is OK; look at the example:

max = 2017-01-01T23:59:59.99999 >>> tablename$20170101
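
In case the gist embed doesn’t load, here is a minimal sketch of what such a function can look like; the class name, constructor parameters and table spec format are my own choices and not necessarily identical to the gist:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class DayPartitionFn
    implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

  // Day partitions are based on UTC, so format the timestamp in UTC as well.
  private static final DateTimeFormatter DAY =
      DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();

  private final String project;
  private final String dataset;
  private final String table;

  public DayPartitionFn(String project, String dataset, String table) {
    this.project = project;
    this.dataset = dataset;
    this.table = table;
  }

  @Override
  public TableDestination apply(ValueInSingleWindow<TableRow> value) {
    BoundedWindow window = value.getWindow();
    // The window's max timestamp, truncated to a day, gives the target partition,
    // e.g. 2017-01-01T23:59:59.999 becomes the 20170101 partition.
    String day = DAY.print(window.maxTimestamp());
    // Decorated table reference: project:dataset.table$YYYYMMDD
    String tableSpec = project + ":" + dataset + "." + table + "$" + day;
    return new TableDestination(tableSpec, "Day partition " + day);
  }
}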

So with the decorated (tablename$YYYYMMDD) table reference, the data will end up in the correct partition. When streaming, though, make sure that the table already exists with the correct schema.
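
One way to pre-create such a day-partitioned table is with the BigQuery Java client; the single payload column is of course only an illustration of a schema:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;

public class CreatePartitionedTable {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Example schema: a single string column.
    Schema schema = Schema.of(
        Field.of("payload", LegacySQLTypeName.STRING));

    StandardTableDefinition definition = StandardTableDefinition.newBuilder()
        .setSchema(schema)
        // Day partitioning, as described above.
        .setTimePartitioning(TimePartitioning.of(TimePartitioning.Type.DAY))
        .build();

    bigquery.create(TableInfo.of(TableId.of("my_dataset", "events"), definition));
  }
}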

Well, that’s it. Now you can stream the data into the correct partition and don’t have to worry that late data will arrive in an incorrect partition.

For a complete example you can have a look at the pubsub-backup example repo on GitHub:

It’s been adapted for Apache Beam 2.0
