Distributed Hadoop Cluster 2 (Multi-node Kafka KRaft) with all dependencies
In our previous article, we discussed setting up Hadoop and Spark as a multi-node cluster. Now we are expanding it by adding a multi-node Kafka cluster with three brokers on the same machines.
Disclaimer:
Before proceeding, it is essential to read the previous article and ensure that the current cluster is properly installed. Going through the settings again is the best way to avoid errors.
For best practice, you can refer to the following link: https://medium.com/@ahmetfurkandemir/distributed-hadoop-cluster-1-spark-with-all-dependincies-03c8ec616166
Roadmap:
- Creating Kafka user
- Configure Kafka requirements for kafka_2.12-3.2.0
- Setting up a suitable Java version
- Server stand-up
- Running Kafka as a systemd service in Linux (optional but strongly recommended)
- Kraft vs. Zookeeper (For metadata management)
Requirements:
We will use multipass again for virtualizing machines. Make sure you have multipass installed.
Check your virtual machines with the following command:
# list your machines
multipass list
Cluster configuration:
Let’s begin:
1. Creating Kafka user
2. Configure Kafka requirements for kafka_2.12-3.2.0
master:
multipass shell master
sudo adduser kafka
# choose a simple password when asked; just press Enter for the other optional fields
sudo usermod -aG kafka kafka
# add the user to the kafka group
sudo adduser kafka sudo
# add the user to the sudoers
su kafka
# switch to the kafka user (type the password you set)
cd /home/kafka
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
tar xzf kafka_2.12-3.2.0.tgz
rm kafka_2.12-3.2.0.tgz
Next, open the server.properties file with your preferred text editor. I’m a vim fan because I’m an old-timer, but you younger people can use nano like AHMET FURKAN DEMIR :) .
vim kafka_2.12-3.2.0/config/kraft/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@master:19092,2@node1:19093,3@node2:19094
listeners=PLAINTEXT://master:9092,CONTROLLER://master:19092
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://master:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
.
.
.
log.dirs=/data/logs/kafka
num.partitions=3
Note: Update the necessary configurations in the file. Save and exit.
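To summarize before moving on: the three server.properties files will be identical except for three settings. The mapping used throughout this article (taken from the configs shown here) is:

```
node     node.id   listeners                                           advertised.listeners
master   1         PLAINTEXT://master:9092,CONTROLLER://master:19092   PLAINTEXT://master:9092
node1    2         PLAINTEXT://node1:9092,CONTROLLER://node1:19093     PLAINTEXT://node1:9092
node2    3         PLAINTEXT://node2:9092,CONTROLLER://node2:19094     PLAINTEXT://node2:9092
```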
sudo mv kafka_2.12-3.2.0 /opt/
# move the installation under /opt (in Linux, /opt is conventionally used for applications)
sudo ln -sf /opt/kafka_2.12-3.2.0 /opt/kafka
# create a version-independent symlink
sudo chown kafka:root /opt/kafka* -R
# set ownership recursively
sudo chmod g+rwx /opt/kafka* -R
# grant group read/write/execute permissions recursively
sudo mkdir -p /data/logs/kafka
# create the log.dirs location we set in server.properties
sudo chown -R kafka:kafka /data/logs/kafka
# hand the log directory over to the kafka user
Open the required ports with ufw:
sudo ufw allow 9092
sudo ufw allow 19092
sudo ufw allow 19093
sudo ufw allow 19094
sudo ufw enable
sudo ufw reload
sudo ufw status
Warning: ensure that your /etc/hosts file is configured consistently across all nodes.
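For reference, a minimal /etc/hosts layout for this cluster might look like the following. These IPs match my multipass machines (they reappear later with the console producer); substitute the addresses that multipass list reports for yours:

```
192.168.67.7    master
192.168.67.5    node1
192.168.67.6    node2
```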
Exit from the master node; we will return to it later :) Type exit in the shell, or press Ctrl+D. Note: your IP addresses will likely differ from the ones shown here.
node1:
multipass shell node1
Same steps…
sudo adduser kafka
# choose a simple password when asked; just press Enter for the other optional fields
sudo usermod -aG kafka kafka
# add the user to the kafka group
sudo adduser kafka sudo
# add the user to the sudoers
su kafka
# switch to the kafka user (type the password you set)
cd /home/kafka
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
tar xzf kafka_2.12-3.2.0.tgz
rm kafka_2.12-3.2.0.tgz
Only a few settings in server.properties change:
vim kafka_2.12-3.2.0/config/kraft/server.properties
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@master:19092,2@node1:19093,3@node2:19094
listeners=PLAINTEXT://node1:9092,CONTROLLER://node1:19093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node1:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
.
.
.
log.dirs=/data/logs/kafka
num.partitions=3
Note: node.id, listeners, and advertised.listeners changed. Save and exit the file (server.properties).
sudo mv kafka_2.12-3.2.0 /opt/
# move the installation under /opt (in Linux, /opt is conventionally used for applications)
sudo ln -sf /opt/kafka_2.12-3.2.0 /opt/kafka
# create a version-independent symlink
sudo chown kafka:root /opt/kafka* -R
# set ownership recursively
sudo chmod g+rwx /opt/kafka* -R
# grant group read/write/execute permissions recursively
sudo mkdir -p /data/logs/kafka
# create the log.dirs location we set in server.properties
sudo chown -R kafka:kafka /data/logs/kafka
# hand the log directory over to the kafka user
Open the same ports here as well :)
sudo ufw allow 9092
sudo ufw allow 19092
sudo ufw allow 19093
sudo ufw allow 19094
sudo ufw enable
sudo ufw reload
sudo ufw status
Remember the /etc/hosts warning; check the file before you exit.
node2:
multipass shell node2
sudo adduser kafka
# choose a simple password when asked; just press Enter for the other optional fields
sudo usermod -aG kafka kafka
# add the user to the kafka group
sudo adduser kafka sudo
# add the user to the sudoers
su kafka
# switch to the kafka user (type the password you set)
cd /home/kafka
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.12-3.2.0.tgz
tar xzf kafka_2.12-3.2.0.tgz
rm kafka_2.12-3.2.0.tgz
vim kafka_2.12-3.2.0/config/kraft/server.properties
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@master:19092,2@node1:19093,3@node2:19094
listeners=PLAINTEXT://node2:9092,CONTROLLER://node2:19094
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://node2:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
.
.
.
log.dirs=/data/logs/kafka
num.partitions=3
sudo mv kafka_2.12-3.2.0 /opt/
# move the installation under /opt (in Linux, /opt is conventionally used for applications)
sudo ln -sf /opt/kafka_2.12-3.2.0 /opt/kafka
# create a version-independent symlink
sudo chown kafka:root /opt/kafka* -R
# set ownership recursively
sudo chmod g+rwx /opt/kafka* -R
# grant group read/write/execute permissions recursively
sudo mkdir -p /data/logs/kafka
# create the log.dirs location we set in server.properties
sudo chown -R kafka:kafka /data/logs/kafka
# hand the log directory over to the kafka user
sudo ufw allow 9092
sudo ufw allow 19092
sudo ufw allow 19093
sudo ufw allow 19094
sudo ufw enable
sudo ufw reload
sudo ufw status
Before exiting node2, check the /etc/hosts file again. Then exit.
3. Setting up a suitable Java version
Before running Kafka, we need to know that Kafka has a Java dependency. We will answer two questions:
How do we determine the required Java version for the Kafka release we use?
How do we install it without breaking anything?
In our case, we chose kafka_2.12-3.2.0, so we need to check that version’s documentation for the supported Java versions.
Please don’t be too lazy to read the documentation :)
The documentation for Kafka 3.2 says:
Note that Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0.
This is important information.
Let’s check our Java version.
Note: you can run this on any node (master, node1, node2).
java -version
So, we have Java 8, but we need Java 11. Hadoop, however, depends on Java 8, and it uses the default runtime we set up in the first article, so we must not change the system default. We only need Java 11 to be picked up when Kafka runs.
Let’s install Java 11 and set it for Kafka’s runtime:
master:
multipass shell master
If necessary, you can change to the kafka user with:
su kafka
Make sure you are now the kafka user, then:
sudo apt update
sudo apt install openjdk-11-jdk openjdk-11-jre
Type Y if you are asked to continue.
Then check the default java and javac versions with the following commands:
java -version
javac -version
After installing Java 11, you will probably see that it has become the default. We will change the default runtime Java back with the following command:
sudo update-alternatives --config java
You will see a selection screen; choose the number that corresponds to Java 8 (number 2 in my case) and press Enter.
The default java is set back to Java 8… Perfect.
We will do the same for javac:
sudo update-alternatives --config javac
Select Java 8 to return to the old version.
Change .bashrc file for kafka user.
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64' >> ~/.bashrc
source ~/.bashrc # Reload the changed bashrc file
#check it! /usr/lib/jvm/java-11-openjdk-arm64
echo $JAVA_HOME
When Kafka runs, it will read the JAVA_HOME parameter from here, so Kafka uses the alternative Java version while the system default stays on Java 8.
Generate SSH keys and establish connections between the nodes. As the kafka user, run:
ssh-keygen
Press Enter through all the prompts (leave everything empty).
Then print the public key:
cat .ssh/id_rsa.pub
Copy the output to your clipboard.
Go node1;
multipass shell node1
Change user
su kafka
Check whether the .ssh folder exists:
cd .ssh
If it does not exist, create it first:
mkdir .ssh
then:
vim authorized_keys
Paste the public key you copied with the “cat .ssh/id_rsa.pub” command.
Save and exit the authorized_keys file.
Exit node1
Enter master node
multipass shell master
Then try;
ssh node1
It should log you directly into node1 as the kafka user.
Next, copy your “cat .ssh/id_rsa.pub” output from master again, then go to node2:
multipass shell node2
Change user
su kafka
Check whether the .ssh folder exists:
cd .ssh
If it does not exist, create it first:
mkdir .ssh
then:
vim authorized_keys
Paste the public key you copied with the “cat .ssh/id_rsa.pub” command.
Save and exit the authorized_keys file.
Exit node2
Enter master node
multipass shell master
Then try;
ssh node2
It should log you directly into node2 as the kafka user.
I know about the ssh-copy-id command; I just wanted to show a different solution.
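For completeness, the same key distribution can be done in one step per node with ssh-copy-id (this sketch assumes password authentication for the kafka user is enabled on the targets, and must be run as the kafka user on master against the live nodes):

```
# appends ~/.ssh/id_rsa.pub to ~/.ssh/authorized_keys on each node
ssh-copy-id kafka@node1
ssh-copy-id kafka@node2
```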
Let’s continue with the Java setup. Don’t give up :)
node1:
multipass shell node1
su kafka
sudo apt update
sudo apt install openjdk-11-jdk openjdk-11-jre
Check versions;
java -version
javac -version
The default is now Java 11, so switch both back:
sudo update-alternatives --config java
Select java8
sudo update-alternatives --config javac
Select java8 again.
Then check again.
java -version
javac -version
Both should report Java 8 again.
Change .bashrc file for kafka user.
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64' >> ~/.bashrc
source ~/.bashrc # Reload the changed bashrc file
#check it! /usr/lib/jvm/java-11-openjdk-arm64
echo $JAVA_HOME
Exit from node1
node2:
multipass shell node2
su kafka
sudo apt update
sudo apt install openjdk-11-jdk openjdk-11-jre
Check versions;
java -version
javac -version
The default is now Java 11, so switch both back:
sudo update-alternatives --config java
Select java8
sudo update-alternatives --config javac
Select java8 again.
Then check again.
java -version
javac -version
Both should report Java 8 again.
Change .bashrc file for kafka user.
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64' >> ~/.bashrc
source ~/.bashrc # Reload the changed bashrc file
#check it! /usr/lib/jvm/java-11-openjdk-arm64
echo $JAVA_HOME
Exit from node2
4. Server stand-up
Let’s stand up the multi-node Kafka cluster:
multipass shell master
su kafka
/opt/kafka/bin/kafka-storage.sh random-uuid
Copy the generated UUID (“PwYJ3fPwSzm5c1g5RgNFtQ” in my case) and use it in the format command.
After that command, split your terminal if you have the chance; it will make things much easier. Paste the following command into the master, node1, and node2 terminals:
/opt/kafka/bin/kafka-storage.sh format --config /opt/kafka/config/kraft/server.properties --cluster-id 'PwYJ3fPwSzm5c1g5RgNFtQ' --ignore-formatted
Run it on all nodes. You will probably not see “already formatted”; instead you will see something like “formatted all log directories”.
Then paste this command on all nodes:
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
Open one more master pane in your split terminal, but don’t run the command there.
Start the brokers on all nodes in quick succession :) press Enter on one and move on to the next.
If you see startup logs, you have succeeded (if you don’t catch them, never mind). Go to the next step.
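By the way, if you run out of terminal panes, kafka-server-start.sh also accepts a -daemon flag that launches the broker in the background; its log output then goes to the logs/ directory under the Kafka installation instead of the console (run on each live node):

```
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
```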
Let’s check in a more reliable way, with this command:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server master:9092,node1:9092,node2:9092 --list
You probably won’t see any topics; the one shown here is left over from my earlier attempts.
Let’s try creating a topic;
/opt/kafka/bin/kafka-topics.sh --bootstrap-server master:9092,node1:9092,node2:9092 --create --topic test2 --partitions 3 --replication-factor 2
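To verify that the partitions and replicas really landed on different brokers, you can describe the topic; the output lists the leader and replica broker IDs (1, 2, 3) for each of the three partitions (requires the running cluster):

```
/opt/kafka/bin/kafka-topics.sh --bootstrap-server master:9092,node1:9092,node2:9092 --describe --topic test2
```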
Then start a producer from the master node.
Warning: when producing, use the exact IP addresses rather than the hostnames.
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.67.7:9092,192.168.67.5:9092,192.168.67.6:9092 --topic test2
Start a consumer from node2:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --topic test2
Type some funny messages and watch them appear on the consumer side…
5. Running Kafka as a systemd service in Linux (and closing the cluster safely)
First, we need to shut our Kafka cluster down safely, node by node.
Open new terminals:
multipass shell master
multipass shell node1
multipass shell node2
Then run this command on all nodes:
/opt/kafka/bin/kafka-server-stop.sh /opt/kafka/config/kraft/server.properties
Note: sometimes the last broker is a little slow to shut down. If you don’t want to wait, do the following. In my case node1 wouldn’t close, so on node1 I ran:
ps aux | grep kafka
You will see several lines; pick the PID of the long line that references the log path. In my case that was process number 9984.
Then KILL IT:
kill -9 9984
Make sure all nodes are fully stopped, because the ports need to be free.
Let’s create the service file. Do this on each of the hosts (multipass shell master, multipass shell node1, multipass shell node2):
sudo vim /lib/systemd/system/kafka.service
Copy the following unit file:
[Unit]
Description=Apache Kafka with KRaft
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/kafka
User=kafka
Group=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh /opt/kafka/config/kraft/server.properties
Restart=on-failure
[Install]
WantedBy=default.target
And paste it.
sudo systemctl daemon-reload
sudo systemctl enable kafka.service
Split your terminal, one pane per node, then run:
sudo systemctl start kafka.service
Run it on all nodes.
sudo systemctl status kafka.service
Now you can monitor and manage your Kafka cluster as a Linux service.
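Because the brokers now run under systemd, their console output also lands in the journal, so you can follow the logs of any node with journalctl (a standard systemd facility, shown here as an optional convenience):

```
sudo journalctl -u kafka.service -f
```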
6. Kraft vs. Zookeeper (For metadata management)
Choosing Between KRaft and ZooKeeper:
The decision to use KRaft or ZooKeeper for metadata management in Kafka depends on factors such as deployment requirements, operational preferences, and the desire for simplicity.
- ZooKeeper: It remains a stable and widely used option, especially in existing Kafka deployments. It may be preferred if you have an established ZooKeeper infrastructure or if you rely on features not yet supported by KRaft. Also, if you use ZooKeeper, you need to stand up an additional service.
- KRaft: It offers a simpler and more integrated approach to metadata management. It’s suitable for new deployments and those looking for a more streamlined Kafka architecture.
In summary, KRaft is an evolution of Kafka’s architecture that aims to provide a more integrated, scalable, and fault-tolerant solution for metadata management without relying on external systems like ZooKeeper. The choice between KRaft and ZooKeeper depends on your specific use case and requirements.
Furkan Cetukkaya