On Streams with Apache Flink

Tiểu Đông Tà
Life of a Senior Data Engineer
Oct 6, 2020

“Talk is cheap. Show me the code.”

-Linus Torvalds-

For those who are impatient, here is the full code base of the examples.

Enjoy !

For those who are more patient, please read through :)

Everywhere in the industry, ever since “data” and “analytics” became the “cream” and “sweet spot” for most companies, I have heard people constantly screaming:

“Why the **** do I have to wait a whole day for data to arrive in a batch?

I want it in real time! It has to be in real time!

My business would collapse if I had to wait 24 hours to run my analytics dashboard queries (which get 2 views per month)!”

Boss: I want everything real-time

Well, I exaggerate (just a little bit). But everywhere, people keep asking their engineering teams to reduce the latency of analytics processes. Without going into the discussion of whether their companies actually need it (most of the time, they don't), I would like to share how it should be done.

I have seen with my own eyes streaming systems that were built to crash or doomed to fail from the beginning, and streaming systems in production that are extremely hard to evolve and maintain. Thus, I feel the need to point out, from my personal perspective, one good way to build a streaming system.

There are other good ways, and there are certainly better ways. But don't worry, I will get to those in future posts (that's why you need to subscribe).

Now, let's first get into some (boring) theory.

Scenarios

I have multiple electronic, 5G-connected sensors that constantly emit measurement results to the system every second.

The sensors' measurements create a stream of data that we want to analyze in real time. Imagine a scenario where real-time analysis is truly vital (I am not talking about a real-time dashboard that gets 2 views per month here): the sensors measure a patient's heart rate and we need to detect anomalies in real time.

The sensors send their measurements in real time to the back-end server, which then forwards them to the main platform for processing.

We want to perform simple aggregations (SUM, MEDIAN, MEAN) on the stream of incoming data every hour.

Stream

When data comes from a source system continuously over a long period of time, we can say that the data is unbounded in time: it has a starting moment but no ending moment, which is different from batch processing.

In this case, the data sent by my sensor can be defined as a stream of data. Two schools of thought exist on how to handle stream data:

  1. Treat a stream of data as a series of mini-batches. Spark Streaming is an example.
  2. Treat a stream as unbounded data, with updates applied as append, upsert or retract operations on the existing state (statefulness). Apache Flink is an example.

The first approach is not truly stream processing but rather an extension of batch processing, and thus will not be covered here. This article deals only with the latter case: true stream processing.

Sampling

In this case, the data represents measurements of power and energy consumption. Since these two quantities are continuous in the time domain (analog), the sensor samples them and converts them to digital form.

It is thus important to clarify this sampling notion, as it will seriously affect the outcome of the system.

In order to fully represent the analog measurements, the sampling frequency (i.e. the number of measurements per second) must be at least twice the maximum frequency component present in the analog signal. This is the famous Shannon-Nyquist theorem.

Keep that in mind: the number of measurements taken per second and sent to our system must satisfy this requirement in order to give us correct raw data. Otherwise, the metrics produced by our system will be distorted.

In this scenario, we have 1 measurement sent per second. We consider this rate to be “enough” to satisfy the Shannon-Nyquist sampling theorem.

Any sampling frequency lower than that is considered too risky to produce correct aggregation metrics.
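As a toy illustration (not part of the demo code base), the Nyquist criterion is easy to check, assuming we know or can estimate the highest frequency component of the underlying signal:

def satisfies_nyquist(sampling_hz: float, max_signal_hz: float) -> bool:
    # Shannon-Nyquist: the sampling rate must be at least twice the
    # highest frequency component present in the analog signal.
    return sampling_hz >= 2 * max_signal_hz

# One measurement per second (1 Hz) is only sufficient if the signal
# contains no components above 0.5 Hz.
print(satisfies_nyquist(1.0, 0.4))   # True
print(satisfies_nyquist(1.0, 0.75))  # False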

Sampling an analog signal into a digital signal

Continuous query of unbounded data

Conceptually, the stream-processing system keeps in memory the state of the data that falls within a bounded time range (defined as a WINDOW).

Once all data within a window has been collected, that slice can be treated as temporally bounded data, and aggregation can be performed on it.

As in the example below, the GROUP BY aggregation is performed on a 5-second non-overlapping interval.
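Here is a toy plain-Python illustration of what such a tumbling GROUP BY does (the timestamps and values below are made up for the example):

def assign_to_window(ts_seconds: int, window_size: int = 5) -> int:
    # A tumbling window is identified by its start: the event timestamp
    # rounded down to a multiple of the window size.
    return ts_seconds - (ts_seconds % window_size)

events = [(0, 10), (2, 20), (4, 5), (5, 7), (9, 3), (12, 8)]  # (ts, value)

windows = {}
for ts, value in events:
    windows.setdefault(assign_to_window(ts), []).append(value)

aggregated = {start: sum(values) for start, values in windows.items()}
print(aggregated)  # {0: 35, 5: 10, 10: 8}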

Unlike batch processing, stream querying is difficult because it works with data on the move and tries to reconstitute the state of the system by accumulating data over a period of time.

In a streaming application, the state at moment tn is defined as the sum of all events from moment t0 up to moment tn.

Similar to the sampling of analog signals into digital signals, a stream system tries to sample the continuous “state” of the system by sampling “events” over a certain period of time at the given sampling frequency.

The state built by the stream-processing system thus becomes an estimated version of the actual state.

It is important to keep that in mind, as it means there will be a certain degree of estimation error (see the On Error section for details on how to reduce it).

Boundaries and System Time

Boundaries and time are two fundamental concepts in any stream-processing system. They answer two questions:

  • How to define the boundaries of the stream, so that all data falling within them is considered a single dataset for processing.
  • How to define who comes first, who comes later, and who truly arrives late.

There are 3 types of time to consider:

Event time: the timestamp at which the event was emitted.

Ingestion time: the timestamp at which the event was received by our system.

Processing time: the timestamp at which the event was processed by our system.

As a reference, the system has to know which of these times we use; otherwise, it cannot assign an event to a given time frame.

In this example, we use event time.

Consider non-overlapping windows:

Each event will be assigned to the corresponding window based on its event time (in our case).

When using windows, the system creates the different windows and assigns each event to one of them.

Note that, for Apache Flink, windows are created from THE START OF THE UNIX EPOCH (00:00:00 UTC, 1 January 1970).

This means that even when our first event is registered at 2020-01-01 00:00:00, window creation is actually aligned to the Unix epoch, 00:00:00 UTC on 1 January 1970.

Also, Apache Flink considers:

  • Inclusive lower bound
  • Exclusive upper bound

For a window from 2020-01-01 01:00:00 to 2020-01-01 02:00:00, all events with a timestamp from 2020-01-01 01:00:00 up to 2020-01-01 01:59:59 will be assigned to this window.

This is very logical: since the system traces time back to t0 (the Unix epoch), it must use an inclusive lower bound (to handle an event occurring exactly at t0), but it cannot use an inclusive upper bound, as that would create overlapping windows.
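A small plain-Python sketch of this alignment (the timestamp below corresponds to 2020-01-01 01:00:00 UTC and is purely illustrative):

ONE_HOUR_MS = 3_600_000

def window_bounds(event_ts_ms: int, window_size_ms: int = ONE_HOUR_MS):
    # Windows are aligned to the Unix epoch (00:00:00 UTC, 1 January 1970):
    # the start is the event timestamp rounded down to the window size.
    start = event_ts_ms - (event_ts_ms % window_size_ms)
    end = start + window_size_ms
    return start, end  # start is inclusive, end is exclusive

event_ts = 1_577_840_400_000       # 2020-01-01 01:00:00 UTC
start, end = window_bounds(event_ts)
assert start <= event_ts < end     # the event belongs to [start, end)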

On Error

As stated, the stream is an approximation of the true state of the measured quantity, so errors arise from:

  • Too few events within a single window period. For example, in our case, if the sensor only sends 3 measurements, at 01:00:00, 01:30:00 and 02:00:00, then the estimation is based on only 3 samples, which is clearly not enough to produce good metrics.
  • Boundaries: as windows are lower-inclusive and upper-exclusive, a measurement at exactly the upper bound is assigned to the next window.

For example, with a 1-hour window, all events from 01:00:00 up to 01:59:59 are considered, but the event at 02:00:00 is assigned to the next window. This is logical: technically speaking, the event at 02:00:00 falls outside the 1-hour window starting at 01:00:00.

In our case, for example, consider a window of 1 hour:

The sensor emitted a measurement at 01:00:00 with a value of 1000.

The sensor emitted a measurement at 01:58:00 with a value of 1995.

The sensor emitted a measurement at 02:00:00 with a value of 2000.

The true difference is: 2000 - 1000 = 1000.

However, the difference within the 1-hour time window will be 1995 - 1000 = 995.

Clearly, the best way to reduce the error is to increase the sampling frequency.
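The same arithmetic, written out as a quick plain-Python check (the timestamps are for 2020-01-01 and purely illustrative):

window_start = 1_577_840_400_000   # 2020-01-01 01:00:00 UTC, in milliseconds
window_end = window_start + 3_600_000

measurements = [
    (1_577_840_400_000, 1000),  # 01:00:00
    (1_577_843_880_000, 1995),  # 01:58:00
    (1_577_844_000_000, 2000),  # 02:00:00 -> falls into the NEXT window
]

in_window = [v for ts, v in measurements if window_start <= ts < window_end]

true_difference = 2000 - 1000                         # 1000
window_difference = max(in_window) - min(in_window)   # 1995 - 1000 = 995
print(true_difference, window_difference)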

On system design

Perfect scenario

Kafka topic for raw data:

Sensor metrics should be sent to a Kafka topic. To facilitate scaling, the client has to make sure that data from the same sensor is always sent to the same partition. This way, to scale the system, we only have to increase the number of partitions in the topic.

Keeping all data of the same sensor on the same partition ensures that ordering is respected (Kafka messages within a partition are guaranteed to be FIFO, First In First Out). It also simplifies future job scheduling: since all data of the same sensor is already on the same partition, we avoid further complications downstream.

Such an operation can be done using a small mapping function:

hash_value ⇐ hash of the sensor device ID

partition ⇐ hash_value modulo the total number of partitions

This ensures that the data of the same sensor device would be sent to the same Kafka partition.
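A minimal sketch of such a mapping in Python (in practice, simply using the sensor ID as the Kafka message key lets the producer's default partitioner achieve the same effect; the function below is only an illustration):

import zlib

def partition_for(sensor_id: str, num_partitions: int) -> int:
    # Stable hash of the device ID, then modulo the partition count, so
    # every measurement of a given sensor lands on the same partition.
    hash_value = zlib.crc32(sensor_id.encode("utf-8"))
    return hash_value % num_partitions

print(partition_for("sensor-A", 8))  # always the same partition for sensor-A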

Flink Stream Job for filtering data

A stream job (Flink Stream) that consumes the raw data and performs filtering (e.g. filtering out NULL or negative measurements, etc.).

The output will be sunk to another Kafka topic

Note that in Flink we can set the parallelism (https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html).

Each parallel executor can be mapped to a specific Kafka partition. This ensures that each executor has all of a given sensor's metrics at hand. For data filtering this is not that important, but for aggregation it is vital to have this capability.
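A minimal PyFlink DataStream sketch of such a filtering job; the Kafka source/sink wiring is omitted here and the record layout (sensor_id, value) is an assumption for illustration:

from pyflink.datastream import StreamExecutionEnvironment

def is_valid(record) -> bool:
    # Drop NULL and negative measurements
    _sensor_id, value = record
    return value is not None and value >= 0

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)  # ideally aligned with the Kafka partition count

raw = env.from_collection([("A", 10.0), ("B", None), ("C", -3.0), ("A", 12.5)])
filtered = raw.filter(is_valid)

filtered.print()
env.execute("filter-raw-measurements")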

Kafka topic for filtered data

This is where the filtered data is sunk. The same convention about the number of partitions applies.

Flink Stream Job for aggregating data

A stream job (Flink Stream) that aggregates the data and outputs aggregated metrics.

The output can be a file on HDFS or S3, or it can be written directly to a database table.

The same rule about parallelism applies. Once we have the guarantee that all data of the same sensor is on the same Kafka partition, the aggregation is guaranteed to produce the correct result.

Schema of the system

Scaling

As seen above, setting the parallelism is the key to scaling Flink stream jobs. But how exactly should we scale our jobs? How much parallelism is enough?

It actually depends on how many Kafka partitions we have. Note that we still assume that data of the same sensor stays on the same Kafka partition.

In fact, Kafka allows at most ONE consumer per partition (within a consumer group), so by the pigeonhole principle (Dirichlet's principle), we have the following cases:

Number of parallel jobs == Number of Kafka partitions

In this case, Flink ensures that each parallel task (one consumer) takes exactly one partition. Note that Kafka does NOT allow multiple consumers in the same group to consume the same partition anyway.

The system is balanced in this case, although it may not be necessary to spawn that many parallel Flink tasks.

Number of parallel jobs > Number of Kafka partitions

Clearly, some of the parallel tasks will be left idle, with no Kafka partition assigned to them.

Number of parallel jobs < Number of Kafka partitions

In this case, one Flink task may consume from multiple partitions. The system is also balanced.
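In practice, this usually means setting the job parallelism according to the topic's partition count, as in this small sketch (the partition count of 8 is an assumed value):

from pyflink.datastream import StreamExecutionEnvironment

NUM_KAFKA_PARTITIONS = 8  # assumption: the filtered-data topic has 8 partitions

env = StreamExecutionEnvironment.get_execution_environment()
# One parallel sub-task per partition keeps every consumer busy
# without leaving idle tasks.
env.set_parallelism(NUM_KAFKA_PARTITIONS)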

Timing and late-arrival handling

In our case, we derive the watermark from event time. The event time is stamped when the event is emitted.

A late event may still be assigned to the correct window if it does not exceed the allowed lateness margin.

Flink allows setting an explicit allowed-lateness margin to handle late arrivals: if an event arrives late but still within the margin, it will be processed as well.
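As a sketch of how this looks in PyFlink, a watermark strategy with a bounded out-of-orderness margin tolerates events that arrive up to a fixed delay late (the 10-second margin and the record layout are assumptions for illustration):

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner

class SensorTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, element, record_timestamp):
        # assumption: each record is a tuple (sensor_id, value, event_time_ms)
        return element[2]

watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(SensorTimestampAssigner())
)

# Applied on a DataStream with:
# stream.assign_timestamps_and_watermarks(watermark_strategy)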

Refer to the Flink documentation on windows and allowed lateness for details.

Dry-run of demo pipeline

For the demo run, the sensor output is stored in a sample file.

The Flink job processes the file and “sinks” the result to an output file.

You may use the same code to plug the input/output into Kafka if you want.

Demo run

Running the pipelines

To test the dry-run pipeline, please ensure the following:

  1. Python 3.6.9 or 3.7 is installed.
  2. Install the dependencies using:
pip install -r requirements.txt
  3. Execute the pipeline with the command:
bash dry_runner.sh -i <path_to_csv>

The CSV file should not contain a header line.

The aggregated metrics data will be output to ./tmp/data/event_agg.csv

Explanation

The input file simulates time-ordered entries from 3 sensors (A, B and C).

Every second, each sensor (A, B, C) emits one single measurement.

The stream is aggregated on the fly to generate the average, min and max over a 1-hour time window for each sensor.

Flink has a concept called the stream table, which allows you to conveniently query stream data with SQL.

The code below performs a simple aggregation on the fly.
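The full job lives in the repository; below is only a minimal Table API sketch of it (the file paths, schema and connector options here are illustrative assumptions, not the exact demo code):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Source: the sample CSV of sensor measurements, with an event-time watermark
t_env.execute_sql("""
    CREATE TABLE sensor_events (
        sensor_id STRING,
        `value` DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'filesystem',
        'path' = './tmp/data/events.csv',
        'format' = 'csv'
    )
""")

# Sink: the aggregated metrics
t_env.execute_sql("""
    CREATE TABLE event_agg (
        sensor_id STRING,
        window_start TIMESTAMP(3),
        avg_value DOUBLE,
        min_value DOUBLE,
        max_value DOUBLE
    ) WITH (
        'connector' = 'filesystem',
        'path' = './tmp/data/event_agg.csv',
        'format' = 'csv'
    )
""")

# Hourly tumbling window per sensor: average, min and max
t_env.execute_sql("""
    INSERT INTO event_agg
    SELECT
        sensor_id,
        TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
        AVG(`value`) AS avg_value,
        MIN(`value`) AS min_value,
        MAX(`value`) AS max_value
    FROM sensor_events
    GROUP BY sensor_id, TUMBLE(event_time, INTERVAL '1' HOUR)
""").wait()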

The code and the comments should explain themselves!

Conclusion

In this small article, we went through the basic concepts of stream processing. Whatever framework you use, the fundamentals are likely to be the same. Apache Flink is actually quite powerful and easy to use, and since Alibaba Cloud committed to its development, it has been evolving quite quickly. It's always fun to follow those developments.

The next time you find yourself asking “How do I run analytics queries on the fly?”, try not to reinvent the wheel and use one of the production-grade frameworks out there.


Tiểu Đông Tà
Life of a Senior Data Engineer

Doing things my own way: medicine, divination, physiognomy, numerology, hydraulics, mathematics, calligraphy, I want to learn a little of every discipline.