Streaming Spring Boot Application Logs to Apache Kafka — ELK(K) Stack — Part 2

What is the ELK(K) Stack?

Kafka is a distributed streaming platform. It lets you build real-time streaming data pipelines that reliably move data between systems or applications, and transform or react to streams of data as they arrive.

Logstash is a tool for managing logs. It supports virtually any type of log, including system logs, error logs, and custom application logs. It can receive logs from numerous sources, including syslog, messaging systems (for example, RabbitMQ), and JMX, and it can output data in a variety of ways, including email, WebSockets, and Elasticsearch.

Elasticsearch is a full-text, real-time search and analytics engine that stores the log data indexed by Logstash. It is built on the Apache Lucene search engine library and exposes data through REST and Java APIs. Elasticsearch is scalable and is built to be used by distributed systems.

Kibana is a web-based graphical interface for searching, analyzing, and visualizing log data stored in the Elasticsearch indices. It utilizes the REST interface of Elasticsearch to retrieve the data, and not only enables users to create customized dashboard views of their data, but also allows them to query and filter the data in an ad hoc manner.

The ELKK Stack components fit together as a pipeline: the application publishes its logs to Kafka, Logstash consumes them from Kafka and indexes them into Elasticsearch, and Kibana visualizes the indexed data.

Zookeeper — Install & Configure

  • Download the Zookeeper archive from http://apache.org/dist/zookeeper/zookeeper-3.4.9/
  • Go to the Zookeeper config directory, zookeeper-3.4.9/conf
  • Rename the file “zoo_sample.cfg” to “zoo.cfg”
  • Open zoo.cfg in any text editor and change dataDir=/tmp/zookeeper to a persistent location (for example, dataDir=/opt/logs/zookeeper, as in the zoo.cfg below)
  • You can change the default Zookeeper port in the zoo.cfg file (default port 2181)
  • Start the server with bin/zkServer.sh start (on Windows, run “zkserver” from a command prompt); Zookeeper listens on port 2181
wget http://apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar xvzf zookeeper-3.4.9.tar.gz
cd zookeeper-3.4.9
bin/zkServer.sh start

zoo.cfg

dataDir=/opt/logs/zookeeper
clientPort=2181

Zookeeper should now be running on port 2181. Note that Zookeeper speaks its own binary protocol rather than HTTP, so pointing a browser at localhost:2181 will not return a page.
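Since a browser check will not work, Zookeeper’s “four-letter-word” admin commands offer a quick programmatic check instead. Here is a minimal sketch (the class name is illustrative) that sends ruok over a plain socket and expects imok in reply:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

public class ZookeeperCheck {
    public static void main(String[] args) throws Exception {
        // Send the "ruok" four-letter command; a healthy server replies "imok"
        try (Socket socket = new Socket("localhost", 2181)) {
            socket.getOutputStream().write("ruok".getBytes());
            socket.getOutputStream().flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            System.out.println(in.readLine()); // prints: imok
        }
    }
}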

Kafka — Install & Configure

  • Download the Kafka archive from https://kafka.apache.org/downloads
  • Go to the Kafka config directory, kafka_2.10-0.10.2.0/config
  • Edit “server.properties” and change the line “log.dirs=/tmp/kafka-logs” to a persistent location (for example, log.dirs=/opt/logs/kafka-logs, as in the server.properties below)
  • Kafka will run on the default port 9092 and connect to Zookeeper’s default port, 2181
  • Open config/zookeeper.properties and change dataDir=/tmp/zookeeper to the same data directory used above (dataDir=/opt/logs/zookeeper)
wget http://apache.claz.org/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
tar xvzf kafka_2.10-0.10.2.0.tgz
cd kafka_2.10-0.10.2.0
bin/kafka-server-start.sh config/server.properties

server.properties

log.dirs=/opt/logs/kafka-logs
zookeeper.connect=localhost:2181

zookeeper.properties

dataDir=/opt/logs/zookeeper
clientPort=2181

Kafka should now be running on localhost:9092. Like Zookeeper, Kafka uses a binary protocol, so the broker port will not respond meaningfully in a browser; the easiest check is to create a topic and publish a message to it, as in the next section.

Create a topic

  • Let’s create a topic named “tas_logs” with a single partition and only one replica
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tas_logs

List Topics

bin/kafka-topics.sh --list --zookeeper localhost:2181
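To verify the topic end to end, you can publish a test record from Java. A minimal sketch, assuming the Kafka Java client (org.apache.kafka:kafka-clients:0.10.2.0) is on the classpath; the class name and message are illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TasLogsSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Send one record to tas_logs and block until the broker acknowledges it
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("tas_logs", "hello tas_logs")).get();
            System.out.println("Record acknowledged by the broker");
        }
    }
}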

Logstash Configuration

A typical Logstash config file consists of three main sections: input, filter, and output. Each section contains the plugins that handle that part of the processing.

Create a logstash.conf file in the root directory of the Logstash installation and copy the following code into it.

logstash.conf — download

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["tas_logs"]
  }
}

filter {
  grok {
    match => [ "message", "%{GREEDYDATA}" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "tas_logs-%{+YYYY.MM.dd}"
    workers => 1
  }
}

Once the config file is in place and Elasticsearch is running, we can start Logstash:

bin/logstash -f logstash.conf

If everything went well, Logstash is now shipping log events to Elasticsearch.
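A quick way to confirm this (once Elasticsearch, installed in the next section, is running) is to ask Elasticsearch how many documents have reached the daily indices via its _count API. A minimal sketch using only the JDK; the class name is illustrative:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class IndexCountCheck {
    public static void main(String[] args) throws Exception {
        // Query the document count across all tas_logs-* daily indices
        URL url = new URL("http://localhost:9200/tas_logs-*/_count");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // e.g. {"count":42,...}
            }
        }
    }
}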

Install Elasticsearch

Here’s how to do it (steps are written for OS X but should be similar on other systems):

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.1.zip
unzip elasticsearch-5.1.1.zip
cd elasticsearch-5.1.1
bin/elasticsearch

Elasticsearch should be running now. You can verify it’s running using:

http://localhost:9200

If all is well, you should get a JSON response describing the node, including the cluster name, the Elasticsearch version (5.1.1), and the tagline “You Know, for Search”.

Install Kibana

  • Download Kibana archive from https://www.elastic.co/downloads/kibana
  • Please note that you need to download the appropriate distribution for your OS; the URL in the example below is for OS X
  • Extract the archive
  • Run it (bin/kibana)
  • Check that it runs by pointing the browser to Kibana’s web UI

Here’s how to do it:

wget https://artifacts.elastic.co/downloads/kibana/kibana-5.1.1-darwin-x86_64.tar.gz
tar xvzf kibana-5.1.1-darwin-x86_64.tar.gz
cd kibana-5.1.1-darwin-x86_64
bin/kibana

Kibana should be running

http://localhost:5601

First, you need to point Kibana to the Elasticsearch index(es) of your choice. With the logstash.conf above, Logstash creates indices with the name pattern tas_logs-YYYY.MM.dd. In Kibana Settings → Indices configure the indices:

  • Index contains time-based events (select this option)
  • Use event times to create index names (select this option)
  • Index pattern interval: Daily
  • Index name or pattern: [tas_logs-]YYYY.MM.DD
  • Click on “Create Index”

Spring Boot application with a Logback appender for Apache Kafka

This appender provides a way for applications to publish their application logs to Apache Kafka. This is ideal for applications within immutable containers without a writable filesystem.

Add logback-kafka-appender and logback-classic as library dependencies to your project.

build.gradle

compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.1.0'
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.6'
compile group: 'ch.qos.logback', name: 'logback-core', version: '1.1.6'
compile group: 'net.logstash.logback', name: 'logstash-logback-encoder', version: '4.8'

pom.xml

<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.6</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>4.8</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.6</version>
    <scope>runtime</scope>
</dependency>

Add logback.xml to the application

[src/main/resources/logback.xml]

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Async wrapper: hands log events to the Kafka appender off the calling thread -->
    <appender name="asyncTasLogKafka"
        class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">
        <appender name="kafkaVerboseAppender"
            class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder
                class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
                <layout class="ch.qos.logback.classic.PatternLayout">
                    <pattern>tas_logs-%msg</pattern>
                </layout>
            </encoder>
            <!-- Must match the topic Logstash consumes from -->
            <topic>tas_logs</topic>
            <keyingStrategy
                class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy" />
            <deliveryStrategy
                class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
            <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
            <producerConfig>retries=2</producerConfig>
        </appender>
        <!-- Also mirror events to the console -->
        <appender-ref ref="STDOUT" />
    </appender>

    <root level="info">
        <appender-ref ref="asyncTasLogKafka" />
    </root>
</configuration>
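
With the dependencies and logback.xml in place, any log statement in the application flows through the pipeline. A minimal Spring Boot entry point to exercise it (the class name and log messages are illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TasLogsDemoApplication {

    private static final Logger log = LoggerFactory.getLogger(TasLogsDemoApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(TasLogsDemoApplication.class, args);
        // Routed via asyncTasLogKafka to the tas_logs topic, consumed by
        // Logstash, indexed into Elasticsearch, and visible in Kibana
        log.info("Application started; shipping logs to Kafka");
        log.warn("Sample warning event for the Kibana dashboard");
    }
}

Search for these messages in Kibana’s Discover view to confirm the whole pipeline works.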