Here’s What Makes Apache Flink Scale

A glance at memory management and network flow control

Kartik Khare
Walmart Global Tech Blog
5 min read · Oct 2, 2019

Time-Lapse Photography of Blue Lights by Pixabay

Apache Flink is a popular real-time data processing framework. It’s gaining more and more popularity thanks to its low-latency processing at extremely high throughputs in a fault-tolerant manner.

I have been using Apache Flink in production for the last three years, and it has excelled at every workload thrown at it. I have run Flink jobs handling data streams at more than 10 million RPM with no more than 20 cores. It’s not just me. You can see similar benchmarks from other companies.

You can find the official benchmarks here.

So the natural question is: how does Flink manage to scale so efficiently?
Here are some of the neat tricks.

Reduce Garbage Collection

When you are operating on large amounts of data in Java, garbage collection can quickly become a bottleneck. A full GC can stall the JVM for seconds and even minutes in some cases.

Flink takes care of this by managing memory itself. Flink reserves a part of heap memory (typically around 70%) as Managed Memory. The Managed Memory is filled with memory segments of equal size (32KB by default). These memory segments are similar to java.nio.ByteBuffer and contain only byte arrays.

Whenever an operator wants to use memory, it requests segments from the memory manager and once done, returns them to the pool. Since these memory segments are long-lived and reused continuously, they reside in the Old Generation of Heap and don’t have to go through many GC cycles.
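The request-and-return cycle described above can be sketched in a few lines of plain Java. This is only an illustration of the pooling idea, not Flink's actual memory manager: the class name `SegmentPool` and its methods are hypothetical, and the only value taken from the text is the 32 KB segment size.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical fixed-size segment pool; a sketch of the idea, not Flink's real API. */
final class SegmentPool {
    static final int SEGMENT_SIZE = 32 * 1024; // 32 KB, the default size mentioned above

    private final Deque<byte[]> free = new ArrayDeque<>();

    SegmentPool(int totalSegments) {
        // Allocate every segment once, up front. The arrays stay referenced by the
        // pool or by an operator, so they are long-lived and never become garbage.
        for (int i = 0; i < totalSegments; i++) {
            free.push(new byte[SEGMENT_SIZE]);
        }
    }

    /** Hands out a segment, or null when the pool is exhausted (a real system would spill or wait). */
    synchronized byte[] request() {
        return free.poll();
    }

    /** Returns a segment to the pool so the same byte[] can be reused by another operator. */
    synchronized void release(byte[] segment) {
        if (segment.length != SEGMENT_SIZE) {
            throw new IllegalArgumentException("segment does not belong to this pool");
        }
        free.push(segment);
    }

    synchronized int available() {
        return free.size();
    }

    public static void main(String[] args) {
        SegmentPool pool = new SegmentPool(4);
        byte[] segment = pool.request();
        segment[0] = 42;            // an operator writes serialized data into the segment
        pool.release(segment);      // and hands it back when done
        System.out.println("free segments: " + pool.available());
    }
}
```

Because the pool never allocates after construction, the number of live objects stays constant no matter how many records pass through, which is the property that keeps GC pressure low.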

Flink can also place memory segments in off-heap memory for faster I/O to the network and the file system, which is especially useful for stateful operators.

Another advantage of Managed Memory is that Flink can spill segments to disk when memory runs low and read them back later. This spilling helps prevent OutOfMemory errors.

A high-level overview of how Flink stores data serialized in memory segments (From Juggling with Bits and Bytes by Apache Flink)

Note: On a modern JVM (Java 8+) with the G1 garbage collector, heap memory is almost as efficient as off-heap memory. Off-heap memory can have significant setup and teardown costs, but it can be faster than heap memory when combined with efficient serialization.

Minimize data transfer

In a distributed data processing system, a Map or Filter task can run on one node and a Reduce task on another. This requires data to be shared over the network across nodes.

Now, suppose that you need to perform multiple map and filter operations on a data stream. There’s no need to run each of these operations in a separate task, because each of these operators needs input only from the operator directly before it.

Apache Flink applies exactly this optimization: several consecutive map and filter transformations are executed sequentially in a single slot. This chaining minimizes the data shared between slots and between JVM processes. As a result, jobs have low network I/O, low data transfer latencies, and minimal synchronization between objects.

Operator chaining in a simple Flink job. The small rectangle inside the JVM represents each slot.
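To make the idea of chaining concrete, here is a minimal sketch in plain Java. It is not how Flink implements chaining internally (that happens inside the runtime, not in user code); the `Chain` class and its methods are hypothetical names used only for illustration. The point is that consecutive map and filter steps are fused into one function, so each record flows through all of them in a single call without being handed to another task or serialized in between.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical illustration of operator chaining; not part of the Flink API. */
final class Chain<IN, OUT> {
    // The fused function: an empty Optional means a filter dropped the record.
    private final Function<IN, Optional<OUT>> fused;

    private Chain(Function<IN, Optional<OUT>> fused) {
        this.fused = fused;
    }

    static <T> Chain<T, T> start() {
        return new Chain<>(Optional::of);
    }

    /** Appends a map step to the chain. */
    <R> Chain<IN, R> map(Function<OUT, R> mapper) {
        return new Chain<>(in -> fused.apply(in).map(mapper));
    }

    /** Appends a filter step to the chain. */
    Chain<IN, OUT> filter(Predicate<OUT> predicate) {
        return new Chain<>(in -> fused.apply(in).filter(predicate));
    }

    /** Runs every record through all chained steps in one pass, in one thread. */
    List<OUT> process(List<IN> records) {
        List<OUT> out = new ArrayList<>();
        for (IN record : records) {
            fused.apply(record).ifPresent(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> result = Chain.<String>start()
                .map(String::length)
                .filter(n -> n > 3)
                .map(n -> n * 2)
                .process(Arrays.asList("a", "flink", "scale"));
        System.out.println(result);
    }
}
```

Only operations that depend solely on the previous step can be fused this way. A step that needs records regrouped by key, such as a reduce, still requires an exchange between tasks.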

Squeeze your bytes

Java objects are quite heavy. For example, a simple Integer object typically occupies 16 to 24 bytes on a 64-bit JVM (depending on whether compressed object pointers are enabled) to store 4 bytes of data. To avoid storing such heavy objects, Flink implements its own serialization framework, which is much more space-efficient.

However, why not use other serialization frameworks such as Kryo (currently used in Spark), Avro, or Protocol Buffers?

Having its own binary representation allows Flink to store related objects, together with their keys and hashes, close to each other in memory. For example, the index keys of a String array can be stored adjacent to each other. The CPU cache loads a whole line of consecutive bytes at once and also prefetches the bytes that follow, so objects stored next to each other are much faster to access.

Another advantage of a custom binary representation is that Flink can operate directly on binary data, which reduces the serialization/deserialization overhead. For example, Flink can compare objects directly in their serialized form. This makes operations such as sorting possible without deserializing the data at all.
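The following sketch shows how sorting can work on serialized bytes alone. The encoding is an assumption chosen for illustration (a 4-byte big-endian integer with the sign bit flipped) and is not claimed to be Flink's exact format; the class `BinaryIntKeys` is hypothetical. With this encoding, comparing the bytes as unsigned values from left to right gives the same order as comparing the original integers.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical order-preserving binary encoding for int keys; a sketch, not Flink's format. */
final class BinaryIntKeys {

    /** Encodes an int as 4 big-endian bytes with the sign bit flipped,
     *  so that unsigned byte-wise order equals numeric order. */
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(4).putInt(value ^ Integer.MIN_VALUE).array();
    }

    /** Reverses serialize(); only needed when the actual value is required. */
    static int deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt() ^ Integer.MIN_VALUE;
    }

    /** Compares two serialized keys byte by byte, never rebuilding an Integer object. */
    static int compareSerialized(byte[] a, byte[] b) {
        for (int i = 0; i < 4; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        byte[][] keys = { serialize(5), serialize(-3), serialize(0) };
        Arrays.sort(keys, BinaryIntKeys::compareSerialized); // sorts the raw bytes
        for (byte[] key : keys) {
            System.out.println(deserialize(key));            // deserialize only for display
        }
    }
}
```

Each key here costs exactly 4 bytes, and the sort touches only contiguous byte arrays, which ties back to the cache-friendliness point made earlier.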

Benchmark for different serialized formats in Flink (Taken from https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html)

Avoid blocking everyone

Flink revamped its network stack in the release after Flink 1.4. The new mechanism is called credit-based flow control.

Multiple slots run inside each task manager, and each of these slots can run a subtask. Because of this, the network connection between two task managers has to be multiplexed, i.e., a single connection carries data for all the subtasks.

The multiplexing comes with a compromise. Whenever a receiving subtask blocks because its network buffers are full, the whole TCP connection experiences backpressure. This is not optimal, because other receiving subtasks may still have empty buffers but now have to sit idle.

Normal Flow without credit announcements from A Deep-Dive into Flink’s Network Stack

Flink solved this problem by introducing credit announcements. Receiver subtasks announce to sender subtasks how many buffers they have left. When a sender learns that a receiver doesn’t have any buffers left, it simply stops sending to that receiver. This prevents the shared TCP channel from being clogged with bytes destined for the blocked subtask.

Credit-based flow control from A Deep-Dive into Flink’s Network Stack
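The credit mechanism can be sketched as simple per-receiver bookkeeping on the sender side. This is a simplified model under stated assumptions, not Flink's network stack: the class `CreditBasedSender`, its method names, and the use of strings for receivers and buffers are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sender-side model of credit-based flow control; not Flink's implementation. */
final class CreditBasedSender {
    private final Map<String, Integer> credits = new HashMap<>();
    private final Map<String, Queue<String>> backlog = new HashMap<>();

    /** A receiver announces that it has n more free buffers. */
    void announceCredit(String receiver, int n) {
        credits.merge(receiver, n, Integer::sum);
    }

    /** Queues a buffer that is waiting to be sent to the given receiver. */
    void enqueue(String receiver, String buffer) {
        backlog.computeIfAbsent(receiver, r -> new ArrayDeque<>()).add(buffer);
    }

    /** Sends as many queued buffers as the receiver has credit for; returns the number sent. */
    int flush(String receiver) {
        Queue<String> queue = backlog.computeIfAbsent(receiver, r -> new ArrayDeque<>());
        int credit = credits.getOrDefault(receiver, 0);
        int sent = 0;
        while (credit > 0 && !queue.isEmpty()) {
            queue.poll(); // a real system would write the buffer to the shared TCP connection here
            credit--;
            sent++;
        }
        credits.put(receiver, credit);
        return sent;
    }

    int backlogSize(String receiver) {
        Queue<String> queue = backlog.get(receiver);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        CreditBasedSender sender = new CreditBasedSender();
        for (int i = 0; i < 3; i++) {
            sender.enqueue("slow", "s" + i);
            sender.enqueue("fast", "f" + i);
        }
        sender.announceCredit("slow", 1); // the slow receiver has a single free buffer
        sender.announceCredit("fast", 3);
        System.out.println("sent to slow: " + sender.flush("slow"));
        System.out.println("sent to fast: " + sender.flush("fast"));
    }
}
```

The slow receiver's remaining data waits in the sender's backlog instead of sitting on the wire, so the fast receiver keeps getting its buffers over the same connection.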

A more straightforward representation can be found below:

Without Credit-Based Flow
With Credit-Based Flow

Apache Flink is one of the most powerful real-time data processing systems available today.
However, it’s still new compared to other data streaming systems and has certain drawbacks:

  1. Inability to scale the application at runtime.
  2. Not possible to add new operators at runtime.
  3. Flink ML is not as mature as Spark MLLib.

To read more on this topic, you can refer to the following articles —
