Automatic Apache Flink deployments in Golang

WB Advanced Analytics
inganalytics.com/inganalytics
5 min read · Apr 5, 2018

At ING’s Wholesale Banking Advanced Analytics team we’ve been using Apache Flink and more specifically the DataStreams API for over 6 months now. Although we’re very enthusiastic about this open source project, we’ve identified one major pain point in using it in a production environment: managing job deployments. This post will highlight the challenges and the open source solution we’ve built to tackle this.

TL;DR
Managing job deployments and state recovery with Apache Flink is tedious and tricky within a production cluster and sometimes requires chaining several operations provided by the native Apache Flink CLI. The deployer CLI that we’ve developed provides several commands that encapsulate this process for you. You can find it here: https://github.com/ing-bank/flink-deployer

What is Apache Flink?

Apache Flink is a real-time distributed stream processing engine. A Flink cluster consists of one or more job managers and multiple task managers. Job managers are responsible for coordinating jobs (submitted as JAR files) in the cluster, and task managers actually execute the jobs. Multiple job managers allow for failover and robustness. Multiple task managers can be used to distribute and scale a job over multiple nodes. Besides that, Apache Flink provides a CLI tool which allows you to perform basic operations such as submitting, listing, and cancelling jobs.

The challenge & our solution

As mentioned in the intro, the tricky part is that most deployments require you to perform multiple commands offered by the default Apache Flink CLI. You could just do this by hand for every deployment, but in an automated CI/CD pipeline you would definitely like to do this in a more consistent manner. You could get away with writing a bash script, but as with all software, you want a way to continuously test any changes you make to the script itself.

We eventually decided that a homegrown CLI tool was the way to go. Since Golang compiles to self-contained binaries and offers great test support, type safety and more, our flink-deployer CLI was born.

A challenging scenario: job updates

Let’s take the updating of a job as an example. We identified the following steps involved in this process:

  1. Find the job ID of the currently running job that has to be updated.
  2. Create a savepoint for that job so we can recover from it in our new job.
  3. Save the location of the savepoint that Flink generated.
  4. Cancel the job.
  5. Submit the new job with the previously extracted savepoint location so it can recover its state.

We’d like to automate these steps to easily perform these complex operations in our CI/CD setup.

To support updating stateful jobs, the state of the old and the new job has to be compatible. If you’re using case classes in Scala (or any other complex data type), any changes to the data type will prevent you from restoring the state from your old job into your new job. The Apache Flink community is currently working on state migrations when using custom serializers [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/custom_serialization.html]. However, if you’re using Apache Avro for data serialization and schema evolution, there’s no easy support just yet. For this, we’ve also been working on a solution, which we’ll share in a later post.

So, what does that look like in code? Our Golang CLI runs through all the steps shown above and parses the default Flink CLI’s output in order to perform any next steps.
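In simplified form, the flow looks roughly like the sketch below. The `list`, `savepoint`, `cancel` and `run` subcommands are standard Flink CLI commands, but the output-parsing patterns, helper functions and example paths are illustrative assumptions rather than the exact implementation from our repository.

```go
// A simplified, illustrative sketch of the update flow: it shells out to the
// native Flink CLI and parses its textual output. Output formats differ
// between Flink versions, so treat the regular expressions as assumptions.
package main

import (
	"fmt"
	"os/exec"
	"regexp"
)

// runFlink executes the native Flink CLI with the given arguments and
// returns its combined stdout/stderr output.
func runFlink(args ...string) (string, error) {
	out, err := exec.Command("flink", args...).CombinedOutput()
	return string(out), err
}

// updateJob performs the five steps described above for a job identified by name.
func updateJob(jobName, savepointDir, jarFile string) error {
	// 1. Find the job ID of the currently running job.
	out, err := runFlink("list", "-r")
	if err != nil {
		return fmt.Errorf("listing running jobs: %v", err)
	}
	// Assumes `flink list` prints lines like "<date> : <job id> : <job name> (RUNNING)".
	re := regexp.MustCompile(`([0-9a-f]{32}) : ` + regexp.QuoteMeta(jobName))
	match := re.FindStringSubmatch(out)
	if match == nil {
		return fmt.Errorf("no running job named %q found", jobName)
	}
	jobID := match[1]

	// 2. Create a savepoint for that job.
	out, err = runFlink("savepoint", jobID, savepointDir)
	if err != nil {
		return fmt.Errorf("creating savepoint: %v", err)
	}

	// 3. Extract the savepoint location that Flink reports.
	pathRe := regexp.MustCompile(`Savepoint completed\. Path: (\S+)`)
	pathMatch := pathRe.FindStringSubmatch(out)
	if pathMatch == nil {
		return fmt.Errorf("could not parse savepoint path from output:\n%s", out)
	}
	savepointPath := pathMatch[1]

	// 4. Cancel the running job.
	if _, err = runFlink("cancel", jobID); err != nil {
		return fmt.Errorf("cancelling job %s: %v", jobID, err)
	}

	// 5. Submit the new job, restoring state from the savepoint.
	if _, err = runFlink("run", "-d", "-s", savepointPath, jarFile); err != nil {
		return fmt.Errorf("submitting new job: %v", err)
	}
	return nil
}

func main() {
	// Hypothetical invocation; job name, directory and JAR path are placeholders.
	if err := updateJob("my-streaming-job", "/data/savepoints", "/jobs/my-streaming-job-2.0.jar"); err != nil {
		fmt.Println(err)
	}
}
```

Keeping each step behind a small function like this is what makes the flow straightforward to unit test, which was the main reason to prefer a compiled CLI over an ad-hoc bash script.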

The CLI should be self-describing, but you can get more information by appending `-h` to any command, for example to inspect the options of the update command.
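As an illustration of where that help output comes from: in Go, the usage text is generated from the flag definitions themselves. The sketch below wires up a hypothetical update command with the standard library `flag` package; the flag names are examples only, not the deployer’s actual interface.

```go
// Illustrative only: a hypothetical "update" sub-command defined with the
// standard library flag package. Running `deployer update -h` prints the
// usage text generated from these definitions.
package main

import (
	"flag"
	"fmt"
	"os"
)

func main() {
	updateCmd := flag.NewFlagSet("update", flag.ExitOnError)
	jobName := updateCmd.String("job-name", "", "name of the running job to update")
	jarFile := updateCmd.String("jar-file", "", "path to the new job JAR")
	savepointDir := updateCmd.String("savepoint-dir", "", "directory to store the savepoint in")

	if len(os.Args) < 2 || os.Args[1] != "update" {
		fmt.Println("usage: deployer update [flags]")
		os.Exit(1)
	}
	updateCmd.Parse(os.Args[2:])
	fmt.Printf("updating %q with %q, savepoints in %q\n", *jobName, *jarFile, *savepointDir)
}
```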

Deployment in a cluster

The second pain point was performing CLI actions on a Kubernetes cluster. In our case, Flink is running in a Kubernetes cluster. With this setup in mind, we identified 2 different problems:

  1. All the instances of Flink in your cluster need to be configured with an address internal to the Kubernetes cluster so that the task managers can access the cluster. This results in any outside connection being rerouted to an internal address. Because of this, we cannot run the CLI tool from our CI pipeline, which runs outside the Kubernetes cluster. This issue also occurs when querying the internal state of Flink using Queryable State.
  2. We store job savepoints in a Kubernetes Persistent Volume. During the deployment in which you want to restore from a savepoint, the CLI tool needs access to the savepoint. Exposing the persistent volume to the outside world would compromise security, thus the only viable option would be to run the CLI commands from within the Kubernetes cluster.

We knew that we wouldn’t be able to run the CLI commands from outside the Kubernetes cluster. We tackled this issue by packaging the CLI binary in a Docker image based on the official Apache Flink Docker image. This gives us access to the native Apache Flink CLI. It also allows us to specify a Flink configuration file which identifies the cluster in Kubernetes based on environment variables. The resulting container is environment-agnostic and can be deployed within our Kubernetes clusters. This ensures that:

  1. Whenever Flink reroutes the deployer to its internal address, it will resolve fine as the deployer itself is also running inside the cluster.
  2. You can share the persistent volumes that contain your Flink savepoints with this deployer container. This way the CLI can restore from any savepoint made by your cluster.
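To illustrate the environment-based configuration mentioned above, the sketch below shows how the deployer container could derive the job manager address from environment variables. The variable names are assumptions made for the example; only the default RPC port (6123) is Flink’s standard value.

```go
// A minimal sketch of environment-based cluster configuration inside the
// deployer container. The variable names (FLINK_HOST, FLINK_PORT) are
// illustrative assumptions; the actual image may use different names.
package main

import (
	"fmt"
	"os"
)

// flinkAddress builds the job manager address from environment variables,
// falling back to sensible defaults for local testing.
func flinkAddress() string {
	host := os.Getenv("FLINK_HOST")
	if host == "" {
		host = "localhost"
	}
	port := os.Getenv("FLINK_PORT")
	if port == "" {
		port = "6123" // Flink's default job manager RPC port
	}
	return fmt.Sprintf("%s:%s", host, port)
}

func main() {
	// Inside Kubernetes these variables would point at the cluster-internal
	// DNS name of the Flink job manager service.
	fmt.Println("connecting to Flink at", flinkAddress())
}
```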

The Docker image of our CLI is available on Docker Hub, and an example of the Kubernetes YAML descriptor that we use to deploy the image to the cluster can be found on our GitHub.

Conclusion

We’ve demonstrated a few pain points you’re likely to run into when using Apache Flink in a production environment and how we tried to solve them. We hope that by sharing this solution we can help others who are facing the same challenges. We are of course very interested in your use case and whether or not our CLI can be of any help to you. If there’s anything you’d like to ask or remark on, just get in touch with us.

Next up, we’d like to share how we’re managing state migrations. As mentioned before, updating a job requires the state of the old and the new job to be compatible. We are using Apache Avro to support schema evolution across jobs in Flink. It allows us to update state saved to the cluster without having to fully replay from Kafka (and without the inconsistencies in stream joins that a replay would cause).

By Marc Rooding and Niels Denissen
