Data Buffering and Streaming Observability using Kafka and Elasticsearch

Abhinav Korpal
Airtel Digital
May 25, 2023 · 6 min read

This article describes a system that ingests real-time logs and analytics data from several hundred servers spanning the UI and server-side processing tiers. Each server sends its data to Kafka, an ELK/EFK layer processes it, and the results are pushed to a dashboard layer where they can be queried. Everything runs on OpenShift Container Platform in a zone-level HA cluster.

Introduction

Airtel handles data for a large number of customer-facing applications, and these applications produce large volumes of logs that need centralized, real-time logging. This blog describes how we made it possible to analyze and collectively query application logs from the UI and server-side processing tiers. We use Kafka as the messaging and buffering layer for ingesting data into EFK. Because Kafka provides durability through replication, we designed the solution so that messages are not lost if one or more Kafka broker Pods fail.

Before sending real-time logs to Kafka, we filter the application logs in the log agent's grok patterns, removing data we do not need and adding a few relevant details about the server. In the end, users can query or filter application logs in the Kibana interface using fields as key-value pairs.

This blog also shows that if one of the Kafka broker Pods fails, Kafka detects the failure and moves leadership of the affected partitions to a healthy in-sync replica (ISR) broker, so messages keep flowing. When the failed broker recovers, it rejoins the pool and resumes its part in the streaming mechanism.

We use Kafka as the backbone that buffers real-time event logs from the applications. A log agent is installed on each server purely for real-time log shipping, and it streams the data to Kafka based on different tags.

KPIs are pushed into Kafka so that the data can be transformed. The Kafka Elasticsearch connector then pushes this data into Elasticsearch, and separate Kibana dashboards and visualizations have been tuned for different analyses and performance views.

Design

Background

The aim of this document is to consolidate all the dependencies (Zookeeper, Kafka, Kafka Connect, Kafdrop, Elasticsearch, Kibana, and Grafana) on a common platform, OpenShift Container Platform, with an HA implementation, instead of setting them up separately on each server or VM. This reduces cost and removes the need to manage separate servers.

This solution is compatible with any application that produces logs or data on servers, and it gathers them in one common place for better readability and accessibility.

Our solution provides these server dependencies by orchestrating them, and we set up the whole structure with HA in mind for all the software: Kafka and its dependents, Elasticsearch, and so on. Whether it is the Kafka setup, the Elasticsearch setup, or Kafka Connect establishing connectivity between Kafka and Elasticsearch, each runs in HA mode on different nodes.

Components

Input/Output to Kafka

We have set up Logstash/Fluentd/Filebeat on many of our servers and configured custom grok patterns to structure the application logs. Once formatted, the data is sent to the Kafka brokers that we configured as the output.

Whether the source is a mobile app, a web app, CI/CD, or another application, this solution can serve as a common path for all projects to push application data to the Kafka and Elasticsearch layer.

Before the logs are forwarded, all application logs and data are restructured using grok patterns, which produce compact JSON output containing only the relevant details. This JSON is what finally gets sent to Kafka.
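
To make the grok step concrete, here is a minimal Python sketch of the same idea: parse a raw line, drop fields that are not needed, add server details, and emit compact JSON. The log layout, the field names, and the `structure_log_line` helper are hypothetical illustrations, not our actual grok patterns, which live in the Logstash/Fluentd/Filebeat configuration.

```python
import json
import re
import socket

# Hypothetical log layout: "<timestamp> <LEVEL> [<thread>] <message>"
LOG_PATTERN = re.compile(
    r"^(?P<timestamp>\S+)\s+(?P<level>[A-Z]+)\s+\[(?P<thread>[^\]]+)\]\s+(?P<message>.*)$"
)


def structure_log_line(line, app, drop_fields=("thread",), hostname=None):
    """Parse one raw log line into a compact JSON string.

    Returns None when the line does not match the expected layout.
    """
    match = LOG_PATTERN.match(line.strip())
    if match is None:
        return None
    # Keep only the fields we care about (the "remove non-required data" step).
    event = {k: v for k, v in match.groupdict().items() if k not in drop_fields}
    # Enrich with server details (the "add relevant details" step).
    event["app"] = app
    event["host"] = hostname or socket.gethostname()
    return json.dumps(event, sort_keys=True)


if __name__ == "__main__":
    raw = "2023-05-25T10:15:00Z ERROR [worker-1] Payment failed"
    print(structure_log_line(raw, app="checkout", hostname="srv-01"))
```

The resulting JSON string is the kind of payload the log agent would then publish to a Kafka topic.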

Kafka

To start a Kafka server, we’d first need to start a Zookeeper server. We must ensure that the Zookeeper server always starts before the Kafka server and stops after it.

The Zookeeper server listens on port 2181 for the Kafka service, which is defined within the same container setup.
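
One simple way to respect this startup order is to wait until the Zookeeper port accepts TCP connections before launching Kafka. The sketch below assumes a plain TCP check is good enough as a readiness signal; the `wait_for_port` helper and the `zookeeper` hostname are illustrative names, not part of our deployment.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=30.0, interval_s=0.5):
    """Return True once a TCP connection to host:port succeeds.

    Returns False if timeout_s elapses without a successful connection.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            # Connection refused, unreachable, or timed out: retry until the deadline.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


if __name__ == "__main__":
    # "zookeeper" is a hypothetical service name; 2181 is the port named above.
    if wait_for_port("zookeeper", 2181, timeout_s=5.0):
        print("Zookeeper is reachable, Kafka can start")
    else:
        print("Zookeeper is not reachable yet")
```

On an orchestrated platform the same gate is usually expressed as a readiness probe or an init step rather than a script, but the logic is the same.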

Similarly, the Kafka service is exposed to host applications through port 29092, while inside the container environment it is advertised on port 9092, as configured by the KAFKA_ADVERTISED_LISTENERS property.

We have configured a replication factor of 3, so each partition that stores data from the application logs is replicated across 3 brokers. Note that a message is not considered fully stored (committed) until all in-sync replicas have it. In this blog, we outline how we achieve this synchronization and proceed accordingly.

Here, a replication factor of 3 means that even if we lose 2 of the 3 Pods/Brokers, the 1 remaining broker still holds the data, which protects us from data loss. We store data in regular topics, for audit and event sourcing.
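
The arithmetic behind this can be written down in a few lines. The sketch below separates two questions: how many brokers can be lost while a copy of the data survives, and whether producers using `acks=all` can still write. The `min.insync.replicas` value of 2 in the example is an assumption for illustration; the article does not state which value we use, and the helper names are hypothetical.

```python
def tolerated_broker_failures(replication_factor):
    """Brokers that can be lost while at least one copy of each partition survives."""
    if replication_factor < 1:
        raise ValueError("replication_factor must be >= 1")
    return replication_factor - 1


def accepts_acks_all_writes(in_sync_replicas, min_insync_replicas):
    """With acks=all, a write is accepted only if the ISR size is at least min.insync.replicas."""
    return in_sync_replicas >= min_insync_replicas


if __name__ == "__main__":
    print(tolerated_broker_failures(3))    # replication factor from the article
    print(accepts_acks_all_writes(2, 2))   # one broker down, assumed min.insync.replicas=2
    print(accepts_acks_all_writes(1, 2))   # two brokers down, same assumption
```

Note that the surviving copy only protects against data loss if that replica was in sync when the other brokers failed.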

For HA Kafka Setup, we must ensure that the service names and KAFKA_BROKER_ID are unique across the services.

Moreover, each service must expose a unique port to the host machine. Although Zookeeper listens on port 2181, its services are exposed to the host via ports 22181 and 32181, respectively. The same logic applies to the kafka-1, kafka-2, and further broker services, which are reachable from the host on ports 29092 and 39092, respectively.

To add one more broker to the pool of Kafka brokers, we simply add one more Pod with two properties changed: the broker ID and the broker service name. Kafka auto-detects the new broker and adds it to its pool.
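
The uniqueness rules above (service name, KAFKA_BROKER_ID, host port) are easy to derive from a single broker index. The following sketch shows one way to do that. The port formula simply extends the 29092/39092 pattern, and the listener names `PLAINTEXT` and `PLAINTEXT_HOST`, the `zookeeper:2181` address, and both helper functions are assumptions for illustration rather than our exact manifests.

```python
def broker_settings(index, zookeeper="zookeeper:2181", host="localhost"):
    """Derive per-broker settings from a 1-based broker index."""
    if index < 1:
        raise ValueError("index must be >= 1")
    name = f"kafka-{index}"
    # Extends the pattern from the article: kafka-1 -> 29092, kafka-2 -> 39092.
    host_port = (index + 1) * 10000 + 9092
    return {
        "service_name": name,
        "host_port": host_port,
        "env": {
            "KAFKA_BROKER_ID": str(index),
            "KAFKA_ZOOKEEPER_CONNECT": zookeeper,
            # Internal clients use <service>:9092, host clients use <host>:<host_port>.
            "KAFKA_ADVERTISED_LISTENERS": (
                f"PLAINTEXT://{name}:9092,PLAINTEXT_HOST://{host}:{host_port}"
            ),
        },
    }


def bootstrap_servers(broker_count, host="localhost"):
    """Comma-separated host:port list, as expected by Kafka clients and tools."""
    return ",".join(
        f"{host}:{broker_settings(i, host=host)['host_port']}"
        for i in range(1, broker_count + 1)
    )


if __name__ == "__main__":
    for i in (1, 2, 3):
        print(broker_settings(i))
    print(bootstrap_servers(3))
```

Adding a broker then amounts to generating the settings for the next index, which changes only the broker ID, the service name, and the derived port.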

Once the cluster is up, you can use the Kafdrop tool to connect to it by specifying a comma-separated list of Kafka servers and their respective ports.

Kafka Sink Connector

Along with the Kafka HA setup, we have set up Kafka Connect to move logs from Kafka to Elasticsearch. We first create sink connectors, and each connector links a Kafka topic to an Elasticsearch index.
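
Connectors are typically registered by POSTing a JSON definition to the Kafka Connect REST API at `/connectors`. The sketch below builds such a request without sending it. It assumes the Confluent Elasticsearch sink connector; the connector name, topic, URLs, and the `elasticsearch_sink_request` helper are placeholders, and the article does not specify our exact connector settings.

```python
import json
import urllib.request


def elasticsearch_sink_request(connect_url, name, topic, es_url):
    """Build (but do not send) the POST request that registers a sink connector."""
    payload = {
        "name": name,
        "config": {
            # Assumed connector: the Confluent Elasticsearch sink.
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "connection.url": es_url,
            # Log events are schemaless JSON without meaningful keys.
            "key.ignore": "true",
            "schema.ignore": "true",
        },
    }
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = elasticsearch_sink_request(
        "http://kafka-connect:8083",   # hypothetical Kafka Connect address
        "app-logs-sink",               # hypothetical connector name
        "app-logs",                    # hypothetical topic
        "http://elasticsearch:9200",   # hypothetical Elasticsearch address
    )
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
    # To actually register the connector: urllib.request.urlopen(req)
```

With this connector, the target index name defaults to the topic name, which is one way to get the topic-to-index link described above.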

Elasticsearch and Kibana

We set up Elasticsearch with 2 master nodes and 2 data nodes to achieve HA. All the JSON data and logs that arrive in the Kafka brokers from the log agent are written to Elasticsearch through Kafka Connect. This data is stored only on the 2 data nodes, which are managed by the 2 master nodes. Each data node has its own storage where data is persisted.

The 2 master nodes manage index creation and document placement on the data nodes. The 2 data nodes store the indices and the documents in them.

All partition data is delivered to the Elasticsearch index without duplication, and it is reflected in Kibana with proper timestamp handling.
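
The article does not spell out how duplicates are avoided. One common approach, and to our understanding the one the Confluent sink connector follows when `key.ignore` is true, is to derive each document ID from the record's topic, partition, and offset, so that a redelivered record overwrites itself instead of creating a second document. The sketch below simulates that with a plain dictionary standing in for an index; the helper names and sample records are hypothetical.

```python
def document_id(topic, partition, offset):
    """Deterministic document ID built from the record's Kafka coordinates."""
    return f"{topic}+{partition}+{offset}"


def index_records(index, records):
    """Upsert records into a dict that stands in for an Elasticsearch index.

    Each record is a (topic, partition, offset, document) tuple.
    Returns the number of documents that were newly created.
    """
    created = 0
    for topic, partition, offset, doc in records:
        doc_id = document_id(topic, partition, offset)
        if doc_id not in index:
            created += 1
        # Writing the same ID again replaces the document instead of duplicating it.
        index[doc_id] = doc
    return created


if __name__ == "__main__":
    index = {}
    batch = [
        ("app-logs", 0, 0, {"message": "started"}),
        ("app-logs", 0, 1, {"message": "stopped"}),
    ]
    print(index_records(index, batch))  # first delivery
    print(index_records(index, batch))  # redelivery of the same batch
    print(len(index))
```

Because the ID depends only on where the record sits in Kafka, a connector restart that replays a few offsets leaves the index unchanged.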

Conclusion

This solution is ready to use and can serve either centralized logging or logging for a single application, ingesting data in real time for logging and analytics.

Future Plans

Move archived data to Airtel's private S3 and delete it from the cluster when the retention period expires.

The Team Building Great Things Together

We focused on expanding our use case footprint. This wouldn't have been possible without the hard work and great contributions of our team, who build impactful systems that move our business forward and deliver DevOps engineering solutions.

Special thanks and credit go to our stunning colleagues for their collaboration: Anubhav Yadav, Gaurav Walecha, Praveen, Anshika Mishra, Arzaw Tiwari
