Processing Databricks Delta Lake data in Google Cloud Dataproc Serverless for Spark

Earlier this year, Google announced the General Availability (GA) release of Dataproc Serverless for Spark (Dataproc s8s), which allows you to run your Spark jobs on Dataproc without having to spin up and manage your own Spark cluster.

Recently I’ve been working on a project with a customer who wanted to offload some jobs they were running on Databricks to Dataproc s8s, with the goal of reducing and simplifying their operations. At the same time, the customer wanted to keep their current data lake, which is based on Databricks Delta Lake.

Delta Lake is an open-format storage layer developed by Databricks and based on Apache Parquet, which brings scalable ACID transactions to Apache Spark and other big-data engines (Reference: delta.io, github).

Although the job migration process was quite straightforward, there were several topics that required extra attention:

  • Connectivity
  • Runtime versions
  • Additional libraries and dependencies

Connectivity

Spark driver and executor instances have internal IP addresses only. Therefore, if you plan to access any Google API from your Spark code, make sure that the subnet you are using has Private Google Access enabled. Additionally, if you need to reach the Internet (e.g. to download a package or to access a storage bucket in another cloud), make sure you have deployed Cloud NAT for your subnet or have an Internet proxy available.

Additionally, make sure the subnet has a firewall rule that allows ingress from within the subnet on all protocols and all ports.

More info here.
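As a rough illustration of the two subnet prerequisites above, the sketch below assembles the corresponding gcloud commands as argument lists, so they can be reviewed before being run (for example with subprocess.run). The network, subnet, region, CIDR range and firewall rule name are all placeholders chosen for this example, and Cloud NAT is not covered here.

```python
import shlex


def connectivity_commands(network, subnet, region, subnet_cidr):
    """Return gcloud commands (as argument lists) for the subnet prerequisites."""
    # Enable Private Google Access so internal-only instances can reach Google APIs.
    enable_private_access = [
        "gcloud", "compute", "networks", "subnets", "update", subnet,
        f"--region={region}",
        "--enable-private-ip-google-access",
    ]
    # Allow ingress from the subnet's own range on all protocols and ports.
    allow_internal_ingress = [
        "gcloud", "compute", "firewall-rules", "create",
        f"{subnet}-allow-internal",  # hypothetical rule name
        f"--network={network}",
        "--direction=INGRESS",
        "--action=ALLOW",
        "--rules=all",
        f"--source-ranges={subnet_cidr}",
    ]
    return [enable_private_access, allow_internal_ingress]


if __name__ == "__main__":
    # Placeholder values; replace them with your own network details.
    for cmd in connectivity_commands("my-vpc", "my-subnet", "europe-west1", "10.0.0.0/24"):
        print(shlex.join(cmd))
```

Printing the commands rather than executing them keeps the sketch safe to run anywhere; the Terraform setup mentioned later in this post provisions the same resources declaratively.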

Runtime versions

Dataproc s8s currently supports Spark 3.2.1, Python 3.9, Java 11 and Scala 2.12. If your code targets earlier versions, you will need to check that it runs correctly on these runtime versions. See full details here.
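One lightweight way to catch a version mismatch early is to compare the versions a job actually sees with the ones listed above. The following is only a sketch: the helper names are made up for this example, and it checks just the Python and Spark major.minor versions.

```python
# Versions quoted in this post for the Dataproc s8s runtime at the time of writing.
EXPECTED = {"python": "3.9", "spark": "3.2"}


def same_minor(actual, expected):
    """True when `actual` (e.g. '3.2.1') has the major.minor given in `expected`."""
    return actual.split(".")[:2] == expected.split(".")[:2]


def runtime_mismatches(python_version, spark_version):
    """Describe every component whose major.minor differs from EXPECTED."""
    found = {"python": python_version, "spark": spark_version}
    return [
        f"{name}: expected {EXPECTED[name]}.x, found {found[name]}"
        for name in EXPECTED
        if not same_minor(found[name], EXPECTED[name])
    ]
```

Inside a PySpark job this could be called as runtime_mismatches(platform.python_version(), spark.version), logging or failing fast if the returned list is not empty.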

Additional libraries and dependencies

By default, Dataproc s8s uses a container image that includes the default Spark, Java, Python and R packages associated with a runtime release version. In order to achieve the 60s boot time, the runtime provided for Dataproc s8s is kept lean in terms of bundled Java libraries and Python packages (see details here).

However, there are several mechanisms to include additional Java libraries or Python packages:

a) Provisioning dependencies during the boot process

The Dataproc s8s for Spark batches API supports several parameters to specify additional JAR files and archives.

For PySpark, you can use the following arguments in the gcloud CLI to specify the dependencies:

--jars=[JAR,…] Comma-separated list of jar files to be provided to the classpaths.

--py-files=[PY,…] Comma-separated list of Python scripts to be passed to the PySpark framework. Supported file types: .py, .egg and .zip.

--archives=[ARCHIVE,…] Archives to be extracted into the working directory. Supported file types: .jar, .tar, .tar.gz, .tgz, and .zip.

Additionally, you can use the Spark runtime property spark.jars.packages to provide a comma-separated list of Maven coordinates of JARs to include on the driver and executor classpaths.

To install the required Delta Lake libraries, specify the following property when running the job with the gcloud CLI:

--properties="spark.jars.packages=io.delta:delta-core_2.12:1.1.0"

No additional Python packages are required, since the necessary Python modules are already packaged in the io.delta:delta-core JAR file.

b) Including dependencies in a custom container

The Dataproc s8s for Spark batches API allows you to use a custom container image instead of the default image. Typically, a custom container image adds Spark workload Java or Python dependencies not provided by the default container image.

For environments with restricted Internet access or when the number of required additional packages is high, this would be the recommended approach.

See full details here.

End2end example — Alternative 1: provisioning dependencies during boot process

Now that we have reviewed the context and some theory, let’s move on to the hands-on example!

We will first review how to run this example provisioning dependencies during the boot process. In the next section we will do the same using a custom container.

1-Google Cloud project and resources

For running the example we will need:

  • A GCP Project to host all the resources
  • A VPC network and a subnet to be used by Dataproc s8s
  • A firewall rule to allow the internal subnet communication required by Dataproc
  • Cloud NAT, required to reach the internet from the Dataproc computing resources
  • A service account with the minimum permissions required by Dataproc
  • A GCS bucket to host the Delta Lake files and the additional staging objects required by Dataproc.

All of this setup can be deployed automatically using terraform. You can find the different terraform files and the process to follow at the end of this post.

2-Pyspark job

Once the resources are available, we can run the following PySpark job in Dataproc s8s:

The Python code basically creates a dummy dataset with some data, saves it in the GCS bucket using the Delta format and then reads it back.

3-Launching the Job

And finally, the job is launched using the gcloud CLI:

Running the job with the default container
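Since the original command is not reproduced here, the sketch below shows roughly what such a launch could look like, assembled as an argument list that can be printed or handed to subprocess.run. The job file name, region, subnet, service account, bucket and label value are placeholders assumed for this example; the spark.jars.packages property is the one discussed earlier in this post.

```python
import shlex

DELTA_PACKAGE = "io.delta:delta-core_2.12:1.1.0"  # Maven coordinates quoted in this post


def submit_command(job_file, region, subnet, service_account, deps_bucket,
                   job_label, job_args=()):
    """Build a gcloud invocation for a Dataproc s8s PySpark batch (default image)."""
    cmd = [
        "gcloud", "dataproc", "batches", "submit", "pyspark", job_file,
        f"--region={region}",
        f"--subnet={subnet}",
        f"--service-account={service_account}",
        f"--deps-bucket={deps_bucket}",  # staging bucket for the local job file
        f"--properties=spark.jars.packages={DELTA_PACKAGE}",
        f"--labels=job={job_label}",  # label used later to find the job in billing
    ]
    if job_args:
        cmd += ["--", *job_args]  # everything after "--" is passed to the job
    return cmd


if __name__ == "__main__":
    # All values below are placeholders.
    print(shlex.join(submit_command(
        "job.py", "europe-west1", "my-subnet",
        "dataproc-sa@my-project.iam.gserviceaccount.com",
        "gs://my-bucket", "delta-demo", ["gs://my-bucket/delta-table"],
    )))
```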

Once the job has finished, you will be able to access the job details in the Google Cloud Console (Dataproc -> Batches).

Job details in GCP console

Additionally, thanks to the use of the “labels=job=” parameter, you will be able to query the cost of the job in the GCP billing console (this data appears after approximately 24 hours).

Billing details

End2end example — Alternative 2: Including dependencies in a custom container

Installing Python packages during cluster creation or at runtime is not deterministic and, depending on the package and its dependencies, can be very slow, which impacts the boot time of workers and executors.

However, there are alternatives to achieve the same goal: using custom images or distributing a packaged Python environment as an archive (ref).

In this example we will explore how to create a custom container.

Google Cloud project and resources

This example uses the same Google Cloud project and resources described above.

Creating the custom container

The use of custom containers with Dataproc s8s is well explained in the official guide (link). We will update the provided example to include the required Delta Lake Java libraries and Python packages.

This is achieved with the following code additions:

COPY delta-core_2.12-1.1.0.jar "${SPARK_EXTRA_JARS_DIR}"
(…)
RUN ${CONDA_HOME}/bin/pip install 'delta-spark'

Below you can find the full Dockerfile:

Dockerfile

For building the container we can use Google Cloud Build:

Building the custom container

Launching the Job

The job is launched using the gcloud CLI as seen before, but adding the container-image parameter:

Running the job with a custom container
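The build and launch steps of this alternative could be assembled along the following lines. This is a sketch under assumptions: the image URI, region, subnet, service account and bucket are placeholders, and the function names are made up for this example. Note that the spark.jars.packages property is no longer passed, because the Delta Lake JAR is already baked into the image.

```python
import shlex


def build_image_command(image_uri, context_dir="."):
    """gcloud command that builds the Dockerfile in context_dir with Cloud Build."""
    return ["gcloud", "builds", "submit", context_dir, f"--tag={image_uri}"]


def submit_with_image_command(job_file, region, subnet, service_account,
                              deps_bucket, image_uri, job_args=()):
    """gcloud command that runs a PySpark batch on a custom container image."""
    cmd = [
        "gcloud", "dataproc", "batches", "submit", "pyspark", job_file,
        f"--region={region}",
        f"--subnet={subnet}",
        f"--service-account={service_account}",
        f"--deps-bucket={deps_bucket}",
        f"--container-image={image_uri}",
    ]
    if job_args:
        cmd += ["--", *job_args]  # everything after "--" is passed to the job
    return cmd


if __name__ == "__main__":
    image = "gcr.io/my-project/dataproc-delta:1.0"  # placeholder image URI
    print(shlex.join(build_image_command(image)))
    print(shlex.join(submit_with_image_command(
        "job.py", "europe-west1", "my-subnet",
        "dataproc-sa@my-project.iam.gserviceaccount.com",
        "gs://my-bucket", image, ["gs://my-bucket/delta-table"],
    )))
```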

And that’s all! We have the same example but this time running in a custom container with the dependencies included in the Dockerfile.

I hope these examples help you during the process of deploying jobs in Dataproc s8s.

Annex-Terraform files and deployment

Below you can find the terraform files and the process to execute them:

main.tf
variables.tf

To run the terraform code:

  • Copy the content of main.tf and variables.tf to the folder of your choice (the content is available at the end of this post).
  • Create a terraform.tfvars file and specify the required variables:
admins = [
"user:admin@xxxxxx.com"
]
billing_account_id = "1234-1234-1234"
root_node = "folders/12345678"
prefix = "myprefix"
  • Make sure you have the right authentication setup (application default credentials, or a service account key)
  • Run terraform init and terraform apply

