Of streaming processing and lateness (part 2/2)

William Attache
The AB Tasty Tech & Product Blog
May 16, 2024

In my previous post, we discussed what late events are in Apache Beam and how to handle them when using FixedWindows. The same reasoning holds for SlidingWindows, and I strongly suggest you adapt the example we saw last time to this type of window before reading the rest of this post. This time, we focus on session windows (Sessions), where windows are merged as elements arrive.

Scenario

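We take the same use case as in the previous story: group the elements by key and sum their values. In this scenario, we receive four elements for key "A": KV("A", 1) at 02:00:05, KV("A", 3) at 02:00:20, KV("A", 5) at 02:00:47 and KV("A", 7) at 02:00:49.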

Figure: Characterisation of each input event

Each element first gets its own individual window, which starts at the element's timestamp and whose length is the session gap, 1 minute in this use case. To be more accurate, a session is characterised by a period of inactivity: with a gap of 1 minute, each element is associated with a window whose lower bound is the timestamp of the event and whose upper bound is that timestamp plus the inactivity gap. This way, to know whether two elements can be merged into the same session, one just needs to check whether their individual windows overlap.

Below is how our individual windows are merged together when everything happens as expected:

Figure: Merging of individual windows

The final window of the session is [02:00:05–02:01:49].
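The merge rule can be replayed outside Beam. Below is a minimal plain-Java sketch (no Beam dependency, and a simplification of what the Sessions window function and the runner actually do) that gives each element the individual window [timestamp, timestamp + gap) and merges overlapping windows. Times are expressed in seconds after the base time, using the offsets of the test code (5, 20, 47 and 49 seconds); the class, record and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {

    /** Hypothetical half-open window [start, end), in seconds after the base time. */
    record Window(long start, long end) {
        boolean overlaps(Window other) {
            return start < other.end && other.start < end;
        }

        Window span(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    /** Gives each element its own window [ts, ts + gap), then merges overlapping windows. */
    static List<Window> mergeSessions(List<Long> timestamps, long gapSeconds) {
        List<Window> individual = new ArrayList<>();
        for (long ts : timestamps) {
            individual.add(new Window(ts, ts + gapSeconds));
        }
        // Sorting by start lets us merge in a single pass.
        individual.sort(Comparator.comparingLong(Window::start));

        List<Window> merged = new ArrayList<>();
        for (Window w : individual) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).overlaps(w)) {
                merged.set(last, merged.get(last).span(w)); // extend the current session
            } else {
                merged.add(w); // gap of inactivity: start a new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // KV("A", 1), KV("A", 3), KV("A", 5), KV("A", 7) with a 1-minute gap
        System.out.println(mergeSessions(List.of(5L, 20L, 47L, 49L), 60));
        // prints [Window[start=5, end=109]]
    }
}
```

The single session [5, 109) corresponds to [02:00:05-02:01:49] in the figures. With only the first two elements, the same function returns [5, 80), i.e. the [02:00:05-02:01:20] session discussed in the next section.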

Lateness

Back to our initial question: we want to see what late events look like when it comes to session windows.

In this scenario, we receive elements KV("A", 1) and KV("A", 3), which are merged into the same session, with window [02:00:05-02:01:20]. Then the watermark is moved to 02:01:48 (we do it ourselves here to illustrate the concepts, see the test code below). This closes the current session and emits the merged result KV("A", 4) in an ON_TIME pane.

Then element KV("A", 5) is received, with individual window [02:00:47-02:01:47]. It is considered late. Indeed, this window overlaps the session that has already fired, so both are merged, and the upper bound of the resulting window (02:01:47) is lower than the watermark (02:01:48). Since the allowed lateness is 1 minute, the element is not dropped: using discardingFiredPanes() (resp. accumulatingFiredPanes()), we get a LATE pane with value KV("A", 5) (resp. KV("A", 9)) and associated window [02:00:05-02:01:47].

Finally, we receive element KV("A", 7), which is not considered late. Its emitted result is KV("A", 7) with discardingFiredPanes() and KV("A", 16) with accumulatingFiredPanes(). The associated merged window is [02:00:05-02:01:49], and this result is considered ON_TIME.
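The difference between the two accumulation modes can be checked with a few lines of plain Java (this is not Beam code). Each firing of the session either emits only the values received since the previous firing (discarding) or everything received so far (accumulating). The grouping of values into panes, [1, 3] then [5] then [7], is taken from the scenario above; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class AccumulationModesSketch {

    /**
     * Returns the sum emitted by each firing, given the values that arrived
     * between consecutive firings of the same (merged) session.
     */
    static List<Integer> firedSums(List<List<Integer>> panes, boolean accumulating) {
        List<Integer> emitted = new ArrayList<>();
        int runningTotal = 0;
        for (List<Integer> pane : panes) {
            int paneSum = pane.stream().mapToInt(Integer::intValue).sum();
            runningTotal += paneSum;
            // Accumulating keeps the previous contents; discarding starts over at each firing.
            emitted.add(accumulating ? runningTotal : paneSum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Integer>> panes = List.of(List.of(1, 3), List.of(5), List.of(7));
        System.out.println("discarding:   " + firedSums(panes, false)); // [4, 5, 7]
        System.out.println("accumulating: " + firedSums(panes, true));  // [4, 9, 16]
    }
}
```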

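You might wonder why it is considered on time. The watermark has been moved to 02:01:48, and the individual window of element KV("A", 7) is [02:00:49-02:01:49]: once merged, the session ends at 02:01:49, which is still ahead of the watermark, so the session is not closed yet.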
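The reasoning for KV("A", 5) and KV("A", 7) boils down to comparing the end of the merged session with the watermark. The plain-Java sketch below replays that comparison with java.time. It is a simplified model, not Beam's implementation (the runner keeps more state around merged windows and panes): the base time, 1-minute gap and 1-minute allowed lateness come from the test code, while the class and method names are hypothetical, and it assumes each new element overlaps the existing session, which holds here.

```java
import java.time.Duration;
import java.time.Instant;

public class SessionLatenessSketch {

    // Values taken from the test code: base time, session gap, allowed lateness
    static final Instant BASE = Instant.parse("2023-06-20T00:00:00Z");
    static final Duration GAP = Duration.ofMinutes(1);
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(1);

    /** End of the session after merging an element, assuming its window overlaps the session. */
    static Instant mergedEnd(Instant sessionEnd, Instant elementTimestamp) {
        Instant individualEnd = elementTimestamp.plus(GAP);
        return individualEnd.isAfter(sessionEnd) ? individualEnd : sessionEnd;
    }

    /** Simplified model: the session is late once the watermark has reached its end. */
    static boolean isLate(Instant sessionEnd, Instant watermark) {
        return !sessionEnd.isAfter(watermark);
    }

    /** Simplified model: late data is dropped once it is beyond the allowed lateness. */
    static boolean isDroppable(Instant sessionEnd, Instant watermark) {
        return !sessionEnd.plus(ALLOWED_LATENESS).isAfter(watermark);
    }

    public static void main(String[] args) {
        Instant watermark = BASE.plusSeconds(108);      // shown as 02:01:48 in this post
        Instant firedSessionEnd = BASE.plusSeconds(80); // session of KV("A", 1) and KV("A", 3)

        Instant endWith5 = mergedEnd(firedSessionEnd, BASE.plusSeconds(47)); // +107s
        System.out.println("KV(A, 5): late=" + isLate(endWith5, watermark)
                + ", droppable=" + isDroppable(endWith5, watermark)); // late=true, droppable=false

        Instant endWith7 = mergedEnd(endWith5, BASE.plusSeconds(49)); // +109s
        System.out.println("KV(A, 7): late=" + isLate(endWith7, watermark)
                + ", droppable=" + isDroppable(endWith7, watermark)); // late=false, droppable=false
    }
}
```

The same isLate check on the first session (end at +80s) returns true, which is why moving the watermark to 02:01:48 closes it and fires KV("A", 4).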

The schema below sums up what happened. The trigger that emits the last result, KV("A", 16) with window [02:00:05-02:01:49], is not represented, since any later watermark advance past 02:01:49 closes this session and emits the result (in the test, advanceWatermarkToInfinity() does it). I'll let you imagine your own wonderful scenario!

As in my previous story, here is the code so you can play with it and get a better understanding of how the watermark behaves!

Test
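```java
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

// Hypothetical test class name; KEY_C is assumed to be "A", the key used throughout this post.
public class LatenessSessionsTest {

    private static final String KEY_C = "A";

    @Rule public final transient TestPipeline p = TestPipeline.create();

    @Test
    public void testLatenessSessions() {
        final String BASE_TIME = "2023-06-20T00:00:00.00Z";
        Instant baseInstant = Instant.parse(BASE_TIME);

        // Init TestStream with test data; the watermark is moved by hand to create late data
        TestStream<KV<String, Integer>> createEvents = TestStream.create(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))
                .advanceWatermarkTo(baseInstant)
                .addElements(TimestampedValue.of(KV.of(KEY_C, 1), baseInstant.plus(Duration.standardSeconds(5))))
                .addElements(TimestampedValue.of(KV.of(KEY_C, 3), baseInstant.plus(Duration.standardSeconds(20))))
                // Closes the session [+5s, +80s) and fires KV("A", 4)
                .advanceWatermarkTo(baseInstant.plus(Duration.standardSeconds(108)))
                .addElements(TimestampedValue.of(KV.of(KEY_C, 5), baseInstant.plus(Duration.standardSeconds(47))))
                .addElements(TimestampedValue.of(KV.of(KEY_C, 7), baseInstant.plus(Duration.standardSeconds(49))))
                .advanceWatermarkToInfinity();

        PCollection<KV<String, Integer>> output = p.apply(createEvents)
                .apply("Window", Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
                        .triggering(
                                AfterWatermark.pastEndOfWindow()
                                        .withEarlyFirings(AfterPane.elementCountAtLeast(3))
                                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                        // Replace with .discardingFiredPanes() to compare both modes
                        .accumulatingFiredPanes()
                        .withAllowedLateness(Duration.standardMinutes(1)))
                .apply("Display window", ParDo.of(new DisplayWindow<Integer>()))
                .apply("GroupByKey", Combine.perKey((a, b) -> a + b))
                .apply("Display groups window", ParDo.of(new DisplayWindow<Integer>()));

        p.run().waitUntilFinish();
    }
}
```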

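where DisplayWindow is a small DoFn that logs each element with its pane timing and window bounds:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Logs each element with its pane timing and window bounds, then passes it through unchanged. */
public class DisplayWindow<T> extends DoFn<KV<String, T>, KV<String, T>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DisplayWindow.class);

    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("HH:mm:ss");

    @ProcessElement
    public void processElement(ProcessContext c, IntervalWindow window) {
        // Pane timing is EARLY, ON_TIME, LATE, or UNKNOWN before any grouping
        LOGGER.info("Key[{}][{}]: [{}][{}-{}]",
                c.element().getKey(),
                c.element().getValue(),
                c.pane().getTiming().toString(),
                formatter.print(window.start()),
                formatter.print(window.end()));
        c.output(c.element());
    }
}
```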

I hope you enjoyed this journey and that you will benefit from these readings.

Additional resources

For those who want to dive into these fascinating topics, I strongly suggest reading these articles:

Data engineer and architect, I mainly focus on GCP and also enjoy everything around data (access management, networking, CI/CD or testing)