Windowing in Apache Flink

Sruthi Sree Kumar
Big Data Processing
Jul 8, 2020

Windowing is a key feature of stream processing systems such as Apache Flink. It splits a continuous stream into finite chunks (windows) on which computations can be performed. In Flink, windowing can be applied to the entire stream or on a per-key basis.

A keyed window applies to a keyed stream: we first call the keyBy(…) method and then invoke the window(…) method. For a non-keyed window, we just call the windowAll(…) method, as shown below.

// Keyed window
stream.keyBy(...)
      .window(...)

// Non-keyed window
stream.windowAll(...)
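
To make the difference concrete, here is a minimal plain-Java sketch (not Flink API code) that counts events per window with and without a key. The Event class, the method names, and the 10-second window size in main are all assumptions made for illustration; a real Flink job would use keyBy(…), window(…) and windowAll(…) as above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not Flink API code.
final class KeyedVsNonKeyedSketch {

    // Hypothetical event type: a key plus a timestamp in milliseconds.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Non-keyed: one count per window, across all keys.
    static Map<Long, Integer> countPerWindow(List<Event> events, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % sizeMs);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    // Keyed: one count per (key, window) pair.
    static Map<String, Map<Long, Integer>> countPerKeyAndWindow(List<Event> events, long sizeMs) {
        Map<String, Map<Long, Integer>> counts = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % sizeMs);
            counts.computeIfAbsent(e.key, k -> new TreeMap<>())
                  .merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", 1_000L), new Event("b", 2_000L), new Event("a", 12_000L));
        System.out.println("non-keyed: " + countPerWindow(events, 10_000L));
        System.out.println("keyed:     " + countPerKeyAndWindow(events, 10_000L));
    }
}
```

In the non-keyed case every event lands in the same set of windows; in the keyed case each key gets its own independent set of windows.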

The type of window is defined in Flink using a window assigner, which defines how elements are assigned to windows. All of Flink's built-in window assigners except the global window assign elements based on time, which can be either event time or processing time. The types of window assigners that Flink supports are:

  1. Tumbling window
  2. Sliding window
  3. Session window
  4. Global window
  5. User-defined window

Tumbling window

A tumbling window is a fixed-size, contiguous, non-overlapping window. It is defined with a window interval (the window size). For example, as in fig a, with a tumbling window of 10 seconds, every event that arrives within the same 10-second interval falls into the same window.

Fig a: Tumbling window with window interval 10s

// tumbling event-time windows (window interval of 10 seconds, as in fig a)
stream.keyBy(<key selector>)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))

// tumbling processing-time windows
stream.keyBy(<key selector>)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
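
How an element ends up in exactly one tumbling window can be sketched with a little arithmetic. The following plain-Java snippet is an illustration under simple assumptions (non-negative timestamps in milliseconds, windows aligned to time 0, no offset); the class and method names are made up for this example and are not part of Flink.

```java
// Plain-Java illustration only; this is not Flink API code.
final class TumblingWindowSketch {

    // Start (inclusive) of the tumbling window that contains the timestamp.
    // Assumes a non-negative timestamp and windows aligned to time 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // End (exclusive) of the tumbling window that contains the timestamp.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L; // 10-second window interval, as in fig a
        for (long ts : new long[] {0L, 9_999L, 10_000L, 23_500L}) {
            System.out.println(ts + " ms -> [" + windowStart(ts, sizeMs)
                    + ", " + windowEnd(ts, sizeMs) + ")");
        }
    }
}
```

Because consecutive windows share a boundary but never overlap, every timestamp maps to exactly one start value.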

Sliding window

A sliding window is a fixed-size window that can overlap with its neighbours. It is defined with a window interval (the window size) and a sliding offset (the slide). As we can see in figure b, the window size is 10 seconds, so the first window covers 0–10 s. With a sliding offset of 5 s, the window slides forward by 5 seconds and the second window covers 5–15 s. Because the slide is smaller than the window size, an event can belong to more than one window.

Fig b: Sliding window with window interval 10s and sliding offset 5s

// Sliding window on a non-keyed stream
// (timeWindowAll is a shorthand that is deprecated from Flink 1.12 onward;
// windowAll(...) with an explicit window assigner is the alternative)
stream.timeWindowAll(
    Time.seconds(10), // window size
    Time.seconds(5))  // slide by 5 seconds

// sliding event-time windows
stream.keyBy(<key selector>)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

// sliding processing-time windows
stream.keyBy(<key selector>)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
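
The overlap means a single event can fall into several windows. A minimal plain-Java sketch of that assignment, assuming non-negative millisecond timestamps and windows aligned to time 0, might look like the following; the class and method names are hypothetical and the code is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; this is not Flink API code.
final class SlidingWindowSketch {

    // Start times (newest first) of every sliding window that contains the timestamp.
    // Assumes a non-negative timestamp and windows aligned to time 0.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window size 10 s, slide 5 s, as in figure b.
        System.out.println(windowStarts(7_000L, 10_000L, 5_000L));
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L));
    }
}
```

With a 10-second size and a 5-second slide, each timestamp is covered by size / slide = 2 windows. For timestamps smaller than the window size, the sketch also reports a window that starts before 0, which the figure leaves out.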

Session window

A session window groups elements that arrive close together in time. It is defined with a session boundary (the session gap): a period of inactivity during which no events arrive. For example, in figure c we have a session window with a session boundary of 5 s. As long as consecutive events arrive within the session boundary of each other, they belong to the same window. Hence all the events from 0–19 s fall into the first window. From 19 s to 26 s there are no incoming events, and this gap is greater than the defined session boundary (5 s), so the first window closes and a new window starts at 26 s.

Session windows do not overlap and do not have a fixed start and end time. A session window assigner can be configured with either a static session gap or with a session gap extractor function which defines how long the period of inactivity is.

Fig c: A session window with a session boundary of 5s.

// session processing-time windows (static gap)
stream.keyBy(<key selector>)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) // session gap

// session event-time windows (static gap)
stream.keyBy(<key selector>)
      .window(EventTimeSessionWindows.withGap(Time.seconds(5))) // session gap

// session event-time windows (dynamic gap)
stream.keyBy(<key selector>)
      .window(EventTimeSessionWindows.withDynamicGap((element) -> {
          // determine and return session gap
      }))

// session processing-time windows (dynamic gap)
stream.keyBy(<key selector>)
      .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
          // determine and return session gap
      }))
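
The grouping described for figure c can be sketched in plain Java as follows. This is an illustration, not Flink code: it assumes the timestamps are already sorted, treats a pause of exactly the gap length as still belonging to the same session (an assumption of this sketch), and uses made-up event times that merely mimic the figure (activity from 0 s to 19 s, then again from 26 s).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; this is not Flink API code.
final class SessionWindowSketch {

    // Groups ascending timestamps into sessions. Each session is {firstTs, lastTs}.
    // A new session starts when the pause since the previous event exceeds gapMs.
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sortedTimestampsMs) {
            if (current != null && ts - current[1] <= gapMs) {
                current[1] = ts; // still within the gap: extend the open session
            } else {
                current = new long[] {ts, ts}; // gap exceeded (or first event): open a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical event times in milliseconds, with a 5-second session gap.
        long[] events = {0L, 3_000L, 7_000L, 11_000L, 15_000L, 19_000L, 26_000L, 28_000L};
        for (long[] s : sessions(events, 5_000L)) {
            System.out.println("session from " + s[0] + " ms to " + s[1] + " ms");
        }
    }
}
```

Unlike tumbling and sliding windows, the session boundaries here depend entirely on the data, which is why a session window has no fixed start or end time.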

Global window

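A global window treats the entire stream (or, on a keyed stream, all elements with the same key) as a single window. As shown in figure d, all the events belong to the same window. Since a global window never ends, it is only useful together with a custom trigger that decides when to compute a result.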

Fig d: Global window

stream.keyBy(<key selector>)
      .window(GlobalWindows.create())
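
A global window has no natural end, so something else has to decide when results are produced; in Flink that role is played by a trigger. The plain-Java sketch below illustrates the idea with a hypothetical count-based rule that emits and clears the buffer after every N elements. The class, its method, and the purge-on-fire behaviour are assumptions made for this example, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Plain-Java illustration only; this is not Flink API code.
final class GlobalWindowSketch {
    private final int fireEvery;
    private final List<Integer> buffer = new ArrayList<>();

    GlobalWindowSketch(int fireEvery) {
        this.fireEvery = fireEvery;
    }

    // Adds an element to the single, never-ending window.
    // Returns the buffered elements (and clears the buffer) once fireEvery
    // elements have accumulated; otherwise returns an empty Optional.
    Optional<List<Integer>> add(int element) {
        buffer.add(element);
        if (buffer.size() < fireEvery) {
            return Optional.empty();
        }
        List<Integer> fired = new ArrayList<>(buffer);
        buffer.clear();
        return Optional.of(fired);
    }

    public static void main(String[] args) {
        GlobalWindowSketch window = new GlobalWindowSketch(3);
        for (int i = 1; i <= 7; i++) {
            window.add(i).ifPresent(batch -> System.out.println("fired with " + batch));
        }
    }
}
```

Without such a rule the window would simply keep collecting elements and never produce a result.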

User-defined window

This is a custom window defined by the user by extending the WindowAssigner class, whose assignWindows(…) method decides which windows each element belongs to.
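
The idea behind a custom assigner can be sketched without Flink. In the plain-Java example below, SimpleAssigner is a hypothetical stand-in for the role a window assigner plays (it is not Flink's WindowAssigner and omits everything except the assignment itself), and TwoGranularityAssigner is a made-up custom rule that places every element into both a short and a long tumbling window.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; this is not Flink API code.
final class CustomAssignerSketch {

    // Hypothetical stand-in for a window assigner: maps one element's
    // timestamp to the windows it belongs to. Each window is {start, endExclusive}.
    interface SimpleAssigner {
        List<long[]> assignWindows(long timestampMs);
    }

    // Custom rule: every element goes into a short and a long tumbling window.
    static final class TwoGranularityAssigner implements SimpleAssigner {
        private final long shortMs;
        private final long longMs;

        TwoGranularityAssigner(long shortMs, long longMs) {
            this.shortMs = shortMs;
            this.longMs = longMs;
        }

        @Override
        public List<long[]> assignWindows(long timestampMs) {
            return Arrays.asList(tumbling(timestampMs, shortMs), tumbling(timestampMs, longMs));
        }

        // Tumbling window containing a non-negative timestamp, aligned to time 0.
        private static long[] tumbling(long timestampMs, long sizeMs) {
            long start = timestampMs - (timestampMs % sizeMs);
            return new long[] {start, start + sizeMs};
        }
    }

    public static void main(String[] args) {
        SimpleAssigner assigner = new TwoGranularityAssigner(10_000L, 60_000L);
        for (long[] w : assigner.assignWindows(75_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Returning a collection rather than a single window is what lets one assigner express tumbling, sliding, and custom schemes alike.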

References:

  1. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
  2. https://www.linkedin.com/learning/apache-flink-real-time-data-engineering/windowing-concepts
