Spark Integration with Kafka (Batch)

Aditya Pimparkar
Published in The Startup · 3 min read · Aug 9, 2020

In this article we will discuss the integration of Spark (2.4.x) with Kafka for batch processing of queries.

Kafka:-
Kafka is a distributed publisher/subscriber messaging system that acts as a pipeline for transferring real-time data in a fault-tolerant and parallel manner. Kafka helps in building real-time streaming data pipelines that reliably move data between systems or applications. This data can be ingested and processed either continuously (Spark Structured Streaming) or in batches. In this article we will discuss ingestion of data from Kafka for batch processing using Spark. We will cover how Spark interacts with Kafka and the Spark APIs used for reading as well as writing data.
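
The examples below use the Java API and assume a SparkSession (referred to as spark or sparkSession) is already available, with the Kafka connector (spark-sql-kafka-0-10) on the classpath. A minimal sketch of that setup, with an assumed application name and a local master for testing, follows:

import org.apache.spark.sql.SparkSession;

// Minimal sketch: the SparkSession used by the snippets below.
// The application name and local[*] master are assumptions for local testing.
SparkSession spark = SparkSession.builder()
        .appName("KafkaBatchProcessing")
        .master("local[*]")
        .getOrCreate();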

Kafka Source (Read):-

Dataset<Row> kafka_df = spark.read()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("subscribe", "topic1,topic2")
        .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
        .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
        .load();
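
In the per-partition offset JSON, -2 denotes the earliest offset and -1 the latest. When per-partition control is not needed, the special strings "earliest" and "latest" can be used instead; a minimal sketch reading an entire topic (the host and topic names are assumptions):

// Sketch: read everything currently in topic1.
Dataset<Row> kafka_df = spark.read()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1")
        .option("subscribe", "topic1")
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .load();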

Each dataframe created from Kafka ingestion has seven columns. These columns describe the attributes of each message read from Kafka:
1. key
2. value
3. topic
4. partition
5. offset
6. timestamp
7. timestampType

The ‘key’ and ‘value’ columns are used to extract the content of the message; usually the ‘value’ column holds the data that can be expanded into a dataframe.
Data can be stored in Kafka in multiple formats. Here we provide methods for two of them.
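
Regardless of the format, a quick way to inspect the raw messages is to cast the binary key and value columns to strings; a minimal sketch:

// Sketch: cast the binary key/value columns to strings for inspection.
Dataset<Row> messages = kafka_df.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value");
messages.show(false);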

1) JSON:-

• If schema is present:-

StructType json_schema = ...; // schema of the dataframe (see the sketch below)
Dataset<Row> input = kafka_df
        .withColumn("data", functions.from_json(kafka_df.col("value").cast("string"), json_schema)) // cast binary value to string before parsing
        .select("data.*");

This method can be used if the schema of the data is fixed and defined in advance. In the real world this is often not the case, as columns can be added or deleted, which changes the schema.
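
For illustration, a schema with two string columns might be built as follows (the column names are hypothetical):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical schema with two string columns.
StructType json_schema = new StructType()
        .add("column1", DataTypes.StringType)
        .add("column2", DataTypes.StringType);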

• If schema is not present:-
If the ‘value’ column contains JSON strings, these strings can be converted into a dataframe for further processing using the following transformation:-

Dataset<Row> input = sparkSession.read().json(
        kafka_df.selectExpr("CAST(value AS STRING) as value")
                .map(Row::mkString, Encoders.STRING()));

In this transformation, we cast the ‘value’ column to a string column (converting the binary column into a string) and turn it into a dataset of JSON strings. The map function converts the Dataset<Row> into a Dataset<String>. Reading this dataset of JSON strings with read().json() creates the input dataset that can be used for further processing.
The above can also be done using a Java RDD:-

JavaRDD<String> store = kafka_df.selectExpr("CAST(value AS STRING) as value")
        .toJavaRDD()
        .map(x -> x.mkString());
Dataset<Row> input = spark.read().json(store);

These methods are especially helpful when the schema changes over time, as an explicit schema declaration is not a prerequisite.

2) AVRO:-

Ingestion of data in Avro format requires the schema to be provided as a JSON string.

String json_schema = "..."; // Avro schema of the payload, as a JSON string (see the sketch below)
Dataset<Row> store = kafka_df.select(
        package$.MODULE$.from_avro(kafka_df.col("value"), json_schema).as("data"));
Dataset<Row> input = store.select("data.*");
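
For illustration, such a schema string might look like the following (the record and field names are hypothetical):

// Hypothetical Avro schema for a record with two string fields.
String json_schema = "{"
        + "\"type\":\"record\",\"name\":\"payload\","
        + "\"fields\":["
        + "{\"name\":\"column1\",\"type\":\"string\"},"
        + "{\"name\":\"column2\",\"type\":\"string\"}"
        + "]}";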

Kafka Sink (Write):-

Data can be published to Kafka in batches or by a streaming job. Only the ‘value’ column is required to be published; the rest of the columns are optional.

1) JSON:-

output.selectExpr("to_json(struct(*)) AS value")
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host:port")
        .option("topic", "topic_name")
        .save();
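
If a message key is also needed (for example, to control partitioning), it can be selected alongside the value; a sketch assuming the dataframe has an ‘id’ column:

// Sketch: publish a key along with the JSON value; "id" is an assumed column.
output.selectExpr(
            "CAST(id AS STRING) AS key",
            "to_json(struct(*)) AS value")
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host:port")
        .option("topic", "topic_name")
        .save();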

2) AVRO:-

output.select(package$.MODULE$.to_avro(struct("*")).as("value"))
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9093")
        .option("topic", "test_avro")
        .save();
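
The Avro snippets above assume the following imports when using the Spark 2.4 spark-avro module; in later Spark versions the same functions are exposed through org.apache.spark.sql.avro.functions instead.

// Imports assumed by the Avro read/write snippets above.
import org.apache.spark.sql.avro.package$;              // from_avro / to_avro (Spark 2.4 spark-avro)
import static org.apache.spark.sql.functions.struct;    // struct("*") in the write example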

The idea in both cases is to construct a single struct column containing all of the original columns as sub-fields and write it to Kafka as ‘value’, so that this column can later be read back and used to recreate the dataframe. The resulting schema looks like this:

root
 |-- value: struct (nullable = true)
 |    |-- column1: string (nullable = true)
 |    |-- column2: string (nullable = true)

The above methods can be used for reading data from and writing data to Kafka.

The code can be found at the link below:-
https://github.com/parkar-12/spark-examples/tree/master/KafkaBatchProcessing
