Kafka to GCS using Dataproc Serverless Python Template
Dataproc Serverless for Spark runs batch workloads without provisioning and managing a cluster. It can dynamically scale workload resources, such as the number of executors, to run your workload efficiently. As data developers, this allows us to focus on business logic, instead of spending time managing infrastructure.
In contrast, Dataproc on Compute Engine offers a managed Hadoop and Spark service on GCP. It is ideal for users who want to provision and manage their own infrastructure and then execute workloads on Spark.
Dataproc Serverless Templates are ready-to-use, open-source, customizable templates based on Dataproc Serverless for Spark. They help data engineers further simplify development on Dataproc Serverless by consuming and customizing the existing templates to fit their requirements.
In this post we will explore how to perform a batch or streaming load of data from a Kafka topic to Google Cloud Storage (GCS) using the Dataproc Kafka To GCS template.
The template reads data from a Kafka topic and writes it to a Cloud Storage bucket. It supports JSON, CSV, Parquet and Avro output formats.
It uses the Spark SQL Kafka connector JARs to write streaming data from the Kafka topic to Cloud Storage.
Required JAR files
The template requires the Spark SQL Kafka connector and its dependencies on the classpath:
- spark-sql-kafka-0-10_2.12-3.2.0.jar
- kafka-clients-2.8.0.jar
- commons-pool2-2.6.2.jar
- spark-token-provider-kafka-0-10_2.12-3.2.0.jar
The template allows the following parameters to be configured through the execution command:
Arguments
- kafka.gcs.checkpoint.location: Cloud Storage location for storing checkpoints during transfer (format: gs://bucket/...)
- kafka.gcs.output.location.gcs.path: Output Cloud Storage location for storing streaming data (format: gs://bucket/...)
- kafka.gcs.bootstrap.servers: List of Kafka bootstrap servers (format: '[x1.x2.x3.x4:port1,y1.y2.y3.y4:port2]')
- kafka.gcs.topic: Topic names for the respective Kafka server
- kafka.gcs.starting.offset: Offset to start reading from. Accepted values: "earliest", "latest" (streaming only), or a JSON string such as {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
- kafka.gcs.output.format: csv | json | parquet | avro
- kafka.gcs.output.mode: append | complete | update
- kafka.gcs.termination.timeout: Timeout (in seconds)
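The JSON form of kafka.gcs.starting.offset is easy to get wrong when typed by hand. A small sketch that builds and validates it with Python's json module (the topic names and offsets below are just illustrative; per Spark's Kafka source convention, -2 means "earliest" and -1 means "latest" for an individual partition):

```python
import json

# Per-topic, per-partition starting offsets. Topic names are illustrative;
# -2 = earliest, -1 = latest for that partition (Spark Kafka source convention).
offsets = {
    "topicA": {"0": 23, "1": -1},
    "topicB": {"0": -2},
}

starting_offset = json.dumps(offsets)

# Round-trip to confirm the string is valid JSON before passing it
# as --kafka.gcs.starting.offset
assert json.loads(starting_offset) == offsets
print(starting_offset)
```

Validating the string locally first avoids submitting a batch that fails at startup with a Kafka offset parse error.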
Optional Arguments
- kafka.gcs.output.chartoescapequoteescaping: Sets a single character used for escaping the escape for the quote character. The default value is the escape character when escape and quote characters are different, \0 otherwise
- kafka.gcs.output.compression: Compression codec to use when saving to file. Defaults to None
- kafka.gcs.output.dateformat: Sets the string that indicates a date format. This applies to date type
- kafka.gcs.output.emptyvalue: Sets the string representation of an empty value
- kafka.gcs.output.encoding: Encodes the CSV files in the given encoding type
- kafka.gcs.output.escape: Sets a single character used for escaping quotes inside an already quoted value
- kafka.gcs.output.escapequotes: A flag indicating whether values containing quotes should always be enclosed in quotes. Default is to escape all values containing a quote character
- kafka.gcs.output.header: Uses the first line of the CSV file as names of columns. Defaults to True
- kafka.gcs.output.ignoreleadingwhitespace: A flag indicating whether or not leading whitespaces from values being read/written should be skipped
- kafka.gcs.output.ignoretrailingwhitespace: A flag indicating whether or not trailing whitespaces from values being read/written should be skipped
- kafka.gcs.output.linesep: Defines the line separator that should be used for parsing. Defaults to \r, \r\n and \n for reading and \n for writing
- kafka.gcs.output.nullvalue: Sets the string representation of a null value
- kafka.gcs.output.quote: Sets a single character used for escaping quoted values where the separator can be part of the value. For reading, to turn off quotations, set an empty string rather than null
- kafka.gcs.output.quoteall: A flag indicating whether all values should always be enclosed in quotes
- kafka.gcs.output.sep: Sets a separator for each field and value. This separator can be one or more characters
- kafka.gcs.output.timestampformat: Sets the string that indicates a timestamp with timezone format
- kafka.gcs.output.timestampntzformat: Sets the string that indicates a timestamp without timezone format
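These kafka.gcs.output.* properties correspond to Spark's standard DataFrameWriter options for the chosen output format. As an illustration of that naming convention, here is a small hypothetical helper that strips the template prefix (the function and sample values are assumptions for illustration, not the template's actual code):

```python
PREFIX = "kafka.gcs.output."

def to_spark_options(args: dict) -> dict:
    """Strip the template prefix from output properties so the remainder
    can be passed as DataFrameWriter options. Hypothetical helper shown
    for illustration only -- not the template's actual implementation."""
    return {
        key[len(PREFIX):]: value
        for key, value in args.items()
        if key.startswith(PREFIX)
        # format and mode are consumed by the writer itself, not passed as options
        and key[len(PREFIX):] not in ("format", "mode")
    }

args = {
    "kafka.gcs.output.format": "csv",
    "kafka.gcs.output.header": "true",
    "kafka.gcs.output.sep": "|",
}
print(to_spark_options(args))  # {'header': 'true', 'sep': '|'}
```

This is why the descriptions above read like Spark's own CSV writer documentation: the unprefixed names (header, sep, quote, ...) are passed through to Spark.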
Steps to Run the Template
1. Ensure you have enabled the subnet with Private Google Access. If you are using the "default" VPC created by GCP, you will still have to enable private access as below.
gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access
2. Create a GCS bucket and staging location for jar files.
3. Clone the git repo in Cloud Shell, which comes pre-installed with various tools. Alternatively, use any machine with Python and Git installed.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python
4. Authenticate the gcloud CLI.
gcloud auth login
5. Configure the Dataproc Serverless job by exporting the variables needed for submission:
GCP_PROJECT
: GCP project id to run Dataproc Serverless on
REGION
: Region to run Dataproc Serverless in
GCS_STAGING_LOCATION
: GCS staging bucket location, created in Step 2
SUBNET
: The VPC subnet to run Dataproc Serverless on, if not using the default subnet (format: projects/<project_id>/regions/<region>/subnetworks/<subnetwork>)
export GCP_PROJECT=<gcp-project>
export REGION=<region>
export GCS_STAGING_LOCATION=<gcs-staging-location>
export SUBNET=<subnet>
export JARS="gs://{jar-bucket}/spark-sql-kafka-0-10_2.12-3.2.0.jar,gs://{jar-bucket}/kafka-clients-2.8.0.jar,gs://{jar-bucket}/commons-pool2-2.6.2.jar,gs://{jar-bucket}/spark-token-provider-kafka-0-10_2.12-3.2.0.jar"
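The four JARs above are the Spark SQL Kafka connector and its dependencies, with versions matching Spark 3.2.0 / Scala 2.12. If you stage them under one bucket, the comma-separated JARS value can be assembled programmatically rather than typed out; a small sketch (the bucket name is a placeholder):

```python
# Assemble the comma-separated JARS value from a staging bucket.
# Bucket name is a placeholder; jar names match the export command above.
bucket = "my-jar-bucket"
jars = [
    "spark-sql-kafka-0-10_2.12-3.2.0.jar",
    "kafka-clients-2.8.0.jar",
    "commons-pool2-2.6.2.jar",
    "spark-token-provider-kafka-0-10_2.12-3.2.0.jar",
]
jars_value = ",".join(f"gs://{bucket}/{jar}" for jar in jars)
print(f'export JARS="{jars_value}"')
```

Keeping the jar list in one place makes it easier to bump versions consistently if you move to a newer Spark runtime.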
./bin/start.sh \
-- --template=KAFKATOGCS \
--kafka.gcs.checkpoint.location="<gcs checkpoint storage location>" \
--kafka.gcs.output.location.gcs.path="<gcs output location path>" \
--kafka.gcs.bootstrap.servers="<list of kafka connections>" \
--kafka.gcs.topic="<integration topics to subscribe>" \
--kafka.gcs.starting.offset="<earliest|latest|json_offset>" \
--kafka.gcs.output.format="{json|csv|avro|parquet}" \
--kafka.gcs.output.mode="{append|complete|update}" \
--kafka.gcs.termination.timeout="time in seconds"
6. Monitor the Spark batch job
After submitting the job, you will be able to view it in the Dataproc Batches UI, where you can view both metrics and logs for the job.