Dataproc Serverless PySpark Template for Ingesting Compressed Text Files to BigQuery

Shradha Tyagi
Google Cloud - Community
5 min read · Jul 7, 2022

Dataproc Serverless lets users run Spark workloads without provisioning or managing clusters, so data engineers can concentrate on building their pipelines rather than worrying about cluster infrastructure.

Dataproc Templates is an initiative to further simplify the work of data engineers on Dataproc Serverless. The templates come with a ready-made set of features that implement basic use cases and can be customized as needed. They are available in both Java and Python. In this post, we focus on how to use the “Text To BigQuery” PySpark template to ingest GZIP-compressed data into BigQuery.

Dataproc Serverless — “Text to BigQuery” Template

This template reads text files from Google Cloud Storage (GCS) and writes them to a BigQuery table. It also accepts compressed text files as input, in BZIP2, GZIP, LZ4, or DEFLATE format (if the text file is uncompressed, use NONE as the compression format). The template uses the Spark BigQuery Connector to write into BigQuery.
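
As a rough illustration of what a job like this does, the sketch below reads delimited text with Spark's CSV reader and writes the result through the Spark BigQuery Connector. It is not the template's actual source: the function names, the header and schema-inference settings, and the choice of the CSV reader are assumptions made for illustration. Spark's Hadoop-based readers choose the decompression codec from the file extension (for example .gz), which is why no explicit decompression step appears.

```python
def reader_options(delimiter, header=True, infer_schema=True):
    """Options for Spark's CSV reader.

    Treating the first line as a header and inferring column types are
    assumptions of this sketch, not documented template behaviour.
    """
    return {
        "delimiter": delimiter,
        "header": str(header).lower(),
        "inferSchema": str(infer_schema).lower(),
    }


def text_to_bigquery(spark, location, delimiter, dataset, table,
                     temp_bucket, mode="append"):
    """Read delimited text from GCS and write it to a BigQuery table.

    `spark` is an existing SparkSession that has the Spark BigQuery
    Connector JAR on its classpath.
    """
    df = (
        spark.read.format("csv")
        .options(**reader_options(delimiter))
        .load(location)  # e.g. gs://BUCKET/*.gz
    )
    (
        df.write.format("bigquery")
        # Indirect write: data is staged in this bucket, then loaded.
        .option("temporaryGcsBucket", temp_bucket)
        .mode(mode)
        .save(f"{dataset}.{table}")
    )
    return df
```

In a real job, the values passed to `text_to_bigquery` would come from the template arguments rather than being hard-coded.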

Prerequisites

To run these templates, we need:

  • The Google Cloud SDK installed and authenticated.
  • Python 3.7+ installed.
  • A VPC subnet with Private Google Access enabled. The default subnet is suitable, as long as Private Google Access is enabled on it. The Dataproc Serverless documentation lists all the networking requirements.

Configuration Arguments

This template includes the following arguments to configure the execution:

  • text.bigquery.input.location: the GCS location of the input files. Supports wildcards. Example: gs://BUCKET/*.gz
  • text.bigquery.input.compression: the input file compression format. One of: bzip2, gzip, lz4, deflate, or none.
  • text.bigquery.input.delimiter: the input file text delimiter. Examples: "/", "|", ",".
  • text.bigquery.output.dataset: the BigQuery dataset for the output table.
  • text.bigquery.output.table: the BigQuery output table.
  • text.bigquery.output.mode: the Spark output save mode. One of: append, overwrite, ignore, errorifexists. Defaults to append. The Spark documentation describes how each save mode behaves.
  • text.bigquery.temp.bucket.name: the temporary bucket for the Spark BigQuery Connector. The connector writes the output data to this GCS bucket before loading it into BigQuery. This method is called indirect write, and the connector documentation covers it in more detail.
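
To make the argument list concrete, here is a minimal sketch of how these options could be parsed and validated with Python's argparse. It is an illustration only, not the template's own parser: the `dest` names are hypothetical, and treating every argument except the output mode as required is an assumption based on the list above.

```python
import argparse


def parse_args(argv):
    """Parse the text.bigquery.* arguments from a list of strings."""
    parser = argparse.ArgumentParser(description="Text to BigQuery (sketch)")
    parser.add_argument("--text.bigquery.input.location",
                        dest="input_location", required=True)
    parser.add_argument("--text.bigquery.input.compression",
                        dest="input_compression", required=True,
                        choices=["bzip2", "gzip", "lz4", "deflate", "none"])
    parser.add_argument("--text.bigquery.input.delimiter",
                        dest="input_delimiter", required=True)
    parser.add_argument("--text.bigquery.output.dataset",
                        dest="output_dataset", required=True)
    parser.add_argument("--text.bigquery.output.table",
                        dest="output_table", required=True)
    # The save mode is the only argument with a documented default.
    parser.add_argument("--text.bigquery.output.mode",
                        dest="output_mode", default="append",
                        choices=["append", "overwrite", "ignore",
                                 "errorifexists"])
    parser.add_argument("--text.bigquery.temp.bucket.name",
                        dest="temp_bucket", required=True)
    return parser.parse_args(argv)


if __name__ == "__main__":
    example = parse_args([
        "--text.bigquery.input.location=gs://BUCKET/*.gz",
        "--text.bigquery.input.compression=gzip",
        "--text.bigquery.input.delimiter=|",
        "--text.bigquery.output.dataset=my_dataset",   # placeholder
        "--text.bigquery.output.table=my_table",       # placeholder
        "--text.bigquery.temp.bucket.name=my-temp-bucket",  # placeholder
    ])
    print(example.output_mode)
```

Because the output mode is omitted in the example, the parser falls back to append, and an unsupported compression value would be rejected before any Spark work starts.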

Run the Template

1. Create a GCS bucket

A GCS bucket is required as the staging location for Dataproc. Dataproc will use this bucket to store dependencies required to run our serverless cluster.

export STAGING_BUCKET="my-staging-bucket"
gsutil mb gs://$STAGING_BUCKET

2. Clone the Dataproc Templates repository

After cloning the repository, navigate to the Python templates directory.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

3. Configure the Dataproc Serverless job

To submit the job to Dataproc Serverless, we will use the provided bin/start.sh script. The script requires us to configure the Dataproc Serverless cluster using environment variables.

The mandatory configurations are:

  • GCP_PROJECT: the GCP project to run Dataproc Serverless on.
  • REGION: the region to run Dataproc Serverless on.
  • GCS_STAGING_LOCATION: a GCS location where Dataproc will store staging assets. It should be within the bucket we created earlier.

# Project ID to run the Dataproc Serverless Job
export GCP_PROJECT=<project_id>
# GCP region where the job should be submitted
export REGION=<region>
# The staging location for Dataproc
export GCS_STAGING_LOCATION=gs://$STAGING_BUCKET/staging
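
Before submitting, it can help to confirm that the mandatory variables are actually set in the current shell. The helper below is a hypothetical pre-flight check, not part of the repository; it only inspects the three variables named above.

```python
import os

# The mandatory variables listed in this step.
REQUIRED_VARS = ("GCP_PROJECT", "REGION", "GCS_STAGING_LOCATION")


def check_environment(env):
    """Return a list of human-readable problems found in `env`."""
    problems = [f"{name} is not set" for name in REQUIRED_VARS
                if not env.get(name)]
    staging = env.get("GCS_STAGING_LOCATION", "")
    if staging and not staging.startswith("gs://"):
        problems.append("GCS_STAGING_LOCATION should start with gs://")
    return problems


if __name__ == "__main__":
    for problem in check_environment(os.environ):
        print(problem)
```

An empty result means all three variables are present and the staging location looks like a GCS path.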

In our case, the Text To BigQuery template needs the Spark BigQuery Connector to be available in the classpath. The connector is publicly hosted, so we will add it using the JARS environment variable. You can also choose to store the JAR file on a bucket you own.

# Path to the Spark BigQuery Connector JAR file
export JARS="gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"

4. Execute the Text To BigQuery Dataproc template

After configuring the job, we are ready to trigger it. We will run the bin/start.sh script, specifying the template we want to run and the argument values for the execution.

./bin/start.sh \
-- --template=TEXTTOBIGQUERY \
--text.bigquery.input.compression=<gzip|bzip2|lz4|deflate|none> \
--text.bigquery.input.delimiter=<delimiter> \
--text.bigquery.input.location=<gs://bucket/path> \
--text.bigquery.output.dataset=<dataset> \
--text.bigquery.output.table=<table> \
--text.bigquery.output.mode=<append|overwrite|ignore|errorifexists> \
--text.bigquery.temp.bucket.name="<temp-bq-bucket-name>"

NOTE: Submitting the job will ask you to enable the Dataproc API, if not enabled already.
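
When the job is launched from a script rather than typed by hand, the same command can be assembled programmatically, which sidesteps quoting problems with delimiters such as "|". The sketch below only builds the argument list and does not run anything; `build_command` is a hypothetical helper, and the bucket, dataset, and table values are placeholders.

```python
import shlex


def build_command(template_args, script="./bin/start.sh",
                  template="TEXTTOBIGQUERY"):
    """Return the argv list for bin/start.sh; nothing is executed here."""
    argv = [script, "--", f"--template={template}"]
    argv += [f"--{key}={value}" for key, value in template_args.items()]
    return argv


if __name__ == "__main__":
    command = build_command({
        "text.bigquery.input.compression": "gzip",
        "text.bigquery.input.delimiter": "|",
        "text.bigquery.input.location": "gs://my-input-bucket/*.gz",  # placeholder
        "text.bigquery.output.dataset": "my_dataset",                 # placeholder
        "text.bigquery.output.table": "my_table",                     # placeholder
        "text.bigquery.output.mode": "append",
        "text.bigquery.temp.bucket.name": "my-temp-bucket",           # placeholder
    })
    # Print a shell-safe rendering for inspection.
    print(" ".join(shlex.quote(part) for part in command))
```

The resulting list can be passed to `subprocess.run(command, check=True)` from the dataproc-templates/python directory, with the environment variables from the previous step already exported.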

5. Monitor the Spark batch job

Once the job is submitted, we can monitor it in the Dataproc Batches UI. Search for the batch ID to see detailed metrics and logs.

Advanced Job Configuration Options

When configuring our Dataproc Serverless job, we can also specify the following advanced configuration options:

  • SUBNET: the VPC subnet to run Dataproc Serverless on, if not using the default subnet (format: projects/<project_id>/regions/<region>/subnetworks/<subnetwork>).
  • JARS: comma-separated JAR files to be added to the Spark classpath. We have used it in this guide to provide our job with the Spark BigQuery Connector.
  • FILES: comma-separated files to be placed in the Spark working directory.
  • PY_FILES: comma-separated Python scripts or packages to be added to PySpark. Supports .py, .zip, and .egg formats.
  • HISTORY_SERVER_CLUSTER: an existing Dataproc cluster to act as a Spark History Server (format: projects/<project_id>/regions/<region>/clusters/<cluster_name>).
  • METASTORE_SERVICE: the name of a Dataproc Metastore service to be used as an external metastore (format: projects/<project_id>/locations/<region>/services/<service_name>).
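
Three of these options take fully qualified resource names, and a short or misspelled path is an easy mistake to make. The sketch below checks a value against the formats given in the list. The patterns are a loose approximation written for this post (they only check the path shape, not whether the resource exists), and the history server variable is spelled HISTORY_SERVER_CLUSTER here.

```python
import re

# Path shapes taken from the formats listed above.
PATTERNS = {
    "SUBNET": r"projects/[^/]+/regions/[^/]+/subnetworks/[^/]+",
    "HISTORY_SERVER_CLUSTER": r"projects/[^/]+/regions/[^/]+/clusters/[^/]+",
    "METASTORE_SERVICE": r"projects/[^/]+/locations/[^/]+/services/[^/]+",
}


def has_expected_format(name, value):
    """Return True if `value` matches the documented path shape for `name`."""
    return re.fullmatch(PATTERNS[name], value) is not None


if __name__ == "__main__":
    # Placeholder project and region names.
    print(has_expected_format(
        "SUBNET", "projects/my-project/regions/us-central1/subnetworks/default"))
    print(has_expected_format("SUBNET", "default"))
```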

Spark Configuration Options

When submitting a Spark batch workload, we can also set Spark properties for Dataproc Serverless to use for that particular batch. These properties determine the compute, memory, and disk resources allocated to the workload.

In the following example, we specify minimum and maximum executors to be used as 100 and 400, respectively.

./bin/start.sh \
--properties=spark.dynamicAllocation.minExecutors=100,spark.dynamicAllocation.maxExecutors=400 \
-- --template=TEXTTOBIGQUERY \
--text.bigquery.input.compression=<gzip|bzip2|lz4|deflate|none> \
--text.bigquery.input.delimiter=<delimiter> \
--text.bigquery.input.location=<gs://bucket/path> \
--text.bigquery.output.dataset=<dataset> \
--text.bigquery.output.table=<table> \
--text.bigquery.output.mode=<append|overwrite|ignore|errorifexists> \
--text.bigquery.temp.bucket.name="<temp-bq-bucket-name>"
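
The properties are passed as a single argument made of comma-separated key=value pairs, and a stray space after a comma makes the shell split it into two arguments. A small helper can build the flag from a dictionary instead; `properties_flag` is a hypothetical name used only in this sketch.

```python
def properties_flag(props):
    """Join Spark properties as key=value pairs separated by commas."""
    pairs = ",".join(f"{key}={value}" for key, value in props.items())
    return f"--properties={pairs}"


if __name__ == "__main__":
    print(properties_flag({
        "spark.dynamicAllocation.minExecutors": 100,
        "spark.dynamicAllocation.maxExecutors": 400,
    }))
```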

With this resource allocation, loading ~6 TB into BigQuery took ~43 minutes. With a minimum of 10 and a maximum of 100 executors, the same load took 1 hour 20 minutes.
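
To put those two runs side by side, the short calculation below derives an approximate throughput and speedup from the figures quoted above. It assumes the load was exactly 6 TB and uses decimal units (1 TB = 1000 GB); since the original numbers are approximate, the results are only indicative.

```python
DATA_TB = 6.0          # approximate load size quoted above
FAST_RUN_MIN = 43      # executors: min 100, max 400
SLOW_RUN_MIN = 80      # executors: min 10, max 100 (1 hour 20 minutes)


def throughput_gb_per_s(minutes, data_tb=DATA_TB):
    """Average throughput in GB/s, assuming 1 TB = 1000 GB."""
    return data_tb * 1000 / (minutes * 60)


speedup = SLOW_RUN_MIN / FAST_RUN_MIN

if __name__ == "__main__":
    print(f"fast run: {throughput_gb_per_s(FAST_RUN_MIN):.2f} GB/s")
    print(f"slow run: {throughput_gb_per_s(SLOW_RUN_MIN):.2f} GB/s")
    print(f"speedup:  {speedup:.2f}x")
```

Raising the maximum executor count fourfold made the load roughly 1.9 times faster in this case, so it is worth testing a few settings against your own data volume before settling on one.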

Summary

The “Text To BigQuery” template provides a starting point for ingesting data from compressed or uncompressed text files, with any delimiter, directly into BigQuery. It also offers the following advantages:

  • Can be used as is for direct ingestion of compressed text files from GCS to BigQuery. No need to uncompress the data first.
  • Can be customised for any processing or transformation requirement.
  • Allows splitting data on custom delimiters.

Note: If you are looking to ingest data from Avro, Parquet, CSV, or JSON files, use the “GCS To BigQuery” template.
