How to Set Up a Local Flink Cluster Using Docker

Ricardo Cordeiro
Published in Marionete
Jul 16, 2024

Running a local Flink cluster provides an isolated environment for developing, testing, experimenting with, and troubleshooting Flink applications. By setting up a local Flink cluster, you can gain hands-on experience and ensure that your Flink jobs are robust and performant before deploying them to a production environment.

What is Flink?

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Deployment Modes

Before we start, it's important to note that Flink can execute applications in two ways:

  • Application Mode: Creates a cluster per submitted application. A dedicated JobManager is started for submitting the job. The JobManager will only execute this job, then exit. Although Application Mode comes with better resource isolation, it comes at the price of spinning up a dedicated cluster for each application.
  • Session Mode: The application's main() method runs on the client side, and one JobManager instance manages multiple jobs that share the same cluster of TaskManagers. The advantage is that you do not pay the resource overhead of spinning up a cluster for every job.

We will use Session Mode during this demonstration to have Job and Task Managers running continuously for testing and interaction with the cluster.

Dockerfile Image

First, create a new file named 'flink.Dockerfile' with the following content:

FROM flink:1.19.0-scala_2.12-java17

RUN wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar && \
    wget -P /opt/flink/lib https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.6.1/kafka-clients-3.6.1.jar

To ensure your Flink cluster can connect to various data sources and sinks, you need to include the necessary dependencies. In this example, we are adding dependencies for Apache Kafka. Depending on your use case, you might need to add additional connectors to your setup.

Now build the Docker image with:

docker build -t my-flink -f flink.Dockerfile .

Docker Compose

Next, create a new file named 'env-compose.yml' with the following content:

version: '2.2'
services:
  taskmanager:
    image: my-flink
    container_name: taskmanager
    hostname: taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 6

  jobmanager:
    image: my-flink
    container_name: jobmanager
    hostname: jobmanager
    ports:
      - "8082:8082"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
        rest.port: 8082

This configuration includes the minimum necessary components to run a functional Flink cluster:

  • JobManager: Manages job execution, task scheduling, and cluster resources.
  • TaskManager: Executes the tasks of the submitted jobs.

To enable our Flink cluster to interact with other technologies, you might add services for the data sources and sinks that your Flink jobs will interact with.

To make a Kafka cluster available in our environment, add the following services under the services key of the 'env-compose.yml' file:

  zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "input-topic:1:1,output-topic:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Once both files are in place, run docker-compose to deploy the environment:

docker-compose -f env-compose.yml up -d

Likewise, if you want to tear down the environment:

docker-compose -f env-compose.yml down

Access Flink Dashboard

Access the Flink web dashboard at http://localhost:8082. The dashboard lets you monitor the status of your Flink cluster, including the JobManager and TaskManagers. It also lets you submit new jobs and inspect running or finished ones.
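
The dashboard is backed by Flink's REST API, which is served on the same port. As a quick check that the cluster is reachable, the sketch below queries the /overview endpoint with Java's built-in HTTP client (Java 11 or later). The class name FlinkOverview and the helper overviewUri are hypothetical names chosen for this illustration, and the host and port assume the 'env-compose.yml' shown earlier.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper class for illustration; not part of the original project.
class FlinkOverview {

    // Builds the URI of Flink's REST "overview" endpoint for a given host and port.
    static URI overviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/overview");
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();

        // Assumes the REST port 8082 published by the jobmanager service.
        HttpRequest request = HttpRequest.newBuilder(overviewUri("localhost", 8082))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();

        // Print the HTTP status and the raw JSON body returned by the cluster.
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body());
    }
}
```

If the cluster is up, the response should be a JSON document with fields such as taskmanagers and slots-total, which you can compare against the values configured in the compose file.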

Creating a Flink Job

Creating jobs in Apache Flink involves defining the logic for data processing. Flink jobs can be written in various programming languages and paradigms, including SQL, Java, Scala, and Python.

Here, we will focus on creating jobs in SQL and Java, demonstrating how to interact with a Kafka cluster. Both examples will do exactly the same thing: read data from a Kafka topic, process it using SQL/Java, and write the result back to another Kafka topic.

SQL:

-- Create source table to read from Kafka
CREATE TABLE source_table (
    `field1` STRING,
    `field2` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'input-topic',
    'properties.bootstrap.servers' = 'kafka:9093',
    'properties.group.id' = 'my-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Create sink table to write to Kafka
CREATE TABLE sink_table (
    `field1` STRING,
    `field2` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'output-topic',
    'properties.bootstrap.servers' = 'kafka:9093',
    'format' = 'json'
);

-- Define the SQL query to process the data
INSERT INTO sink_table
SELECT field1, field2
FROM source_table
WHERE field2 = 'correct';

Java:

package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MyTableExample {
    public static void main(String[] args) throws Exception {
        // Set up the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set up the Flink table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create source table to read from Kafka
        String createSourceTable = "CREATE TABLE source_table (\n" +
                " field1 STRING,\n" +
                " field2 STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'input-topic',\n" +
                " 'properties.bootstrap.servers' = 'kafka:9093',\n" +
                " 'properties.group.id' = 'my-group',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'format' = 'json'\n" +
                ")";
        tableEnv.executeSql(createSourceTable);

        // Create sink table to write to Kafka
        String createSinkTable = "CREATE TABLE sink_table (\n" +
                " field1 STRING,\n" +
                " field2 STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'output-topic',\n" +
                " 'properties.bootstrap.servers' = 'kafka:9093',\n" +
                " 'format' = 'json'\n" +
                ")";
        tableEnv.executeSql(createSinkTable);

        // Define the SQL query to process the data
        String insertQuery = "INSERT INTO sink_table " +
                "SELECT field1, field2 " +
                "FROM source_table " +
                "WHERE field2 = 'correct'";

        tableEnv.executeSql(insertQuery);
    }
}

In this Java example, to keep it simple, we are using the SQL API to define our job, just like in the SQL example above. Feel free to explore and experiment with Flink’s Table and DataStream APIs.

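We also need a pom.xml that includes the Flink Table API dependencies. The Kafka connector itself is not declared here, because the connector JAR was already added to /opt/flink/lib in the Docker image: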

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>java-flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.19.0</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.19.0</version>
            <scope>compile</scope>
        </dependency>

    </dependencies>

</project>

Submit a Flink Job

Flink offers multiple ways to submit jobs to a cluster, each suited to different use cases: the Command Line Interface (CLI), the web interface, the REST API, or programmatic submission. We will use two of them: the Flink SQL Client, and the REST API via the dashboard.

Flink SQL Client:

The Flink SQL Client allows you to interact directly with the Flink cluster, including submitting jobs using a SQL-like syntax.

Open a shell in the JobManager container (for example, with docker exec -it jobmanager bash) and, from the Flink home directory, start the SQL client using the following command:

./bin/sql-client.sh

We now need to run each statement of the SQL job described above individually: first the two CREATE TABLE statements, and only then the INSERT INTO statement.

REST (using Dashboard):

1. Package Your Flink Job:

Ensure your Flink job is packaged as a JAR file (for example, by running mvn clean package in the project directory).

2. Access the Flink Dashboard:

Open a browser and go to the Flink Dashboard URL http://localhost:8082.

3. Submit the Flink Job:

  • Click on the Submit New Job button.
  • Upload your JAR file.
  • Enter the entry class: org.example.MyTableExample.
  • Click on the Submit button.

Congratulations, you were able to submit your first Flink job!

You can now send data in the right format, for example {"field1": "a", "field2": "correct"}, to the input-topic topic and see the results in the output-topic topic.

Note that our configuration does not handle malformed JSON messages: a record that cannot be parsed will cause the job to fail.
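
Because a single malformed record is a problem for this setup, it can help to generate test messages programmatically instead of typing them by hand. The following is a minimal sketch using only the Java standard library; the class TestMessages and its methods are hypothetical names introduced for this illustration, and the escaping covers only backslashes and double quotes, which is an assumption that holds for simple test values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; class and method names are not from the original project.
class TestMessages {

    // Escapes backslashes and double quotes so the value is safe inside a JSON string.
    // Control characters are not handled; this is only meant for simple test values.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds one JSON object matching the schema of source_table (field1, field2).
    static String toJson(String field1, String field2) {
        return "{\"field1\":\"" + escape(field1) + "\",\"field2\":\"" + escape(field2) + "\"}";
    }

    // A small mix of records: some match the job's WHERE clause, some do not.
    static List<String> sampleMessages() {
        List<String> messages = new ArrayList<>();
        messages.add(toJson("a", "correct"));
        messages.add(toJson("b", "wrong"));
        messages.add(toJson("c", "correct"));
        return messages;
    }

    public static void main(String[] args) {
        // Print one JSON document per line, ready to be sent to input-topic.
        sampleMessages().forEach(System.out::println);
    }
}
```

Each printed line can be sent as one message to input-topic with whichever producer you prefer. Given the WHERE field2 = 'correct' filter in the job, only the first and third records would be expected to show up in output-topic.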

Conclusion

By following this guide, you should now have a functional local Flink cluster up and running.

We also explained how to integrate Kafka with your Flink cluster, enabling you to process and analyse streaming data.

In addition, we presented examples of creating Flink jobs in both SQL and Java, showing how to read from and write to Kafka topics.

Finally, we explored two ways to submit jobs to your Flink cluster: using the Flink SQL Client, and via the REST API using the dashboard. Each method suits different use cases.

For the complete code and additional resources, please visit flink-kafka-local.

This setup will allow you to experiment with different job configurations, connectors, and processing logic in a safe and controlled environment.
