
Apache Spark Structured Streaming with Pyspark

In the previous article, we looked at Apache Spark Discretized Streams (DStreams), which are the basic abstraction of Spark Streaming. In this article, we will look at the structured part of Spark Streaming.

Structured Streaming is built on top of the Spark SQL engine of Apache Spark, which deals with running the stream incrementally as data continues to arrive. Just like the other Spark engines, it is scalable as well as fault-tolerant. Structured Streaming enhances the Spark DataFrame APIs with streaming features.

Structured Streaming also ensures fast recovery from failures with the help of checkpointing and caching options. In summary, Structured Streaming offers scalable, fault-tolerant and near-real-time operation.

Now it is time to get our hands dirty with the first example 😉

For this example we will use csv files as the input data for our stream. I’ll simply upload 5 csv files to our directory. These csv files contain some data (ten rows per file) about randomly generated people and some information about them, such as their ages, professions, cities and salaries. Below you can see an example of the input data:

CSV data sample

First we will import the required PySpark modules and start a SparkSession.

Remember that Structured Streaming processing always requires the specification of a schema for the data in the stream.
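A minimal sketch of this setup step is shown below; the exact column names and types are my assumptions based on the fields described above (name, age, profession, city, salary), not the original listing.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType, DoubleType)

# Start (or reuse) a SparkSession for the streaming job.
spark = SparkSession.builder.appName("StructuredStreamingDemo").getOrCreate()

# File-based streaming sources require an explicit schema up front.
# Field names and types below are assumptions matching the described csv files.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("profession", StringType(), True),
    StructField("city", StringType(), True),
    StructField("salary", DoubleType(), True),
])
```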

We will load our data into a streaming DataFrame by using “readStream”. We can also check the status of our stream with the “isStreaming” property.

Yes! It is working… Now we have created a streaming DataFrame. Now comes the tricky part of our demonstration 🧐 Since we are working on an example, our csv data isn’t created in real time, so we have to simulate streaming conditions. Instead of streaming data as it comes in, we will copy each of our csv files one at a time to the path that we specified in “readStream” above. That’s why we also set the “maxFilesPerTrigger” option to 1, which tells Spark to stream only a single csv file at a time.
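A sketch of that step, assuming the csv files land in a local data/ directory (the path and variable name are illustrative):

```python
# Create a streaming DataFrame over the monitored directory.
df = (
    spark.readStream
    .format("csv")
    .option("header", True)
    .option("maxFilesPerTrigger", 1)  # process only one new file per micro-batch
    .schema(schema)
    .load("data/")
)

# Returns True for a streaming DataFrame.
print(df.isStreaming)
```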

Let’s also look at the schema of the DataFrame in a tree format.
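With the hypothetical df from the sketch above, that is simply:

```python
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
#  |-- profession: string (nullable = true)
#  |-- city: string (nullable = true)
#  |-- salary: double (nullable = true)
```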

Next we will apply some transformations which will show us the number of people in each profession as well as the average salary per profession in descending order, in a DataFrame that will be updated with every new file.
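One way to express that aggregation (the variable names here are mine, not from the original listing):

```python
from pyspark.sql import functions as F

# Number of people and average salary per profession,
# ordered by average salary in descending order.
profession_df = (
    df.groupBy("profession")
      .agg(
          F.count("*").alias("count"),
          F.round(F.avg("salary"), 2).alias("avg_salary"),
      )
      .orderBy(F.desc("avg_salary"))
)
```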

Now we are ready for streaming, except for one last point: we need to specify a “format()” for the streaming destination and an “outputMode()” that determines how the data is written to the streaming sink.

The most used formats are console, kafka, parquet and memory. I will use the console format so we can follow our streaming results from the terminal.

We have three options for the outputMode() method. These are:

  • append: Only new rows will be written to the sink.
  • complete: All rows will be written to the sink, every time there are updates.
  • update: Only the updated rows will be written to the sink, every time there are updates.

I will also use the “complete” option, as we have an aggregation in our DataFrame.

Finally, we can start the streaming query with the “start()” method.
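Putting the last three pieces together, a sketch of the sink configuration could look like this (the query variable name is illustrative):

```python
# Write the running aggregation to the console sink in "complete" mode.
query = (
    profession_df.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", False)
    .start()
)

# query.awaitTermination()  # optionally block the driver until the query stops
```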

Now it is time to load our csv files one at a time. Remember, we have 5 different csv files, each containing ten rows of data.
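To simulate arriving data, I copy the files into the monitored directory one by one; a hypothetical helper for this (the source file names are illustrative) could be:

```python
import shutil
import time

# Copy one csv file at a time into the monitored "data/" directory,
# pausing between copies so each file arrives in its own micro-batch.
for i in range(1, 6):
    shutil.copy(f"source/people_{i}.csv", "data/")
    time.sleep(10)
```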

Let’s look at the results in the terminal after each file is loaded (batches 0 to 4):

After the first csv file
After the second csv file
After the third csv file
After the fourth csv file
After the fifth csv file

As you can see from the screenshots, the DataFrame we created from the streaming data updated its values as each new file was loaded.

Windowed Operations in Structured Streaming

As I already mentioned windowed operations in my previous article about DStreams, I would like to demonstrate them with the example below.

In this example we use socket as our format, which enables us to enter data from the terminal with the help of the netcat utility. Assume that we have a market and we want to know the number of products that we sell in every single minute. So we enter product names by typing into one terminal and get the results in another terminal (console format) as a DataFrame.
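A sketch of such a windowed query, assuming netcat is listening on localhost:9999 (started with nc -lk 9999) and that each whitespace-separated token typed is one sold product; the column and variable names are illustrative:

```python
from pyspark.sql import functions as F

# Read lines typed into netcat; includeTimestamp adds an event-time column.
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .option("includeTimestamp", True)
    .load()
)

# Treat every whitespace-separated token as one sold product.
products = lines.select(
    F.explode(F.split(lines.value, " ")).alias("product"),
    lines.timestamp,
)

# Count products per one-minute window, latest window first.
windowed_counts = (
    products
    .groupBy(F.window(products.timestamp, "1 minute"), products.product)
    .count()
    .orderBy(F.desc("window"))
)

window_query = (
    windowed_counts.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", False)
    .start()
)
```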

First, look at the terminal that we used as the socket for data input:

I entered a couple of product names from the terminal over about two minutes (between 03:02 and 03:04). So let’s see the results in the console, which is an Anaconda terminal.

Our query worked perfectly. Now we can easily get the idea of structured windowed streaming. As you can see from the terminal screenshots, our DataFrame updated itself with the newly arriving data. We achieved what we were looking for: a DataFrame which shows our sales in every single minute, ordered from the latest window onwards.

Finally, we will stop our streaming queries with the “stop()” method.
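Assuming the query handles from the sketches above:

```python
query.stop()         # file-based aggregation query
window_query.stop()  # windowed socket query
```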

Conclusion

In this article, I have tried to introduce you to the basics of Structured Streaming in Apache Spark using the PySpark API. There are more advanced operations defined on Structured Streaming. You can always improve your knowledge by reading the Spark Structured Streaming Programming Guide and the PySpark API docs in the Apache Spark documentation.

I hope you will find this article helpful. In the next article, I will write about how to use Jupyter Notebook for our Spark applications in the cloud environment of the Amazon Elastic MapReduce (EMR) service.

I will be happy to hear any comments or questions from you. May the data be with you!
