Nümunə tətbiq komputer resursları: RAM, CPU və DİSK-in istifadə metrikalarını ani vaxtda monitorinq edir.

Tətbiqdə Kafka-nın həm Core Java ilə həm də Spring Boot ilə istifadə olunmasına eyni zamanda verilənlərin ani vaxtda WebSocket vasitəsiylə front-end -ə göndərilməsinə nəzər salacayıq. Ümumi arxitektura aşağıdakı kimidir:

Core java ilə yazılmış tətbiq 1 saniyə intervalla metrikaları toplayıb JSON formatında Kafka-ya göndərir. Spring Boot-da yazılan tətbiq isə həmin metrikaları Kafka-dan oxuyub ani vaxtda WebSocket vasitəsiylə qrafiklərdə vizualizasiya edir. Başlayaq metrikaların toplanmasından.

Metrika obyekti özündə aşağıdakı field-ləri cəmləyir:

  • totalMemorySize — RAM-ın ümumi ölçüsü
  • freeMemorySize — RAM-da boş yerin ölçüsü
  • memoryUsagePercent — RAM-ın neçə faiz məşğul olduğu
  • totalDiskSize — HDD-nin ümumi ölçüsü
  • freeDiskSize — HDD-də boş yerin ölçüsü
  • diskUsagePercent — HDD-nin neçə faiz məşğul olduğu
  • cpuUsagePercent — CPU-nun neçə faiz məşğul olduğu

Bu metrikaların toplanması üçün com.sun.managementjava.lang.management paketlərindən istifadə olunub. Bu haqda daha ətraflı sonda yerləşdirdiyim GitHub hesabımdakı proyektin qaynaq kodlardan baxa bilərsiniz.

Metrika obyektini doldurduqdan sonra Kafka-ya göndərmək lazımdır. Bunun üçün producer yazmaq lazımdır. Kafka-nın Core Java-da istifadə edilməsi üçün aşağıdakı kitabxana lazımdır:

https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients

Əlavə olaraq biz metrika obyektini Kafka-ya JSON formatın göndərəcəyimiz üçün aşağıdakı kitabxanalar da lazım olacaq:

https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databindhttps://mvnrepository.com/artifact/org.apache.kafka/connect-api

Producer-imizin kodlarına nəzər salaq:

package com.validakhundov.medium.kafka.metric.collector.producer;import com.validakhundov.medium.kafka.metric.collector.model.Metric;
import com.fasterxml.jackson.databind.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.json.JsonSerializer;
import lombok.experimental.UtilityClass;
import java.util.Properties;
@UtilityClass
public class MetricProducer {
public void produceMetric(Metric metric, String token) {

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.0.110:9092”);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
Producer producer = new KafkaProducer(properties);
producer.send(new ProducerRecord<String, JsonNode>(“metric_” + token, new ObjectMapper().valueToTree(metric)));
producer.close();
}}

produceMetric -metodu vasitəsiylə metrika obyektini Kafka-ya göndərəcəyik. Fikir versək metoda parametr olaraq metrika obyektindən əlavə olaraq “token” ötürülür. Bu token-i proqram başladılanda bizə verir və Topic-in adını identifikasiya etmək üçündür. Həmçinin metrikaları monitorinq edəndə daxil etməliyik ki, consumer hansı topic-dən metrikları oxuyacağını bilsin. Producer-imizin sazlamalarını təyin edirik:

  • BOOTSTRAP_SERVERS_CONFIG — Broker-in adresi
  • KEY_SERIALIZER_CLASS_CONFIG — Key-in hansı format-da göndəriləcəyini təyin edir
  • VALUE_SERIALIZER_CLASS_CONFIG — Verilənin hansı format göndəriləcəyini təyin edir

Daha sonra ProducerRecord-in obyetktini yaradıb topic-i və metrikanı əlavə edib göndəririk. Sonda isə producer-i bağlayırıq.

Tətbiqi çalışdıraq və Kafka-ya metrikaların göndərilməsinə baxaq:

mvn package
java -jar target/metric-collector-1.0-jar-with-dependencies.jar

Tətbiq çalışmağa başladıqda bizə aşağıdakı kimi çıxış verəcək:

Token: 9858c2d0-aeec-11ea-8536–2ff45c056340

Bu token-i kopyalayırıq və hələlik konsol consumer -dən istifadə edib metrikaların Kafkaya gəldiyindən əmin oluruq:

./kafka-console-consumer.sh — bootstrap-server 192.168.0.110:9092 — topic metric_9858c2d0-aeec — 8536–2ff45c056340

Və nəticə:

