Delta Sharing Recipient guide for BigQuery

A guide to using the Delta Sharing client to read shared data and write it to BigQuery.

Ashkan Moaven
Databricks Platform SME
6 min read · May 7, 2024

Authors:

Ashkan Moaven, Bilal Obeidat, and Himanshu Gupta

Overview

Delta Sharing is an open protocol developed by Databricks for secure data sharing with other organizations regardless of the computing platforms they use.

This guide aims to help recipients read data shared using Delta Sharing open sharing and write it to a BigQuery table. There are two options that have been validated by our clients:

  1. Read the data as a pandas DataFrame and leverage the pandas-gbq library to write the data to BigQuery.
  2. Use the Delta Sharing Spark connector to perform the initial extract and ongoing incremental inserts, updates, and deletes into BigQuery.

First, we will discuss how to create the recipient and download the credential file.

Solution

Delta Sharing Credentials as a Recipient

Consuming Delta Sharing data requires an OSS connector, authenticated using a credential file typically obtained when a provider shares an activation token with a recipient.

When a new recipient entity is created for a Delta Share, an activation link for that recipient is generated. That URL leads to a website where data recipients can download a credential file containing a long-term access token for that recipient. Following the link takes the recipient to an activation page.

From this page, the recipient can download the .share credential file, which contains the information and authorization token needed to access the share. The contents of the file look similar to the following example:
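
(Illustrative only; the endpoint, token, and expiration values below are placeholders.)

{
  "shareCredentialsVersion": 1,
  "endpoint": "https://<sharing-server-endpoint>/delta-sharing/",
  "bearerToken": "<long-term-access-token>",
  "expirationTime": "<token-expiration-timestamp>"
}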

Due to the sensitive nature of the token, be sure to save it in a secure location and be careful when visualizing or displaying the contents.

Storing the Share File

Recipients should download the share file and store it in a secure location. To use it, the file has to be uploaded to a location such as GCS that is accessible to your compute resources (a Cloud Function or Dataproc cluster in this example).
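
For example, the credential file can be copied to a bucket with gsutil (the bucket name and path here are placeholders; make sure access to the bucket is tightly restricted):

gsutil cp config.share gs://<your-secure-bucket>/delta-sharing/config.share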

Connecting to a Share

By using the share file credentials, recipients can establish a connection to the Delta Share. The process for establishing the connection includes the following steps:

  • The client presents the query and credentials to the sharing server
  • The server verifies whether the client is allowed to access the data, logs the request, and then determines which data to send back
  • The server generates short-lived, pre-signed URLs that allow the client to read the shared Parquet files directly from the cloud provider
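
As a minimal sketch, once the credential file is available, the open source Python connector can authenticate and list everything that has been shared with you (the profile path below is a placeholder; a gs:// URI works when gcsfs is installed):

import delta_sharing

# Path to the downloaded .share credential file
profile_path = "gs://<your-bucket>/config.share"

# The SharingClient authenticates against the sharing server using the bearer token
client = delta_sharing.SharingClient(profile_path)

# List every table the provider has shared with this recipient
for table in client.list_all_tables():
    print(f"{table.share}.{table.schema}.{table.name}")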

Recipients can access the shared data using many computing tools and platforms, including Databricks, Apache Spark, Pandas, and Power BI.

Query the Shared Table Using Pandas

For this solution, we are going to use pandas and leverage a GCP Cloud Function to run the code. There are two libraries we can use to load the pandas DataFrame into BigQuery: pandas-gbq and google-cloud-bigquery; you can find a comparison between them here. We are going to use pandas-gbq. Here's the code:

deltashare_gbq.py

import delta_sharing
import pandas_gbq


def delta_to_bq(request):
    """Loads data from a Delta Sharing table into a BigQuery table.

    Args:
        request: A Flask request object containing the required parameters.

    Returns:
        'DONE' if successful, or an error message.
    """

    REQUIRED_KEYS = [
        "share_file_path",
        "share_id",
        "database",
        "table",
        "bq_dataset",
        "bq_table",
        "gcp_project_id",
    ]

    # Extract and validate input parameters
    data = request.get_json(silent=True) or {}
    missing_keys = [key for key in REQUIRED_KEYS if key not in data]
    if missing_keys:
        return f"Error: Missing values for {', '.join(missing_keys)}"

    # Build the Delta Sharing table URL: <profile-file>#<share>.<schema>.<table>
    table_url = f"{data['share_file_path']}#{data['share_id']}.{data['database']}.{data['table']}"

    # Load the shared table from Delta Sharing as a pandas DataFrame
    pandas_df = delta_sharing.load_as_pandas(table_url)

    # Write the DataFrame to BigQuery
    pandas_gbq.to_gbq(
        pandas_df,
        f"{data['bq_dataset']}.{data['bq_table']}",
        project_id=data['gcp_project_id'],
    )

    return 'DONE'

requirements.txt

delta-sharing==1.*
gcsfs==2024.*
pandas
pandas-gbq
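
With the source and requirements in the current directory, deploying the function might look like the following (the function name, region, runtime, memory, and timeout values are assumptions; adjust them to your environment):

gcloud functions deploy delta-to-bq \
  --region=us-central1 \
  --runtime=python311 \
  --entry-point=delta_to_bq \
  --trigger-http \
  --no-allow-unauthenticated \
  --memory=1024MB \
  --timeout=540s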

Sample request to invoke the function

curl -m 550 -X POST <cloud-function-URL> \
-H "Authorization: Bearer $(gcloud auth print-identity-token)" \
-H "Content-Type: application/json" \
-d '{
"share_file_path": "gs://<path-to-credential-file>",
"share_id": "<deltashare-id>",
"database": "<delta-database-name>",
"table": "<delta-table-name>",
"bq_dataset": "<target-bq-dataset-name>",
"bq_table": "<target-bq-table-name>",
"gcp_project_id": "<recipient_gcp_project_id>"
}'

Query the Shared Table Using Spark

For this solution, we are going to use Spark and leverage GCP Dataproc to run the code. The following code will be run on Dataproc to load the dataset into BigQuery. Let’s upload the Python file to a GCS bucket.

deltashare_spark.py

import delta_sharing
from pyspark.sql import SparkSession

# Constants (for clarity and maintainability)
CREDENTIALS_FILE_PATH = 'gs://<path-to-credential-file>'
TEMP_BUCKET = '<temp_bucket_for_loading_data_into_BigQuery>'


def load_and_transfer_delta_table(
    share_name, schema_name, table_name,
    gcp_project_id, bq_dataset_name, bq_table_name
):
    """Loads a Delta Sharing table and transfers it to a BigQuery table.

    Args:
        share_name: Name of the Delta share.
        schema_name: Schema name within the share.
        table_name: Table name within the schema.
        gcp_project_id: GCP project that hosts the target BigQuery dataset.
        bq_dataset_name: Name of the target BigQuery dataset.
        bq_table_name: Name of the target table in BigQuery.
    """

    # Create Spark session
    spark = SparkSession.builder.appName('delta_share_to_bq').getOrCreate()

    # Construct the Delta Sharing table URL: <profile-file>#<share>.<schema>.<table>
    table_url = f"{CREDENTIALS_FILE_PATH}#{share_name}.{schema_name}.{table_name}"

    # Load the shared table as a Spark DataFrame
    shared_df = delta_sharing.load_as_spark(table_url)

    # Configure the temporary GCS bucket used by the BigQuery connector
    spark.conf.set('temporaryGcsBucket', TEMP_BUCKET)

    # Write to BigQuery
    shared_df.write.format('bigquery') \
        .option('table', f'{gcp_project_id}.{bq_dataset_name}.{bq_table_name}') \
        .save()


if __name__ == '__main__':
    # Example usage (replace placeholders)
    load_and_transfer_delta_table(
        share_name='<your_share_name>',
        schema_name='<your_schema_name>',
        table_name='<your_table_name>',
        gcp_project_id='<your_gcp_project_id>',
        bq_dataset_name='<your_bigquery_dataset>',
        bq_table_name='<your_bigquery_table>'
    )

Next, we need to create a single-node Dataproc cluster and specify Python package dependencies by setting the following cluster property (an example gcloud command is sketched below):

  • dataproc:pip.packages = delta-sharing==1.0.3,google-cloud-bigquery==3.20.1
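
The cluster can be created with a gcloud command along the following lines (the cluster name and region are placeholders; the ^#^ prefix changes the gcloud list delimiter so the comma inside the property value is preserved):

gcloud dataproc clusters create delta-share-cluster \
  --region=us-central1 \
  --single-node \
  --properties='^#^dataproc:pip.packages=delta-sharing==1.0.3,google-cloud-bigquery==3.20.1'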

Now we can use the Dataproc cluster to submit a PySpark job to run the code. Create a PySpark job and provide the Spark BigQuery connector JAR and the Delta Sharing package:

  • Job Type = PySpark
  • Cluster = the one you created above; the cluster should be in a running state
  • Main python file = GCS bucket path to the Python program
  • Jar files = gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar
  • Properties: spark.jars.packages = io.delta:delta-sharing-spark_2.12:0.6.4

Now submit the job and your BigQuery table will be populated.
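
An equivalent job submission from the gcloud CLI might look like this (cluster name, region, and the bucket path to the script are placeholders):

gcloud dataproc jobs submit pyspark gs://<your-bucket>/deltashare_spark.py \
  --cluster=delta-share-cluster \
  --region=us-central1 \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
  --properties=spark.jars.packages=io.delta:delta-sharing-spark_2.12:0.6.4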

Incremental query using Spark

In the previous example, we loaded the whole dataset into a BigQuery table. Now let's see how we can do incremental data ingestion from a Delta Share into BigQuery using Spark.

Prerequisite / Setup

  • Set up a Delta Share. In this solution, we have used a customer churn dataset. You can find the sample code in this GitHub repo
  • Create the table, enable CDC (Change Data Feed), insert data, and create the share
  • Set up a recipient and generate the credentials file for the Delta Share
  • Create a single-node Dataproc cluster and specify Python package dependencies by setting the following property:
    - dataproc:pip.packages = delta-sharing==1.0.3,google-cloud-bigquery==3.20.1
  • Create a BigQuery dataset (equivalent to a database or schema)
  • Create the "job_reference_table" in the BigQuery dataset. This table is used as a watermark table to track the latest Delta table version loaded (see the sketch after this list)
  • Create a GCS bucket to store the credential file generated above and the Python program
  • Create a GCS bucket required for querying BigQuery using the Spark or pandas API
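
A minimal sketch of creating the watermark table with the google-cloud-bigquery client; the column names here are assumptions and should match whatever your load job actually records:

from google.cloud import bigquery

# Hypothetical watermark schema: one row per shared table, tracking the last
# Delta table version that has been loaded into BigQuery.
client = bigquery.Client(project="<recipient-gcp-project>")
schema = [
    bigquery.SchemaField("share_table_name", "STRING"),
    bigquery.SchemaField("last_loaded_version", "INT64"),
    bigquery.SchemaField("load_timestamp", "TIMESTAMP"),
]
table = bigquery.Table("<recipient-gcp-project>.<bq-dataset>.job_reference_table", schema=schema)
client.create_table(table, exists_ok=True)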

First time load logic
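
A minimal sketch of the first-time load, assuming a Spark session and the temporaryGcsBucket setting are configured as in deltashare_spark.py above; record_watermark() and get_current_table_version() are hypothetical helpers standing in for the watermark bookkeeping:

import delta_sharing

# Placeholders: adapt to your environment
table_url = 'gs://<path-to-credential-file>#<share>.<schema>.<table>'
target_table = '<project>.<dataset>.<table>'

# Full snapshot of the shared table as a Spark DataFrame
snapshot_df = delta_sharing.load_as_spark(table_url)

# Overwrite (or create) the target BigQuery table with the snapshot
snapshot_df.write.format('bigquery') \
    .option('table', target_table) \
    .mode('overwrite') \
    .save()

# Remember which Delta table version the snapshot corresponds to, so the next run
# knows where incremental reads should start. get_current_table_version() and
# record_watermark() are hypothetical helpers: the first obtains the current shared
# table version (for example, supplied by the provider), the second stores it in
# job_reference_table.
record_watermark(target_table, get_current_table_version(table_url))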

Incremental load logic
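
A minimal sketch of the incremental load, built on the Change Data Feed support in the Delta Sharing connector (CDF must be enabled on the shared table, as noted in the prerequisites); read_watermark(), apply_changes_to_bigquery(), and update_watermark_from() are hypothetical helpers:

import delta_sharing

table_url = 'gs://<path-to-credential-file>#<share>.<schema>.<table>'

# Start reading changes just after the last version already loaded into BigQuery
last_version = read_watermark('<target-table>')   # hypothetical helper: reads job_reference_table

changes_df = delta_sharing.load_table_changes_as_spark(
    table_url, starting_version=last_version + 1
)

# Each row carries _change_type, _commit_version, and _commit_timestamp columns.
# Drop update pre-images so only the final state of each change remains.
changes_df = changes_df.filter(changes_df['_change_type'] != 'update_preimage')

# Apply the inserts, updates, and deletes to the target table (for example by staging
# the changes in BigQuery and running a MERGE), then advance the watermark to the
# highest _commit_version processed. Both steps are left to hypothetical helpers here.
apply_changes_to_bigquery(changes_df)
update_watermark_from(changes_df)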

Execution

For the first-time load, the provider must share the latest table version.
Submit a Dataproc job with the configurations below:

  • Job Type = PySpark
  • Cluster = the one you created above; the cluster should be in a running state
  • Main python file = <GCS bucket path to the Python program (share_spark.py in the repo example)>
  • Jar files = gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar
  • Properties (a full gcloud submission command is sketched after this list)
    - spark.jars.packages = io.delta:delta-sharing-spark_2.12:0.6.4
    - spark.executorEnv.bucket = <temporary GCS bucket used for BigQuery writes>
    - spark.executorEnv.share_file_path = <gs:// path to the credential file>
    - spark.executorEnv.share_table_name = <shared table, e.g. share.schema.table>
    - spark.executorEnv.gcp_project_dataset = <target BigQuery project.dataset>
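
Putting it together, the job submission might look like the following gcloud command (cluster name, region, and all angle-bracket values are placeholders; the ^#^ prefix switches the property delimiter to # so the values can contain commas and colons):

gcloud dataproc jobs submit pyspark gs://<your-bucket>/share_spark.py \
  --cluster=<your-cluster> \
  --region=us-central1 \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
  --properties='^#^spark.jars.packages=io.delta:delta-sharing-spark_2.12:0.6.4#spark.executorEnv.bucket=<temp-gcs-bucket>#spark.executorEnv.share_file_path=gs://<path-to-credential-file>#spark.executorEnv.share_table_name=<share.schema.table>#spark.executorEnv.gcp_project_dataset=<project.dataset>'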
