Oracle Developers
Published in

Oracle Developers

15 Minutes to get a Kafka Cluster running on Kubernetes — and start producing and consuming from a Node application

For workshop I will present on microservices and communication patterns I need attendees to have their own local Kafka Cluster. I have found a way to have them up and running in virtually no time at all. Thanks to the combination of:

  • Kubernetes
  • Minikube
  • The Yolean/kubernetes-kafka GitHub Repo with Kubernetes yaml files that creates all we need (including Kafka Manager)

Prerequisites:

  • Minikube and Kubectl are installed
  • The Minikube cluster is running (minikube start)

In my case the versions are:

Minikube: v0.22.3, Kubectl Client 1.9 and (Kubernetes) Server 1.7:

The steps I went through:

Git Clone the GitHub Repository: https://github.com/Yolean/kubernetes-kafka

From the root directory of the cloned repository, run the following kubectl commands:

(note: I did not know until today that kubectl apply –f can be used with a directory reference and will then apply all yaml files in that directory. That is incredibly useful!)

kubectl apply -f ./configure/minikube-storageclass-broker.yml
kubectl apply -f ./configure/minikube-storageclass-zookeeper.yml

(note: I had to comment out the reclaimPolicy attribute in both files — probably because I am running a fairly old version of Kubernetes)

kubectl apply -f ./zookeeper

kubectl apply -f ./kafka

(note: I had to change API version in 50pzoo and 51zoo as well as in 50kafka.yaml from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1 — see https://github.com/kubernetes/kubernetes/issues/55894 for details; again, I should upgrade my Kubernetes version)

To make Kafka accessible from the minikube host (outside the K8S cluster itself)

kubectl apply -f ./outside-services

This exposes Services as type NodePort instead of ClusterIP, making them available for client applications that can access the Kubernetes host.

I also installed (Yahoo) Kafka Manager:

kubectl apply -f ./yahoo-kafka-manager

(I had to change API version in kafka-manager from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1 )

At this point, the Kafka Cluster is running. I can check the pods and services in the Kubernetes Dashboard as well as through kubectl on the command line. I can get the Port at which I can access the Kafka Brokers:

And I can access the Kafka Manager at the indicated Port.

Initially, no cluster is visible in Kafka Manager. By providing the Zookeeper information highlighted in the figure (zookeeper.kafka:2181) I can make the cluster visible in this user interface tool.

Finally the eating of the pudding: programmatic production and consumption of messages to and from the cluster. Using the world’s simplest Node Kafka clients, it is easy to see the stuff is working. I am impressed.

I have created the Node application and its package.json file. Then added the kafka-node dependency (npm install kafka-node –save). Next I created the producer:

// before running, either globally install kafka-node (npm install kafka-node) 
// or add kafka-node to the dependencies of the local application var kafka = require('kafka-node')
var Producer = kafka.Producer
KeyedMessage = kafka.KeyedMessage;
var client;
KeyedMessage = kafka.KeyedMessage;
var APP_VERSION = "0.8.5"
var APP_NAME = "KafkaProducer"
var topicName = "a516817-kentekens";
var KAFKA_BROKER_IP = '192.168.99.100:32400';
var kafkaConnectDescriptor = KAFKA_BROKER_IP;
console.log("Running Module " + APP_NAME + " version " + APP_VERSION);
function initializeKafkaProducer(attempt) {
try { console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`);
const client = new kafka.KafkaClient({ kafkaHost: kafkaConnectDescriptor });
console.log("created client");
producer = new Producer(client);
console.log("submitted async producer creation request");
producer.on('ready', function () {
console.log("Producer is ready in " + APP_NAME);
});
producer.on('error', function (err) {
console.log("failed to create the client or the producer " + JSON.stringify(err));
)
} catch (e) {
console.log("Exception in initializeKafkaProducer" + JSON.stringify(e));
console.log("Try again in 5 seconds");
setTimeout(initializeKafkaProducer, 5000, ++attempt);
}
}//initializeKafkaProducer
initializeKafkaProducer(1);
var eventPublisher = module.exports;
eventPublisher.publishEvent = function (eventKey, event) {
km = new KeyedMessage(eventKey, JSON.stringify(event));
payloads = [ { topic: topicName, messages: [km], partition: 0 } ];
producer.send(payloads, function (err, data) {
if (err) {
console.error("Failed to publish event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(err));
}
console.log("Published event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(data));
});
} //publishEvent
//example calls: (after waiting for three seconds to give the producer time to initialize)
setTimeout(function () { eventPublisher.publishEvent("mykey", { "kenteken": "56-TAG-2", "country": "nl" }) } , 3000)

and ran the producer:

The create the consumer:

var kafka = require('kafka-node');var client;
var APP_VERSION = "0.8.5"
var APP_NAME = "KafkaConsumer"
var eventListenerAPI = module.exports;
var Consumer = kafka.Consumervar topicName = "a516817-kentekens";console.log("Running Module " + APP_NAME + " version " + APP_VERSION);
console.log("Topic " + topicName);
var KAFKA_BROKER_IP = '192.168.99.100:32400';var consumerOptions = {
kafkaHost: KAFKA_BROKER_IP,
groupId: 'local-consume-events-from-event-hub-for-kenteken-applicatie',
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
};
var topics = [topicName];var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumerLocal' }, consumerOptions), topics);consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);
consumerGroup.on('connect', function () {
console.log('connected to ' + topicName + " at " + consumerOptions.host);
})
function onMessage(message) {
console.log('%s read msg Topic="%s" Partition=%s Offset=%d'
, this.client.clientId, message.topic, message.partition, message.offset);
}
function onError(error) {
console.error(error);
console.error(error.stack);
}
process.once('SIGINT', function () {
async.each([consumerGroup], function (consumer, callback) {
consumer.close(true, callback);
});
});

and ran the consumer — which duly consumed the event published by the publisher. It is wonderful.

Resources

The main resources is the GitHub Repo: https://github.com/Yolean/kubernetes-kafka . Absolutely great stuff.

Also useful: npm package kafka-node — https://www.npmjs.com/package/kafka-node

Documentation on Kubernetes: https://kubernetes.io/docs/user-journeys/users/application-developer/foundational/#section-2 — with references to Kubectl and Minikube — and the Katakoda playground: https://www.katacoda.com/courses/kubernetes/playground

kafka kubectl kubernetes minikube node

Originally published at technology.amis.nl on April 19, 2018.

--

--

--

Aggregation of articles from Oracle engineers, Groundbreaker Ambassadors, Oracle ACEs, and Java Champions on all things Oracle technology. The views expressed are those of the authors and not necessarily of Oracle.

Recommended from Medium

TT Farm’s Big Giveaway: Incoming 50,000 FTT Airdrop Event!

Creating a simple Christmas Tree with pure CSS

LetsGrowMore Internship

Looking at Next-Gen Answers to Technical Debt — Enabling Sustainable Modernization (Part III)

Extremly Fast Way For Table Row Counts Sql Server Database

Introduction to Bash Script — 4

Stack Traces in 3 Minutes

How to begin your ERP system journey

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Lucas Jellema

Lucas Jellema

Lucas Jellema is solution architect and CTO at AMIS, The Netherlands. He is Oracle ACE Director, Groundbreaker Ambassador, JavaOne Rockstar and programmer

More from Medium

Using Telepresence to intercept microservices on a Kubernetes cluster

Implementing Kong API gateway with gRPC on a Kubernetes cluster

Setup Container Redis, run commands, and publish/subscribe messages

K3S — Lightweight Kubernetes