Distributed Hadoop Cluster 2 (Multinode Kafka-KRaft) with all dependencies

Furkan Cetukkaya
11 min read · Dec 28, 2023


In our previous article, we discussed the setup of a Hadoop cluster and Spark as a multi-node system. Now, we are expanding our cluster by establishing a multi-node Kafka cluster with three brokers on the same cluster.

Disclaimer:
Before proceeding, it is essential to read the previous article and ensure that the current cluster is properly installed. Going through the settings again is the best way to avoid errors.

For best practice, you can refer to the following link: https://medium.com/@ahmetfurkandemir/distributed-hadoop-cluster-1-spark-with-all-dependincies-03c8ec616166

Roadmap:

  1. Creating the Kafka user
  2. Configuring Kafka requirements for kafka_2.12-3.2.0
  3. Setting up the right Java version
  4. Server stand-up
  5. Making Kafka a systemd service in Linux (optional, but strongly recommended)
  6. KRaft vs. ZooKeeper (for metadata management)

Requirements:

We will use multipass again for virtualizing machines. Make sure you have multipass installed.

Check your virtual machines with the following command:

# list your machines
multipass list
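
If the cluster from the previous article is up, the output should look roughly like this (the names come from the previous article; the IPs are illustrative and yours will differ):

Name    State    IPv4           Image
master  Running  192.168.67.7   Ubuntu 22.04 LTS
node1   Running  192.168.67.5   Ubuntu 22.04 LTS
node2   Running  192.168.67.6   Ubuntu 22.04 LTS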

Cluster configuration:

Let’s begin;

1. Creating Kafka user

2. Configure Kafka requirements for kafka_2.12-3.2.0

master:

multipass shell master
sudo adduser kafka
# It asks for a password; keep it simple...
# It also asks some unnecessary questions; just press Enter.

sudo usermod -aG kafka kafka
# add the user to the kafka group

sudo adduser kafka sudo
# add the user to the sudo group (sudoers)

su kafka
# switch to the kafka user (type your simple password)

cd /home/kafka

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

tar xzf kafka_2.12-3.2.0.tgz

rm kafka_2.12-3.2.0.tgz

Next, open the server.properties file using your preferred text editor.

I'm a vim fan because I'm an old-school user, but you young people can use nano like AHMET FURKAN DEMIR :)

vim kafka_2.12-3.2.0/config/kraft/server.properties
process.roles=broker,controller

node.id=1

controller.quorum.voters=1@master:19092,2@node1:19093,3@node2:19094

listeners=PLAINTEXT://master:9092,CONTROLLER://master:19092

inter.broker.listener.name=PLAINTEXT

advertised.listeners=PLAINTEXT://master:9092

controller.listener.names=CONTROLLER

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

...

log.dirs=/data/logs/kafka

num.partitions=3

Note: Update the necessary configurations in the file. Save and exit.
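If you want to double-check the edits before moving on, a quick grep works (a convenience sketch, not a required step):

# print only the settings we touched in server.properties
grep -E '^(process\.roles|node\.id|controller\.quorum\.voters|listeners|advertised\.listeners|controller\.listener\.names|log\.dirs|num\.partitions)=' kafka_2.12-3.2.0/config/kraft/server.properties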


sudo mv kafka_2.12-3.2.0 /opt/
# move the folder to /opt (in Linux, /opt is generally used for applications)

sudo ln -sf /opt/kafka_2.12-3.2.0 /opt/kafka
# create a soft (symbolic) link

sudo chown kafka:root /opt/kafka* -R
# set ownership recursively

sudo chmod g+rwx /opt/kafka* -R
# grant group read/write/execute permissions recursively

sudo mkdir /data/logs/kafka -p
# remember we changed log.dirs in server.properties, so we need to create that directory

sudo chown -R kafka:kafka /data/logs/kafka
# give ownership to the kafka user

Open the required ports using ufw.

sudo ufw allow 9092
sudo ufw allow 19092
sudo ufw allow 19093
sudo ufw allow 19094

sudo ufw enable
sudo ufw reload
sudo ufw status
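
After enabling, ufw status should list the Kafka ports, roughly like this (illustrative output):

Status: active

To                         Action      From
--                         ------      ----
9092                       ALLOW       Anywhere
19092                      ALLOW       Anywhere
19093                      ALLOW       Anywhere
19094                      ALLOW       Anywhere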

Warning!

Ensure that your /etc/hosts file is configured correctly across all nodes.
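
For reference, a minimal sketch of what the /etc/hosts entries might look like on every node (I'm assuming the IP-to-host mapping from the producer example later in this article; your IPs will almost certainly differ, so check multipass list):

# /etc/hosts (same entries on master, node1, and node2)
192.168.67.7 master
192.168.67.5 node1
192.168.67.6 node2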

Note: Of course, your IPs will probably differ.

Exit from the master node; we will return :)

Type exit in the command shell, or press Ctrl + D.

node1:

multipass shell node1 

Same steps…

sudo adduser kafka
# It asks for a password; keep it simple...
# It also asks some unnecessary questions; just press Enter.

sudo usermod -aG kafka kafka
# add the user to the kafka group

sudo adduser kafka sudo
# add the user to the sudo group (sudoers)

su kafka
# switch to the kafka user (type your simple password)

cd /home/kafka

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

tar xzf kafka_2.12-3.2.0.tgz

rm kafka_2.12-3.2.0.tgz

We just need to change a few things in server.properties.

vim kafka_2.12-3.2.0/config/kraft/server.properties
process.roles=broker,controller

node.id=2

controller.quorum.voters=1@master:19092,2@node1:19093,3@node2:19094

listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:19093

inter.broker.listener.name=PLAINTEXT

advertised.listeners=PLAINTEXT://node1:9092

controller.listener.names=CONTROLLER

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

...

log.dirs=/data/logs/kafka

num.partitions=3

Note: node.id, listeners, and advertised.listeners changed.

Save and exit the file (server.properties).

sudo mv kafka_2.12-3.2.0 /opt/
# move the folder to /opt (in Linux, /opt is generally used for applications)

sudo ln -sf /opt/kafka_2.12-3.2.0 /opt/kafka
# create a soft (symbolic) link

sudo chown kafka:root /opt/kafka* -R
# set ownership recursively

sudo chmod g+rwx /opt/kafka* -R
# grant group read/write/execute permissions recursively

sudo mkdir /data/logs/kafka -p
# remember we changed log.dirs in server.properties, so we need to create that directory

sudo chown -R kafka:kafka /data/logs/kafka
# give ownership to the kafka user

Open the firewall ports :)

sudo ufw allow 9092
sudo ufw allow 19092
sudo ufw allow 19093
sudo ufw allow 19094

sudo ufw enable
sudo ufw reload
sudo ufw status

Remember the /etc/hosts warning; just check it before exiting.

node2:

multipass shell node2
sudo adduser kafka
# It asks for a password; keep it simple...
# It also asks some unnecessary questions; just press Enter.

sudo usermod -aG kafka kafka
# add the user to the kafka group

sudo adduser kafka sudo
# add the user to the sudo group (sudoers)

su kafka
# switch to the kafka user (type your simple password)

cd /home/kafka

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz

tar xzf kafka_2.12-3.2.0.tgz

rm kafka_2.12-3.2.0.tgz
vim kafka_2.12-3.2.0/config/kraft/server.properties
process.roles=broker,controller

node.id=3

controller.quorum.voters=1@master:19092,2@node1:19093,3@node2:19094

listeners=PLAINTEXT://node2:9092,CONTROLLER://node2:19094

inter.broker.listener.name=PLAINTEXT

advertised.listeners=PLAINTEXT://node2:9092

controller.listener.names=CONTROLLER

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

...

log.dirs=/data/logs/kafka

num.partitions=3

Save and exit the file.

sudo mv kafka_2.12-3.2.0 /opt/
# move the folder to /opt (in Linux, /opt is generally used for applications)

sudo ln -sf /opt/kafka_2.12-3.2.0 /opt/kafka
# create a soft (symbolic) link

sudo chown kafka:root /opt/kafka* -R
# set ownership recursively

sudo chmod g+rwx /opt/kafka* -R
# grant group read/write/execute permissions recursively

sudo mkdir /data/logs/kafka -p
# remember we changed log.dirs in server.properties, so we need to create that directory

sudo chown -R kafka:kafka /data/logs/kafka
# give ownership to the kafka user

Open the firewall ports:

sudo ufw allow 9092
sudo ufw allow 19092
sudo ufw allow 19093
sudo ufw allow 19094

sudo ufw enable
sudo ufw reload
sudo ufw status

Before exiting node2, check the /etc/hosts file again.

Exit.

3. Setting up the right Java version

Before running Kafka, we need to know that Kafka has Java dependencies. We will investigate the answers to these questions:

How do we determine the right Java version for the Kafka release we will use?
How do we make an error-free installation?

In our case:

We selected kafka_2.12-3.2.0, so we need to check this version's documentation for the supported Java version.

Please don't be too lazy to read the documentation :)

The documentation for Kafka 3.2 says:

Note that Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0.

This is important information.

Let's check our Java version.

Note: You can check any node (master, node1, node2) for this information.

java -version

So, we have Java 8, but we need Java 11.

Also, Hadoop depends on Java 8, and we don't need to change the default runtime away from Java 8 because Hadoop uses the default, as we set it up in the first article. We just need Java 11 to be used when Kafka runs.

Let's install Java 11 and set it for the Kafka runtime:

master:

multipass shell master

If necessary, you can change user with:

su kafka

Make sure you are now the kafka user, then:

sudo apt update

sudo apt install openjdk-11-jdk openjdk-11-jre

Type Y when asked whether to continue.

Then check the default java and javac versions with the following commands:

java -version

javac -version

You will probably see Java 11 reported as the default after installing it.

So, we will switch the default runtime Java back with the following command:

sudo update-alternatives --config java

You will see a selection screen; choose the number corresponding to Java 8 (number 2 in my case) and press Enter.

The default java is set back to Java 8... Perfect.

We will do the same for javac:

sudo update-alternatives --config javac

Select Java 8 to return to the old javac version.

Change the .bashrc file for the kafka user.

echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64' >> ~/.bashrc
# note: on x86_64 machines the path is /usr/lib/jvm/java-11-openjdk-amd64

source ~/.bashrc # reload the changed .bashrc file

echo $JAVA_HOME
# check it! It should print /usr/lib/jvm/java-11-openjdk-arm64

When Kafka runs, it will pick up the JAVA_HOME variable from here, and thus Kafka will use the alternative Java version.
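
Under the hood, Kafka's launcher scripts prefer JAVA_HOME when it is set. A simplified sketch of the selection logic in bin/kafka-run-class.sh (paraphrased, not the verbatim script):

# which JVM will Kafka use?
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"                # fall back to whatever 'java' resolves to on PATH (Java 8 here)
else
  JAVA="$JAVA_HOME/bin/java" # use the JVM that JAVA_HOME points to (our Java 11)
fi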

Generate SSH keys and establish connections between nodes.

As the kafka user:

ssh-keygen

Press Enter to accept the defaults for all prompts.

Then run:

cat .ssh/id_rsa.pub

Copy the output to your clipboard.

Go to node1:

multipass shell node1

Change user

su kafka

Check whether the .ssh folder exists; if it doesn't, create it first:

mkdir -p ~/.ssh
cd ~/.ssh

Then:

vim authorized_keys

Paste the output you copied from the "cat .ssh/id_rsa.pub" command.

Save and exit the authorized_keys file.

Exit node1

Enter master node

multipass shell master

Then try:

ssh node1

It should log you straight into node1 as the kafka user.

After that:

Copy the "cat .ssh/id_rsa.pub" output from master again, then go to node2:

multipass shell node2

Change user

su kafka

Check whether the .ssh folder exists; if it doesn't, create it first:

mkdir -p ~/.ssh
cd ~/.ssh

Then:

vim authorized_keys

Paste the output you copied from the "cat .ssh/id_rsa.pub" command.

Save and exit the authorized_keys file.

Exit node2

Enter master node

multipass shell master

Then try:

ssh node2

It should log you straight into node2 as the kafka user.

I know about the ssh-copy-id command, but I just wanted to show a different solution.
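
For completeness, the ssh-copy-id route would look something like this, run from the master node as the kafka user (it assumes password authentication is enabled on the targets):

# push the master's public key into each node's authorized_keys
ssh-copy-id kafka@node1
ssh-copy-id kafka@node2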

Let's continue with the Java setup. Don't give up :)

node1:

multipass shell node1 
su kafka
sudo apt update

sudo apt install openjdk-11-jdk openjdk-11-jre

Check the versions:

java -version

javac -version

It is Java 11.

So change them back:

sudo update-alternatives --config java

Select Java 8.

sudo update-alternatives --config javac

Select Java 8 again.

Then check again:

java -version
javac -version

Both should be Java 8.

Change the .bashrc file for the kafka user.

echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64' >> ~/.bashrc
# note: on x86_64 machines the path is /usr/lib/jvm/java-11-openjdk-amd64

source ~/.bashrc # reload the changed .bashrc file

echo $JAVA_HOME
# check it! It should print /usr/lib/jvm/java-11-openjdk-arm64

Exit from node1

node2:

multipass shell node2
su kafka
sudo apt update

sudo apt install openjdk-11-jdk openjdk-11-jre

Check the versions:

java -version
javac -version

It is Java 11.

So change them back:

sudo update-alternatives --config java

Select Java 8.

sudo update-alternatives --config javac

Select Java 8 again.

Then check again:

java -version
javac -version

Both should be Java 8.

Change the .bashrc file for the kafka user.

echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64' >> ~/.bashrc
# note: on x86_64 machines the path is /usr/lib/jvm/java-11-openjdk-amd64

source ~/.bashrc # reload the changed .bashrc file

echo $JAVA_HOME
# check it! It should print /usr/lib/jvm/java-11-openjdk-arm64

Exit from node2

4. Server stand-up

Let's stand up the multi-node Kafka cluster:

multipass shell master
su kafka
/opt/kafka/bin/kafka-storage.sh random-uuid

Copy the generated UUID(“PwYJ3fPwSzm5c1g5RgNFtQ”) and use it in the format command.

After that, if you have the chance, split your terminal into one pane per node; it makes this much easier.

Paste the following command into the master, node1, and node2 terminals:

/opt/kafka/bin/kafka-storage.sh format --config /opt/kafka/config/kraft/server.properties --cluster-id 'PwYJ3fPwSzm5c1g5RgNFtQ' --ignore-formatted

Run it on all nodes.

You probably won't see "already formatted"; you should see a message saying the log directories were formatted.

Then run this command on every node:

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties

Add one more master pane to your split terminal and leave it free (don't run the start command there); we will use it for checks.

Please start them fairly quickly :) on all nodes. Press Enter and move on to the next one.

If you see startup logs, you were successful. (If you miss them, never mind.) Go to the next step.

Let's check in a more reliable way, with this command:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server master:9092,node1:9092,node2:9092 --list

You probably won't see any topics; the one in my output is left over from my past tries.

Let's try creating a topic:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server master:9092,node1:9092,node2:9092 --create --topic test2 --partitions 3 --replication-factor 2
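
To verify the layout, you can describe the topic; if the cluster is healthy, you should see 3 partitions, each with 2 replicas spread across the brokers:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server master:9092,node1:9092,node2:9092 --describe --topic test2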

Then start a producer from the master node.

Warning: when you produce, use the exact IPs; don't use hostnames.

/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.67.7:9092,192.168.67.5:9092,192.168.67.6:9092 --topic test2

Start a consumer on node2:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --topic test2

Type some funny messages…

5. Making Kafka a systemd service in Linux (and closing the cluster safely)

First, we need to safely shut down our Kafka cluster node by node.

Open new terminals:

multipass shell master
multipass shell node1
multipass shell node2

Then run this command on all nodes:

/opt/kafka/bin/kafka-server-stop.sh /opt/kafka/config/kraft/server.properties

Note: Sometimes the last broker takes a little while to shut down; if you don't want to wait, do this.

For example, in my case node1 wouldn't close, so I ran this command on node1:

ps aux | grep kafka

You will see output like this; pick the PID of the line whose command contains the Kafka log path. In my case that long line was process number 9984.

Then KILL IT.

kill -9 9984

Please make sure Kafka is stopped on all nodes, because the ports need to be free.
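
As an aside, pgrep can save you the ps-and-grep step; the broker's main class is kafka.Kafka, so something like this finds the PID directly:

# find the Kafka broker PID without grepping ps output
pgrep -f kafka.Kafka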

Now let's create the service file.

You will do this on every host:

multipass shell master, multipass shell node1, multipass shell node2

sudo vim /lib/systemd/system/kafka.service

Copy this unit file:

[Unit]
Description=Apache Kafka with KRaft
After=network.target

[Service]
Type=simple
WorkingDirectory=/opt/kafka
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh /opt/kafka/config/kraft/server.properties
Restart=on-failure

[Install]
WantedBy=default.target

And paste it.

sudo systemctl daemon-reload
sudo systemctl enable kafka.service

Split the terminal again, one pane per node.

sudo systemctl start kafka.service

Run it on all nodes.

sudo systemctl status kafka.service
# check the status on every node

So, you can now monitor your Kafka cluster as a Linux service.
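
Since the broker now runs under systemd, its console output lands in the journal, which is handy when something misbehaves:

# follow the broker logs live (Ctrl+C to stop)
journalctl -u kafka.service -f

# stop and start the broker cleanly via systemd
sudo systemctl stop kafka.service
sudo systemctl start kafka.service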

6. KRaft vs. ZooKeeper (for metadata management)

Choosing Between KRaft and ZooKeeper:

The decision to use KRaft or ZooKeeper for metadata management in Kafka depends on factors such as deployment requirements, operational preferences, and the desire for simplicity.

  • ZooKeeper: It remains a stable and widely used option, especially in existing Kafka deployments. It may be preferred if you have an established ZooKeeper infrastructure or if you rely on features not yet supported by KRaft. Also, if you use ZooKeeper, you need to stand up an additional service.
  • KRaft: It offers a simpler and more integrated approach to metadata management. It’s suitable for new deployments and those looking for a more streamlined Kafka architecture.

In summary, KRaft is an evolution of Kafka’s architecture that aims to provide a more integrated, scalable, and fault-tolerant solution for metadata management without relying on external systems like ZooKeeper. The choice between KRaft and ZooKeeper depends on your specific use case and requirements.
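
As a closing check: newer Kafka releases (3.3+) ship a kafka-metadata-quorum.sh tool for inspecting the KRaft quorum; it is not available in the 3.2.0 we installed here, but if you later upgrade, a status check would look something like this:

/opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server master:9092 describe --status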

Furkan Cetukkaya

AHMET FURKAN DEMIR

