Elasticsearch: Composite aggregation query and retrieving bucket data

Mohd Ahshan Danish
3 min read · Mar 2, 2023


Hi Devs👋,

Elasticsearch provides near real-time search, which makes it a strong choice for many applications. In this post, I discuss how to build a composite aggregation and retrieve its bucket data using recursion.

Please refer to my previous post below on Query DSL.

The composite buckets are built from the combinations of the values extracted or created for each document, and each combination is considered a composite bucket.
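Conceptually, the bucketing works like counting distinct value combinations. A minimal illustration in plain Python (not Elasticsearch code; the documents and field values are made up):

```python
from collections import Counter

# Each distinct (field1, field2) combination becomes one composite bucket,
# and doc_count is how many documents share that combination.
docs = [
    {"field1": "A", "field2": "X"},
    {"field1": "A", "field2": "Y"},
    {"field1": "A", "field2": "X"},
]

buckets = Counter((d["field1"], d["field2"]) for d in docs)
print(buckets)  # Counter({('A', 'X'): 2, ('A', 'Y'): 1})
```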

body = {
    "query": {
        "bool": {
            "must": [
                {
                    "terms": {
                        "field1": [
                            "ad1f2570-d009-42ca-8cc8-8f977dcc1211",
                        ]
                    }
                },
                {
                    "range": {
                        "created_at": {
                            "gte": "2001-01-01",
                            "lt": "2021-01-01"
                        }
                    }
                }
            ]
        }
    },
    "aggs": {
        "aggs": {
            "composite": {
                "sources": [
                    {"field1_aggs": {"terms": {"field": "field1"}}},
                    {"field2_aggs": {"terms": {"field": "field2"}}}
                ],
                "size": 1000
            }
        }
    }
}

The above query combines a composite aggregation on field1 and field2 with two filters: a terms filter on field1 and a range filter on created_at. It returns a doc_count for every combination of field1 and field2 values.

# execute the above query
res = client.search(index='study', body=body)
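For orientation, the aggregation part of the response looks roughly like the dictionary below. This is a hand-written sketch, not a real response: "aggs" is the name we gave the aggregation in the request, and the bucket values are invented for illustration.

```python
# Hypothetical response shape for a composite aggregation named "aggs".
response = {
    "aggregations": {
        "aggs": {
            "after_key": {"field1_aggs": "A", "field2_aggs": "Y"},
            "buckets": [
                {"key": {"field1_aggs": "A", "field2_aggs": "X"}, "doc_count": 42},
                {"key": {"field1_aggs": "A", "field2_aggs": "Y"}, "doc_count": 7},
            ],
        }
    }
}

# Each bucket's "key" holds one value combination; "doc_count" is its count.
for bucket in response["aggregations"]["aggs"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])
```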

Looping over composite aggregation results is not straightforward. Notice the "size": 1000 above: the query returns only the first page of buckets, and you must issue another request for each subsequent page, as below.

body = {
    "query": {
        "bool": {
            "must": [
                {
                    "terms": {
                        "field1": [
                            "ad1f2570-d009-42ca-8cc8-8f977dcc1211",
                        ]
                    }
                },
                {
                    "range": {
                        "created_at": {
                            "gte": "2001-01-01",
                            "lt": "2021-01-01"
                        }
                    }
                }
            ]
        }
    },
    "aggs": {
        "aggs": {
            "composite": {
                "sources": [
                    {"field1_aggs": {"terms": {"field": "field1"}}},
                    {"field2_aggs": {"terms": {"field": "field2"}}}
                ],
                "size": 1000,
                "after": {"field1_aggs": "ABC", "field2_aggs": "XYZ"}
            }
        }
    }
}
# execute the above query for the next page
res = client.search(index='study', body=body)

To fetch the next page, add the following "after" key to the composite aggregation in the next query:

"after": {"field1_aggs": "ABC", "field2_aggs": "XYZ"}

Note: the response includes an "after_key"; pass its value as "after" in the request for the next page.
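As an alternative to recursion, the paging can be done with a plain loop. A minimal sketch, assuming `search_fn` wraps the client call (e.g. `lambda b: client.search(index='study', body=b)`) and the composite aggregation is named "aggs" as above; `fetch_all_composite_buckets` is a hypothetical helper, not part of the Elasticsearch client:

```python
def fetch_all_composite_buckets(search_fn, body, agg_name="aggs"):
    """Collect the buckets from every page of a composite aggregation.

    search_fn: callable taking the request body and returning the raw
    Elasticsearch response as a dict.
    """
    buckets = []
    while True:
        response = search_fn(body)
        agg = response["aggregations"][agg_name]
        page = agg["buckets"]
        if not page:  # no more results
            break
        buckets.extend(page)
        after_key = agg.get("after_key")
        if after_key is None:  # last page
            break
        # Ask for the page that starts right after the last bucket seen.
        body["aggs"][agg_name]["composite"]["after"] = after_key
    return buckets
```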

Using elasticsearch_dsl

Now we can implement the above using Python's elasticsearch_dsl library and collect all the data in a list.

from elasticsearch_dsl import A, Q, Search

# client, index1 and _logger are assumed to be defined elsewhere in the module.


def _composite_search(
    field1,
    field2,
    from_date,
    to_date,
    **kwargs,
):
    """
    Return the number of documents for each field1/field2 combination.

    composite aggs: [field1, field2]
    Index: index1
    """
    assert from_date, "from_date cannot be None"
    assert to_date, "to_date cannot be None"

    field1_q = Q()
    if field1:
        field1_q = Q("terms", field1=[field1])

    field2_q = Q()
    if field2:
        field2_q = Q("terms", field2=[field2])

    timeframed = Q(
        "range",
        **{"created_at": {"gte": from_date, "lt": to_date}},
    )

    res = []

    def _search(res=res, **kwargs):
        s = Search(using=client, index=index1)
        s = s.query(field1_q & field2_q & timeframed)

        field1_aggs = A("terms", field="field1")
        field2_aggs = A("terms", field="field2")
        sources = [{"field1_aggs": field1_aggs}, {"field2_aggs": field2_aggs}]

        s.aggs.bucket("aggs", "composite", sources=sources, size=1000, **kwargs)
        _logger.info(f"Search query: {s.to_dict()}")

        response = s.execute()
        for tag in response.aggregations.aggs.buckets:
            res.append(tag["key"])

        # If 'after_key' is present, there are more pages: recurse with it.
        if "after_key" in response.aggregations.aggs:
            after = response.aggregations.aggs.after_key
            _logger.info(f"after: {after}")
            _search(res=res, after=after)  # recursion

    _search(res=res)

    return {"data": res}

The above code builds the same query using Python's elasticsearch_dsl, and I have used recursion to retrieve the data from all pages.

Setup and Installation:🏗

Please refer to my previous post below on installing ES and Kibana using Docker.

In the next article, we will explore the ES DSL Query further. 😁

So Stay Tuned, I will see you in the next one.😄

Have fun!

You can connect with me on LinkedIn.

If you have any additions to this, leave a comment below, and if you like this post, please hit clap 👏 and consider following me on Medium.
