Kafka in Kubernetes with Spring

Lightphos
actual-tech
Published in
6 min readJul 24, 2020

We install Kafka in Kubernetes (k8s) with Istio Service Mesh and a Spring boot application to send and receive messages with Kafka.

What does Kafka give you?
Speed. High Volume, across commodity hardware. Guaranteed ordering. Message Replay. Horizontal (Distributed) Scaling. Replication for resilience.

Where to use it?
Event driven applications. Event sourcing with event logs. Metrics. Real time stream processing. Used in areas where RabbitMQ and AMQP are unable to handle the volume and speed needed.

Our Setup
Docker desktop in macos, with its in built k8s. Set the resources to at least 6G, and cores to 2+.

Installing Kafka on K8s

This is based on https://strimzi.io/quickstarts/

If you have followed my previous posts with Istio the following namespace should already have been created.

kubectl create namespace vadal

and this namespace has already been assigned to Istio for sidecar injection.

kubectl label namespace vadal istio-injection=enabled

Kafka Setup

Add required CRDs

kubectl apply -f ‘https://strimzi.io/install/latest?namespace=vadal' -n vadal

Create the persistent kafka cluster

kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n vadal

Wait for it.

kubectl wait kafka/my-cluster — for=condition=Ready — timeout=300s -n vadal

Check the pods

kc get pods -n vadal | grep cluster
my-cluster-entity-operator-6b7d7657dd-2pv6s 4/4 Running 1 76s
my-cluster-kafka-0 3/3 Running 0 99s
my-cluster-zookeeper-0 2/2 Running 0 2m10s
strimzi-cluster-operator-6c9d899778-ql9nn 2/2 Running 0 4m49s

Check Connectivity

Send

kubectl -n vadal run kafka-producer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never --bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic

If you don’t see a command prompt, try pressing enter.

hi there
>

Receive

kubectl -n vadal run kafka-consumer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never --bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
{my-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

hi there

View in Kiali

This will only work if you have Istio installed with all the goodies. Also you will need to have followed the example using a namespace labelled with istio injection (see above).

istioctl dashboard kiali

A lot going on here as you can see. There are the bootstrap, broker and zookeeper pods at play.

Spring Kafka

Application

@SpringBootApplication
@RestController
@Slf4j
public class VadalApplication {

public static void main(String[] args) {
SpringApplication.run(VadalApplication.class, args);
}

private static final String TOPIC = "my-topic";

private final List<String> CONSUMED_MESSAGES = new ArrayList<>();

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@KafkaListener(topics = TOPIC, groupId = "group_id")
public void consume(String m) {
log.info("Message consumed: {}", m);
CONSUMED_MESSAGES.add(m);
}

@GetMapping("/pub/{m}")
public void produce(@PathVariable String m) {
log.info("Message produced: {}", m);
kafkaTemplate.send(TOPIC, m);
}

@GetMapping("/get")
public List<String> get() {
log.info("Get consumed messages");
List<String> l = new ArrayList<>(CONSUMED_MESSAGES);
CONSUMED_MESSAGES.clear();
return l;
}

}

The Spring Kafka wiring will create the topic if it does not exist (no need for the NewTopic method although you can add it).

The /pub endpoint publishes the message string, the KafkaListener receives the messages and stores them in a list. The /get endpoint retrieves from this list. This is obviously a contrived example to demonstrate Kafka interaction with Java Spring.

If you have multiple instances, then to save the message, you would need a common store such as a Db or a DataGrid.

POM

    <dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>

I’ve left in spring-boot-starter-data-rest from previous applications, it can be replaced by spring-boot-starter-web if you wish. The parent pom has the main spring, lombok etc dependencies (see the full code here).

application.yml

server.port: 7777

spring:
kafka:
consumer:
bootstrap-servers: my-cluster-kafka-bootstrap:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: my-cluster-kafka-bootstrap:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


management:
endpoints:
web:
exposure:
include: "*"

Note the bootstrap-servers are those we created earlier with strimzi CRDs.

Create the image:

mvn spring-boot:build-image

This should create a vadal-kafka-message:0.0.1-SNAPSHOT docker image.

Deploy and Serve

kubectl create deployment vkafka — image vadal-kafka-message:0.0.1-SNAPSHOT -n vadal

kubectl expose deployment vkafka — type NodePort — port 8888 — target-port 7777 -n vadal

kc get svc -n vadal

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 10.107.228.196 <none> 9091/TCP,9092/TCP,9093/TCP 38h
my-cluster-kafka-brokers ClusterIP None <none> 9091/TCP,9092/TCP,9093/TCP 38h
my-cluster-zookeeper-client ClusterIP 10.111.83.76 <none> 2181/TCP 38h
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 38h
vkafka NodePort 10.102.114.125 <none> 8888:32758/TCP 37h

Send a message through our application

curl -i localhost:32758/pub/test-message
HTTP/1.1 200 OK
content-length: 0
date: Mon, 20 Jul 2020 13:32:56 GMT
x-envoy-upstream-service-time: 122
server: istio-envoy
x-envoy-decorator-operation: vkafka.vadal.svc.cluster.local:8888/*

Check the logs

kc get po -n vadal
NAME READY STATUS RESTARTS AGE
vkafka-546f454db9-kpkv9 2/2 Running 1 36h

kc logs vkafka-546f454db9-kpkv9 -n vadal vadal-kafka-message

2020-07-20 13:32:57.153  INFO 1 --- [nio-7777-exec-2] u.c.a.vadalkafka.VadalApplication        : Message produced: test-message
2020-07-20 13:32:57.162 INFO 1 --- [nio-7777-exec-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-07-20 13:32:57.186 INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-07-20 13:32:57.186 INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-07-20 13:32:57.186 INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1595251977185
2020-07-20 13:32:57.201 INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: tao-huzJTTGmMCqwhtQF1Q
2020-07-20 13:32:57.280 INFO 1 --- [ntainer#0-0-C-1] u.c.a.vadalkafka.VadalApplication : Message consumed: test-message

We can see the listener consuming the message.

Lets grab it via our get endpoint

curl -i localhost:32758/get
HTTP/1.1 200 OK
content-type: application/json
date: Mon, 20 Jul 2020 13:38:37 GMT
x-envoy-upstream-service-time: 12
server: istio-envoy
x-envoy-decorator-operation: vkafka.vadal.svc.cluster.local:8888/*
transfer-encoding: chunked
[“test-message”]

Kafka GUI

There is a Kakfa GUI which can be deployed to K8s.

See:
https://github.com/spring-projects/spring-kafka/tree/master/samples

helm repo add akhq https://akhq.io/

helm install --name akhq akhq/akhq --namespace vadal

Edit akhq-secrets to change the bootstrap.serversfrom kafka:9092 to my-cluster-kafka-bootstrap:9092 (you can do it by hand or use Lens https://k8slens.dev/)

Edit service to provide a nodeport say 30011, delete the akhq pod and let it restart, then you can navigate to it.

http://localhost:30011/my-cluster-plain-text/topic

Conclusion

There we go, Kakfa running in K8s, a spring boot application which reads and writes to it and a GUI to boot.

Further Reading

https://strimzi.io/docs/operators/latest/overview.html#configuration-points-connect_str

Originally published at https://blog.ramjee.uk on July 24, 2020.

--

--