Metrikalar uğurla 1 saniyə intervalla Kafka-ya gəlir. Keçək 2-ci tətbiqimizə.

Spring Boot-da Kafka-nın istifadəsi çox rahatdır. Əvvəlcə lazım olan kitabxananı qeyd edim:

https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka

Sazlamalardan isə sadəcə application.properties-də brokerin adresini aşağıdakı şəkildə qeyd etməyimiz kifayətdir.

spring.kafka.bootstrap-servers=192.168.0.110:9092

Consumer servisimizi yazaq və metrikaları Kafka-dan alıb hələlik sadəcə çıxışda çap edək:

package com.validakhundov.medium.kafka.metric.monitoring.service;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = “metric_” + “${metric.collector.token}”, groupId = “0”)
public void consumeMetric(String metric){
System.out.println(metric);
}
}

Bayaqkı token-i əmr sətrindən parametr olaraq ötürürük:

java -jar target\metric-publisher-v1.0.jar — metric.collector.token=9858c2d0-aeec — 8536–2ff45c056340

Çıxışda metrikaların Kafka-dan oxunduğunu görürük:

İndi isə metrikanı çıxışa deyil WebSocket və JavaScript vasitəsiylə qrafikdə göstərək. Spring Boot-da WebSocket istifadə etmək üçün kitabxananı əlavə edirik:

https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-websocket

Sonra isə endpoint və broker sazlamalarnı təyin edirik:

package com.validakhundov.medium.kafka.metric.monitoring.config;import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends
AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry
stompEndpointRegistry) {
stompEndpointRegistry.addEndpoint(“/websocket”).withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker(“/metric”);
}
}

Və bayaqkı consumer servisimizi dəyişib Kafka-dan oxuduğumuz metrikanı çıxışa deyil WebSocket-lə göndəririk:

package com.validakhundov.medium.kafka.metric.monitoring.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

@Autowired
private SimpMessagingTemplate template;

@KafkaListener(topics = “metric_” + “${metric.collector.token}”, groupId = “0”)
public void consumeMetric(String metric) {
template.convertAndSend(“/metric”, metric);
}

}

Front end-də isə Socket və qrafiklərdən istifadə etmək üçün aşağıdakı JavaScript kitabxanalarını əlavə edirik:

<script type=”text/javascript” src=”https://code.jquery.com/jquery-3.5.1.min.js"></script>
<script type=”text/javascript” src=”https://cdnjs.cloudflare.com/ajax/libs/sockjs client/1.4.0/sockjs.min.js”></script>
<script type=”text/javascript” src=”https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script type=”text/javascript” src=”https://canvasjs.com/assets/script/jquery.canvasjs.min.js"></script>

Nümunə üçün qrafiklərdən birinə baxaq:

var socket = new SockJS(‘/websocket’);
var stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
stompClient.subscribe(‘/metric’, function (metric) {
var ramChartOptions = {
title: {
text: “RAM — “ + JSON.parse(metric.body).memoryUsagePercent + “ %”},
data: [{
type: “pie”,
startAngle: 45,
showInLegend: “true”,
legendText: “{label}”,
indexLabel: “{label} ({y})”,
yValueFormatString: “##### MB”,
y: {
labelFontSize: 25
},
dataPoints: [
{label: “Boş”, y: JSON.parse(metric.body).freeMemorySize, color: “blue”},
{
label: “Məşğul”,
y: JSON.parse(metric.body).totalMemorySize — JSON.parse(metric.body).freeMemorySize,
color: “red”
},
]
}]
};
$(“#ramChartContainer”).CanvasJSChart(ramChartOptions);
});
});

RAM-ın istifadə metrikalarını alıb qrafikdə göstəririk.

Metrika:

{“totalMemorySize”:8087,”freeMemorySize”:6213,”memoryUsagePercent”:76.0,”totalDiskSize”:222,”freeDiskSize”:119,”diskUsagePercent”:46.0,”cpuUsagePercent”:25.0}

Qrafik:

Kodlarla daha ətraflı GitHub hesabımdan tanış ola bilərsiniz. Apache Kafka seriyasının sonuna gəldik. Seriyada Kafka haqqında həm nəzəri həm də praktiki mövzuları 5 məqalədə cəmləməyə çalışdım. Ümidvaram maraqlı və faydalı olmuşdur. IT sektorunda öz dilimizdə kontent yaratmağa dəstək olmaq həmçinin daha çox insanın faydalanması üçün məqalələri paylaşmağı unutmayın :). Növbəti seriya və məqalələr üçün izləmədə qalın, hər birimizə uğurlar!

--

--