Using Spark on Dataproc & Apache Iceberg for an Open Lakehouse

Jerome Rajan
Google Cloud - Community
9 min read · Dec 6, 2023


In this article, our primary focus is using Spark on Dataproc in GCP for reading from and writing to a Lakehouse.

So, if you want to warm up to the idea of a Lakehouse on GCP, I’d strongly recommend reading the paper below before continuing.

BUILDING A DATA LAKEHOUSE

While the paper focuses heavily on BigQuery & BigLake as the Lakehouse platform, you will notice that Dataproc (Spark) is an integral component of the data ingestion, processing, and consumption pipelines.

This article will focus on the Spark aspect of that architecture and on how Spark enables Apache Iceberg, among other open data formats, as the format of choice in your Lakehouse.

Now, let’s begin by understanding some of the fundamental building blocks of a modern open Lakehouse.

Lakehouse?

A Lakehouse is the fusion of a Data Lake and a Data Warehouse, creating a simplified and unified platform that minimizes data hops while emphasizing efficiency and governance.

Not crucial for this article, but if you have the time, consider reading THIS WHITEPAPER.

The key to making the Lakehouse pattern effective lies in the ability to interact with objects in the same manner as with a traditional transactional store. This includes performing ACID-compliant transactions, facilitating graceful schema evolution, and, most importantly, executing CRUD operations on your data with the capability to rollback and restore. In a data lake, these actions were either impossible or required cumbersome tooling, with potential side effects such as the risk of data loss or storage bloat.

These capabilities are realized through several open data formats, namely Apache Iceberg, Apache Hudi, and Delta. While each has distinct feature sets, their fundamental purpose is rooted in the same set of principles. Though it’s challenging to find an unbiased comparison, I found this article to do a decent job.
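
To make this concrete, here’s a quick sketch of the kind of Spark SQL these formats enable, using Iceberg syntax against a hypothetical catalog and table (we’ll wire up real catalogs later in this article; the snapshot ID is likewise illustrative):

-- Schema evolution without rewriting existing data files
ALTER TABLE lakehouse.db.orders ADD COLUMNS (discount_pct DOUBLE);

-- ACID-compliant, row-level deletes
DELETE FROM lakehouse.db.orders WHERE order_status = 'CANCELLED';

-- Time travel to an earlier snapshot of the table
SELECT * FROM lakehouse.db.orders VERSION AS OF 4348969331560202389;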

Iceberg

For the purpose of this article, we’ll focus on Apache Iceberg. But it should be safe to assume that the basic principles of Iceberg would apply across other open formats like Hudi and Delta as well.

So what is Apache Iceberg?

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table.

— Official Apache Iceberg Docs

Metadata Management

While each framework has its pros and cons, the cornerstone of these frameworks that ultimately powers a modern Lakehouse is the Metadata Layer. In the case of Iceberg, the metadata layer has two sides. On one side, you have the metastore, or the Iceberg catalog, which holds pointers to the current table metadata. On the other, the table metadata itself contains all the details about the data files that constitute the table, along with the other information the Iceberg APIs need to do their work.
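
Iceberg even exposes this metadata layer through SQL-queryable metadata tables. Assuming an Iceberg table reachable as local.db.events (a hypothetical name), you can peek at the layer directly:

-- Snapshots recorded in the table metadata, one per commit
SELECT snapshot_id, committed_at, operation FROM local.db.events.snapshots;

-- The data files that currently make up the table
SELECT file_path, record_count, file_size_in_bytes FROM local.db.events.files;

-- How the current-snapshot pointer has moved over time
SELECT * FROM local.db.events.history;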

Image source: https://www.dremio.com/blog/a-hands-on-look-at-the-structure-of-an-apache-iceberg-table/
This is a great talk by one of the co-creators of the Apache Iceberg project that speaks at length about the design principles and co-existence with the Hive metastore.

This approach abstracts metadata management and centralizes it into a layer that can be shared across multiple frameworks and, more significantly, managed independently without impacting the actual data. One of the most ubiquitous metastores out there is the ever-so-popular Hive Metastore. Of course, there are other implementations of this layer, but our focus here is predominantly on the open stack, hence the Hive Metastore!

What is the Hive Metastore?

The Hive Metastore functions as a centralized repository of metadata for Hive tables, storing crucial information about their structure, location, and partitioning. It plays a pivotal role in Hive’s table management and provides users with essential information.

Interestingly, despite its name associating it with the “Hive” engine, the Metastore serves a broader purpose. It isn’t limited to the Hive engine but also extends its utility to Spark, Trino, Impala, and others, acting as their central metastore when needed.

https://blog.jetbrains.com/big-data-tools/2022/07/01/why-we-need-hive-metastore/

The Metastore simplifies table management and information retrieval for users, serving as a crucial component in big data systems. Its role in centralized metadata management, support for multi-tenancy, and efficient schema evolution make it integral to the wider big data ecosystem.
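
To make that tangible, this is roughly the information an engine pulls from the Metastore when you ask it about a table. In Spark SQL, assuming a database db with a table sales already registered in the metastore:

-- Databases and tables registered in the metastore
SHOW DATABASES;
SHOW TABLES IN db;

-- Schema, storage location, partitioning, and table properties
-- served by the metastore for this table
DESCRIBE FORMATTED db.sales;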

As peculiar as it may sound, you don’t necessarily need Hive to utilize a Hive Metastore. For our discussion, the spotlight will be on how Spark leverages the Hive Metastore to facilitate reads and writes to Iceberg.

SparkSession

In Apache Spark, the SparkSession is responsible for interacting with a Hive Metastore when needed.

The official documentation very aptly defines the SparkSession as —

The entry point into all functionality in Spark is the SparkSession class

So when we say Spark speaks with the Hive Metastore, it is actually the SparkSession object doing that work for you. That is why it is important to configure this session object correctly so that it can use the metastore catalog and other extensions.

Dataproc

Dataproc is a managed Hadoop and Spark service that makes it easy to process large datasets on the Google Cloud Platform. It provides a fully configured Hadoop and Spark environment that can be used to run big data jobs, including batch processing, interactive queries, and machine learning.

As we mentioned earlier, a crucial aspect is having a catalog capable of high-level table management for Iceberg tables. To expedite this setup with a fully managed and resilient Hive Metastore, we’ll leverage the Dataproc Metastore Service.

Head over to https://console.cloud.google.com/dataproc/metastore/services and set up the DPMS using Dataproc Metastore 2, which gives you horizontal scalability, HA, and active-active DR. You don’t need any of that for this learning exercise, but it’s good to know about when you hit production!

We’ll use the thrift protocol for this demo.

Once the service is provisioned, you should see its details along with a Thrift URI.

Using this DPMS, let’s create our Dataproc on GCE cluster, where we will run the Spark jobs that read from and ingest into our Iceberg-based Lakehouse.

In the below gcloud command, replace demo-project with your project name and the metastore with the one you created in the previous step.

gcloud dataproc clusters create iceberg-spark \
--enable-component-gateway --region us-central1 --no-address \
--master-machine-type n2d-standard-4 --master-boot-disk-type pd-ssd --master-boot-disk-size 500 \
--num-workers 2 --worker-machine-type n2d-standard-4 --worker-boot-disk-type pd-ssd --worker-boot-disk-size 500 --num-worker-local-ssds 4 \
--image-version 2.1-debian11 \
--optional-components JUPYTER \
--dataproc-metastore projects/demo-project/locations/us-central1/services/demo-dpmsv2 \
--scopes 'https://www.googleapis.com/auth/cloud-platform' \
--project demo-project

You now have a YARN cluster on Dataproc with a fully managed Hive Metastore.

Spark!

First, let’s get into spark-sql and see what it takes to work with Iceberg there.

spark-sql \
--jars gs://demo-project/binaries/iceberg-spark-runtime-3.3_2.12-1.4.2.jar \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.spark_catalog.uri=thrift://10.38.64.26:9083 \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=gs://demo-project/spark-warehouse

There are a few important configurations here. Let’s discuss the significance of each one by one.

1: Download the Iceberg jar

For this example, I’m using a Dataproc 2.1 image, which is bundled with Spark 3.3 and Scala 2.12. Head to https://iceberg.apache.org/releases/ and download the jar appropriate for your Dataproc image. In our case, that is iceberg-spark-runtime-3.3_2.12-1.4.2.jar. If you run into ClassNotFound exceptions when running SQL queries, it is very likely that you are using the wrong jar.

I downloaded the jar and pushed it to a GCS bucket, which I’ve referenced in the --jars flag. The other option is to use --packages and pass org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2 as the value. I personally prefer --jars since the binary is within my environment and tends to be quicker as well.

2: Add Iceberg Support

Here, we configure Spark’s built-in catalog, spark_catalog, to use Iceberg’s SparkSessionCatalog. org.apache.iceberg.spark.SparkSessionCatalog adds Iceberg support to Spark’s built-in session catalog, turning it into a catalog that can load both Iceberg and non-Iceberg tables.

Setting the type to hive tells Spark to use the Hive metastore as the catalog.

--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.spark_catalog.uri=thrift://10.38.64.26:9083

Dataproc should have automatically updated hive-site.xml with our DPMS URI at cluster creation time. So unless you want to override that, you can safely omit spark.sql.catalog.spark_catalog.uri from the configs.

For example, on my cluster, when I fire the command below —

grep -A1 '<name>hive.metastore.uris<\/name>' /etc/hive/conf/hive-site.xml | grep '<value>' | awk -F'[><]' '/<value>/{print $3}'

I get thrift://10.38.64.26:9083, which is nothing but the Thrift URI of my DPMS instance.
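
A quick sanity check that the session catalog really is backed by the DPMS is to list what it knows about and create something through it (demo_db here is just a placeholder name):

-- Served by the Dataproc Metastore over the Thrift endpoint above
SHOW DATABASES;

-- A database created through the session catalog gets registered in the DPMS
CREATE DATABASE IF NOT EXISTS demo_db;
SHOW TABLES IN demo_db;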

3: Add a HadoopCatalog to define your Spark warehouse

--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=gs://demo-project/spark-warehouse

Notice how we use org.apache.iceberg.spark.SparkCatalog here. The key difference is that this catalog loads only Iceberg tables.

Let’s see this behavior in action

Here, I use the spark_catalog, which uses the SparkSessionCatalog and therefore supports loading both Iceberg and non-Iceberg tables. If you were to create a table without specifying the USING ICEBERG syntax, the engine would create a normal Hive table, as seen in the screenshot below.

Now, if I switch over to the local catalog, which uses the SparkCatalog, you’ll notice how re-running the same CREATE TABLE command now gives me an Iceberg table.
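
In SQL terms, the sequence looks roughly like this (a sketch using hypothetical database and table names):

-- Default spark_catalog (SparkSessionCatalog): without USING ICEBERG,
-- this creates a regular Hive table in the metastore
CREATE DATABASE IF NOT EXISTS demo_db;
CREATE TABLE demo_db.my_table (id INT, data STRING);

-- Same statement against the local catalog (SparkCatalog): the result
-- is an Iceberg table under gs://demo-project/spark-warehouse
CREATE NAMESPACE IF NOT EXISTS local.demo_db;
CREATE TABLE local.demo_db.my_table (id INT, data STRING);

-- Compare the detailed output for the two tables (provider, format, location)
DESCRIBE TABLE EXTENDED demo_db.my_table;
DESCRIBE TABLE EXTENDED local.demo_db.my_table;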

4: Float Away In Your Lakehouse!

The syntax for creating an Iceberg table is as simple as appending the USING ICEBERG clause to the CREATE TABLE statement. There are, of course, other technicalities specific to a format like Iceberg, but those are outside the scope of this article.
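
For instance, something along these lines (again a sketch with hypothetical names; the days() transform is one of Iceberg’s partitioning transforms):

CREATE TABLE local.demo_db.events (
  event_id BIGINT,
  event_ts TIMESTAMP,
  payload  STRING
)
USING ICEBERG
PARTITIONED BY (days(event_ts));

INSERT INTO local.demo_db.events
VALUES (1, TIMESTAMP '2023-12-01 10:00:00', 'hello lakehouse');

SELECT * FROM local.demo_db.events;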

Dataproc Serverless

As a bonus tip, let me guide you on supercharging your Spark pipelines with Serverless Dataproc. The cake remains the same, the icing is the same, and the sprinkles are the same. The only difference is, now you don’t need to worry about baking the cake!

You’d still need to create the DPMS, which is a completely independent, decoupled, serverless Hive Metastore.

Once that’s ready, just whip up a gcloud dataproc batches submit command and that’s it!

Here’s a sample command that passes the same runtime jar and Spark configs, all bundled into one. The demo.sql is a simple SELECT:

SELECT * FROM local.db.my_table2;

And the batch submission itself:

gcloud dataproc batches submit \
--project demo-project \
--region us-central1 spark-sql \
--batch iceberg-demo gs://demo-project/code/demo.sql \
--version 1.1 \
--jars gs://demo-project/binaries/iceberg-spark-runtime-3.3_2.12-1.4.2.jar \
--subnet default \
--metastore-service projects/demo-project/locations/us-central1/services/demo-dpmsv2 \
--properties spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog,spark.sql.catalog.spark_catalog.type=hive,spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.local.type=hadoop,spark.sql.catalog.local.warehouse=gs://demo-project/spark-warehouse

Output:

Summary

In the sections above, we’ve seen in detail how to set up and configure a managed Dataproc cluster and a Dataproc Metastore, and how to use spark-sql to interact with your Lakehouse using Apache Iceberg. We even saw how to use Dataproc Serverless to achieve the same results with zero infra overhead. Hope you found this useful. Drop a note if you liked the article and would like me to cover additional concepts.
