Backdated Data Updates for Apache Druid
How I got myself into this mess
At the organization I am currently working with, I was tasked with solving a special use case: near real-time backdated data updates to Apache Druid.
Anyone working with Apache Druid knows that it supports streaming ingestion via Kafka, and thus it can deliver near real-time updates on newly ingested data. That’s nice. On the batch side, using the new MSQE (multi-stage query engine) that Imply has developed, ingestion times have improved dramatically compared to the previous ingestion procedure. Great!
But what happens when you need to update backdated data? What are the ingestion times on that front?
The Problem of Data Updates
Let me say this upfront: Apache Druid does not support updates to specific data points, or upserts, as they are more widely known.
I would have said rows, but Druid is not a row-based DB (OLTP) but rather a column-oriented DB (OLAP). By now you might already have caught on to why Druid has difficulty doing that. For those who have not caught on yet, here it is: since the data is stored in a columnar format, updating it is a much harder task. This, by the way, is not unique to Druid; those in the Parquet community know what this feels like.
So how can this be overcome? Well, Apache Druid has multiple ways of handling this problem. I will try to take you along on the journey I took to solve it, ending in what seems to be the best solution for my client’s needs.
Imply suggests three possible solutions for this problem in a great article: “Upserts and Data Deduplication with Druid” by Jad Naous. Based on this, I started by testing the option of re-ingesting the data.
Re-Ingest & Latency
So we understand that in order to update backdated data, we have to re-ingest the old data with the new data. That’s fairly simple, right? Well, it might not be so simple, but it’s definitely manageable. A couple of test runs, and Bob’s your uncle!
Are you waiting to hear about the problem? Here it is: in order to have low-latency query response times, the ingested data is partitioned monthly. The reason for monthly partitions lies in the way the data is used and queried. The data is always read in its entirety for specific accounts, spanning multiple years back. In order to stay in the sub-second latency category, we need large time partitions!
The data size is rather large for a month, about 600M rows, and ingesting a month’s worth of data takes about 35–40 minutes. Quite a stretch from the “near real-time” approach…
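To make the re-ingestion path concrete, here is a minimal sketch of what such a monthly re-ingest could look like with SQL-based (MSQE) ingestion, submitted from Python. The datasource name, the S3 location, and the column signature are all hypothetical; the point is that replacing one backdated month means rewriting the whole month, no matter how small the actual update is.

```python
import requests  # third-party HTTP client; endpoint and all names below are hypothetical

DRUID_SQL_TASK_URL = "http://druid-router:8888/druid/v2/sql/task"

# Replacing a single month means re-reading and rewriting ~600M rows,
# even if only a handful of them actually changed.
REINGEST_MONTH_SQL = """
REPLACE INTO "account_metrics"
OVERWRITE WHERE __time >= TIMESTAMP '2022-01-01' AND __time < TIMESTAMP '2022-02-01'
SELECT
  TIME_PARSE("timestamp") AS __time,
  "account",
  "value"
FROM TABLE(
  EXTERN(
    '{"type": "s3", "prefixes": ["s3://my-bucket/merged/2022-01/"]}',
    '{"type": "json"}',
    '[{"name":"timestamp","type":"string"},{"name":"account","type":"string"},{"name":"value","type":"long"}]'
  )
)
PARTITIONED BY MONTH
"""

response = requests.post(DRUID_SQL_TASK_URL, json={"query": REINGEST_MONTH_SQL})
print(response.json())  # contains the taskId of the MSQ ingestion task
```

With ~600M rows in the month, this is the task that takes 35–40 minutes end to end.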
Ok. We need something else.
Re-Ingest using hybrid partition granularity
This solution involves keeping the data at multiple granularities for different time periods. This approach reduces backdated ingestion times drastically: a day’s worth of data is ingested in 3 minutes. And if we can conclude that backdated data will span at most 14 days back, we can re-ingest the backdated data into Druid within 3 minutes, which is a huge improvement.
We will even take a precaution and keep data older than 14 days in weekly partitions. The ingestion time there will not be 3 minutes, but it will still be much less than 40 minutes. This starts to look like a real solution!
So we would keep:
- Last 14 days: daily granularity
- Past 1-2 months: weekly granularity
- Prior data: monthly granularity
| Before  | 4 weeks | 7 days |
|---------|---------|--------|
| Monthly | Weekly  | Daily  |
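One way to express this tiering is a small helper that decides which segment granularity an interval should end up in, based on its age. This is only a sketch; the cut-offs below mirror the table above and would of course be tuned to the actual data.

```python
from datetime import date, timedelta

# Hypothetical cut-offs mirroring the table above:
# newest data in daily partitions, then weekly, then monthly.
DAILY_WINDOW = timedelta(days=7)
WEEKLY_WINDOW = timedelta(weeks=4)

def target_granularity(interval_start: date, today: date) -> str:
    """Return the segment granularity a given interval should be (re)ingested into."""
    age = today - interval_start
    if age <= DAILY_WINDOW:
        return "DAY"
    if age <= WEEKLY_WINDOW:
        return "WEEK"
    return "MONTH"

# Example: data from 10 days ago lands in the weekly tier.
print(target_granularity(date.today() - timedelta(days=10), date.today()))  # -> "WEEK"
```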
Daily Data
Each day, new data is added, roughly 20M rows. All of this data is ingested as daily data.
Weekly
Each night, a job runs that consolidates the daily data into weekly-granularity data. This works in a sliding-window fashion, as follows:
Day 1: No weekly ingest is run
Day 2: A weekly ingestion is run, including the first 2 days of that week
Day 3: A weekly ingestion is run, including the first 3 days of that week
Day 4: …
You get the picture. After 7 days, we have a full week ingested at the weekly granularity.
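The sliding window itself is just interval arithmetic: each night we recompute the week-to-date interval and feed it into the same REPLACE/OVERWRITE WHERE pattern shown earlier. A minimal sketch, assuming weeks start on Monday:

```python
from datetime import date, timedelta

def week_to_date_interval(today: date) -> tuple[date, date] | None:
    """Return the [start, end) interval the nightly consolidation job should rewrite.

    Assumes weeks start on Monday. On day 1 of a week there is nothing to
    consolidate yet; from day 2 onwards the window grows by one day per night
    until the full week is covered.
    """
    week_start = today - timedelta(days=today.weekday())
    days_so_far = (today - week_start).days + 1
    if days_so_far < 2:
        return None  # Day 1: no weekly ingest is run
    return week_start, today + timedelta(days=1)

# Day 2 of a week (a Tuesday) consolidates the first 2 days, day 3 the first 3 days, ...
print(week_to_date_interval(date(2023, 5, 2)))  # -> (date(2023, 5, 1), date(2023, 5, 3))
```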
Monthly
Just as we re-ingested the daily data each day until we had a full week, we do the same with the weekly data, re-ingesting it into monthly data. This job only runs once a week, whenever a full week of the daily data has been re-ingested.
Not Quite There
While this approach is a viable and much-used pattern for many companies using Apache Druid, it did not quite fit us for the following reasons:
- An update to a single day’s data can incur a change in all the subsequent days’ data.
- The data can be backdated even half a year (huh?!)
- 3 minutes is still not good enough
I get it. This pure re-ingestion approach is not going to cut it. I think I have to retrace my steps, back out of the corner I put myself into, and start a new approach. Or do I?
Maybe the approach is not a single path, but multiple paths that, when combined, lead me to the Garden of Eden. Well, at least to Valhalla!
Hybrid Append Reingest Approach
The other option for handling data updates is to append the data rather than re-ingest it when it is received. That is an interesting approach, and it is definitely faster than re-ingesting ~20M rows. The update batch is about 20K rows, and simply appending it to the datasource is done in no time, especially when using the new MSQE.
This approach will create more segment files, as the new data is written to new segment files. This means that for each query we might read more than one segment file per time partition, since the data is now spread over multiple segment files. This can quickly lead to query latency degradation if not handled properly.
After the data has been ingested and is available, we have to run re-ingestion processes in the background to “recalibrate” the data. Please note, I am not talking about Apache Druid’s rollup functionality, which would just add the new rows to the previously existing rows in a new segment file. I am talking about a manual re-ingest, which rewrites the segment files completely and recalculates the data, so that each timestamp and account has only one entry per time partition.
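The append itself is a small MSQE job. Here is a hedged sketch, again with a hypothetical datasource, input location, and schema; the appended rows land in new segment files alongside the existing ones until the background re-ingest rewrites the affected partitions.

```python
import requests  # hypothetical endpoint and names, as before

DRUID_SQL_TASK_URL = "http://druid-router:8888/druid/v2/sql/task"

# Appending the ~20K-row update batch: new segment files are added next to the
# existing ones, so queries temporarily see more than one segment per time chunk
# until the background re-ingest "recalibrates" the data.
APPEND_UPDATES_SQL = """
INSERT INTO "account_metrics"
SELECT
  TIME_PARSE("timestamp") AS __time,
  "account",
  "value"
FROM TABLE(
  EXTERN(
    '{"type": "s3", "prefixes": ["s3://my-bucket/updates/2023-05-02/"]}',
    '{"type": "json"}',
    '[{"name":"timestamp","type":"string"},{"name":"account","type":"string"},{"name":"value","type":"long"}]'
  )
)
PARTITIONED BY DAY
"""
# PARTITIONED BY DAY is an assumption here; the re-ingest jobs later fold these
# appended segments into the coarser weekly/monthly partitions.

requests.post(DRUID_SQL_TASK_URL, json={"query": APPEND_UPDATES_SQL})
```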
LATEST vs Aggregations
Using this approach, there are two options: receiving and ingesting the data as “whole” values or as “delta” values. Either way, for a single timestamp and account, there would be more than 1 data point, and we need to decide whether we want to aggregate the data, or always extract just the latest one.
There are two ways to handle the issues raised before until the data has been reingested or compacted:
- LATEST
- Aggregation query
Latest
We would ingest the values as “whole” data points, and each query now has to run using the LATEST function. In addition, we would need to add another column, an “update-date” column, which the LATEST function would use to determine which data point is the most recent.
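As a hedged sketch of what such a query could look like: assuming the update-date is ingested as a timestamp column (here called "updated_at", a made-up name) and the cluster runs a Druid version that has the LATEST_BY aggregator, the latest value per account and timestamp can be picked in an inner query and aggregated in an outer one.

```python
import requests  # queries go to the Druid SQL endpoint; all names below are hypothetical

DRUID_SQL_URL = "http://druid-router:8888/druid/v2/sql"

# Inner query: per account and __time, keep the value with the most recent
# "updated_at" (the update-date column). Outer query: aggregate those values.
LATEST_QUERY = """
SELECT
  "account",
  SUM("latest_value") AS total_value
FROM (
  SELECT
    __time,
    "account",
    LATEST_BY("value", "updated_at") AS latest_value
  FROM "account_metrics"
  WHERE "account" = 'ACC-42'
  GROUP BY __time, "account"
)
GROUP BY "account"
"""

print(requests.post(DRUID_SQL_URL, json={"query": LATEST_QUERY}).json())
```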
Internally, Apache Druid duplicates all the queries and performs them twice, in order to save time, maximize parallelism and return the correct result with the lowest latency possible.
I favor this approach less.
Aggregation
The data would have to be ingested as “delta” values, meaning an addition to (or subtraction from) the old value. A plain aggregation over all the rows for a given timestamp and account then yields the correct, up-to-date result.
And that’s it. KISS!
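With deltas, the query side stays exactly what it was before: a plain SUM per account, with no special handling for the appended update rows. Again a hedged sketch with hypothetical names:

```python
import requests  # hypothetical endpoint and names, as before

DRUID_SQL_URL = "http://druid-router:8888/druid/v2/sql"

# Because updates are ingested as deltas, the appended rows simply add to
# (or subtract from) the originally ingested values when summed.
DELTA_QUERY = """
SELECT
  TIME_FLOOR(__time, 'P1M') AS month,
  "account",
  SUM("value") AS total_value
FROM "account_metrics"
WHERE "account" = 'ACC-42'
GROUP BY 1, 2
ORDER BY 1
"""

print(requests.post(DRUID_SQL_URL, json={"query": DELTA_QUERY}).json())
```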
Pulling it all together
After a lot of contemplation and deliberation, many PoCs, and hours invested in research, we went with the “Hybrid Append Reingest Approach”, using delta values and aggregation queries.
Tying this whole story together is Airflow, using multiple DAGs as follows (a minimal sketch of the trigger chain follows the list):
- Daily new-data update DAG, running every morning
- Weekly data re-ingest DAG, triggered by the previous DAG
- Monthly data re-ingest DAG, triggered by the previous DAG
- Delta update DAG, running every 2 hours
- Recalibration DAG, running daily, in the morning
Since there are daily, weekly, and monthly data re-ingest tasks running, most of the recalibration is done anyway on a daily basis. But once data has been ingested into a monthly partition, the daily-running DAGs are not going to re-ingest it. For this data, we need a special DAG: the recalibration DAG.
What the recalibration DAG does is figure out whether there is a segment file with a very small number of rows, far below the average, and in such a case trigger a monthly re-ingestion task for that partition.
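The “small segment” check itself can be run against Druid’s sys.segments metadata table. A minimal sketch; the datasource name and the “far below average” threshold are assumptions.

```python
import requests  # hypothetical endpoint; the 5% threshold is an assumption

DRUID_SQL_URL = "http://druid-router:8888/druid/v2/sql"

SEGMENTS_QUERY = """
SELECT "start", "end", "num_rows"
FROM sys.segments
WHERE "datasource" = 'account_metrics'
  AND is_published = 1
  AND is_available = 1
  AND is_overshadowed = 0
"""

segments = requests.post(DRUID_SQL_URL, json={"query": SEGMENTS_QUERY}).json()

# Flag time chunks whose segments are far smaller than the average; these are
# the appended "delta" segments that still need a monthly re-ingest.
avg_rows = sum(s["num_rows"] for s in segments) / max(len(segments), 1)
needs_reingest = {
    (s["start"], s["end"]) for s in segments if s["num_rows"] < 0.05 * avg_rows
}
for interval in sorted(needs_reingest):
    print("trigger monthly re-ingest for", interval)  # e.g. via the REPLACE pattern shown earlier
```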
Conclusion
It’s fair to say, using an OLAP database has its perks, but it will really make you work for it. It’s not for the faint of heart, but hey, it makes my job so much more interesting!