Apache Flink Stateful Functions deployment

Dmytro Vedetskyi
DevOops World … and the Universe
Jan 26, 2021

Overview:

Stateful Functions is an API that simplifies the building of distributed stateful applications with a runtime built for serverless architectures. It brings together the benefits of stateful stream processing — the processing of large datasets with low latency and bounded resource constraints — along with a runtime for modeling stateful entities that supports location transparency, concurrency, scaling, and resiliency.

StateFun functions can be written in several programming languages, including Java, Python, Go, Rust, and Haskell.

Flink jobs deployment types:

Flink can execute applications in one of three ways:

  • in Application Mode
  • in Per-Job Mode
  • in Session Mode

The above modes differ in:

  • the cluster lifecycle and resource isolation guarantees
  • whether the application’s main() method is executed on the client or on the cluster.

Application Mode

Pros:
* Supports applications that submit multiple jobs
* Specific cluster configs per job
* A session-like cluster scoped to a single application

Cons:
* Separate cluster per job/application
* Resource overhead (CPU, RAM, network)

Per-Job Mode

Pros:
* Resource isolation
* A dedicated cluster is spun up for each submitted job

Cons:
* Separate cluster per job/application

Session Mode

Pros:
* Shared cluster and resources
* A single cluster serves all jobs

Cons:
* Jobs can affect each other
* Resources are shared across all jobs
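
The two axes on which the modes differ (cluster lifecycle and where main() runs) can be captured in a small lookup table. The following Python sketch is only an illustration: the DeploymentMode class and the helper function are hypothetical names introduced here, not part of any Flink API.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DeploymentMode:
    name: str
    cluster_scope: str   # what a cluster is dedicated to
    main_runs_on: str    # where the application's main() executes


# Summary of the three modes described above (illustrative only).
MODES = [
    DeploymentMode("application", "one cluster per application", "cluster"),
    DeploymentMode("per-job", "one cluster per job", "client"),
    DeploymentMode("session", "one shared, long-running cluster", "client"),
]


def modes_with_main_on(location: str) -> List[str]:
    """Return the names of the modes whose main() runs at the given location."""
    return [mode.name for mode in MODES if mode.main_runs_on == location]
```

For example, modes_with_main_on("cluster") singles out Application Mode, the only mode that moves main() off the client.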

Stateful Functions building and deployment

Stateful Functions applications can be packaged as either standalone applications or Flink jobs that can be submitted to a cluster.

Building

Docker Image

The recommended deployment mode for Stateful Functions applications is to build a Docker image. This way, user code does not need to package any Apache Flink components. The provided base image allows teams to package their applications with all the necessary runtime dependencies quickly.

Example Dockerfile:

FROM flink-statefun:2.2.0

RUN mkdir -p /opt/statefun/modules/statefun-example
RUN mkdir -p /opt/statefun/modules/remote

COPY target/statefun-example*jar /opt/statefun/modules/statefun-example/
COPY module.yaml /opt/statefun/modules/remote/module.yaml

The official Docker image is still in development, so in the meantime you can use the Ververica Docker image flink-statefun:2.2.0.

Flink jar job

If you prefer to package your job to submit to an existing Flink cluster, simply include statefun-flink-distribution as a dependency to your application.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-distribution</artifactId>
    <version>2.2.0</version>
</dependency>

To start your function, run:

./bin/flink run -c org.apache.flink.statefun.flink.core.StatefulFunctionsJob ./statefun-example.jar

The following configuration is strictly required to run a StateFun application. Set it in the cluster’s flink-conf.yaml:

classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
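
Since a missing pattern typically only shows up as a class loading error at runtime, it can help to check the configuration before submitting a job. The sketch below is a minimal example and assumes flink-conf.yaml uses plain "key: value" lines with unquoted values; the function name is hypothetical.

```python
from typing import List

KEY = "classloader.parent-first-patterns.additional"
REQUIRED_PATTERNS = (
    "org.apache.flink.statefun",
    "org.apache.kafka",
    "com.google.protobuf",
)


def missing_parent_first_patterns(conf_text: str) -> List[str]:
    """Return the required patterns that are absent from the given config text."""
    configured = set()
    for raw_line in conf_text.splitlines():
        # Drop trailing comments and surrounding whitespace.
        line = raw_line.split("#", 1)[0].strip()
        if not line.startswith(KEY + ":"):
            continue
        value = line[len(KEY) + 1:]
        # Patterns are separated by semicolons.
        configured.update(p.strip() for p in value.split(";") if p.strip())
    return [p for p in REQUIRED_PATTERNS if p not in configured]
```

An empty result means all three patterns are present; anything else lists what still has to be added.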

Deployment

Remote Functions

Functions can be executed remotely, with message and state access provided as part of the invocation request. This way, functions can be managed independently, like stateless processes. The state/messaging tier (i.e., the Flink processes) and the function tier are deployed, managed, and scaled independently.

Function invocations happen through an HTTP/gRPC protocol and go through a service that routes invocation requests to any available endpoint, for example a Kubernetes (load-balancing) service, the AWS request gateway for Lambda, etc. Because invocations are self-contained (contain message, state, access to timers, etc.) the target functions can be treated like any stateless application.
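
To make "self-contained invocation" concrete, here is a toy Python sketch of a function that receives its message and state in the request and returns the updated state and outgoing messages in the response. It is not the real StateFun protocol: the actual request/reply format is Protobuf-based and handled by the SDKs, and the JSON shape, field names, and the "example/printer" target used here are assumptions made purely for illustration.

```python
import json
from typing import List, Tuple


def greeter(message: dict, state: dict) -> Tuple[dict, List[dict]]:
    """Pure function: (message, current state) -> (new state, outgoing messages)."""
    seen = state.get("seen_count", 0) + 1
    new_state = {"seen_count": seen}
    outgoing = [{
        "target": "example/printer",  # hypothetical downstream function
        "payload": f"Hello {message['name']}, visit #{seen}",
    }]
    return new_state, outgoing


def handle_invocation(request_body: bytes) -> bytes:
    """Handle one invocation; the process keeps nothing between calls."""
    request = json.loads(request_body)
    new_state, outgoing = greeter(request["message"], request.get("state", {}))
    return json.dumps({"state": new_state, "outgoing": outgoing}).encode()
```

Because the handler holds no state of its own, any replica behind the load balancer can serve any request, which is what allows the function tier to scale independently of the Flink processes.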

Co-located Functions

An alternative way of deploying functions is co-location with the Flink JVM processes. In such a setup, each Flink TaskManager would talk to one Function process sitting “next to it”. A common way to do this is to use a system like Kubernetes and deploy pods consisting of a Flink container and the function side-car container; the two communicate via the pod-local network.

This mode supports different languages and avoids routing invocations through a Service/LoadBalancer, but it cannot scale the state and compute parts independently.

Embedded Functions

Embedded Functions are similar to the execution mode of Stateful Functions 1.0 and to Flink’s Java/Scala stream processing APIs. Functions are run in the JVM and are directly invoked with the messages and state access. This is the most performant way, though at the cost of only supporting JVM languages. Updates to functions mean updating the Flink cluster.

Flink Docker deployment

Kudo deployment

KUDO makes it easy to deploy a Flink cluster with jar jobs.

Here is an example of a job uploader that submits a jar to the Flink cluster.

The job uses Kafka, also deployed through KUDO, for events.

command:
  - "/usr/local/bin/bash"

args: ['-c',
  'export JOB_FILENAME=$(basename "$DOWNLOAD_URL");
  echo "DOWNLOAD_URL: $DOWNLOAD_URL FILE: $JOB_FILENAME JOBMANAGER: $JOBMANAGER";
  apk add --no-cache jq curl;
  curl -s "$DOWNLOAD_URL" -o "$JOB_FILENAME";
  curl -s -X POST -H "Expect:" -F "jarfile=@$JOB_FILENAME" "$JOBMANAGER:8081/jars/upload";
  while true; do date; export JAR_ID=$(curl -s "$JOBMANAGER:8081/jars" | jq -r ".files[].id");
  if [ -z "$JAR_ID" ];
  then
  echo "No uploaded jar detected";
  else
  echo "Found jar $JAR_ID";
  export SUBMIT_MSG=$(curl -s -X POST -H "Expect:" "$JOBMANAGER:8081/jars/$JAR_ID/run?program-args=--kafka_host%20kafka-kafka-1.kafka-svc.default.svc.cluster.local:9093" | jq -r ".errors"); echo "RESPONSE: $SUBMIT_MSG";
  if [ "$SUBMIT_MSG" = "null" ];
  then
  echo "SUBMITTED JOB!";
  exit 0;
  else
  echo "Failed to submit job: $SUBMIT_MSG";
  fi;
  fi;
  echo "=====================";
  sleep 5; done;']
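
The decision logic of the uploader script (pick an uploaded jar, build the run URL, check the response for errors) can also be expressed as a few small Python helpers, which are easier to test than an inline shell loop. This is a sketch only: the function names are hypothetical, and the endpoints and the program-args parameter are simply those used by the script above.

```python
import json
from typing import Optional
from urllib.parse import quote


def build_run_url(jobmanager: str, jar_id: str, program_args: str, port: int = 8081) -> str:
    """Build the /jars/<id>/run URL with URL-encoded program arguments."""
    return (
        f"http://{jobmanager}:{port}/jars/{quote(jar_id)}/run"
        f"?program-args={quote(program_args)}"
    )


def first_jar_id(jars_response: str) -> Optional[str]:
    """Return the id of the first uploaded jar from a /jars response, if any."""
    files = json.loads(jars_response).get("files", [])
    return files[0]["id"] if files else None


def submission_errors(run_response: str):
    """Return the 'errors' field of a run response, or None if it is absent."""
    return json.loads(run_response).get("errors")
```

A retry loop around these helpers would mirror the script: poll until first_jar_id returns a value, POST to the URL from build_run_url, and stop once submission_errors returns None.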

K8s Helm chart

The StateFun repository includes a Dockerfile and a build script that are very useful for building images:

https://github.com/apache/flink-statefun/tree/master/tools/docker

A Helm chart is available there as well. It deploys a Flink cluster with your function in place:
https://github.com/apache/flink-statefun/tree/master/tools/k8s

Docker Compose

version: "2.1"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka-broker:
    image: wurstmeister/kafka:2.12-2.0.1
    ports:
      - "9092:9092"
    environment:
      HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  master:
    build:
      context: .
    expose:
      - "6123"
    ports:
      - "8081:8081"
    environment:
      - ROLE=master
      - MASTER_HOST=master
    volumes:
      - ./checkpoint-dir:/checkpoint-dir
  db:
    image: postgres
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - "5432:5432"
  worker:
    build:
      context: .
    expose:
      - "6121"
      - "6122"
    depends_on:
      - master
      - kafka-broker
    links:
      - "master:master"
      - "kafka-broker:kafka-broker"
    environment:
      - ROLE=worker
      - MASTER_HOST=master
    volumes:
      - ./checkpoint-dir:/checkpoint-dir
  python-worker:
    build:
      context: ./greeter
    expose:
      - "8000"
  event-generator:
    build:
      context: generator
      dockerfile: Dockerfile
    depends_on:
      - kafka-broker

You can download a skeleton project here: https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/downloads/walkthrough.zip

Summary:

Stateful Functions is a useful, modern serverless solution. It is a framework built on top of the Apache Flink runtime, which means it inherits Flink’s deployment and operations model, so there are no new concepts you need to learn.

It is easy to deploy, with several deployment types to choose from depending on your needs and business goals. No single method is best; each has its place, so choose the one that fits your case. Stateful Functions can be delivered with common methods such as Docker images, Helm charts, Docker Compose, or plain Flink jar jobs. Its main advantages are support for high availability and scalability.

Flink’s web UI lets you monitor the status of Stateful Functions applications.

URLs:

https://en.wikipedia.org/wiki/Apache_Flink
https://flink.apache.org/news/2020/11/11/release-statefun-2.2.1.html
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/distributed_architecture.html
https://github.com/apache/flink-statefun/tree/master/tools
https://github.com/kudobuilder/operators/tree/master/repository/flink
https://github.com/apache/flink-statefun/tree/master/tools/docker
https://github.com/apache/flink-statefun/tree/master/tools/k8s
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/
