Breaking Down Data Silos in OpenSearch: Practical Solutions for Handling High-Volume, Multi-Field Data

A joint article by Idan Liani and Livnat Zeira

OwnBackup
Own Engineering
7 min readMay 24, 2023

--

How to calculate tasks’ duration, when we have ‘start’ and ‘stop’ events in Elasticsearch

by Idan Liani

Elasticsearch is a very powerful tool which allows us to search for specific events occurring in our system. Sometimes we try to understand an occurence by looking for specific events and understanding information embedded in their relationship, such as duration. For example, the duration between when devices start and stop, based on the following data:

{name: 'Device1', type: 'start', 'eventTime': '2013-02-19 12:00:00'}
{name: 'Device2', type: 'start', 'eventTime': '2013-02-19 13:02:00'}
{name: 'Device1', type: 'stop', 'eventTime': '2013-02-19 14:45:00'}
{name: 'Device2', type: 'stop', 'eventTime': '2013-02-19 15:50:00'}

The calculation of the duration is:

{name: 'Device1','start': '2013-02-19 12:00:00', stop:’2013-02-19 14:45:00’, duration_in_minutes: 165}
{name: 'Device2','start': '2013-02-19 13:02:00', stop:’2013-02-19 15:50:00’, duration_in_minutes: 168}

One way to address this is by using Elastic Console.
We find all the relevant events (e.g., stop and start types), then group them by a key (e.g. the device name). Using the aggregation, we can easily find the start and end times for each key, assuming only those 2 times exists for each key. With those times we can calculate duration using the difference between the two.

Below we present the full query in Elastic language to achieve this goal. In the rest of this blog we will analyze the different parts of the query and present its rational.

The full query:

