Pandas, Elasticsearch and Aggregations

M Castelino · Published in Kubehells · Jan 31, 2021

Pandas is good at handling large volumes of data. However, there is always an upper limit. What if you are looking for summary statistics for an extremely large data set, and your narrowest window of analysis is known?

Do you want to look at all the data? Probably not.

Narrowest window here means that you do not need to break down the data below a specific granularity. You may not care about the exact time something happened, but you do care whether it happened in a particular month, day, hour or minute.

In that case you can use Pandas to build aggregations and visualizations for the higher-level analysis, while leveraging the aggregation capabilities of an existing data store. Elasticsearch serves as the example here to show how this can be done.

Elasticsearch is very good at performing aggregations over extremely large index patterns (billions of documents); aggregations typically complete in seconds. If all of the raw data were extracted into Pandas, not only would the dataframe get too large, but the time to ingest the data would be significant. Aggregations can also serve as a form of data cleanup, given the right search parameters.

Elasticsearch Aggregation

In the example below, we run a search for the occurrence of a specific log pattern and then aggregate the hits over several categories.

Note the use of the date_histogram and its granularity. Here we reduce the narrowest window of observability to a day; this can be adjusted up or down as needed.

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

ES = Elasticsearch()   # client for your cluster (placeholder)
index = 'logs-*'       # index pattern to aggregate over (placeholder)

def Q1(type, key, val):
    return Q(type, **{key: val})

def query_aggregation(start_time, end_time, buckets):
    q = Q1('match_phrase', 'log', 'foo saw bar')
    t = Q1('range', '@timestamp', {"gte": start_time, "lte": end_time})
    s = Search(using=ES, index=index).query(q).filter(t)
    s = s[:0]  # size 0: return only the aggregations, not the raw hits
    s.aggs.bucket('domains', 'terms', field='domain.keyword', size=buckets)\
          .bucket('zones', 'terms', field='zone.keyword', size=buckets)\
          .bucket('regions', 'terms', field='region.keyword', size=buckets)\
          .bucket('days', 'date_histogram', field='@timestamp', calendar_interval='1d')
    s = s.execute()
    return s.aggregations.domains.buckets

Pandas DataFrame

You can then easily run the query over a large time window and create a dataframe. The dataframe can then be used to perform second-level aggregations or to build derivative dataframes and columns.

import numpy as np
import pandas as pd

data = []
# 'now-30d' and 'now' are Elasticsearch date-math strings for the range filter
for domain in query_aggregation('now-30d', 'now', 1000):
    for zone in domain.zones.buckets:
        for region in zone.regions.buckets:
            for day in region.days.buckets:
                data.append([domain.key, zone.key, region.key, day.key, day.doc_count])

df = pd.DataFrame(data, columns=['Domain', 'Zone', 'Region', 'Day', 'Count'])
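
The Day keys returned by the date_histogram are epoch milliseconds, so a typical next step is to convert them to datetimes before aggregating again in Pandas. A minimal sketch of a second-level aggregation and a derivative column (the Share column is illustrative, not part of the original query):

# date_histogram bucket keys are epoch milliseconds by default
df['Day'] = pd.to_datetime(df['Day'], unit='ms')

# second-level aggregation: total daily count per domain
daily = df.groupby(['Domain', 'Day'], as_index=False)['Count'].sum()

# derivative column: each row's share of its domain's daily total
df['Share'] = df['Count'] / df.groupby(['Domain', 'Day'])['Count'].transform('sum')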

This also lets you build, with minimal effort, rich visualizations that are not possible in Kibana or Grafana. Given that aggregations are relatively cheap in Elasticsearch, you can use this approach to build responsive, near-real-time dashboards and analyze trends.
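
For example, plotting a per-domain trend from the dataframe above takes only a pivot and a plot call. A minimal sketch using matplotlib (assumed to be installed; styling and figure handling are left out):

import matplotlib.pyplot as plt

# one column per domain, indexed by day, cells holding summed daily counts
trend = df.pivot_table(index='Day', columns='Domain', values='Count', aggfunc='sum')
trend.plot(title='Daily matches per domain')
plt.show()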
