Dataplex — Data Lineage for Spark Applications | Data Governance | Part — 4 Contd..

Nishit Kamdar
Google Cloud - Community
Oct 25, 2023 · 6 min read

This is a continuation of the Data Lineage Part 4.0 blog post, focusing on data lineage for Spark applications.

Data Lineage for Spark Applications:

On Google Cloud, Spark jobs are primarily executed on Dataproc clusters (on Compute Engine) or via Dataproc Serverless. We will look at examples of both.

1.0 Lineage for Spark jobs on Dataproc GCE Cluster

Lineage for Spark jobs on Dataproc GCE clusters is supported out of the box. It only requires enabling lineage at the cluster level or at the job level.

Once you enable the lineage feature in your Dataproc cluster or Dataproc Spark job, the jobs capture lineage events and publish them to the Dataplex Data Lineage API. Dataproc integrates with the Data Lineage API through OpenLineage, using the OpenLineage Spark plugin.

1.1. Enabling Lineage in Dataproc

a) Cluster level, at creation time

  1. Create the cluster with the property dataproc:dataproc.lineage.enabled=true. All jobs on the cluster then report lineage unless it is explicitly disabled at the job level.
  2. To stop a specific job from reporting lineage, submit it with the Spark property spark.extraListeners set to an empty (null) value.
  3. Once enabled on a cluster, lineage cannot be disabled for that cluster later.

b) Job level, at submission time

  1. Set the Spark property spark.extraListeners to io.openlineage.spark.agent.OpenLineageSparkListener when you submit the job.

1.2. What’s supported & not

a) Supported:

  1. Data lineage is available for all Dataproc Spark jobs.
  2. Dataproc on Compute Engine image versions 2.0.74+ and 2.1.22+.
  3. BigQuery and Cloud Storage data sources.

b) Not supported:

  1. SparkR
  2. Spark Structured Streaming

Let’s test it!

1.3 Create a Dataproc Cluster

# Environment variables for the lab: project, region, network, service account, cluster, and buckets
PROJECT_ID=`gcloud config list --format "value(core.project)" 2>/dev/null`
PROJECT_NBR=`gcloud projects describe $PROJECT_ID | grep projectNumber | cut -d':' -f2 | tr -d "'" | xargs`
LOCATION="us-central1"
SUBNET=lab-snet
SUBNET_URI="projects/$PROJECT_ID/regions/$LOCATION/subnetworks/$SUBNET"
UMSA_FQN="lab-sa@$PROJECT_ID.iam.gserviceaccount.com"       # user-managed service account
DPGCE_CLUSTER_NM=lineage-enabled-spark-cluster-$PROJECT_NBR
SPARK_BUCKET=dataproc-lineage-spark-bucket-$PROJECT_NBR
SPARK_BUCKET_FQN=gs://$SPARK_BUCKET
DPMS_NM=lab-dpms-$PROJECT_NBR                               # Dataproc Metastore service

Grant the user-managed service account the Data Lineage Admin role (roles/datalineage.admin), then create the lineage-enabled cluster:

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member=serviceAccount:${UMSA_FQN} \
--role=roles/datalineage.admin

gcloud dataproc clusters create $DPGCE_CLUSTER_NM \
--project $PROJECT_ID \
--subnet $SUBNET \
--region $LOCATION \
--enable-component-gateway \
--bucket $SPARK_BUCKET \
--master-machine-type n1-standard-4 \
--master-boot-disk-size 500 \
--num-workers 2 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 500 \
--image-version 2.1.22-debian11 \
--optional-components JUPYTER \
--dataproc-metastore projects/$PROJECT_ID/locations/$LOCATION/services/$DPMS_NM \
--properties 'dataproc:dataproc.lineage.enabled=true' \
--impersonate-service-account $UMSA_FQN \
--scopes https://www.googleapis.com/auth/cloud-platform

Note the --properties 'dataproc:dataproc.lineage.enabled=true' flag. This enables lineage at the cluster level.

1.4 Run the Spark job

I have written a PySpark job that creates a Chicago crimes curated table from the raw table, similar to the BigQuery and Cloud Composer example in the previous blog.

gcloud dataproc jobs submit pyspark gs://raw-code-${PROJECT_NBR}/pyspark/chicago-crimes-analytics/curate_crimes.py \
--cluster=$DPGCE_CLUSTER_NM \
--project $PROJECT_ID \
--region $LOCATION \
--id $JOB_ID \
--impersonate-service-account $UMSA_FQN \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.29.0.jar \
--properties=spark.openlineage.namespace=$PROJECT_ID,spark.openlineage.appName=$JOB_NM \
-- --projectID=$PROJECT_ID --tableFQN="oda_curated_zone.crimes_curated_spark_dataproc" --peristencePath="gs://curated-data-$PROJECT_NBR/crimes-curated-spark-dataproc/"

The Spark job creates the parquet files for the curated table and stores them in the curated bucket.
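For reference, here is a minimal sketch of what a curation job like curate_crimes.py could look like, assuming it reads the raw BigQuery table, derives a case_year column, and writes Parquet to the curated path while registering a metastore table. The argument names mirror the submit command above; the source columns and transformation are illustrative assumptions, not the author's actual code.

# Hypothetical sketch of a curation job (not the author's actual curate_crimes.py).
import argparse

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--projectID", required=True)
    parser.add_argument("--tableFQN", required=True)        # e.g. oda_curated_zone.crimes_curated_spark_dataproc
    parser.add_argument("--peristencePath", required=True)  # GCS location for the curated Parquet files
    args = parser.parse_args()

    spark = (SparkSession.builder
             .appName("curate-chicago-crimes")
             .enableHiveSupport()
             .getOrCreate())

    # Read the raw crimes table through the Spark BigQuery connector
    # (the connector jar is supplied via --jars at submit time).
    raw_df = (spark.read.format("bigquery")
              .option("table", f"{args.projectID}.oda_raw_zone.crimes_raw")
              .load())

    # Illustrative curation step: derive the case_year column used by the
    # downstream data products (assumes a timestamp column named "date").
    curated_df = raw_df.withColumn("case_year", year(col("date")))

    # Persist the curated data as Parquet in the curated bucket and register
    # it as an external table in the Dataproc Metastore (the oda_curated_zone
    # database is assumed to already exist).
    (curated_df.write
     .mode("overwrite")
     .format("parquet")
     .option("path", args.peristencePath)
     .saveAsTable(args.tableFQN))


if __name__ == "__main__":
    main()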

1.4.1 Dataplex Discovery in action

Since this zone is set up for discovery scans in Dataplex, Dataplex Discovery will register the new table created by Spark in Data Catalog and also create an external table in BigQuery.

Navigate to BigQuery and you should see the newly created BigQuery external table.

1.5 — Create Data Products from the Curated Table

Next, we will create four data products from the curated table using Spark jobs, starting with the crime trend by year:

baseName="crimes-by-year-spark-dataproc"
JOB_ID="$baseName-$RANDOM"
reportName='Chicago Crime Trend by Year'
reportDirGcsURI="gs://product-data-${PROJECT_NBR}/$baseName"
reportSQL='SELECT cast(case_year as int) case_year,count(*) AS crime_count FROM oda_curated_zone.crimes_curated_spark_dataproc GROUP BY case_year;'
reportPartitionCount=1
reportTableFQN="oda_product_zone.crimes_by_year_spark_dataproc"
reportTableDDL="CREATE TABLE IF NOT EXISTS ${reportTableFQN}(case_year int, crime_count long) STORED AS PARQUET LOCATION \"$reportDirGcsURI\""

gcloud dataproc jobs submit pyspark gs://raw-code-${PROJECT_NBR}/pyspark/chicago-crimes-analytics/crimes_report.py \
--cluster=$DPGCE_CLUSTER_NM \
--project $PROJECT_ID \
--region $LOCATION \
--id $JOB_ID \
--impersonate-service-account $UMSA_FQN \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.29.0.jar \
--properties=spark.openlineage.namespace=$PROJECT_ID,spark.openlineage.appName=$baseName \
-- --projectNbr=$PROJECT_NBR --projectID=$PROJECT_ID --reportDirGcsURI="$reportDirGcsURI" --reportName="$reportName" --reportSQL="$reportSQL" --reportPartitionCount=$reportPartitionCount --reportTableFQN="$reportTableFQN" --reportTableDDL="$reportTableDDL"

Repeat this for Month, Week and Hour Data Products.
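Similarly, here is a minimal sketch of what a parameter-driven report job like crimes_report.py could look like, assuming it runs the supplied SQL against the curated table, writes Parquet to the product path with the requested partition count, and registers the product table with the supplied DDL. This is an illustration, not the author's actual code.

# Hypothetical sketch of a generic report/data-product job.
import argparse

from pyspark.sql import SparkSession


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--projectNbr", required=True)
    parser.add_argument("--projectID", required=True)
    parser.add_argument("--reportDirGcsURI", required=True)
    parser.add_argument("--reportName", required=True)
    parser.add_argument("--reportSQL", required=True)
    parser.add_argument("--reportPartitionCount", type=int, required=True)
    parser.add_argument("--reportTableFQN", required=True)
    parser.add_argument("--reportTableDDL", required=True)
    args = parser.parse_args()

    spark = (SparkSession.builder
             .appName(args.reportName)
             .enableHiveSupport()
             .getOrCreate())

    # Run the report SQL against the curated table registered in the
    # Dataproc Metastore (strip the trailing ";" before calling spark.sql).
    report_df = spark.sql(args.reportSQL.rstrip(";"))

    # Write the data product as Parquet with the requested partition count.
    (report_df.coalesce(args.reportPartitionCount)
     .write.mode("overwrite")
     .parquet(args.reportDirGcsURI))

    # Register the external product table pointing at that location.
    spark.sql(args.reportTableDDL)


if __name__ == "__main__":
    main()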

All the assets created are registered in Dataplex and BigQuery.

1.6 — Let's look at the lineage generated

To summarize what we did above: we used a BigQuery native table (oda_raw_zone.crimes_raw) as our source, created the curated table and four data product tables on top of it as external Hive tables (in the Dataproc Metastore/Hive Metastore), and persisted the data in Cloud Storage.

Go to BigQuery -> the oda_raw_zone.crimes_raw table and open its Lineage tab.

Click on the Dataproc icon and it will show the details of the Dataproc Spark process and its job runs.

1.7. The Lineage Namespace

In the Spark job commands above, we added the spark.openlineage.namespace and spark.openlineage.appName properties for lineage.

Here is where they manifest in the lineage graph.

It is important that the spark.openlineage.appName value is unique per Dataproc Spark process, but stable across the runs of that process. That is why, in the data product example above, the constant $baseName is used for spark.openlineage.appName while the randomized $JOB_ID is used only as the Dataproc job ID. If appName were unique per run, there would be an explosion of Dataproc process icons in the lineage graph, one for each run.

In summary, with the out-of-the-box lineage support, Dataplex lets you easily capture lineage for Spark applications running on Dataproc GCE clusters.

2.0 Lineage for Spark jobs on Dataproc Serverless

Dataproc Serverless is not yet natively supported by Dataplex's automated lineage capture. So, we will use the custom lineage feature in Cloud Composer instead.

We will repeat what we did for BigQuery lineage using an Airflow DAG, except this time the transformations run as Apache Spark jobs on Dataproc Serverless.

2.1. The Airflow DAG with custom lineage — run on Cloud Composer

I have created a Composer DAG that creates Curated and Data Products tables using Dataproc Serverless.

The DAG code additionally sets the inlets and outlets task attributes, which create the custom lineage.
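Here is a hedged sketch of what such a DAG could look like. The DAG ID, bucket path, table names, and operator configuration are assumptions; DataprocCreateBatchOperator submits the PySpark batch to Dataproc Serverless, and the BigQueryTable lineage entity is assumed to come from the Composer-provided airflow.composer.data_lineage.entities module used for custom lineage.

# Hypothetical sketch of a Composer DAG that runs the curation step on
# Dataproc Serverless and reports custom lineage via inlets/outlets.
from datetime import datetime

from airflow import models
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

PROJECT_ID = "your-project-id"   # assumption: replace with your project
REGION = "us-central1"

with models.DAG(
    dag_id="chicago_crimes_spark_serverless_lineage",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    curate_crimes = DataprocCreateBatchOperator(
        task_id="curate_crimes",
        project_id=PROJECT_ID,
        region=REGION,
        batch={
            "pyspark_batch": {
                # Assumed location of the curation script
                "main_python_file_uri": "gs://raw-code-bucket/pyspark/curate_crimes.py",
            },
        },
        batch_id="curate-crimes-{{ ds_nodash }}",  # must be unique per run
        # Custom lineage: declare the source and target tables so Dataplex can
        # draw the edge, since Dataproc Serverless does not emit lineage itself.
        inlets=[BigQueryTable(project_id=PROJECT_ID,
                              dataset_id="oda_raw_zone",
                              table_id="crimes_raw")],
        outlets=[BigQueryTable(project_id=PROJECT_ID,
                               dataset_id="oda_curated_zone",
                               table_id="crimes_curated_spark")],
    )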

Copy the code into the Composer environment's DAGs folder and trigger the DAG.

Once the job completes, navigate to the BigQuery UI and click on the external table oda_curated_zone.crimes_curated_spark. It shows the lineage of the Dataproc Serverless jobs captured through Composer.

In summary, Dataplex captures the lineage of Spark applications run through Dataproc Serverless via the Composer inlets and outlets specifications.

Conclusion:

Spark applications are one of the most commonly used transformation mechanisms in enterprise data platforms, so it is important to capture their lineage. Dataplex provides out-of-the-box support and simple configuration to capture lineage from Spark applications on Dataproc GCE clusters, and custom lineage via Composer for Dataproc Serverless, which helps provide end-to-end visibility into your data pipelines.
