Getting started with Dataproc Serverless PySpark templates

Pablo Paglilla
4 min read · Jun 20, 2022


Dataproc Serverless is one of the newest additions to the Google Cloud Dataproc platform, and it allows users to run Spark workloads without the need to provision or manage clusters. Once we specify the Spark workload parameters and submit the job to the service, Dataproc Serverless manages all the required infrastructure behind the scenes. As data developers, this lets us focus on business logic instead of spending time managing infrastructure.

Dataproc Templates go a step further and allow us to run common use cases on Dataproc Serverless without the need to develop them ourselves. These templates implement common Spark workloads, letting us customize and run them easily.

Templates are provided in both Java and Python. In this post, we will be focusing on how to use PySpark templates. More specifically, we will be focusing on the GCS To BigQuery template.

Prerequisites

For running these templates, we will need:

  • The Google Cloud SDK installed and authenticated.
  • Python 3.7+ installed.
  • A VPC subnet with Private Google Access enabled. The default subnet is suitable, as long as Private Google Access is enabled (a snippet for enabling it follows this list). You can review all the Dataproc Serverless networking requirements here.
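If Private Google Access is not yet enabled on the subnet you plan to use, a gcloud command along these lines should turn it on for the default subnet (the region value is a placeholder; adjust it to wherever you will run Dataproc Serverless):

# Enable Private Google Access on the default subnet (replace <region>)
gcloud compute networks subnets update default \
  --region=<region> \
  --enable-private-ip-google-access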

The GCS to BigQuery template

The template we will be using reads files from Google Cloud Storage and writes them to a BigQuery table. It supports input files in JSON, CSV, Parquet and Avro formats. For writing into BigQuery, it uses the Spark BigQuery Connector.

Configuration Arguments

This template includes the following arguments to configure the execution:

  • gcs.bigquery.input.location: the GCS location for the input files. Supports wildcards. Example: gs://BUCKET/*.parquet.
  • gcs.bigquery.input.format: the input file format. One of avro, parquet, csv or json.
  • gcs.bigquery.output.dataset: the BigQuery dataset for the output table.
  • gcs.bigquery.output.table: the BigQuery output table.
  • gcs.bigquery.output.mode: the Spark output save mode. One of: append, overwrite, ignore, errorifexists. Defaults to append. You can read about how each save mode behaves here.
  • gcs.bigquery.temp.bucket.name: Temporary bucket for the Spark BigQuery connector. The connector will write the output data to this GCS bucket before loading it into BigQuery. This method is called indirect write and you can learn more about it here.

Usage

1. Create a GCS bucket to use as the staging location for Dataproc. Dataproc will use this bucket to store dependencies required to run our serverless Spark workload.
export STAGING_BUCKET="my-staging-bucket"
gsutil mb gs://$STAGING_BUCKET

2. Clone the Dataproc Templates repository and navigate to the Python templates’ directory

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

3. Configure the Dataproc Serverless job

To submit the job to Dataproc Serverless, we will use the provided bin/start.sh script. The script requires us to configure the Dataproc Serverless job using environment variables.

The mandatory configuration variables are:

  • GCP_PROJECT: The GCP project to run Dataproc Serverless on.
  • REGION: The region to run Dataproc Serverless on.
  • GCS_STAGING_LOCATION: A GCS location where Dataproc will store staging assets. It should be within the bucket we created earlier.
# Project ID to run the Dataproc Serverless Job
export GCP_PROJECT=<project_id>
# GCP region where the job should be submitted
export REGION=<region>
# The staging location for Dataproc
export GCS_STAGING_LOCATION=gs://$STAGING_BUCKET/staging

In our case, the GCS To BigQuery template needs the Spark BigQuery Connector to be available on the classpath. The connector is publicly hosted, so we will add it using the JARS environment variable. You can also choose to store the JAR file in a bucket you own, as shown in the optional snippet below.

# Path to the Spark BigQuery Connector JAR file
export JARS="gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
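For example, to host the connector yourself, you could copy the public JAR into the staging bucket created earlier and point JARS at that copy instead (the jars/ path is just illustrative):

# Optional: host the connector JAR in your own bucket (illustrative path)
gsutil cp gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar gs://$STAGING_BUCKET/jars/
export JARS="gs://$STAGING_BUCKET/jars/spark-bigquery-latest_2.12.jar"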

4. Execute the GCS To BigQuery Dataproc template

After configuring the job, we are ready to trigger it. We will run the bin/start.sh script, specifying the template we want to run and the argument values for the execution.

./bin/start.sh \
-- --template=GCSTOBIGQUERY \
--gcs.bigquery.input.format="<json|csv|parquet|avro>" \
--gcs.bigquery.input.location="<gs://bucket/path>" \
--gcs.bigquery.output.dataset="<dataset>" \
--gcs.bigquery.output.table="<table>" \
--gcs.bigquery.output.mode="<append|overwrite|ignore|errorifexists>" \
--gcs.bigquery.temp.bucket.name="<temp-bq-bucket-name>"

NOTE: Submitting the job will prompt you to enable the Dataproc API if it is not already enabled.
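If you would rather enable the API ahead of time instead of waiting for that prompt, a gcloud call like the following should do it:

# Optional: enable the Dataproc API up front
gcloud services enable dataproc.googleapis.com --project=$GCP_PROJECT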

5. Monitor the Spark batch job

After submitting the job, we will be able to see it in the Dataproc Batches UI. From there, we can view both metrics and logs for the job.
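If you prefer the command line to the UI, the batch can also be listed and inspected with gcloud (the batch ID below is a placeholder taken from the list output):

# List recent Dataproc Serverless batches in the region
gcloud dataproc batches list --region=$REGION
# Show details for a specific batch (replace <batch_id> with an ID from the list)
gcloud dataproc batches describe <batch_id> --region=$REGION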

Advanced job configuration options

When configuring our Dataproc Serverless job, we can also specify the following advanced configuration options (an example follows the list):

  • SUBNET: The VPC subnet to run Dataproc Serverless on, if not using the default subnet (format: projects/<project_id>/regions/<region>/subnetworks/<subnetwork>).
  • JARS: Comma-separated JAR files to be added to the Spark classpath. We have used it in this guide to provide our job with the Spark BigQuery Connector.
  • FILES: Comma-separated files to be placed in the Spark working directory.
  • PY_FILES: Comma-separated Python scripts or packages to be added to PySpark. Supports .py, .zip and .egg formats.
  • HISTORY_SERVER_CLUSTER: An existing Dataproc cluster to act as a Spark History Server (format: projects/<project_id>/regions/<region>/clusters/<cluster_name>).
  • METASTORE_SERVICE: Name of a Dataproc Metastore service to be used as an external metastore (format: projects/<project_id>/locations/<region>/services/<service_name>).
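As with the mandatory variables, these are plain environment variables exported before running bin/start.sh. A hypothetical example with placeholder resource names might look like this:

# Hypothetical advanced configuration (all resource names are placeholders)
export SUBNET=projects/<project_id>/regions/<region>/subnetworks/<subnetwork>
export HISTORY_SERVER_CLUSTER=projects/<project_id>/regions/<region>/clusters/<cluster_name>
export METASTORE_SERVICE=projects/<project_id>/locations/<region>/services/<service_name>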
