Apache Spark Streaming Simplified

kiran sreekumar · Published in The Startup · Jan 11, 2018 · 4 min read
A typical Spark Streaming data pipeline.

The data flow above depicts a typical streaming pipeline used for streaming data analytics. Let's split it up. First you need a source; in this example I will use a delimited file as the source for a Kafka topic. There are multiple ways to send data to Kafka: you can either write a Kafka producer or use a service like Flume with the file as the source and Kafka as the sink.

A sample Kafka producer written with the Java API, which reads the file and sends it line by line as messages to a predefined Kafka topic, is shown below.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerFile {

    public static void main(String[] args) {
        final String fileName = "/resources/SalesJan.csv";
        final String topicName = "test";
        String line;

        // Minimal producer configuration: broker address, client id and
        // String serializers for both key and value.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("client.id", "KafkaFileProducer");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        final KafkaProducer<String, String> kafkaProducer =
                new KafkaProducer<String, String>(properties);

        // Read the file line by line and send each line as a message,
        // using the running line count as the message key.
        int count = 0;
        try (BufferedReader bufferedReader =
                new BufferedReader(new FileReader(fileName))) {
            while ((line = bufferedReader.readLine()) != null) {
                count++;
                kafkaProducer.send(new ProducerRecord<String, String>(
                        topicName, Integer.toString(count), line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // send() is asynchronous, so close the producer to flush any
            // buffered messages before the JVM exits.
            kafkaProducer.close();
        }
    }
}
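Note that the producer assumes the topic test already exists. If it does not, it can be created first; the command below is only a sketch, assuming a local single-broker Kafka installation of that era with ZooKeeper running on its default port:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test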

Now we have data in our Kafka topic, and if we open a console consumer it should start showing the contents of the file as Kafka messages.

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

Now let us see how we can consume this data using Spark Streaming.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaSparkStream {

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf().setAppName("kafkaSparkStream")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Batch interval of 5 seconds: each micro-batch collects 5 seconds'
        // worth of Kafka data.
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(5000));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "1");
        Set<String> topicName = Collections.singleton("test");

        // Direct stream: each Kafka partition maps to one Spark partition.
        JavaPairInputDStream<String, String> kafkaSparkPairInputDStream = KafkaUtils
                .createDirectStream(ssc, String.class, String.class,
                        StringDecoder.class, StringDecoder.class, kafkaParams,
                        topicName);

        // Keep only the message value; the key (the line number written by
        // the producer) is dropped.
        JavaDStream<String> kafkaSparkInputDStream = kafkaSparkPairInputDStream
                .map(new Function<Tuple2<String, String>, String>() {
                    private static final long serialVersionUID = 1L;

                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        // Output operation that triggers execution of each micro-batch.
        kafkaSparkInputDStream.print();

        ssc.start();
        ssc.awaitTermination();
    }
}

When the job is submitted it connects to the Kafka topic and accumulates data for the specified batch interval (Duration(5000), i.e. 5 seconds). The job must contain at least one output operation (action) to actually execute; here kafkaSparkInputDStream.print() is that action, and it keeps printing the content of each DStream batch to the console. Any ETL can be applied to the data using Spark transformations and actions or Spark SQL, and the enriched data can be pushed to the different destinations supported by the API, as sketched below.
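As an illustration of the kind of transformation that could sit in place of the plain print(), the snippet below is a sketch that filters out empty lines and extracts a single column from the delimited records. The assumption that the second comma-separated field holds a product name is made up for the example and is not taken from the article.

// Sketch only: field positions are assumed, not part of the original job.
JavaDStream<String> productNames = kafkaSparkInputDStream
        .filter(new Function<String, Boolean>() {
            private static final long serialVersionUID = 1L;

            public Boolean call(String line) {
                return line != null && !line.isEmpty();
            }
        })
        .map(new Function<String, String>() {
            private static final long serialVersionUID = 1L;

            public String call(String line) {
                String[] fields = line.split(",");
                // Assumes the second column is the product name.
                return fields.length > 1 ? fields[1] : line;
            }
        });

productNames.print();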

Let us see how Spark Streaming processes the data.

DStreams are split into RDDs and processed by the executors.

When the job runs, every 5 seconds' worth of data from Kafka becomes one batch of the DStream. This is where Spark brings its parallelism into the picture: each batch is an RDD whose partitions are distributed across the executors for processing. The processed data can be sent to any supported destination or simply written to local disk. Parallelism in data consumption for a Spark Streaming job depends on the number of partitions in the Kafka topic, which effectively means that the number of consumers started by the job equals the number of partitions. The snippet after this paragraph illustrates that mapping.
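One way to observe this is to look at the partition count of each micro-batch RDD. The sketch below reuses kafkaSparkInputDStream from the job above and logs, for every batch, how many partitions (and therefore parallel tasks) it is processed with; with the direct stream this should match the number of partitions in the Kafka topic.

// Additional imports needed for this sketch:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

kafkaSparkInputDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    private static final long serialVersionUID = 1L;

    public void call(JavaRDD<String> rdd) {
        // With the direct stream, getNumPartitions() mirrors the number of
        // partitions in the Kafka topic.
        System.out.println("Batch with " + rdd.getNumPartitions()
                + " partitions and " + rdd.count() + " records");
    }
});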

Some tuning tips: look out for the parameters below. These should be tuned to the resource availability of the Hadoop cluster (an example of passing them at submission time follows the list).

spark.executor.cores=5: number of cores used by each executor in the cluster.
spark.driver.memory=8g: Spark 2 needs a larger driver heap; an undersized driver heap is the main reason for OOM errors.
spark.executor.memory=10g: memory used by each executor in the cluster.
spark.executor.instances=10: number of executors to use. This depends on resource availability: if the total memory available is 10 GB and each executor is given 2 GB, the maximum number of possible executors will be around 3 rather than 5, because the rest of the memory is reserved as overhead and allocated to other resources.
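For reference, these settings would typically be passed when submitting the job. The command below is only a sketch: the class name, jar name and YARN master are placeholders, not taken from the article.

> bin/spark-submit --class KafkaSparkStream --master yarn --conf spark.executor.cores=5 --conf spark.driver.memory=8g --conf spark.executor.memory=10g --conf spark.executor.instances=10 kafka-spark-stream.jar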

Thanks for reading!!
