Mastering Flink Job and Task Managers

Benjamin Barrett
4 min read · Jul 15, 2024

In a previous post, I discussed the Flink Datastream API and included a demo project. Running the main class of the JAR file as is may work, but it’s better to run it in a Flink cluster instead. But what is a Flink cluster anyway, and where can you get one? Let’s find out!

Installing Flink

If you want to follow along, download the Flink executables from this link and add the bin folder to your PATH.
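As a minimal sketch of the PATH step, assuming the archive was extracted to a hypothetical `$HOME/flink` directory (adjust `FLINK_HOME` to your actual location):

```shell
# Hypothetical install location; point this at the folder you extracted Flink into.
FLINK_HOME="${FLINK_HOME:-$HOME/flink}"
export FLINK_HOME

# Put the Flink scripts (flink, jobmanager.sh, taskmanager.sh) on the PATH.
export PATH="$FLINK_HOME/bin:$PATH"

# Sanity check: report whether the Flink CLI is now reachable.
if command -v flink >/dev/null 2>&1; then
  echo "flink found at $(command -v flink)"
else
  echo "flink not found; check that $FLINK_HOME/bin exists"
fi
```

Adding these lines to your shell profile keeps the setting across terminal sessions.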

If you’re running this from a Windows environment, you’ll need a Unix-like command line such as Cygwin.

Flink cluster

A Flink cluster is a distributed system for processing large-scale data streams in real time using Apache Flink. It consists of several components, each of which plays a part in keeping stream processing efficient, elastic and resilient to errors.

Job Manager

The Job Manager is the master node of a Flink cluster. It is responsible for coordinating the execution of Flink jobs, managing resources and overseeing task execution. It also coordinates checkpoints, which are periodic snapshots of job state and stream positions used for recovery, and savepoints, which are manually triggered snapshots that let you stop and resume a streaming job with its state intact.

You can start a Job Manager with the following command:

jobmanager.sh start 

If the operation is successful, the web dashboard will be available at http://localhost:8081.
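If you prefer to check this from the command line, the sketch below polls the REST endpoint behind the dashboard. The function name `wait_for_dashboard` and its retry defaults are my own choices for illustration, and it assumes `curl` is installed.

```shell
# Poll the Flink REST endpoint until it answers or we run out of attempts.
# Usage: wait_for_dashboard [base_url] [attempts]
wait_for_dashboard() {
  url="${1:-http://localhost:8081}"
  attempts="${2:-10}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # /overview is part of Flink's REST API and returns cluster-wide counters.
    if curl --silent --fail --max-time 2 "$url/overview" >/dev/null 2>&1; then
      echo "Job Manager is up at $url"
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  echo "Job Manager did not respond at $url" >&2
  return 1
}
```

Calling `wait_for_dashboard` without arguments checks http://localhost:8081 up to ten times, one second apart.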

Task Manager

Task Managers are the worker nodes in a Flink cluster. They execute the tasks assigned by the Job Manager while managing each task’s memory and network buffers, communicating with other Task Managers for data shuffling and state sharing, and reporting task status and progress to the Job Manager.

For the demo project from my previous post, we will need at least two Task Managers, which we can start by executing the following command twice:

taskmanager.sh start 
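Rather than typing the command twice, you could wrap it in a small loop. This is only a sketch: the function name `start_task_managers` and the `TASKMANAGER_CMD` override are hypothetical helpers I introduce here, not part of Flink.

```shell
# Start N Task Managers by invoking the start script N times.
# TASKMANAGER_CMD is a hypothetical override hook, handy for dry runs.
start_task_managers() {
  count="${1:-2}"
  cmd="${TASKMANAGER_CMD:-taskmanager.sh}"
  n=1
  while [ "$n" -le "$count" ]; do
    echo "Starting Task Manager $n of $count"
    # Stop early if one of the start calls fails.
    "$cmd" start || return 1
    n=$((n + 1))
  done
}
```

Running `start_task_managers 2` starts the two Task Managers needed for the demo.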

The Task Manager is supposed to receive a resource ID automatically, but this doesn’t (always) work on Windows. To fix this, you can configure the taskmanager.resource-id property in conf/flink-conf.yaml.

After the Task Managers have been initialized, they should be visible in the web UI. This means they are ready to accept jobs.

Kafka

We need a Kafka environment to connect our Flink Jobs to, and there are various options available to run a Kafka cluster locally.

I’m going to use the Conduktor one, which comes with a console and a Redpanda cluster: https://www.conduktor.io/get-started/.

Before proceeding, make sure that the following topics exist:

  • cms_person
  • analytics_person
  • person
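The topics can be created in the Conduktor console. If you have the Kafka command-line tools installed, something like the following sketch should also work. It assumes an unauthenticated local broker at `localhost:9092`, which may not match your setup.

```shell
# Topics the demo jobs read from and write to (names taken from the list above).
TOPICS="cms_person analytics_person person"
# Assumed address of a local, unauthenticated broker; change it to match your setup.
BROKER="${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}"

if command -v kafka-topics.sh >/dev/null 2>&1; then
  for topic in $TOPICS; do
    # --if-not-exists makes the call safe to repeat.
    kafka-topics.sh --bootstrap-server "$BROKER" --create --if-not-exists --topic "$topic"
  done
else
  echo "kafka-topics.sh not found; create these topics in the Conduktor console: $TOPICS"
fi
```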

Deploying Jobs

Build project

First, clone the demo project from this GitHub repo and build it with the following command:

mvn clean package

This will create the flink-1.0-SNAPSHOT.jar file in the target directory, which we will use to deploy the jobs.

Environment Configuration

The demo project connects to the Kafka cluster using the kafka.properties file in the resources folder:

bootstrap.servers=${env:KAFKA_BOOTSTRAP_SERVER} 
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='${env:KAFKA_USERNAME}' password='${env:KAFKA_PASSWORD}';
sasl.mechanism=PLAIN

Make sure to also set the required environment variables so your task can connect to the Kafka cluster.
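For example, the variables referenced in kafka.properties could be exported like this. The values are placeholders I made up; replace them with the details of your own cluster.

```shell
# Placeholder values; substitute the details of your own Kafka cluster.
export KAFKA_BOOTSTRAP_SERVER="localhost:9092"
export KAFKA_USERNAME="demo-user"
export KAFKA_PASSWORD="demo-password"

# Confirm that every variable referenced in kafka.properties is set.
for name in KAFKA_BOOTSTRAP_SERVER KAFKA_USERNAME KAFKA_PASSWORD; do
  eval "value=\${$name:-}"
  if [ -z "$value" ]; then
    echo "Missing $name" >&2
  else
    echo "$name is set"
  fi
done
```

Export the variables in the same shell before starting the processes that need them, since a child process only sees the environment it was launched with.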

Deploying DataGen Job

The first job we will deploy is the DataGen job, which publishes generated data to the cms_person and analytics_person topics.

flink run -c eu.cymo.flink.job.DataGen.DataGenJob flink-1.0-SNAPSHOT.jar 

The -c option specifies the main class to run, and the last argument is the JAR file that contains it. The JAR path is relative, so run the command from the target directory or adjust the path accordingly.

Navigating back to the overview in the web UI, we can now see a running job for data_gen_job.

We can also verify that messages are being posted on the expected topics.

Deploying Import Job

This job processes the messages posted on the previous topics and maps them to a uniform structure before forwarding them to the person topic. Let’s repeat the previous steps, but this time with the PersonImporterJob class:

flink run -c eu.cymo.flink.job.importer.PersonImporterJob flink-1.0-SNAPSHOT.jar 

Now, let’s check the web UI and the person topic to see if everything is in order.

That’s it, we have now deployed our Flink job to a Flink cluster!

Clean up

When you’re done experimenting with your Flink cluster and jobs, don’t forget to terminate your environment.

Jobs can be canceled through the web UI or by executing the following command:

flink cancel <jobid>
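The job ID is shown in the web UI and in the output of `flink list -r`. As a sketch, the hypothetical helper below (`job_id_for` is my own name) pulls the ID for a given job name out of that output. It assumes each line looks like `15.07.2024 10:00:00 : <jobid> : <name> (RUNNING)`, which may differ between Flink versions.

```shell
# Read `flink list -r` output on stdin and print the ID of the running job
# whose name matches the first argument.
# Assumes lines shaped like: "15.07.2024 10:00:00 : <jobid> : <name> (RUNNING)".
job_id_for() {
  awk -F' : ' -v name="$1" '$3 == (name " (RUNNING)") { print $2 }'
}
```

With a running cluster, `flink cancel "$(flink list -r | job_id_for data_gen_job)"` would then cancel the DataGen job.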

Task Managers and Job Managers can be stopped with the stop command:

taskmanager.sh stop # execute this once for every running Task Manager 

jobmanager.sh stop

Conclusion

In this post, we’ve covered the moving parts of a Flink cluster and seen how to deploy jobs to it. This setup is fine for testing the inner workings of a Flink cluster locally, but running it like this in production probably isn’t the best approach. Once you’re ready to go to production, head over to this link and see which solution works best for you.
