Monitoring ETL pipelines with the Elastic Stack

How to monitor data pipelines to detect deteriorations and unexpected behaviors

Matteo Di Pierro
Dec 16, 2019

Hi everyone,

In this article, I’d like to share with you my recent work on monitoring ETL pipelines using the Elastic Stack, but since this is my first post, let me first introduce myself.

My name is Matteo Di Pierro, I’m a Data Engineer at Quantyca and I recently joined the Site Reliability Engineering team.

We are an Information Technology consulting company that deals with data in every respect, starting from the integration and the definition of Big Data architectures, up to reporting and analysis.

Introduction

We mainly use Talend as our data integration software, so our goal is to monitor Talend jobs.

Our solution allows you to manage logs from the various systems involved and analyze them. Some of these data consist of standard logs generated by Talend, while other, more information-rich logs come from our custom logging module.

To collect, aggregate, process, store and visualize logs, we relied on the Elastic Stack. All the components involved in the task are:

  • Talend
  • Logging Custom Module
  • Filebeat
  • Logstash
  • Elasticsearch
  • Kibana

In the next chapters, we will analyze the features of each component and how we used them within our project.

Talend & Custom Logging Module

Talend provides its own monitoring console, but it does not expose enough information: in order to create useful and efficient dashboards to monitor our jobs, we need more information than what is available in the standard Talend log.


Fortunately, we can rely on the Talend Custom Logging, developed by Quantyca with the aim of producing more explanatory and information-rich logs.

With this module, users are enabled to generate specific logs inside Talend jobs. These logs contain useful information like context variables, statuses, and statistics about the current execution.

Every log message contains two families of information, called flowInstanceInfo and messageInfo.

By integrating this module into your Talend data flow, you can store logs either in a database or in files. For our purposes we are going to focus on the file output of the custom logging module, since log messages are stored as JSON objects and, as we will see later, this makes it easier to parse, transform and store them in Elasticsearch.

Hence, once the custom logging has been configured and integrated within our flows, folders and log files will be created as shown in the figure:

logs produced with the Custom Logging module

Every file contains one JSON object for each generated message, with all the fields described above.

In the example above you can see a set of log files produced by several runs of example jobs that implemented the custom logging module.
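
To give a concrete idea of the format, here is a hypothetical log line. The field names are the ones referenced later in the Logstash filter, while the values and the exact set of fields are made up for illustration (the real flowInstanceInfo and messageInfo families contain more fields, and in the actual files each object sits on a single line):

{
  "flowName": "job_load_customers",
  "flowPid": "12345",
  "flowMainPid": "12345",
  "flowMainStartTimestamp": "2019-12-16T10:30:00.000Z",
  "timestamp": "2019-12-16T10:30:42.000Z",
  "exitStatus": "SUCCESS"
}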


We talked about Talend and how we produce our custom logs. Now we will get to the heart of the post. In the next chapters, we will see how to use the Elastic Stack components to make the most of our logs and create a useful and intuitive monitoring system.

Elastic Stack

The Elastic Stack is full of features and functionalities, and it would be impossible to cover everything here. If you are interested in learning more, I suggest you watch the official videos published by the Elastic team.

Anyway, I’ll give you a short overview of the features that will help us in our project.


The Elastic Stack gives us the ability to aggregate logs from all our systems, perform searches on them, and create powerful visualizations. The stack is composed of three core products, plus the Beats family of data shippers. Each of them can work alone, but they have been built to work exceptionally well together.

The components of the stack are:

  • Logstash
  • Elasticsearch
  • Kibana
  • Beats

Logstash is a server-side data processing pipeline that ingests data from a multitude of sources simultaneously, transforms it, and then sends it wherever we prefer (we will ship data to Elasticsearch). With Logstash it is possible to create new fields and edit existing ones, and with grok you can also derive a data structure from unstructured data. These functionalities will be very important for us, since we need to create semi-structured data in order to define interesting visualizations.
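
Just to illustrate what grok does (we will not need it for our JSON logs), a minimal filter like the one below would extract structured fields from a plain-text line such as “2019-12-16 10:30:42 ERROR job_load_customers failed”. The field names are only illustrative and are not part of our project:

filter {
  grok {
    # split "<timestamp> <level> <free text>" into three separate fields
    match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} %{LOGLEVEL:log_level} %{GREEDYDATA:log_text}" }
  }
}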


Elasticsearch is a distributed, RESTful search and analytics engine. It represents the heart of the Elastic Stack since it centrally stores data and allows us to perform and combine many types of searches. In Elasticsearch everything is indexed, so we can leverage and access all data at a really high speed. For monitoring tasks, we just need Elasticsearch to store, index and retrieve logs in a dynamic and rapid way.


The final component of the stack is Kibana, which allows us to navigate and visualize Elasticsearch data. With Kibana you can create dynamic dashboards that simplify the exploration of the data and the identification of issues and unexpected behaviors.


A fourth set of components are the Beats.

Beats are single-purpose data shippers. They are used to send data from hundreds or thousands of machines and systems to Logstash or Elasticsearch.

For our task we only used Filebeat. Its goal is to catch new or updated files whose names match a chosen pattern and send them to Logstash (or Elasticsearch).
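
As a side note, if you wanted Filebeat to skip Logstash and index the files directly, the output section of its configuration would simply point at Elasticsearch instead; a minimal sketch (using the host name that appears later in this article) could be:

output.elasticsearch:
  hosts: ["elasticsearch:9200"]

In our case we keep Logstash in the middle because of the transformations described in the next sections.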

You can discover all the available Beats and learn more about their features in the official documentation.


Now that you are familiar with all the components used in this project, let’s check out how we configured and used them!

Filebeat

First of all, it is necessary to install Filebeat on the host where log files are stored. Then, we created a file “filebeat-talend-log.yml” with the following configuration.

# filebeat inputs
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /data/inputs/talend-log/talend-log-*.log
      - /data/inputs/talend-log/logfiles/**/*.log
    tags: ["talend_log"]
    json.keys_under_root: true
    json.add_error_key: true

# Logstash output
output.logstash:
  hosts: ["logstash1:5051", "logstash2:5151"]
  loadbalance: true
  worker: 2

In the “filebeat.inputs” section, we define all the patterns that Filebeat uses to find new log files. Here we are collecting every file with the “.log” extension in the “/data/inputs/talend-log/” path, or in a subdirectory of it.

With “output.logstash” we define where Filebeat has to ship the matching files. In this case, we want to send the log data to two different hosts (logstash1 and logstash2) on different ports. We decided to distribute the load across these two Logstash instances for load-balancing reasons. To enable this feature you just need to add the two properties shown in the snippet above: “loadbalance” and “worker”.

Logstash

Once installed, we start the configuration by creating two directories in the “/usr/share/logstash” path:

  • config: where we store the main configuration and define new pipelines.
  • pipelines: here we create a folder for each data pipeline. In each directory, we define transformations, give new structures to input documents, and create or drop fields.

Before seeing how we did it, here is how the file tree will look at the end of the configuration:
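
In short, the layout is a tree like the following (a sketch based on the file names used below; note that the per-pipeline folder is the “pipeline” directory referenced in “pipelines.yml”):

/usr/share/logstash
├── config
│   ├── logstash.yml
│   └── pipelines.yml
└── pipeline
    └── talend-log
        ├── 100-input.conf
        ├── 500-filter-talend-job.conf
        └── 900-output.conf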

Let’s start with “logstash.yml”:

http.host: "0.0.0.0"
queue.type: persisted

“http.host” represents the bind address for the metrics REST endpoint, while we chose a persistent queue as the queue type. In this way, events are persisted to disk and an event is marked as processed if, and only if, it has been processed completely by the Logstash pipeline.
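
If you use a persistent queue, it is usually a good idea to also bound how much disk it can use. The two settings below are standard Logstash options; the values shown are just an example and were not part of our original configuration:

# maximum size of the queue on disk; when it is full, Logstash applies back-pressure to the inputs
queue.max_bytes: 1gb
# directory where the queue data files are stored (defaults to a "queue" folder under path.data)
path.queue: /usr/share/logstash/data/queue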

Now we have to create our first and only pipeline. Its goal is to get files shipped by filebeat, process them, and store new documents with logs in elasticsearch.

The first step is to define the new pipeline in the “pipelines.yml”.

- pipeline.id: talend-log
  path.config: "/usr/share/logstash/pipeline/talend-log/"
  pipeline.workers: 1

We are just defining a pipeline called “talend-log” whose configuration files are in “/usr/share/logstash/pipeline/talend-log/”. Finally, we are saying that only one worker will execute the filter and output stages of the pipeline.

Then, in the folder indicated in the last step, we create three different configuration files (look at the previous image):

  • 100-input.conf
  • 500-filter-talend-job.conf
  • 900-output.conf

The first file only establishes which port to listen to. This port must be the one indicated in the filebeat configuration. We assume that we are configuring the first host of logstash (logstash1), so we write “100-input.conf”:

input {
  beats {
    port => 5051
  }
}

We finally arrive at the “500-filter-talend-job.conf” file. Here most of the logic and the transformations take place, so let’s write:

filter {
  json {
    source => "message"
  }
  if [flowPid] == [flowMainPid] {
    mutate {
      add_field => { "isMainJob" => "1" }
    }
  }
  else {
    mutate {
      add_field => { "isMainJob" => "0" }
    }
  }
  mutate {
    convert => { "isMainJob" => "boolean" }
  }
  if [exitStatus] == "SUCCESS" {
    mutate {
      add_field => { "encodedExitStatus" => 0 }
    }
    mutate {
      convert => { "encodedExitStatus" => "integer" }
    }
  }
  else if [exitStatus] == "FAILURE" {
    mutate {
      add_field => { "encodedExitStatus" => 1 }
    }
    mutate {
      convert => { "encodedExitStatus" => "integer" }
    }
  }
  ruby {
    init => "require 'time'"
    code => "
      starttime = Time.iso8601(event.get('flowMainStartTimestamp').to_s).to_f;
      endtime = Time.iso8601(event.get('timestamp').to_s).to_f;
      event.set('duration', endtime - starttime);
    "
  }
}

What are we defining here? First of all, we are telling Logstash that the logs it is receiving are in JSON format. So we don’t need to define a specific pattern to parse the file: Logstash will do it by itself and structure our data. This is a very convenient feature that saves us a lot of time, but unfortunately some other steps are needed.

When we create the dashboards, we will need to know whether we are dealing with a main job or with a subjob. So let’s make Logstash create a new field called “isMainJob”. To do this we check whether “flowPid” and “flowMainPid” are equal.

Another useful field is “encodedExitStatus”, which is just a numeric encoding of the exit status (SUCCESS = 0, FAILURE = 1).

Finally, with the ruby instruction at the bottom of the file, we create a “duration” field. Its value indicates how much time has passed between the start of the main job and the production of the current log message.


Now we have structured and enriched documents, and we need to store them in Elasticsearch. Let’s configure this step in the final file of the pipeline, “900-output.conf”:

output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    index => "talend-log-%{+YYYY.MM.dd}"
    template_overwrite => true
  }
}

This file only establishes that we want to send the parsed documents to Elasticsearch, which can be reached at the selected host and port.

Moreover, we assign them an index, whose name is composed of a prefix chosen by us and the current date.
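
Once some documents have been indexed, a quick way to verify that the daily indices are being created is the cat indices API (assuming Elasticsearch is reachable on its default port, as configured below):

curl "http://elasticsearch:9200/_cat/indices/talend-log-*?v"
# prints one row per daily index (e.g. talend-log-2019.12.16) with its health, document count and size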

Our pipeline is now ready to work!

Elasticsearch

For Elasticsearch, the configuration file (“elasticsearch.yml”) is very simple:

cluster.name: "quantyca-cluster"
network.host: 0.0.0.0
# set to 1 to allow single node clusters
discovery.zen.minimum_master_nodes: 1
## Use single node discovery in order to disable production mode and avoid bootstrap checks
discovery.type: single-node

These configurations are good for our demo, but in a production environment you may need a more reliable cluster. We will not cover this topic here, but I recommend going into more detail by reading the official documentation and the many available guides on configuring a good Elasticsearch cluster.
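
Just as a reference, and not as part of this demo, a three-node cluster using the same 6.x-style Zen discovery settings seen above could be sketched per node along these lines (node and host names are hypothetical):

cluster.name: "quantyca-cluster"
node.name: es-node-1
network.host: 0.0.0.0
# master-eligible nodes to contact at startup (hypothetical host names)
discovery.zen.ping.unicast.hosts: ["es-node-1", "es-node-2", "es-node-3"]
# with three master-eligible nodes, require a quorum of two to avoid split brain
discovery.zen.minimum_master_nodes: 2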

Kibana

The Kibana configuration (“kibana.yml”) is just as simple:

server.name: kibana
server.host: "0.0.0.0"
elasticsearch.hosts: http://elasticsearch:9200
elasticsearch.requestTimeout: 500000

Thanks to this configuration, Kibana is able to access Elasticsearch and all the documents stored in it.

Once everything is installed and configured we can access Kibana at http://localhost:5601 (replace localhost with the host where you installed the tool) and start to create our first visualization.
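
Before building visualizations, Kibana also needs an index pattern matching our daily indices (for example “talend-log-*”). You can create it from the Management section of the UI or, if you prefer to script it, through the saved objects API; a hypothetical call (the id and the choice of time field are assumptions, not taken from the original setup) might look like:

curl -X POST "http://localhost:5601/api/saved_objects/index-pattern/talend-log" \
  -H "kbn-xsrf: true" -H "Content-Type: application/json" \
  -d '{"attributes": {"title": "talend-log-*", "timeFieldName": "timestamp"}}'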

Latest failed jobs — sample visualization

Let’s go to the Visualize tab in the left bar of Kibana, click on “Create new visualization”, and then select “Horizontal Bar”.

Fill the fields as shown in the pictures:

The Horizontal Bar visualization is a rotated Vertical Bar visualization; this means that when Kibana says “x-axis” it means “y-axis” and vice versa.

Let’s configure the X-axis by aggregating on the “flowName” field and ordering the buckets by decreasing timestamp. On the Y-axis, the aggregation is the count.

A final touch is to apply some filters in the top bar: in detail, we need to analyze only main jobs that failed and only documents that represent final messages:
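
Using the fields we built in Logstash, the first two conditions can be written directly in the Kibana search bar with a query along these lines (the field marking “final messages” comes from the custom logging module, so it is omitted here):

isMainJob : true and exitStatus : "FAILURE"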

The visualization is now ready to be inserted into a dashboard; here is how it looks:

Thanks to it, it is now very easy to understand which jobs failed (the jobs at the top are those that failed most recently) and how many times they failed (bar length).


We created about 20 different visualizations. Each kind of plot has its own features and configurations but, since Kibana is very intuitive, a little practice will suffice to handle all types of visualizations fluently. What is more difficult is to understand what you want (and need) to show. Designing a good, explanatory and readable dashboard is very important, so we will focus on that instead of on the other visualizations.

Dashboards

Our dashboards are built around two concepts:

  • Whole system monitoring
  • Single component monitoring

We use the first to get an overview of the state of health of the system. Thanks to it we can spot a problem in some component; then, thanks to the second, we can look at the detailed information of the component in error.

A good monitoring system allows you to understand that something is going wrong in your system, understand which component has a problem and finally understand the problem.

So, we need two different dashboards for monitoring Talend jobs. The main dashboard shows how the whole system is doing, while another dashboard shows the details of each job.


The main dashboard looks like this:

Starting from the top, we have a horizontal bar visualization where we display the latest executed jobs in temporal order. The bar length represents the duration (do you remember when we computed that field?), while the color indicates the exit status of the flow.

On the right, we have a pie chart showing the jobs’ success rate.

Then we have two more horizontal bar visualizations, where we can see the longest jobs, according to their last execution or to the average of all executions. If these two visualizations are not similar, there is probably a problem somewhere.

The two error visualizations show the error count for each job, ordering the jobs by number of errors or by timestamp.

Then there are two interesting heatmaps. In the first, there are all the executions of the different jobs with their statuses, while in the second we have the error count for each job, day by day.

Finally, there is a tag cloud visualization: the bigger the name of a job, the more problems it is causing.


When someone notices that a particular job has a problem, they have to investigate and solve it. To support this, we developed a job-specific dashboard.

At the top left of the dashboard, you can select the desired job and, optionally, the execution code. Then there is a lot of useful information: the success rate of the specific job, the status and duration of the subjobs, the last time the job ran correctly, and a history of the job duration over different executions.

Moreover, you can scroll through the complete log of the selected execution. This log contains all the messages generated by the logging module, as they are saved in Elasticsearch. It is possible not only to read the messages, but also to see, in the table, the values of all the fields that we analyzed previously.

Conclusion

The post ends here. Thank you for your attention, and if you enjoyed this post and are interested in our activities, follow us on LinkedIn and Medium!
