Streaming Data with Pyspark to Iceberg Table using GCP PubSub Lite, Dataproc and Dataproc Metastore.

Danny Lesnik
9 min readOct 10, 2023

--

In this article I’m going to describe event streaming from a GCP Pubsub Lite topic to Iceberg Table using Dataproc and Dataproc Metastore.

Architecture

The pipeline itself consists of the following components:

Mobile, Laptop, Services — are clients that produce a stream of events and using ingestion API layer (out of scope of this article) publishes those events to a Pubsub Lite topic.

Pubsub Lite — GCP Pubsub Lite topic. For simplicity, we will create a single partition topic, however, when planning a production environment, the amount of partition parameters is significant.

Pubsub Lite Subscription — The object which actually gives a subscriber access to a topic. a subscription name value should be passed as a השךוק a Spark job. This resource also includes Pubsub Lite Reservation. Again, the values we are using in this example are minimal and should not be used in production.

Dataproc Cluster — GCP service which is Hadoop implementation on top of Google Cloud, This cluster will actually run our simple Spark streaming job. Further in this article, we will create this cluster using gcloud command, However, in a production environment such infrastructure should be provisioned using terraform.

Iceberg GCS Bucket — This is the GCS bucket where Apache Iceberg tables are located..

Dataproc Metastore — A fully available, serverless Hive Metastore compatible GCP Service.

Now you may ask. What actually is Hive Metastore? The short answer to this would be that this is the service that stores Apache Hive related metadata and uses RDBMS behind a veil. Dataproc uses Apache Hive Metastore to keep Iceberg databases and tables related metadata.

Resources that are not in the diagram, but are still required to run the pipeline.

GCS Code bucket — the bucket where our job’s Python code will be stored. For simplicity, we will also store this pipeline dependency’s jars that our job is using .

Checkpoint bucket — this bucket is required for keeping a checkpoint folder which is a mechanism where every so often Spark streaming application stores data and metadata in the fault-tolerant file system.

Dataproc Worker Service Account — Service Account that Dataproc Worker is using to access Pubsub Lite subscription, create and write to a Big Query table, and perform Dataproc related job tasks.

Why Pubsub Lite and not Pubsub?

The short answer is because Pyspark doesn’t have a streaming library for Pubsub Lite. Lucky JVM users can use Spark PubSub Lite connector in Apache Baghir, but Python world has to wait.

The long answer is, because, in a lot of cases, Pubsub Lite turns out to be a more suitable solution for a Spark Streaming Job. It supports infinite data retention period, partitioning, and lack of cross regional support significantly reduces cost in case of high usage. So those are the reasons we will focus on Pubsub Lite in this article.

Prerequisites

You have a command line with a shell console available. gcloud installed and initialized and the result of command should be an account with Project Admin role (Remember, we’re still in a POC mode).

gcloud auth list

Iceberg schema

We’re using schema directly from apache Iceberg documentation available online.

+ — — — — — — — — — — + — — — — +
| Field Name | Type |
+ — — — — — — — — — — + — — — — +
| vendor_id | Long |
| trip_id | Long |
| trip_distance | Float |
| fare_amount | Double |
| store_and_fwd_flag | String |
+ — — — — — — — — — — + — — — — +

Creating Environment.

We will create our pipeline using a list of gcloud commands and describe every step below.

Our script will start with setting up the job name, project ID, and resource suffix name. Resource suffix variable will add a unique suffix for every resource created.

#!/bin/bash

#Exporting Project ID
export PROJECT_ID=[] #Add your GCP Project ID

export SUFFIX=f1r3fa

export JOB_NAME=trips-stream-job-${SUFFIX}

export JOB_BUCKET=spark-jobs-${SUFFIX}

Create Job Bucket

Create the Job bucket where our python main file will be located. After the bucket creation, let’s copy our spark job script to this bucket and copy all required JAVA jar dependencies.

#Create a Job Bucket and copy source code
gcloud storage buckets create gs://${JOB_BUCKET} --project=${PROJECT_ID}
gsutil cp main.py gs://${JOB_BUCKET}/pubsub-lite-to-iceberg-stream-${SUFFIX}/

