Our journey with Apache Flink (Part 1) : Operation and deployment tips

Igor Mukam
Lumen Engineering Blog
Dec 8, 2022

Apache Flink is a great framework for building data processing applications. In our team (Data / R&D for CDN at Lumen), we love it for the wide range of use cases it allows us to tackle, the simplicity to get started with it, its very comprehensive documentation, and the lively community evolving around it.

We have been developing with it for a few years now, to process the real time video quality data sent by Mesh Delivery and CDN Load Balancer end users, and today our Flink applications process millions of messages every day. We learned a few things along the road, and in this article, we will share a few key aspects of how we deploy and operate our Flink applications.

Split responsibilities: one Flink cluster = one Flink application

Flink applications run on Flink clusters. A cluster is a combination of one or several Job Managers and one or several Task Managers. Job Managers are the brains of the cluster: they receive requests to submit applications to run on the cluster, and they schedule them on the Task Managers. Task Managers are the muscles of the cluster: they are the workers actually running the applications.

You can run as many Flink applications as you want on a single Flink cluster, provided that you have enough Task Managers on your cluster. But our experience showed us that it’s better to have as many Flink clusters as applications to run. This allowed us to:

  • adapt the resources in each cluster to the needs of the application running on it
  • isolate the applications from one another, so that errors / bugs in one job do not affect the other applications

An example of this second point is a situation where submitting a new application to the Job Manager leads to an unexpected crash of the Job Manager process. If all your applications are running on the same cluster, they all rely on the same Job Manager, so they will all be affected and stop unexpectedly (unless you use a highly available setup with ZooKeeper for your Job Managers, which is not always possible).

Anticipate: make regular savepoints of your applications

One of the super cool Flink features is the checkpointing mechanism. Checkpoints are backups of the application state at a given point in time. In your application code, you can ask Flink to take regular checkpoints (it is recommended if your application has some kind of state that you don’t want to lose when the application stops).

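StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enables periodic checkpoints (conf is the application's own configuration object; interval in ms)
env.enableCheckpointing(conf.getCheckpointsFreqMs());
// Aborts a checkpoint that takes longer than this timeout (in ms)
env.getCheckpointConfig().setCheckpointTimeout(conf.getCheckpointsTimeoutMs());
// Maximum number of checkpoints that may be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(conf.getMaxConcurrentCheckpoints());
// Number of checkpoint failures tolerated before the job itself fails
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(conf.getTolerableCheckpointFailures());
// Minimum pause between the end of one checkpoint and the start of the next (in ms)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(conf.getCheckpointsMinPauseTimeMs());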

Checkpoints are great because you can restart the application from them if it unexpectedly failed. But there are some limitations, and in some situations, you might be unable to restart your application from a given checkpoint.

Flink also has a more advanced type of checkpoint, called a savepoint, which must be triggered externally (i.e. it cannot be triggered from the application code). Restarting a job from a savepoint works almost all the time, so it’s better to have one available, especially after an application crash whose fix requires non-trivial changes to the application code. In that case (when the application schema has changed), a savepoint is the only way to guarantee that you can restart your job without losing the state of the application (see https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/#capabilities-and-limitations).

So, to avoid finding ourselves forced to restart stateful applications from scratch (with empty states), we configured a simple cron job (to run every 15 minutes) on each of our Job Managers to make regular savepoints of our applications.

# Finds the id of the application
JOB_ID=$(/bin/flink list | grep "$JOB_NAME" | awk -F' ' '{print $4}')
if [ -z "${JOB_ID}" ]; then
    echo "Error: empty job_id" >&2
    exit 1
fi
# Takes a savepoint of the job, and includes the date in the path
/bin/flink savepoint "$JOB_ID" "$SAVEPOINT_FOLDER/$(date '+%Y-%m-%d-%H-%M')"

That way, we ensure that after a potential crash we never have to restart an application from a state that is more than 15 minutes old.
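
The same savepoint can also be requested through the Job Manager REST API instead of the CLI. The snippet below is a minimal sketch of that idea, not the cron job described above: the helper names (`savepoint_target`, `find_job_id`, `call`, `take_savepoint`) are hypothetical, and the endpoints and JSON field names (`/jobs/overview`, `/jobs/{job_id}/savepoints`, `target-directory`, `request-id`) follow our reading of the Flink REST documentation, so check them against the Flink version you run.

```python
import json
import time
import urllib.request
from datetime import datetime
from typing import Optional


def savepoint_target(folder: str, now: datetime) -> str:
    """Dated savepoint path, same layout as date '+%Y-%m-%d-%H-%M'."""
    return f"{folder.rstrip('/')}/{now.strftime('%Y-%m-%d-%H-%M')}"


def find_job_id(overview: dict, job_name: str) -> str:
    """Returns the id of the RUNNING job with the given name."""
    for job in overview.get("jobs", []):
        if job.get("name") == job_name and job.get("state") == "RUNNING":
            return job["jid"]
    raise LookupError(f"no running job named {job_name!r}")


def call(base_url: str, path: str, payload: Optional[dict] = None) -> dict:
    """GET when there is no payload, POST with a JSON body otherwise."""
    data = None if payload is None else json.dumps(payload).encode()
    request = urllib.request.Request(
        base_url + path, data=data, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def take_savepoint(base_url: str, job_name: str, folder: str,
                   http=call, poll_seconds: float = 2.0) -> str:
    """Triggers a savepoint, waits for completion, returns its location."""
    job_id = find_job_id(http(base_url, "/jobs/overview"), job_name)
    trigger = http(
        base_url,
        f"/jobs/{job_id}/savepoints",
        {"target-directory": savepoint_target(folder, datetime.now()),
         "cancel-job": False},
    )
    status_path = f"/jobs/{job_id}/savepoints/{trigger['request-id']}"
    while True:
        status = http(base_url, status_path)
        if status["status"]["id"] == "COMPLETED":
            operation = status["operation"]
            if "failure-cause" in operation:
                raise RuntimeError(f"savepoint failed: {operation['failure-cause']}")
            return operation["location"]
        time.sleep(poll_seconds)
```

The `http` parameter is only there so that the logic can be exercised without a live cluster; a call such as `take_savepoint("http://10.x.x.x:8081", "my-flink-job", "/savepoints")` would use the real `call` helper.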

Automate: take advantage of the Flink API for CD

Another great thing about Flink clusters is that you can do a lot of things with the Flink API exposed by the Job Managers. You can use it to get the list of jobs running on the cluster, check the last checkpoints taken by a given job, take a savepoint of a running job, stop a running application, submit a new application to run on the cluster, etc. All of these steps are exactly what you need to do when you update / redeploy a Flink application. So we bundled those API calls in simple Python scripts, and used those scripts to set up Continuous Deployment on our Flink applications.

For example, we have a script deploy_or_update_job.py, which connects to the Job Manager and:

  • stops the application if it’s running and takes a savepoint (/jobs/{job_id}/stop)
  • uploads a given JAR to the Job Manager (/jars/upload)
  • runs the JAR with specified command line parameters (using the last savepoint taken for that job) (/jars/{jar_id}/run)
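
The three steps above can be sketched as follows. This is an illustration under stated assumptions, not the actual deploy_or_update_job.py: `client` is a hypothetical thin HTTP wrapper exposing `get(path)`, `post(path, body)` and `upload_jar(path, jar_path)`, and the JSON field names (`targetDirectory`, `request-id`, `filename`, `savepointPath`, `programArgs`, `jobid`) follow our reading of the Flink REST documentation and should be verified for your Flink version.

```python
import posixpath
import time


def wait_for_savepoint(client, job_id, request_id, poll_seconds=2.0):
    """Polls the savepoint status until it completes; returns its location."""
    while True:
        status = client.get(f"/jobs/{job_id}/savepoints/{request_id}")
        if status["status"]["id"] == "COMPLETED":
            operation = status["operation"]
            if "failure-cause" in operation:
                raise RuntimeError(f"savepoint failed: {operation['failure-cause']}")
            return operation["location"]
        time.sleep(poll_seconds)


def deploy_or_update(client, job_name, jar_path, savepoint_dir,
                     parallelism, program_args, poll_seconds=2.0):
    """Stops the running job (if any) with a savepoint, uploads the JAR,
    then starts it from that savepoint. Returns the id of the new job."""
    savepoint = None
    overview = client.get("/jobs/overview")
    for job in overview.get("jobs", []):
        if job["name"] == job_name and job["state"] == "RUNNING":
            # Step 1: stop with savepoint
            trigger = client.post(
                f"/jobs/{job['jid']}/stop",
                {"targetDirectory": savepoint_dir, "drain": False},
            )
            savepoint = wait_for_savepoint(
                client, job["jid"], trigger["request-id"], poll_seconds
            )
            break

    # Step 2: upload the JAR; the jar id is the last component of the
    # server-side file name returned by the Job Manager
    upload = client.upload_jar("/jars/upload", jar_path)
    jar_id = posixpath.basename(upload["filename"])

    # Step 3: run the JAR, restoring from the savepoint when there is one
    body = {"parallelism": parallelism, "programArgs": program_args}
    if savepoint is not None:
        body["savepointPath"] = savepoint
    return client.post(f"/jars/{jar_id}/run", body)["jobid"]
```

Injecting the client keeps the orchestration logic independent of the HTTP library, which makes it easy to test against a fake before pointing it at a real Job Manager.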

Additionally, we created Docker images that allow us to use those scripts alongside the appropriate JAR files containing the Flink jobs to run. A sample of our CD process (using Google Cloud Build) looks like this:

# Clones the data-flink-scripts repository containing our Python scripts for Flink API
- name: 'gcr.io/cloud-builders/git'
  entrypoint: 'bash'
  args:
  - -c
  - |
    eval "$(ssh-agent -s)"
    ssh-add /root/.ssh/id_rsa_data_flink_scripts
    git submodule update --init -- data-flink-scripts
  volumes:
  - name: 'ssh'
    path: /root/.ssh

# Builds the Docker image packaging the job JAR file + our Python scripts
- id: 'build image'
  name: 'gcr.io/cloud-builders/docker'
  args: [ 'build', '-t', 'my-flink-job', '.' ]

# Uses the deploy_or_update_job.py script to actually deploy the JAR on a Flink cluster
- id: deploy
  name: 'gcr.io/cloud-builders/docker'
  entrypoint: 'bash'
  args:
  - -c
  - |
    docker run -e FLINK_JOBMANAGER_URL=http://10.x.x.x:8081 my-flink-job data-flink-scripts/deploy_or_update_job.py --job_name my-flink-job --parallelism 72 --config_file my-conf-file.yaml

Scale: leverage the Flink adaptive scheduler with Google Cloud Managed Instance Groups

Flink 1.13 came with a long-awaited capability: automatic rescaling of a Flink job. In this section, we will detail how we use the adaptive scheduler to automatically upscale / downscale our Flink application when the amount of data to process increases or decreases.

Flink lets you choose between several scheduler strategies, which decide how the slots available on a cluster are used. The one that is useful for auto scaling is the adaptive scheduler: it uses all the slots available in a given cluster, up to the parallelism requested when the application was launched (see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-scheduler). That scheduler automatically restarts your application when you add Task Managers to a cluster, so that it uses the slots brought by the new workers, and it gracefully handles the loss of a Task Manager by simply restarting the application with a lower parallelism.
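
To make that rule concrete, here is a small model of the parallelism a job ends up with under the adaptive scheduler. It is a simplification written for illustration (the function name `effective_parallelism` is ours, not a Flink API), assuming every Task Manager offers the same number of slots and the job runs in a single slot-sharing group.

```python
def effective_parallelism(requested: int, task_managers: int, slots_per_tm: int) -> int:
    """Parallelism the job runs with: all available slots, capped at the
    parallelism requested when the job was submitted."""
    available_slots = task_managers * slots_per_tm
    return min(requested, available_slots)


# A job submitted with parallelism 72 on Task Managers with 2 slots each:
for task_managers in (10, 12, 9, 40):
    print(task_managers, effective_parallelism(72, task_managers, 2))
# 10 Task Managers -> 20, 12 -> 24, 9 -> 18, 40 -> 72 (capped by the request)
```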

Step 1: create an instance template for the Task Managers

A Google Managed Instance Group is a set of identical machines performing a given task. When auto scaling is on, Google can automatically add new instances to the group based on an instance template.

In our case, we chose to base our instance templates on Google Container-Optimized OS, which is optimized for running Docker containers. We simply specify our custom Flink Task Manager Docker image (based on the official Flink Docker image) as the container to run when a new instance is spawned.

Step 2: create the instance group and configure the auto scaler

The setup that we have is:

  • One VM running the Job Manager
  • One Managed Instance Group where each VM runs a Task Manager
  • Network connections allowed from the Job Manager to the Managed Instance Group
  • Network connections allowed within the Managed Instance Group

We used the Instance Template created in Step 1 to create a Managed Instance Group, and we enabled the Google Auto Scaler on that Instance Group.

The Auto Scaler has four main parameters to fine tune:

  • The minimum number of machines in the Instance Group
  • The maximum number of machines in the Instance Group
  • The CPU utilization threshold (averaged across all instances) that the auto scaler will try to maintain (a good value for our Flink applications is 70%)
  • The “cool down period”, which tells the auto scaler how long a new instance takes to be fully working, and prevents it from making decisions too early after adding a new instance.
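
To build an intuition for how the first three parameters interact, the sketch below models a target-tracking auto scaler in a few lines. It is a deliberately simplified model and an assumption on our side, not Google's actual algorithm (which also accounts for the cool down period and smooths its decisions over time); the function name `recommended_size` is hypothetical and utilization is expressed in whole percents.

```python
def recommended_size(current_instances: int, avg_cpu_pct: int, target_cpu_pct: int,
                     min_instances: int, max_instances: int) -> int:
    """Group size that would bring average CPU back to the target,
    clamped between the configured minimum and maximum."""
    # Integer ceiling of current_instances * avg_cpu_pct / target_cpu_pct
    needed = -(-current_instances * avg_cpu_pct // target_cpu_pct)
    return max(min_instances, min(max_instances, needed))


# 10 Task Managers at 90% CPU with a 70% target: scale up to 13
print(recommended_size(10, 90, 70, min_instances=4, max_instances=36))
# 10 Task Managers at 35% CPU with a 70% target: scale down to 5
print(recommended_size(10, 35, 70, min_instances=4, max_instances=36))
```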

Step 3: correctly configure the Flink environment

There are a few things to adjust in the configuration if you want to use auto scaling for Flink applications.

In the Job Manager Flink environment

  • Set the jobmanager scheduler to ‘adaptive’ to enable the adaptive Scheduler
  • Set the “resource-wait-timeout,” the maximum time that the Job Manager will wait for at least one worker to be available to schedule your application (in case the Managed Instance Group finds itself with zero workers for some reason)
  • Adjust the heartbeat timeout duration: you want your job to react as quickly as possible to the deletion of a Task Manager (when your cluster is scaling down), so you need to change the value of `heartbeat.timeout` in flink-conf.yaml (for the Job Manager AND the Task Managers). A value of 15 seconds, for example, is large enough for us to avoid false positives, while allowing a fast rescale of the application.
  • Set a restart strategy in your application that sets no limit on the max number of restarts: each time a rescale event occurs, it is counted as a restart, so there should be no limit on the number of restarts the Job Manager can trigger before terminating the application. For our applications, we use the exponential-delay restart strategy, but it’s also possible to stick with the default fixed-delay restart strategy with Integer.MAX_VALUE restart attempts.

jobmanager.scheduler: adaptive
jobmanager.adaptive-scheduler.resource-wait-timeout: 3600 s
heartbeat.timeout: 15000
restart-strategy.type: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
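
To see what these exponential-delay values mean in practice, the sketch below computes the delay before each successive restart. It is a simplified model for illustration (the function name `backoff_schedule` is ours): it ignores the jitter factor and the reset-backoff threshold, and assumes the first restart waits for the initial backoff.

```python
def backoff_schedule(initial_s: float, max_s: float, multiplier: float, restarts: int) -> list:
    """Delay (in seconds) before each of the first `restarts` restarts:
    starts at the initial backoff, multiplies after each restart, and is
    capped at the max backoff."""
    delays = []
    delay = initial_s
    for _ in range(restarts):
        delays.append(min(delay, max_s))
        delay = min(delay * multiplier, max_s)
    return delays


# initial-backoff 10 s, max-backoff 2 min, backoff-multiplier 2.0
print(backoff_schedule(10, 120, 2.0, 6))
```

With the values above, the delays grow from 10 seconds and are capped at 2 minutes from the fifth consecutive restart onwards.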

In the Task Manager Flink environment

  • Set the heartbeat timeout.
  • Set the number of task slots: Flink recommends you set one task slot per CPU available on your machine.
  • Set the address of the Job Manager: all the Task Managers in your Instance Group should talk to the same Job Manager.

jobmanager.rpc.address: x.x.x.x
heartbeat.timeout: 15000
taskmanager.numberOfTaskSlots: 2

Step 4: deploy your applications and enjoy auto scaling!

The important thing to mention when deploying is that we set the parallelism of the job to a value higher than the total number of slots that we can have on the cluster. That way, the Job Manager starts the application using all the slots available.

When traffic increases, the Google Cloud auto scaler adds new Task Managers to the Managed Instance Group (when the Task Managers’ CPU utilization reaches the configured threshold). The Job Manager detects that there are new slots available, and since we configured an arbitrarily high value for parallelism, it rescales the job to use all the new slots.

When traffic decreases, the Google Cloud auto scaler removes Task Managers (when CPU utilization is too low compared to the configured threshold). The Job Manager detects the loss of a Task Manager and simply restarts the job with a lower parallelism.

The graph below shows the typical shape of the number of Task Managers in one of our Flink clusters over a few days.

Number of Task Managers in one Flink cluster over time

The auto scaling is not perfectly smooth; there is still some tuning to be done with the auto scaler settings, but overall we are quite happy with this setup, as it allows us to save money without compromising on data quality, consistency and availability.

Conclusion

We hope that this article gave you some practical tips to operate and deploy your Flink applications! In our next article, we will show some specific data processing issues that we solved using Apache Flink, and how we overcame them. Feedback is more than welcome, and if you want to help us process and analyze the data we receive from the thousands of video sessions powered by our Mesh Delivery and CDN Load Balancer technologies every day, we’re hiring!

Igor Mukam
Lumen Engineering Blog

Data Scientist / Engineer, Data / R&D Team Lead for Lumen Technologies (formerly Streamroot)