Apache Flink Best Practices for Realtime Data Processing Pipeline

Murtaza Zaveri
4 min read · May 19, 2020


Apache Flink is widely used for building pipelines for streaming data analysis. This section discusses best practices I have used to build stream processing pipelines with Apache Flink.

1. Enable Checkpointing

Checkpointing is the most important thing a developer needs to take care of when creating a streaming data pipeline. A stream processor such as Flink consists of stateless and stateful operators, and checkpointing is required to recover the state of the stateful operators after a failure.
Checkpointing is the heart of the fault-tolerance mechanism in Apache Flink: it draws consistent snapshots of the distributed data stream and of each operator's state.

We can enable checkpointing using the method below:
streamExecutionEnvironment.enableCheckpointing(n) → n defines the checkpoint interval in milliseconds.
With checkpointing enabled, we can also take savepoints, which are used when we pause or upgrade the pipeline (a savepoint is essentially a manually triggered, externally stored checkpoint).
It is very important to assign a UID to each operator, because Flink uses these UIDs to match the stored state to the operators when the pipeline restarts. If the stored state cannot be matched to the operators, Flink will fail to restore it, and the pipeline will effectively start over from scratch.
By default, the state of an operator is snapshotted into the JobManager's memory, but for production use it is advisable to store it in a durable, distributed store instead.
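The points above can be wired together roughly as follows. This is a sketch against the Flink DataStream API; the interval, the checkpoint path, and the `MySource`/`MyStatefulMapper` classes are illustrative placeholders, not recommendations:

```java
// Sketch only: interval, path, and operator classes are illustrative.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Draw a checkpoint every 10 seconds (the interval is in milliseconds).
env.enableCheckpointing(10_000);

// Keep snapshots outside the JobManager's memory for production use.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

DataStream<String> events = env.addSource(new MySource()); // hypothetical source

events
    .keyBy(value -> value)
    .map(new MyStatefulMapper())   // hypothetical stateful operator
    .uid("stateful-mapper")        // UID lets Flink match stored state on restart
    .print();
```

The `.uid(...)` call is the part that protects you during upgrades: as long as the UID is stable, Flink can map the savepoint state back onto the operator even if the surrounding job graph changes.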

2. Restart Strategies

It has always been challenging to choose the correct restart strategy for a real-time data pipeline. I encountered this problem when designing a data pipeline using Flink and Kafka.
So what are restart strategies, basically?
If a data pipeline encounters an exception caused by some intermediate issue, we usually want to restart the pipeline instead of failing the whole job. A restart strategy is a setting, configurable per job, that instructs the JobManager to rerun the pipeline in case of failures.
We can set a restart strategy using the method below:
streamExecutionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.of(restartDelayInSec, TimeUnit.SECONDS)));

Flink offers three restart strategies: fixed delay, failure rate, and no restart. The first two are the interesting ones:
1) Fixed Delay Restart (the default when checkpointing is enabled): Flink restarts the job a fixed number of times; once the count is exhausted, the job fails. This is useful when we have a requirement to fail a job after a set number of retries. We need to provide two args for this strategy:
Example:
RestartStrategies.fixedDelayRestart(10, Time.of(5, TimeUnit.SECONDS))
a) Number of restart attempts.
b) Delay (the job waits for this interval before restarting again).

2) Failure Rate Strategy: with this strategy, we tell the JobManager to restart the job as long as failures stay below a given rate, i.e. at most a given number of failures within a measuring interval. We need to provide three args for this strategy:
Example:
RestartStrategies.failureRateRestart(10, Time.of(15, TimeUnit.MINUTES), Time.of(5, TimeUnit.SECONDS))
a) Maximum number of failures per interval.
b) Time interval over which the failure rate is measured.
c) Delay (the job waits for this interval before restarting again).
In the example above, Flink restarts the job after a 5-second delay and only gives up if it fails more than 10 times within a 15-minute window. Because the measuring window moves, the failure count effectively resets: a failure occurring more than 15 minutes after earlier ones no longer counts against the limit. With the fixed delay strategy, by contrast, the attempt count is cumulative and is never reset.
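Both strategies can be set on the execution environment as sketched below (the counts and delays are illustrative; calling setRestartStrategy twice simply replaces the first setting, they are shown together only for comparison):

```java
// Sketch only: values are illustrative, not recommendations.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Fixed delay: give up after 10 cumulative attempts, waiting 5s between them.
env.setRestartStrategy(
    RestartStrategies.fixedDelayRestart(10, Time.of(5, TimeUnit.SECONDS)));

// Failure rate: give up only if the job fails more than 10 times
// within any 15-minute window, waiting 5s between restarts.
env.setRestartStrategy(
    RestartStrategies.failureRateRestart(
        10,                               // max failures per interval
        Time.of(15, TimeUnit.MINUTES),    // interval for measuring the rate
        Time.of(5, TimeUnit.SECONDS)));   // delay between restarts
```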

3. Use unique Metric Operator names

In earlier versions of Apache Flink, operator metrics were named automatically based on their associated task names. This strategy caused problems and was replaced in later versions of Flink. Even so, we should give our operators unique metric names to avoid naming conflicts.
A good naming convention is to keep a metric operator's name under roughly 70–75 characters, because longer metric names may be silently dropped by the Graphite reporter (Graphite is an open-source monitoring tool).
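As a small illustration of the length guideline (the `MetricNames` class and the 75-character limit are assumptions taken from the rule of thumb above, not Flink constants), a tiny helper can keep names inside the safe range:

```java
// Hypothetical helper: truncate metric names so reporters that drop
// over-long names (e.g. Graphite, per the guideline above) still keep them.
public class MetricNames {
    static final int MAX_LEN = 75; // assumed safe upper bound, not a Flink constant

    public static String safeName(String name) {
        // Short names pass through unchanged; long ones are truncated.
        return name.length() <= MAX_LEN ? name : name.substring(0, MAX_LEN);
    }
}
```

The truncated name can then be passed to an operator's `.name(...)` call so that every metric derived from it stays under the limit.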

4. Configure HA for Flink job Manager

The JobManager is the master node that coordinates resource management with the worker nodes in a Flink pipeline. If the JobManager node fails to respond for some reason, the running Flink pipeline fails, so it is always a best practice to keep more than one JobManager (one active, the rest standby). Having more than one JobManager in a Flink cluster is called High Availability (HA).
Multiple JobManagers are managed via ZooKeeper, which coordinates the leader: if the active JobManager goes down, ZooKeeper elects a new leader from the available JobManagers by running its leader-election algorithm.
Note: the JobManager HA option increases the cost of running a Flink pipeline, since the resources for each standby JobManager should be the same as for the primary JobManager.
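As a hedged sketch of the relevant flink-conf.yaml entries (the ZooKeeper quorum hostnames and the storage path are placeholders for your own cluster):

```yaml
# Sketch only: hostnames and paths are placeholders.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/
```

ZooKeeper handles leader election, while `storageDir` holds the JobManager metadata a newly elected leader needs to take over running jobs.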

5. Configuring Parallelism

From Official Apache Flink’s Documentation,
“A Flink program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into several parallel instances for execution and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism.”
In Flink, we can decide parallelism per operator; for example, a map operator and a join operator can each have a different parallelism.
Hence we can control Flink parallelism at the operator level, at the execution environment level, or globally (at the client or system level).
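A minimal sketch of setting parallelism at both levels (the operators are trivial placeholders):

```java
// Sketch only: the operators are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Default parallelism for every operator in this job.
env.setParallelism(4);

env.fromElements("a", "b", "c")
    .map(String::toUpperCase)
    .setParallelism(8)          // this map runs with 8 parallel instances
    .print()
    .setParallelism(1);         // collect the output in a single sink instance
```

The operator-level `setParallelism(...)` always overrides the environment default, which in turn overrides any client- or system-level setting.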

6. Exactly Once Semantics

Since Flink 1.4.0, Flink supports end-to-end exactly-once processing. This is achieved using a feature called TwoPhaseCommitSinkFunction, with which Flink can build end-to-end exactly-once applications with a selection of data sources and sinks, including Apache Kafka.
TwoPhaseCommitSinkFunction is a base class for any SinkFunction that intends to implement exactly-once semantics. It does so by implementing a two-phase commit protocol on top of CheckpointedFunction and CheckpointListener. We provide a custom transaction type (the TXN type parameter) to handle the transactions.
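As a hedged skeleton of what such a sink looks like (the `FileExactlyOnceSink` and `FileTransaction` classes and the file-based logic are hypothetical; the real base class also requires serializers for the transaction and context types in its constructor, omitted here), a custom exactly-once sink extends TwoPhaseCommitSinkFunction and fills in five hooks:

```java
// Sketch only: FileTransaction is a hypothetical transaction handle.
class FileExactlyOnceSink extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {

    // beginTransaction: open a fresh scratch file for this checkpoint period.
    @Override
    protected FileTransaction beginTransaction() throws Exception { ... }

    // invoke: write each record into the current transaction's scratch file.
    @Override
    protected void invoke(FileTransaction txn, String value, Context ctx) throws Exception { ... }

    // preCommit: flush and close the scratch file; no more writes after this.
    @Override
    protected void preCommit(FileTransaction txn) throws Exception { ... }

    // commit: atomically move the scratch file to its final location.
    @Override
    protected void commit(FileTransaction txn) { ... }

    // abort: delete the scratch file and discard the transaction.
    @Override
    protected void abort(FileTransaction txn) { ... }
}
```

The two-phase split is what makes this exactly-once: `preCommit` runs as part of the checkpoint, and `commit` runs only once the checkpoint has completed everywhere, so a failure in between can always be rolled back via `abort`.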

Exactly Once Two Phase Commit
