Export data from Apache Kafka to BigQuery using Dataproc Serverless
Dataproc Serverless for Spark runs batch workloads without provisioning and managing a cluster. It can dynamically scale workload resources, such as the number of executors, to run your workload efficiently. This lets data developers focus on business logic instead of spending time managing infrastructure.
In contrast, Dataproc on Compute Engine offers a managed Hadoop and Spark service on GCP. It is ideal for users who want to provision and manage their own infrastructure and then execute workloads on Spark.
Dataproc Serverless Templates: ready-to-use, open-source, customizable templates based on Dataproc Serverless for Spark. These templates help data engineers further simplify development on Dataproc Serverless by consuming and customizing the existing templates to fit their requirements.
In this post we will explore how to perform batch or streaming load of data from a Kafka topic to BigQuery using Dataproc Serverless for Spark.
Prerequisites
- Google Cloud SDK installed and authenticated
- Cloud Shell or a machine with Java 8, Maven 3 and Git pre-installed
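If you are unsure whether your environment is ready, a quick sanity check of the toolchain (Cloud Shell already ships with all of these) is:
gcloud --version
java -version
mvn -version
git --version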
Kafka to BigQuery template
The template reads data from Apache Kafka topics and writes it to a BigQuery table. It primarily supports Structured Streaming, but also works for batch workloads. It uses the Spark BigQuery connector and the Spark Kafka Structured Streaming connector.
The template allows the following parameters to be configured through the execution command:
- kafka.bq.bootstrap.servers: Comma-separated list of Kafka broker addresses (IP or hostname with port number). Example: 102.1.1.20:9092
- kafka.bq.topic: Comma-separated list of Kafka topics. Example: topicA,topicB
- kafka.bq.dataset: Output BigQuery dataset where the table resides or needs to be created (see the sketch after this list if the dataset does not exist yet)
- kafka.bq.table: Output table name of BigQuery
- kafka.bq.temp.gcs.bucket: A preexisting GCS bucket name, where temporary files are staged. Example: templates-demo-kafkatobq
- kafka.bq.checkpoint.location: A GCS bucket location where the checkpoint files are maintained. This location stores offset information and is used to resume streaming from the last processed offset. Example: gs://templates-demo-kafkatobq/checkpoint
- kafka.bq.starting.offset: Specify the offset to start reading from. This property governs the behavior of the template, whether to batch load or stream load the data. Accepted values: “earliest”, “latest”, or JSON string
- kafka.bq.await.termination.timeout: Time in milliseconds to wait before terminating the stream
- kafka.bq.fail.on.dataloss: Fails the job when data is lost. Accepted values: true, false
- kafka.bq.stream.output.mode: Output mode for writing data. Accepted values: ‘append’, ‘complete’, ‘update’
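The table itself can be created by the template, but the output dataset typically needs to exist beforehand. If it does not, one way to create it is with the bq CLI; the sketch below uses the example project and dataset names from the execution command later in this post, so adjust them (and the location) to your environment:
bq mk --dataset --location=us-west1 my-gcp-project:kafkatobq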
Run the Template
1. Ensure you have enabled the subnet with Private Google Access. Even if you are using the “default” VPC created by GCP, you still have to enable private access, as shown below.
gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access
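To confirm the change took effect, you can describe the subnet and check the privateIpGoogleAccess field; it should print True:
gcloud compute networks subnets describe default --region=us-central1 --format="get(privateIpGoogleAccess)"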
2. Create a GCS bucket and staging location for jar files.
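For example, assuming the bucket name used throughout this post and the us-west1 region used for the batch job:
gsutil mb -l us-west1 gs://templates-demo-kafkatobq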
3. Clone the git repo in Cloud Shell, which comes pre-installed with various tools. Alternatively, use any machine with JDK 8+, Maven, and Git pre-installed.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/java
4. Authenticate the gcloud CLI
gcloud auth login
5. Execute the KafkaToBQ template
Sample execution command:
export GCP_PROJECT=my-gcp-project
export REGION=us-west1
export SUBNET=test-subnet
export GCS_STAGING_LOCATION=gs://templates-demo-kafkatobq
bin/start.sh \
-- \
--template KAFKATOBQ \
--templateProperty project.id=$GCP_PROJECT \
--templateProperty kafka.bq.checkpoint.location=gs://templates-demo-kafkatobq/checkpoint \
--templateProperty kafka.bq.bootstrap.servers=102.1.1.20:9092 \
--templateProperty kafka.bq.topic=msg-events \
--templateProperty kafka.bq.starting.offset=earliest \
--templateProperty kafka.bq.dataset=kafkatobq \
--templateProperty kafka.bq.table=kafkaevents \
--templateProperty kafka.bq.temp.gcs.bucket=templates-demo-kafkatobq \
--templateProperty kafka.bq.await.termination.timeout=1200000
6. Monitor the Spark batch job
After submitting the job, you will be able to view it in the Dataproc Batches UI, where you can view both metrics and logs for the job.
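If you prefer the command line, you can also list and inspect batches with gcloud, and run a quick query to confirm that rows are landing in the output table. The sketch below reuses the REGION variable and the example dataset and table names from the execution command above; replace <batch_id> with the ID of your batch:
gcloud dataproc batches list --region=$REGION
gcloud dataproc batches describe <batch_id> --region=$REGION
bq query --use_legacy_sql=false 'SELECT COUNT(*) FROM kafkatobq.kafkaevents'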
Important properties
Usage of kafka.bq.starting.offset
- For batch loads, use earliest, which sets the starting point of the query to the earliest offsets.
kafka.bq.starting.offset=earliest
- For streaming loads, use latest, which starts the query from the latest offsets:
kafka.bq.starting.offset=latest
- To read from only specific offsets of a TopicPartition, use a JSON string in the following format:
kafka.bq.starting.offset=""" {"click-events":{"0":15,"1":-1},"msg-events":{"0":-2}} """
In the JSON, an offset of -2 can be used to refer to earliest, and -1 to latest.
Note: The option kafka.bq.starting.offset is only relevant when the application runs for the very first time. After that, the checkpoint files stored at kafka.bq.checkpoint.location are used instead.
Note: The default value for this property is earliest, if not explicitly provided in the execution command.
To read more about this property, refer to the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).
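If you need the template to honor kafka.bq.starting.offset again, for example to reprocess a topic from the beginning, one option is to delete the checkpoint directory before resubmitting the job. Keep in mind that this discards all progress tracking, so previously loaded records may be read and written again:
gsutil rm -r gs://templates-demo-kafkatobq/checkpoint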
Usage of kafka.bq.stream.output.mode
- Append output mode is used when only the new rows in the streaming Dataset need to be written to the sink.
kafka.bq.stream.output.mode=append
- Complete output mode is used when all the rows in the streaming Dataset need to be written to the sink every time there are some updates.
kafka.bq.stream.output.mode=complete
- Update output mode is used when only the rows that were updated in the streaming Dataset need to be written to the sink every time there are some updates.
kafka.bq.stream.output.mode=update
For additional details, refer to the OutputMode Spark JavaDoc.
Note: The default value for this property is append, if not explicitly provided in the execution command.
Usage of kafka.bq.await.termination.timeout
This property prevents the process from exiting while the query is still active. The job waits for the specified number of milliseconds and then returns whether the query has terminated within that time.
kafka.bq.await.termination.timeout=1800000
Note: The default value for this property is 420000 (7 minutes), if not explicitly provided in the execution command.
Other advanced job configuration
- HISTORY_SERVER_CLUSTER: An existing Dataproc cluster to act as a Spark History Server. Use this property to specify a dedicated server where you can view the status of running and completed Spark jobs. Example:
export HISTORY_SERVER_CLUSTER=projects/<project_id>/regions/<region>/clusters/<cluster_name>
- SPARK_PROPERTIES: Use this when you need to specify Spark properties supported by Dataproc Serverless, such as adjusting the number of drivers, cores, executors, and so on. It gives you more control over the Spark configuration. Example:
export SPARK_PROPERTIES=spark.executor.instances=50,spark.dynamicAllocation.maxExecutors=200