EXPEDIA GROUP TECHNOLOGY — DATA

Apache Spark Structured Streaming — Operations (5 of 6)

Get your streams in shape with Filters, Joins, Windows, and User Defined Functions

Neeraj Bhadani
Expedia Group Technology

--

Photo by Barn Images on Unsplash

In previous blog posts, we discussed the sources and sinks to use with Apache Spark™️ Streaming, as well as checkpoints and triggers. In this post, we discuss operations that we can perform in Spark Streaming, like filter, join, UDF, and window.

You may also be interested in some of my other posts on Apache Spark.

We can perform various operations on a streaming DataFrame like select, filter, groupBy, join, window, UDF, map, flatMap, etc. We cover some of these operations in this blog.


Setup

Let’s start by creating a streaming DataFrame named initDF from a file source, reading two files in each micro-batch, to perform some of the above-mentioned operations. We use console output throughout this blog, which we discussed in detail in our previous posts.
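
Below is a minimal sketch of that setup. The folder path, schema, and option values are assumptions made for illustration; the later snippets in this post reuse this initDF and these imports.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SparkStreamingOperations")
  .getOrCreate()

// Schema assumed from the columns shown in the console output later in this post.
val stockSchema = StructType(Seq(
  StructField("Name", StringType),
  StructField("Date", DateType),
  StructField("Open", DoubleType),
  StructField("High", DoubleType),
  StructField("Low", DoubleType),
  StructField("Close", DoubleType)
))

// Streaming DataFrame that reads at most two files per micro-batch from the data/stream folder.
val initDF = spark.readStream
  .format("csv")
  .option("header", "true")
  .option("maxFilesPerTrigger", "2")
  .schema(stockSchema)
  .load("data/stream")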

Filter

Now let’s perform simple select and filter operations. We keep only the records where the stock closed with a gain on a particular day.
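
A minimal sketch of that query, reusing the initDF from the Setup section (the numRows option only limits how many rows the console sink prints):

// Keep the rows where the stock closed higher than it opened.
val resultDF = initDF
  .select("Name", "Date", "Open", "Close")
  .filter(col("Close") > col("Open"))

resultDF.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .option("numRows", 3)
  .start()
  .awaitTermination()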

Let’s check the contents of the above DataFrame.

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----------+----------+----------+
| Name| Date| Open| Close|
+-----+----------+----------+----------+
|GOOGL|2017-01-03|800.619995| 808.01001|
|GOOGL|2017-01-05| 807.5| 813.02002|
|GOOGL|2017-01-06| 814.98999|825.210022|
+-----+----------+----------+----------+
only showing top 3 rows

We get records with selected columns and where Close is greater than Open. You can find the complete code on GitHub.

GroupBy

Now let’s perform a simple aggregation operation using groupBy to find the year-wise maximum HIGH price for each stock. Copy the file MSFT_2017.csv to the data/stream folder to simulate streaming data.
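
A sketch of that aggregation, assuming the year can be derived from the Date column. A streaming aggregation without a watermark needs the complete (or update) output mode on the console sink.

// Year-wise maximum High price per stock.
val resultDF = initDF
  .groupBy(col("Name"), year(col("Date")).as("Year"))
  .agg(max("High").as("Max"))

resultDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()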

Let’s check the contents of the resultDF DataFrame.

+----+----+----+
|Name|Year|Max |
+----+----+----+
|MSFT|2017|87.5|
+----+----+----+

Here we get the maximum stock price for Microsoft in 2017.

Joins

Here we discuss join operations with streaming DataFrames. We can join a streaming DataFrame with another streaming DataFrame, which is called a stream-stream join, or with a batch DataFrame, which is called a stream-batch join.

Let’s start with stream-batch join.

Stream-Batch Join

With stream-batch joins, we join a streaming DataFrame with a static (batch) DataFrame. Let’s walk through an example.

Our streaming DataFrame is the initDF defined in the Setup section above. We create a batch DataFrame from the COMPANY.CSV file, which contains two columns, Name and FullCompanyName. Here is the sample data from the COMPANY.CSV file.

Name,FullCompanyName
AAPL,Apple
AMZN,Amazon
GOOGL,Google
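
A sketch of loading this lookup file as a plain batch DataFrame (the file path is an assumption):

// Static (batch) DataFrame, read once rather than as a stream.
val companyDF = spark.read
  .option("header", "true")
  .csv("data/COMPANY.CSV")

companyDF.show()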

Output:

+-----+---------------+
| Name|FullCompanyName|
+-----+---------------+
| AAPL| Apple|
| AMZN| Amazon|
|GOOGL| Google|
+-----+---------------+

Let’s perform an inner join on streaming DataFrame (resultDF) and batch DataFrame (companyDF).
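
A sketch of the join, where resultDF is the streaming aggregate from the GroupBy section; the exact column order in your output may differ from the snapshot below.

// Inner join of a streaming DataFrame with a static DataFrame on the Name column.
val innerJoinDF = resultDF.join(companyDF, Seq("Name"), "inner")

innerJoinDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()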

Here is the output of the first micro-batch:

+-----+---------------+----+-------+
|Name |FullCompanyName|Year|Max |
+-----+---------------+----+-------+
|AMZN |Amazon |2017|1213.41|
|GOOGL|Google |2017|1086.49|
+-----+---------------+----+-------+

Let’s perform a left_outer join with the streaming DataFrame on the left and the batch DataFrame on the right.
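
A sketch, reusing resultDF and companyDF and writing to the console as before:

// Streaming DataFrame on the left, static DataFrame on the right: supported.
val leftOuterJoinDF = resultDF.join(companyDF, Seq("Name"), "left_outer")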

Output for one of the micro-batches:

+-----+----+-------+---------------+
|Name |Year|Max |FullCompanyName|
+-----+----+-------+---------------+
|MSFT |2017|87.5 |null |
|AMZN |2017|1213.41|Amazon |
|GOOGL|2017|1086.49|Google |
+-----+----+-------+---------------+

FullCompanyName is null for Name = MSFT because there is no such record in companyDF.

Note: a right_outer join would work similarly if we switched the order of the DataFrames.

Let’s perform a left_outer join with the batch DataFrame on the left and the streaming DataFrame on the right.
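
A sketch of the attempt:

// Static DataFrame on the left, streaming DataFrame on the right: this fails at analysis time.
val failingJoinDF = companyDF.join(resultDF, Seq("Name"), "left_outer")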

We will get the error below.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Left outer join with a streaming DataFrame/Dataset on the right and a static DataFrame/Dataset on the left is not supported;;

Here we would be matching every record from the static DataFrame on the left with the stream DataFrame on the right. If a record from the static DF (left) has no match in the stream DF (right), the engine cannot emit null for it, because new data keeps arriving on the stream side and a matching record may still turn up later. For the same reason, full_outer joins are not supported either. You can find the supported join details here as well. You can find the complete code on GitHub.

Note: We get the same error for a right_outer join with the streaming DataFrame on the left and the batch DataFrame on the right side of the join.

Stream-stream join

Let’s create two streaming DataFrames from the initDF defined above and check that they are streaming DataFrames.
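
A sketch, deriving the two streaming DataFrames from initDF (the selected columns are chosen to match the join output shown further down):

// Two streaming DataFrames backed by the same file stream.
val streamDf1 = initDF.select("Name", "Date", "High", "Low")
val streamDf2 = initDF.select("Name", "Date", "Open", "Close")

println("streamDf1 Streaming DataFrame : " + streamDf1.isStreaming)
println("streamDf2 Streaming DataFrame : " + streamDf2.isStreaming)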

Output:

streamDf1 Streaming DataFrame : true
streamDf2 Streaming DataFrame : true

Now we perform an inner join operation on the above DataFrames based on the Name and Date columns.
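
A sketch of the join:

// Stream-stream inner join on the Name and Date columns.
val joinDF = streamDf1.join(streamDf2, Seq("Name", "Date"))

joinDF.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()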

Output for one sample micro-batch:

+----+----------+-----------+-----------+-----------+-----------+
|Name|Date |High |Low |Open |Close |
+----+----------+-----------+-----------+-----------+-----------+
|AMZN|2017-04-11|911.23999 |897.5 |907.039978 |902.359985 |
|AMZN|2017-07-20|1034.969971|1022.52002 |1031.589966|1028.699951|
|AMZN|2017-08-02|1003.210022|981.72998 |1001.77002 |995.890015 |
+----+----------+-----------+-----------+-----------+-----------+

In the joinDF streaming DataFrame, the High and Low columns come from streamDf1 and the Open and Close columns come from streamDf2. You can find the complete code on GitHub.

left_outer and right_outer joins are conditionally supported with watermarks, but full_outer joins are not supported. You can find more details here.

UDF — User Defined Function

As with batch DataFrames, we can also apply user-defined functions (UDFs) to streaming DataFrames. Let’s create a UDF to see if the stock is Up or Down on a given day.
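
A minimal sketch of such a UDF (the labels UP and Down follow the output shown below):

// Returns "UP" when the stock closed above its opening price, "Down" otherwise.
val upUdf = udf { (close: Double, open: Double) =>
  if (close > open) "UP" else "Down"
}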

Now let’s apply the above-defined UDF on initDF streaming DataFrame.

val resultDF = initDF
  .withColumn("up_down_udf", upUdf(col("Close"), col("Open")))

Output for one of the micro-batches:

+----+----------+----------+----------+-----------+
|Name|Date |Open |Close |up_down_udf|
+----+----------+----------+----------+-----------+
|AMZN|2017-01-03|757.919983|753.669983|Down |
|AMZN|2017-01-04|758.390015|757.179993|Down |
|AMZN|2017-01-05|761.549988|780.450012|UP |
|AMZN|2017-01-06|782.359985|795.98999 |UP |
|AMZN|2017-01-09|798.0 |796.919983|Down |
+----+----------+----------+----------+-----------+

You can verify the values in the up_down_udf column against the logic in upUdf.

We can also use UDFs in SQL queries.
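
For example, here is a sketch that registers the same logic for SQL and queries the stream through a temporary view (the view name stock_view is an assumption):

// Register the function for SQL and run a streaming query with plain SQL.
spark.udf.register("up_down", (close: Double, open: Double) =>
  if (close > open) "UP" else "Down")

initDF.createOrReplaceTempView("stock_view")

val sqlResultDF = spark.sql(
  "SELECT Name, Date, Open, Close, up_down(Close, Open) AS up_down_udf FROM stock_view")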

You can find the complete code on GitHub.

Window

Window operations are very similar to groupBy operations. In groupBy, aggregation is based on the specified group or key, while in window operations aggregation is based on event windows. Spark supports two types of windows: tumbling windows and sliding windows.

Let’s discuss each of them in detail, taking the numbers 1–10 as an example and calculating their sum using different window operations.

Tumbling window

Tumbling windows are non-overlapping, which means each data point will be part of only one window.

Diagram: a tumbling window of size 2 aggregates 10 data points into 5 non-overlapping windows.

Here the window size is 2 and we have 5 non-overlapping windows, along with the sum of the elements in each window. We can also verify that no element is shared between windows.

Now let’s define one in our streaming application. We can use the window function, specifying the date/time column and a window duration such as 2 minutes, 30 seconds, 1 hour, or 5 days.

Let’s consider the initDF defined above in the Setup section and try to apply a window function.
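
A sketch of that query; the writeStream part mirrors the earlier examples.

// Tumbling window: window() receives only the time column and a window duration.
val tumblingDF = initDF
  .groupBy(window(col("Date"), "10 days"), col("Name"))
  .agg(max("High").as("Max"))
  .orderBy(col("window.start"))

tumblingDF.writeStream
  .outputMode("complete")   // sorting a streaming aggregate requires complete mode
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()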

Here we used the Date column with 10 days as the window duration and sorted the result by window start time to check the non-overlapping behavior of tumbling windows.

Now let’s have a look at the first few records from the output of a micro-batch.

+------------------------------------------+----+-----+
|window |Name|Max |
+------------------------------------------+----+-----+
|[2016-12-25 00:00:00, 2017-01-04 00:00:00]|MSFT|62.84|
|[2017-01-04 00:00:00, 2017-01-14 00:00:00]|MSFT|63.4 |
|[2017-01-14 00:00:00, 2017-01-24 00:00:00]|MSFT|63.12|
|[2017-01-24 00:00:00, 2017-02-03 00:00:00]|MSFT|65.91|
+------------------------------------------+----+-----+

We can check the size of the window in the window column and can see that none of the windows overlap. You can change the duration of the window and check the result yourself as well.

Sliding window

As its name suggests, this window slides over the data instead of tumbling. We can specify how far the window slides each time, and the resulting windows overlap. Let’s first try to understand this with a simple example of the numbers 1–10.

Diagram: a sliding window of size 3 with a slide interval of 2 moves through 4 positions as it aggregates over 10 data points.

Here we have defined the window size as 3 and the slide interval as 2. As we can see in the diagram above, these windows overlap. For example, the number 3 is present in both windows 1 and 2.

To define a sliding window, we pass the slide duration as the third argument to the window function, along with the date/time column and the window duration. Let’s try to perform a sliding window operation in our streaming application.
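
A sketch of that query:

// Sliding window: the third argument to window() is the slide duration.
val slidingDF = initDF
  .groupBy(
    window(col("Date"), "10 days", "5 days"),
    col("Name"),
    year(col("Date")).as("Year"))
  .agg(max("High").as("Max"))
  .orderBy(col("window.start"))

slidingDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()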

We used 5 days as our slide duration. Let’s check the output now.

+------------------------------------------+----+----+-----+
|window |Name|Year|Max |
+------------------------------------------+----+----+-----+
|[2016-12-25 00:00:00, 2017-01-04 00:00:00]|MSFT|2017|62.84|
|[2016-12-30 00:00:00, 2017-01-09 00:00:00]|MSFT|2017|63.15|
|[2017-01-04 00:00:00, 2017-01-14 00:00:00]|MSFT|2017|63.4 |
|[2017-01-09 00:00:00, 2017-01-19 00:00:00]|MSFT|2017|63.4 |
+------------------------------------------+----+----+-----+

In the output above, the windows overlap. You can find the complete code on GitHub. You can also find various window functions on batch DataFrames in this blog post.

The next post in this series covers Watermarking in Spark Streaming.

I hope you enjoyed learning about important operations like Select, Filter, Join, UDF, and Window in Spark Streaming.

Here are the other blog posts in the Apache Spark Structured Streaming series.



This blog post is provided for educational purposes only. Certain scenarios may utilize fictitious names and test data for illustrative purposes. The content is intended as a contribution to the technological community at large and may not necessarily reflect any technology currently being implemented or planned for implementation at Expedia Group, Inc.
