Assimilation of Spark Streaming With Kafka

Knoldus Inc.
Knoldus - Technical Insights
2 min readNov 27, 2017

As we know Spark is used at a wide range of organizations to process large datasets. It seems like spark becoming main stream. In this blog we will talk about Integration of Kafka with Spark Streaming. So, lets get started.

How Kafka can be integrated with Spark?

Kafka provides a messaging and integration platform for Spark streaming. Kafka act as the central hub for real-time streams of data and are processed using complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming could be used to publish results into yet another Kafka topic.

Let’s see how to configure Spark Streaming to receive data from Kafka by creating a SBT project first and add the following dependencies in build.sbt.

val sparkCore = "org.apache.spark" % "spark-core_2.11" % "2.2.0"
val sparkSqlKafka = "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0"
val sparkSql = "org.apache.spark" % "spark-sql_2.11" % "2.2.0"
libraryDependencies ++= Seq(sparkCore, sparkSql, sparkSqlKafka)Now, we need to create a Spark session.val sparkConf: SparkConf = new SparkConf()
.setAppName(sparkAppName)
.setMaster(sparkMaster)
.set("spark.executor.memory", sparkMemory)
.set("spark.executor.core", sparkCores)
.set("spark.sql.streaming.checkpointLocation", "PATH_TO_CHECKPOINT_LOCATION")
val spark: SparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()

Creating Kafka as Source

We can read messages in spark stream by subscribing to a particular topic.val dataFrame: DataFrame = spark
.readStream
.format("kafka")
.option("subscribepattern", "TOPIC")
.option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
.option("startingoffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) AS value")
subscribePattern : The pattern used to subscribe to topic(s).kafka.bootstrap.servers : It consists of a comma-separated list of host:port.startingOffsets : The start point when a query is started either "latest" or "earliest".Another topic will consume the messages from the previous topic.dataFrame.writeStream
.format("kafka")
.option("truncate", value = false)
.option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
.option("topic", "TOPIC")
.start()
.awaitTermination()
So, we have created a basic example of Spark Streaming with Kafka. For more information you can refer to these blogs written by Jatin Demla and Ayush Tiwari. I hope you liked the blog.Happy Coding!! :)References
knoldus-advt-sticker

--

--

Knoldus Inc.
Knoldus - Technical Insights

Group of smart Engineers with a Product mindset who partner with your business to drive competitive advantage | www.knoldus.com