Log Management and Monitoring with ELK STACK (Elasticsearch, Logstash, Kibana and Filebeat) - home lab example
Lab Overview
Before we start, I deployed an Ubuntu 20.04 virtual machine in VMware to host the ELK STACK. You can use a different operating system such as Debian, macOS, or Windows. There is a picture showing the basic settings of my machine. Since logs can fill up your machine's disk, I set the virtual hard disk to 100 GB and set up port forwarding on my NAT network.
What is ELK STACK?
ELK STACK is an open source platform made up of Elasticsearch, Logstash, Kibana, and Beats. It lets you reliably and securely ingest data from any source, in any format, then search, analyze, and visualize it.
What is Elasticsearch?
Elasticsearch is a distributed, RESTful search and analytics engine. It is the heart of the ELK STACK: it stores data centrally and can search and analyze large volumes of data in near real time, returning answers in milliseconds. Because the data is indexed, it returns search results quickly instead of scanning raw text.
What is Logstash?
Logstash is an open source data ingestion tool that allows you to collect data from multiple sources, transform it, and send it to the destination of your choice. It includes pre-built filters that allow users to easily retrieve data regardless of source or type.
Logstash allows you to easily retrieve unstructured data.
It offers pre-built filters; so you can easily transform common data types, index them in Elasticsearch, and start querying without having to create custom data transformation pipelines.
What is Kibana?
Kibana is the visualization tool of the ELK STACK. It reads data from Elasticsearch and lets users build their own dashboards, displaying the data in various charts and graphs.
What is Beats?
Beats is a family of lightweight data shippers designed to send various types of data to Elasticsearch or Logstash. As part of the ELK STACK, it collects data from various sources: logs, metrics, network packets, and more.
Beats comes in several flavors, each designed for a specific purpose. In this lab example we will use Filebeat to send data to Logstash.
Filebeat: Used to collect log data from various sources.
Preparation
First of all, since Elasticsearch is Java-based, we need to install Java. Make sure OpenJDK or the Oracle JDK is installed.
I installed openjdk-11-jre-headless with the following command:
sudo apt install openjdk-11-jre-headless
Installing Elasticsearch
I am installing Elasticsearch version 7.17.21. If you install a different version, make sure the other ELK STACK components are the same version. I prefer the Debian installation package, which works on Debian, Ubuntu, and other Debian-based systems. Several installation packages are available on elastic.co (https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html)
a) We start by adding Elastic’s PGP signing key to our APT keyring:
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg
b) Installing from the APT repository:
You may need to install the apt-transport-https package on Debian before proceeding:
sudo apt-get install apt-transport-https
Save the repository definition to /etc/apt/sources.list.d/elastic-7.x.list:
echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list
c) Finally you can install the Elasticsearch Debian package with:
sudo apt-get update && sudo apt-get install elasticsearch
The Elasticsearch configuration file is /etc/elasticsearch/elasticsearch.yml. We need to be root to edit it. Use these commands:
sudo su
nano /etc/elasticsearch/elasticsearch.yml
Now we can make changes to the elasticsearch.yml file.
Uncomment cluster.name in the Cluster section.
Uncomment network.host and replace its value with the IP address you assigned to the local server. I prefer to specify my host IP.
Uncomment the http.port line but leave the value at "9200".
Add a "discovery.seed_hosts: [ ]" line.
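After these edits, the relevant lines of elasticsearch.yml look roughly like this (the cluster name is a placeholder; use your own values, and note that 10.10.10.15 is the IP from my lab):

```yaml
cluster.name: elk-lab           # any name you like
network.host: 10.10.10.15       # the IP assigned to your server
http.port: 9200
discovery.seed_hosts: [ ]
```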
Let's enable and start Elasticsearch:
systemctl enable elasticsearch.service
systemctl start elasticsearch.service
After starting Elasticsearch, let's check the status with the following command:
systemctl status elasticsearch.service
Now let's verify that Elasticsearch responds, using the following command (replace the IP with your own network.host value):
curl -X GET "http://10.10.10.15:9200"
In this output, we can see the cluster name, version number and other information about our server.
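A trimmed, illustrative response looks roughly like this (the node name is a placeholder and the real response contains more fields):

```json
{
  "name" : "elk-host",
  "cluster_name" : "elk-lab",
  "version" : {
    "number" : "7.17.21"
  },
  "tagline" : "You Know, for Search"
}
```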
Kibana
To install Kibana, make sure you install the same version as Elasticsearch.
Use the following command:
sudo apt-get update && sudo apt-get install kibana
Kibana's configuration file is /etc/kibana/kibana.yml. Use nano to edit it:
nano /etc/kibana/kibana.yml
Uncomment "server.port", "server.host" and "elasticsearch.hosts". If you changed network.host in Elasticsearch, you should also change the elasticsearch.hosts value in kibana.yml accordingly. Since I installed both Elasticsearch and Kibana on the same server, I set server.host in kibana.yml to the same IP.
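For reference, the uncommented lines in my kibana.yml look roughly like this (again, 10.10.10.15 is my lab IP; substitute your own):

```yaml
server.port: 5601
server.host: "10.10.10.15"
elasticsearch.hosts: ["http://10.10.10.15:9200"]
```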
Let's enable and start Kibana:
systemctl enable kibana.service
systemctl start kibana.service
Check the status of kibana.service:
systemctl status kibana.service
Check kibana from browser. Visit http://your_server_host:5601
Now Kibana is ready!
Logstash
To install Logstash, make sure you install the same version as Elasticsearch. Use the following command:
sudo apt-get update && sudo apt-get install logstash
Let's enable and start Logstash:
systemctl enable logstash
systemctl start logstash
The Logstash configuration will be covered after Filebeat is set up.
Filebeat
In my home lab I will install Filebeat on the same server, but you can create a different machine and install Filebeat on it. Run the command:
sudo apt-get update && sudo apt-get install filebeat
The Filebeat configuration file is /etc/filebeat/filebeat.yml. Use nano to edit it:
nano /etc/filebeat/filebeat.yml
In the filebeat.inputs section, change the type value to "log".
Be sure to comment out output.elasticsearch and its hosts line in the Elasticsearch output section.
Uncomment output.logstash and its hosts line in the Logstash output section.
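After these changes, the relevant parts of filebeat.yml look roughly like this (the log paths shown are the defaults; adjust them to your environment):

```yaml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log

# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
#  hosts: ["localhost:9200"]

# ------------------------------ Logstash Output -------------------------------
output.logstash:
  hosts: ["localhost:5044"]
```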
After configuring the filebeat.yml file, let’s enable and start filebeat.service.
systemctl enable filebeat.service
systemctl start filebeat.service
Check the filebeat.service status
systemctl status filebeat.service
We need to enable some modules for Filebeat. Run the command:
filebeat modules list
We can see that no modules are currently active. To enable the system module, run the following command:
filebeat modules enable system
Now if we check the module list again we can see that system module is enabled.
Module configuration files are in the /etc/filebeat/modules.d directory. Open system.yml with nano and make sure "enabled" is set to "true" for the log types you want (it may default to "false").
nano /etc/filebeat/modules.d/system.yml
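For reference, an enabled system module config has roughly this shape (a sketch; your file may include additional variables such as custom log paths):

```yaml
- module: system
  syslog:
    enabled: true
  auth:
    enabled: true
```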
Creating Grok Patterns for Logstash
What is grok pattern?
“Grok syntax is composed of reusable elements called Grok patterns that enable parsing for data such as timestamps, IP addresses, hostnames, log levels, and more.”
If we check the logs, we can clearly see they are different and unstructured. To filter these unstructured logs, we need to define grok patterns.
How can we do this?
There is a grok debugger in Kibana’s Dev Tools section. I grabbed a few plain logs from /var/log/auth.log and started building the grok patterns.
There are good references for grok patterns, such as https://docs.mezmo.com/telemetry-pipelines/using-grok-to-parse and https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
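To make the idea concrete, here is a minimal sketch of what a grok pattern boils down to: patterns like %{IPORHOST} and %{NUMBER} expand into regular expressions. The log line below is a hypothetical example, and the regex is only a rough equivalent of the "from %{IPORHOST:ip} port %{NUMBER:port}" fragment, not the actual compiled pattern:

```shell
# A hypothetical auth.log line, similar to what you would grab from /var/log/auth.log
line='May  4 12:00:01 myhost sshd[1234]: Failed password for invalid user admin from 10.10.10.99 port 53210 ssh2'

# Rough regex equivalent of "... from %{IPORHOST:ip} port ..." -- extracts the source IP
echo "$line" | grep -oP 'from \K([0-9]{1,3}\.){3}[0-9]{1,3}(?= port)'
```

Running this prints the extracted IP, which is exactly the kind of field the grok filter will later store under [system][auth][ssh][ip].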
Logstash Configuration for System and Apache Logs
After our grok pattern is ready, we need to specify it in Logstash. There are three important components in a Logstash configuration file: inputs, filters, and outputs.
Inputs
Insert the following input configuration. This specifies a beats input that will listen on TCP port 5044.
input {
  beats {
    port => 5044
  }
}
Filter
The filter parses incoming logs, making them structured and usable for the predefined Kibana dashboards. This example syslog filter comes from the official Elasticsearch documentation.
filter {
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
pattern_definitions => {
"GREEDYMULTILINE"=> "(.|\n)*"
}
remove_field => "message"
}
date {
match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
geoip {
source => "[system][auth][ssh][ip]"
target => "[system][auth][ssh][geoip]"
}
}
else if [fileset][name] == "syslog" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
remove_field => "message"
}
date {
match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
}
And here is another example, for Apache logs:
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
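To see what %{COMBINEDAPACHELOG} works on, here is a hypothetical Apache combined-format log line and rough regex sketches for two of the fields the pattern extracts (clientip and response); the real grok pattern captures many more, such as verb, request, bytes, referrer, and agent:

```shell
# A hypothetical Apache combined-log line
apache_line='10.10.10.99 - - [04/May/2024:12:00:01 +0000] "GET /index.html HTTP/1.1" 200 1024 "-" "curl/7.68.0"'

# Rough sketches of two fields %{COMBINEDAPACHELOG} would capture:
echo "$apache_line" | grep -oP '^\S+'          # clientip
echo "$apache_line" | grep -oP '" \K[0-9]{3}'  # HTTP response code
```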
Output
This configuration tells Logstash to send data received from Beats into Elasticsearch, running at localhost:9200 (replace with your host IP if Elasticsearch listens elsewhere), using an index named after the specific Beat. In this tutorial, the Beat being used is Filebeat.
output {
elasticsearch {
hosts => ["localhost:9200"]
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
Change to the conf.d directory in /etc/logstash and use nano to create a configuration file:
cd /etc/logstash/conf.d
nano logstash.conf
Add the input, filter and output respectively. Here is my config file:
input {
beats {
port => 5044
}
}
filter {
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
grok {
match => {
"message" => [
"%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:host} %{SYSLOGPROG}: %{WORD:action} %{WORD:username} (?:%{WORD:method} )?for %{USER:username} from %{IP:source_ip} port %{NUMBER:port} %{WORD:protocol}",
"%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:host} %{SYSLOGPROG}: %{WORD:action} %{WORD:connection_status} %{DATA:action} %{NOTSPACE:invalid_user} %{IP:source_ip} port %{NUMBER:port} \[%{WORD:auth}\]"
]
}
}
date {
match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
geoip {
source => "[system][auth][ssh][ip]"
target => "[system][auth][ssh][geoip]"
}
}
}
}
output {
elasticsearch {
hosts => "10.10.10.15:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
Save and close.
Check the configuration file with following command:
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash -t
If it responds “Configuration OK”, it is ready for use.
Logstash must be restarted after any configuration change:
systemctl restart logstash.service
If we visit "http://host_ip_address:9200/_cat/indices?v" in the browser, we can see that there is a filebeat index. Our Filebeat and Logstash are integrated.
Now open Kibana > Discover. First we need to create an index pattern; name it "filebeat*" and continue. Refresh the Discover page, and our logs are ready!
Here are my logs below;
Example of SSH logs:
You can see properties of the host such as IP, kernel, OS type, and OS version, and view the source IP, source port, connection failures and successes, etc.
This brings us to the end of our log management and monitoring article, in which we built out the full log monitoring pipeline. Thank you for your attention.