Logging aggregation system

Facundo Ramallo
11 min read · Dec 13, 2023


ELKK Stack — Distributed architecture

Why am I writing this to you?

Two years ago I decided to fully immerse myself in software development, and along the way I have discovered a world full of knowledge to explore. To make the most of it, I choose a topic and apply the same process each time: research it, apply it in practice, and finally try to explain what I have learned in an article. This article is the result of applying that process to log aggregation.

What will you find in this article?

This article picks up where a previous one left off. That article explained the logging framework in the context of a Spring Boot application. From there, we assume that logging events are written to System.out (stdout) and that we are in a distributed production environment that includes the ELKK Stack. Under those assumptions, the following questions came to my mind:

  1. How are these logs visible and searchable from a Kibana instance?
  2. Where are these logs stored?
  3. How are these logs stored?
  4. Where is this system configured? As we know from my previous article, our application knows nothing about the Kibana or Elasticsearch instances or how to connect to them.

Introduction

To understand the whys, it is necessary to mention “The Twelve-Factor App” methodology, defined in the manifesto of the same name, where we can read:

The twelve-factor app is a methodology for building software-as-a-service apps that:

Use declarative formats for setup automation, to minimize time and cost for new developers joining the project;

Have a clean contract with the underlying operating system, offering maximum portability between execution environments;

Are suitable for deployment on modern cloud platforms, obviating the need for servers and systems administration;

Minimize divergence between development and production, enabling continuous deployment for maximum agility;

And can scale up without significant changes to tooling, architecture, or development practices.

One of the factors enumerated in this manifesto is “Logs”, and it states the following:

Logs are the stream of aggregated, time-ordered events collected from the output streams of all running processes and backing services. Logs in their raw form are typically a text format with one event per line (though backtraces from exceptions may span multiple lines). Logs have no fixed beginning or end, but flow continuously as long as the app is operating.

A twelve-factor app never concerns itself with routing or storage of its output stream. It should not attempt to write to or manage log files. Instead, each running process writes its event stream, unbuffered, to stdout. During local development, the developer will view this stream in the foreground of their terminal to observe the app’s behavior.

In production deploys, each process stream will be captured by the execution environment, collated together with all other streams from the app, and routed to one or more final destinations for viewing and long-term archival. These archival destinations are not visible to or configurable by the app, and instead are completely managed by the execution environment. Open-source log routers (such as Logplex and Fluentd) are available for this purpose.

Sometimes it will be very difficult to replicate an issue by analyzing data scattered across different log files. This is when Centralized Logging comes to the rescue to stitch together all the logs into one place.

But this comes with a challenge if the centralized logging tool is not configured to handle log bursts. A log burst causes saturation, log transmission latency, and sometimes log loss, which should never occur on production systems.

To handle such a situation, we can publish logs to Kafka, which acts as a buffer in front of Logstash to ensure resiliency. This is one of several good ways to deploy the ELK Stack so that it copes with log overload.
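To make the buffering idea concrete, here is a toy simulation in Python. It is not Kafka and it is not part of the repository: the function name `simulate`, the tick-based model and all the numbers are assumptions chosen only to illustrate how a larger buffer between a bursty producer and a fixed-rate consumer trades log loss for a temporary backlog.

```python
def simulate(arrivals, consume_rate, capacity):
    """Push per-tick arrival counts through a bounded buffer.

    arrivals     -- number of log events produced on each tick
    consume_rate -- events the consumer can process per tick
    capacity     -- maximum number of events the buffer can hold

    Returns (delivered, dropped, peak_backlog).
    """
    backlog = delivered = dropped = peak = 0
    for count in arrivals:
        # Accept as many new events as still fit; the rest are lost.
        accepted = min(count, capacity - backlog)
        dropped += count - accepted
        backlog += accepted
        peak = max(peak, backlog)
        # The consumer takes at most consume_rate events per tick.
        processed = min(consume_rate, backlog)
        backlog -= processed
        delivered += processed
    # Once the producers stop, the consumer drains whatever is left.
    delivered += backlog
    return delivered, dropped, peak


if __name__ == "__main__":
    burst = [10, 10, 500, 10, 10]  # hypothetical load with one spike
    print(simulate(burst, consume_rate=100, capacity=100))     # (140, 400, 100)
    print(simulate(burst, consume_rate=100, capacity=10_000))  # (540, 0, 500)
```

With the small buffer, the spike overflows and 400 events are lost. With the large one, nothing is lost: the backlog peaks at 500 events and the consumer catches up after the spike, which is the role Kafka plays in front of Logstash.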

Practical case

https://github.com/FacuRamallo/Logging-Agregation-System-ELKK.git

Even though there are different implementations of a log aggregation system, I believe that the following example will help to grasp the main idea behind any of them.

In the following image, we can take a look at the stack involved in what we are going to call our “log processing pipeline”. You can get the source code of this pipeline to test and play with it from this repository. For this article, we will orchestrate a complete solution using Docker to configure Kafka with the ELK stack (the ELKK Stack) to enable centralized logging capabilities.

Technology stack

  • Fluentd: an open-source data collector that can be used to transport logs in a uniform way. It is a powerful and lightweight tool that lets you collect logs from several sources, parse them, filter them and send them out via outputs. It is written in Ruby and C. There is also a related project called Fluent Bit, which can be considered a lighter version of Fluentd and is written solely in C.
  • Apache Kafka/ZooKeeper: Apache Kafka is an open-source distributed event streaming platform. For our use case, it collects logs from applications and queues them. ZooKeeper is used in distributed systems for service synchronization and as a naming registry. When working with Apache Kafka, ZooKeeper is primarily used to track the status of the nodes in the Kafka cluster and to maintain the list of Kafka topics and their configuration; the messages themselves are stored by the Kafka brokers.
  • Logstash: aggregates the data from the Kafka topic, processes it and ships it to Elasticsearch. It is shipped with a wide variety of input, filter and output plugins, with many native codecs further simplifying the ingestion process.
  • Elasticsearch: is the distributed search and analytics engine at the heart of the Elastic Stack. It provides near real-time search and analytics for all types of data, whether you have structured or unstructured text, numerical data, or geospatial data. In our use case, it efficiently stores and indexes logs in a way that supports fast searches.
  • Kibana: is used to query and visualize the data that’s stored in Elasticsearch.

Pipeline structure

The entire pipeline is configured in a docker-compose file.

Starting with our application, we run it inside a container to emulate a production environment where the application logs are written to stdout, like container-level logging in the Kubernetes logging architecture. In our example, we use the Docker fluentd logging driver, which forwards our logs to a Fluentd instance running in another container. In Kubernetes there are several ways to configure this; you can check them in the official logging architecture docs. In general terms, though, a Fluentd agent would collect logs from all running pods inside a namespace or node and produce logging events to our Kafka topics.

As we mentioned before, with this configuration we fulfil the principle that “A twelve-factor app never concerns itself with routing or storage of its output stream.”

The logging driver is set on the app container. It tags every event message with our application name and forwards it to the Fluentd instance that is listening on port 24224.

logging_app:
  build:
    dockerfile: ./Dockerfile
  container_name: logging_spring_app
  volumes:
    - logging_app_data:/usr/share/logging_app/data
  ports:
    - "8080:8080"
  logging:
    driver: "fluentd"
    options:
      fluentd-async: "true"
      fluentd-address: localhost:24224
      tag: logging_app
  environment:
    - "SPRING_PROFILES_ACTIVE=k8s"
  depends_on:
    - fluentd
  restart: unless-stopped

The Fluentd instance runs in a separate container:

fluentd:
  user: fluent
  image: docker.io/library/custom-fluentd:latest
  container_name: fluentd
  volumes:
    - ./fluentd/conf/fluent.conf:/fluentd/etc/fluent.conf
  depends_on:
    - kafka #set the order in which containers should be started
  links:
    - kafka
  ports:
    - "24224:24224"
    - "24224:24224/udp"
  networks:
    - kafkanet
  restart: unless-stopped

This container exposes port 24224 and mounts ./fluentd/conf/fluent.conf at /fluentd/etc/fluent.conf, where we define the configuration as follows:

<source>
  @type forward #input plugin https://docs.fluentd.org/input/forward
  port 24224
  bind 0.0.0.0
</source>
<match logging_app>
  @type rewrite_tag_filter
  <rule>
    key log
    pattern /HTTP/
    tag access.log
  </rule>
  <rule>
    key log
    pattern /.*/
    tag app.log
  </rule>
</match>
<match access.log>
  @type kafka2 #kafka output plugin https://docs.fluentd.org/output/kafka
  brokers kafka:9092 # list of seed brokers
  topic access-log-messages # topic settings
  <format>
    @type json #output format - records parsed as json
  </format>
  <buffer>
    flush_at_shutdown true
    flush_mode interval
    flush_interval 1s
    chunk_limit_size 3MB
    chunk_full_threshold 1
    total_limit_size 1024MB
    overflow_action block
  </buffer>
</match>
<match app.log>
  @type kafka2
  brokers kafka:9092
  default_topic app-log-messages
  <format>
    @type json
  </format>
  <buffer>
    flush_at_shutdown true
    flush_mode interval
    flush_interval 1s
    chunk_limit_size 3MB
    chunk_full_threshold 1
    total_limit_size 1024MB
    overflow_action block
  </buffer>
</match>
  • source block: we set the forward input plugin to listen on port 24224.
  • first match block: we set the rewrite_tag_filter plugin to rewrite the record tag to “access.log” or “app.log”, depending on which regex pattern the log field matches, so that we can differentiate between the two kinds of records.
  • the following match blocks: each one selects records by their tag and uses the kafka2 output plugin to produce logging events to the defined topic in JSON format.
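The routing decision made by the first match block can be sketched in a few lines of Python. This is only an illustration of the logic, not how Fluentd is implemented: the names `RULES` and `rewrite_tag` and the sample log lines are hypothetical, and the sketch assumes that rules are tried from top to bottom and that the first one that matches decides the new tag.

```python
import re

# Mirrors the two <rule> blocks above: (pattern, new tag), checked in order.
RULES = [
    (re.compile(r"HTTP"), "access.log"),
    (re.compile(r".*"), "app.log"),  # catch-all for everything else
]


def rewrite_tag(record):
    """Return the new tag for a record, or None when no rule applies."""
    value = record.get("log")  # both rules inspect the "log" key
    if value is None:
        return None
    for pattern, tag in RULES:
        if pattern.search(value):
            return tag  # first matching rule wins
    return None


if __name__ == "__main__":
    print(rewrite_tag({"log": "GET /log HTTP/1.1 200"}))  # access.log
    print(rewrite_tag({"log": "Started LoggingApp"}))     # app.log
```

The order of the rules matters: if the catch-all pattern came first, every record would be tagged “app.log” and nothing would ever reach the access topic.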

Logstash container definition:

logstash:
  image: docker.elastic.co/logstash/logstash:8.4.1-arm64
  container_name: logstash
  user: root
  volumes:
    - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
    - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
  ports:
    - "5044:5044"
    - "5000:5000/tcp"
    - "5000:5000/udp"
    - "9600:9600"
  environment:
    LS_JAVA_OPTS: "-Xmx256m -Xms256m"
  links:
    - elasticsearch:elasticsearch
    - kafka:kafka
  networks:
    - kafkanet
  depends_on:
    - elasticsearch
    - kafka

We bind the following volumes:

  • ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro → to set the Elasticsearch host and to disable SSL verification. This configuration is just for a testing environment.
http.host: 0.0.0.0
xpack:
  monitoring:
    elasticsearch:
      hosts: ["http://elasticsearch:9200"]
      ssl:
        verification_mode: none
  • ./logstash/pipeline:/usr/share/logstash/pipeline:ro → to set the pipeline configuration. Let’s explain a little bit about it:
# Plugin Configuration. This input will read events from a Kafka topic.
# Ref Link - https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["app-log-messages","access-log-messages"]
    auto_offset_reset => "earliest"
    decorate_events => true
    codec => json
  }
}

# Filter Plugin. A filter plugin performs intermediary processing on an event.
# Ref Link - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

filter {
  json {
    source => "log" #extract the json from the log field
    skip_on_invalid_json => true #avoid logstash failure for parsing errors
    # remove unnecessary fields; nested fields use the [outer][inner] field reference syntax
    remove_field => [ "log", "container_id", "source", "[event][original]" ]
  }
  if ([@metadata][kafka][topic] == "access-log-messages") {
    date {
      match => [ "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ" ]
      timezone => "Europe/Madrid"
    }
    mutate {
      remove_field => ["message"]
      rename => { "container_name" => "app_name" }
      add_field => { indexPrefix => "access" }
    }
  } else if ([@metadata][kafka][topic] == "app-log-messages") {
    date {
      match => [ "date", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ" ]
      timezone => "Europe/Madrid"
      remove_field => [ "date" ]
    }
    mutate {
      rename => { "container_name" => "app_name" }
      add_field => {
        indexPrefix => "app"
      }
    }
  }
}

# An output plugin sends event data to a particular destination. Outputs are the final stage in the event pipeline.
# Ref Link - https://www.elastic.co/guide/en/logstash/current/output-plugins.html
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{[indexPrefix]}-logs-%{+YYYY.MM.dd}"
    ssl_certificate_verification => false
  }
}
  • input block → configures Logstash to consume from the defined Kafka topics, to decorate the events with Kafka metadata (including the topic each event was consumed from), and to decode each message as JSON.
  • filter block → there are several filter plugins to choose from depending on your needs. For this example, we just perform some basic operations, which are described by the comments in the configuration file.
  • output block → we set the elasticsearch output plugin to create new documents in the computed index.
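To see what the filter and output blocks do to a single event, the following Python sketch approximates them. It is a simplification written for this explanation, not Logstash code: the function names `apply_filter` and `index_name`, the sample event and its `level` and `message` fields are hypothetical, the date filter is left out, and the timestamp used for the index name is passed in explicitly.

```python
import json
from datetime import datetime, timezone

# Fields the json filter removes after a successful parse.
DROP_AFTER_PARSE = ("log", "container_id", "source")

# Kafka topic -> value of the indexPrefix field added by the mutate filter.
PREFIX_BY_TOPIC = {
    "access-log-messages": "access",
    "app-log-messages": "app",
}


def apply_filter(event, topic):
    """Approximate the filter block for one decoded event (a dict)."""
    event = dict(event)  # work on a copy, leave the input untouched
    try:
        parsed = json.loads(event.get("log", ""))
    except (TypeError, ValueError):
        parsed = None  # like skip_on_invalid_json: keep the event as it is
    if isinstance(parsed, dict):
        event.update(parsed)  # lift the parsed keys to the top level
        for field in DROP_AFTER_PARSE:
            event.pop(field, None)
    prefix = PREFIX_BY_TOPIC.get(topic)
    if prefix is not None:
        if prefix == "access":
            event.pop("message", None)
        if "container_name" in event:
            event["app_name"] = event.pop("container_name")
        event["indexPrefix"] = prefix
    return event


def index_name(event, timestamp):
    """Build the index name the way the output block's pattern does."""
    return f"logstash-{event['indexPrefix']}-logs-{timestamp:%Y.%m.%d}"


if __name__ == "__main__":
    raw = {
        "container_id": "abc123",
        "container_name": "/logging_spring_app",
        "source": "stdout",
        "log": '{"level": "INFO", "message": "Hello world"}',
    }
    doc = apply_filter(raw, "app-log-messages")
    print(doc)
    print(index_name(doc, datetime(2023, 12, 13, tzinfo=timezone.utc)))
```

Because the date is part of the index name, each prefix gets a new index per day, and the prefix itself depends only on the Kafka topic the event was consumed from.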

About Elasticsearch

For Elasticsearch, we just create a container with the default configuration. Without going into the features of Elasticsearch’s distributed architecture, the important points to understand for our topic are:

  • It’s built on top of Apache Lucene (itself a powerful search engine; Elasticsearch exposes the power of Lucene through simple configuration and plugins, and it handles human-language features such as synonyms and typos)
  • NoSQL Datastore (like MongoDB)
  • Schema-free (no need to define a schema before adding data into Elasticsearch)
  • JSON Document (data in Elasticsearch is stored in the form of JSON document)
  • RESTful APIs (Elasticsearch has powerful RESTful APIs that you can interact with)

We can think of a model that is similar to relational databases

For our use case, we are going to create 2 indexes, one for access-logs and one for app-logs.

Pipeline running and Kibana set up

Now let’s run our log processing pipeline to see it in action and interact with the Kibana UI.

To run the pipeline, execute the command docker-compose up -d and wait for all the containers to be ready.

Go to the address http://localhost:5601 where we access the home page of Kibana

To see our pipeline working, make a GET request to localhost:8080/log

In Kibana, click on the top-left hamburger menu → Management → Index Management. In this view we will be able to see our two indexes, created as follows

Go to Data Views

Create a new data view as shown in the following image

Once we have created our data view, go to the top-left hamburger menu → Discover, where our newly created data view is shown by default because it is the only one. In its raw form, we see all the documents stored in the indexes we just configured

Once we are able to see our logs in the display, Kibana offers us many ways to filter and query data. There are two query languages we can choose from: the Kibana Query Language (KQL) and the Lucene query syntax. The first one is just for filtering data, and the second one has a more advanced feature set.
Some UI features for filtering and ordering data are shown in the following video

Let's also see some examples of KQL

  • Filter for documents where a field exists → method: *
  • Filter for documents that match a value → method: GET
  • To search all fields for a matching value, just avoid the field parameter → GET
  • Filter for documents where a text field contains some strings in any order → message: Hello world
  • To search text fields where the terms are in the order provided → message: "Hello world"
  • Certain characters must be escaped by a backslash (unless surrounded by quotes) → \():<>"*
  • To search for documents matching a pattern, use the wildcard syntax → status_code: 4*
  • To combine multiple queries, use the and/or keywords (and takes precedence over or unless you group with parentheses) → method:GET AND status_code:200 OR method:POST
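When a KQL query is built from values that come from somewhere else (a script, a form, a log line), the escaping rule above is easy to forget. The following Python helper is a small sketch of it; the names `escape_kql` and `kql_term` are hypothetical and are not part of Kibana or of any client library.

```python
# The characters listed above that KQL treats as special outside quotes.
KQL_SPECIAL = set('\\():<>"*')


def escape_kql(value):
    """Backslash-escape every special character in an unquoted KQL value."""
    return "".join("\\" + ch if ch in KQL_SPECIAL else ch for ch in value)


def kql_term(field, value):
    """Build a 'field: value' filter whose value is matched literally."""
    # Escaping * also disables wildcard matching, so only use this helper
    # when the value should be taken literally.
    return f"{field}: {escape_kql(value)}"


if __name__ == "__main__":
    print(kql_term("message", "Hello (world)"))  # message: Hello \(world\)
    print(kql_term("path", "/log?id=1:2"))       # path: /log?id=1\:2
```

The resulting string can be pasted into the Kibana query bar as it is.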

Conclusion

As a result of this process, we can conclude that there are many ways and technologies to implement a log processing pipeline. The choice will depend on the context and needs but, in any case, it must be a resilient and highly available system, because the information it provides about the state of running services is critical from both a business and a technical point of view.

From a developer's point of view, this deeper understanding gives us a good view of the global picture. It provides us with greater autonomy when the logging system fails, or when it lacks valuable information that we could manage to include in the process.

Thanks for reading, hope it’s useful!!

GitHub repository: https://github.com/FacuRamallo/Logging-Agregation-System-ELKK


Facundo Ramallo

From Civil Engineer to Software Developer, passionate about continuous learning and problem solving. Now working as a backend engineer at Adevinta Spain.