Kafka to GCS using Dataproc Serverless Python Template

Shubham Pathak
Google Cloud - Community
4 min read · Nov 2, 2023


Dataproc Serverless for Spark runs batch workloads without requiring you to provision and manage a cluster. It can dynamically scale workload resources, such as the number of executors, to run your workload efficiently. This lets us, as data developers, focus on business logic instead of spending time managing infrastructure.

In contrast, Dataproc on Compute Engine offers a managed Hadoop and Spark service on GCP. It is ideal for users who want to provision and manage their own infrastructure and then execute workloads on Spark.

Dataproc Serverless Templates are ready-to-use, open-source, customizable templates built on Dataproc Serverless for Spark. They further simplify development on Dataproc Serverless for data engineers, who can consume and customize the existing templates to fit their requirements.

In this post we will explore how to perform a batch or streaming load of data from a Kafka topic to Google Cloud Storage (GCS) using the Dataproc Serverless Kafka To GCS template.

The Kafka To GCS template reads messages from a Kafka topic and writes them to a Cloud Storage bucket. It supports writing the output in JSON, CSV, Parquet and Avro formats.

It uses the Spark SQL Kafka connector JARs to stream data from the Kafka topic to Cloud Storage.

Required JAR files

The template depends on the following JARs, which are staged in a GCS bucket and passed to the job through the JARS variable shown later:

  • spark-sql-kafka-0-10_2.12-3.2.0.jar
  • kafka-clients-2.8.0.jar
  • commons-pool2-2.6.2.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.0.jar

The template allows the following parameters to be configured through the execution command:

Arguments

  • kafka.gcs.checkpoint.location: Cloud Storage location for storing checkpoints during transfer (format: gs://bucket/...)
  • kafka.gcs.output.location.gcs.path: Output Cloud Storage Location for storing streaming data (format: gs://bucket/...)
  • kafka.gcs.bootstrap.servers: List of kafka bootstrap servers (format: '[x1.x2.x3.x4:port1,y1.y2.y3.y4:port2]')
  • kafka.gcs.topic: Kafka topic name(s) to read from on the respective server(s)
  • kafka.gcs.starting.offset: Offset to start reading from. Accepted values: "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
  • kafka.gcs.output.format: Output file format: csv | json | parquet | avro
  • kafka.gcs.output.mode: Output write mode: append | complete | update
  • kafka.gcs.termination.timeout: Timeout (in seconds) after which the streaming query is terminated

Optional Arguments

  • kafka.gcs.output.chartoescapequoteescaping: Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise
  • kafka.gcs.output.compression: Compression codec to use when saving the output files (for example, none, gzip or snappy); defaults to None
  • kafka.gcs.output.dateformat: Sets the string that indicates a date format. This applies to date type
  • kafka.gcs.output.emptyvalue: Sets the string representation of an empty value
  • kafka.gcs.output.encoding: Specifies the encoding (charset) of the saved CSV files
  • kafka.gcs.output.escape: Sets a single character used for escaping quotes inside an already quoted value
  • kafka.gcs.output.escapequotes: A flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character
  • kafka.gcs.output.header: Writes the names of columns as the first line of the CSV file. Defaults to True
  • kafka.gcs.output.ignoreleadingwhitespace: A flag indicating whether or not leading whitespaces from values being read/written should be skipped
  • kafka.gcs.output.ignoretrailingwhitespace: A flag indicating whether or not trailing whitespaces from values being read/written should be skipped
  • kafka.gcs.output.linesep: Defines the line separator that should be used for parsing. Defaults to \r, \r\n and \n for reading and \n for writing
  • kafka.gcs.output.nullvalue: Sets the string representation of a null value
  • kafka.gcs.output.quote: Sets a single character used for escaping quoted values where the separator can be part of the value. For reading, if you would like to turn off quotations, you need to set not null but an empty string
  • kafka.gcs.output.quoteall: A flag indicating whether all values should always be enclosed in quotes.
  • kafka.gcs.output.sep: Sets a separator for each field and value. This separator can be one or more characters
  • kafka.gcs.output.timestampformat: Sets the string that indicates a timestamp with timezone format
  • kafka.gcs.output.timestampntzformat: Sets the string that indicates a timestamp without timezone format.
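
For example, the optional CSV parameters above are simply appended to the submit command shown later. The flag values below are purely illustrative, not defaults:

--kafka.gcs.output.format="csv" \
--kafka.gcs.output.sep="|" \
--kafka.gcs.output.header=true \
--kafka.gcs.output.nullvalue="NULL"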

Steps to Run the Template

  1. Ensure you have enabled the subnet with Private Google Access. If you are using the “default” VPC network created by GCP, you will still have to enable private access as shown below.
gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access
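
To confirm the change, you can describe the subnet and check the privateIpGoogleAccess field; it should print True once Private Google Access is enabled:

gcloud compute networks subnets describe default --region=us-central1 --format="value(privateIpGoogleAccess)"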

2. Create a GCS bucket and staging location for jar files.
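
For example, assuming a hypothetical staging bucket name, you could create the bucket and copy the required JARs into it (the download URLs follow the standard Maven Central layout):

gsutil mb -l us-central1 gs://your-staging-bucket
wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.2.0/spark-sql-kafka-0-10_2.12-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.2.0/spark-token-provider-kafka-0-10_2.12-3.2.0.jar
gsutil cp *.jar gs://your-staging-bucket/jars/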

3. Clone the git repo in Cloud Shell, which comes pre-installed with various tools. Alternatively, use any machine with Python and Git installed.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

4. Authenticate your gcloud CLI credentials.

gcloud auth login
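
Optionally, also set the active project for the CLI (the project ID below is a placeholder):

gcloud config set project <gcp-project>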

5. Configure the Dataproc Serverless job by exporting the variables needed for submission:

GCP_PROJECT : GCP project id to run Dataproc Serverless on

REGION : Region to run Dataproc Serverless in

GCS_STAGING_LOCATION : GCS staging bucket location, created in Step 2

SUBNET : The VPC subnet to run Dataproc Serverless on, if not using the default subnet (format: projects/<project_id>/regions/<region>/subnetworks/<subnetwork>)

export GCP_PROJECT=<gcp-project>
export REGION=<region>
export GCS_STAGING_LOCATION=<gcs-staging-location>
export SUBNET=<subnet>
export JARS="gs://{jar-bucket}/spark-sql-kafka-0-10_2.12-3.2.0.jar,gs://{jar-bucket}/kafka-clients-2.8.0.jar,gs://{jar-bucket}/commons-pool2-2.6.2.jar,gs://{jar-bucket}/spark-token-provider-kafka-0-10_2.12-3.2.0.jar"

./bin/start.sh \
-- --template=KAFKATOGCS \
--kafka.gcs.checkpoint.location="<gcs checkpoint storage location>" \
--kafka.gcs.output.location.gcs.path="<gcs output location path>" \
--kafka.gcs.bootstrap.servers="<list of kafka connections>" \
--kafka.gcs.topic="<integration topics to subscribe>" \
--kafka.gcs.starting.offset="<earliest|latest|json_offset>" \
--kafka.gcs.output.format="{json|csv|avro|parquet}" \
--kafka.gcs.output.mode="{append|complete|update}" \
--kafka.gcs.termination.timeout="time in seconds"

6. Monitor the Spark batch job

After submitting the job, you will be able to see it in the Dataproc Batches UI. From there, you can view both metrics and logs for the job.
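
The same information is also available from the command line, for example (region and batch ID are placeholders):

gcloud dataproc batches list --region=<region>
gcloud dataproc batches describe <batch-id> --region=<region>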
