Apache Flink Memory Model, Metrics and Optimisation

Amit Patel
Jan 4, 2023


Prerequisite: a basic understanding of Apache Flink architecture and its components (Ref: Flink Architecture).

At runtime there are two types of Flink processes: one JobManager (JM) and one or more TaskManagers (TM). The memory model and configuration described below apply to release versions ≥ 1.10 for the TM and ≥ 1.11 for the JM.

The total process memory (TPM) of a Flink JVM process (JM or TM) consists of the memory consumed by the Flink application (the total Flink memory, TFM) plus the memory the JVM needs to run the process. The total Flink memory in turn includes JVM heap and off-heap (direct or native) memory.

Total Process Memory (TPM):

TPM = TFM + JVM memory to run the process (JVM Metaspace + JVM Overhead), where TFM = JVM Heap + Off-Heap (Direct or Native)

Flink Memory Overview

We can use either of the following two options to configure Flink memory: set the total Flink memory (TFM) or set the total process memory (TPM).

Note: If you configure the TPM, it corresponds to the size of the requested container in containerised deployments.
Flink also offers options to set the memory of individual internal components of the TFM, which are specific to the respective Flink process.
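For illustration, here is how the two options might look in flink-conf.yaml. The sizes are arbitrary examples, not recommendations, and only one of the two options should be set per process:

```yaml
# Option 1: set Total Flink Memory (TFM); Flink adds JVM Metaspace
# and JVM Overhead on top to arrive at the process size.
taskmanager.memory.flink.size: 2048m
jobmanager.memory.flink.size: 1024m

# Option 2 (alternative): set Total Process Memory (TPM); all internal
# components, including Metaspace and Overhead, are derived from it.
# taskmanager.memory.process.size: 2560m
# jobmanager.memory.process.size: 1280m
```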

Task Manager Memory Model:

Below is a sample of the memory categorisation and allocation of a Flink TM. You can find this in the Flink web UI (Task Managers -> click on one of the TMs -> Metrics).

Task Manager memory model

Explanation of each category:
TFM:
1. JVM Heap:

a) Framework Heap: the JVM heap memory reserved for the task executors (the Flink framework itself); it is not allocated to task slots.
b) Task Heap: the JVM heap memory of the task executors reserved for tasks. If not specified, it is derived as TFM - (Framework Heap + Framework Off-Heap + Task Off-Heap + Managed Memory + Network Memory).
2. Off-heap:
a) Managed Memory: native memory managed by Flink's memory manager, reserved for sorting, hash tables, caching of intermediate results and the RocksDB state backend. If unspecified, it is derived as a configured fraction of the TFM.
b) Framework Off-Heap & Task Off-Heap: JVM direct and native memory reserved for the framework and for tasks, respectively.
c) Network: direct memory used when data is exchanged between tasks (shuffle); the exchanged data needs to be buffered, and the memory available for those buffers is the network memory.

JVM Metaspace and JVM Overhead are reserved for the JVM itself: thread stacks, code cache, space reclaimed by garbage collection, and so on.
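To make the task-heap derivation above concrete, here is a hypothetical TM sizing in flink-conf.yaml; the derived values in the comments assume the documented defaults and are approximate (Flink rounds in bytes):

```yaml
taskmanager.memory.flink.size: 4096m   # TFM, chosen purely for illustration

# Derived components, following the formula above:
#   Framework Heap     = 128m  (default)
#   Framework Off-Heap = 128m  (default)
#   Task Off-Heap      = 0m    (default)
#   Managed            = 0.4 * 4096m ~= 1638m
#   Network            = 0.1 * 4096m ~= 410m  (between 64m min and 1g max)
#   Task Heap          = 4096m - 128m - 128m - 0m - 1638m - 410m ~= 1792m
```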

Summary of the TM memory model configuration with default values:

Key Task Manager Memory Configurations
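Since the table above is rendered as an image, the following sketch lists the key TM options with the defaults documented around Flink 1.11; double-check the defaults against the docs for your exact version:

```yaml
taskmanager.memory.process.size: 1728m           # TPM; no default, 1728m ships in the default flink-conf.yaml
# taskmanager.memory.flink.size:                 # TFM; alternative to process.size, no default
taskmanager.memory.framework.heap.size: 128m     # default 128m
# taskmanager.memory.task.heap.size:             # no default; derived if unset
taskmanager.memory.managed.fraction: 0.4         # default 0.4
taskmanager.memory.framework.off-heap.size: 128m # default 128m
taskmanager.memory.task.off-heap.size: 0m        # default 0
taskmanager.memory.network.fraction: 0.1         # default 0.1 (min 64m, max 1g)
taskmanager.memory.jvm-metaspace.size: 256m      # default 256m
taskmanager.memory.jvm-overhead.fraction: 0.1    # default 0.1 (min 192m, max 1g)
```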

Job Manager Memory Model:

Job Manager Memory Model

Summary of the Job Manager memory model configuration with default values:

Key Job Manager Memory Configurations
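Likewise for the JM (whose memory model applies from Flink 1.11), a sketch with the documented defaults; again, verify against your version's docs:

```yaml
jobmanager.memory.process.size: 1600m          # TPM; no default, 1600m ships in the default flink-conf.yaml
# jobmanager.memory.flink.size:                # TFM; alternative to process.size, no default
# jobmanager.memory.heap.size:                 # no default; derived, recommended minimum 128m
jobmanager.memory.off-heap.size: 128m          # default 128m
jobmanager.memory.jvm-metaspace.size: 256m     # default 256m
jobmanager.memory.jvm-overhead.fraction: 0.1   # default 0.1 (min 192m, max 1g)
```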

Metrics to observe Flink Memory:

You can export metrics to Prometheus and watch the metrics below for the respective processes; a sample reporter configuration follows the list.

1. Job Manager:
a. flink_jobmanager_Status_JVM_Memory_Heap_Max (JVM-Heap)
b. flink_jobmanager_Status_JVM_Memory_Heap_Used
c. flink_jobmanager_Status_JVM_Memory_NonHeap_Max (JVM non-heap: Metaspace, code cache, etc.)
d. flink_jobmanager_Status_JVM_Memory_NonHeap_Used

2. Task Manager:
a. flink_taskmanager_Status_Flink_Memory_Managed_Total (Managed Memory)
b. flink_taskmanager_Status_Flink_Memory_Managed_Used
c. flink_taskmanager_Status_JVM_Memory_Heap_Max (JVM Heap)
d. flink_taskmanager_Status_JVM_Memory_Heap_Used
e. flink_taskmanager_Status_JVM_Memory_NonHeap_Max (JVM non-heap: Metaspace, code cache, etc.)
f. flink_taskmanager_Status_JVM_Memory_NonHeap_Used
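A minimal Prometheus reporter setup in flink-conf.yaml might look like the sketch below; the port range is an example, and the reporter class ships in the flink-metrics-prometheus module, which must be on the classpath:

```yaml
# Expose Flink metrics on an HTTP endpoint for Prometheus to scrape.
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260   # example range; one port per JM/TM process on a host
```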

Once these metrics are exported, a Grafana dashboard can be used to observe the memory consumption of jobs over a given period of time.

Observations and Optimisations:

Flink jobs that do not use complex state-related operations (joins, timers, keyed state) show no significant utilisation of managed memory. We can therefore configure the fraction of TFM reserved as managed memory (default 0.4) according to the complexity of the job.
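For such jobs, lowering the managed-memory fraction releases the idle memory to the other derived components, chiefly the task heap. The value 0.1 below is purely illustrative:

```yaml
# Managed memory is consumed mainly by RocksDB, sorting and hash tables.
# A job with a heap state backend and none of these operations leaves it
# idle, so a smaller fraction frees memory for the task heap.
taskmanager.memory.managed.fraction: 0.1   # default 0.4; illustrative value only
```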


Amit Patel

I am a real-time streaming data engineer, responsible for designing, developing and maintaining streaming pipelines. Currently working with Kafka, ML, Beam and Flink.