Flink fault tolerance using externalized checkpoints

Hao Gao · Published in Hadoop Noob · Apr 9, 2018

As I am writing this, Flink 1.4 has already been released and a 1.5 snapshot is out, but we are still on Flink 1.3.2.

I want to talk a little bit about Flink's externalized checkpoints. Flink's checkpointing is a great feature, and externalized checkpoints make a Flink pipeline easy to resume from where it stopped. In the real world, there are many reasons our Flink pipelines could fail: a code bug could break the pipeline; a task manager or job manager could be lost due to network issues; the whole Flink cluster could be gone if you run it on spot instances. So if a pipeline fails, it will try to recover from the last successful checkpoint; the job manager knows where the state backend stores it.
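For context, here is roughly how we turn this on in a DataStream job on Flink 1.3/1.4. This is a minimal sketch; the class name and the 60-second interval are placeholders, and it assumes the checkpoint directory is set via state.checkpoints.dir in flink-conf.yaml:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Take a checkpoint every 60 seconds (placeholder interval).
    env.enableCheckpointing(60_000);

    // Keep the checkpoint metadata even after the job is cancelled, so it
    // can later be used like a savepoint. The target directory comes from
    // state.checkpoints.dir in flink-conf.yaml.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    // ... build the actual pipeline here, then:
    // env.execute("pipeline_a");
  }
}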

Sounds like it works really well. The tricky part is: what if the job managers are gone, all of them? In that case, if we have enabled externalized checkpoints, we can use the externalized checkpoint path as a savepoint to recover our pipeline. BUT if we look at how the externalized checkpoint directory is structured, can you tell which checkpoint belongs to which pipeline?

Our simple approach: organize the checkpoint directory well, so we can figure out which checkpoint belongs to which pipeline. We assign a unique name to each of our pipelines, and each pipeline only generates checkpoints under its own folder. The externalized checkpoint file name also carries a timestamp, so we know which checkpoint is the latest. The new directory layout looks like the following:

/checkpoint/pipeline_a/9c87f7685fe0947ab39286d0f3f1f748-1522439833113-4048
/checkpoint/pipeline_b/9a678059febd20b5e00cc8fd8c32f789-1522985422370-1480

Under the checkpoint folder, pipeline_a and pipeline_b are unique; under each pipeline folder there are checkpoint files. A file name has three parts: the first is the hash that already exists in the current system; the second is the timestamp; the third is the checkpoint id.
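To make the convention concrete, here is a small sketch of a parser for these file names. CheckpointName is just a hypothetical helper of ours, not part of Flink:

import java.nio.file.Path;

// Hypothetical helper mirroring the naming convention: <hash>-<timestamp>-<checkpoint id>.
public final class CheckpointName {
  public final String hash;          // hash that already exists in the current system
  public final long timestampMillis; // when the checkpoint was taken
  public final long checkpointId;    // Flink's checkpoint id

  private CheckpointName(String hash, long timestampMillis, long checkpointId) {
    this.hash = hash;
    this.timestampMillis = timestampMillis;
    this.checkpointId = checkpointId;
  }

  // Parses a name such as 9c87f7685fe0947ab39286d0f3f1f748-1522439833113-4048.
  public static CheckpointName parse(Path checkpointFile) {
    String[] parts = checkpointFile.getFileName().toString().split("-");
    return new CheckpointName(parts[0], Long.parseLong(parts[1]), Long.parseLong(parts[2]));
  }
}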

In our Flink-on-Mesos setup, we have a Flink launcher running in Marathon. The launcher figures out the last successful externalized checkpoint based on the folder structure above, then uses that checkpoint as a savepoint to resume the Flink job.
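The launcher logic boils down to something like the sketch below: list the pipeline's folder, pick the file with the largest timestamp, and hand it to flink run -s (the standard --fromSavepoint flag). The class name, paths, and jar name are hypothetical placeholders:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical sketch of the launcher logic for a single pipeline.
public class FlinkLauncher {

  // Extracts the timestamp, the second part of <hash>-<timestamp>-<checkpoint id>.
  static long timestampOf(Path checkpointFile) {
    return Long.parseLong(checkpointFile.getFileName().toString().split("-")[1]);
  }

  // Returns the newest externalized checkpoint in the pipeline's folder, if any.
  static Optional<Path> latestCheckpoint(Path pipelineDir) throws IOException {
    try (Stream<Path> files = Files.list(pipelineDir)) {
      return files.max(Comparator.comparingLong(FlinkLauncher::timestampOf));
    }
  }

  public static void main(String[] args) throws Exception {
    Path pipelineDir = Paths.get("/checkpoint/pipeline_a"); // hypothetical path
    Optional<Path> latest = latestCheckpoint(pipelineDir);

    // Resume from the latest checkpoint if one exists, otherwise start fresh.
    ProcessBuilder flinkRun = latest.isPresent()
        ? new ProcessBuilder("flink", "run", "-d", "-s", latest.get().toString(), "pipeline_a.jar")
        : new ProcessBuilder("flink", "run", "-d", "pipeline_a.jar");
    flinkRun.inheritIO().start().waitFor();
  }
}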

I know this is a small improvement/trick, but in production it really saves us a lot of headache. Some of our Mesos agents run on spot instances to save money, and the ability to recover on its own when the spot instances are gone benefits us a lot.

I thought we were going to prepare something for the upcoming Flink Forward, but it turns out we don't have time. Maybe we will go to Flink Forward this year as part of the audience :)
