Time Attributes in Apache Flink

Sruthi Sree Kumar
Big Data Processing
Jul 9, 2020

One of the major differences between stream and batch processing is the need to explicitly handle time in stream processing. A stream processing system receives a continuous stream of events; to process them, we group them by a key and process each group periodically (for example, every 1 second or every 10 minutes). Handling time is therefore crucial in a stream processing system.
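To make the idea of grouping events by key and by time concrete, here is a minimal plain-Java sketch. It is not Flink code and uses no Flink API. It assumes fixed-size, non-overlapping (tumbling) windows aligned to timestamp 0 and non-negative millisecond timestamps; the class and method names (TumblingWindowSketch, windowStart, countPerWindow) are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink code): group keyed events into fixed-size,
// non-overlapping ("tumbling") time windows and count them.
public class TumblingWindowSketch {

    // Start of the window containing `timestamp`, for windows of `sizeMillis`
    // aligned to 0. Assumes non-negative timestamps.
    public static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    // Counts events per (key, window start); keys[i] and timestamps[i]
    // describe the i-th event.
    public static Map<String, Map<Long, Integer>> countPerWindow(
            String[] keys, long[] timestamps, long sizeMillis) {
        Map<String, Map<Long, Integer>> result = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            result.computeIfAbsent(keys[i], k -> new TreeMap<>())
                  .merge(start, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"sensor-a", "sensor-b", "sensor-a", "sensor-a"};
        long[] timestamps = {1_000L, 4_000L, 9_999L, 10_000L};
        // 10-second windows: [0, 10000) and [10000, 20000)
        System.out.println(countPerWindow(keys, timestamps, 10_000L));
    }
}
```

Running main prints {sensor-a={0=2, 10000=1}, sensor-b={0=1}}: the event at 9,999 ms still falls in the first window, while the one at 10,000 ms opens the next.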

Apache Flink supports the following 3 notions of time.

  1. Event time
  2. Processing time
  3. Ingestion time

Event Time

Event time is the time at which an event was produced by the data producer. The event time is attached to the event, so the timestamp can be extracted from each record. It is usually the most appropriate notion of time for windowing, because the results depend only on the data and not on when it happens to be processed. Using event time does, however, raise a few issues:

  • Network latency between the producer and the processing nodes.
  • Events with the same timestamp from different sources may arrive at the processing function at different times.
  • Out-of-order events caused by multiple sources and Flink's parallelism.
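The following sketch, again plain Java rather than Flink's API, shows what extracting a timestamp from each record looks like and how arrival order can differ from event-time order. The comma-separated record format and all names are hypothetical assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (not Flink's API): extract the event timestamp from each
// record and report which records arrived out of event-time order.
public class OutOfOrderSketch {

    // Hypothetical record format: "<sourceId>,<eventTimeMillis>,<value>"
    public static long extractTimestamp(String record) {
        return Long.parseLong(record.split(",")[1].trim());
    }

    // Returns the arrival positions of records whose event time is older than
    // the largest event time seen before them.
    public static List<Integer> outOfOrderPositions(List<String> arrivals) {
        List<Integer> positions = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < arrivals.size(); i++) {
            long ts = extractTimestamp(arrivals.get(i));
            if (ts < maxSeen) {
                positions.add(i);
            } else {
                maxSeen = ts;
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        // Arrival order at the processing node: source-2's event at t=1000 was
        // delayed in the network and arrives after later events.
        List<String> arrivals = Arrays.asList(
                "source-1,2000,7",
                "source-1,3000,8",
                "source-2,1000,5",
                "source-2,4000,6");
        System.out.println(outOfOrderPositions(arrivals)); // [2]
    }
}
```

Here the delayed event from source-2 is reported at arrival position 2, even though it is the oldest event in the stream.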

To handle these issues with event time, Flink uses the concept of watermarks.

Watermark

Watermarks are the mechanism Flink uses to measure the progress of event time; operators rely on them to decide when results that depend on event time, such as a window, can be computed. Watermarks can be generated periodically or by custom, user-defined logic. Event time programs must specify how to generate event time watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t.
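The sketch below illustrates, in plain Java and with hypothetical names, how a Watermark(t) can decide when an event-time window [start, end) is ready: the window covers timestamps up to end - 1, so once a watermark with t >= end - 1 arrives, no more elements for it are expected. This is a simplified model of watermark-driven window evaluation, not Flink's trigger implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code): decide when an event-time window
// [start, end) may be evaluated, based on incoming watermarks.
public class WatermarkFiringSketch {

    // The window holds timestamps start..end-1. A Watermark(t) with
    // t >= end - 1 says no more elements with timestamp <= t will arrive,
    // so nothing else can land in this window.
    public static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // For each incoming watermark, records the windows that become ready.
    public static List<String> fireLog(long[] windowEnds, long[] watermarks) {
        List<String> log = new ArrayList<>();
        boolean[] fired = new boolean[windowEnds.length];
        for (long wm : watermarks) {
            for (int i = 0; i < windowEnds.length; i++) {
                if (!fired[i] && canFire(windowEnds[i], wm)) {
                    fired[i] = true;
                    log.add("watermark " + wm + " fires window ending at " + windowEnds[i]);
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        long[] windowEnds = {10_000L, 20_000L};
        long[] watermarks = {5_000L, 9_999L, 15_000L, 25_000L};
        fireLog(windowEnds, watermarks).forEach(System.out::println);
    }
}
```

Running main prints "watermark 9999 fires window ending at 10000" followed by "watermark 25000 fires window ending at 20000"; the watermarks at 5,000 and 15,000 are not yet far enough along to close either window.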

Watermarks play an important role in dealing with out-of-order events. In short, the arrival of a watermark indicates that all events with timestamps up to the watermark's timestamp should already have arrived; an event that shows up afterwards with an older timestamp is considered late.
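A common way to generate watermarks is to assume events are at most a fixed amount out of order and let the watermark trail the largest timestamp seen so far by that bound. The plain-Java sketch below models that idea. Flink offers a built-in bounded-out-of-orderness watermark strategy, but this class is a hypothetical stand-in, not Flink's API, and for simplicity it recomputes the watermark after every event instead of on a periodic timer.

```java
// Plain-Java sketch (not Flink's classes) of a watermark generator that
// tolerates events arriving up to `maxOutOfOrdernessMillis` out of order.
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low, but high enough that currentWatermark() cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    // Called for every event: track the largest event timestamp seen so far.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Declares that event time has reached this value, i.e. no more events
    // with a timestamp <= the returned value are expected.
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    // An event is late if its timestamp is not newer than the current watermark.
    public boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(2_000L);
        long[] arrivals = {1_000L, 5_000L, 4_000L, 2_500L, 9_000L};
        for (long ts : arrivals) {
            boolean late = gen.isLate(ts);
            gen.onEvent(ts);
            System.out.println("event " + ts + (late ? " (late)" : "")
                    + " -> watermark " + gen.currentWatermark());
        }
    }
}
```

With a 2-second bound, the event at 2,500 ms arrives after the watermark has reached 2,999 ms and is flagged as late, while the event at 4,000 ms, though also out of order, is still within the bound.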

Processing time

Processing time is the time at which an event is processed, as given by the system clock of the machine running the operator. The same processing function running on different task nodes may observe different times, since the tasks may run in different slots or be scheduled differently. Events may also arrive at the processing node with a delay.

Processing time is the simplest notion of time, as it requires no coordination between streams and machines. It offers the lowest latency and the best performance of the three. However, in distributed systems processing time does not provide determinism, because it is susceptible to multiple factors: the speed at which records arrive in the system, the speed at which records flow between operators inside the system, and node failures.
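To illustrate the determinism point, the following plain-Java sketch (not Flink code, hypothetical names) assigns the same three events to 10-second windows twice, once by event time and once by a simulated processing time. It assumes arrival time equals event time plus a network delay, i.e. perfectly synchronized clocks, which real systems do not guarantee.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink code) contrasting event-time and
// processing-time window assignment for the same events.
public class ProcessingTimeSketch {

    static final long WINDOW_MILLIS = 10_000L;

    private static long windowStart(long t) {
        return t - (t % WINDOW_MILLIS);
    }

    // Counts events per window. With useProcessingTime, each event is assigned
    // by its simulated arrival time (eventTime + delay); otherwise by event time.
    public static Map<Long, Integer> countPerWindow(
            long[] eventTimes, long[] delays, boolean useProcessingTime) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int i = 0; i < eventTimes.length; i++) {
            long t = useProcessingTime ? eventTimes[i] + delays[i] : eventTimes[i];
            counts.merge(windowStart(t), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] eventTimes = {2_000L, 8_000L, 9_500L};
        long[] fastNetwork = {100L, 100L, 100L};
        long[] slowNetwork = {100L, 3_000L, 1_000L};

        // Event time: identical results regardless of delays.
        System.out.println(countPerWindow(eventTimes, fastNetwork, false)); // {0=3}
        System.out.println(countPerWindow(eventTimes, slowNetwork, false)); // {0=3}
        // Processing time: results depend on how long events took to arrive.
        System.out.println(countPerWindow(eventTimes, fastNetwork, true));  // {0=3}
        System.out.println(countPerWindow(eventTimes, slowNetwork, true));  // {0=1, 10000=2}
    }
}
```

With event time, both runs put all three events in the window starting at 0. With processing time, the slow network pushes two events into the next window, so the same input produces different results.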

Ingestion time

Ingestion time is the time at which an event is ingested into Flink, in other words, the time at which it enters Flink. Ingestion time is rarely used in Flink processing. Compared to event time, ingestion time programs cannot handle out-of-order events, but they do not have to specify how to generate watermarks. Compared to processing time, ingestion time is slightly more expensive but gives more predictable results, because each record's timestamp is assigned once, at the source, and all later operations refer to that same timestamp.
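The plain-Java sketch below models why ingestion time needs no user-defined watermarks: a source stamps records in arrival order, so their timestamps never decrease and a watermark can be derived directly from the last one assigned. The class is hypothetical, assumes a single source instance, and takes an injected clock so the example is repeatable; it illustrates the concept and is not how Flink implements ingestion time.

```java
import java.util.function.LongSupplier;

// Plain-Java sketch (not Flink code) of ingestion time: a single source
// instance stamps each record with a clock reading as it enters the pipeline.
public class IngestionTimeSketch {

    private final LongSupplier clock; // injected so the example is repeatable
    private long lastTimestamp = Long.MIN_VALUE;

    public IngestionTimeSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Stamps the record with its ingestion time. Clamping with Math.max keeps
    // timestamps non-decreasing even if the clock steps backwards.
    public String ingest(String record) {
        lastTimestamp = Math.max(lastTimestamp, clock.getAsLong());
        return lastTimestamp + "," + record;
    }

    // Later records get timestamps >= lastTimestamp, so event time has safely
    // reached lastTimestamp - 1 without any user-defined watermark logic.
    public long currentWatermark() {
        return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - 1;
    }

    public static void main(String[] args) {
        // Fake clock readings standing in for the wall clock at the source.
        long[] readings = {1_000L, 1_005L, 1_003L, 1_010L};
        int[] next = {0};
        IngestionTimeSketch source = new IngestionTimeSketch(() -> readings[next[0]++]);
        for (String record : new String[] {"a", "b", "c", "d"}) {
            System.out.println(source.ingest(record)
                    + " watermark=" + source.currentWatermark());
        }
    }
}
```

Running main prints 1000,a watermark=999, then 1005,b watermark=1004, then 1005,c watermark=1004 (the clock reading 1003 is clamped so time never goes backwards), and finally 1010,d watermark=1009.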

Fig a: Event Time, Processing Time & Ingestion Time

The code example below shows how to set the time characteristic in a Flink program.

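import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TimeCharacteristicExample {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // set the time characteristic to event time
        // (deprecated since Flink 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        /*
         * Similarly, it can be set to ingestion time or processing time:
         * env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
         * env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
         */
        // ... define sources, transformations and sinks, then call env.execute()
    }
}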

