EXPEDIA GROUP TECHNOLOGY — DATA
Apache Spark Structured Streaming — Operations (5 of 6)
Get your streams in shape with Filters, Joins, Windows, and User Defined Functions
In previous blog posts, we discussed the sources and sinks to use with Apache Spark™️ Structured Streaming, as well as checkpoints and triggers. In this post, we discuss operations we can perform on a streaming DataFrame, such as filter, join, UDF, and window.
You may also be interested in some of my other posts on Apache Spark.
- Apache Spark Structured Streaming — Overview
- Apache Spark Structured Streaming — Input Sources
- Apache Spark Structured Streaming — Output Sinks
- Apache Spark Structured Streaming — Checkpoint and Triggers
- Deep Dive into Apache Spark DateTime Functions
- Working with JSON in Apache Spark
- Deep Dive into Apache Spark Window Functions
- Deep Dive into Apache Spark Array Functions
- Start Your Journey with Apache Spark
We can perform various operations on a streaming DataFrame like select, filter, groupBy, join, window, UDF, map, flatMap, etc. We cover some of these operations in this blog.
Setup
Let’s start by creating a streaming DataFrame named initDF from a file source, reading two files in each micro-batch, to perform some of the above-mentioned operations. We use console output throughout this blog, which we discussed in detail in our previous posts.
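You can find the complete code on GitHub; a minimal sketch of the setup might look like the following, where the schema and the data/stream path are assumptions based on the outputs shown later in this post.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("StreamingOperations")
  .master("local[*]")
  .getOrCreate()

// Schema of the stock CSV files (column names assumed from the outputs below)
val stockSchema = StructType(Seq(
  StructField("Name", StringType),
  StructField("Date", DateType),
  StructField("Open", DoubleType),
  StructField("High", DoubleType),
  StructField("Low", DoubleType),
  StructField("Close", DoubleType)
))

// Read at most two files per micro-batch from the data/stream folder
val initDF = spark.readStream
  .option("maxFilesPerTrigger", 2)
  .option("header", true)
  .schema(stockSchema)
  .csv("data/stream")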
Filter
Now let’s perform simple select and filter operations. We filter for records where the stock closed with a gain on a particular day.
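A sketch of this step (column names taken from the output below):

import org.apache.spark.sql.functions.col

val filterDF = initDF
  .select("Name", "Date", "Open", "Close")
  .filter(col("Close") > col("Open"))

filterDF.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .option("numRows", 3)
  .start()
  .awaitTermination()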
Let’s check the contents of the above DataFrame.
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----------+----------+----------+
| Name| Date| Open| Close|
+-----+----------+----------+----------+
|GOOGL|2017-01-03|800.619995| 808.01001|
|GOOGL|2017-01-05| 807.5| 813.02002|
|GOOGL|2017-01-06| 814.98999|825.210022|
+-----+----------+----------+----------+
only showing top 3 rows
We get records with the selected columns where Close is greater than Open. You can find the complete code on GitHub.
GroupBy
Now let’s perform a simple aggregation operation using groupBy to find the year-wise maximum High price for each stock. Copy the file MSFT_2017.csv to the data/stream folder to simulate streaming data.
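A sketch of this aggregation, with resultDF being the name referenced below (the complete output mode is needed because of the aggregation):

import org.apache.spark.sql.functions.{col, max, year}

val resultDF = initDF
  .groupBy(col("Name"), year(col("Date")).as("Year"))
  .agg(max("High").as("Max"))

resultDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()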
Let’s check the contents of the resultDF DataFrame:
+----+----+----+
|Name|Year|Max |
+----+----+----+
|MSFT|2017|87.5|
+----+----+----+
Here we get the maximum stock price for Microsoft in 2017.
Joins
Here we discuss join operations with Streaming DataFrames. We can join a Streaming DataFrame with another Streaming DataFrame and call it a stream-stream join. Also, we can join a Streaming DataFrame with a Batch DataFrame and call it a stream-batch join.
Let’s start with stream-batch join.
Stream-Batch Join
With stream-batch joins, we can join a streaming DataFrame with a batch DataFrame. Let’s discuss this with an example.
Our streaming DataFrame is the initDF defined in the Setup section above. We create a batch DataFrame from the COMPANY.CSV file, which contains two columns, Name and FullCompanyName. Here is the sample data for the COMPANY.CSV file:
Name,FullCompanyName
AAPL,Apple
AMZN,Amazon
GOOGL,Google
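Reading this file into a batch DataFrame might look like the following (the file path is an assumption):

// A regular batch read, not readStream
val companyDF = spark.read
  .option("header", true)
  .csv("data/COMPANY.CSV")

companyDF.show()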
Output:
+-----+---------------+
| Name|FullCompanyName|
+-----+---------------+
| AAPL| Apple|
| AMZN| Amazon|
|GOOGL| Google|
+-----+---------------+
Let’s perform an inner join on the streaming DataFrame (resultDF) and the batch DataFrame (companyDF).
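A sketch of this join, reusing the resultDF aggregation from the GroupBy section:

// Inner stream-batch join on the common Name column
val innerJoinDF = resultDF
  .join(companyDF, Seq("Name"))
  .select("Name", "FullCompanyName", "Year", "Max")

// Written to the console sink in complete mode, as above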
Here is the output of the first micro-batch:
+-----+---------------+----+-------+
|Name |FullCompanyName|Year|Max |
+-----+---------------+----+-------+
|AMZN |Amazon |2017|1213.41|
|GOOGL|Google |2017|1086.49|
+-----+---------------+----+-------+
Let’s perform a left_outer join with the streaming DataFrame on the left and the batch DataFrame on the right.
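A sketch:

// Streaming DataFrame on the left, batch DataFrame on the right
val leftOuterJoinDF = resultDF.join(companyDF, Seq("Name"), "left_outer")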
Output for one of the micro-batches:
+-----+----+-------+---------------+
|Name |Year|Max |FullCompanyName|
+-----+----+-------+---------------+
|MSFT |2017|87.5 |null |
|AMZN |2017|1213.41|Amazon |
|GOOGL|2017|1086.49|Google |
+-----+----+-------+---------------+
FullCompanyName is null for Name = MSFT because there is no such record in companyDF.
Note: a right_outer join would work similarly if we switch the order of the DataFrames.
Let’s perform a left_outer join with the batch DataFrame on the left and the streaming DataFrame on the right.
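A sketch of this attempt, with the DataFrames switched around:

// Batch DataFrame on the left, streaming DataFrame on the right
val invalidJoinDF = companyDF.join(resultDF, Seq("Name"), "left_outer")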
We will get the error below.
Exception in thread "main" org.apache.spark.sql.AnalysisException: Left outer join with a streaming DataFrame/Dataset on the right and a static DataFrame/Dataset on the left is not supported;;
Here we would be matching all the records from the static DataFrame on the left with the streaming DataFrame on the right. If a record from the static (left) side does not find a match, the engine cannot simply return null, because the data on the streaming (right) side keeps changing and a matching record may still arrive. For the same reason, a full_outer join is not supported. You can find the supported join details here as well. You can find the complete code on GitHub.
Note: We will get the same error with a right_outer join when the streaming DataFrame is on the left and the batch DataFrame is on the right side of the join.
Stream-stream join
Let’s create two streaming DataFrames from the initDF defined above and check that they are indeed streaming DataFrames.
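A sketch, splitting initDF into two column sets (the column split is assumed from the join output below):

val streamDf1 = initDF.select("Name", "Date", "High", "Low")
val streamDf2 = initDF.select("Name", "Date", "Open", "Close")

println(s"streamDf1 Streaming DataFrame : ${streamDf1.isStreaming}")
println(s"streamDf2 Streaming DataFrame : ${streamDf2.isStreaming}")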
Output:
streamDf1 Streaming DataFrame : true
streamDf2 Streaming DataFrame : true
Now we perform an inner join on the above DataFrames based on the Name and Date columns.
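A sketch of this join:

// Inner stream-stream join on both Name and Date
val joinDF = streamDf1.join(streamDf2, Seq("Name", "Date"))

joinDF.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()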
Output for one of the sample micro-batches:
+----+----------+-----------+-----------+-----------+-----------+
|Name|Date |High |Low |Open |Close |
+----+----------+-----------+-----------+-----------+-----------+
|AMZN|2017-04-11|911.23999 |897.5 |907.039978 |902.359985 |
|AMZN|2017-07-20|1034.969971|1022.52002 |1031.589966|1028.699951|
|AMZN|2017-08-02|1003.210022|981.72998 |1001.77002 |995.890015 |
+----+----------+-----------+-----------+-----------+-----------+
In the joinDF streaming DataFrame, we have the High and Low columns from streamDf1 and the Open and Close columns from streamDf2. You can find the complete code on GitHub.
left_outer and right_outer joins are conditionally supported with watermarks, but full_outer joins are not supported. You can find more details here.
UDF — User Defined Function
Just as with a batch DataFrame, we can apply user-defined functions (UDFs) to streaming DataFrames. Let’s create a UDF that indicates whether the stock is up or down on a given day.
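A sketch of such a UDF (the name upUdf and the exact labels are assumptions based on the output below):

import org.apache.spark.sql.functions.udf

// Returns "UP" if the stock closed higher than it opened, "Down" otherwise
val upUdf = udf((close: Double, open: Double) =>
  if (close > open) "UP" else "Down"
)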
Now let’s apply the above-defined UDF to the initDF streaming DataFrame.
val resultDF = initDF
.withColumn("up_down_udf", upUdf(col("Close"), col("Open")))
Output for one of the micro-batches:
+----+----------+----------+----------+-----------+
|Name|Date |Open |Close |up_down_udf|
+----+----------+----------+----------+-----------+
|AMZN|2017-01-03|757.919983|753.669983|Down |
|AMZN|2017-01-04|758.390015|757.179993|Down |
|AMZN|2017-01-05|761.549988|780.450012|UP |
|AMZN|2017-01-06|782.359985|795.98999 |UP |
|AMZN|2017-01-09|798.0 |796.919983|Down |
+----+----------+----------+----------+-----------+
You can verify the values in the up_down_udf column against the upUdf logic.
We can also use UDFs in SQL queries.
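For example, a sketch (the SQL function and view names here are made up):

// Register the same logic for use in SQL
spark.udf.register("up_down", (close: Double, open: Double) =>
  if (close > open) "UP" else "Down"
)

initDF.createOrReplaceTempView("stocks")

val sqlResultDF = spark.sql(
  "SELECT Name, Date, Open, Close, up_down(Close, Open) AS up_down_udf FROM stocks"
)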
You can find the complete code on GitHub.
Window
Window operations are very similar to groupBy operations. In groupBy, aggregation is based on the specified group or key, while in window operations, aggregation is based on event-time windows. Spark supports two types of windows: tumbling windows and sliding windows.
Let’s discuss each of them in detail. As an example, take the numbers 1–10 and calculate their SUM using the different window operations.
Tumbling window
Tumbling windows are non-overlapping, which means each data point is part of exactly one window. With a window size of 2, the numbers 1–10 fall into 5 non-overlapping windows: [1,2], [3,4], [5,6], [7,8], and [9,10], with sums 3, 7, 11, 15, and 19. We can verify that no element appears in more than one window.
Now let us define one in our streaming application. We can use the window function and specify the DateTime column and a window duration, say 2 minutes, 30 seconds, 1 hour, or 5 days.
Let’s consider the initDF defined above in the Setup section and try to apply a window function.
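A sketch of that query might look like this:

import org.apache.spark.sql.functions.{col, max, window}

// 10-day tumbling window on the Date column
val tumblingDF = initDF
  .groupBy(col("Name"), window(col("Date"), "10 days"))
  .agg(max("High").as("Max"))
  .select("window", "Name", "Max")
  .orderBy(col("window.start"))

// Sorting a streaming aggregation requires the complete output mode
tumblingDF.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", false)
  .start()
  .awaitTermination()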
Here we used the Date column with 10 days as the window duration and sorted the result by window start time to check the non-overlapping behavior of tumbling windows.
Now let’s have a look at the first few records from the output of a micro-batch.
+------------------------------------------+----+-----+
|window |Name|Max |
+------------------------------------------+----+-----+
|[2016-12-25 00:00:00, 2017-01-04 00:00:00]|MSFT|62.84|
|[2017-01-04 00:00:00, 2017-01-14 00:00:00]|MSFT|63.4 |
|[2017-01-14 00:00:00, 2017-01-24 00:00:00]|MSFT|63.12|
|[2017-01-24 00:00:00, 2017-02-03 00:00:00]|MSFT|65.91|
+------------------------------------------+----+-----+
We can check the size of each window in the window column and see that none of the windows overlap. You can change the window duration and check the result yourself.
Sliding window
As its name suggests, this window slides over the data instead of tumbling, and we can specify how far it slides each time. These are overlapping windows. Let’s first try to understand this with a simple example using the numbers 1–10.
Here we have defined the window size as 3 and the slide interval as 2, which produces the overlapping windows [1,2,3], [3,4,5], [5,6,7], [7,8,9], and [9,10]. For example, the number 3 is present in both the first and the second window.
To define a sliding window, along with the DateTime column and window duration, we specify the slide duration as the third argument to the window function. Let’s try to perform a sliding window operation in our streaming application.
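A sketch, assuming the same 10-day window duration as above:

import org.apache.spark.sql.functions.{col, max, window, year}

// 10-day window sliding every 5 days; the third argument is the slide duration
val slidingDF = initDF
  .groupBy(col("Name"), window(col("Date"), "10 days", "5 days"),
           year(col("Date")).as("Year"))
  .agg(max("High").as("Max"))
  .select("window", "Name", "Year", "Max")
  .orderBy(col("window.start"))

// As before, written to the console sink in complete output mode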
We used 5 days as our slide duration. Let’s check the output now.
+------------------------------------------+----+----+-----+
|window |Name|Year|Max |
+------------------------------------------+----+----+-----+
|[2016-12-25 00:00:00, 2017-01-04 00:00:00]|MSFT|2017|62.84|
|[2016-12-30 00:00:00, 2017-01-09 00:00:00]|MSFT|2017|63.15|
|[2017-01-04 00:00:00, 2017-01-14 00:00:00]|MSFT|2017|63.4 |
|[2017-01-09 00:00:00, 2017-01-19 00:00:00]|MSFT|2017|63.4 |
+------------------------------------------+----+----+-----+
In the output above, the windows overlap. You can find the complete code on GitHub. You can also find various window functions for batch DataFrames in this blog post.
The next post in this series covers Watermarking in Spark Streaming.
I hope you enjoyed learning about important operations like Select, Filter, Join, UDF, and Window in Spark Streaming.
Here are the other posts in the Apache Spark Structured Streaming series.
- Apache Spark Structured Streaming — First Streaming Example
- Apache Spark Structured Streaming — Input Sources
- Apache Spark Structured Streaming — Output Sinks
- Apache Spark Structured Streaming — Checkpoints and Triggers
- Apache Spark Structured Streaming — Watermarking
References:
- http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#operations-on-streaming-dataframesdatasets
- http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations
- http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
This blog post is provided for educational purposes only. Certain scenarios may utilize fictitious names and test data for illustrative purposes. The content is intended as a contribution to the technological community at large and may not necessarily reflect any technology currently being implemented or planned for implementation at Expedia Group, Inc.