#Coping required jar dependancies to Job bucket.
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.1/postgresql-42.5.1.jar
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.1.0/iceberg-spark-runtime-3.3_2.12-1.1.0.jar
gsutil cp postgresql-42.5.1.jar gs://${JOB_BUCKET}/dependencies/
gsutil cp iceberg-spark-runtime-3.3_2.12-1.1.0.jar gs://${JOB_BUCKET}/dependencies/
rm iceberg-spark-runtime-3.3_2.12-1.1.0.jar
rm postgresql-42.5.1.jar

Create Checkpoint bucket

Now Let’s create the bucket where our Spark job will store its cache.

export CHECKPOINT_BUCKET=checkpoint-location-${SUFFIX}

#Creating Chechpoint Bucket
gcloud storage buckets create gs://${CHECKPOINT_BUCKET} --project=${PROJECT_ID}

Create Lake Bucket

Let’s create a bucket where our iceberg database and table will be located.

export LAKE_BUCKET=iceberg-lake-${SUFFIX}

#Creatring a lake bucket.
gcloud storage buckets create gs://${LAKE_BUCKET} --project=${PROJECT_ID}

Create Pubsub Lite Reservation

Now let’s create a Pubsub Lite Reservation. As I said, all values are just minimum values and are used just to save POC costs. It should not be used in production.

export TOPIC_NAME=trips-${SUFFIX}

#Creating Pubsub Lite reservation
gcloud pubsub lite-reservations create ${TOPIC_NAME}-reservation --project=${PROJECT_ID} --location=us-central1 --throughput-capacity=2

Creating Pubsub Lite topic.

gcloud pubsub lite-topics create ${TOPIC_NAME} --project=${PROJECT_ID} --location=us-central1 --partitions=2 --per-partition-bytes=30GiB --throughput-reservation=${TOPIC_NAME}-reservation

Create Pubsub Lite Subscription.

Let’s create a Pubsub Lite subscription. This subscription’s name will be passed to our Job as parameter. We will see it soon.

Creating Dataproc Metastore

Now, where the fun begins. Let’s create a Dataproc Metastore. First of all, we should take into consideration that it might take up to one hour to provision Dataproc Metastore and gcloud command has a 180 seconds timeout. As a result you will see that this gcloud command returns timeout errors, but no worries, this command will continue in background and our cluster is in the Creating state and will be available soon.

Let's talk about the parameters we’re passing to our gcloud command.

–tier - Service tier. DEVELOPER/ENTERPRISE. We are using a cheaper Developer tier. This tier is typically used for production for low-cost proof-of-concept projects. This tier provides limited scalability and no fault tolerance

–network - The name of a VPC to which Metastore would be connected. We’re using default VPC which is available by default.

--hive-metastore-version - Metastore version 3.1.2

export METASTORE_NAME=iceberg-metastore

gcloud metastore services create ${METASTORE_NAME} --location=us-central1 --tier=DEVELOPER --network=default --hive-metastore-version=3.1.2 --project=${PROJECT_ID}

Creating a Dataproc worker Service Account.

Now we need to create a service account that Dataproc workers will use. This service account should interact with the following GCP resource.

  • GCP Dataproc Master nodes
  • GCP Pubsub Lite — We need to subscribe to the topic and to read the topic metadata, such as the number of partitions, etc…
  • Read and Write to the GCS bucket where our Iceberg table is stored.

So we need to add the following roles to our SA.

roles/dataproc.worker

roles/pubsublite.subscriber

roles/pubsublite.viewer

Now you may ask yourself, what about our GCS bucket? Where exactly is the role for accessing our Iceberg tables?. Well, the answer is simple, roles/dataproc.worker has already a permissions to access GCS bucket.

export DATAPROC_SA_NAME=dataproc-worker-sa

gcloud iam service-accounts create ${DATAPROC_SA_NAME} --description="Service account for a dataproc worker" --display-name=${DATAPROC_SA_NAME} --project ${PROJECT_ID}

