Streaming data from Flume to Spark Streaming
In my last post I showed how to configure IntelliJ IDEA for Spark development. Once I set up my own environment I took it for a spin with a sample application: reading data in from Flume and using the Spark Streaming API to process it in real time.
The Spark Streaming API is built on top of the Spark Core API and lets you process incoming messages in near real time. It's not true record-at-a-time streaming like Apache Storm or Apache Flink; it's micro-batching, but you can shrink the micro-batches down to one second. So if your application needs sub-second latency, look at those other frameworks, but 1-second processing is also pretty dope.
The Spark Streaming API also supports sliding window calculations, which means you can compute certain statistics or metrics over a period of time. For example, the top 10 Twitter hashtags in the past 5 minutes: you configure your Spark application to keep calculating the top 10 hashtags over a window of 5 minutes. You also control how often this metric is recalculated by specifying a sliding interval, which can be as low as 1 second. Your application would then calculate the top 10 hashtags for the last 5 minutes, every second. This level of processing can unlock insights that would otherwise stay hidden in your data.
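To make the window/slide relationship concrete, here is a plain-Scala sketch (no Spark involved, and the hashtag batches are made up) of what a 5-batch window sliding forward one batch at a time computes:

```scala
object WindowSketch extends App {
  // One pretend micro-batch of hashtags per second (made-up data)
  val batches = Seq(
    Seq("#spark", "#flume"),
    Seq("#spark"),
    Seq("#flink", "#spark"),
    Seq("#flume"),
    Seq("#spark", "#spark"),
    Seq("#flink")
  )

  // A 5-batch window sliding by 1 batch, analogous to
  // window = 5 seconds, slide = 1 second in Spark Streaming
  val topTagsPerWindow = batches.sliding(5, 1).map { window =>
    window.flatten
      .groupBy(identity)
      .map { case (tag, occurrences) => (tag, occurrences.size) }
      .toSeq
      .sortBy(-_._2)   // most frequent first
      .take(10)
  }

  topTagsPerWindow.foreach(println)
}
```

Spark Streaming does the same thing continuously over a DStream instead of a fixed list, emitting one result per slide.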
Without further ado, let's jump into writing some code. The problem is divided into two parts:
- Setting up flume to send data.
- Writing Spark Streaming application to process it in real-time.
Part 1: Setting up Flume to emit data
Flume can talk to a Spark application in two ways:
- Data push — Data is pushed in a certain format to a port where the receiver (the Spark Streaming application) is already waiting to read it. One caveat of this approach is that your receiver application must already be running for Flume to push the data.
- Custom sink — Your application registers itself as a custom sink in Flume. In this case the data stays buffered within Flume until the sink reads it, so your Spark Streaming application can read the data once it's up; there is no requirement for it to be running when the data is written by the Flume source.
In general Flume sinks work in a push model rather than a pull model, but there is a custom sink built specifically for Spark that also works in pull mode. This custom sink is a wrapper around the Avro sink that uses polling to read data in a pull manner.
In this tutorial I'll be working with the data push approach. In coming posts I'll show the custom sink approach as well.
We'll use Flume's spooldir source, which keeps checking a directory for new files and publishes their contents to the channel. For the sink we'll use Flume's built-in Avro sink on a predefined port where our Spark Streaming receiver will be listening for new data.
Here is the Flume configuration file that we'll use for the setup described above.
# Agent declaration
agent.sources = src
agent.sinks = snk
agent.channels = chn
# Define source and its properties
agent.sources.src.type = spooldir
agent.sources.src.spoolDir = /home/maria_dev/spool
# Define sink and its properties
agent.sinks.snk.type = avro
agent.sinks.snk.hostname = localhost
agent.sinks.snk.port = 4000
# Channel configuration
agent.channels.chn.type = memory
agent.channels.chn.capacity = 10000
# Attaching channels to the source and sink
agent.sources.src.channels = chn
agent.sinks.snk.channel = chn
If you are familiar with Flume, the configuration file should be pretty straightforward to understand. You can read more about sources and sinks here in case you need a refresher.
Now that we have created the configuration file, we'll move to the second part of the problem: creating the Spark Streaming application.
Part 2: Writing Spark Streaming application
Create a Scala SBT project in IntelliJ or any other IDE of your choice.
Add the following dependencies to the build.sbt file.
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-streaming_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-streaming-flume_2.11" % "2.1.0"
)
Add a file App.scala with the following code.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object App {
  // Pull the movie ID out of a tab-separated movielens rating line
  def parseLines(line: String) = {
    val fields = line.split('\t')
    val movieId = fields(1)
    (movieId, 1)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "SparkStreamingDemo")
    sc.setLogLevel("ERROR")

    // Micro-batch interval of 1 second
    val ssc = new StreamingContext(sc, Seconds(1))

    // Receiver that Flume's Avro sink pushes events to
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 4000)

    val lines = flumeStream.map(x => new String(x.event.getBody().array(), "UTF-8"))
    val movies = lines.map(parseLines)

    // Count per movie over a 5-minute window, recomputed every 10 seconds
    val count = movies.reduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, Seconds(300), Seconds(10))
    val sortedCount = count.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedCount.print()

    // Checkpointing is required for the inverse-reduce form of reduceByKeyAndWindow
    ssc.checkpoint("/home/maria_dev/checkpoint")
    ssc.start()
    ssc.awaitTermination()
  }
}
The above code calculates the top 10 most-rated movies over a 5-minute window, with a 10-second sliding interval, on the movielens data set. You can find this data set here.
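For context on what parseLines is splitting, each line of the movielens u.data file is tab-separated as userId, movieId, rating, timestamp. A plain-Scala sketch of the same splitting logic, using a sample line from the data set:

```scala
object ParseSketch extends App {
  // A line from movielens u.data: userId \t movieId \t rating \t timestamp
  val line = "196\t242\t3\t881250949"

  // Same splitting logic as parseLines above
  val fields = line.split('\t')
  val movieId = fields(1)

  println(movieId)  // "242" — the second field is the movie ID
}
```

Note that the split character is a tab ('\t'); splitting on the literal letter 't' would silently produce wrong fields.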
This program will continue doing so until we explicitly terminate the Spark job.
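The reason the code above passes a second, inverse function to reduceByKeyAndWindow (and why a checkpoint directory is required) is that Spark can then update each window incrementally instead of re-summing all 300 seconds of data on every slide. A plain-Scala sketch of that arithmetic, with made-up numbers:

```scala
object IncrementalCountSketch extends App {
  // reduceByKeyAndWindow(add, subtract, window, slide) keeps a running
  // count per key and adjusts it on every slide (numbers are made up):
  val previousWindowCount = 40  // count for some movie in the previous window
  val entering = 7              // occurrences in batches that just entered the window
  val leaving = 5               // occurrences in batches that just slid out

  // add function applies to entering data, subtract function to leaving data
  val newWindowCount = previousWindowCount + entering - leaving

  println(newWindowCount)  // 42
}
```

This is why the reduce operation must be invertible: subtraction undoes addition, so only the batches entering and leaving the window need to be touched on each slide.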
Now build the artifact/jar for this project and transfer it to your cluster, from where you'll submit it using the spark-submit command.
Running the application
I have a single-node HDP cluster running in VirtualBox on my machine, but the same steps can be followed for other distributions as well, like Cloudera, MapR, or a custom Hadoop setup.
Once you have both the Flume configuration file and the Spark jar file on the cluster, we'll start by executing the Spark job first.
Issue the following command from the directory where your jar resides.
spark-submit --class App <name_of_your_jar>
Once your Spark Streaming job starts running, you'll see that it processes a batch every 10 seconds. Since we haven't started the Flume source yet, it isn't printing anything, as there is no data to process.
Now let's start the Flume agent. Issue the following command from the directory where the Flume config file resides.
flume-ng agent -n agent -f <flume_config_file_name>
Once the agent starts running, you'll see that it is looking for new files in the /home/maria_dev/spool directory. This is the directory we specified in the Flume config file; you can change it to any location you want.
Now try placing the u.data file from the movielens data set into the configured spool directory and watch your Spark Streaming job calculate the top 10 movies.
You'll see your program print the top 10 movies list. If you put the same file into the spool directory again, under a different name, the counts should double.
And there you have it: Flume integrated with Spark Streaming for near-real-time processing.
Published July 10, 2017 · Updated April 16, 2018
Originally published at sushilkumar.xyz on July 10, 2017.
