Using Spark on Dataproc & Apache Iceberg for an Open Lakehouse
In this article, our primary focus is using Spark on Dataproc in GCP for reading from and writing to a Lakehouse.
So, if you want to warm up to the idea of a Lakehouse on GCP, I’d strongly recommend reading the paper below before continuing further.
While the article speaks heavily about using BigQuery & BigLake as the Lakehouse platform, you will notice that Dataproc (Spark) is an integral component of data ingestion, processing as well as consumption pipelines.
This article focuses on the Spark aspect of that architecture and on how it enables Apache Iceberg, among other open data formats, as the format of choice in your Lakehouse.
Now, let’s begin by understanding some of the fundamental building blocks of a modern open Lakehouse.
Lakehouse?
A Lakehouse is the fusion of a Data Lake and a Data Warehouse, creating a simplified and unified platform that minimizes data hops while emphasizing efficiency and governance.
Not crucial for this article, but if you have the time, consider reading THIS WHITEPAPER.
The key to making the Lakehouse pattern effective lies in the ability to interact with objects in the same manner as with a traditional transactional store. This includes performing ACID-compliant transactions, facilitating graceful schema evolution, and, most importantly, executing CRUD operations on your data with the capability to rollback and restore. In a data lake, these actions were either impossible or required cumbersome tooling with potential side effects such as data loss risks or bloat.
This capability, mentioned in the paragraph above, is realized through several open data formats, namely Apache Iceberg, Apache Hudi, and Delta. While each has distinct feature sets, their fundamental purpose is rooted in the same set of principles. Though it’s challenging to find an unbiased comparison, I found this article to do a decent job.
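To make the rollback-and-restore idea concrete, here is a deliberately simplified Python model of snapshot-based versioning. This is a toy sketch of the principle these formats share, not Iceberg’s actual implementation: every write produces a new immutable snapshot, and rollback is just moving a pointer.

```python
# Toy model of snapshot-based table versioning (NOT the real Iceberg
# implementation): every write appends a new immutable snapshot, and
# "rollback" simply moves the current-snapshot pointer back.
class ToyTable:
    def __init__(self):
        self.snapshots = [[]]          # snapshot 0: empty table
        self.current = 0               # pointer to the live snapshot

    def write(self, rows):
        # Copy-on-write: a new snapshot is appended; old ones stay intact.
        new_snapshot = self.snapshots[self.current] + list(rows)
        self.snapshots.append(new_snapshot)
        self.current = len(self.snapshots) - 1

    def rollback(self, snapshot_id):
        # Restoring rewrites no data; only the pointer moves.
        self.current = snapshot_id

    def read(self):
        return self.snapshots[self.current]


table = ToyTable()
table.write(["row-1", "row-2"])        # snapshot 1
table.write(["row-3"])                 # snapshot 2
assert table.read() == ["row-1", "row-2", "row-3"]
table.rollback(1)                      # time-travel back to snapshot 1
assert table.read() == ["row-1", "row-2"]
```

Because old snapshots are never mutated, reads against an earlier snapshot stay consistent even while new writes land, which is the property that makes ACID semantics tractable on object storage.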
Iceberg
For the purpose of this article, we’ll focus on Apache Iceberg. But it should be safe to assume that the basic principles of Iceberg would apply across other open formats like Hudi and Delta as well.
So what is Apache Iceberg?
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table.
— Official Apache Iceberg Docs
Metadata Management
While each framework has its pros and cons, the cornerstone of these frameworks that ultimately powers a modern Lakehouse is the Metadata Layer. In the case of Iceberg, the metadata layer is double-sided. On one hand, you have the metastore or the Iceberg catalog that holds pointers to the actual table metadata. On the other side, the table metadata itself contains all the details about the data files that constitute the table and other information required by the Iceberg APIs to execute the necessary tasks.
This approach abstracts metadata management and centralizes it into a layer that can be shared across multiple frameworks and, more significantly, be managed independently without impacting the actual data. One of the most omnipresent metastores out there is the ever-so-popular Hive Metastore. Of course, there are other implementations of this layer, but our focus here is predominantly on the open stack, hence the Hive Metastore!
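The two-sided layout described above can be sketched in plain Python. The paths and names here are purely illustrative; the point is only the shape of the indirection: the catalog holds a pointer, and the table metadata holds everything else.

```python
# A rough sketch of Iceberg's two-sided metadata layer. Names and paths
# are illustrative, not real Iceberg file layouts.

# Side 1: the catalog (e.g. the Hive Metastore) maps a table name to the
# location of its current metadata file.
catalog = {
    "db.my_table": "gs://bucket/warehouse/db/my_table/metadata/v2.metadata.json",
}

# Side 2: the table metadata describes the schema, the snapshots, and the
# manifest lists that ultimately enumerate the data files.
table_metadata = {
    "schema": {"id": "bigint", "name": "string"},
    "current-snapshot-id": 2,
    "snapshots": [
        {"snapshot-id": 1, "manifest-list": "gs://bucket/metadata/snap-1.avro"},
        {"snapshot-id": 2, "manifest-list": "gs://bucket/metadata/snap-2.avro"},
    ],
}

def resolve(table_name):
    """Follow the catalog pointer, then pick the current snapshot."""
    pointer = catalog[table_name]
    current = table_metadata["current-snapshot-id"]
    snapshot = next(s for s in table_metadata["snapshots"]
                    if s["snapshot-id"] == current)
    return pointer, snapshot["manifest-list"]

pointer, manifests = resolve("db.my_table")
assert pointer.endswith("v2.metadata.json")
assert manifests.endswith("snap-2.avro")
```

Because the catalog only stores a pointer, swapping the live version of a table is an atomic pointer update, and any engine that can read the metadata files can share the same tables.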
What is the Hive Metastore?
The Hive Metastore functions as a centralized repository of metadata for Hive tables, storing crucial information about their structure, location, and partitioning. It plays a pivotal role in Hive’s table management and provides users with essential information.
Interestingly, despite its name associating it with the “Hive” engine, the Metastore serves a broader purpose. It isn’t limited to the Hive engine but also extends its utility to Spark, Trino, Impala, and others, acting as their central metastore when needed.
The Metastore simplifies table management and information retrieval for users, serving as a crucial component in big data systems. Its role in centralized metadata management, support for multi-tenancy, and efficient schema evolution makes it integral to various features.
As peculiar as it may sound, you don’t necessarily need Hive to utilize a Hive Metastore. For our discussion, the spotlight will be on how Spark leverages the Hive Metastore to facilitate reads and writes to Iceberg.
SparkSession
In Apache Spark, the SparkSession is responsible for interacting with a Hive Metastore when needed.
The official documentation very aptly defines the SparkSession as —
The entry point into all functionality in Spark is the SparkSession class.
So when we say Spark speaks with the Hive Metastore, it is actually the SparkSession object doing all this work for you. That is why it is important to create this session object correctly so it uses the metastore catalog and other extensions.
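To illustrate what “creating the session correctly” means, here is a sketch of the catalog-related configs involved, expressed as a plain Python dict. The thrift URI and bucket paths are placeholders; substitute your own DPMS endpoint and warehouse location. The PySpark application itself is shown only in comments, since it needs a cluster to run.

```python
# A sketch of the Iceberg catalog configs the SparkSession needs.
# URIs and bucket names below are placeholders, not real endpoints.
iceberg_confs = {
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
    "spark.sql.catalog.spark_catalog.uri": "thrift://10.38.64.26:9083",
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.local.type": "hadoop",
    "spark.sql.catalog.local.warehouse": "gs://demo-project/spark-warehouse",
}

# On a cluster with PySpark available, you would apply these when building
# the session (not runnable here without Spark):
#
#   from pyspark.sql import SparkSession
#   builder = SparkSession.builder.appName("iceberg-demo")
#   for key, value in iceberg_confs.items():
#       builder = builder.config(key, value)
#   spark = builder.enableHiveSupport().getOrCreate()

assert iceberg_confs["spark.sql.catalog.spark_catalog.type"] == "hive"
```

These are exactly the same key/value pairs we will pass on the `spark-sql` command line later; whether they arrive via `--conf` flags or the builder, they end up in the same SparkSession configuration.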
Dataproc
Dataproc is a managed Hadoop and Spark service that makes it easy to process large datasets on the Google Cloud Platform. It provides a fully configured Hadoop and Spark environment that can be used to run big data jobs, including batch processing, interactive queries, and machine learning.
As we mentioned earlier, a crucial aspect is having a catalog capable of high-level table management for Iceberg tables. To expedite this setup with a fully managed and resilient Hive Metastore, we’ll leverage the Dataproc Metastore Service.
Head over to https://console.cloud.google.com/dataproc/metastore/services and set up the DPMS using Dataproc Metastore 2, which gives you horizontal scalability, HA, and active-active DR. You don’t need that for this learning exercise, but it’s good to know when you hit production!
We’ll use the thrift protocol for this demo.
Using this DPMS, let’s create our Dataproc on GCE cluster where we will run our Spark jobs that will read and ingest into our Iceberg-based Lakehouse.
In the below gcloud command, replace demo-project with your project name and the metastore with the one you created in the previous step.
gcloud dataproc clusters create iceberg-spark \
--enable-component-gateway --region us-central1 --no-address \
--master-machine-type n2d-standard-4 --master-boot-disk-type pd-ssd --master-boot-disk-size 500 \
--num-workers 2 --worker-machine-type n2d-standard-4 --worker-boot-disk-type pd-ssd --worker-boot-disk-size 500 --num-worker-local-ssds 4 \
--image-version 2.1-debian11 \
--optional-components JUPYTER \
--dataproc-metastore projects/demo-project/locations/us-central1/services/demo-dpmsv2 \
--scopes 'https://www.googleapis.com/auth/cloud-platform' \
--project demo-project
You now have a YARN cluster on Dataproc with a fully managed Hive Metastore.
Spark!
First, let’s get into the spark-sql shell and see what it takes to work with Iceberg there.
spark-sql \
--jars gs://demo-project/binaries/iceberg-spark-runtime-3.3_2.12-1.4.2.jar \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.spark_catalog.uri=thrift://10.38.64.26:9083 \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=gs://demo-project/spark-warehouse
There are a few important configurations here. Let’s discuss the significance of each one by one.
1: Download the Iceberg jar
For this example, I’m using a Dataproc 2.1 image, which is bundled with Spark 3.3 and Scala 2.12. Head to https://iceberg.apache.org/releases/ and download the jar appropriate for your Dataproc image. In our case, we’ll use THIS JAR. If you run into ClassNotFound exceptions when running SQL queries, it is very likely that you are using the wrong jar.
I downloaded the jar and pushed it to a GCS bucket, which I’ve referenced in the --jars flag. The other option is to use --packages and pass org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2 as the value. I personally prefer --jars since the binary is within my environment and tends to be quicker as well.
2: Add Iceberg Support
Here, we configure the native Spark catalog, spark_catalog, to use Iceberg’s SparkSessionCatalog. org.apache.iceberg.spark.SparkSessionCatalog adds support for Iceberg to Spark’s native catalog, so it becomes a Spark catalog that can also load non-Iceberg tables.
Setting the type to hive tells Spark to use the Hive Metastore as the catalog.
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.spark_catalog.uri=thrift://10.38.64.26:9083
Dataproc should have automatically updated hive-site.xml with our DPMS URI at cluster creation time. So unless you want to override that, you can safely omit spark.sql.catalog.spark_catalog.uri from the configs.
For example, in my cluster, when I fire the below command —
grep -A1 '<name>hive.metastore.uris<\/name>' /etc/hive/conf/hive-site.xml | grep '<value>' | awk -F'[><]' '/<value>/{print $3}'
I get thrift://10.38.64.26:9083, which is nothing but the thrift URI of my DPMS instance.
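The same lookup can be done with Python’s standard library instead of grep/awk, which is handy inside an init script or notebook. The XML here is a trimmed stand-in for what Dataproc writes to /etc/hive/conf/hive-site.xml; the URI is illustrative.

```python
# Extract hive.metastore.uris from a hive-site.xml document using only
# the standard library. The XML below is a trimmed, illustrative sample.
import xml.etree.ElementTree as ET

hive_site = """
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://10.38.64.26:9083</value>
  </property>
</configuration>
"""

def metastore_uri(xml_text):
    """Return the value of hive.metastore.uris, or None if absent."""
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        if prop.findtext("name") == "hive.metastore.uris":
            return prop.findtext("value")
    return None

assert metastore_uri(hive_site) == "thrift://10.38.64.26:9083"
```

On a real cluster you would read the file with `open("/etc/hive/conf/hive-site.xml").read()` and pass that text in.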
3: Add a HadoopCatalog to define your Spark warehouse
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=gs://demo-project/spark-warehouse
Notice how we use org.apache.iceberg.spark.SparkCatalog here. The trick is that this catalog is used to load only Iceberg tables.
Let’s see this behavior in action
Here, I use the spark_catalog, which uses the SparkSessionCatalog and therefore supports loading both Iceberg and non-Iceberg tables. If you were to create a table without specifying the USING ICEBERG syntax, the engine would create a normal Hive table, as can be seen in the below screenshot.
Now, if I switch over to the local namespace, which uses the SparkCatalog, you’ll notice how re-running the same CREATE TABLE command now gives me an Iceberg table.
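The difference between the two catalog classes can be summed up in a tiny conceptual model. This is a sketch of the behavior described above, not the real implementation:

```python
# Conceptual model (not the real implementation) of the two Iceberg
# catalog classes:
#   - SparkSessionCatalog wraps Spark's built-in catalog, so it can load
#     both Iceberg tables and ordinary (e.g. Hive) tables.
#   - SparkCatalog knows how to load Iceberg tables only.
def can_load(catalog_class, table_format):
    if catalog_class == "SparkSessionCatalog":
        return True                        # Iceberg plus non-Iceberg tables
    if catalog_class == "SparkCatalog":
        return table_format == "iceberg"   # Iceberg tables only
    raise ValueError(f"unknown catalog class: {catalog_class}")

assert can_load("SparkSessionCatalog", "hive")
assert can_load("SparkSessionCatalog", "iceberg")
assert can_load("SparkCatalog", "iceberg")
assert not can_load("SparkCatalog", "hive")
```

This is why spark_catalog (backed by SparkSessionCatalog) is a safe default for mixed environments, while a dedicated catalog like our local one is Iceberg-only.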
4: Float Away In Your Lakehouse!
The syntax for creating an Iceberg table is as simple as appending the USING ICEBERG keywords to the CREATE TABLE statement. There are, of course, other technicalities specific to using a format like Iceberg, but those are outside the scope of this article.
Dataproc Serverless
As a bonus tip, let me guide you on supercharging your Spark pipelines with Serverless Dataproc. The cake remains the same, the icing is the same, and the sprinkles are the same. The only difference is, now you don’t need to worry about baking the cake!
You’d still need to create the DPMS, which is a completely independent and decoupled serverless Hive Metastore.
Once that’s ready, just whip up a gcloud dataproc batches submit command and that’s it!
Here’s a sample command that passes the same runtime jars and Spark configs, all bundled into one. The demo.sql is a simple SELECT:
SELECT * FROM local.db.my_table2;
gcloud dataproc batches submit \
--project demo-project \
--region us-central1 spark-sql \
--batch iceberg-demo gs://demo-project/code/demo.sql \
--version 1.1 \
--jars gs://demo-project/binaries/iceberg-spark-runtime-3.3_2.12-1.4.2.jar \
--subnet default \
--metastore-service projects/demo-project/locations/us-central1/services/demo-dpmsv2 \
--properties spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog,spark.sql.catalog.spark_catalog.type=hive,spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.local.type=hadoop,spark.sql.catalog.local.warehouse=gs://demo-project/spark-warehouse
Output:
Summary
In the above sections, we’ve seen in detail how to set up and configure a managed Dataproc cluster and a Dataproc Metastore, and how to use spark-sql to interact with your Lakehouse using Apache Iceberg. We even saw how to use Dataproc Serverless to achieve the same results with zero infra overhead. Hope you found this useful. Drop a note if you liked the article and would like me to cover additional concepts.