1: GET /<index name>/_search
2: {
3: "size":0,
4: "query": {
5: "bool": {
6: "should": [
7: {
8: "match_phrase": {
9: "type": "start"
10: }
11: },
12: {
13: "match_phrase": {
14: "type": "stop"
15: }
16: }
17: ],
18: "minimum_should_match" : 1,
19: "filter": [
20: {
21: "range": {
22: "@timestamp": {
23: "gte": "now-30d/d",
24: "lte": "now/d"
25: }
26: }
27: }
28: ]
29: }
30: "aggs": {
31: "key": {
32: "terms": {
33: "field": “name”,
34: "size": 50000
35: },
36: "aggs": {
37: "start": {
38: "min": {
39: "field": "@timestamp"
40: }
41: },
42: "end": {
43: "max": {
44: "field": "@timestamp"
45: }
46: },
47: "duration" :{
48: "bucket_script": {
49: "buckets_path": {
50: "start": "start",
51: "end": "end"
52: },
53: "script": "(params.end - params.start) / (1000 * 60)"
54: }
55: },
56: "sort": {
57: "bucket_sort": {
58: "sort": [{"duration": {"order": "desc"}}]
59: }
60: },
61: "filter": {
62: "bucket_selector": {
63: "buckets_path": {
64: "duration": "duration"
65: },
66: "script": "params.duration > 30"
67: }
68: }
69: }
70: }
71: }
72: }

So here’s what we did:

Step 1: Selecting the right index
The index name on line 1 limits our search results to the desired index only.

1: GET /<index name>/_search

Step 2: Return only the aggregation results
By setting the size to zero, we remove the row results and limit our result to the aggregation of those row results. However, during the first phase of writing the query we would like to make sure we return the right records. To test this, it is recommended to increase the value to verify.

3: "size":0

Step 3: Filter the data you want
Then we set the data we want to work with, the start and the stop events of all devices:

4:"query": {
5: "bool": {
6: "should": [
7: {
8: "match_phrase": {
9: "type": "start"
10: }
11: },
12: {
13: "match_phrase": {
14: "type": "stop"
15: }
16: }
17: ],

Later we wanted to verify that one of the matches will be relevant,
Since we use a boolean query at line 5. We basically want the result rows to have only one match with the filters: the ‘start’ OR the ‘end’ conditions, this is why we specify the minimum of one condition should be applied to a given record (line 18):

18: "minimum_should_match" : 1

Step 4: Setting time range for the query (optional):
When we search Kibana we usually set a time range filter to limit our results. In other Kibana tools, such as discover and dashboard, you must select a date range. Here it is not mandatory, but it is recommended to avoid searching through the entire index.
By selecting this filter in the @timestamp field (default) we limit the results to the last 30 days (line 23), or whatever make sense for your query.

19: "filter": [
20: {
21: "range": {
22: "@timestamp": {
23: "gte": "now-30d/d",
24: "lte": "now/d"
25: }
26: }
27: }

Step 5: Building the aggregations
The second part of the query is the aggregation of the results we fetched in the previous steps (lines 4–29).
First we set our aggregation data (e.g the name column) and limit the amount of aggregation buckets to use:

31: "key": {
32: "terms": {
33: "field": "<key column>",
34: "size": <limit>
35: },

Then set the start and end times for each bucket by selecting the minimum and maximum values for each bucket (start event always happen before end events):

37: "start": {
38: "min": {
39: "field": "@timestamp"
40: }
41: },
42: "end": {
43: "max": {
44: "field": "@timestamp"
45: }
46: },

Step 6: Calculating the Event Duration
Now if we want to calculate the duration, subtract the start time from the end time:

47: "duration" :{
48: "bucket_script": {
49: "buckets_path": {
50: "start": "start",
51: "end": "end"
52: },
53: "script": "(params.end - params.start) / (1000 * 60)"
54: }
55: },

Step 7: Sorting the Events
Now, if we’d like, we can sort the events by duration in descending order, or by any other sort option that fits:

56: "sort": {
57: "bucket_sort": {
58: "sort": [{"duration": {"order": "desc"}}]
59: }
60: }

Step 8: Filtering the relevant events
And finally, if we’d like, we can filter the buckets we are interested in, for example, durations longer than 30 min:

61: "filter": {
62: "bucket_selector": {
63: "buckets_path": {
64: "duration": "duration"
65: },
66: "script": "params.duration > 30"
67: }
68: }

Please note running this query on large amounts of data might impact your Elasticsearch performance. Depending on your Elastic configuration, you may be limited in the amount of data processed in a query without impacting others, and Elasticsearch may prevent returning too much data by blocking the query. Though it won’t block you when there is only performance impact. To analyze your index’s usage memory footprint and performance, have a look in https://www.elastic.co/blog/how-to-analyze-and-optimize-the-storage-footprint-of-your-elastic-deployment-disk-usage-api.

When going over the results, make sure you have 2 results for each bucket, to confirm your results are valid.

How to dynamically change indexed fields in AWS OpenSearch

By Livnat Zeira

The following outlines and suggests solutions to the challenge of field limitation in OpenSearch in cases of complex data consisting of many different fields.

OpenSearch is a Lucene-based search engine that started as a fork of Elasticsearch. Data structures are serialized and stored as JSON field/value documents and an inverted index is used for fast searches. More on that here.

The implementation of the search engine is suited for a limited field count and the default limit for indexed fields per index (i.e. a group of documents) is 1000, to ensure optimal index size and reasonable query speed.

This value can be changed, but increasing it may affect performance and cause failures like query timeout and memory issues, especially if you have high load and few resources. So what can you do if your data consists of more than 1000 fields?

The immediate solution is to split the data into multiple indexes, but first you should consider whether you really need all fields to be searchable. You can specify in the index mapping which fields to index and which not, using the ‘index’ parameter.

Another way to support more than 1000 fields is to use the Reindex API. What if you can’t tell in advance which fields need to be searchable? What if you need to dynamically change which fields are indexed?

During AWS Reinvent conference, Ownbackup had a meeting with OpenSearch representatives where we discussed this challenge and have come up with a ‘trick’:

  1. In your index mapping, specify which fields not to index, using the ‘index’ mapping parameter.
  2. When you index a document, don’t filter out any fields, let the entire document be stored under ‘_source’, while only some of the fields are actually indexed for search.
  3. When the need arises to re-organize which fields are indexed for search and which are not, create a new index and in its mapping specify which fields not to index (like in step 1).
  4. Call Elasticsearch re-index API with ‘source’ as your old index, and ‘dest’ as your new index.
  5. Delete the old index and that’s it! Now the new chosen fields will be available for search.

Note that OpenSearch storage is pricey, so if you have fields that are likely not to be searched, it is still recommended to filter them from the document before indexing.

--

--