High Available Flink Cluster on Kubernetes-Setup

Gökçe Sürenkök
Published in hepsiburadatech
4 min read · Feb 25, 2020

Introduction

Flink is a great distributed stream processor to run streaming applications at any scale.

Because its checkpoints are efficient and consistent, Flink keeps its internal state consistent. It can therefore recover from failures without trouble and run streaming applications 24/7.

It can be integrated with cluster managers such as Hadoop YARN, Mesos, or Kubernetes.

At Hepsiburada, we run Flink on Kubernetes to stream changes from Kafka clusters to Elasticsearch clusters.

Flink has a very detailed installation guide for Kubernetes on its official website:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/kubernetes.html

Although Flink's instructions for setting up a cluster on Kubernetes are detailed and clear, it is less clear how to run with high availability and how to export the cluster's Prometheus metrics.

Therefore, I will share our setup with you.

Flink task managers waiting for a job

Prerequisites

Before you begin with this guide, ensure you have the following available to you:

  • a Kubernetes cluster (I tested with version 1.11 and higher)
  • a ZooKeeper cluster
  • an HDFS storage

The latest version of Flink is now 1.10, but this setup uses 1.9 because 1.10 is not compatible with our Elasticsearch cluster.

Dockerfile

First of all, we need to modify the official Flink image slightly so that it can connect to HDFS and export Prometheus metrics.

FROM flink:1.9.0-scala_2.12
ARG hadoop_jar
ARG prometheus_jar
COPY --chown=flink:flink $prometheus_jar $FLINK_HOME/lib/
COPY --chown=flink:flink $hadoop_jar $FLINK_HOME/lib/
USER flink

You can find the details of the Docker image in the following repo.

Kubernetes Setup Details

In this section, I will explain why we added certain settings to the Kubernetes resource YAMLs.

Configmaps

  1. Flink Config

web.submit.enable: false

This line prevents people from uploading jobs through the web UI. I added it on purpose; it is true by default.

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

This line points to the Prometheus metrics reporter class, which we already added to our Docker image.
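
As a small sketch of how the reporter settings fit together in flink-conf.yaml: the port line below is not part of the original config and is shown only to make the scrape port explicit. It assumes Flink's documented metrics.reporter.prom.port option, whose default is 9249.

```yaml
# flink-conf.yaml fragment (sketch, not the full file)
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# Assumption: made explicit here for readability; 9249 is already the default.
metrics.reporter.prom.port: 9249
```

Pinning the port explicitly can make it easier to keep the Service and Prometheus scrape settings in sync with the Flink configuration.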

high-availability: zookeeper
high-availability.zookeeper.quorum: $zookeeper-ip:port
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.storageDir: hdfs://$hdfs-hostname/flink/zookeeper
high-availability.storageDir: hdfs://$hdfs-hostname/flink/recovery
high-availability.cluster-id: /flink-cluster
fs.hdfs.hadoopconf: /etc/hadoop

This section is about high availability. We provide high availability with ZooKeeper, and the state is stored in the HDFS file system.

You should replace the ZooKeeper quorum value with your ZooKeeper cluster's IP and port, and the HDFS hostname with your HDFS cluster's hostname.
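
For a ZooKeeper ensemble with more than one server, the quorum value is a comma-separated list of host:port pairs. A minimal sketch, assuming three servers with hypothetical host names and the usual client port 2181:

```yaml
high-availability: zookeeper
# Hypothetical host names; list every ZooKeeper server as host:port.
high-availability.zookeeper.quorum: zk-0.example.internal:2181,zk-1.example.internal:2181,zk-2.example.internal:2181
```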

  2. Hadoop Config

core-site.xml:

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://$hdfs-hostname:port</value>
    <description>Enter your NameNode hostname</description>
  </property>
</configuration>

hdfs-site.xml:

<configuration>
  <property>
    <name>dfs.datanode.use.datanode.hostname</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
  </property>
</configuration>

Since we use Hadoop HDFS to store the state, we have to provide core-site.xml and hdfs-site.xml as above.

You need to replace the HDFS hostname and port with those of your setup.
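
One way to get these two files into the pods is a ConfigMap mounted at the directory that fs.hdfs.hadoopconf points to. The sketch below is an illustration, not necessarily how the repo does it: the ConfigMap name, volume name, and container name are hypothetical, and the XML is abbreviated.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: hadoop-config            # hypothetical name
data:
  core-site.xml: |
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://$hdfs-hostname:port</value>
      </property>
    </configuration>
  hdfs-site.xml: |
    <configuration>
      <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>
      </property>
    </configuration>
---
# Fragment of the job manager / task manager pod template (not a complete manifest)
spec:
  containers:
  - name: flink                  # hypothetical container name
    volumeMounts:
    - name: hadoop-config-volume
      mountPath: /etc/hadoop     # matches fs.hdfs.hadoopconf in the Flink config
  volumes:
  - name: hadoop-config-volume
    configMap:
      name: hadoop-config
```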

Deployment

  1. Job Manager and Task Manager

Since we use the HDFS node name in the configuration, we need to define a host alias in both the job manager and task manager resources.

hostAliases:
- ip: "$hdfs-node-ip"
  hostnames:
  - "hdfs-node-hostname"

You need to provide the HDFS IP and hostname as above. If you have an HDFS cluster, you can add one entry per node.
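
For an HDFS cluster with several nodes, the list could look like the following sketch. The IP addresses and hostnames are hypothetical placeholders; use the names your NameNode and DataNodes actually advertise.

```yaml
hostAliases:
- ip: "10.0.0.11"              # hypothetical NameNode address
  hostnames:
  - "hdfs-namenode-1"
- ip: "10.0.0.12"              # hypothetical DataNode address
  hostnames:
  - "hdfs-datanode-1"
- ip: "10.0.0.13"
  hostnames:
  - "hdfs-datanode-2"
```

Because dfs.client.use.datanode.hostname is set to true, the client connects to DataNodes by hostname, so each DataNode hostname needs to be resolvable from inside the pods.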

Liveness Probe

## job-manager
livenessProbe:
  httpGet:
    path: /overview
    port: 8081
  initialDelaySeconds: 30
  periodSeconds: 10
---
## task-manager
livenessProbe:
  tcpSocket:
    port: 6122
  initialDelaySeconds: 30
  periodSeconds: 60

For both the job manager and the task manager, we need to define liveness probes.

Task managers may lose their connection to ZooKeeper or to the job managers, or vice versa. This is where we take advantage of Kubernetes.

With liveness probes, Kubernetes can kill a failed job manager or task manager and create a new one.

The new pod can then continue executing the job by reading its state from ZooKeeper.

You can find all the resource YAMLs in the following repo.

Monitoring

Prometheus metrics of the Flink instances are exported at :9249/metrics. If your Prometheus instance runs outside the Kubernetes cluster, port 9249 is exposed as a NodePort in the service YAML, so you can scrape the metrics by editing prometheus.yml as follows:

- job_name: 'flink-job-manager-metrics'
  static_configs:
  - targets: ['kube-node-ip:31222']
- job_name: 'flink-task-manager-metrics'
  static_configs:
  - targets: ['kube-node-ip:31333']
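
The NodePort exposure these targets rely on could look roughly like the sketch below for the job manager. The Service name and selector labels are hypothetical and must match the labels on your own pods; only port 9249 and node port 31222 come from this article. The task manager would use an analogous Service with node port 31333.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-metrics   # hypothetical name
spec:
  type: NodePort
  selector:
    app: flink                     # hypothetical labels
    component: jobmanager
  ports:
  - name: metrics
    port: 9249
    targetPort: 9249
    nodePort: 31222                # must fall inside the cluster's NodePort range
```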

Or, if you already have Prometheus running inside your Kubernetes cluster, you can add Prometheus annotations to the deployment and StatefulSet YAMLs.

template:
  metadata:
    annotations:
      prometheus.io/scrape: 'true'
      prometheus.io/path: '/metrics-text'
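
Depending on how the in-cluster Prometheus is configured, it may also need to be told which port to scrape. The sketch below adds a port annotation, which is not in the original manifests. It assumes your Prometheus scrape config has relabeling rules that honour the common prometheus.io annotations; that convention is not built into Prometheus itself.

```yaml
template:
  metadata:
    annotations:
      prometheus.io/scrape: 'true'
      prometheus.io/path: '/metrics-text'
      # Assumption: only takes effect if the scrape config relabels on this annotation.
      prometheus.io/port: '9249'
```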

Now that we have our metrics, we can visualize them on a Grafana dashboard like the following:

This dashboard is customized for Hepsiburada Search Team.

Conclusion

Flink is a great stream processor on its own, and we use it heavily in the Hepsiburada Search team.

With Kubernetes managing the cluster, it gives us reliable streaming jobs that run 24/7, as in the following example:

We can see how the task managers were recovered at 2 a.m.

I hope this article helps you build a highly available Flink cluster on Kubernetes.

Thanks
