Getting started with SpringBoot and Kafka

Mayank dixit
Average problems quick solutions
3 min read · Dec 10, 2018
  1. Install Kafka and start zkServer and kafka-server
  2. Uncomment listeners & advertised.listeners in server.properties.
  3. SpringBoot App for kafka (sample)
  4. Run Kafka locally
  5. Build this repo with: gradle build
  6. Start the app using: java -jar build/libs/spring-kafka-0.0.1-SNAPSHOT.jar
  7. Publish using http://localhost:8080/greetings?message=HelloManThisIsPublished
  8. Check terminal logs for publish and polling logs. Cheers!

App Directory structure. Download from git repo.

├── build.gradle
└── src
    └── main
        ├── java
        │   └── com
        │       └── springkafka
        │           └── streamkafka
        │               ├── StreamkafkaApplication.java
        │               ├── config
        │               │   └── StreamsConfig.java
        │               ├── model
        │               │   └── Greetings.java
        │               ├── service
        │               │   ├── GreetingsListener.java
        │               │   └── GreetingsService.java
        │               ├── stream
        │               │   └── GreetingsStreams.java
        │               └── web
        │                   └── GreetingsController.java
        └── resources
            └── application.yaml

resources/application.yaml

# Spring Cloud Stream configuration for the greetings sample.
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Kafka broker the binder connects to.
          brokers: localhost:9092
      bindings:
        # Binding names match the INPUT / OUTPUT constants in GreetingsStreams.
        # Both bindings use the same destination, so the app reads what it publishes.
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json

build.gradle

// Build script for the Spring Boot + Spring Cloud Stream (Kafka binder) sample.
buildscript {
    ext {
        springBootVersion = '2.1.1.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.springkafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    // Spring milestone repository; the Spring Cloud version below is a milestone (M3) build.
    maven { url "https://repo.spring.io/milestone" }
}

// There is a high chance of running into compatibility issues: change springCloudVersion
// only after verifying it here: http://start.spring.io/actuator/info
ext['springCloudVersion'] = 'Greenwich.M3'

dependencies {
    // 'implementation' replaces the deprecated 'compile' configuration, matching the entries below.
    implementation('org.springframework.boot:spring-boot-starter-web')
    implementation('org.springframework.boot:spring-boot-starter-actuator')
    implementation('org.springframework.cloud:spring-cloud-stream')
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka')
    implementation('org.springframework.kafka:spring-kafka')
    runtimeOnly('org.springframework.boot:spring-boot-devtools')
    // Lombok is needed on the compile classpath (annotations) and on the annotation-processor
    // path; Gradle 5+ no longer runs processors found only on the compile classpath.
    compileOnly('org.projectlombok:lombok')
    annotationProcessor('org.projectlombok:lombok')
    testImplementation('org.springframework.boot:spring-boot-starter-test')
    testImplementation('org.springframework.cloud:spring-cloud-stream-test-support')
    testImplementation('org.springframework.kafka:spring-kafka-test')
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

StreamkafkaApplication.java

package com.springkafka.streamkafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Application entry point for the stream-kafka sample.
 */
@SpringBootApplication
public class StreamkafkaApplication {

    /**
     * Starts the application by delegating to {@link SpringApplication#run}.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(StreamkafkaApplication.class, args);
    }
}

config/StreamsConfig.java

package com.springkafka.streamkafka.config;
import com.springkafka.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
 * Configuration holder whose only job is to carry the {@code @EnableBinding}
 * annotation for the {@link GreetingsStreams} interface; it declares no members.
 */
@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

model/Greetings.java

package com.springkafka.streamkafka.model;
// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Message payload carrying a text message and the time it was created.
 *
 * <p>Accessors, {@code toString()} and the builder are generated by Lombok.
 */
@Getter
@Setter
@ToString
@Builder
public class Greetings {

    /** Creation time; the controller fills it from {@code System.currentTimeMillis()}. */
    private long timestamp;

    /** Free-form greeting text. */
    private String message;
}

service/GreetingsListener.java

package com.springkafka.streamkafka.service;
import com.springkafka.streamkafka.model.Greetings;
import com.springkafka.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Consumer side of the sample: logs every {@link Greetings} payload handed to it.
 */
@Slf4j
@Component
public class GreetingsListener {

    /**
     * Handler registered via {@code @StreamListener} for {@link GreetingsStreams#INPUT}.
     *
     * @param greetings the message payload
     */
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        log.info("Received greetings: {}", greetings);
    }
}

service/GreetingsService.java

package com.springkafka.streamkafka.service;
import com.springkafka.streamkafka.model.Greetings;
import com.springkafka.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
/**
 * Producer side of the sample: publishes {@link Greetings} payloads on the
 * outbound channel exposed by {@link GreetingsStreams}.
 */
@Service
@Slf4j
public class GreetingsService {

    private final GreetingsStreams greetingsStreams;

    /**
     * @param greetingsStreams provider of the outbound message channel
     */
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    /**
     * Wraps the payload in a message with a JSON content-type header and sends
     * it on the outbound channel.
     *
     * <p>If the channel reports the send as unsuccessful, a warning is logged
     * instead of the result being silently dropped.
     *
     * @param greetings the payload to publish
     */
    public void sendGreeting(final Greetings greetings) {
        log.info("Sending greetings {}", greetings);
        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        // MessageChannel.send returns false when the message was not sent; do not ignore it.
        boolean sent = messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
        if (!sent) {
            log.warn("Failed to send greetings {}", greetings);
        }
    }
}

stream/GreetingsStreams.java

package com.springkafka.streamkafka.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
 * Declares the two message channels used by the sample. The channel names
 * match the binding names configured in {@code application.yaml}.
 */
public interface GreetingsStreams {

    /** Name of the inbound channel. */
    String INPUT = "greetings-in";

    /** Channel on which greetings are received. */
    @Input(INPUT)
    SubscribableChannel inboundGreetings();

    /** Name of the outbound channel. */
    String OUTPUT = "greetings-out";

    /** Channel on which greetings are published. */
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

web/GreetingsController.java

package com.springkafka.streamkafka.web;
import com.springkafka.streamkafka.model.Greetings;
import com.springkafka.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP entry point that turns a request parameter into a {@link Greetings}
 * payload and hands it to {@link GreetingsService}.
 */
@RestController
public class GreetingsController {

    private final GreetingsService greetingsService;

    /**
     * @param greetingsService service used to publish the greeting
     */
    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }

    /**
     * Handles {@code GET /greetings?message=...}: builds a greeting stamped
     * with the current time and passes it to the service. The response status
     * is declared as 202 Accepted and there is no response body.
     *
     * @param message text of the greeting, taken from the {@code message} request parameter
     */
    @ResponseStatus(HttpStatus.ACCEPTED)
    @GetMapping("/greetings")
    public void greetings(@RequestParam("message") String message) {
        final long createdAt = System.currentTimeMillis();
        greetingsService.sendGreeting(
                Greetings.builder()
                        .timestamp(createdAt)
                        .message(message)
                        .build());
    }
}

PS: gist
Tags: java-kafka, springboot, kafka, kafka in 5 minutes, quick start kafka, kafka and java, kafka and springboot

--

--

Mayank dixit
Average problems quick solutions

Web & open source enthusiast. Interested in #code #comedy #music and #kitchen. Learns, writes and shares tech stuff. Wannabe product guy.