Kafka Streaming using Spark

Sarvan Kattamuru
4 min readAug 3, 2023


Spark Overview:

Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Structured Streaming Overview:

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Figure: Streaming control flow
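
To make this concrete, here is a minimal, self-contained sketch of a streaming query in Java. It is not part of the project built in the rest of this article; the topic name, bootstrap server, and class name are placeholder values.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StreamingSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("structured-streaming-sketch")
                .master("local[1]")
                .getOrCreate();

        // Treat the Kafka topic as an unbounded table of records.
        Dataset<Row> stream = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9093")
                .option("subscribe", "stream-topic")
                .load()
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        // Print each micro-batch to the console (a debugging sink, not a production one).
        StreamingQuery query = stream.writeStream()
                .format("console")
                .outputMode("append")
                .start();
        query.awaitTermination();
    }
}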

Getting started with development:

Set up a Spring Boot project: Start by creating a new Spring Boot project using the Java language. You can use Spring Initializr (https://start.spring.io/) or your preferred IDE to generate the project structure.

Add Kafka dependencies: In your project’s build.gradle (or pom.xml for Maven), include the necessary dependencies for Kafka and Spring Kafka. For example, add the following dependencies to your build.gradle file:

implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'

Add Spark and Spark Kafka dependencies: In your project’s build.gradle (or pom.xml for Maven), include the necessary dependencies for Spark and the Spark Kafka connector, keeping the Spark artifact versions aligned to avoid classpath conflicts. For example, add the following dependencies to your build.gradle file:

implementation 'org.apache.spark:spark-sql_2.12:3.3.2'
implementation 'org.apache.spark:spark-hive_2.12:3.3.2'
implementation 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2'
implementation 'org.apache.kafka:kafka-streams'

Configure Kafka properties: In your application.properties or application.yml file, configure the Kafka-related properties such as bootstrap servers, group ID, and any other required properties. For example:

kafka.bootstrapServers= localhost:9093
kafka.topic= stream-topic
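
The later code samples refer to a kafkaProperties object exposing getBootstrapServers() and getTopic(). One way to back that (a sketch, not necessarily how the linked repository does it) is a @ConfigurationProperties class bound to the kafka.* prefix:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Binds the kafka.* entries from application.properties (illustrative sketch).
@Component
@ConfigurationProperties(prefix = "kafka")
public class KafkaProperties {

    private String bootstrapServers;
    private String topic;

    public String getBootstrapServers() { return bootstrapServers; }
    public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; }

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
}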

Configure Spark properties: In your application.properties or application.yml file, configure the Spark-related properties such as the application name, the master URL, and any other required properties. For example:

spark.appName= spark-kafka-streaming
spark.master= local[1]
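
Similarly, the SparkSession code later in this article refers to sparkProperties.getMaster() and sparkProperties.getApplicationName(). A possible binding class for the spark.* prefix, again only a sketch, could look like:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Binds the spark.* entries from application.properties (illustrative sketch).
@Component
@ConfigurationProperties(prefix = "spark")
public class SparkProperties {

    private String appName;  // bound from spark.appName
    private String master;   // bound from spark.master

    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }

    public String getMaster() { return master; }
    public void setMaster(String master) { this.master = master; }

    // Convenience accessor matching the name used by the SparkSession bean below.
    public String getApplicationName() { return appName; }
}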

Create a Kafka event producer: Create a Java class that defines a Kafka event producer which publishes multiple events to be read by the Spark application. For example:

public void publishEvents(int eventsCount) {
    for (int i = 0; i < eventsCount; i++) {
        UUID key = UUID.randomUUID();
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>(kafkaProperties.getTopic(), key.toString(), "Event" + i);
        producerHandler.publishEvent(producerRecord, (metadata, ex) -> {
            if (ex == null) {
                // the record was successfully sent
                LOGGER.info("Received new metadata. \n" +
                        "Topic:" + metadata.topic() + "\n" +
                        "Partition: " + metadata.partition() + "\n" +
                        "Offset: " + metadata.offset() + "\n" +
                        "Timestamp: " + metadata.timestamp());
            } else {
                LOGGER.error("Error while publishing event : {}", ex.getLocalizedMessage());
            }
        });
    }
}
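
The producerHandler used above is not shown in this article. A minimal sketch of such a wrapper around a plain KafkaProducer, assuming only the publishEvent(record, callback) signature from the snippet, might look like this (the constructor and close() method are illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Thin wrapper around KafkaProducer; the snippet above only assumes
// a publishEvent(record, callback) method, so everything else is illustrative.
public class ProducerHandler {

    private final KafkaProducer<String, String> producer;

    public ProducerHandler(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
    }

    public void publishEvent(ProducerRecord<String, String> record, Callback callback) {
        // Asynchronous send; the callback receives the record metadata or an exception.
        producer.send(record, callback);
    }

    public void close() {
        producer.close();
    }
}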

Create a Spark session: Create a Java class that builds the SparkSession from the Spark configuration properties. For example:

@Bean
public SparkSession getSparkSession() {
    SparkSession ss = null;
    try {
        ss = SparkSession.builder()
                .master(sparkProperties.getMaster())
                .appName(sparkProperties.getApplicationName())
                .getOrCreate();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return ss;
}

Reading events from Kafka: Create a Java class to define the SQL select expression used for retrieving the events from Kafka using Spark. Spark uses the Dataset API to load the events read from Kafka. Events can be read by iterating over the Dataset, and each event’s key and value can be retrieved using the Row API. For example:

public Dataset<Row> extract() {
    Dataset<Row> dataset = spark.read()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaProperties.getBootstrapServers())
            .option("subscribe", kafkaProperties.getTopic())
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
    return dataset;
}

@Override
public void run(ApplicationArguments args) throws Exception {
    LOGGER.info("Inside run, before extracting events");
    Dataset<Row> dataset = extractor.extract();
    LOGGER.info("No of events fetched : {}", dataset.count());
    dataset.foreach(row -> {
        String key = row.getAs("key");
        String value = row.getAs("value");
        LOGGER.info("Key : {} , Value : {}", key, value);
    });
}
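
For batch reads like the one above, the Kafka source also accepts explicit offset-range options (startingOffsets and endingOffsets, documented in the Spark Kafka integration guide linked below). A variant of extract() that sets them explicitly could look like the following; extractWithOffsets is a hypothetical name used only for illustration:

// Variant of extract() pinning the offset range for the batch read; "earliest" and
// "latest" are the defaults for batch queries, shown here explicitly.
public Dataset<Row> extractWithOffsets() {
    return spark.read()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaProperties.getBootstrapServers())
            .option("subscribe", kafkaProperties.getTopic())
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
}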

That’s all for now. Further areas to explore include different expression mechanisms for reading data from Kafka, Spark master configuration for better performance, and submitting Spark jobs (for example, with Airflow).

Github repo: https://github.com/sarvan9/spring_spark_sql_kafka

References:

https://spark.apache.org/docs/latest/index.html

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

https://airflow.apache.org/docs/
