Logstash — Denormalize documents (Part 3)

Ingrid Jardillier
4 min read · May 2, 2024


This is a three-part article. You can find the other parts here:

* Part 1: highlights the need for denormalization
* Part 2: exposes the problems caused by not using denormalization
* Part 3: shows how to implement denormalization

As we saw in the previous part, one of the solutions to improve the exploitation of documents containing arrays is to use Logstash to denormalize them. In this article, we will implement this denormalization for the simple example provided in the first part.

Principle

We have spoken a lot about denormalization, but what does it mean in our case?

As we saw in the previous articles, the default ingestion of our JSON objects results in the prize fields being indexed as arrays.

prizes-original index

The denormalization process clones documents that contain multiple prizes and flattens the prize fields. So, for the document with id 2, it will create two documents: one for the 1903 physics prize and one for the 1911 chemistry prize.

The result will be the following:

prizes-denormalized index
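To make the principle concrete, here is a minimal plain-Ruby sketch (independent of Logstash) that splits such a document. Only the id and the two prize values come from the article's example; the name field is illustrative:

```ruby
# Document as ingested by default: the "prize" field is an array.
doc = {
  "id"    => 2,
  "name"  => "Marie Curie",          # illustrative field, not from the dataset excerpt
  "prize" => [
    { "year" => 1903, "category" => "physics"   },
    { "year" => 1911, "category" => "chemistry" }
  ]
}

# Denormalization: one output document per prize, with the array
# replaced by a single flat prize object.
denormalized = doc["prize"].map do |prize|
  doc.reject { |k, _| k == "prize" }.merge("prize" => prize)
end

denormalized.each { |d| puts d }
```

Each resulting document keeps the shared fields and carries exactly one prize, which is what the prizes-denormalized index will contain.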

Implementation

To implement our denormalization, we just have to change our Logstash configuration to add a ruby filter, which performs the denormalization.

And, as we want to keep both types of documents (original and denormalized), we will set the index name in the @metadata object and use it in the elasticsearch output. We will also use a keep_original_event boolean parameter to indicate whether we want to keep the original document.

input {
  file {
    id => "prizes"
    path => "/usr/share/logstash/pipeline/file/prizes.json"
    mode => "read"
    codec => "json"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  mutate {
    remove_field => ["@timestamp", "@version", "event", "host", "log"]
  }
}

filter {
  ruby {
    id => "denormalized-by-prizes"
    path => "/usr/share/logstash/pipeline/file/denormalized_by_prizes.rb"
    script_params => {
      "keep_original_event" => true
    }
  }
  mutate {
    remove_field => ["@timestamp", "@version", "event", "host", "log"]
  }
}

output {
  stdout {
    codec => rubydebug { metadata => true }
  }
}

output {
  elasticsearch {
    index => "%{[@metadata][_index]}"
    hosts => ["https://es01:9200","https://es02:9200","https://es03:9200"]
    ssl_certificate_authorities => ["/usr/share/logstash/certs/ca/ca.crt"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
  }
}

The code of the ruby filter script is then:

# The value of `params` is the value of the hash passed to `script_params`
# in the logstash configuration.
def register(params)
  @keep_original_event = params["keep_original_event"]
end

# The filter method receives an event and must return a list of events.
# Dropping an event means not including it in the returned array.
# Creating new ones only requires adding a new instance of LogStash::Event
# to the returned array.
def filter(event)
  items = Array.new

  # Keep the original event if asked
  logger.debug("keep_original_event is: " + @keep_original_event.to_s)

  if @keep_original_event.to_s == 'true'
    event.set('[@metadata][_index]', 'prizes-original')
    items.push event
  end

  # Get the prize items (to denormalize)
  prizes = event.get("prize")
  if prizes.nil?
    logger.warn("No prizes for event " + event.to_s)
    return items
  end

  # Create a clone base event
  event_base = event.clone
  event_base.set('[@metadata][_index]', 'prizes-denormalized')
  event_base.remove("prize")

  # Create one event per prize item with the needed modifications
  prizes.each { |prize|
    event_prize = event_base.clone

    # Copy each prize item value into the prize object
    prize.each { |key, value|
      event_prize.set("[prize][" + key + "]", value)
    }

    items.push event_prize
  }

  return items
end

In this filter, the principle is the following:

  • We create an items array that will contain all the documents we want in the output: the original one (if keep_original_event is set to true) and the denormalized ones.
  • We keep the prize array of the current event in memory.
  • We create a clone base event. This step is optional if events are light (everything could be done in the each loop), but it can improve performance for heavy events.
  • We loop over the prize items, clone the base event, and set all the prize fields in a prize object. We then push the cloned event.
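This cloning flow can be exercised outside Logstash with a hypothetical StubEvent stand-in (illustration only; the real LogStash::Event API additionally supports nested field references such as [prize][year]):

```ruby
# Hypothetical minimal stand-in for LogStash::Event, used only to
# illustrate the filter's flow; not the real Logstash class.
class StubEvent
  def initialize(data)
    @data = data
  end

  def get(field)
    @data[field]
  end

  def set(field, value)
    @data[field] = value
  end

  def remove(field)
    @data.delete(field)
  end

  def clone
    StubEvent.new(@data.dup)
  end
end

event = StubEvent.new(
  "id"    => 2,
  "prize" => [
    { "year" => 1903, "category" => "physics"   },
    { "year" => 1911, "category" => "chemistry" }
  ]
)

# Same flow as the filter: clone a base event without the array,
# then clone it once per prize item and set that item's fields.
items  = []
prizes = event.get("prize")
base   = event.clone
base.remove("prize")
prizes.each do |prize|
  event_prize = base.clone
  event_prize.set("prize", prize)  # the real filter sets nested fields: [prize][year], ...
  items.push(event_prize)
end
```

Running this yields two events, each holding one prize plus the shared fields, mirroring what the ruby filter returns to the pipeline.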

Querying on this field

Now, let's add the KQL filter seen in the previous part, but this time on the prizes-denormalized index:

prize.year : 1903 and prize.category : "chemistry" 

This doesn’t return any result, as expected!

We have to use a filter that matches a single prize to obtain results, for example:

prize.year : 1903 and prize.category : "physics" 

will return:

Warning: cloning events can be an expensive process. You will have to add performance tests to check that event processing durations conform to your needs.

In a future article, we will show how to improve our code's readability and how to test our filter.
