For the past year and a half, we at Brandfolder have been moving toward using BigQuery as a datalake. We began by storing a copy of all our raw data there, and we’ve steadily moved into storing feature extraction tables as well as data produced by machine learning models and accessed directly by our product. With our data housed in BigQuery, we find it much easier to access, in terms of both speed and flexibility.
For those unfamiliar with Google BigQuery, it is a serverless cloud data warehouse (think database) with built-in and automated distributed computing. Initially, we started using BigQuery as a copy of our production database, and found ourselves leaning on it as our data size grew. Async replica DB queries would take upwards of half an hour, while in BigQuery these same queries would be complete within a minute.
Additionally, with our datalake living primarily in parquet in Google Cloud Storage, we were limited in several ways. We needed Pyspark to access it, which meant getting Google Dataproc involved with clusters, and it often meant pulling much bigger datasets than we needed and whittling down after reading the files. Using Pyspark to read and manipulate the files would also often take quite a long time, which negatively impacted our ETL process. As some of our tables began to grow by 30M+ rows a day, our ETL process began taking longer than we could afford. Surveying our infrastructure, we noticed that we could spend much less effort managing our data pipelines with our datalake in BigQuery as opposed to continuing to manage Spark instances facing ever-growing datasets with complex joins.
It’s important to note that, with on-demand pricing, BigQuery charges based on the amount of data (in GB) that each query reads. If the tables you’re querying are large and you’re running the same query multiple times a day, it could become expensive. On the other hand, if you lean on BigQuery for data wrangling, you won’t have to pay for processing power and compute time the way you might when using Spark.
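To make that trade-off concrete, here is a rough sketch of the arithmetic. The price per TiB is a parameter rather than a quoted rate (check current pricing for your region), the 50 GiB query is a made-up example, and `bytes_scanned` assumes the google-cloud-bigquery client with default credentials; it uses a dry run, which reports the bytes a query would process without actually running it.

```python
TIB = 2 ** 40  # bytes in one tebibyte


def estimate_daily_cost(bytes_per_run: int, runs_per_day: int, usd_per_tib: float) -> float:
    """Rough daily cost of re-running one query, ignoring cached results and any free tier."""
    return bytes_per_run / TIB * runs_per_day * usd_per_tib


def bytes_scanned(query: str) -> int:
    """Ask BigQuery how many bytes a query would process, without running it."""
    # Imported lazily so the arithmetic above works without the client installed.
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    return client.query(query, job_config=job_config).total_bytes_processed


if __name__ == "__main__":
    # Hypothetical numbers: a query scanning 50 GiB, run hourly, at an assumed 5 USD per TiB.
    print(estimate_daily_cost(50 * 2 ** 30, runs_per_day=24, usd_per_tib=5.0))
```

Feeding the result of `bytes_scanned` into `estimate_daily_cost` before scheduling a recurring job is one way to spot an expensive query early.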
Our data infrastructure is almost entirely in Python, from ETL through to ML model development. In order to make the most of a Python + BigQuery infrastructure, we access BigQuery in three ways: Pandas, Pyspark, and the OS package (Python’s built-in os module). For each of these methods, I’ll discuss the package we use, examples, the use cases for that method, and notes.
1. Pandas + BigQuery
This combination is often used to pre-aggregate data and pull in smaller datasets for tasks like reporting or visualizations. With a clearly defined in-line query, this may be the most intuitive and easiest package to use, but it is limited by data size, since the results need to fit in memory to be handled in Pandas.
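import pandas as pd
from google.cloud import bigquery

bqclient = bigquery.Client()

# Share of one day's attachments in the 'created' state, guarded against division by zero.
query = """
    SELECT
        CASE
            WHEN count(distinct id) = 0 THEN 0
            ELSE count(distinct CASE
                     WHEN atts.state = 'created' THEN id
                 END) / count(distinct id)
        END AS created_percent,
        count(distinct id) AS num_attachments
    FROM `datalake.dataset.attachments` AS atts
    WHERE CAST(atts.created_at AS DATE) = '2020-07-07'
"""

# Run the query in BigQuery and load the small result set into a Pandas DataFrame.
result = bqclient.query(query).to_dataframe()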
Use cases:
- Internal product alerting: we use this package in conjunction with Rollbar to set up specific alerting. For example, we pull the files imported the previous day as well as their ingestion states from BigQuery via Pandas and set a threshold which triggers a Rollbar alert. The above example is a simplified version of that script.
- Customer-specific reporting: to give our customers reports tailored to their needs, we lean heavily on the Pandas/BigQuery package. Since customer-specific reports are often small datasets, this package works perfectly. It also easily allows for the query adjustments we need to make fairly regularly. Additionally, leveraging Python allows us to connect these reports to user-facing aspects of the application via service-to-service communication.
Notes:
- We often use this package with its Google Cloud Storage counterpart, google-cloud-storage. It is used in our custom reporting workflow to write formatted reports to buckets, which allows the application to surface those to users.
- Because the query is an ordinary Python string, variables can be used within it. We use Airflow + Google Dataproc to schedule jobs and maintain dependencies, and will often pass a date string from our Airflow instance into the query to limit results by that date.
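As a sketch of passing a date into the query and applying an alert threshold, the snippet below uses BigQuery query parameters rather than formatting the date into the string itself, which is a swapped-in technique and not necessarily what our scripts do. The `@run_date` parameter name, the 0.9 threshold, the direction of the comparison, and the function names are all assumptions made for illustration.

```python
import datetime

# @run_date is a named query parameter, filled in by BigQuery at run time.
QUERY = """
SELECT
  COUNT(DISTINCT id) AS num_attachments,
  COUNT(DISTINCT IF(atts.state = 'created', id, NULL)) AS num_created
FROM `datalake.dataset.attachments` AS atts
WHERE CAST(atts.created_at AS DATE) = @run_date
"""


def parse_run_date(ds: str) -> datetime.date:
    """Turn a 'YYYY-MM-DD' string (e.g. handed over by the scheduler) into a date."""
    return datetime.date.fromisoformat(ds)


def fetch_counts(run_date: datetime.date):
    """Run QUERY for one day and return the result as a Pandas DataFrame."""
    # Imported lazily so the helpers in this sketch work without the client installed.
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("run_date", "DATE", run_date)]
    )
    return client.query(QUERY, job_config=job_config).to_dataframe()


def should_alert(num_created: int, num_attachments: int, threshold: float = 0.9) -> bool:
    """Alert when the share of 'created' attachments drops below the (assumed) threshold."""
    if num_attachments == 0:
        return False  # nothing was imported, so there is no ratio to check
    return num_created / num_attachments < threshold
```

A job could call `fetch_counts(parse_run_date("2020-07-07"))`, read the two counts from the first row, and notify the alerting service when `should_alert` returns True.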
2. Pyspark + BigQuery
The Spark / BigQuery connector comes in where the Pandas BigQuery package is limited by data size. This connector allows you to pull in BigQuery tables as a simple Spark read command. However, with the BigQuery / Spark connector you can’t write queries in your Python code. The way we’ve found around that is to create views in BigQuery that contain the necessary query, which Spark can then read.
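One way to create such a view from Python is to send a `CREATE OR REPLACE VIEW` statement through the BigQuery client. This is a minimal sketch, not necessarily how our views are managed; the function names are hypothetical, and `create_view` assumes the google-cloud-bigquery client with default credentials.

```python
def view_ddl(view_id: str, select_sql: str) -> str:
    """Build a CREATE OR REPLACE VIEW statement wrapping the given SELECT."""
    return f"CREATE OR REPLACE VIEW `{view_id}` AS\n{select_sql.strip()}"


def create_view(view_id: str, select_sql: str) -> None:
    """Create (or replace) the view in BigQuery so that Spark can read from it."""
    # Imported lazily so view_ddl can be used without the client installed.
    from google.cloud import bigquery

    client = bigquery.Client()
    client.query(view_ddl(view_id, select_sql)).result()  # wait for the DDL job to finish
```

Keeping the query in a view means it can be adjusted in BigQuery without touching the Spark job that reads it.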
The following example shows how we pull data from a view. You can also pull from a table, in which case you wouldn’t need the viewsEnabled option.
# `spark` is an existing SparkSession with the BigQuery connector jar available.
results = spark.read.format('bigquery') \
    .option('table', 'datalake.dataset.table') \
    .option('viewsEnabled', 'true') \
    .load()

## insert any data manipulations here ##

results.write.format('bigquery') \
    .option('table', 'datalake.dataset.output_table') \
    .save()
Below is an example of a view the Spark connector might read from.
SELECT o.key AS org_key,
t.name AS tag
FROM (SELECT name,
FROM `datalake.dataset.tags`) t
GROUP BY 1,2) a
ON t.asset_id = a.id
attachment_key AS attachment_key
GROUP BY 1,2) att
ON a.id = att.asset_id
GROUP BY 1,2) o
ON t.organization_id = o.id
GROUP BY 1,2,3
Use cases:
- Feature extraction: the above example is a snippet from one of our feature extraction scripts. Moving from parquet to BigQuery for that job brought the processing time from half an hour to thirty seconds.
- Pulling extracted features to build ML models.
- Writing large datasets to BigQuery: when a process’s results are quite large but are most beneficial when written to BigQuery (such as when the data needs to be accessed by our product), this connector comes in handy. Results from ML models may fall into this use case.
Notes:
- This isn’t actually its own package, but requires a jar to work. You can find the jar information in the connector’s documentation.
- Reading from a view lets you control queries much as you can with the Pandas library, but you’ll need to supply the viewsEnabled option as above, and reading from a view incurs the same cost as running the underlying query.
- Depending on later manipulations, your cluster, and data size, you may want to cache the data after reading it. Without caching, later manipulations will requery BigQuery, incurring additional costs.
- The Spark / BigQuery connector works by first writing to Google Cloud Storage as parquet and then copying the data over to BigQuery, so it’s not technically writing directly from Spark to BigQuery.
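The caching and staging points can be sketched as two small helpers. This is an illustration under assumptions, not our production code: the helper names are hypothetical, `spark` is an existing SparkSession with the connector jar available, and the staging bucket passed to the connector’s `temporaryGcsBucket` option is whatever bucket you have set aside for it.

```python
def read_bigquery(spark, table: str, is_view: bool = False, cache: bool = True):
    """Load a BigQuery table or view into a Spark DataFrame, optionally caching it."""
    reader = spark.read.format("bigquery").option("table", table)
    if is_view:
        # Views must be enabled explicitly; plain tables do not need this option.
        reader = reader.option("viewsEnabled", "true")
    df = reader.load()
    # cache() is lazy: Spark keeps the data after the first action, so later
    # manipulations can generally reuse it instead of querying BigQuery again.
    return df.cache() if cache else df


def write_bigquery(df, table: str, temp_bucket: str) -> None:
    """Write a DataFrame to BigQuery, staging the files in a GCS bucket first."""
    (
        df.write.format("bigquery")
        .option("table", table)
        .option("temporaryGcsBucket", temp_bucket)  # bucket name supplied by the caller
        .save()
    )
```

Whether caching pays off depends on cluster memory and how many times the DataFrame is reused, so it is left as a flag here.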
3. OS + BigQuery
This is another method that allows you to modify and insert queries within Python. Here, Python’s os module simply shells out to the bq command-line tool, so the query runs entirely inside BigQuery and the data never gets pulled into memory. We use this method when the script in question needs to be dependent on other jobs but needs no Python data manipulation.
This example shows how to replace a BigQuery table with the results of a query. We also use the OS package to write BigQuery tables from parquet, delete rows from BigQuery tables, or copy BigQuery tables.
query = """
a.id as asset_id,
FROM `datalake.dataset.events` e
inner join `datalake.dataset.assets` a
on a.asset_key = e.resource_key)
inner join `datalake.dataset.brandfolders` b
on a.brandfolder_id = b.id
WHERE action_name in ("viewed", "downloaded", "shared")
group by 1,2,3,4
bq query \
--destination_table project:dataset.asset_sessions \
'""" + query + """'
Use cases:
- ETL: in our ETL process we need to replace records that have been updated in the past 24 hours. We use the OS package to remove those outdated records, copy over the new, updated table, and write to BigQuery from parquet.
- Feature extraction for ML models: the example above is taken from one of our feature extraction jobs. If we can perform all the data manipulations in BigQuery, we’ll use this method.
Notes:
- We use Airflow + Dataproc to schedule jobs, so we’ll use the OS package when we want a job to be contingent on other jobs’ success, but don’t need to pull the data into the cluster.
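The bq operations mentioned in this section (replacing a table with a query, loading parquet, copying a table) can be sketched as small command builders. Note the swap: this sketch passes an argument list to `subprocess.run` instead of building a shell string for the os module, which sidesteps quoting problems when the query itself contains quotes. The function names are hypothetical, and the table and bucket names in the usage note are placeholders.

```python
import subprocess
from typing import List


def bq_replace_table(query: str, destination: str) -> List[str]:
    """Command that overwrites `destination` with the results of a standard SQL query."""
    return [
        "bq", "query",
        "--use_legacy_sql=false",
        "--replace",
        f"--destination_table={destination}",
        query,
    ]


def bq_load_parquet(destination: str, gcs_uri: str) -> List[str]:
    """Command that loads parquet files from Google Cloud Storage into a table."""
    return ["bq", "load", "--source_format=PARQUET", destination, gcs_uri]


def bq_copy_table(source: str, destination: str) -> List[str]:
    """Command that copies one table over another (-f skips the overwrite prompt)."""
    return ["bq", "cp", "-f", source, destination]


def run(command: List[str]) -> None:
    """Run a bq command; a non-zero exit raises, so a scheduler marks the job as failed."""
    subprocess.run(command, check=True)
```

For example, `run(bq_replace_table(query, "project:dataset.asset_sessions"))` would rebuild the destination table from the query; because `check=True` raises on failure, downstream jobs that depend on this one will not start on a failed run.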
One of the risks of leaning on BigQuery is cost, as each query’s cost is determined by the size of the tables it accesses. In moving our datalake to BigQuery, our costs have not increased noticeably; still, there are several ways to keep those costs down:
- Table partitioning: from BigQuery documentation “A partitioned table is a special table that is divided into segments, called partitions, that make it easier to manage and query your data. By dividing a large table into smaller partitions, you can improve query performance, and you can control costs by reducing the number of bytes read by a query.”
- Caching results: this is especially helpful when directly querying BigQuery from a production setting. BigQuery writes all query results to a table — this can either be an explicit assignment (destination table) or a temporary cached results table. Thus, native BigQuery caching can mitigate some costs, but applications can also leverage caching.
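Both ideas can be sketched in a few lines. The helpers below build the DDL for a day-partitioned copy of a table and a filter that restricts a query to a single day’s partition, and `run_query` reports whether a query was served from BigQuery’s cache. This assumes the partitioning column is a TIMESTAMP; the function names are hypothetical, and `run_query` assumes the google-cloud-bigquery client with default credentials.

```python
import datetime
from typing import Tuple


def partitioned_copy_ddl(source: str, destination: str, column: str) -> str:
    """DDL that copies `source` into a new table partitioned by day on `column`."""
    return (
        f"CREATE TABLE `{destination}`\n"
        f"PARTITION BY DATE({column}) AS\n"
        f"SELECT * FROM `{source}`"
    )


def partition_filter(column: str, day: datetime.date) -> str:
    """WHERE-clause fragment limiting a query to one day of the partitioning column."""
    next_day = day + datetime.timedelta(days=1)
    return (
        f"{column} >= TIMESTAMP('{day.isoformat()}') "
        f"AND {column} < TIMESTAMP('{next_day.isoformat()}')"
    )


def run_query(query: str) -> Tuple[bool, int]:
    """Run a query and return (served from cache?, bytes processed)."""
    # Imported lazily so the string helpers work without the client installed.
    from google.cloud import bigquery

    job = bigquery.Client().query(query)
    job.result()  # wait for the query to finish
    return job.cache_hit, job.total_bytes_processed
```

Filtering directly on the partitioning column, as `partition_filter` does, is what lets BigQuery skip the partitions outside that day and read fewer bytes.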
It’s also worth mentioning that we evaluated BigQuery’s Storage API about a year ago. Unfortunately, it was fairly immature at the time and challenging to access with Spark. Furthermore, it exposes the underlying data in BigQuery, which we’d still need to wrangle ourselves, as opposed to leveraging BigQuery for the wrangling as described above.
Between Pandas, Pyspark, and the OS package, we’ve been able to seamlessly transition to using BigQuery as our primary datalake. We’ve benefited from quicker processing times, reliability boosts, and code simplification. Given those benefits, I highly recommend dipping a toe into the world of Python + BigQuery.