Itai Yaffe
Sep 15 · 7 min read

By: Dana Assa and Itai Yaffe

At Nielsen Marketing Cloud, we use Apache Druid as our core real-time analytics tool to help our clients monitor, test and improve their audience targeting capabilities. With Druid, we provide our clients with in-depth consumer insights leveraging world-class Nielsen audience data.
If you’re not familiar with Druid, we encourage you to first read some of the online documentation, e.g. What is Druid?, and also check out our DataWorks Summit Barcelona 2019 presentation about how we actually use Druid.

Photo by Kim Gorga on Unsplash

If you’re already using Druid, you have probably encountered (or eventually will) the available options for setting a TTL on the data you store.
In this blog post, we will cover the basic options for setting a TTL within Druid (based on the timestamp of the data) and we will also describe a rather unique, advanced option of setting a dimension-based TTL.

Example of data stored in Druid

First, let’s look at a basic example of data stored in Druid:
Since Druid is a time-series data store, we normally store rows by event date (both segmentGranularity and queryGranularity within the ingestion spec are set to “DAY”). Each row represents a group of events that happened on a specific day.
That means we can query the data source using a specific date range, and Druid will perform the required aggregation.

For example, let’s look at the data collected about impressions of “Bebsi” and “Boba Cola” campaigns:

Querying the above data source for the total number of impressions for campaign_name=Bebsi between August 1st and August 2nd will return the number “298”.
Why? Because we perform a sum aggregation on the rows. In this simple example, that means 200+98.
Similarly, querying the above data source for the total number of impressions for campaign_name=Boba Cola between August 1st and August 2nd will return the number “670”.
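Such a query can be expressed as a Druid timeseries query. Here is a minimal sketch, built as a plain Python dict; the metric name ("impressions") and datasource name are assumptions for illustration:

```python
# A sketch of a Druid timeseries query for the example above.
# Note that Druid intervals are end-exclusive, so 2019-08-01/2019-08-03
# covers both August 1st and August 2nd.
query = {
    "queryType": "timeseries",
    "dataSource": "myDataSource",
    "granularity": "all",
    "intervals": ["2019-08-01/2019-08-03"],
    "filter": {
        "type": "selector",
        "dimension": "campaign_name",
        "value": "Bebsi",
    },
    "aggregations": [
        {"type": "longSum", "name": "impressions", "fieldName": "impressions"}
    ],
}

# Druid sums the matching daily rows at query time: 200 + 98.
total = 200 + 98
print(total)  # 298
```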

Drop Rules and Load Rules

We mentioned Druid is a time-series data store, and each segment is assigned a time interval upon creation.
Say we need to store 30 days of historic data. Druid allows us to define a set of Load Rules and Drop Rules which will result in a 30-day retention period.

A Load Rule defines how many replicas of each segment should be loaded into the historical nodes.
There are 3 types of Load Rules:

  1. Forever Load Rule — all segments should always be loaded, regardless of their time interval (the default rule).
  2. Interval Load Rule — only segments with a time interval that is contained within the specified interval should be loaded. In the above example, specifying an interval of 2019-08-02/2019-09-02 means that the segment of 2019-08-01 won’t match this rule.
  3. Period Load Rule — only segments with a time interval that is contained within the specified period should be loaded. In the above example, specifying a period of P7D means that only segments with a time interval within the last 7 days should be loaded.

A Drop Rule indicates when segments should be dropped entirely from the cluster.
Similarly to Load Rules, there are 3 types of Drop Rules (actually, there’s a 4th type of Drop Rule called Period Drop Before Rule, which we won’t discuss in this post).

Rules are evaluated in order and hence the ordering of rules is important. There’s an automatic process that evaluates the segments against the rules and matches each segment with the first rule that applies. Each segment may only match a single rule.
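Putting the pieces together, the 30-day retention period can be expressed as an ordered rule set posted to the Coordinator. A minimal sketch in Python (the replica count and the default tier name are assumptions; adjust them to your cluster):

```python
import json

# A rule set implementing a 30-day retention period: the first rule
# loads 2 replicas of any segment whose interval falls within the last
# 30 days; the second drops everything else. Order matters, since each
# segment matches only the first applicable rule.
rules = [
    {
        "type": "loadByPeriod",
        "period": "P30D",
        "tieredReplicants": {"_default_tier": 2},
    },
    {"type": "dropForever"},
]

# This payload would be POSTed to the Coordinator, e.g.:
#   POST http://<coordinator>:8081/druid/coordinator/v1/rules/myDataSource
payload = json.dumps(rules)
```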

Note that Drop Rules do not remove any data from the deep storage or the metadata store; they only drop the matched segments from the cluster itself and mark them as “disabled” or “unused” (i.e. used=0) in the metadata store.
In order to actually delete data, we need to use Kill Tasks.

Kill Tasks and the art of deep storage maintenance

As mentioned, a Kill Task deletes all information about a segment and removes it from the deep storage. The Kill Task will only operate on segments that are marked as “unused” (i.e. used=0).
After the Kill Task has operated on a segment, all of its content and metadata are gone (for good…).
However, as opposed to Load Rules and Drop Rules which are applied automatically, you need to execute Kill Tasks yourself.

Going back to the 30-day retention period example above, all segments older than 30 days will be automatically dropped from the cluster (according to the Drop Rule), and we’ll need to run a Kill Task in order to completely delete them from the deep storage and the metadata store.
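A minimal sketch of such a Kill Task spec (the datasource name and interval are illustrative), again built as a Python dict:

```python
import json

# A Kill Task spec. Submitting it to the Overlord deletes only the
# segments already marked used=0 within the interval, both from deep
# storage and from the metadata store:
#   POST http://<overlord>:8090/druid/indexer/v1/task
kill_task = {
    "type": "kill",
    "dataSource": "myDataSource",
    # "Used" segments within the interval are not touched.
    "interval": "2018-09-02/2019-09-02",
}
payload = json.dumps(kill_task)
```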

That’s pretty easy, right? Well, not quite… Every time you update existing data, Druid creates a new version of the segment holding that data. The previous version will be marked as “unused”, and the new version will be marked as “used” (i.e. used=1).
The old versions are retained in the deep storage, increasing the amount of used storage.
This can become significant when there are multiple updates per segment over time.
For example, using Delta Ingestion to ingest data every day for the last 30 days will result in up to 30 versions of the same segment, meaning up to 30X the amount of used storage, while only the latest version is actually used when querying the data.

For one of our data sources that has a 1-year retention period, this up-to-30-versions-per-segment resulted in ~430TB of data in our deep storage (which, in our case, is an S3 bucket).
To mitigate that, the interval specified in your Kill Task should be as wide as possible.
While the example in the documentation includes a very scary statement (<all_segments_in_this_interval_will_die!>), in fact not all segments in the specified interval will be deleted, but only those segments that are marked as “unused”.
Applying that methodology to the data source mentioned above reduced the amount of used storage from ~365TB to ~15TB, and our S3 costs from ~$8.3K/month to ~$350/month, a factor of 24X! So we saved almost $8K/month, and that’s just for one data source!
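A quick sanity check of the arithmetic behind these figures (the inputs are rounded values from our deployment, so the factors are approximate):

```python
# Rounded before/after figures for the data source discussed above.
storage_before_tb, storage_after_tb = 365, 15
cost_before_usd, cost_after_usd = 8300, 350  # per month

storage_factor = storage_before_tb / storage_after_tb   # ~24X
cost_factor = cost_before_usd / cost_after_usd          # ~24X
monthly_savings_usd = cost_before_usd - cost_after_usd  # ~$8K/month
```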

Dimension-based TTL

Lately, we found ourselves needing to retain rows in one of our Druid data sources for different retention periods, based on the content of the rows.
However, Druid only allows you to define a retention period on a per-segment basis (as described above).
That means Druid can delete entire segments that are older than, say, 1 year — but can’t delete only specific rows within these segments.
After some digging in Druid’s documentation, we noticed there’s an interesting option, somewhat buried in the Update Existing Data page, that allows you to specify a filter in the ingestion spec of your indexing task.

Going back to the example data we store in Druid, let’s say we want to retain all rows with “campaign_name=Boba Cola” for 1 day, and all rows with “campaign_name=Bebsi” forever.
We use Delta Ingestion in order to update data that already exists in the data source and to add new data (in this example — from an S3 bucket).
Inside the dataSource inputSpec, we provide an ingestionSpec with:

  1. An interval of 30 days (between August 2nd and September 2nd 2019).
  2. A filter on the campaign_name dimension, keeping only rows with “campaign_name=Bebsi”.

A snippet from our Hadoop-based Batch Ingestion task:

"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "multi",
    "children": [
      {
        "type": "dataSource",
        "ingestionSpec": {
          "intervals": ["2019-08-02/2019-09-02"],
          "dataSource": "myDataSource",
          "granularity": "day",
          "ignoreWhenNoSegments": true,
          "filter": {
            "type": "selector",
            "dimension": "campaign_name",
            "value": "Bebsi"
          }
        }
      },
      {
        "paths": "s3://muBucket/myFolder/date=2019-09-02",
        "type": "static",
        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat"
      }
    ]
  }
}

A snippet from the data in s3://muBucket/myFolder/date=2019-09-02:

2019-09-01, Bebsi, 100
2019-09-02, Bebsi, 45
2019-09-02, Boba Cola, 120

The data in our data source, after running this ingestion task, is as follows:

What has changed?

  1. We no longer have entries for “Boba Cola” prior to September 2nd.
  2. There are 400 impressions for “Bebsi” on September 1st (rather than the 300 we had before this ingestion task).
  3. We have new rows for both campaigns on September 2nd.

Essentially, we were able to use this kind of filtering to apply different retention periods for different rows, based on their content.
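To see why all three changes happen, here’s a plain-Python simulation of the delta ingestion above: the filter drops the non-Bebsi rows from the re-ingested interval, the new S3 data is merged in, and the rows are rolled up by day and campaign. The pre-existing Boba Cola per-day value is an illustrative assumption; the Bebsi September 1st value (300) is the one mentioned above:

```python
from collections import defaultdict

# Rows already in the datasource for the re-ingested interval.
existing = [
    ("2019-08-02", "Bebsi", 98),
    ("2019-08-02", "Boba Cola", 320),  # assumed per-day value
    ("2019-09-01", "Bebsi", 300),
]
# New rows from the static S3 input.
new = [
    ("2019-09-01", "Bebsi", 100),
    ("2019-09-02", "Bebsi", 45),
    ("2019-09-02", "Boba Cola", 120),
]

# The ingestionSpec filter keeps only Bebsi rows from the existing data...
kept = [row for row in existing if row[1] == "Bebsi"]

# ...and Druid rolls the merged input up by (day, campaign).
rollup = defaultdict(int)
for date, campaign, impressions in kept + new:
    rollup[(date, campaign)] += impressions

print(rollup[("2019-09-01", "Bebsi")])  # 400 (300 + 100)
```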

Summary

In this post, we explained how to set up a TTL for the data stored in Druid, based on the data’s timestamp, using Load Rules and Drop Rules.
We described how to completely delete the data from the deep storage, as well as how to avoid excessive storage usage and costs, using Kill Tasks.
And last but not least, we demonstrated a rather unique option of setting a TTL based on the content of your data.

We hope you found this post interesting and helpful, and we look forward to your feedback!

If you want to meet us in-person and hear us talking about Druid, join us at Big Data LDN this November and attend our session Counting unique users in real-time: here’s a challenge for you! (register for free at https://bigdataldn.com/register/).

P.S — we’re hiring ;)

NMC-TechBlog

A publication by the Nielsen Marketing Cloud Engineering team, where we talk about what we do and how we do things

Itai Yaffe

Written by

Tech Lead, Big Data Group at the Nielsen Marketing Cloud. I have been dealing with Big Data challenges since 2012, using tools like Spark, Druid and Kafka
