Achieving a 6x improvement in Data Ingestion in Imply

Yugen.ai
Yugen.ai Technology Blog
6 min read · Apr 3, 2024

Running the (experimental) Multi-Stage Query Ingestion in Production

By Upendra Jha and Upendra Singh Karmiyal

Introduction

At Yugen.ai, one of our teams has been building a reporting product for one of our customers on Imply (the enterprise version of Apache Druid) for near-real-time data analytics. The resulting dashboard empowers over 300 stakeholders to make important business decisions.

At a high level, the pipeline looks like this:

  1. Event Unification: Loads raw events from S3, cleans and transforms them, and adds metadata
  2. Data Consolidation: Handles late arrivals and maps data into hourly buckets to be ingested into Imply
  3. Data Ingestion: Ingests the mapped data from S3 into Imply
  4. Reindex/Compaction: Reindexes/compacts data at hour-level and day-level granularity

Some more details about the pipeline and its usage:

  • Unification and consolidation are Spark applications which run on an EMR cluster.
  • Workflows and their orchestration are handled by Airflow.
  • Week-old data is reindexed at hour-level granularity, and month-old data is reindexed at day-level granularity.
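
The reindexing rule in the last bullet can be pictured as a small policy function. This is only a sketch: the 7-day and 30-day thresholds are our reading of "week old" and "month old", the function name is hypothetical, and returning None for newer data (meaning "leave it as ingested") is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed thresholds: "week old" = 7 days, "month old" = 30 days.
WEEK = timedelta(days=7)
MONTH = timedelta(days=30)


def reindex_granularity(interval_start: datetime, now: datetime) -> Optional[str]:
    """Return the target granularity for an interval, or None to leave it untouched."""
    age = now - interval_start
    if age >= MONTH:
        return "DAY"
    if age >= WEEK:
        return "HOUR"
    return None


now = datetime(2024, 4, 3, tzinfo=timezone.utc)
for days in (1, 10, 45):
    print(days, reindex_granularity(now - timedelta(days=days), now))
```

An Airflow task could call a function like this to decide which intervals to hand to the reindex step.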

Up until a few months back, we processed ~600k events across 6 event types every 15 mins. Our SLA is to execute the entire pipeline within 10 mins.

One of our goals this month was to introduce an additional event type, which would increase the data volume to ~5M events every 15 mins. This blog describes the optimisation steps we took to maintain the SLA and some learnings we had along the way.

Defining the Problem

With everything else remaining pretty much the same, our execution time would jump to over 30 mins to process 5M events. Blindly throwing more compute instances at the problem to cut the time was not a responsible solution either.
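
A quick back-of-the-envelope check shows why a 30-minute run is a problem when a new batch arrives every 15 minutes. The sketch below assumes batches are processed strictly one at a time (an assumption on our part, not something the pipeline necessarily enforces); the function name is hypothetical.

```python
def completion_lag(batches, interval_min, runtime_min):
    """Minutes from the last batch's arrival to its completion when runs are sequential."""
    arrival = finish = 0.0
    for i in range(batches):
        arrival = i * interval_min
        start = max(arrival, finish)  # wait for the previous run to finish
        finish = start + runtime_min
    return finish - arrival


print(round(5_000_000 / 600_000, 1))  # growth in events per 15-min window
print(completion_lag(4, 15, 10))      # runs within the 10-min SLA: lag equals the runtime
print(completion_lag(4, 15, 30))      # 30-min runs: lag grows with every batch
```

Under this assumption, any runtime longer than the 15-minute arrival interval makes the backlog grow without bound, so the run has to get faster rather than merely be tolerated.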

Framing the problem

Now, before we deep dive, let’s understand how Imply/Apache Druid works behind the scenes.

What is Imply

Imply is an OLAP data storage and analytics system designed for the high-performance processing of massive datasets. Imply offers low-latency data ingestion, flexible data exploration and analysis, high-performance aggregation, and easy horizontal scaling. It can process data at a high scale and provides pre-aggregation capabilities. Common application areas for Imply include:

  • Clickstream analytics (web and mobile analytics)
  • Risk/fraud analysis
  • Network telemetry analytics (network performance monitoring)
  • Server metrics storage
  • Supply chain analytics (manufacturing metrics)
  • Application performance metrics
  • Business intelligence / OLAP

Imply uses inverted indexes and bitmap indexes to optimise query performance.

How Bitmap indexing helps improve performance

Bitmap indexing uses bitmaps in which each bit records the presence or absence of a particular attribute value in a row of the dataset. This technique is particularly useful for columns with low cardinality, where there are few distinct values. By encoding the presence of values as bits, bitmap indexing allows for efficient operations like intersection, union, and negation, making it well-suited to data warehousing and OLAP (Online Analytical Processing) scenarios. Below is a visualisation illustrating how bitmap indexing works.

How Bitmap Indexing works (Source — Alibaba Cloud)
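
The same idea can be sketched in a few lines of Python, using plain integers as bitmaps. This is a toy illustration, not how Imply/Druid stores its indexes (real systems use compressed bitmap formats); the column names and values are made up.

```python
from collections import defaultdict


def build_bitmap_index(values):
    """Map each distinct value to an int whose bit i is set when row i holds that value."""
    index = defaultdict(int)
    for row, value in enumerate(values):
        index[value] |= 1 << row
    return dict(index)


def rows_of(bitmap):
    """Decode a bitmap back into a sorted list of row numbers."""
    return [i for i in range(bitmap.bit_length()) if (bitmap >> i) & 1]


# Two low-cardinality columns over the same six rows (hypothetical data).
country = ["IN", "US", "IN", "UK", "US", "IN"]
device = ["mobile", "web", "web", "mobile", "mobile", "mobile"]
by_country = build_bitmap_index(country)
by_device = build_bitmap_index(device)

all_rows = (1 << len(country)) - 1
in_and_mobile = by_country["IN"] & by_device["mobile"]  # intersection (AND)
in_or_uk = by_country["IN"] | by_country["UK"]          # union (OR)
not_us = all_rows & ~by_country["US"]                   # negation (NOT)

print(rows_of(in_and_mobile))
print(rows_of(in_or_uk))
print(rows_of(not_us))
```

Each filter is answered with a single bitwise operation over the bitmaps, without scanning the column values themselves.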

Druid stores dimension columns with dictionary encoding and inverted indexes (aka bitmap indexes), and compresses them in a type-aware way, which both minimises how much CPU is needed and reduces the amount of data stored.

Bitmap Indexing in Imply (Source — Imply Dev blog)
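
As a rough sketch of the three structures involved for a single string dimension (a value dictionary, the dictionary-encoded column, and one bitmap per value), consider the following. It is a simplified model with made-up data; compression is left out entirely, and the function name is hypothetical.

```python
def encode_dimension(values):
    """Return (dictionary, encoded column, per-value bitmaps) for one string column."""
    # 1. Dictionary: each distinct value gets a small integer id (sorted order).
    dictionary = {value: i for i, value in enumerate(sorted(set(values)))}
    # 2. Column data: the rows, stored as ids instead of strings.
    encoded = [dictionary[value] for value in values]
    # 3. Inverted index: for each id, a bitmap marking the rows that contain it.
    bitmaps = [[0] * len(values) for _ in dictionary]
    for row, value_id in enumerate(encoded):
        bitmaps[value_id][row] = 1
    return dictionary, encoded, bitmaps


event_type = ["click", "view", "click", "purchase"]
dictionary, encoded, bitmaps = encode_dimension(event_type)
print(dictionary)
print(encoded)
print(bitmaps)
```

Storing small integer ids instead of repeated strings is what keeps the column compact, while the bitmaps let a filter such as `event_type = 'click'` find matching rows directly.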

The Solution

At the time of writing this blog, there are three ways to ingest batch data into Imply:

  1. Native batch
  2. SQL
  3. Hadoop-based

Hadoop-based ingestion has an external dependency on a Hadoop cluster, which didn’t fit well in our case. Therefore (1) and (2) were the options we worked with.

Native batch ingestion

This was the first ingestion technique that Imply started with. Native ingestion requires an ingestion spec file in JSON format, which mainly comprises the Data Schema, IO Config and Tuning Config.

  • For production use-cases, the parallel task type index_parallel is used — this is a task for multi-threaded batch indexing.
  • The index_parallel task is a supervisor task that orchestrates the whole indexing process. The supervisor task splits the input data and creates worker tasks to process the individual portions of data.
  • Druid/Imply issues the worker tasks to the Overlord. The Overlord schedules and runs the workers on MiddleManagers or Indexers. After a worker task successfully processes the assigned input portion, it reports the resulting segment list to the Supervisor task.
  • The Supervisor task periodically checks the status of worker tasks. If a task fails, the Supervisor retries the task until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes ingestion.
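
For orientation, here is a minimal sketch of what such an index_parallel spec can look like, built as a Python dict and serialised to JSON. The datasource name, S3 prefix, column names and tuning values are all hypothetical placeholders, not the spec we run in production.

```python
import json


def build_native_spec(datasource, s3_prefix, max_subtasks=4):
    """Build a minimal index_parallel ingestion spec (illustrative values only)."""
    return {
        "type": "index_parallel",
        "spec": {
            # Data Schema: target datasource, timestamp, dimensions, granularity.
            "dataSchema": {
                "dataSource": datasource,
                "timestampSpec": {"column": "timestamp", "format": "iso"},
                "dimensionsSpec": {"dimensions": ["event_type", "user_id"]},
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "HOUR",
                    "queryGranularity": "NONE",
                    "rollup": False,
                },
            },
            # IO Config: where the input lives and how to parse it.
            "ioConfig": {
                "type": "index_parallel",
                "inputSource": {"type": "s3", "prefixes": [s3_prefix]},
                "inputFormat": {"type": "json"},
                "appendToExisting": False,
            },
            # Tuning Config: worker parallelism and the retry limit for failed workers.
            "tuningConfig": {
                "type": "index_parallel",
                "maxNumConcurrentSubTasks": max_subtasks,
                "maxRetry": 3,
            },
        },
    }


spec = build_native_spec("events", "s3://example-bucket/mapped/2024-04-03T10/")
print(json.dumps(spec, indent=2))
```

In Apache Druid, a spec like this is submitted to the Overlord's task endpoint (`/druid/indexer/v1/task`); `maxNumConcurrentSubTasks` bounds how many worker tasks the supervisor task runs at once, and `maxRetry` is the retry limit mentioned above.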

SQL Ingestion — MSQ (Multi Stage Query)

MSQ data ingestion executes SQL statements via the /druid/v2/sql/task API. MSQ has the following components:

  • Controller: An indexing (often used as another term for ingestion in Imply) service task of type query_controller that manages the execution of a query. There is one controller task per query.
  • Worker: Executes the query, with multiple worker tasks per query, operating in parallel.
  • Stage: Represents a parallelised phase of query execution across worker tasks.
  • Partition: A slice of data output by worker tasks. In INSERT or REPLACE queries, the partitions of the final stage become Druid segments.
  • Shuffle: Involves the exchange of data between worker tasks on a per-partition basis, where each output partition is sorted by a clustering key.

MSQ in Imply (Source — Imply official documentation)
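
To give a feel for what an MSQ ingestion looks like from the client side, here is a hedged sketch that builds (but does not send) a request for the /druid/v2/sql/task API. The base URL, datasource, S3 prefix and column names are hypothetical, authentication is omitted, and the SQL is a generic INSERT from an external S3 source rather than our production query.

```python
import json
import urllib.request


def build_msq_request(base_url, datasource, s3_prefix, max_num_tasks=4):
    """Build a POST request that would submit an MSQ INSERT query as a task."""
    input_source = json.dumps({"type": "s3", "prefixes": [s3_prefix]})
    input_format = json.dumps({"type": "json"})
    signature = json.dumps([
        {"name": "timestamp", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "user_id", "type": "string"},
    ])
    # EXTERN reads the external files; PARTITIONED BY sets the segment time
    # granularity and CLUSTERED BY sets the clustering key used in the shuffle.
    sql = f"""
INSERT INTO "{datasource}"
SELECT TIME_PARSE("timestamp") AS __time, "event_type", "user_id"
FROM TABLE(EXTERN('{input_source}', '{input_format}', '{signature}'))
PARTITIONED BY HOUR
CLUSTERED BY "event_type"
"""
    # maxNumTasks counts the controller plus the workers.
    payload = {"query": sql, "context": {"maxNumTasks": max_num_tasks}}
    return urllib.request.Request(
        url=f"{base_url}/druid/v2/sql/task",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_msq_request(
    "https://druid.example.com", "events", "s3://example-bucket/mapped/2024-04-03T10/"
)
print(req.get_method(), req.full_url)
print(json.loads(req.data)["query"])
```

Sending the request (for example with `urllib.request.urlopen(req)`) would start a controller task; the sketch stops short of that so it can be run anywhere.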

Execution Process

  • The Broker receives and plans the SQL query into a native query.
  • The Broker encapsulates the native query into a task of type query_controller.
  • The Broker submits the task to the indexing service and returns a task ID to the user.
  • The Controller task initiates multiple Worker tasks based on configuration parameters like maxNumTasks and taskAssignment.
  • Worker tasks execute the query, operating in parallel.
  • For SELECT queries, Worker tasks send results back to the Controller, which then writes them into its task report.
  • For INSERT or REPLACE queries, Worker tasks generate and publish new Druid segments to the provided data source.

Controller & Worker in MSQ
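
Because the Broker returns a task ID immediately, the caller (an Airflow task, say) has to poll until the controller task finishes. Below is a minimal polling sketch. `fetch_status` is a hypothetical callable supplied by the caller; in a real setup it would typically query the Overlord's task status endpoint and return the task state as a string. The 600-second timeout simply mirrors our 10-minute SLA.

```python
import time

TERMINAL_STATES = {"SUCCESS", "FAILED"}


def wait_for_task(task_id, fetch_status, poll_seconds=5.0,
                  timeout_seconds=600.0, sleep=time.sleep):
    """Poll fetch_status(task_id) until the task reaches a terminal state."""
    waited = 0.0
    while True:
        state = fetch_status(task_id)
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"task {task_id} still {state} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Demo with a fake status source, so nothing is sent over the network
# and no real sleeping happens.
fake_states = iter(["RUNNING", "RUNNING", "SUCCESS"])
result = wait_for_task(
    "query-example-task-id",
    fetch_status=lambda _task_id: next(fake_states),
    sleep=lambda _seconds: None,
)
print(result)
```

Injecting `fetch_status` and `sleep` keeps the loop easy to test and leaves the HTTP details (URL, auth, response parsing) to the caller.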

Why is MSQ faster than Native Ingestion

Native batch ingestion relies on a scatter-gather approach, while MSQ relies on a shuffle-mesh approach. Here is a comparison of the two ingestion techniques, which explains why MSQ is faster than native ingestion:

MSQ vs Native Ingestion
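
The shuffle step at the heart of the shuffle-mesh approach can be pictured with a toy model: every worker sends each of its rows to the partition that owns the row's key, and each output partition is then sorted by the clustering key. This is a conceptual sketch only, with made-up rows and a deliberately naive partitioning rule (hour modulo the partition count); it does not reflect how Druid actually implements the exchange.

```python
from collections import defaultdict


def shuffle(worker_outputs, num_partitions):
    """Route (hour, event_type) rows to partitions by hour, then sort each partition."""
    partitions = defaultdict(list)
    for rows in worker_outputs:          # every worker...
        for hour, event_type in rows:    # ...sends each row to its target partition
            partitions[hour % num_partitions].append((hour, event_type))
    # Each output partition is sorted by the clustering key (hour, event_type).
    return {p: sorted(rows) for p, rows in sorted(partitions.items())}


# Three workers, each holding rows from a mix of hours.
worker_outputs = [
    [(0, "click"), (1, "view")],
    [(1, "click"), (0, "view")],
    [(0, "purchase")],
]
for partition, rows in shuffle(worker_outputs, num_partitions=2).items():
    print(partition, rows)
```

After the exchange, all rows for a given hour sit together in one sorted partition, which is the shape the final stage needs in order to write them out as segments.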

When we switched from Native Ingestion to MSQ, our ingestion time dropped to ~5 mins, a drastic improvement over the earlier 30 mins.

YAY! From 30 mins to 5 mins :D

While it was relatively straightforward to implement MSQ, gaining a deeper understanding of its components and getting familiar with their design was an invaluable experience. These are the moments where, as engineers, we get to dig deep into the systems we work with, and the final results are much more gratifying after that. Also, holding ourselves to a standard where we care deeply about cost optimisation pushes us to design better systems.

For more stories on the work we do in ML, AI and Engineering, follow our blog https://medium.com/yugen-ai-technology-blog