gcloud projects add-iam-policy-binding ${PROJECT_ID} --member=serviceAccount:${DATAPROC_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com --role=roles/dataproc.worker

gcloud projects add-iam-policy-binding ${PROJECT_ID} --member=serviceAccount:${DATAPROC_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com --role=roles/pubsublite.subscriber

gcloud projects add-iam-policy-binding ${PROJECT_ID} --member=serviceAccount:${DATAPROC_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com --role=roles/pubsublite.viewer

Creating Dataproc Cluster

Now it’s time to create our Dataproc cluster.

gcloud dataproc clusters create $DATAPROC_CLUSTER_NAME --region=us-central1 --project=${PROJECT_ID} \
--service-account=${DATAPROC_SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
--network=default \
--bucket=${CHECKPOINT_BUCKET} \
--num-masters=1 \
--master-machine-type=n1-standard-2 \
--master-boot-disk-size=30 \
--master-boot-disk-type=pd-ssd \
--num-workers=2 \
--worker-machine-type=n1-standard-2 \
--worker-boot-disk-size=30 \
--worker-boot-disk-type=pd-ssd \
--num-worker-local-ssds=1 \
--num-secondary-workers=0 \
--image-version=2.1.8-ubuntu20 \
--dataproc-metastore=projects/${PROJECT_ID}/locations/us-central1/services/${METASTORE_NAME}

Let’s deep dive into the cluster creation command.

We will create a cluster with the name assigned to $DATAPROC_CLUSTER_NAME environment variables and will use the following parameters.

–region — The region in which our cluster would be created (remember that Dataproc is a zonal cluster)

–project — GCP Project Name

–network — the network in which this custer should be deployed. We will use a default network.

–bucket — a name for a stage bucket (we will use the same bucket where our jobs located)

–num-master — only a single master will be used (Don’t do it in production).

–master-boot-disk-size — We will test it with 30 Gb SSD disk size

–master-boot-disk-type — a type of a master boot disk

–num-workers — number for workers in our cluster.

–worker-machine-type — A worker node GCE machine type

–worker-boot-disk-size — We will test it with 30 Gb SSD disk size

–worker-boot-disk-type — a type of a master boot disk

–image-version — The worker image version

–dataproc-metastore — the full name for the dataproc metastore

Deploying Pyspark Job

First of all, we will need to know two parameters. First parameter is a Metastore URI and second parameter is spark.hive.metastore.warehouse.dir is a default location for the Hive warehouse.

We will get those values from the gcloud command and filter required attributes.

export METASTORE_ENDPOINT=$(gcloud metastore services describe ${METASTORE_NAME} --project ${PROJECT_ID} --location=us-central1 --format="value(endpointUri)")

export METASTORE_WAREHOUSE=$(gcloud metastore services describe ${METASTORE_NAME} --project ${PROJECT_ID} --location=us-central1 --format="value(hiveMetastoreConfig.configOverrides[hive.metastore.warehouse.dir])")

Now after we know those values, we can submit our spark job.

gcloud dataproc jobs submit pyspark gs://${JOB_BUCKET}/pubsub-lite-to-iceberg-stream-${SUFFIX}/main.py \
--cluster=${DATAPROC_CLUSTER_NAME} \
--region=us-central1 \
--project=${PROJECT_ID} \
--jars=gs://${JOB_BUCKET}/dependencies/postgresql-42.5.1.jar,gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-1.0.0-with-dependencies.jar,gs://${JOB_BUCKET}/dependencies/iceberg-spark-runtime-3.3_2.12-1.1.0.jar \
--properties spark.app.name=batch_iceberg,spark.hive.metastore.uris=${METASTORE_ENDPOINT},spark.hive.metastore.warehouse.dir=${METASTORE_WAREHOUSE} \
-- --lake_bucket=gs://${LAKE_BUCKET}/datalake/ \
--psub_subscript_id=projects/${PROJECT_ID}/locations/us-central1/subscriptions/datastream-${SUFFIX}-subscription \
--checkpoint_location=gs://${CHECKPOINT_BUCKET}/checkpoint/

Let’s discuss what we have done here:

We are submitting pyspark job for with the job main.py file located in our $JOB_BUCKET and are passing to our gcloud command the following parameters:

–cluster — The name of a Dataproc cluster we’re using for this job

–region — The region where our job is running

–project — a project where that we’re using

–properties — We are submitting out spark job with two spark properties: Metastore URI, and hive warehouse directory.

this where a job parameter passed. We are passing the following parameters for our job:

–lake_bucket — The bucket where our Iceberg table is located.

–psub_subscript_id — the id for our pubsub lite subscription.

–checkpoint_location — a location for our checkpoint directory.

Testing

Lets publish some message to our topic:

gcloud pubsub lite-topics publish ${TOPIC_NAME} --location=us-central1 --message="{'vendor_id':25,'trip_id': 1000474,'trip_distance': 2.3999996185302734,'fare_amount': 42.13,'store_and_fwd_flag': 'Y'}" --project=${PROJECT_ID}
gcloud pubsub lite-topics publish ${TOPIC_NAME} --location=us-central1 --message="{'vendor_id':26,'trip_id': 1000474,'trip_distance': 2.3999996185302734,'fare_amount': 42.13,'store_and_fwd_flag': 'Y'}" --project=${PROJECT_ID}

And now if we look at our data in our bucket we will see that those messages were written to our table.

Spark Job

Now finally let’s talk about our code.

import argparse

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StructField, StructType, StringType, DoubleType, FloatType

parser = argparse.ArgumentParser()
parser.add_argument("--lake_bucket", help="bucket where lake is located.")
parser.add_argument("--psub_subscript_id", help="PUB/SUB lite subscription id")
parser.add_argument("--checkpoint_location", help="Checkpoint location.")

args = parser.parse_args()
lake_bucket = args.lake_bucket
psub_subscript_id = args.psub_subscript_id
checkpoint_location = args.checkpoint_location
print("--------------------------------------------------------------------------")
print("spark.sql.warehouse.dir = {} ".format(lake_bucket))
print("psub_subscript_id = {}".format(psub_subscript_id))
print("checkpoint location = {} ".format(checkpoint_location))

conf = (
SparkConf()
.setAppName('injest-trips-iceberg-table')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
.set('spark.sql.catalog.spark_catalog.type', 'hive')
.set(f'spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
.set(f'spark.sql.catalog.dev.type', 'hive')
.set(f'spark.sql.warehouse.dir', lake_bucket)
)
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

# Create table if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS dev.lakehouse")

schema = StructType([
StructField("vendor_id", LongType(), True),
StructField("trip_id", LongType(), True),
StructField("trip_distance", FloatType(), True),
StructField("fare_amount", DoubleType(), True),
StructField("store_and_fwd_flag", StringType(), True)
])

# Create table, if it doesn't exist.
df = spark.createDataFrame([], schema)
df.writeTo("dev.lakehouse.trips").partitionedBy("vendor_id").createOrReplace()

# Subscribe to GCP PubSub lite.
sdf = (
spark.readStream.format("pubsublite")
.option(
"pubsublite.subscription",
psub_subscript_id,
)
.load()
)

# Converting Column data
sdf = sdf.withColumn("data", sdf.data.cast(StringType()))
sdf = sdf.withColumn("data", from_json("data", schema)).select(col('data.*'))


query = (sdf
.writeStream
.format("iceberg")
.trigger(processingTime="30 seconds")
.option("path", "dev.lakehouse.trips")
.option("checkpointLocation", checkpoint_location)
.start()
)

query.awaitTermination()
query.stop()

The code itself is quite straightforward. We are creating a database named dave.lakehouse if it doesn’t exist. The same goes for the table itself. It will be created if this table doesn’t exist. However, I would like to explain the way we’re handling pubsub messages in Spark.

When our PubsubLite spark connector pulls an incoming message from a subscription we get a DataFrame with the following structure.

{
"data":"string",
"attributes":{
"string":"string"
...
},
"messageId":"string",
"publishTime":"string",
"orderingKey":"string"
}

it looks like what we need to do is to fetch the value of the data field and create another DataFrame with the structure similar to our Iceberg Table schema and this can be done with those two lines.

# Converting Pubsub DF to a iceberg DF
df = spark.createDataFrame([], schema)
df.writeTo("dev.lakehouse.trips").partitionedBy("vendor_id").createOrReplace()

That’s all. You can find all the code I’m using in this article in this github repository.

In my next article, I’ll show how we can easily have Dataproc Batch jobs to read the data that we are streaming to our Iceberg Table.

--

--