Kafka in Kubernetes with Spring
We will install Kafka in Kubernetes (k8s) with the Istio service mesh, then build a Spring Boot application to send and receive messages with Kafka.
What does Kafka give you?
Speed and high volume across commodity hardware. Guaranteed ordering (within a partition). Message replay. Horizontal (distributed) scaling. Replication for resilience.
Where to use it?
Event-driven applications. Event sourcing with event logs. Metrics. Real-time stream processing. Areas where RabbitMQ and AMQP cannot handle the volume and speed needed.
Our Setup
Docker Desktop on macOS, with its built-in k8s. Set the memory to at least 6 GB and the CPUs to 2+.
Installing Kafka on K8s
This is based on https://strimzi.io/quickstarts/
If you have followed my previous posts on Istio, the following namespace should already have been created:
kubectl create namespace vadal
and this namespace should already be labelled for Istio sidecar injection:
kubectl label namespace vadal istio-injection=enabled
Kafka Setup
Add required CRDs
kubectl apply -f 'https://strimzi.io/install/latest?namespace=vadal' -n vadal
Create the persistent Kafka cluster
kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n vadal
Wait for it.
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n vadal
Check the pods (kc here is an alias for kubectl)
kc get pods -n vadal | grep cluster
my-cluster-entity-operator-6b7d7657dd-2pv6s 4/4 Running 1 76s
my-cluster-kafka-0 3/3 Running 0 99s
my-cluster-zookeeper-0 2/2 Running 0 2m10s
strimzi-cluster-operator-6c9d899778-ql9nn 2/2 Running 0 4m49s
Check Connectivity
Send
kubectl -n vadal run kafka-producer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic
If you don’t see a command prompt, try pressing enter.
hi there
>
Receive
kubectl -n vadal run kafka-consumer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
{my-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
hi there
The LEADER_NOT_AVAILABLE warning is transient: it shows up while the topic metadata is still being created, and the message we sent earlier arrives straight after it.
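You can also list the topics the cluster now knows about, reusing the same Strimzi image (my-topic should appear, since we just produced to it):
kubectl -n vadal run kafka-topics -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list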
View in Kiali
This will only work if you have Istio installed with all the goodies. You will also need to have followed the example above, using a namespace labelled for Istio injection.
istioctl dashboard kiali
There is a lot going on here, as you can see: the bootstrap, broker and ZooKeeper pods are all at play.
Spring Kafka
Application
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
@Slf4j
public class VadalApplication {

    private static final String TOPIC = "my-topic";

    // Synchronized list: the Kafka listener and the web endpoints run on different threads.
    private final List<String> consumedMessages = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public static void main(String[] args) {
        SpringApplication.run(VadalApplication.class, args);
    }

    @KafkaListener(topics = TOPIC, groupId = "group_id")
    public void consume(String m) {
        log.info("Message consumed: {}", m);
        consumedMessages.add(m);
    }

    @GetMapping("/pub/{m}")
    public void produce(@PathVariable String m) {
        log.info("Message produced: {}", m);
        kafkaTemplate.send(TOPIC, m);
    }

    @GetMapping("/get")
    public List<String> get() {
        log.info("Get consumed messages");
        // Copy then clear; fine for a demo, not atomic under load.
        List<String> l = new ArrayList<>(consumedMessages);
        consumedMessages.clear();
        return l;
    }
}
The topic is created automatically if it does not exist, so there is no need to declare a NewTopic bean, although you can add one.
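If you do want to declare the topic explicitly, a NewTopic bean looks like this (a minimal sketch: the TopicConfig class name is illustrative, and the 1 partition / 1 replica counts match the single-broker cluster above):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Declares the topic explicitly; Spring Boot's auto-configured KafkaAdmin
    // creates it on startup if it does not already exist.
    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name("my-topic").partitions(1).replicas(1).build();
    }
}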
The /pub endpoint publishes the message string, the KafkaListener receives the messages and stores them in a list. The /get endpoint retrieves from this list. This is obviously a contrived example to demonstrate Kafka interaction with Java Spring.
If you run multiple instances, you would need a common store for the messages, such as a database or a data grid.
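As a sketch of that, the listener could push into Redis instead of the in-memory list (hypothetical: it assumes spring-boot-starter-data-redis on the classpath and a reachable Redis instance, and the consumed-messages key name is made up):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SharedStoreConsumer {

    // Auto-configured by Spring Boot when spring-boot-starter-data-redis is present.
    @Autowired
    private StringRedisTemplate redis;

    @KafkaListener(topics = "my-topic", groupId = "group_id")
    public void consume(String m) {
        // Every app replica appends to the same Redis list, so any of them can serve /get.
        redis.opsForList().rightPush("consumed-messages", m);
    }
}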
POM
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-rest</artifactId>
    </dependency>
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
I’ve left in spring-boot-starter-data-rest from previous applications; it can be replaced by spring-boot-starter-web if you wish. The parent pom has the main Spring, Lombok etc. dependencies (see the full code here).
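One thing worth noting: the mvn spring-boot:build-image goal used below comes from the Spring Boot Maven plugin, so the pom needs it declared. A minimal sketch (the version is inherited from the parent pom):

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>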
application.yml
server.port: 7777
spring:
  kafka:
    consumer:
      bootstrap-servers: my-cluster-kafka-bootstrap:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: my-cluster-kafka-bootstrap:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
management:
  endpoints:
    web:
      exposure:
        include: "*"
Note the bootstrap-servers host is the my-cluster-kafka-bootstrap service created earlier by the Strimzi operator, and that the producer takes serializers where the consumer takes deserializers.
Create the image:
mvn spring-boot:build-image
This should create a vadal-kafka-message:0.0.1-SNAPSHOT docker image.
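You can check the image is there:
docker images | grep vadal-kafka-message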
Deploy and Serve
kubectl create deployment vkafka --image vadal-kafka-message:0.0.1-SNAPSHOT -n vadal
kubectl expose deployment vkafka --type NodePort --port 8888 --target-port 7777 -n vadal
kc get svc -n vadal
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 10.107.228.196 <none> 9091/TCP,9092/TCP,9093/TCP 38h
my-cluster-kafka-brokers ClusterIP None <none> 9091/TCP,9092/TCP,9093/TCP 38h
my-cluster-zookeeper-client ClusterIP 10.111.83.76 <none> 2181/TCP 38h
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 38h
vkafka NodePort 10.102.114.125 <none> 8888:32758/TCP 37h
Send a message through our application
curl -i localhost:32758/pub/test-message
HTTP/1.1 200 OK
content-length: 0
date: Mon, 20 Jul 2020 13:32:56 GMT
x-envoy-upstream-service-time: 122
server: istio-envoy
x-envoy-decorator-operation: vkafka.vadal.svc.cluster.local:8888/*
Check the logs
kc get po -n vadal
NAME READY STATUS RESTARTS AGE
vkafka-546f454db9-kpkv9 2/2 Running 1 36h
kc logs vkafka-546f454db9-kpkv9 -n vadal vadal-kafka-message
2020-07-20 13:32:57.153 INFO 1 --- [nio-7777-exec-2] u.c.a.vadalkafka.VadalApplication : Message produced: test-message
2020-07-20 13:32:57.162 INFO 1 --- [nio-7777-exec-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2020-07-20 13:32:57.186 INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-07-20 13:32:57.186 INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-07-20 13:32:57.186 INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1595251977185
2020-07-20 13:32:57.201 INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: tao-huzJTTGmMCqwhtQF1Q
2020-07-20 13:32:57.280 INFO 1 --- [ntainer#0-0-C-1] u.c.a.vadalkafka.VadalApplication : Message consumed: test-message
We can see the listener consuming the message.
Let’s grab it via our /get endpoint
curl -i localhost:32758/get
HTTP/1.1 200 OK
content-type: application/json
date: Mon, 20 Jul 2020 13:38:37 GMT
x-envoy-upstream-service-time: 12
server: istio-envoy
x-envoy-decorator-operation: vkafka.vadal.svc.cluster.local:8888/*
transfer-encoding: chunked
["test-message"]
Kafka GUI
There is a Kafka GUI, AKHQ, which can be deployed to K8s.
See also the Spring Kafka samples:
https://github.com/spring-projects/spring-kafka/tree/master/samples
helm repo add akhq https://akhq.io/
helm install --name akhq akhq/akhq --namespace vadal
Edit the akhq-secrets to change bootstrap.servers from kafka:9092 to my-cluster-kafka-bootstrap:9092 (you can do it by hand or use Lens: https://k8slens.dev/).
Edit the service to expose a NodePort, say 30011, delete the akhq pod and let it restart; then you can navigate to it:
http://localhost:30011/my-cluster-plain-text/topic
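If you would rather not edit the service, a port-forward should also work. This is a sketch assuming the chart's default service name akhq and service port 80 (check kc get svc -n vadal and adjust):
kubectl port-forward svc/akhq 8080:80 -n vadal
and browse to http://localhost:8080 instead.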
Conclusion
There we go: Kafka running in K8s, a Spring Boot application that reads and writes to it, and a GUI to boot.
Further Reading
https://strimzi.io/docs/operators/latest/overview.html#configuration-points-connect_str
Originally published at https://blog.ramjee.uk on July 24, 2020.