Apache Druid — The sine qua non of contemporary Big-Data analytics

Ronik Basak
Published in The Startup
14 min read · Mar 24, 2020

The term “big data” refers to digital stores of data that have a high volume, velocity and variety. Big data analytics is the process of using software to uncover trends, patterns, correlations or other useful insights in those large stores of data. Apache Druid is well suited to serve as one of the backbones of a platform for big-data analytics.

According to Wikipedia, Druid is a column-oriented, open-source, distributed data store written in Java. Druid is designed to quickly ingest massive quantities of event data, and provide low-latency queries on top of the data.

Druid’s core design combines ideas from data warehouses, time-series databases, and search systems to create a unified system for real-time analytics for a broad range of use cases.

Druid merges key characteristics of each of the 3 systems into its ingestion layer, storage format, querying layer, and core architecture.

Key Features of Druid:

Column-Oriented Storage:

Druid stores and compresses each column individually, and only needs to read the columns required by a particular query, which supports fast scans, rankings, and group-bys.

Native search indexes:

One important aspect of Druid’s index implementation is that it uses an inverted index. An inverted index is an index data structure storing a mapping from content, such as words or numbers, to its locations in a document or a set of documents. The purpose of an inverted index is to allow fast full-text searches, at the cost of increased processing when a document is added to the database.

There are two types of inverted indexes: A record-level inverted index contains a list of references to documents for each word. A word-level inverted index additionally contains the positions of each word within a document.

Druid creates inverted indexes for string values for faster search and filter operations.
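To make the two flavours of inverted index concrete, here is a minimal sketch in Python. It is not Druid's implementation; the function names and the sample documents are made up purely for illustration.

```python
from collections import defaultdict

def build_record_level_index(documents):
    """Map each word to the sorted list of document ids that contain it."""
    index = defaultdict(set)
    for doc_id, text in documents.items():
        for word in text.lower().split():
            index[word].add(doc_id)
    return {word: sorted(ids) for word, ids in index.items()}

def build_word_level_index(documents):
    """Map each word to {doc_id: [positions]}, so word positions are kept too."""
    index = defaultdict(dict)
    for doc_id, text in documents.items():
        for position, word in enumerate(text.lower().split()):
            index[word].setdefault(doc_id, []).append(position)
    return dict(index)

# Hypothetical sample documents, used only for illustration.
docs = {1: "druid stores events", 2: "druid queries events fast"}
record_index = build_record_level_index(docs)
word_index = build_word_level_index(docs)
print(record_index["druid"])
print(word_index["events"])
```

Looking up a word is a single dictionary access in both cases, which is why filters on indexed values are cheap; the price is the extra work done whenever a document is added.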

Streaming and batch ingest:

Druid allows its users to utilise the working principles of Lambda Architecture. Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream-processing methods.

Druid provides connectors to Apache Kafka for real-time ingestion from streams, and to HDFS and AWS S3 for batch ingestion. It also supports other streaming and batch pipelines for data ingestion.

Time-optimised partitioning:

Similar to time-series databases, Druid intelligently partitions data by time to enable fast time-oriented queries.

Unlike many traditional systems, Druid can optionally pre-aggregate data as it is ingested. This pre-aggregation step is known as rollup, and it can lead to dramatic storage savings. Rollup is a first-level aggregation operation over a selected set of columns that reduces the size of stored data. During rollup, the input rows are grouped by the timestamp and dimension columns, with sum aggregations applied to the metric columns.

When rollup is enabled, any rows that have identical dimensions and timestamps (after queryGranularity-based truncation) can be collapsed, or rolled up, into a single row in Druid.
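The rollup idea can be sketched in a few lines of Python. This is only an illustration of the grouping logic, assuming an hourly queryGranularity and sum aggregation; the column names and sample events are hypothetical, and Druid itself does this during ingestion rather than in user code.

```python
from collections import defaultdict
from datetime import datetime

def roll_up(rows, dimensions, metrics):
    """Collapse rows sharing the same hour and dimension values, summing metrics."""
    totals = defaultdict(lambda: defaultdict(int))
    for row in rows:
        # Truncate the timestamp to the hour (stand-in for queryGranularity).
        hour = datetime.fromisoformat(row["timestamp"]).replace(
            minute=0, second=0, microsecond=0
        )
        key = (hour.isoformat(),) + tuple(row[d] for d in dimensions)
        for metric in metrics:
            totals[key][metric] += row[metric]
    result = []
    for key, sums in sorted(totals.items()):
        rolled = {"timestamp": key[0]}
        rolled.update(zip(dimensions, key[1:]))
        rolled.update(sums)
        result.append(rolled)
    return result

# Hypothetical click events: the first two fall into the same hour.
events = [
    {"timestamp": "2020-03-24T10:05:00", "country": "IN", "clicks": 1},
    {"timestamp": "2020-03-24T10:40:00", "country": "IN", "clicks": 3},
    {"timestamp": "2020-03-24T11:02:00", "country": "IN", "clicks": 2},
]
rolled_rows = roll_up(events, dimensions=["country"], metrics=["clicks"])
print(rolled_rows)
```

Three input rows become two stored rows here; with real event data, where many events share the same dimension values within an hour, the saving is usually far larger.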

Flexible Columns and SQL Support:

Druid gracefully handles evolving schemas and nested data. In the legacy dataSchema, the flattenSpec is located in dataSchema → parser → parseSpec → flattenSpec and is responsible for bridging the gap between potentially nested input data (such as JSON, Avro, etc.) and Druid's flat data model.
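As a rough picture of what flattening means, the sketch below pulls nested values into a flat row. It is not the flattenSpec syntax or Druid's code; the `flatten` helper, the key paths and the sample event are assumptions made for illustration.

```python
def flatten(record, fields):
    """Pull nested values into a flat row; `fields` maps column name -> key path."""
    flat = {}
    for column, path in fields.items():
        value = record
        for key in path:
            # Walk one level down; a missing branch yields None.
            value = value.get(key) if isinstance(value, dict) else None
        flat[column] = value
    return flat

# Hypothetical nested input event.
nested_event = {
    "timestamp": "2020-03-24T10:05:00",
    "user": {"id": "u1", "geo": {"country": "IN"}},
}
flat_row = flatten(nested_event, {
    "timestamp": ["timestamp"],
    "user_id": ["user", "id"],
    "country": ["user", "geo", "country"],
})
print(flat_row)
```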

In addition to its native JSON-based query language, Druid speaks SQL over either HTTP or JDBC.
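A minimal sketch of preparing a SQL query for the HTTP API is shown below. It only builds the request and does not send it. The host and port (`localhost:8888`) and the `events` datasource are assumptions; the `/druid/v2/sql/` path and the `{"query": ...}` body follow the SQL HTTP API as we understand it, so please verify against the documentation for your version.

```python
import json
import urllib.request

def build_sql_request(sql, base_url="http://localhost:8888"):
    """Build (but do not send) a POST request for Druid's SQL HTTP endpoint."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url=base_url + "/druid/v2/sql/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# "events" is a hypothetical datasource name.
request = build_sql_request('SELECT COUNT(*) AS "cnt" FROM "events"')
print(request.full_url)
# Against a live cluster: urllib.request.urlopen(request).read()
```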

Druid Components and Working Principles:

Druid has a multi-process, distributed architecture that is designed to be cloud-friendly and easy to operate. Each Druid process type can be configured and scaled independently, giving you maximum flexibility over your cluster. This design also provides enhanced fault tolerance: an outage of one component will not immediately affect other components.

Druid has several process types, briefly described below:

Coordinator :

The Coordinator manages data availability on the cluster. The Druid Coordinator process is primarily responsible for segment management and distribution. More specifically, it communicates with Historical processes to load or drop segments based on configurations.

The Druid Coordinator is responsible for loading new segments, dropping outdated segments, managing segment replication, balancing segment load, and compacting segments.

The Druid Coordinator maintains a connection to the Zookeeper cluster for current cluster information.

The Coordinator also maintains a connection to a database containing information about available segments and rules. Available segments are stored in a segment table and list all segments that should be loaded in the cluster. Rules are stored in a rule table and indicate how segments should be handled.

Segment Compaction:

During each run, the Druid Coordinator compacts data-segments by merging small segments or splitting a large one.

This is useful when segments are not optimised in terms of segment size which may degrade query performance.

The Coordinator first finds the segments to compact based on the segment search policy. Once some segments are found, it issues a compaction task to compact those segments.

Compaction tasks might fail due to the following reasons.

  • If the input segments of a compaction task are removed or overshadowed before it starts, that compaction task fails immediately.
  • If a task of a higher priority acquires a time chunk lock for an interval overlapping with the interval of a compaction task, the compaction task fails.

Once a compaction task fails, the Coordinator simply checks the segments in the interval of the failed task again, and issues another compaction task in the next run.

Overlord:

The Overlord controls the assignment of data ingestion workloads. The Overlord process is responsible for accepting tasks, coordinating task distribution, creating locks around tasks, and returning statuses to callers.

The Overlord can be configured to run in one of two modes: local or remote (local being the default).

In local mode, the Overlord is also responsible for creating Peons for executing tasks.

When running the Overlord in local mode, all MiddleManager and Peon configurations must be provided as well. Local mode is typically used for simple workflows.

In remote mode, the Overlord and MiddleManager are run in separate processes and can run individually on a different server. This mode is recommended if we intend to use the indexing service as the single endpoint for all Druid indexing.

MiddleManager:

The MiddleManager process is a worker process that executes submitted tasks. MiddleManagers forward tasks to Peons that run in separate JVMs. Tasks get separate JVMs for resource and log isolation.

Each Peon can run only one task at a time; however, a MiddleManager may have multiple Peons.

Indexing Service:

The Apache Druid indexing service is a highly-available, distributed service that runs indexing related tasks.

Indexing tasks create (and sometimes destroy) Druid segments. The indexing service has a master/slave-like architecture.

The indexing service is composed of three main components:

  • a Peon component that can run a single task.
  • a MiddleManager component that manages Peons.
  • an Overlord component that manages task distribution to MiddleManagers. Overlords and MiddleManagers may run on the same node or across multiple nodes, while MiddleManagers and Peons always run on the same node.

Broker:

The Broker handles Druid queries from external clients. It is the process that routes queries when we run Druid on a distributed cluster. It understands the metadata published to ZooKeeper about which segments exist on which processes and routes queries so that they hit the right processes. The Broker also merges the result sets from the individual processes.

Query Forwarding:

To determine which processes to forward queries to, the Broker process first builds a view of the world from information in Zookeeper. Zookeeper maintains information about Historical and streaming ingestion Peon processes and the segments they are serving.

For every datasource in Zookeeper, the Broker process builds a timeline of segments and the processes that serve them. When queries are received for a specific datasource and interval, the Broker process performs a lookup into the timeline associated with the query datasource for the query interval and retrieves the processes that contain data for the query. The Broker process then forwards down the query to the selected processes.
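The timeline lookup can be pictured with the following sketch. The data layout (a list of interval and server tuples per datasource) and the server names are assumptions for illustration; Druid's real timeline also tracks segment versions and overshadowing, which is left out here.

```python
from datetime import datetime

def overlaps(a_start, a_end, b_start, b_end):
    """Half-open intervals [start, end) overlap when each starts before the other ends."""
    return a_start < b_end and b_start < a_end

def servers_for_query(timeline, datasource, start, end):
    """Return the servers holding segments that overlap the query interval."""
    return {
        server
        for seg_start, seg_end, server in timeline.get(datasource, [])
        if overlaps(seg_start, seg_end, start, end)
    }

def day(d):
    """Helper for the example: midnight on the given day of March 2020."""
    return datetime(2020, 3, d)

# Hypothetical view of the world: one daily segment per server.
timeline = {
    "events": [
        (day(22), day(23), "historical-1"),
        (day(23), day(24), "historical-2"),
        (day(24), day(25), "peon-7"),
    ]
}
targets = servers_for_query(timeline, "events", day(23), day(25))
print(sorted(targets))
```

Only the two servers whose segments overlap the queried interval receive the query; `historical-1` is never contacted.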

Caching

Broker processes employ a cache with an LRU cache invalidation strategy. The Broker cache stores per-segment results. The cache can be local to each Broker process or shared across multiple processes using an external distributed cache such as memcached.

Each time a Broker process receives a query, it first maps the query to a set of segments. A subset of these segment results may already exist in the cache, and those results can be pulled directly from the cache. For any segment results that do not exist in the cache, the Broker process will forward the query to the Historical processes.

Once the Historical processes return their results, the Broker will store those results in the cache. Real-time segments are never cached and hence requests for real-time data will always be forwarded to real-time processes. Real-time data is perpetually changing and caching the results would be unreliable.
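Here is a small sketch of a per-segment result cache with LRU eviction, to show the flow described above. The class and function names are hypothetical, the Historical call is faked, and the special handling of real-time segments is omitted for brevity.

```python
from collections import OrderedDict

class SegmentResultCache:
    """Tiny LRU cache keyed by (query id, segment id)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def get(self, key):
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)  # mark as most recently used
        return self.entries[key]

    def put(self, key, result):
        self.entries[key] = result
        self.entries.move_to_end(key)
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)  # evict least recently used

def run_query(query_id, segments, cache, fetch_from_historical):
    """Answer from the cache where possible; fetch and cache the rest."""
    results = {}
    for segment in segments:
        key = (query_id, segment)
        cached = cache.get(key)
        if cached is None:
            cached = fetch_from_historical(segment)
            cache.put(key, cached)
        results[segment] = cached
    return results

fetched = []

def fake_historical(segment):
    """Stand-in for a Historical process; records which segments were requested."""
    fetched.append(segment)
    return {"segment": segment, "count": 1}

cache = SegmentResultCache(capacity=2)
run_query("q1", ["s1", "s2"], cache, fake_historical)
run_query("q1", ["s1", "s2"], cache, fake_historical)  # served from the cache
print(fetched)
```

The second run never reaches the fake Historical, because both per-segment results are already cached.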

Historical:

Druid Historical processes are responsible for storing queryable data. Each Historical process maintains a constant connection to Zookeeper and watches a configurable set of Zookeeper paths for new segment information. Historical processes do not communicate directly with each other or with the Coordinator processes but instead rely on Zookeeper for coordination.

How it loads and serves segments:

The Coordinator process is responsible for assigning new segments to Historical processes. Assignment is done by creating an ephemeral Zookeeper entry under a load queue path associated with a Historical process.

When a Historical process notices a new entry in its load queue path, it first checks a local disk directory (cache) for information about the segment. If the cache holds no information about the segment, the Historical process downloads metadata about the new segment from Zookeeper. This metadata includes specifications about where the segment is located in deep storage and about how to decompress and process the segment. Once a Historical process completes processing a segment, the segment is announced in Zookeeper under a served segments path associated with the process. At this point, the segment is available for querying.

Segment Cache and How it works:

When a Historical process notices a new segment entry in its load queue path, the Historical process first checks a configurable cache directory on its local disk to see if the segment had been previously downloaded. If a local cache entry already exists, the Historical process will directly read the segment binary files from disk and load the segment.

The segment cache is also leveraged when a Historical process is first started. On startup, a Historical process will search through its cache directory and immediately load and serve all segments that are found. This feature allows Historical processes to be queried as soon as they come online.
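The check-the-local-cache-first behaviour can be sketched as below. This is a simplified illustration using a temporary directory; the function names, the segment id and the fake deep storage are all hypothetical, and real segments are directories of binary files rather than a single file.

```python
import tempfile
from pathlib import Path

def load_segment(segment_id, cache_dir, download_from_deep_storage):
    """Read a segment from the local cache, downloading it only if absent."""
    local_path = Path(cache_dir) / segment_id
    if not local_path.exists():
        local_path.write_bytes(download_from_deep_storage(segment_id))
    return local_path.read_bytes()

def segments_on_startup(cache_dir):
    """List the segments already on disk, which can be served immediately."""
    return sorted(p.name for p in Path(cache_dir).iterdir())

downloads = []

def fake_deep_storage(segment_id):
    """Stand-in for deep storage; records each download request."""
    downloads.append(segment_id)
    return b"segment-bytes"

with tempfile.TemporaryDirectory() as cache_dir:
    load_segment("events_2020-03-24", cache_dir, fake_deep_storage)
    load_segment("events_2020-03-24", cache_dir, fake_deep_storage)  # cache hit
    cached_at_startup = segments_on_startup(cache_dir)

print(downloads, cached_at_startup)
```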

Druid Ingestion Methods

The table below lists Druid’s most common data ingestion methods, along with comparisons to help us choose the best one for our needs. Each ingestion method supports its own set of source systems to pull from.

Streaming

The recommended, and most popular, method of streaming ingestion is the Kafka indexing service, which reads directly from Kafka. The Kinesis indexing service also works well if we prefer Amazon Kinesis.

This table compares the major available options:

Batch

When doing batch loads from files, you should use one-time tasks, and you have three options: index_parallel (native batch; parallel), index_hadoop (Hadoop-based), or index (native batch; single-task).

In general, we recommend native batch whenever it meets your needs, since the setup is simpler (it does not depend on an external Hadoop cluster). However, there are still scenarios where Hadoop-based batch ingestion might be a better choice, for example when you already have a running Hadoop cluster and want to use the cluster resource of the existing cluster for batch ingestion.

This table compares the three available options:

Druid Ingestion Spec:

No matter which ingestion method we use, data is loaded into Druid using either one-time tasks or ongoing “supervisors” (which run and supervise a set of tasks over time). In either case, part of the task or supervisor definition is an ingestion spec.

Ingestion specs consist of three main components:

  • dataSchema, which configures the datasource name, primary timestamp, dimensions, metrics, and transforms and filters (if needed).
  • ioConfig, which tells Druid how to connect to the source system and how to parse data. For more information, see the documentation for each ingestion method.
  • tuningConfig, which controls various tuning parameters specific to each ingestion method.
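To show how the three components fit together, here is a hedged skeleton of a native batch spec assembled as a Python dict. The datasource, column names and S3 URI are hypothetical, and the field names follow the native batch format of recent Druid releases as best we recall it, so check them against the documentation for your version before submitting anything.

```python
import json

def build_batch_spec(datasource, s3_uri):
    """Assemble a native batch (index_parallel) ingestion spec as a plain dict."""
    data_schema = {
        "dataSource": datasource,
        "timestampSpec": {"column": "timestamp", "format": "iso"},
        "dimensionsSpec": {"dimensions": ["country", "device"]},  # assumed columns
        "metricsSpec": [
            {"type": "count", "name": "count"},
            {"type": "longSum", "name": "clicks", "fieldName": "clicks"},
        ],
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "day",
            "queryGranularity": "hour",
            "rollup": True,
        },
    }
    io_config = {
        "type": "index_parallel",
        "inputSource": {"type": "s3", "uris": [s3_uri]},
        "inputFormat": {"type": "json"},
    }
    tuning_config = {"type": "index_parallel"}
    return {
        "type": "index_parallel",
        "spec": {
            "dataSchema": data_schema,
            "ioConfig": io_config,
            "tuningConfig": tuning_config,
        },
    }

# Hypothetical datasource name and bucket path.
batch_spec = build_batch_spec("events", "s3://example-bucket/events/2020-03-24.json")
print(json.dumps(batch_spec, indent=2))
```

Building the spec in code rather than by hand makes it easy to keep the dataSchema identical between a streaming supervisor and a batch task that target the same datasource.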

I have shared two sample ingestion specs below that load data into the same datasource, one using real-time Kafka streaming and the other using batch ingestion of a file from AWS S3.

Ingestion-Spec sample for Realtime Druid Ingestion for Kafka Indexing Service
Ingestion-Spec for Native Batch Ingestion into Druid from a file stored on AWS S3

Druid’s data model

Datasources

Druid data is stored in datasources, which are similar to tables in a traditional RDBMS. Druid offers a unique data modelling system that bears similarity to both relational and time-series models.

Primary timestamp

Druid schemas must always include a primary timestamp. The primary timestamp is used for partitioning and sorting the data. Druid queries are able to rapidly identify and retrieve data corresponding to time ranges of the primary timestamp column. Druid is also able to use the primary timestamp column for time-based data management operations such as dropping time chunks, overwriting time chunks, and time-based retention rules.

The primary timestamp is parsed based on the timestampSpec. In addition, the granularitySpec controls other important operations that are based on the primary timestamp.

Dimensions

Dimensions are columns that are stored as-is and can be used for any purpose. We can group, filter, or apply aggregators to dimensions at query time in an ad-hoc manner.

If we run with rollup disabled, then the set of dimensions is simply treated like a set of columns to ingest, and behaves exactly as we would expect from a typical database that does not support a rollup feature.

Dimensions are configured through the dimensionsSpec.

Metrics

Metrics are columns that are stored in an aggregated form. They are most useful when rollup is enabled. Specifying a metric allows us to choose an aggregation function for Druid to apply to each row during ingestion.

Metrics are configured through the metricsSpec.

Partitioning

Optimal partitioning and sorting of segments within your datasources can have a substantial impact on footprint and performance.

Druid datasources are always partitioned by time into time chunks, and each time chunk contains one or more segments. This partitioning happens for all ingestion methods, and is based on the segmentGranularity parameter of your ingestion spec's dataSchema.

The segments within a particular time chunk may also be partitioned further, using options that vary based on the ingestion type you have chosen. In general, doing this secondary partitioning using a particular dimension will improve locality, meaning that rows with the same value for that dimension are stored together and can be accessed quickly.
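Time-chunk partitioning can be illustrated with a short sketch that buckets rows by a segmentGranularity-like setting. Only `hour` and `day` are handled here, and the row layout is an assumption for the example; secondary partitioning inside a chunk is not shown.

```python
from collections import defaultdict
from datetime import datetime

def time_chunk(timestamp, granularity):
    """Return the start of the time chunk that a timestamp falls into."""
    t = datetime.fromisoformat(timestamp)
    if granularity == "hour":
        start = t.replace(minute=0, second=0, microsecond=0)
    elif granularity == "day":
        start = t.replace(hour=0, minute=0, second=0, microsecond=0)
    else:
        raise ValueError("unsupported granularity: " + granularity)
    return start.isoformat()

def partition_by_time(rows, granularity):
    """Group rows into time chunks keyed by the chunk start."""
    chunks = defaultdict(list)
    for row in rows:
        chunks[time_chunk(row["timestamp"], granularity)].append(row)
    return dict(chunks)

# Hypothetical rows spanning two days.
rows = [
    {"timestamp": "2020-03-23T23:59:00", "clicks": 1},
    {"timestamp": "2020-03-24T00:01:00", "clicks": 2},
    {"timestamp": "2020-03-24T18:30:00", "clicks": 5},
]
daily_chunks = partition_by_time(rows, "day")
print({chunk: len(members) for chunk, members in daily_chunks.items()})
```

A query restricted to 24 March only needs to look at the second chunk, which is what makes time-bounded queries cheap.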

Segments

Apache Druid stores its index in segment files, which are partitioned by time. In a basic setup, one segment file is created for each time interval, where the time interval is configurable in the segmentGranularity parameter of the granularitySpec.

For Druid to operate well under heavy query load, it is important for the segment file size to be within the recommended range of 300MB-700MB. If our segment files are larger than this range, then we should consider either changing the granularity of the time interval or partitioning our data and tweaking the targetPartitionSize in our partitionsSpec (a good starting point for this parameter is 5 million rows).

Core Data Structure of Segment:

Here we describe the internal structure of segment files, which is essentially columnar: the data for each column is laid out in separate data structures. By storing each column separately, Druid can decrease query latency by scanning only those columns actually needed for a query.

There are three basic column types: the timestamp column, dimension columns, and metric columns, as illustrated in the image below:

The timestamp and metric columns are simple: behind the scenes each of these is an array of integer or floating point values compressed with LZ4. Once a query knows which rows it needs to select, it simply decompresses these, pulls out the relevant rows, and applies the desired aggregation operator. As with all columns, if a query doesn’t require a column, then that column’s data is just skipped over.

Data-Structure of the implementation strategy behind a Druid Segment

Dimension columns are different because they support filter and group-by operations, so each dimension requires the following three data structures:

  1. A dictionary that maps values (which are always treated as strings) to integer IDs,
  2. A list of the column’s values, encoded using the dictionary in 1, and
  3. For each distinct value in the column, a bitmap that indicates which rows contain that value.

Why these three data structures? The dictionary simply maps string values to integer IDs so that the values in (2) and (3) can be represented compactly. The bitmaps in (3), also known as inverted indexes, allow for quick filtering operations (specifically, bitmaps are convenient for quickly applying AND and OR operators). Finally, the list of values in (2) is needed for group-by and TopN queries. In other words, queries that solely aggregate metrics based on filters do not need to touch the list of dimension values stored in (2).
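The three structures can be sketched with plain Python, using integers as bitmaps (bit i set means row i contains the value). This is an illustration only; Druid uses compressed bitmap implementations, and the sample columns here are made up.

```python
def encode_dimension(values):
    """Build the dictionary, the encoded value list, and one bitmap per value."""
    dictionary = {value: i for i, value in enumerate(sorted(set(values)))}
    encoded = [dictionary[value] for value in values]
    bitmaps = {value: 0 for value in dictionary}
    for row, value in enumerate(values):
        bitmaps[value] |= 1 << row  # set bit `row` for this value
    return dictionary, encoded, bitmaps

def rows_in(bitmap):
    """List the row numbers whose bit is set in the bitmap."""
    return [row for row in range(bitmap.bit_length()) if (bitmap >> row) & 1]

# Hypothetical dimension columns for four rows.
country = ["IN", "US", "IN", "SG"]
device = ["ios", "ios", "android", "ios"]

country_dict, country_ids, country_bits = encode_dimension(country)
_, _, device_bits = encode_dimension(device)

# Filters combine bitmaps with bitwise AND / OR, never touching the value list.
in_and_ios = rows_in(country_bits["IN"] & device_bits["ios"])
in_or_sg = rows_in(country_bits["IN"] | country_bits["SG"])
print(country_dict, country_ids, in_and_ios, in_or_sg)
```

The filter results come from bitwise operations on the bitmaps alone; the encoded list in `country_ids` would only be read for a group-by or TopN over the column.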

Druid Cluster Set-Up:

Apache Druid is designed to be deployed as a scalable, fault-tolerant cluster. Druid processes can be deployed any way we like, but for ease of deployment we suggest organising them into three server types: Master, Query, and Data.

  • Master: Runs Coordinator and Overlord processes, manages data availability and ingestion.
  • Query: Runs Broker and optional Router processes, handles queries from external clients.
  • Data: Runs Historical and MiddleManager processes, executes ingestion workloads and stores all queryable data.

Druid Architecture:

Druid has a microservice-based architecture that can be thought of as a disassembled database. Each core service in Druid (ingestion, querying, and coordination) can be separately or jointly deployed on commodity hardware.

Apache Druid is a real-time analytics database designed for fast slice-and-dice analytics (“OLAP” queries) on large data sets. Druid is most often used as a database for powering use cases where real-time ingest, fast query performance, and high uptime are important. As such, Druid is commonly used for powering GUIs of analytical applications, or as a backend for highly-concurrent APIs that need fast aggregations. Druid works best with event-oriented data.

Common application areas for Druid include:

  • Clickstream analytics (web and mobile analytics)
  • Network telemetry analytics (network performance monitoring)
  • Server metrics storage
  • Supply chain analytics (manufacturing metrics)
  • Application performance metrics
  • Digital marketing/advertising analytics
  • Business intelligence / OLAP

Use-Cases where Druid can be a good fit:

Druid is likely a good choice if our use case fits a few of the following descriptors:

  • Insert rates are very high, but updates are less common.
  • Most of our queries are aggregation and reporting queries (“group by” queries). We may also have searching and scanning queries.
  • We are targeting query latencies of 100ms to a few seconds.
  • Our data has a time component (Druid includes optimisations and design choices specifically related to time).
  • We may have more than one table, but each query hits just one big distributed table. Queries may potentially hit more than one smaller “lookup” table.
  • We have high cardinality data columns (e.g. URLs, user IDs) and need fast counting and ranking over them.
  • We want to load data from Kafka, HDFS, flat files, or object storage like Amazon S3.

References:

Apache Druid already has very thorough documentation on its architecture and working principles. I have used Druid as the backbone for three of our analytical products in the Data Platform team at my current organisation. Based on that experience and need, I have tried to jot down all the relevant details in this blog.
