How to deploy Flink on K8s

Punit K
5 min read · Mar 17, 2024


In this blog we will learn how to deploy a Python Flink (PyFlink) app on Kubernetes (k8s). Flink can execute applications in one of three ways:

  • in Application Mode,
  • in Session Mode,
  • in a Per-Job Mode (deprecated).
  1. Per-Job Mode (deprecated): Aiming at providing better resource isolation guarantees, the Per-Job mode uses the available resource provider framework (e.g. YARN) to spin up a cluster for each submitted job. This cluster is available to that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc.) are cleaned up. This provides better resource isolation, as a misbehaving job can only bring down its own Task Managers. In addition, it spreads the load of book-keeping across multiple Job Managers, as there is one per job.
  2. Session Mode: Session mode assumes an already running Flink cluster and uses the resources of that cluster to execute any submitted application. Applications executed in the same (session) cluster use, and consequently compete for, the same resources. This has the advantage that you do not pay the resource overhead of spinning up a full cluster for every submitted job. However, if one of the jobs misbehaves or brings down a Task Manager, then all jobs running on that Task Manager are affected by the failure. Apart from the negative impact on the job that caused the failure, this implies a potentially massive recovery process, with all the restarting jobs accessing the filesystem concurrently and making it unavailable to other services. Additionally, having a single cluster running multiple jobs implies more load for the Job Manager, which is responsible for the book-keeping of all the jobs in the cluster.
  3. Application Mode: The Application mode requires that the user code is bundled together with the Flink image, because it runs the user code's main() method on the cluster. The Application Mode makes sure that all Flink components are properly cleaned up after the termination of the application. To learn more, see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode

We will use Application Mode to deploy our PyFlink app on k8s, because this is the method the official documentation suggests for production. The Flink community provides a base Docker image which can be used to bundle the user code.

Prerequisites

  • Kubernetes >= 1.9.
  • KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.
  • A default service account with RBAC permissions to create and delete pods.
  • Helm: https://helm.sh/docs/intro/install/
  • Cert manager : Install the certificate manager on your Kubernetes cluster to enable adding the webhook component (only needed once per Kubernetes cluster): https://artifacthub.io/packages/helm/cert-manager/cert-manager

If the cert manager installation fails for any reason, you can disable the webhook by passing --set webhook.create=false to the helm install command for the operator.
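The permission checks from the prerequisites can be scripted so that they run in one go. The following is a minimal sketch, not part of the official setup: it simply shells out to kubectl auth can-i for the verbs and resources mentioned above. The helper names (can_i_commands, check_permissions) are made up for this illustration, and it assumes kubectl is on your PATH and pointed at the target cluster.

```python
import shutil
import subprocess

# Verbs and resources taken from the prerequisites list above.
VERBS = ["list", "create", "delete"]
RESOURCES = ["pods", "services"]


def can_i_commands(verbs=VERBS, resources=RESOURCES):
    """Build one `kubectl auth can-i <verb> <resource>` argv per combination."""
    return [
        ["kubectl", "auth", "can-i", verb, resource]
        for resource in resources
        for verb in verbs
    ]


def check_permissions():
    """Run every check and return a {"<verb> <resource>": bool} mapping."""
    results = {}
    for cmd in can_i_commands():
        proc = subprocess.run(cmd, capture_output=True, text=True)
        # kubectl answers "yes" or "no" on stdout.
        results[f"{cmd[3]} {cmd[4]}"] = proc.stdout.strip() == "yes"
    return results


if __name__ == "__main__":
    if shutil.which("kubectl") is None:
        print("kubectl not found on PATH; skipping checks")
    else:
        for action, allowed in check_permissions().items():
            print(f"{action}: {'ok' if allowed else 'MISSING'}")
```

Any line reported as MISSING points at an RBAC permission to fix before installing the operator.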

Now you can deploy the selected stable Flink Kubernetes Operator version using the included Helm chart:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.7.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

To find the list of stable versions please visit https://flink.apache.org/downloads.html

You may verify your installation via kubectl and helm:

kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-fb5d46f94-ghd8b 2/2 Running 0 4m21s
helm list
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
flink-kubernetes-operator default 1 2022-03-09 17:39:55.461359 +0100 CET deployed flink-kubernetes-operator-1.8-SNAPSHOT 1.8-SNAPSHOT

Now that the Flink CRDs are installed on k8s, the next steps are:

  1. Python script of a simple streaming job
  2. DockerFile to build custom image with pyflink and python demo
  3. Example YAML for submitting the python job using the operator

1. Python script of a simple streaming job: python_demo.py

import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def python_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # Source table: the datagen connector produces random rows.
    t_env.execute_sql("""
        CREATE TABLE orders (
            order_number BIGINT,
            price DECIMAL(32,2),
            buyer ROW<first_name STRING, last_name STRING>,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen'
        )""")

    # Sink table: same schema as orders, printed to the TaskManager logs.
    t_env.execute_sql("""
        CREATE TABLE print_table WITH ('connector' = 'print')
        LIKE orders""")

    # Submit the streaming job that copies every order to the sink.
    t_env.execute_sql("""
        INSERT INTO print_table SELECT * FROM orders""")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    python_demo()
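Before building an image, it can be handy to try a bounded variant of the same job locally. The sketch below is an illustration rather than part of the original demo: it assumes PyFlink is installed on your machine and relies on the datagen connector's rows-per-second and number-of-rows options. The helper names orders_ddl and run_local are hypothetical. Note that the sink uses LIKE orders (EXCLUDING OPTIONS) so that the datagen-only options are not copied onto the print table.

```python
def orders_ddl(rows_per_second=5, number_of_rows=None):
    """Return the CREATE TABLE statement for the datagen-backed orders table."""
    options = {"connector": "datagen", "rows-per-second": str(rows_per_second)}
    if number_of_rows is not None:
        # Setting number-of-rows makes the datagen source bounded.
        options["number-of-rows"] = str(number_of_rows)
    with_clause = ",\n    ".join(f"'{k}' = '{v}'" for k, v in options.items())
    return f"""
CREATE TABLE orders (
    order_number BIGINT,
    price DECIMAL(32,2),
    buyer ROW<first_name STRING, last_name STRING>,
    order_time TIMESTAMP(3)
) WITH (
    {with_clause}
)"""


def run_local(number_of_rows=10):
    """Run the demo pipeline locally on a bounded number of rows."""
    # Imported lazily so orders_ddl() can be used without PyFlink installed.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql(orders_ddl(number_of_rows=number_of_rows))
    # EXCLUDING OPTIONS keeps the datagen options off the print sink.
    t_env.execute_sql(
        "CREATE TABLE print_table WITH ('connector' = 'print') "
        "LIKE orders (EXCLUDING OPTIONS)"
    )
    # wait() blocks until the bounded job has finished.
    t_env.execute_sql("INSERT INTO print_table SELECT * FROM orders").wait()


if __name__ == "__main__":
    # Only prints the generated DDL; call run_local() where PyFlink is available.
    print(orders_ddl(number_of_rows=10))
```

Calling run_local() should print a handful of random orders and then exit, which is a quick way to catch SQL mistakes before the job goes anywhere near the cluster.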

2. Dockerfile to build a custom image with PyFlink and the Python demo

# Check https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker for more details
FROM flink:1.16
# Install Python 3: Debian 11 ships Python 3.9, so build Python 3.7 from source,
# because PyFlink currently officially supports only Python 3.6, 3.7 and 3.8.
RUN apt-get update -y && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
# install PyFlink
RUN pip3 install "apache-flink>=1.16.0,<1.17.1"
# add python script
USER flink
RUN mkdir /opt/flink/usrlib
ADD python_demo.py /opt/flink/usrlib/python_demo.py

Now we will build an image from the Dockerfile above and push it to Docker Hub.

docker build -t punitkashyup/flink-python-example:v1 .

docker push punitkashyup/flink-python-example:v1

3. Example YAML for submitting the python job using the operator

Let's write a manifest, python-example.yaml, to deploy our Docker image:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: punitkashyup/flink-python-example:v1
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note: this jarURI is actually a placeholder and it is optional.
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"]
    parallelism: 1
    upgradeMode: stateless
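If you deploy several Python jobs, writing each manifest by hand gets repetitive. As a rough sketch, the same FlinkDeployment can be generated from Python and piped to kubectl, which accepts JSON as well as YAML. The function name flink_deployment and its parameters are hypothetical; the field values mirror the manifest above.

```python
import json


def flink_deployment(name, image, script="/opt/flink/usrlib/python_demo.py",
                     memory="2048m", cpu=1, parallelism=1):
    """Return a FlinkDeployment manifest (same fields as the YAML above) as a dict."""
    resource = {"memory": memory, "cpu": cpu}
    return {
        "apiVersion": "flink.apache.org/v1beta1",
        "kind": "FlinkDeployment",
        "metadata": {"name": name},
        "spec": {
            "image": image,
            "flinkVersion": "v1_16",
            "flinkConfiguration": {"taskmanager.numberOfTaskSlots": "1"},
            "serviceAccount": "flink",
            # Copy the dict so the two components can be tuned independently.
            "jobManager": {"resource": dict(resource)},
            "taskManager": {"resource": dict(resource)},
            "job": {
                "jarURI": "local:///opt/flink/opt/flink-python_2.12-1.16.1.jar",
                "entryClass": "org.apache.flink.client.python.PythonDriver",
                "args": ["-pyclientexec", "/usr/local/bin/python3", "-py", script],
                "parallelism": parallelism,
                "upgradeMode": "stateless",
            },
        },
    }


if __name__ == "__main__":
    manifest = flink_deployment(
        "python-example", "punitkashyup/flink-python-example:v1"
    )
    print(json.dumps(manifest, indent=2))
```

Saved under a name of your choice, say make_manifest.py, the output can be applied with python make_manifest.py | kubectl apply -f -.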

Now we will apply this manifest:

kubectl create -f python-example.yaml

After a successful startup (which can take on the order of a minute in a fresh environment, and seconds afterwards), you can follow the logs of your job:

kubectl logs -f deploy/python-example

2022-03-11 21:46:04,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 206 (type=CHECKPOINT) @ 1647035164458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:04,465 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 206 for job a12c04ac7f5d8418d8ab27931bf517b7 (28509 bytes, checkpointDuration=7 ms, finalizationTime=0 ms).
2022-03-11 21:46:06,458 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 207 (type=CHECKPOINT) @ 1647035166458 for job a12c04ac7f5d8418d8ab27931bf517b7.
2022-03-11 21:46:06,483 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 207 for job a12c04ac7f5d8418d8ab27931bf517b7 (28725 bytes, checkpointDuration=25 ms, finalizationTime=0 ms).
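Besides tailing the logs, you can ask the operator for the state of the deployment. The sketch below reads the resource with kubectl get flinkdeployment ... -o json and pulls out two status fields. Treat the field names (status.jobStatus.state and status.jobManagerDeploymentStatus) as assumptions to verify against your operator version; the helper names are made up for this example.

```python
import json
import shutil
import subprocess


def summarize_status(deployment_json):
    """Extract the job state and JobManager status from `kubectl get -o json` output."""
    status = json.loads(deployment_json).get("status") or {}
    return {
        # Assumed field names; check them against your operator's CRD reference.
        "job_state": (status.get("jobStatus") or {}).get("state"),
        "job_manager": status.get("jobManagerDeploymentStatus"),
    }


def fetch_status(name="python-example"):
    """Query the cluster for the FlinkDeployment and summarize its status."""
    out = subprocess.run(
        ["kubectl", "get", "flinkdeployment", name, "-o", "json"],
        check=True, capture_output=True, text=True,
    ).stdout
    return summarize_status(out)


if __name__ == "__main__":
    if shutil.which("kubectl") is None:
        print("kubectl not found on PATH")
    else:
        try:
            print(fetch_status())
        except subprocess.CalledProcessError as err:
            print(f"kubectl failed: {err.stderr.strip()}")
```

A small loop around fetch_status() can then wait until the job reports a running state before you open the dashboard.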

To expose the Flink Dashboard, you may add a port-forward rule or look at the ingress configuration options:

kubectl port-forward svc/python-example-rest 8081

To stop your job and delete your FlinkDeployment, run:

kubectl delete flinkdeployment/python-example
