Web messaging application with Apache Kafka and Elasticsearch

Filip Pandi
Published in inovatrend
Feb 2, 2021 · 16 min read

Introduction

Have you ever asked yourself how popular streaming platforms or messaging systems work at large companies? Companies like Facebook, LinkedIn, and Netflix all use Apache Kafka and Elasticsearch in their products. When a lot of data needs to be processed very fast, the best practice is to use data streams, queuing, and publish-subscribe systems. In this showcase project, those two technologies, Apache Kafka and Elasticsearch, are used to handle large amounts of data.
The application featured in this blog is just a showcase, with many more improvements to come, as explained in the sections below.

If you are interested in learning how to use distributed big-data technologies in a setup that is a little bit different than usual, this is the blog post for you!
Apache Kafka is a queuing system that combines two traditional messaging models: shared message queues and publish-subscribe.
It provides very fast data processing in real time, which is great for any messaging system where many requests arrive in a small time window. Kafka is used to achieve high availability and fault tolerance as well as high processing speed.

Apache Kafka is used for real-time data streaming and processing, while Elasticsearch is used for extremely fast searches of stored data, so the two go very well together.
The technologies used for this project are Angular 9, SpringBoot 2.3.0, Elasticsearch 7.9.1, Kafka client 2.6.0, and Confluent Platform 5.5.1.

The Confluent Platform is used for convenience: it contains the same binaries as open-source Apache Kafka, plus other useful software that comes along with it, such as KSQL, Schema Registry, Connect, Confluent Control Center, and more. Only the community version is free, so it is basically used for development here.

In this showcase project, you are going to learn how to use Apache Kafka as an ingestion pipeline and Elasticsearch for efficiently storing data and serving queries back to the client.

Concept

An ingestion pipeline is the best way to describe the workflow of this application. Messages enter through the REST interface: the Angular client sends them to the REST controller on the backend, which writes them into a Kafka topic. Kafka acts as a distributed pipeline between the clients and Elasticsearch. Everything collected in the Kafka topic is consumed and saved to an Elasticsearch index, which is then used to serve the clients.

Configuring Apache Kafka

First, the Confluent Platform 5.5.1 or a higher version needs to be downloaded. It just has to be unzipped on disk and the backend server configured so it can connect to the Kafka broker, and it is ready for use.
All that needs to be done is to navigate to the Confluent directory on disk and start it with the command confluent local start [<service>] --path <path-to-confluent>; it is as simple as that. The console window should indicate whether the Kafka broker and ZooKeeper started successfully.

To connect to the Kafka broker from the SpringBoot backend server, it is necessary to add the Kafka client library, version 2.5.0 or higher, to the Gradle or Maven dependencies. Inside the project properties, the bootstrap server address needs to be specified like this: kafka.bootstrap.servers=127.0.0.1:9092, and Kafka should be available.
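As an illustration, the sketch below shows one way the property could be injected and the connection verified on startup. The property name follows the example above, while the class name and the connectivity check itself are assumptions, not code from the project.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

// Hypothetical sketch: inject the bootstrap servers property and check that the broker is reachable.
@Component
public class KafkaConnectionCheck {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @PostConstruct
    void verifyBrokerConnection() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            // listing topics only succeeds if the broker answers within the timeout
            admin.listTopics().names().get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Kafka broker not reachable at " + bootstrapServers, e);
        }
    }
}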

Configuring Elasticsearch

Elasticsearch 7.9.1 or higher is required. Just download Elasticsearch and run it locally; that is all for the development environment. A detailed installation guide is available here.

Frontend — Angular

Initial setup — Angular side

As mentioned before, Angular was chosen as the frontend technology; React or similar frontend technologies would do just fine as well.

Node.js needs to be installed first, and after that a new Angular project can be created with "ng new 'project_name'". After the project is created, adjust the routing and create the new components as follows.

Configuring proxy

What exactly is a proxy? The proxy is a gateway (middleware) that sits between the client and the server and relays communication between them.

For Angular-to-server communication, a proxy only needs to be configured for the development stage of the project. To do that, a "proxy.config.json" file needs to be created inside the project directory.

{
  "/kep/*": {
    "target": "http://localhost:9090",
    "secure": false,
    "logLevel": "debug",
    "changeOrigin": true
  }
}

One last thing: "proxy.config.json" can be added to "angular.json" like this,

"serve": {
  "builder": "@angular-devkit/build-angular:dev-server",
  "options": {
    "proxyConfig": "proxy.config.json",
    "browserTarget": "KEPClient:build"
  },

so when the "ng serve" command is called, it uses the proxy configuration. Another way is to call "ng serve --proxy-config proxy.config.json" to start the Angular dev server with the proxy configuration.

Angular dependencies

The project uses these dependencies and versions ("package.json" file):

"dependencies": {
  "@angular/animations": "~9.0.1",
  "@angular/common": "~9.0.1",
  "@angular/compiler": "~9.0.1",
  "@angular/core": "~9.0.1",
  "@angular/forms": "~9.0.1",
  "@angular/localize": "^9.1.12",
  "@angular/platform-browser": "~9.0.1",
  "@angular/platform-browser-dynamic": "~9.0.1",
  "@angular/router": "~9.0.1",
  "@auth0/angular-jwt": "^5.0.1",
  "@ng-select/ng-select": "^4.0.4",
  "bootstrap": "^4.4.1",
  "chart.js": "^2.9.3",
  "chartjs-plugin-datalabels": "^0.7.0",
  "chartjs-plugin-zoom": "^0.7.7",
  "hammerjs": "^2.0.8",
  "ng-multiselect-dropdown": "^0.2.10",
  "ng-select": "1.0.0-rc.5",
  "ng2-charts": "^2.3.0",
  "ngx-spinner": "^9.0.1",
  "rxjs": "~6.5.4",
  "tslib": "^1.10.0",
  "zone.js": "~0.10.2"
}

Components and routing overview

The necessary components are: a login component, containing the login form and an authentication service for backend authentication; a messenger component, the core component for sending and receiving messages from user to user; a monitoring component, used to monitor the Kafka consumer lag for the topic that holds the messages; and a register component, which allows users to register and use the system.

First, the routing module needs to be created in the app component. It contains the URI paths to the other components and is crucial for navigating to them.

const routes: Routes = [
  { path: '', redirectTo: '/login', pathMatch: 'full'},
  { path: 'login', component: LoginComponent},
  { path: 'register', component: RegisterComponent},
  { path: 'messenger', component: MessengerComponent},
  { path: 'monitoring', component: MonitoringComponent}
];

The app component is going to be used as a navigation component. The landing page is going to be the login component, for which authentication and interceptor service implementations are necessary.

Authentication (Angular side)

The authentication service communicates with the server: it sends an authentication request containing the credentials, and in response the server returns a JWT token.

The application also needs an authentication interceptor to check whether the session has expired. It is a simple class implementing HttpInterceptor, containing two methods, as seen in the code snippet below.

export class AuthInterceptor implements HttpInterceptor {
  private timeout: number;
  tokenSubscription = new Subscription();

  constructor(private http: HttpClient,
              private router: Router,
              private jwtHelper: JwtHelperService,
              private authService: AuthService) {}

  intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    const token = localStorage.getItem('token');
    if (!token) {
      this.router.navigate(['/']);
      return next.handle(req);
    }
    const req1 = req.clone({
      headers: req.headers.set('Authorization', `Bearer ${token}`),
    });
    this.timeout = this.jwtHelper.getTokenExpirationDate(token).valueOf() - new Date().valueOf();
    this.expirationCounter(this.timeout);
    return next.handle(req1);
  }

  expirationCounter(timeout) {
    this.tokenSubscription.unsubscribe();
    this.tokenSubscription = of(null).pipe(delay(timeout)).subscribe((expired) => {
      console.log('EXPIRED!!');
      this.authService.logout();
      this.router.navigate(['/login']);
    });
  }
}

This application uses the JWT authentication method on both the client and the server; if you want to use JWT, you can check out the Angular JWT authentication guide here.

Adding a simple login form and registration form will suit this application's needs; it doesn't need to be anything fancy.

Messenger component

After authentication is done it is time to implement the core part, the messenger component. The basic concept is that a user sends messages to another user, who receives those messages, and vice versa, so this is two-way communication between two users. The messenger component on the Angular side does nothing except send and receive messages. Implement a service that contains two methods: one for receiving messages and one for sending them to the receiving user.

public loadMessage(senderId: number, receiverId: number): Observable<KafkaMessageModel[]> {
  return this.http.get<KafkaMessageModel[]>(this.baseUrl + '/messenger/receive/' + senderId + '/' + receiverId);
}

public sendMessage(kafkaMessage: KafkaMessageModel): Observable<KafkaMessageModel> {
  return this.http.post<KafkaMessageModel>(this.baseUrl + '/messenger/send', kafkaMessage);
}

All that is left is to implement the user interface for the messenger functionality. The messenger interface lets the logged-in user choose a recipient who will receive the messages, provides a field for writing a message to the chosen recipient, and shows an inbox area that displays all the outgoing and incoming messages of the conversation between the logged-in user and the chosen recipient. The next image shows a simple messenger interface with a conversation between two users, seen from the perspective of user 'test01'.

Kafka consumer lag monitor and component

The Kafka lag monitor is an optional component that graphically presents the Kafka consumer lag, one of the most important Kafka metrics to monitor. It is conceived so that users with administrative roles can access this information. Consumer lag is a number that represents how many messages the Kafka consumer is behind the Kafka producer, or in other words, how many messages inside the topic have not yet been processed by the consumer.

This application uses just a canvas element (Chart.js) to display a graph of the Kafka consumer lag, but how to display it is up to the designer. It is also possible to monitor all Kafka metadata by opening the JMX (Java Management Extensions) port on the Kafka broker; a detailed guide on how to set it up for the Confluent Platform can be found here.

The next image shows an example of a Kafka consumer lag monitor. For a full guide on how to implement all kinds of canvas graphs, you can check it out here.

Backend — SpringBoot/Apache Kafka/Elasticsearch

Initial setup — Server-side

The project uses SpringBoot 2.3.0 with Elasticsearch 7.9.1 and the Apache Kafka client 2.6.0 to process and store user messages. Gradle 6.6.1 is used as the build tool, and PostgreSQL 12.3 is used as the database for storing registered users.
The MVC architectural pattern is used, which means the project is separated into three main logical components: model, view, and controller. It is recommended to use the "Spring Initializr" website to set up the project and its dependencies.

The library dependencies used in the project are listed in the next code snippet.

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-data-elasticsearch:2.3.3.RELEASE'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compile 'org.springframework.boot:spring-boot-starter-jdbc'
    compile 'org.springframework.boot:spring-boot-starter-web:2.2.6.RELEASE'
    compile 'org.postgresql:postgresql:42.2.5'
    compile 'io.jsonwebtoken:jjwt:0.9.1'
    runtime 'org.postgresql:postgresql'
    implementation 'org.springframework.boot:spring-boot-starter-security'
    implementation 'org.apache.kafka:kafka-streams'
    compile 'org.apache.kafka:kafka-clients:2.6.0'
    compile 'com.google.code.gson:gson:2.8.6'
    compileOnly 'org.projectlombok:lombok'
    compile 'org.apache.logging.log4j:log4j-core:2.8.2'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    compile 'javax.servlet:javax.servlet-api:4.0.1'
    testImplementation 'org.springframework.security:spring-security-test'
    compile 'com.auth0:java-jwt:2.0.1'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.11.1'
}

Authentication (Server-side)

Project security is based on the JWT authentication method. There are many ways to implement JWT security; in general, you need to generate a JWT token on user login if the user is not already authenticated and return the generated token to the client. For each client request, the backend needs to check whether the JWT token is valid or has expired. A JWT request filter class can be created by extending "OncePerRequestFilter", which lets you override the "doFilterInternal" method; one way to override it is shown in the next code snippet.

@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
    final String tokenHeader = request.getHeader(AUTHORIZATION);
    String username = null;
    String jwtToken = null;
    if (tokenHeader != null && tokenHeader.startsWith("Bearer ")) {
        jwtToken = tokenHeader.substring(7);
        try {
            username = jwtTokenUtil.getUsernameFromToken(jwtToken);
        } catch (IllegalArgumentException e) {
            logger.debug("Error while getting username from token!", e);
        } catch (ExpiredJwtException e) {
            logger.debug("Token expired!", e);
        }
    } else {
        logger.warn("Jwt token doesn't start with Bearer String...");
    }
    if (username != null && SecurityContextHolder.getContext().getAuthentication() == null) {
        UserDetails user = this.userManagerImpl.loadUserByUsername(username);
        if (jwtTokenUtil.validateToken(jwtToken, user)) {
            UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken =
                    new UsernamePasswordAuthenticationToken(user, null, user.getAuthorities());
            usernamePasswordAuthenticationToken.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
            SecurityContextHolder.getContext().setAuthentication(usernamePasswordAuthenticationToken);
        }
    }
    filterChain.doFilter(request, response);
}

Also, a JWT utility class and a general web security configuration class are necessary. The JWT utility class contains all the methods needed for generating and validating user tokens, and the web security configuration class contains the HTTP security URI matchers and the user authority roles and permissions.

@Component
public class JwtTokenUtil implements Serializable {

    public static final long JWT_TOKEN_VALIDITY = 5 * 60 * 60;

    @Value("${jwt.secret}")
    private String secret;

    public String getUsernameFromToken(String token) {
        return getClaimFromToken(token, Claims::getSubject);
    }

    public Date getExpirationDateFromToken(String token) {
        return getClaimFromToken(token, Claims::getExpiration);
    }

    public <T> T getClaimFromToken(String token, Function<Claims, T> claimsResolver) {
        final Claims claims = getAllClaimsFromToken(token);
        return claimsResolver.apply(claims);
    }

    private Claims getAllClaimsFromToken(String token) {
        return Jwts.parser().setSigningKey(secret).parseClaimsJws(token).getBody();
    }

    private Boolean isTokenExpired(String token) {
        final Date expiration = getExpirationDateFromToken(token);
        return expiration.before(new Date());
    }

    public String generateToken(UserDetails userDetails) {
        Map<String, Object> claims = new HashMap<>();
        return doGenerateToken(claims, userDetails.getUsername());
    }

    private String doGenerateToken(Map<String, Object> claims, String subject) {
        return Jwts.builder().setClaims(claims).setSubject(subject).setIssuedAt(new Date(System.currentTimeMillis()))
                .setExpiration(new Date(System.currentTimeMillis() + JWT_TOKEN_VALIDITY * 1000))
                .signWith(SignatureAlgorithm.HS512, secret).compact();
    }

    public Boolean validateToken(String token, UserDetails userDetails) {
        final String username = getUsernameFromToken(token);
        return (username.equals(userDetails.getUsername()) && !isTokenExpired(token));
    }
}
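The web security configuration class itself is not shown in the post. A minimal sketch of what it could look like, assuming Spring Security's WebSecurityConfigurerAdapter (still available in SpringBoot 2.3), a JwtRequestFilter bean wrapping the filter shown earlier, and hypothetical endpoint paths, is given below.

import org.springframework.context.annotation.Bean;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;

// Hypothetical configuration sketch; the filter bean and the URI paths are assumptions.
@EnableWebSecurity
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {

    private final JwtRequestFilter jwtRequestFilter; // the OncePerRequestFilter described above

    public WebSecurityConfig(JwtRequestFilter jwtRequestFilter) {
        this.jwtRequestFilter = jwtRequestFilter;
    }

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.csrf().disable()
                // login and registration stay public, everything else requires a valid JWT
                .authorizeRequests()
                .antMatchers("/kep/authenticate", "/kep/register").permitAll()
                .anyRequest().authenticated()
                .and()
                // no HTTP session is kept, every request carries its own token
                .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS);
        http.addFilterBefore(jwtRequestFilter, UsernamePasswordAuthenticationFilter.class);
    }
}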

Apache Kafka and Elasticsearch

Let's start with Kafka. Kafka is used as a distributed pipeline for messages, and it is also used for monitoring purposes, to check whether a consumer is working correctly. For an explanation of consumer lag, see the "Kafka consumer lag monitor and component" section. Possible future upgrades to Kafka in this application would cover data stream processing; in this project, stream processing could be used for creating blocklists of users (blocking messages), recording user events, processing user statistics, and more. In this version of the application there is still no stream processing; a separate blog post will cover that.

There are three core configurations for running Kafka: the producer configuration, the consumer configuration, and the broker configuration. In this post, only the producer and consumer configurations are covered in detail, since they matter most from the application's perspective.

The method for creating a producer, with all the configuration properties, would look like the code snippet below.

public KafkaProducer createKafkaProducer(String ack, Class<StringSerializer> keySerializer, Class<KafkaJsonSerializer> valueSerializer) {
    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.ACKS_CONFIG, ack);
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    producerProperties.put(ProducerConfig.RETRIES_CONFIG, 10);
    producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 50000);
    producerProperties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);
    producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    return new KafkaProducer<>(producerProperties);
}

Explanations for most producer properties can be found inside the ProducerConfig.java class.
A basic explanation of why some of these configurations are used follows.
For development, the acks property can be set to "all".
Acks is set to "all" because of the idempotence property: idempotence can only be set to "true" if acks is set to "all". The idempotence configuration property is explained later in this section.

The "bootstrap servers" configuration points to the URLs of the Kafka brokers; for local usage it is localhost (127.0.0.1) on port 9092.

The key and value serializers are classes that the developer provides, depending on what data needs to be written into the Kafka topic. In this application there is a String key serializer and a custom KafkaJsonSerializer class for the value. The custom serializer is used to map the entity class to JSON. Avro could be used as well, but it is more complex than this approach, since a Schema Registry is necessary when Avro is used. A custom JSON serializer can be written quite simply, like this.

public class KafkaJsonSerializer implements Serializer {

    private Logger logger = LogManager.getLogger(this.getClass());

    @Override
    public void configure(Map configs, boolean isKey) {}

    @Override
    public byte[] serialize(String topic, Object data) {
        return serialize(topic, null, data);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}
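The matching JsonDeserializer used later when creating the consumer is not shown in the post; a minimal sketch of what it might look like, assuming Jackson is used the same way as in the serializer, follows.

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Hypothetical counterpart to KafkaJsonSerializer: turns the JSON bytes back into the target class.
public class JsonDeserializer<T> implements Deserializer<T> {

    private final Logger logger = LogManager.getLogger(this.getClass());
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.readValue(data, targetType);
        } catch (Exception e) {
            logger.error("Error deserializing record from topic {}: {}", topic, e.getMessage());
            return null;
        }
    }

    @Override
    public void close() {}
}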

The linger property guarantees that the producer waits for the defined time, in milliseconds, before sending messages. That gives the messages some time to batch into a larger bulk, as long as the batch size limit is not reached, before they are sent. If the batch size limit is reached before the linger delay expires, the messages are sent even though the linger time has not passed. The default value is 0.
The retries configuration property limits how many times the producer will try to resend message records if a recoverable exception happens.

The retry backoff time, in milliseconds, is the time the producer waits before it retries sending a message.

The request timeout is the maximum time the client will wait for a response to a request.

The idempotence configuration property guarantees that messages won't be duplicated, but it requires the acks property to be set to "all" and retries greater than 0.

The method for creating a consumer, with all the configuration properties, would look like this.

public Consumer createKafkaConsumer(String groupId, StringDeserializer keyDeserializer, JsonDeserializer valueDeserializer) {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
    return new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer);
}

Group id is a unique property that defines which consumer group a consumer belongs to.

The auto offset reset property automatically resets the offset, in this case to "latest", meaning the consumer starts reading from the latest record in the topic's partitions.

Enable auto-commit is a property that lets the consumer offsets be committed periodically; it is used to avoid committing offsets by hand, which can lead to numerous problems.

The max poll records property bounds the maximum number of records returned in a single poll call. It is set to 1000 because a single message is pretty small, so a lot of messages can be handled in one poll if needed.

Up next is to create a method that creates a topic if the topic does not already exist. This is one of the ways to do that.

public void createTopicIfNotExist(String topicName, Long messageTopicStorageRetentionMS, String defaultReplicaitonFactor) throws InterruptedException, ExecutionException {
    synchronized (createTopicLock) {
        if (existingTopics.contains(topicName)) {
            boolean topicExists = kafkaAdmin.listTopics().names().get().contains(topicName);
            if (!topicExists) {
                Map<String, String> topicConfMap = new HashMap<>();
                topicConfMap.put(TopicConfig.RETENTION_MS_CONFIG, messageTopicStorageRetentionMS.toString());
                topicConfMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
                int messageTopicStorageNumPartitions = 3;
                NewTopic topic = new NewTopic(topicName, messageTopicStorageNumPartitions,
                        Short.parseShort(defaultReplicaitonFactor))
                        .configs(topicConfMap);
                List<NewTopic> resultTopicList = new ArrayList<>();
                resultTopicList.add(topic);
                kafkaAdmin.createTopics(resultTopicList);
            }
        }
    }
}

So the method needs a topic name under which the topic will be created; a retention time, which is the maximum time records are held inside a topic's partition before being discarded; and a replication factor, which guarantees that each record is replicated that many times, provided there are enough brokers to store the replicas on. There is also a clean-up policy property, set to delete, which means old records are deleted once the retention time or the size limit has been reached.

After the configuration is done, messages need to be produced, so a producer instance needs to be created. The producer is created on startup, after dependency injection is done.

@PostConstruct
private void createProducerOnStartUp() throws ExecutionException, InterruptedException {
    kafkaElasticUtils.createTopicIfNotExist(kafkaElasticUtils.messageTopicStorage,
            kafkaElasticUtils.messageTopicStorageRetentionMS, kafkaElasticUtils.defaultReplicaitonFactor);
    try {
        messageProducer = kafkaElasticUtils.createKafkaProducer("1", StringSerializer.class, KafkaJsonSerializer.class);
        logger.debug("Successfully created kafka producer: {}", messageProducer);
    } catch (Exception e) {
        logger.error("Error while creating Kafka producer: {}", e.getMessage());
    }
}

If a producer is successfully created, the application can produce messages to Kafka.

public void startProducing(KafkaMessage kafkaMessage) {
    try {
        messageProducer.send(new ProducerRecord<>(kafkaElasticUtils.messageTopicStorage, kafkaMessage.receiverUserId.toString(), kafkaMessage));
        logger.debug("Message successfully sent to topic: {} with receiver id: {}", kafkaElasticUtils.messageTopicStorage, kafkaMessage.receiverUserId.toString());
    } catch (Exception e) {
        logger.error("Error sending message with receiver id: {} - to topic: {} ", kafkaMessage.receiverUserId.toString(), kafkaElasticUtils.messageTopicStorage);
    }
}
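The REST controller that sits between the Angular client and these Kafka/Elasticsearch services is not shown in the post. A minimal sketch is given below; the class name, the injected component names, and the "/kep" prefix are assumptions, while the endpoint paths mirror the Angular messenger service shown earlier, and loadFromElasticsearch is the method covered later in this post.

import java.util.List;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller sketch; the endpoint paths mirror the Angular service
// ('/messenger/send' and '/messenger/receive/{senderId}/{receiverId}').
@RestController
@RequestMapping("/kep")
public class MessengerController {

    private final KafkaMessageProducer kafkaMessageProducer; // component with startProducing(...)
    private final KafkaMessageConsumer kafkaMessageConsumer; // component with loadFromElasticsearch(...), shown later

    public MessengerController(KafkaMessageProducer kafkaMessageProducer,
                               KafkaMessageConsumer kafkaMessageConsumer) {
        this.kafkaMessageProducer = kafkaMessageProducer;
        this.kafkaMessageConsumer = kafkaMessageConsumer;
    }

    @PostMapping("/messenger/send")
    public KafkaMessage send(@RequestBody KafkaMessage kafkaMessage) {
        // hand the message to the Kafka producer; it ends up in the message topic
        kafkaMessageProducer.startProducing(kafkaMessage);
        return kafkaMessage;
    }

    @GetMapping("/messenger/receive/{senderId}/{receiverId}")
    public List<KafkaMessage> receive(@PathVariable Long senderId, @PathVariable Long receiverId) {
        // return the conversation between the two users from Elasticsearch
        return kafkaMessageConsumer.loadFromElasticsearch(senderId, receiverId);
    }
}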

The same thing is done for the consumer as for the producer: it is created on startup, after dependency injection is done.

@PostConstruct
private void createKafkaConsumerOnStartup() throws ExecutionException, InterruptedException {
    kafkaElasticUtils.createTopicIfNotExist(kafkaElasticUtils.messageTopicStorage,
            kafkaElasticUtils.messageTopicStorageRetentionMS, kafkaElasticUtils.defaultReplicaitonFactor);
    consumer = kafkaElasticUtils.createKafkaConsumer(GROUP_ID, new StringDeserializer(), new JsonDeserializer<>(KafkaMessage.class));
    List<String> topics = new ArrayList<>();
    topics.add(kafkaElasticUtils.messageTopicStorage);
    logger.debug("Consumer {} successfully created!", consumer);
    consumer.subscribe(topics);
    logger.debug("Consumer {} successfully subscribed to topics: {}!", consumer, topics);
}

After the successful creation of a consumer, the application can start consuming messages stored in a Kafka topic and store them in Elasticsearch. Also at this point, the consumer lag can be calculated, so it is processed as well.

private void saveMessageToElasticAndProcessTopicLag() {
    synchronized (consumer) {
        ConsumerRecords<String, KafkaMessage> consumerRecords = consumer.poll(Duration.ofMillis(5));
        if (consumerRecords.count() > 0) {
            consumerRecords.forEach(crv -> {
                Long topicLag = processTopicLag(crv.offset(), crv.topic());
                kafkaLagProcessor.addKafkaTopicLag(new KafkaMonitorMetrics(DateTime.now().getMillis(), topicLag, crv.topic()));
                logger.info("Topic {} lag: {} ", crv.topic(), topicLag);
                try {
                    kafkaElasticsearchManager.saveKafkaMessageToElastic(crv.value());
                } catch (Exception e) {
                    logger.error("Error while saving to Elasticsearch: {}", e.getMessage());
                }
            });
        }
    }
}

To calculate the consumer lag, simply subtract the current offset from the end offset, like this.

public long processTopicLag(long offset, String topicName) {
    List<TopicPartition> partitions = consumer.partitionsFor(topicName)
            .stream().map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
    long topicLag = 0;
    for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
        Long currentOffset = offset;
        long partitionLag = endOffset.getValue() - currentOffset;
        topicLag += partitionLag;
    }
    return topicLag;
}

To implement the Elasticsearch repository, it is enough to create a repository interface that extends "ElasticsearchRepository", which allows it to be used much like a normal JPA repository for an entity customized for Elasticsearch. The index name and schema for the entity are automatically created when an object is first sent to Elasticsearch.

@Document(indexName = "kafka_message")
public class KafkaMessage {
    @Id
    @Field(type = FieldType.Keyword)
    @JsonProperty("id")
    public String id;
    @JsonProperty("message")
    public String message;
    @JsonProperty("senderUsername")
    public String senderUsername;
    @JsonProperty("senderUserId")
    public Long senderUserId;
    @JsonProperty("receiverUserId")
    public Long receiverUserId;
}
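The repository interface itself is not shown in the post. A minimal sketch, with an assumed interface name and a derived query method that something like the loadAllMessagesForUser helper used below could delegate to, might look like this.

import java.util.List;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

// Hypothetical repository sketch; Spring Data derives the query from the method name,
// so no implementation is needed to search by sender and receiver id.
public interface KafkaMessageRepository extends ElasticsearchRepository<KafkaMessage, String> {

    List<KafkaMessage> findBySenderUserIdAndReceiverUserId(Long senderUserId, Long receiverUserId);
}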

To serve message data back to the users, the application needs to load messages from Elasticsearch and return them to the appropriate conversation. This means that both users need to receive all the messages they sent to and received from each other.

public List<KafkaMessage> loadFromElasticsearch(Long senderId, Long receiverId) {
    List<KafkaMessage> conversationMessageList = new ArrayList<>();
    List<KafkaMessage> receiverMessageList;
    List<KafkaMessage> senderMessageList;
    try {
        receiverMessageList = kafkaElasticsearchManager.loadAllMessagesForUser(receiverId, senderId);
        senderMessageList = kafkaElasticsearchManager.loadAllMessagesForUser(senderId, receiverId);
        conversationMessageList.addAll(senderMessageList);
        conversationMessageList.addAll(receiverMessageList);
    } catch (Exception e) {
        logger.error("Error while loading messages out of ES: {}", e.getMessage());
    }
    if (conversationMessageList.size() > 0) {
        logger.debug(" Number of records: {} - pulled out of ES index: {}! For sender user with id: {} and receiver user with id: {}",
                conversationMessageList.size(), kafkaElasticUtils.elasticIndex, senderId, receiverId);
        conversationMessageList.sort(Comparator.comparing(kafkaMessage -> kafkaMessage.id, Comparator.reverseOrder()));
    }
    return conversationMessageList;
}

The application calls this method every 3 seconds; if there are messages inside the topic, they are first written from Kafka into Elasticsearch, and after that the processed messages are pulled out of Elasticsearch and returned to the intended user.
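The post does not show what drives this 3-second cycle, whether it is a backend scheduler or the client polling the receive endpoint. As one possible backend-side approach, a sketch using Spring's scheduling support with hypothetical names is shown below; it assumes the Kafka-draining method is made accessible on the consumer component.

import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Hypothetical scheduling sketch; @EnableScheduling would normally sit on a configuration class.
@Component
@EnableScheduling
public class MessagePollingScheduler {

    private final KafkaMessageConsumer kafkaMessageConsumer; // the consumer component shown above

    public MessagePollingScheduler(KafkaMessageConsumer kafkaMessageConsumer) {
        this.kafkaMessageConsumer = kafkaMessageConsumer;
    }

    // every 3 seconds: drain new records from the topic into Elasticsearch and record the consumer lag
    @Scheduled(fixedRate = 3000)
    public void pollMessages() {
        kafkaMessageConsumer.saveMessageToElasticAndProcessTopicLag();
    }
}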

Conclusion and future milestones

This showcase project teaches you how to use Angular as a frontend and SpringBoot with Apache Kafka and Elasticsearch as a backend in the simplest way.
Apache Kafka can be horizontally scaled if needed, so it provides a lot of flexibility when it comes to system upgrades, and it is one of the best technologies for data processing and data buffering.

In future versions of the application, the intention is to use Kafka Streams as a data processor and to implement a message-block service where users can block messages from another user. Kafka Streams will also be used to aggregate user statistics, to showcase what Kafka can do.

Elasticsearch is also very easy to scale horizontally, which provides redundancy as well.
If you want to monitor Elasticsearch, Kibana is a graphical interface that can be used with Elasticsearch to monitor the health status of the Elastic cluster and to get easy access to the stored data with graphical interpretations.

Code is publicly available on GitHub. You can access backend code here, and frontend code here.

Keep following the future posts for more on this topic, happy learning!
