Timers management in Apache Flink

Elevate Global LLC
Jan 16, 2020


Introduction

Apache Flink® is an open-source framework for distributed, stateful stream processing, used to build robust real-time data applications at scale: it enables fast, accurate and fault-tolerant handling of massive streams of events. Apache Flink also supports batch processing as a special case of streaming.

The streaming API (the DataStream API), which is our primary focus here, offers a great deal of flexibility for developing powerful and scalable real-time stream processing applications.

As part of this API, Flink provides many high-level operations for processing and transforming data streams, such as several types of windowing, aggregations, and partitioning and joining strategies. With these, one can easily perform high-level computations over unbounded data streams. For example, one can split a stream into “buckets” of finite size with the predefined window assigners (tumbling, sliding, session and global windows, applied with window() on keyed streams or windowAll() on non-keyed streams) and then aggregate each bucket with a window function such as a ReduceFunction or an AggregateFunction.

Besides these high-level operations, Flink provides an API for low-level stream processing through process functions such as ProcessFunction and KeyedProcessFunction. A process function gives direct access to Flink’s state and to the Flink context. The context exposes the timestamp of the current element, the current partition key (on keyed streams) and the timer service, which in turn reports the current watermark (event time) and the current processing time.

The timer service is used to register and delete timers. A timer schedules a call to the function’s onTimer method once the current time reaches the timer’s timestamp: for event-time timers, when the watermark reaches it; for processing-time timers, when the machine’s clock does. It is worth mentioning that keyed state and timers are only available in process functions applied on a keyed stream (a stream partitioned by a specified key).
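For readers new to this API, here is a minimal sketch of how a KeyedProcessFunction registers and deletes event-time timers. It implements a simple per-key inactivity timeout. The input type Tuple2<String, Long> (key, event timestamp), the class name TimeoutFunction and the TIMEOUT value are illustrative assumptions, and the code targets the Flink 1.x API contemporary with this article (newer releases replace open(Configuration) with open(OpenContext)).

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Emits a message when no element has arrived for a key within TIMEOUT
 * event-time units. Input elements are (key, eventTimestamp) pairs.
 */
class TimeoutFunction extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    static final long TIMEOUT = 10L; // hypothetical timeout length

    // Triggering time of the timer currently registered for this key (keyed state).
    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingTimer", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        Long previous = pendingTimer.value();
        if (previous != null) {
            // A newer element arrived: cancel the old timeout for this key.
            ctx.timerService().deleteEventTimeTimer(previous);
        }
        long fireAt = value.f1 + TIMEOUT;
        ctx.timerService().registerEventTimeTimer(fireAt);
        pendingTimer.update(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Called once the watermark reaches `timestamp` for the current key.
        out.collect(ctx.getCurrentKey() + " timed out at " + timestamp);
        pendingTimer.clear();
    }
}
```

Keeping the registered timestamp in keyed state is what makes deleteEventTimeTimer usable here, since Flink identifies a timer only by its key and timestamp.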

In this post we focus on timers.

The timer service stores the registered timers in priority queues sorted by their triggering time, so the timer that is due earliest is dequeued first. Flink keeps at most one timer per key and timestamp.

That means that if multiple timers are registered for the same key (on the same logical partition) with the same triggering timestamp, Flink keeps only one of them, and onTimer is called just once for that timestamp. Moreover, the same onTimer method is called for every timer that fires, which makes more complex, timer-specific onTimer behaviour hard to implement.
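The deduplication rule can be modelled in a few lines of plain Java. The toy model below (DedupTimerQueueModel is a hypothetical class, not Flink code) keeps a sorted set of timestamps per key, so registering the same timestamp twice leaves a single entry, and advancing the watermark fires every timer whose timestamp is less than or equal to it, earliest first within each key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Single-threaded toy model of the documented behaviour: timers are ordered
 * by timestamp and deduplicated per (key, timestamp).
 */
class DedupTimerQueueModel {
    // key -> distinct registered timestamps, sorted ascending
    private final Map<String, TreeSet<Long>> timers = new TreeMap<>();

    void register(String key, long timestamp) {
        // A set ignores duplicates, just as Flink keeps one timer per key and timestamp.
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Fires every timer with timestamp <= watermark and returns "key@timestamp" labels. */
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, TreeSet<Long>> e : timers.entrySet()) {
            TreeSet<Long> queue = e.getValue();
            while (!queue.isEmpty() && queue.first() <= watermark) {
                fired.add(e.getKey() + "@" + queue.pollFirst());
            }
        }
        return fired;
    }
}
```

Registering timestamp 10 twice and timestamp 6 once for key "1", then advancing the watermark to 16, fires only two timers: "1@6" and "1@10".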

Sometimes we need multiple timers at the same time (on the same partition), each with its own onTimer behaviour. For example, suppose we are processing messages of different types that belong to one entity, and the data stream is keyed by that entity, so all message types of the entity are processed on the same partition. We want a timeout for each message type (fired when no message of that type has arrived for a specified period), and different message types need different timeout behaviour.

Now suppose that two messages of different types (belonging to the same entity) arrive with the same timestamp. We then need to register two timers with the same triggering timestamp but with different onTimer behaviour. This cannot be done with Flink’s timer service alone, because Flink deduplicates timers per key and timestamp, so some manual management is required.

The idea is to keep the timers that cannot be registered in Flink’s timer service (because a timer with the same triggering timestamp already exists) in our own data structure, and to track which timer is currently registered in Flink’s timer service. When that timer fires, all timers stored for its timestamp must fire as well. In addition, we need a way to properly unregister scheduled timers.
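One way to keep such extra timers fault-tolerant is to store them in Flink’s keyed state and use Flink’s own timer purely as a wake-up call for a timestamp. The sketch below applies this to the per-type timeout scenario described earlier; it is an alternative illustration, not the Timer Manager’s implementation. The input type Tuple3<String, String, Long> (entity key, message type, event timestamp), the message type names "order" and "payment", the class name and TIMEOUT are all assumptions, and the Flink 1.x API is assumed.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Keyed by entity. A message type times out when no message of that type
 * arrives for TIMEOUT event-time units. Several types may be due at the same
 * timestamp, so the due types are kept in MapState and Flink's single
 * (deduplicated) timer per timestamp only wakes the function up.
 */
class PerTypeTimeoutFunction
        extends KeyedProcessFunction<String, Tuple3<String, String, Long>, String> {

    static final long TIMEOUT = 10L; // hypothetical timeout length

    private transient MapState<Long, List<String>> typesDueAt; // triggering time -> message types
    private transient MapState<String, Long> lastSeen;         // message type -> latest event time

    @Override
    public void open(Configuration parameters) {
        typesDueAt = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "typesDueAt", Types.LONG, Types.LIST(Types.STRING)));
        lastSeen = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "lastSeen", Types.STRING, Types.LONG));
    }

    @Override
    public void processElement(Tuple3<String, String, Long> msg, Context ctx, Collector<String> out)
            throws Exception {
        String type = msg.f1;
        long eventTime = msg.f2;
        Long previous = lastSeen.get(type);
        if (previous != null && previous >= eventTime) {
            return; // an equal or newer message of this type already scheduled a later timeout
        }
        lastSeen.put(type, eventTime);

        long fireAt = eventTime + TIMEOUT;
        List<String> due = typesDueAt.get(fireAt);
        if (due == null) {
            due = new ArrayList<>();
        }
        due.add(type);
        typesDueAt.put(fireAt, due);
        // Harmless if a timer already exists at fireAt: Flink keeps a single one.
        ctx.timerService().registerEventTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<String> due = typesDueAt.get(timestamp);
        if (due == null) {
            return;
        }
        for (String type : due) {
            Long latest = lastSeen.get(type);
            if (latest == null || latest + TIMEOUT != timestamp) {
                continue; // a newer message of this type arrived; its own timeout is pending
            }
            // A different reaction per message type.
            switch (type) {
                case "order":
                    out.collect(ctx.getCurrentKey() + ": no order for " + TIMEOUT + " units");
                    break;
                case "payment":
                    out.collect(ctx.getCurrentKey() + ": payment overdue at " + timestamp);
                    break;
                default:
                    out.collect(ctx.getCurrentKey() + ": " + type + " timed out at " + timestamp);
            }
            lastSeen.remove(type);
        }
        typesDueAt.remove(timestamp);
    }
}
```

Instead of deleting timers, the sketch treats a timeout as stale when a newer message of the same type has arrived, because deleting Flink’s timer at a shared timestamp would also cancel the timeouts of the other types due at that timestamp.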

So far, this only allows us to register multiple timers at the same timestamp; it does not yet provide different behaviour for different timers. For that, we need an additional data structure that maps each timer to its callback.
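To make the bookkeeping from the last two paragraphs concrete, here is a single-key, plain-Java sketch. It is hypothetical and is not the Timer Manager’s source: a sorted set stands in for Flink’s timer service (at most one timer per timestamp), a map from timestamp to (ID → callback) holds every logical timer, and a map from ID to timestamp supports deletion. Callbacks are java.util.function.LongConsumer instances that receive the firing time.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.LongConsumer;

/** Hypothetical bookkeeping for many logical timers on top of one-timer-per-timestamp. */
class MultiTimerSketch {
    // Stand-in for Flink's timer service: the "officially" registered timestamps.
    private final TreeSet<Long> underlying = new TreeSet<>();
    // Triggering time -> (timer ID -> callback), in registration order.
    private final TreeMap<Long, LinkedHashMap<Integer, LongConsumer>> byTime = new TreeMap<>();
    // Timer ID -> triggering time, used to find a timer when deleting it.
    private final Map<Integer, Long> timeById = new HashMap<>();
    private int nextId = 0;

    /** Registers a logical timer and returns its unique ID. */
    int register(long triggeringTime, LongConsumer callback) {
        int id = nextId++;
        byTime.computeIfAbsent(triggeringTime, t -> new LinkedHashMap<>()).put(id, callback);
        timeById.put(id, triggeringTime);
        underlying.add(triggeringTime); // no effect if a timer already exists at this timestamp
        return id;
    }

    /** Deletes a logical timer; the underlying timer goes only when none remain at its timestamp. */
    void unregister(int id) {
        Long ts = timeById.remove(id);
        if (ts == null) {
            return; // unknown ID or already fired
        }
        Map<Integer, LongConsumer> callbacks = byTime.get(ts);
        callbacks.remove(id);
        if (callbacks.isEmpty()) {
            byTime.remove(ts);
            underlying.remove(ts);
        }
    }

    /** To be called when the underlying timer fires; runs every callback stored for it. */
    void onTimer(long timestamp) {
        underlying.remove(timestamp);
        Map<Integer, LongConsumer> callbacks = byTime.remove(timestamp);
        if (callbacks == null) {
            return;
        }
        for (Map.Entry<Integer, LongConsumer> e : callbacks.entrySet()) {
            timeById.remove(e.getKey());
            e.getValue().accept(timestamp);
        }
    }

    int underlyingTimerCount() {
        return underlying.size();
    }
}
```

Registering two callbacks at 10 and one at 14 creates only two underlying timers; firing timestamp 10 then runs both callbacks in registration order. In a real process function the set would be replaced by calls to ctx.timerService(), the structures would exist per key, and onTimer would be invoked from Flink’s onTimer. Callbacks held in plain Java fields are not part of Flink’s checkpointed state, so a production design must also decide how they are restored after a failure.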

In this article we present the Timer Manager, a small library that overcomes these limitations, hides the technical difficulties mentioned above, and makes it simple to register multiple timers at the same timestamp, delete timers, and define a different onTimer behaviour for every timer.

Using the Flink’s API for registering timers

Here is an example scenario in which two timers are registered at the same timestamp on the same partition, using only Flink’s timer service (for simplicity, we assume that the time characteristic is set to event time):

For simplicity, all messages have the same key, so they all go to the same partition. The input consists of the following JSON messages (each representing an object of the class Sample):

{"key":"1","timestamp":0}
{"key":"1","timestamp":3}
{"key":"1","timestamp":4}
{"key":"1","timestamp":7}
{"key":"1","timestamp":8}
{"key":"1","timestamp":16}
{"key":"1","timestamp":26}
{"key":"1","timestamp":30}
{"key":"1","timestamp":35}

In the process function, a timer is registered for every message: if the timestamp of the current Sample is even, the timer is registered at the current watermark + 3, otherwise at the current watermark + 10. The watermark is the extracted timestamp of the previous message. Here is the output of the snippet above:
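The original snippet is not reproduced in this text, so the following is a hypothetical reconstruction of the process function described above, written against the Flink 1.x KeyedProcessFunction API. The fields of the Sample class, the class name PlainTimersFunction and the output strings are assumptions.

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Hypothetical data class for messages like {"key":"1","timestamp":0}.
 *  In a real job, make it a public class in its own file so Flink treats it as a POJO. */
class Sample {
    public String key;
    public long timestamp;

    public Sample() {}

    public Sample(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

/** Registers one event-time timer per element, using only Flink's timer service. */
class PlainTimersFunction extends KeyedProcessFunction<String, Sample, String> {

    /** Even sample timestamps: watermark + 3; odd ones: watermark + 10. */
    static long triggeringTime(long sampleTimestamp, long currentWatermark) {
        return currentWatermark + (sampleTimestamp % 2 == 0 ? 3 : 10);
    }

    @Override
    public void processElement(Sample sample, Context ctx, Collector<String> out) throws Exception {
        long watermark = ctx.timerService().currentWatermark(); // Long.MIN_VALUE before the first watermark
        long fireAt = triggeringTime(sample.timestamp, watermark);
        ctx.timerService().registerEventTimeTimer(fireAt);
        out.collect("Sample " + sample.timestamp + ": timer registered at " + fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Only the firing time is known here, not which Sample registered the timer.
        out.collect("Timer fired at " + timestamp);
    }
}
```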

As can be seen from the output, two timers were registered with triggering timestamp 10 (one when the sample with timestamp 3 arrived and one when the sample with timestamp 8 arrived), but only one fired. Because Flink keeps a single timer per key and timestamp, the second registration at timestamp 10 (made when the message with timestamp 8 arrived) did not add a new timer: the two registrations collapsed into one, and onTimer was called only once for timestamp 10.
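The collision can be checked by replaying the input by hand. The following plain-Java sketch (no Flink involved; RegistrationReplay is a hypothetical helper) applies the rule above, with the watermark equal to the previous message’s timestamp and starting at Long.MIN_VALUE, and groups the registrations by triggering time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Replays the example input and records which samples register a timer at each triggering time. */
class RegistrationReplay {
    static TreeMap<Long, List<Long>> registrations(long[] timestamps) {
        // triggering time -> timestamps of the samples that registered a timer there
        TreeMap<Long, List<Long>> byTriggerTime = new TreeMap<>();
        long watermark = Long.MIN_VALUE; // Flink's initial watermark
        for (long ts : timestamps) {
            long fireAt = watermark + (ts % 2 == 0 ? 3 : 10);
            byTriggerTime.computeIfAbsent(fireAt, k -> new ArrayList<>()).add(ts);
            watermark = ts; // the next message sees this timestamp as the watermark
        }
        return byTriggerTime;
    }
}
```

For the nine input messages this yields eight distinct triggering times, and timestamp 10 receives registrations from the samples with timestamps 3 and 8, so Flink calls onTimer at most eight times.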

This behaviour matches the official Apache Flink documentation: “there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the onTimer method will be called just once.”

In addition, suppose that when a timer fires, we want to print the timestamp of the message whose arrival caused that timer to be registered. This is impossible, because onTimer has the same implementation for every timer, and its arguments (the firing timestamp, the context and the collector) carry no information about which message registered the timer.

(As can be seen from the first few lines of the output, the first registered timer fires immediately after it is registered. When the first message arrives, the watermark is still at Flink’s initial value, Long.MIN_VALUE, so the timer’s triggering time is negative as well. As soon as the message is processed, its timestamp advances the watermark to a non-negative value, and the timer is triggered.)

Introducing the Timer Manager library

The proposed solution for timer management overcomes these limitations. It abstracts away the timer bookkeeping and gives complete freedom to register multiple timers at the same timestamp, define different behaviour per timer, and delete any registered timer at any moment.

Here is an example of using the Timer Manager with the same input stream as in the previous example:

Here is the TestFullTimerManagerKeyedProcessFunction implementation:

TestFullTimerManagerKeyedProcessFunction is the process function that uses the Timer Manager in this example. The library provides three timer manager classes, built on KeyedProcessFunction, KeyedBroadcastProcessFunction and KeyedCoProcessFunction (FullTimerManagerKeyedProcess, FullTimerManagerKeyedBroadcastProcess and FullTimerManagerKeyedCoProcess, respectively). We will discuss FullTimerManagerKeyedProcess; the other two classes are implemented similarly, with minor differences that stem from their respective process functions.

Running the snippet above, which uses the Timer Manager, on the same input as in the first example produces the following output:

Comparing the two outputs reveals two main differences:

1. In the output of the snippet that uses the regular Apache Flink timer API, two timers were registered at 10 (one from the sample with timestamp 3 and one from the sample with timestamp 8), but only one fired. With the Timer Manager this limitation is overcome, and both timers registered at timestamp 10 fire.

2. As mentioned previously, the Timer Manager lets us specify a user-defined callback when registering a timer; the callback is executed when the corresponding timer fires. Using this, the second output (with the Timer Manager) also prints, for each fired timer, the sample whose arrival registered it, together with its timestamp. Because each timer can have its own callback, a timer with a different callback is registered for each sample. This cannot be done with the regular Flink timer API, where a single function (onTimer) must be overridden to define the behaviour executed when any timer fires.

Timer Manager’s API

The Timer Manager provides a very simple API for registering timers with a specified callback and deleting registered timers.

  • Timers can be registered with the function:

The arguments are:

- triggeringTime: the time at which the timer will fire

- onTimerCallback: an implementation of the functional interface IOnTimerKeyedProcess (a lambda in our example)

- context: the Context of the corresponding process function

The returned int is the unique ID of the newly registered timer; this ID must be provided to delete the timer.

Example of using the function from the code above:

This line registers a timer at the given triggeringTime with the provided callback, which will be executed when the timer fires.

  • Callbacks are defined by implementing the functional interface IOnTimerKeyedProcess. Here is the interface IOnTimerKeyedProcess:

And here is an example of a callback definition from the code above:

This API allows us to specify different behaviour for different timers, even for multiple timers registered at the same time on the same partition (as shown in the example above, with the timers registered at timestamp 10).

  • To unregister a previously registered timer, we call unregisterTimer, which has the following signature:

The arguments are:

- timerToDeleteID: the unique ID of the timer to delete (generated and returned when registerTimer is called)

- context: the Context of the corresponding process function

Please note that in TestFullTimerManagerKeyedProcessFunction (where the Timer Manager is used), the overridden Flink onTimer method calls the timer manager’s onTimer method, passing on the arguments it receives. This is required; otherwise the timer manager would not behave as expected.

Usage

The Timer Manager can be used for various tasks, such as timeout notifications when a certain message has not arrived within a specified period, without having to care whether a timer is already registered at the same triggering time. It also lets us specify different timeout behaviour for different messages. By abstracting the timers in Apache Flink, the Timer Manager gives us considerable freedom and eases building advanced, powerful stream processing applications at a fairly low level.

You can find the complete code of the Timer Manager (including the timer managers for the other two types of keyed process functions), together with this and other examples, on my Bitbucket account: https://bitbucket.org/elevateglobalbiz/timermanager


Elevate Global LLC Skopje — Autonomous forecasting and analytics