Spark Streaming + Kafka: The Receiver-Less (Direct) Approach

See https://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html for more information about the receiver-less (direct) approach.

Here are the key points of this approach:

  1. Spark Streaming periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch.
  2. Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel.
  3. There is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
  4. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
  5. Uses the simple Kafka consumer API, which does not use ZooKeeper; offsets are tracked by Spark Streaming within its checkpoints (see the sketch after this list).
  6. Each record is received by Spark Streaming effectively exactly once despite failures.
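
For example, the per-batch offsets from points 1 and 5 are exposed through the HasOffsetRanges interface. Here is a minimal sketch, assuming a direct stream named messages like the one created in DirectKafkaWordCount below:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Sketch: inspect the Kafka offset range behind each batch's RDD.
// Assumes `messages` is a direct stream created with
// KafkaUtils.createDirectStream, as in DirectKafkaWordCount below.
messages.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    // One entry per Kafka partition: the one-to-one mapping from point 3
    println(s"${o.topic} partition ${o.partition}: offsets ${o.fromOffset} to ${o.untilOffset}")
  }
}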

There are two files:

  1. KafkaWordCountProducer.scala: produces randomly generated digits to Kafka.
  2. DirectKafkaWordCount.scala: consumes records from Kafka, performs a word count, and prints the output.

Note that these examples are readily available in Spark's GitHub repository.
Before you run them, make sure ZooKeeper and Kafka are up.
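
If you compile them outside the Spark repository, a build.sbt along the following lines should be enough. This is a sketch: the Scala 2.11 build is my assumption, while the artifact versions match the Spark and Kafka versions listed at the end of this post.

// build.sbt (a sketch; versions follow the ones used in this post)
scalaVersion := "2.11.8" // assumption: a Scala 2.11 build of Spark 2.0.0

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                % "2.0.0",
  "org.apache.spark" %% "spark-streaming"           % "2.0.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0",
  "org.apache.kafka" %  "kafka-clients"             % "0.8.2.1" // new producer API used below
)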

KafkaWordCountProducer.scala:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

/**
 * Produces messages of random digits (0 to 9) to Kafka.
 *
 * This program is readily available at
 * https://github.com/apache/spark/tree/v2.1.1/examples/src/main/scala/org/apache/spark/examples/streaming
 *
 */
object KafkaWordCountProducer {

  def main(args: Array[String]): Unit = {
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) =
      Array("localhost:9092", "sample-topic", "20", "2")

    // Kafka producer connection properties (bootstrap brokers, not ZooKeeper)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send messagesPerSec messages every second, each containing
    // wordsPerMessage space-separated random digits
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }
}
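
One design note: each record is built with a null key, so the producer's default partitioner spreads messages across the topic's partitions rather than pinning them to one. A hypothetical variant (not in the original example) that keys each message by its content, so identical payloads always land in the same partition:

// Hypothetical variant: key each message by its content so that
// identical payloads always hash to the same Kafka partition.
val keyed = new ProducerRecord[String, String](topic, str, str)
producer.send(keyed)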
DirectKafkaWordCount.scala:

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

/**
 * Consumes records from Kafka, performs a word count, and prints the output.
 *
 * This program is readily available at
 * https://github.com/apache/spark/tree/v2.1.1/examples/src/main/scala/org/apache/spark/examples/streaming
 *
 */
object DirectKafkaWordCount {

  def main(args: Array[String]): Unit = {

    val Array(brokers, topics) =
      Array("localhost:9092", "sample-topic")

    // Create context with a 2-second batch interval.
    // Call setAppName only once; a second call would overwrite the first.
    val sparkConf = new SparkConf()
      .setAppName("DirectKafkaWordCount")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct Kafka stream against the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers)
    val messages = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    // Get the lines (message values), split them into words,
    // count the words, and print each batch's counts
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words
      .map(x => (x, 1L))
      .reduceByKey(_ + _)

    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
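
Point 5 above says that offsets are tracked within Spark's checkpoints, but this example never enables checkpointing, so it cannot recover offsets across a driver restart. A minimal sketch of checkpoint-based recovery, where the checkpoint directory is my own placeholder:

// Sketch: checkpoint-based recovery (the path is a placeholder).
// On a clean start createContext() runs; after a driver restart the
// context, including the tracked Kafka offsets, is rebuilt from the
// checkpoint directory instead.
def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("DirectKafkaWordCount")
    .setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint("/tmp/direct-kafka-checkpoint")
  // ... create the direct stream and the word-count transformations here ...
  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/direct-kafka-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()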

The following is my input (each line is one two-word message from the producer):

2 0
5 2
4 0
0 1
3 4
6 6
3 0
0 0
8 4
6 6
9 2
0 8
4 4
5 3
3 3
3 9
2 2
1 4
0 4
6 8

And my output for one batch (each pair is a word and its count):

(4,7)
(8,3)
(0,8)
(5,2)
(9,2)
(1,2)
(6,5)
(2,5)
(3,6)
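
As a sanity check, the same counts fall out of plain Scala collections applied to the input above:

// Plain-Scala check of the batch output above (no Spark involved)
val sample = "2 0 5 2 4 0 0 1 3 4 6 6 3 0 0 0 8 4 6 6 9 2 0 8 4 4 5 3 3 3 3 9 2 2 1 4 0 4 6 8"
sample.split(" ")
  .groupBy(identity)
  .mapValues(_.length.toLong)
  .foreach { case (word, count) => println(s"($word,$count)") } // e.g. (4,7), (0,8), ...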

I used the following versions:

  • Spark 2.0.0
  • Kafka 0.8.2.1