Try out the save point in Apache Flink

Mu Kong
5 min read · Jun 13, 2017

The concept of Apache Flink's save point sounds simple and powerful. But is it easy to use, and can we count on it?

In this post, we will use a local Flink setup, with save points configured, to consume events from a local Kafka instance.
We will also use a very simple Kafka producer to feed sequential numbers into Kafka.

To check whether the save point is actually working, we will stop the Flink job, restore it from the last save point, and then check whether the consumed events are consistent with what the producer sent.

Setup local Kafka

Setting up a local Kafka is pretty easy. Just follow the official quickstart guide, and you will have a ZooKeeper and a Kafka instance running on your local machine.

Start zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka:

bin/kafka-server-start.sh config/server.properties

Create our testing topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-test

Create a script to produce events

As mentioned above, we will need a producer to send sequential events to Kafka. I'm going to use Python to create this simple producer.

First, install the Kafka client dependency:

pip install kafka-python

Then create a Python script, producer.py, like this:
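A minimal sketch of such a producer, assuming the kafka-python client installed above, a broker on localhost:9092, and the flink-test topic we created (the event format {"idx": n} matches the outputs shown later in this post):

```python
import json
import time


def make_event(idx):
    # Each event is a JSON object carrying a sequential index.
    return json.dumps({"idx": idx}).encode("utf-8")


def run(bootstrap_servers="localhost:9092", topic="flink-test"):
    # kafka-python is imported here so make_event can be reused
    # (and tested) without a running broker.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    idx = 0
    while True:
        producer.send(topic, make_event(idx))
        idx += 1
        time.sleep(0.1)  # throttle so the stream is easy to follow


if __name__ == "__main__":
    run()
```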

Then run this script to start feeding events:

python producer.py

I recommend running a simple consumer to verify that the current setup is working. Logstash works well for this, with a config file like:
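For example, a sketch of such a config, assuming a recent Logstash with the Kafka input plugin (option names vary between Logstash versions, so adjust for yours):

```
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["flink-test"]
  }
}

output {
  stdout { codec => rubydebug }
}
```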

Then start consuming with:

./bin/logstash -f <your-config-file>

But any other kind of consumer will also do the job.

Setup Flink

First you will need to download the Flink version you want/need.
After downloading the package, unpack it, and you will have everything you need to run Flink on your machine.

I will assume that Java and mvn are already installed.

Setup local Flink cluster

First we need to change the config file: ./conf/flink-conf.yaml.

In this file, you can specify basically all of the configuration for the Flink cluster, such as the job manager and task manager heap sizes. In this post, though, we are going to focus on the save point configuration.

So, add the following line to this file:

state.savepoints.dir: file:///home/<username>/<where-ever-you-want>

With this field specified, Flink will use this folder to store save point files.
In the real world, I believe people usually use HDFS (hdfs:///...) or S3 to store their save points.

For more information about save points, check the official documentation.

In this experiment, we will use a local setup for Flink cluster by running

./bin/start-local.sh

This script will read the configuration from ./conf/flink-conf.yaml, which we just modified.
Then the script will start a job manager along with a task manager on localhost.

Prepare the job

Think of a Flink cluster as a distributed worker system with streaming capabilities:
a bunch of starving workers (and their supervisors) waiting for a job assignment. Although we have set up the cluster, we still need a job.

First, create a Flink job project using the Maven archetypes:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.0

After that, create a job to consume data from Kafka and write it to the local file.

The following snippet is the simple Flink job we will use to try the save point functionality. It consumes the events from our Kafka topic and writes them to local files, bucketed by minute.
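A sketch of such a job, assuming Flink 1.3 with the flink-connector-kafka-0.10 and flink-connector-filesystem dependencies on the classpath; the class name, group.id, and output path are placeholders:

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SavepointJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing must be enabled so the Kafka offsets become
        // part of the job state captured by save points.
        env.enableCheckpointing(1000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-test-group");

        // No explicit offset setting and no restart strategy, on purpose.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>(
                        "flink-test", new SimpleStringSchema(), props));

        // Bucket output files by minute so finished files are easy to inspect.
        BucketingSink<String> sink = new BucketingSink<>("/path/to/your/basePath");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));

        stream.addSink(sink);
        env.execute("savepoint-test");
    }
}
```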

A simple Flink job

As you can see, there is no offset setting for the Kafka consumer, nor any sort of restart strategy for it.

Also, please assign your own output path to the basePath.

Start experiment

To submit our job, first build the above project by:

mvn clean install -Pbuild-jar

There will be a fat jar generated under your target folder: <project-name>.<version>.jar

To submit this jar as our job, run:

./bin/flink run <project-name>.<version>.jar

Once it starts running, you will find files start to be generated in the basePath you assigned.

Restart the job without save point

After the job runs for a while, cancel it in the Flink UI.
Check the latest finished file before cancellation, and find its last line. In my experiment, it was {"idx": 2056}.

Then start the job again:

./bin/flink run <project-name>.<version>.jar

After a few minutes, check the first finished file after the restart, and find its first line. The number should be much greater than 2057. In my experiment, it was {"idx": 2215}.

This means there is no state consistency guarantee when we restart a job without a save point.

A finished file is one that has been confirmed by Flink's checkpoint mechanism: a file without the in-progress prefix or the pending suffix. Initially, a file being written is in the in-progress state.
When the file stops being written to for a while (configurable), it becomes pending. A checkpoint then turns pending files into finished files.
Only finished files should be considered consistent output of Flink; otherwise you will get duplicates.

For more info about checkpointing, please check the official documentation.

Restart with save point

Let's try save pointing.
This time, we will create a save point before cancelling the job.

Flink allows you to create a save point by executing:

bin/flink savepoint <job-id>

The <job-id> can be found at the header of the job page in Flink web UI.

After you run this command, Flink will tell you the path to your save point file. Do record this path.

Then cancel the job, check the latest finished file before cancellation, and find its last line. In my experiment, it was {"idx": 4993}.

Then we restart the job.

Because we want to restart from a save point, we need to specify the save point when we restart the job:

./bin/flink run <project-name>.<version>.jar -s <your-save-point-path>

After a few minutes, check the first finished file after the restart, and find its first line. It should be exactly one above the number before cancellation.

In my experiment, it was {"idx": 4994}, which is consistent with the number before cancellation.

General thoughts

Flink's save point is much easier to use than I expected.
The only thing to constantly keep in mind is that we need to log the save point paths carefully when we use crontab to create save points.
Other than that, Flink seems to handle all the data consistency, even without any extra settings on the Kafka consumer.

As further experiment and research, I think it could be very useful to try Flink's save points in a cluster setup with multiple task managers.

It could be fun.
