Apache Spark BigQuery Connector — Optimization tips & example Jupyter Notebooks

Learn how to use the BigQuery Storage API with Apache Spark on Cloud Dataproc

Tahir Fayyaz
Google Cloud - Community
8 min read · May 21, 2020


BigQuery storage to BigQuery compute

Google BigQuery is Google Cloud’s fully managed data warehouse and just turned 10 years old (Happy Birthday BigQuery!!!). One of its key features is the separation of compute and storage, which recently led to the development of the BigQuery Storage API. This API allows you to read data at scale directly into other platforms, like Apache Spark, where it will be processed, without first exporting the data to Google Cloud Storage as an intermediate step.

BigQuery storage API connecting to Apache Spark, Apache Beam, Presto, TensorFlow and Pandas

Some examples of this integration with other platforms are Apache Spark (which will be the focus of this post), Presto, Apache Beam, TensorFlow, and Pandas.

Apache Spark can read multiple streams of data from the BigQuery Storage API in parallel

The BigQuery Storage API allows you to read data in parallel, which makes it a perfect fit for a parallel processing platform like Apache Spark.

Using the Apache Spark BigQuery connector, which is built on top of the BigQuery Storage API and the BigQuery API, you can now treat BigQuery as just another source for reading and writing data in Apache Spark.
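For completeness, writing works in a similar way. Once you have a Spark session and a DataFrame (set up in the next section), a write might look like the sketch below; the dataset, table and bucket names are placeholders, and the temporaryGcsBucket option assumes the connector’s indirect write path via Cloud Storage.

# Sketch only: df is any existing Spark DataFrame, and
# "my_dataset.wiki_totals" and "my-staging-bucket" are placeholder names.
df.write \
.format("bigquery") \
.option("temporaryGcsBucket", "my-staging-bucket") \
.mode("overwrite") \
.save("my_dataset.wiki_totals")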

Apache Spark and JupyterLab

How Cloud Dataproc, Apache Spark, Apache Spark BigQuery Connector and Jupyter notebooks connect

Jupyter notebooks are a great way to get started with learning how to use the Apache Spark BigQuery connector.

You can read this post on using Apache Spark with Jupyter notebooks on Cloud Dataproc to get set up, and then read on for tips on how to use the connector, how to optimize your jobs, and where to find the example Jupyter notebooks now available on GitHub.

Set up the Apache Spark BigQuery Storage connector

Once you have your notebook running you just need to include the Apache Spark BigQuery Storage connector package when you create a Spark session.

from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('Optimize BigQuery Storage') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()

Depending on which version of Scala you are running, you will need to change the artifact name to one of the following:

  • Scala 2.11: com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
  • Scala 2.12: com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
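
For example, if your cluster’s Spark build uses Scala 2.12, the session above would be created with the 2.12 artifact instead; the only change is the artifact name:

from pyspark.sql import SparkSession
# Same session as above, but using the Scala 2.12 build of the connector.
spark = SparkSession.builder \
.appName('Optimize BigQuery Storage') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta') \
.getOrCreate()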

You can view the release notes of the connector to check for the latest version of the package.

Reading BigQuery Data

Following this, you simply need to set the read format to "bigquery" and you are ready to load a BigQuery table into your Spark job. These examples will make use of the Wikipedia pageviews public dataset created by Marc Cohen.

table = "bigquery-public-data.wikipedia.pageviews_2019"df = spark.read \
.format("bigquery") \
.option("table", table) \
.load()

However, as the Wikipedia pageviews table is very large at 2TB, you should filter it down to only the data you actually need for your Apache Spark job.

The data will not actually be read at this point, as .load() does not trigger a Spark job. The data is only read when an action is called, such as the .show() method we use later.
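As a small illustration of this lazy behaviour (a sketch, not part of the original example), transformations only build up the query plan and nothing is read from BigQuery Storage until an action runs:

# Transformations only build up the query plan; no data is read yet.
df_titles = df.select("title", "views")
# Calling an action triggers the actual read from the BigQuery Storage API.
df_titles.show(10)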

Filtering data to optimize Apache Spark jobs

To optimize the performance of your Apache Spark jobs when using the Apache Spark BigQuery Storage connector, the following steps show how to read only the data required for the job.

BigQuery partition filtering

The API rebalances records between readers until they all complete

BigQuery tables can be partitioned by date or integers in a similar fashion to Hive, Parquet and ORC. The wikipedia public dataset uses date partitions and so you can set the filter option to only read 7 days of data instead of all 365 days.

table = "bigquery-public-data.wikipedia.pageviews_2019"df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2019-01-01' AND datehour < '2019-01-08'") \
.load()

BigQuery columnar storage filtering

Only the columns required are read by Apache Spark

BigQuery uses columnar storage similar to Apache Parquet and ORC. Therefore you can read just the columns you need for your Spark job by choosing them with the select() method.

table = "bigquery-public-data.wikipedia.pageviews_2019"df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2019-01-01' AND datehour < '2019-01-08'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views")
df_wiki_en.printSchema()

BigQuery rows filtering

The BigQuery Storage API supports predicate push-down of filters, which means that if you set a filter in a where clause later on in your Apache Spark job, the connector will attempt to push that filter down to BigQuery. Therefore the two jobs below yield the same result.

Using filter and where

table = "bigquery-public-data.wikipedia.pageviews_2019"df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2019-01-01' AND datehour < '2019-01-08'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en.show()

Using only filter

table = "bigquery-public-data.wikipedia.pageviews_2019"df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2019-01-01' AND datehour < '2019-01-08' AND views > 10 AND wiki in ('en', 'en.m')") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views")
df_wiki_en.show()
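If you want to confirm what was pushed down, you can inspect the physical plan with the standard explain() method (the exact plan output depends on the Spark and connector versions):

# Print the physical plan; pushed filters appear in the scan node
# when the connector is able to push them down to BigQuery.
df_wiki_en.explain()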

View job performance in the Spark UI

When you run your job using data from BigQuery you will want to look at the Spark job performance. Take this aggregation job for example:

import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2019"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2019-01-01' AND datehour < '2019-01-08'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False).show()

Using the Component Gateway feature you can easily access the Spark UI to see how the job performed.

This stage of the job shows that 188 tasks were created to read data in parallel from BigQuery Storage.
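You can also check the read parallelism directly from the notebook using the standard RDD API, rather than opening the Spark UI:

# The number of partitions equals the number of read streams/tasks
# created for the BigQuery Storage read.
print(df_wiki_pageviews.rdd.getNumPartitions())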

Setting Max Parallelism

As we can see, some of these tasks are empty, as shown in the summary metrics for the 188 completed tasks. We can reduce the number of tasks reading from BigQuery Storage by setting the maxParallelism property in the read API to match the cluster size and settings. For most use cases the default setting of attempting to read one partition per 400MB should be adequate.

If you have a cluster with a total of 8 executor cores, you could consider setting the maxParallelism property as:

  • 8 total executor cores * 7 partition days = 56
table = "bigquery-public-data.wikipedia.pageviews_2019"df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("maxParallelism", 56) \
.option("filter", "datehour >= '2019-01-01' AND datehour < '2019-01-08'") \
.load()

You should test this with your data size, BigQuery format (partitioned vs non-partitioned), filter options and cluster configuration.
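A rough way to compare configurations is to time an action for each setting. Wall-clock timing in a notebook is only indicative, but it is usually enough to compare maxParallelism values (a simple sketch):

import time
start = time.time()
# Force a full read of the filtered data so the timing reflects the scan.
row_count = df_wiki_pageviews.count()
print(row_count, "rows read in", round(time.time() - start, 1), "seconds")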

Currently the maximum parallelism allowed by the BigQuery Storage API is 1000. If you set the maxParallelism property to a value greater than 1000, you will still only have 1000 tasks reading from 1000 streams within a session.

Cache data in memory

There might be scenarios where you want the data in memory, instead of reading it from BigQuery Storage every time, to improve performance.

The job above to find the total page views per title will read the data from BigQuery and push the filter to BigQuery. The aggregation will then be computed in Apache Spark.

You can modify the job to cache the filtered data from the table and then further filter the data on the wiki column, which will be applied in memory by Apache Spark.

import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2019"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2019-01-01' AND datehour < '2019-01-08'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False).show()

If you view the job details you will see that the table was cached as part of the job.
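You can also confirm the cache programmatically with the DataFrame’s storageLevel property:

# Shows the storage level of the cached DataFrame,
# e.g. whether it is held in memory, on disk, or both.
print(df_wiki_all.storageLevel)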

You can then filter for another wiki language using the cache instead of reading data from BigQuery storage again.

df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False).show()

The difference in this next job is that the table is now being read from memory and therefore runs much faster.

You can remove the cache by running:

df_wiki_all.unpersist()

Example Jupyter Notebooks

BigQuery Storage & Spark DataFrames — Python Jupyter notebook

Example Jupyter notebooks are now available on the official Google Cloud Dataproc GitHub repo showing how the Apache Spark BigQuery Storage connector works with Spark DataFrames, Spark SQL and Spark MLlib to read and write data.

Note: These notebooks are designed to work with the Python 3 kernel (not the PySpark kernel), as this allows you to create your Spark session and include the Apache Spark BigQuery connector package.

These notebooks make use of spark.sql.repl.eagerEval to output the results of DataFrames in each step without the need to use df.show(), which also improves the formatting of the output.
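For reference, this behaviour is controlled by a single Spark SQL configuration property, which you can also enable yourself (a minimal sketch):

# Render DataFrames as formatted tables in the notebook whenever a cell
# returns a DataFrame, without calling df.show().
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)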

What’s next
