Of stream processing and lateness (part 1/2)

William Attache
The AB Tasty Tech & Product Blog
5 min read · Jan 4, 2024

Image by Monoar Rahman Rony from Pixabay

Streaming data ingestion has become ubiquitous in recent years, as more and more companies want to offer a real-time, personalised experience to their visitors. Within the data engineering department of AB Tasty, we use Apache Beam as our ETL engine to ingest our clients' data. We collect the events generated by visitors as they browse the web and are exposed to our A/B tests. Most of the time, we get this data in real time. Sometimes, however, a client's data is received minutes, or even hours, after it was generated.

You might think, “OK, no problem here, the data will be processed later.” And you are perfectly right! But this is also the problem: what if another process, such as reporting, depends on the data being complete up to a certain point in time? What if, every hour, you need the last three hours of data, but some of it is missing?

There are also concerns about enrichments that need all the elements within a certain time range to be present. For example, you might count the number of events you receive every minute. But what if you got 100 events on time, and 3 more events later, once the pipeline is already counting the events of another one-minute range?

To get better control over this, Apache Beam implements a heuristic called the watermark. The watermark is in charge of closing, in processing time, the temporal windows for which we consider that we will never get any more events in event time. Let’s make it concrete with an example.

Group elements by key and count values

Note: the code for the experiment below is provided at the end of this story.

Let’s say we simply want to group key-value events by key within a 1-minute fixed window and sum their associated values. We consider the case where 3 elements have been generated on our website between 02:00:00 p.m. and 02:01:00 p.m. The table below lists their value, event timestamp, processing timestamp, and associated window.

All the elements are within the same window. When landing in the pipeline, each element is assigned to a window. In the case of fixed windows, it’s very simple: the lower (resp. upper) bound of the window is the event timestamp rounded down (resp. up) to the previous (resp. following) minute.
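
The rounding rule for fixed windows can be sketched in plain Java, without any Beam dependency. This is only an illustration under a few assumptions: the class `FixedWindowBounds` and its methods are hypothetical names, it uses `java.time` rather than Joda-Time, and the date is arbitrary. The window is treated as half-open, so an element stamped exactly on the minute belongs to the window that starts at that minute.

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowBounds {

    /** Lower bound: the event timestamp rounded down to a multiple of the window size. */
    public static Instant windowStart(Instant eventTime, Duration size) {
        long millis = eventTime.toEpochMilli();
        long sizeMillis = size.toMillis();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
    }

    /** Upper bound (exclusive): the lower bound plus the window size. */
    public static Instant windowEnd(Instant eventTime, Duration size) {
        return windowStart(eventTime, size).plus(size);
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        // Hypothetical element generated at 02:00:40 p.m. (UTC)
        Instant eventTime = Instant.parse("2023-06-20T14:00:40Z");
        System.out.println(windowStart(eventTime, oneMinute) + " - " + windowEnd(eventTime, oneMinute));
    }
}
```

Every element whose event timestamp falls between 02:00:00 p.m. (included) and 02:01:00 p.m. (excluded) gets the same bounds, whatever its processing time.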

As you may have noticed, we receive element KV.of("A", 5) 2 minutes and 20 seconds after it has been generated. In this case, we can expect the watermark to have closed the window [02:00:00-02:01:00] before this element arrives. Let’s visualise it on a graph:

Once the watermark closes the current window, the resulting event KV.of("A", 4) is emitted. However, we see that the event KV.of("A", 5) is behind the watermark in processing time, and thus is not taken into account when the result is emitted, although we expected KV.of("A", 9). Fortunately, Apache Beam lets us deal with these events, which are called late events.

The first thing is to determine whether the element is too late or not. This is what the method withAllowedLateness(org.joda.time.Duration d) controls. In our case, we set it to 5 minutes, so that event KV.of("A", 5) is considered late (it is behind the watermark, over which we have no control) while still arriving with less than 5 minutes of lateness (we assume so: the timestamps are deliberately left off the y-axis to avoid confusion).
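
The decision between "on time", "late" and "too late" can be sketched as a comparison between the watermark and the end of the window. The snippet below is a simplified model, not Beam's implementation: `LatenessCheck`, `Status` and `classify` are hypothetical names, it uses `java.time` instead of Joda-Time, and Beam's exact boundary conditions may differ slightly. The timestamps are the ones used in the test at the end of this story (window end 00:01:00, watermark 00:02:59).

```java
import java.time.Duration;
import java.time.Instant;

public class LatenessCheck {

    public enum Status { ON_TIME, LATE, TOO_LATE }

    /**
     * Simplified model: an element is on time while the watermark has not reached
     * the end of its window, late while the watermark is still within the allowed
     * lateness after that end, and too late (dropped) afterwards.
     */
    public static Status classify(Instant windowEnd, Instant watermark, Duration allowedLateness) {
        if (watermark.isBefore(windowEnd)) {
            return Status.ON_TIME;
        }
        if (watermark.isBefore(windowEnd.plus(allowedLateness))) {
            return Status.LATE;
        }
        return Status.TOO_LATE;
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2023-06-20T00:01:00Z");
        Instant watermark = Instant.parse("2023-06-20T00:02:59Z");
        System.out.println(classify(windowEnd, watermark, Duration.ofMinutes(5))); // LATE
        System.out.println(classify(windowEnd, watermark, Duration.ofMinutes(1))); // TOO_LATE
    }
}
```

With an allowed lateness of 5 minutes the element is still accepted as late; with only 1 minute it would be dropped.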

Then, we need to specify how the pipeline must behave with late events: either “accumulating” the fired panes, or “discarding” them and emitting only the new results.
If we use the accumulatingFiredPanes() method, then the late pane will be emitted with value KV.of("A", 9) and will be marked as LATE (meaning that, for a given ProcessContext c, we have c.pane().getTiming() == org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.LATE). If we use the discardingFiredPanes() method, then we first get KV.of("A", 4), which was emitted when the watermark closed the window, and then a late pane that contains only the late event, KV.of("A", 5).
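
To make the difference between the two modes tangible, here is a small simulation in plain Java. It does not use Beam: `PaneModes` and `firedSums` are hypothetical names, and each inner array stands for the values that arrived for key "A" before one firing of the trigger. The values 1, 3 and 5 are the ones used in the test at the end of this story.

```java
import java.util.ArrayList;
import java.util.List;

public class PaneModes {

    /**
     * Returns the sum emitted at each firing for a single key and window.
     * In accumulating mode, each pane also includes everything fired before;
     * in discarding mode, each pane only contains the new values.
     */
    public static List<Integer> firedSums(int[][] panes, boolean accumulating) {
        List<Integer> emitted = new ArrayList<>();
        int carried = 0;
        for (int[] pane : panes) {
            int sum = carried;
            for (int value : pane) {
                sum += value;
            }
            emitted.add(sum);
            // Keep the running total only when panes accumulate
            carried = accumulating ? sum : 0;
        }
        return emitted;
    }

    public static void main(String[] args) {
        // First pane: on-time values 1 and 3; second pane: late value 5
        int[][] panes = { {1, 3}, {5} };
        System.out.println("accumulating: " + firedSums(panes, true));  // [4, 9]
        System.out.println("discarding:   " + firedSums(panes, false)); // [4, 5]
    }
}
```

In both modes the on-time pane carries 4; only the content of the late pane changes.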

The table below sums up what you get:

I strongly encourage you to test it to form your own opinion, before going on to the next part, which deals with Sessions instead of FixedWindows.

// Assumes the enclosing JUnit test class declares a TestPipeline named p
// and a String constant KEY_C (neither is shown in this story).
@Test
public void testLatenessFixedWindow() {

    final String BASE_TIME = "2023-06-20T00:00:00.00Z";

    // Init TestStream with test data
    Instant baseInstant = Instant.parse(BASE_TIME);

    TestStream<KV<String, Integer>> createEvents =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))
            .advanceWatermarkTo(baseInstant)
            // Two on-time elements in window [00:00:00-00:01:00]
            .addElements(TimestampedValue.of(KV.of(KEY_C, 1), baseInstant.plus(Duration.standardSeconds(10))))
            .addElements(TimestampedValue.of(KV.of(KEY_C, 3), baseInstant.plus(Duration.standardSeconds(30))))
            // Move the watermark to 00:02:59, past the end of the window
            .advanceWatermarkTo(baseInstant.plus(Duration.standardMinutes(3)).minus(Duration.standardSeconds(1)))
            // Late element: event time 00:00:40, behind the watermark
            .addElements(TimestampedValue.of(KV.of(KEY_C, 5), baseInstant.plus(Duration.standardSeconds(40))))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Integer>> output = p.apply(createEvents)
        .apply("Window", Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(10))
                    // Fire a new pane for every late element
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            // Switch to discardingFiredPanes() to compare both modes
            .accumulatingFiredPanes()
            // 2 minutes is enough here: the watermark is at 00:02:59 when the late element arrives
            .withAllowedLateness(Duration.standardMinutes(2))
        )
        .apply("Display window", ParDo.of(new DisplayWindow<Integer>()))
        // Sum the values per key and per window
        .apply("GroupByKey", Combine.perKey((a, b) -> a + b))
        .apply("Display groups window", ParDo.of(new DisplayWindow<Integer>()));

    p.run().waitUntilFinish();
}

with

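import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logs each element with its pane timing and window bounds, then passes it through unchanged
public class DisplayWindow<T> extends DoFn<KV<String, T>, KV<String, T>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DisplayWindow.class);

    private static final DateTimeFormatter formatter = DateTimeFormat.forPattern("HH:mm:ss");

    @ProcessElement
    public void processElement(ProcessContext c, IntervalWindow window) {
        // Format: Key[key][value]: [pane timing][window start-window end]
        LOGGER.info("Key[{}][{}]: [{}][{}-{}]",
            c.element().getKey(),
            c.element().getValue(),
            c.pane().getTiming().toString(),
            formatter.print(window.start()),
            formatter.print(window.end()));
        c.output(c.element());
    }
}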

I hope you enjoyed this read, and that you are ready for part 2, where we use Sessions instead of FixedWindows.

Additional resources

For those who want to dive deeper into these fascinating topics, I strongly suggest reading these articles:

Data engineer and architect, I mainly focus on GCP and also enjoy everything around data (access management, networking, CI/CD or testing)