Exporting data from Snowflake to GCS using PySpark on Dataproc Serverless

Varunika Gupta
Google Cloud - Community
5 min read · Dec 28, 2022

Introduction

Snowflake is a cloud-based data warehousing solution provided as a SaaS offering. It leverages infrastructure from AWS, Google Cloud or Azure to meet its storage and compute requirements.

A typical procedure for exporting data from Snowflake to GCS involves the COPY INTO command, either unloading data directly into a GCS location or going through an external stage. Although this is the simplest way to get data into GCS, it is not a recommended approach if you need to export data on a scheduled basis or if the volume of data to be exported is very large. When you have large amounts of data to export on a regular basis, Dataproc Serverless comes in handy.
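
For context, a minimal sketch of the COPY INTO approach is shown below, assuming a named external stage my_gcs_stage (backed by a GCS storage integration) already exists; the stage, table and file format here are hypothetical:

# Hypothetical example: unload a table to GCS through a pre-created external stage
snowsql -q "COPY INTO @my_gcs_stage/exports/orders/
  FROM my_db.my_schema.orders
  FILE_FORMAT = (TYPE = PARQUET)"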

This blog post explains how you can export data from Snowflake to GCS by using Dataproc Serverless.

Below are some use cases that require exporting data from Snowflake to GCS:

  1. You want to migrate your DWH to BigQuery and want GCS to act as a staging layer.
  2. You want to archive your cold data to GCS on a defined schedule.
  3. You want to use your data for machine learning activities (BQML, Vertex AI, etc.).

How can Dataproc Serverless help?

Well, Dataproc Serverless gives you the capabilities of Hadoop and Spark with one less worry: the overhead of managing and tuning a Dataproc cluster configuration. No matter what the size of your data is, Dataproc Serverless runs your workload on a managed compute infrastructure, automatically scaling resources up and down as needed.
It is also cheaper than traditional Dataproc clusters, where you would have to worry about stopping clusters when they are idle and restarting them when needed in order to save costs. With Dataproc Serverless, you are charged only for the time a workload is actually being processed.

Enter the Dataproc Serverless Templates…

Dataproc templates provide ready-to-use code for the most common Spark workload use cases, in both Java and Python, and can be further customised for the specific needs of end users. When run according to the instructions provided in the repo, these templates deploy Dataproc Serverless jobs that process your data workloads.

In this post, we’ll talk about how to use the PySpark Snowflake to GCS template.

Prerequisites

For running these templates, we will need:

  • A Snowflake account with object read/create privileges
  • A GCP account with bucket-creation privileges and the Dataproc API enabled
  • The Google Cloud SDK installed and authenticated
  • Python 3.7+ installed
  • The required JARs mentioned here
  • A VPC subnet with Private Google Access enabled. The default subnet is suitable, as long as Private Google Access is enabled. You can review all the Dataproc Serverless networking requirements here.
# Example: update the default subnet to enable Private Google Access
gcloud compute networks subnets update default --region=us-central1 \
--enable-private-ip-google-access

Usage

  1. Create a GCS bucket to use as the staging location for Dataproc. This bucket will be used to store the dependencies required to run our serverless workload.
export STAGING_BUCKET="dataproc-staging-bucket"
gsutil mb gs://$STAGING_BUCKET

2. Clone the Dataproc Templates repository and navigate to the Python template's directory

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

3. Configure the Dataproc Serverless job

To submit the job to Dataproc Serverless, we will use the provided bin/start.sh script. The script requires us to configure the Dataproc Serverless batch job using environment variables.

The mandatory configurations are:

  • GCP_PROJECT : The GCP project to run Dataproc Serverless on.
  • REGION : The region to run Dataproc Serverless on.
  • GCS_STAGING_LOCATION : A GCS location where Dataproc will store staging assets. It should be within the bucket we created earlier.
  • SUBNET: The Subnet to run Dataproc Serverless in.
# Project ID to run the Dataproc Serverless job
export GCP_PROJECT=<project_id>

# GCP region where the job should be submitted
export REGION=<region>

# The staging location for Dataproc
export GCS_STAGING_LOCATION=gs://$STAGING_BUCKET/staging

export SUBNET=<subnet>

In our case, the Snowflake to GCS template needs the JARs mentioned in the prerequisites to be available on the classpath. You can store the JAR files in a GCS bucket, and we will add them using the JARS environment variable.

# Path to the Snowflake connector JAR file
export JARS=<comma-separated-gcs-bucket-location-containing-jar-file>
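
For example, if the Snowflake Spark connector and Snowflake JDBC driver JARs have been uploaded to the staging bucket, the variable could look like the sketch below (the jars/ path and <version> placeholders are hypothetical, not fixed names):

# Hypothetical example: JAR files previously copied to the staging bucket with gsutil cp
export JARS="gs://$STAGING_BUCKET/jars/snowflake-jdbc-<version>.jar,gs://$STAGING_BUCKET/jars/spark-snowflake_2.12-<version>.jar"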

4. Run the Dataproc Templates shell script, which will read the above variables, create a Python package, and submit the job to Dataproc Serverless.

bin/start.sh \
-- --template=SNOWFLAKETOGCS \
--snowflake.to.gcs.sf.url=<snowflake-account-url> \
--snowflake.to.gcs.sf.user=<snowflake-user> \
--snowflake.to.gcs.sf.password=<snowflake-user-password> \
--snowflake.to.gcs.sf.database=<snowflake-database> \
--snowflake.to.gcs.sf.schema=<snowflake-schema> \
--snowflake.to.gcs.sf.warehouse=<snowflake-warehouse> \
--snowflake.to.gcs.sf.query=<snowflake-select-query> \
--snowflake.to.gcs.output.location="gs://bucket" \
--snowflake.to.gcs.output.format=<csv|avro|orc|json|parquet> \
--snowflake.to.gcs.output.mode=<Overwrite|ErrorIfExists|Append|Ignore> \
--snowflake.to.gcs.partition.column=<gcs-output-partitionby-columnname> \
--snowflake.gcs.sf.autopushdown=<on|off>

Note:

  • Ensure that the “python” executable is in your PATH
  • When you submit the job, you will be prompted to enable the Dataproc API if it is not already enabled.
  • Mandatory Parameters: snowflake.to.gcs.sf.url, snowflake.to.gcs.sf.user, snowflake.to.gcs.sf.password, snowflake.to.gcs.sf.database, snowflake.to.gcs.output.location

5. You may have noticed that some parameters need to be filled in when submitting the template. These parameters are described in the template documentation.

6. Monitor the Spark batch job

After submitting the job, we will be able to see it in the Dataproc Batches UI. From there, we can view both the metrics and logs for the job.
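
If you prefer the command line, you can also check on the batch with gcloud; the <batch-id> below is a placeholder for the ID shown in the list output:

# List recent Dataproc Serverless batches in the region
gcloud dataproc batches list --region=$REGION

# Show details (state, runtime info) for a specific batch
gcloud dataproc batches describe <batch-id> --region=$REGION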

Scheduled Execution

Instead of submitting the job via the start.sh script, you can also choose to set up a scheduled execution of the job. This setup is useful when you want to periodically move data from Snowflake to GCS as new data comes in during the day. You can use Cloud Scheduler, a GCP service that offers cron-style job scheduling, to run the Dataproc templates on a schedule, as sketched below.
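
One possible setup, shown only as a sketch, is to have Cloud Scheduler call the Dataproc Serverless batches.create REST API directly. The job name, schedule, service account and batch_request.json file below are hypothetical; the request body would need to describe the same PySpark batch (main Python file, template arguments, JARs, subnet) that start.sh submits:

# Minimal sketch: trigger a Dataproc Serverless batch every day at 02:00
gcloud scheduler jobs create http snowflake-to-gcs-daily \
  --location=$REGION \
  --schedule="0 2 * * *" \
  --http-method=POST \
  --uri="https://dataproc.googleapis.com/v1/projects/$GCP_PROJECT/locations/$REGION/batches" \
  --oauth-service-account-email=<service-account-email> \
  --message-body-from-file=batch_request.json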

Since Snowflake is an OLAP database, CDC changes would mainly arrive as batch updates. A common pattern is for Snowflake tables to carry a timestamp column that records the last update time. You would need to modify the snowflake.to.gcs.sf.query parameter value to include a timestamp filter, so that each scheduled run reads only the most recent data for an incremental load.
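
As an illustration, the query parameter for an incremental load could look like the following; the table name and LAST_UPDATED_TS column are hypothetical, and the one-day lookback assumes a daily schedule:

# Hypothetical incremental query: export only rows updated in the last day
--snowflake.to.gcs.sf.query="SELECT * FROM MY_DB.MY_SCHEMA.ORDERS WHERE LAST_UPDATED_TS >= DATEADD(day, -1, CURRENT_TIMESTAMP())"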

Keywords

Autopushdown

  • A feature of the Spark-Snowflake connector that introduces advanced optimization capabilities for better performance by allowing large and complex Spark logical plans (in whole or in part) to be translated and pushed down to Snowflake instead of being processed in Spark. This means Snowflake does most of the heavy lifting, leveraging its own performance efficiencies.
