Dataproc Serverless template to migrate your data from Kafka to BigQuery

Abhi Sharma
Google Cloud - Community
3 min read · Nov 6, 2023


Dataproc Serverless is a fully managed, serverless Spark offering on Google Cloud Platform. It lets data engineers focus on their Spark jobs rather than on infrastructure management. With its serverless nature, developers are freed from the burden of cluster provisioning and management, enabling them to channel their undivided attention towards refining their business logic.

In this article, we discuss a Dataproc Serverless template that helps migrate data from a streaming Kafka topic to BigQuery. Dataproc Serverless templates are open-source, readily adaptable code templates that offer a seamless solution for efficiently transferring data from a source system to a designated destination system.

Kafka to BigQuery using Serverless Spark Template

Objective

The objective of this article is to demonstrate how the Dataproc Serverless template for Kafka to BigQuery can ease the pain for a data engineer of reading a streaming Kafka topic and pushing the data to a BigQuery table.

Pre-requisites

This template uses the Spark BigQuery connector to write to BigQuery from Spark, and the Kafka 0.10+ source for Structured Streaming to read data from a Kafka topic. The following two jars are required to use this template:

  1. Spark BigQuery connector
  2. Kafka 0.10+ Source For Structured Streaming

Set-up and Initialization

Before we can use the template, we need to follow some basic steps to set up the project in GCP.

  1. Create a GCP project and enable the Dataproc API before we can create a serverless batch job for the Kafka-to-BigQuery migration.
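For example, the API can be enabled from Cloud Shell once gcloud is pointed at the project (a minimal sketch; the project ID below is a placeholder):

# Point gcloud at the project and enable the Dataproc API
gcloud config set project my-gcp-project
gcloud services enable dataproc.googleapis.com
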
  2. Create a GCP staging bucket to store the required jar files.
export GCS_STAGING_BUCKET="my-gcs-staging-bucket"
gsutil mb gs://$GCS_STAGING_BUCKET
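The Kafka-related jars listed in the pre-requisites can then be staged into this bucket. A minimal sketch, pulling them from Maven Central (URLs follow standard Maven coordinates; adjust versions as needed, and note that the Spark BigQuery connector is already hosted at gs://spark-lib):

# Download the Kafka connector jars (versions match the JARS list used later)
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.0/spark-sql-kafka-0-10_2.12-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.2.0/spark-token-provider-kafka-0-10_2.12-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar
# Stage them in the bucket created above
gsutil cp *.jar gs://$GCS_STAGING_BUCKET/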

3. Ensure that the subnet is configured with Private Google Access enabled. Even if you’re utilizing the “default” VPC network created by GCP, it’s essential to activate private access, as shown below.
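A minimal sketch of enabling Private Google Access on the default subnet with gcloud (the region value is a placeholder):

# Turn on Private Google Access for the default subnet in the chosen region
gcloud compute networks subnets update default \
  --region=us-central1 \
  --enable-private-ip-google-access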

Process to execute the template

  1. Clone the Git repository from Cloud Shell and navigate to the python folder.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

2. Authenticate with application-default credentials using the command below:

gcloud auth application-default login

3. Export the variables needed for the serverless job, along with the required jar files.

export GCP_PROJECT=<gcp-project>
export REGION=<region>
export SUBNET=<subnet>
export JARS="gs://$GCS_STAGING_BUCKET/spark-sql-kafka-0-10_2.12-3.2.0.jar,gs://$GCS_STAGING_BUCKET/kafka-clients-2.8.0.jar,gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar,gs://$GCS_STAGING_BUCKET/commons-pool2-2.6.2.jar,gs://$GCS_STAGING_BUCKET/spark-token-provider-kafka-0-10_2.12-3.2.0.jar"
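For illustration, the first three variables might look like this (values are placeholders; the subnet can also be given as a full resource path):

export GCP_PROJECT=my-gcp-project
export REGION=us-central1
export SUBNET=projects/my-gcp-project/regions/us-central1/subnetworks/default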

4. Run the command below to execute a shell script that will trigger the serverless Dataproc template with the required arguments.

./bin/start.sh \
-- --template=KAFKATOBQ \
--kafka.to.bq.checkpoint.location="<gcs checkpoint storage location>" \
--kafka.to.bq.bootstrap.servers="<list of kafka connections>" \
--kafka.to.bq.topic="<integration topics to subscribe>" \
--kafka.to.bq.starting.offset="<earliest|latest|json_offset>" \
--kafka.to.bq.dataset="<bigquery_dataset_name>" \
--kafka.to.bq.table="<bigquery_table_name>" \
--kafka.to.bq.temp.bucket.name="<bucket name for staging files>" \
--kafka.to.bq.output.mode=<append|overwrite|update> \
--kafka.to.bq.termination.timeout="time in seconds"

Arguments

  1. kafka.to.bq.checkpoint.location: This refers to the Cloud Storage location designated for storing checkpoints during data transfer, with the format being ‘gs://bucket/…’
  2. kafka.to.bq.bootstrap.servers: This parameter expects a list of Kafka bootstrap servers in the format ‘[x1.x2.x3.x4:port1,y1.y2.y3.y4:port2]’.
  3. kafka.to.bq.topic: Here, you should specify the names of the topics corresponding to the Kafka server you’re working with.
  4. kafka.to.bq.starting.offset: Choose the desired offset to initiate data reading. Accepted values include "earliest", "latest" (applicable for streaming only), or a JSON string like {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}.
  5. kafka.to.bq.dataset: Specify the name of the BigQuery dataset that contains the destination table.
  6. kafka.to.bq.table: Indicate the name of the BigQuery table that serves as the destination for your data.
  7. kafka.to.bq.output.mode: Specify the output mode for the table, which can be ‘append’, ‘overwrite’, ‘update’, or ‘complete’.
  8. kafka.to.bq.temp.bucket.name: Provide the name of the bucket designated for temporary storage files (not the actual storage location).
  9. kafka.to.bq.termination.timeout: This parameter sets the wait time, in seconds, before terminating the stream.
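To tie the arguments together, here is a hypothetical invocation with placeholder values filled in (bucket, server, topic, dataset, and table names are illustrative only):

./bin/start.sh \
-- --template=KAFKATOBQ \
--kafka.to.bq.checkpoint.location="gs://my-gcs-staging-bucket/kafka-bq-checkpoint" \
--kafka.to.bq.bootstrap.servers="10.0.0.5:9092" \
--kafka.to.bq.topic="events" \
--kafka.to.bq.starting.offset="earliest" \
--kafka.to.bq.dataset="kafka_landing" \
--kafka.to.bq.table="events_raw" \
--kafka.to.bq.temp.bucket.name="my-gcs-staging-bucket" \
--kafka.to.bq.output.mode="append" \
--kafka.to.bq.termination.timeout="1200"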

Conclusion

Once the job starts running, it will be visible in the Dataproc Batches UI. The job will migrate data from the Kafka topic to the specified BigQuery table.
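The same batch can also be inspected from the command line (the region value is a placeholder):

# List recent serverless batches in the region
gcloud dataproc batches list --region=us-central1
# Drill into a specific batch using its ID from the list output
gcloud dataproc batches describe <batch-id> --region=us-central1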

References

Dataproc Serverless

Dataproc Serverless Templates [GitHub]
