The challenges of running Druid at large scale, and future directions, part 2

Roman Leventov
5 min readNov 19, 2017

In the previous post I described how the Druid time series data store is used at Metamarkets and discussed some of the major challenges that we face when scaling this system to hundreds of nodes. In this post, I want to present other scalability issues with Druid.

To understand this post better, read through the overview of the Druid architecture given in the previous post.

Druid cluster view, simplified and without “indexing” part

Issues with ultra-large queries

In ad analytics, time series data sources are generally very “thick”. Reporting queries in our cluster over many months of historical data cover up to millions of segments. The amount of computation required for such queries is enough to saturate the processing capacity of the entire historical layer for up to tens of seconds.
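
For a rough sense of scale (illustrative numbers, not our actual segment layout): a data source with hourly segment granularity and 20 shards per hour accumulates about 20 × 24 × 365 ≈ 175,000 segments per year, so a reporting query spanning a dozen such data sources over a year or two already touches millions of segments.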

If we just let such queries execute alongside interactive queries over recent data, the latency and the user experience of the interactive queries would be horrible. Currently we isolate interactive queries from reporting queries using time-based tiering in the historical layer: recent segments of the data sources are loaded on one set of historical nodes (we call it the hot tier), and older segments, which constitute the majority of the segments that need to be processed for reporting queries, are loaded on another set of historical nodes (the cold tier).
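
For readers less familiar with Druid, tiering like this is driven by the druid.server.tier property on each historical node plus coordinator load rules that map segment age to tiers. A minimal sketch of such rules (the tier names, period, and replicant counts are illustrative, not our production configuration):

    [
      {
        "type": "loadByPeriod",
        "period": "P1M",
        "tieredReplicants": { "hot": 2 }
      },
      {
        "type": "loadForever",
        "tieredReplicants": { "cold": 1 }
      }
    ]

Rules are evaluated top to bottom, so segments from the last month land on the hot tier and everything older falls through to the cold tier.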

This approach has some flaws:

  1. It still doesn’t provide full isolation, because reporting queries run not over old data only, but over both old and new data, so they still interfere considerably with interactive queries in the hot tier.
  2. Tiering restricts the computational capacity available for queries over both new and old data, because it isolates CPU resources at the instance level, rather than at the process or thread level.
  3. Because of the previous point, and because we try to make the cold tier cheaper by giving it much less memory and CPU relative to the amount of data it stores (and sometimes needs to process in large bursts) and by using network-attached disks, reporting queries sometimes actually run for tens of minutes, instead of the single minutes they would take if the resources of the entire cluster were utilized more efficiently.
Tiers in Druid historical layer, too rigid resource isolation

This set of issues has been recognised in the Druid community for years. One proposed solution for making super-heavy queries run faster without hogging the resources of the Druid cluster was to employ Spark to run such queries. However, this is going to be woefully inefficient, because Spark could only download entire segments from Druid’s deep storage, sometimes with dozens of columns, instead of just the few columns required for query execution. Also, Spark’s query processing engine is not optimized as tightly for time series data and specific query types as Druid’s query processing engine. On the other hand, if a large Spark cluster with a lot of spare resources exists anyway, this solution is practical.

A more efficient solution within the current Druid architecture would be to isolate thread and memory resources for running interactive and reporting queries within historical nodes (allowing both types of queries to opportunistically use all resources of a node), rather than between historical nodes, and also to make it possible to bypass the page cache when loading a segment into a historical node’s memory. The downside of this solution is that it’s difficult to implement: it makes Druid a more complex, not a simpler, piece of engineering.
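
To make the idea of thread-level isolation more concrete, here is a minimal sketch (in Java, since Druid is written in Java; this is not Druid’s actual query scheduler) of a single processing pool shared by both query classes, where queued interactive work always jumps ahead of queued reporting work, while either class can opportunistically use all threads of the node when the other is idle:

    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class PrioritizedQueryRunner {
      enum QueryClass { INTERACTIVE, REPORTING }

      // A segment-processing task tagged with its query class; the class defines its priority.
      private static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final QueryClass queryClass;
        final Runnable delegate;

        PrioritizedTask(QueryClass queryClass, Runnable delegate) {
          this.queryClass = queryClass;
          this.delegate = delegate;
        }

        @Override public void run() { delegate.run(); }

        @Override public int compareTo(PrioritizedTask other) {
          // INTERACTIVE sorts before REPORTING, so it is taken from the queue first.
          return Integer.compare(queryClass.ordinal(), other.queryClass.ordinal());
        }
      }

      private final ThreadPoolExecutor pool;

      PrioritizedQueryRunner(int processingThreads) {
        // One shared pool: isolation happens per task in the queue, not per node.
        pool = new ThreadPoolExecutor(
            processingThreads, processingThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<>());
      }

      void submit(QueryClass queryClass, Runnable segmentProcessingTask) {
        // execute() (not submit()) preserves the Comparable task, which the priority queue needs.
        pool.execute(new PrioritizedTask(queryClass, segmentProcessingTask));
      }
    }

Memory isolation and page cache bypass (e.g. reading cold segments with direct I/O so that a huge reporting scan doesn’t evict the hot working set) would have to be layered on top, which is where most of the implementation difficulty lies.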

Decoupling of storage and compute, proposed in the previous post, also solves the problems with large queries efficiently. Since the compute resources (CPU and memory) available for processing a segment are not coupled with the node where the segment resides, query interference stops being an issue at all, as long as there are enough compute resources in the entire historical layer (which, by the way, could be scaled up and down very quickly, because historical nodes become almost ephemeral). To make the processing of particularly huge queries faster, new historical nodes could be provisioned just in time.

Brokers need to keep the view of the whole cluster in memory

About ten million segments are stored in our Druid cluster. As it is currently implemented, every broker node keeps metadata about all of the segments in the cluster in its JVM heap, which already requires much more than 10 GB of memory and is the cause of significant JVM GC pauses on brokers.

An intermediate solution is to omit some parts of the segment metadata from announcements, but at 10x scale this problem is going to hit again. It should be possible to partition segments between brokers, so that each broker operates only on a subset of the whole cluster view.
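
A minimal sketch of what such partitioning could look like (this is not an existing Druid feature; the class and its behavior are hypothetical): assign each data source to one of the brokers by hashing its name, so that a broker only keeps segment metadata for its share of the data sources rather than for the whole cluster.

    import java.util.List;

    class BrokerPartitioner {
      private final List<String> brokerHosts; // hypothetical list of broker addresses

      BrokerPartitioner(List<String> brokerHosts) {
        this.brokerHosts = brokerHosts;
      }

      /** Returns the broker responsible for all segments of the given data source. */
      String brokerFor(String dataSource) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(dataSource.hashCode(), brokerHosts.size());
        return brokerHosts.get(index);
      }
    }

A router in front of the brokers could use such a mapping to direct queries; a real implementation would want consistent hashing, so that adding or removing a broker relocates only a small fraction of the data sources.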

Issues with Marathon

This is not about Druid itself, but about how we deploy it at scale. Marathon is an orchestration solution on top of Mesos, and our experience with it has been negative. The typical issues are:

  • Marathon does something downright wrong or isn’t able to recover after some failure, e.g. see tickets 4729, 7429, 7828.
  • Sometimes Marathon suddenly becomes very slow (starts new app instances with a lag of several minutes), or just gets stuck and does nothing, or goes “crazy” and restarts instances continuously without a reason, or the UI shows false information, i.e. information inconsistent with the actual state of deployments. Usually a manual restart of the Marathon service fixes those conditions.
  • Marathon lacks some features needed for large-scale applications, e.g. it doesn’t have any downscale strategy and doesn’t allow moving resources between canary and production deployments.
  • There are many small inconveniences in the web UI when working with apps of dozens or hundreds of instances, e.g. see tickets MARATHON_UI-87, 154, 155, 156.
  • Marathon doesn’t provide basic app statistics, such as historical data on the app size (with a breakdown by instance state), the average instance lifetime, or the number of instance restarts. See ticket 7922.
