How Razorpay uses Druid for seamless analytics and product insights

Birendra Sahu
10 min read · Sep 1, 2020

Authors: Birendra Kumar Sahu & Tanmay Movva (Data Engineering Team)

Introduction

At Razorpay, billions of data points arrive every day in the form of events and entities. To operate with data at our core, democratizing data has become essential. This means we need systems that can capture, enrich and disseminate data in the right fashion, to the right stakeholders, at high speed.

The Data Platform Team at Razorpay strives to leverage this data to improve our stakeholders' experiences. Our mission is to provide the infrastructure to collect, organize and process this deluge of data (all in privacy-safe ways), and to empower business units across Razorpay to derive the analytics they need and make data-informed decisions.

The primary goal of the data platform is to expose high-level, business-critical data within the organisation through dashboards powered by Looker. Most Razors use these dashboards every day to make decisions, for example when monitoring and improving the success rate of payments and payouts processed through Razorpay. Looker dashboards also allow real-time tracking and monitoring of various aspects of our business and systems. As a result, the timeliness of these dashboards is critical to the daily operation of Razorpay.

However, we had some unavoidable challenges:

Performance: Our raw data resides in S3, and external tables are created on top of it using Presto on Hive. Analysts schedule Presto queries to generate aggregate tables and use them to speed up ad hoc queries, report generation, dashboards and so on. Even though these aggregate tables improved their experience compared to querying the raw tables, average query response times were still high, which had a direct impact on their productivity. The system we adopted therefore had to deliver interactive analytics and reduce the overhead of scheduling Presto queries for our users.

High Availability: The system needs to be reliable, because critical dashboards, weekly and monthly reports, and ad hoc queries all depend on it. We cannot afford unplanned downtime, as that would directly affect scheduled reports and the alerts set on dashboards.

Scalability: Our user base and use cases are growing fast. For example, we currently see tens of thousands of Presto queries per day, with around 60–80 queries running concurrently for most of the day. Because of high p95 (6 minutes) and p99 (14 minutes) query latencies, we maintain a 40-node Presto cluster to support our workload. The system we choose should handle a similar scale of workload while improving performance and resource utilization.

To solve these challenges, we first tried to build multidimensional cubes using Apache Spark and Apache Kylin. Both systems improved query performance significantly, but neither satisfied our data ingestion and scalability requirements. Both approaches pre-compute the data for the possible combinations of dimensions, so as the number of required dimensions grows, cube build times grow exponentially and storage is significantly affected as well. We wanted Kylin to succeed because it needs no pre-processing: it can sync directly with our existing Hive server and build cubes on top of the Hive tables. However, because of the ultra-high cardinality of our data and the many dimension columns involved in a single cube, build times were huge, and the build process was complex to understand, debug and optimize. Even though Druid requires data to be flattened before ingestion, it was much easier to set up, ingest into and query.

We selected Apache Druid for the reasons listed below:

Fast query performance: With predefined datasources and pre-computed aggregations, Druid offers sub-second query latency. Dashboards built on top of Druid can be noticeably faster than those built on other systems. For these aggregate queries, Druid can be orders of magnitude faster than Hive and Presto.

Scalability: Druid's architecture is cleanly separated into components for ingestion, querying and overall coordination. We have found this componentized architecture to be reliable and stable for our workload, as it allows us to scale the system easily as needed. Druid keeps data long term in deep storage and loads only the required data onto the Historical nodes, and this separation has worked well for us. Keeping the analytical data permanently in S3 gives us disaster recovery for free and makes clusters easy to manage.

Ingestion: Druid had much shorter ingestion times than Kylin or cube building with Apache Spark, because it does not pre-compute aggregates for every possible combination of dimensions; instead, it indexes the data in its own format. This also reduces storage. Druid's integration with Kafka is also more advanced than Kylin's, which helps us build real-time analytics stores more efficiently.

Business intelligence: Druid is commonly used for BI use cases. Razorpay has deployed Druid to accelerate queries and power applications. Unlike SQL-on-Hadoop engines such as Presto or Hive, Druid is designed for high concurrency and sub-second queries, powering interactive data exploration through a UI. In general, this makes Druid a better fit for truly interactive visual analytics.

Druid at Razorpay

“Apache Druid is a high performance real-time analytics database. It’s designed for workflows where fast queries and ingest really matter. Druid excels at instant data visibility, ad-hoc queries, operational analytics, and handling high concurrency.” (druid.io)

Druid has a multi-process, distributed architecture in which each process takes care of a different aspect of the database. This design helps achieve better fault tolerance and high availability. Each Druid process can be deployed independently, or processes can be grouped into servers, which is the recommended approach. Each process can be scaled and tuned individually, depending on our requirements.

Druid is not a relational database, even though some concepts are shared. Unlike relational databases, Druid has no concept of joins across datasources (the Druid equivalent of a table). Because of the data model at Razorpay, we need to denormalize the tables and make sure that every column required from multiple tables is included in the Druid datasource. We run a Spark job prior to ingestion so that the data is denormalized and transformed appropriately before it is ingested into Druid.

Druid has three groups of columns: __time, dimensions and metrics. __time is the primary timestamp column; it must be present in every datasource, because everything in Druid is keyed by it. Dimensions are the columns we filter and group by in our queries. Metrics are (mostly) numeric columns that can be aggregated.

By limiting what it supports in this way, Druid is able to store data in a custom columnar format, in files known as segments. Segment files are Druid's way of partitioning data based on time. One segment is created for each time interval, and the interval is configurable when loading data into Druid. For our use cases so far we use an interval of one day, that is, one segment per day. Depending on the number of rows or the size of the data in a particular interval, there can also be multiple segments for one interval. If the datasets are huge, having multiple segments per interval can improve query response times significantly.

Ingestion

Druid supports both batch and streaming ingestion from various input sources. In our case S3 and Kafka are input sources for batch and streaming ingestion respectively.

Druid can roll up data at ingestion time to minimise storage by storing only aggregates. With roll-up, rows that have identical dimension values within a configurable query granularity are collapsed into one row, and their metric columns are combined using the given aggregate function. By rolling up data we lose the ability to query individual events, so roll-up has to be disabled if we wish to ingest data as-is.
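
To make the idea concrete, here is a minimal sketch in plain Python of what roll-up does conceptually. It is not Druid code: the field names (`method`, `status`, `amount`), the hourly granularity and the sample events are all assumptions chosen for illustration.

```python
from collections import defaultdict
from datetime import datetime


def truncate_to_hour(ts: str) -> str:
    """Floor an ISO-8601 timestamp to the hour (the assumed query granularity)."""
    dt = datetime.fromisoformat(ts)
    return dt.replace(minute=0, second=0, microsecond=0).isoformat()


def roll_up(events):
    """Collapse events that share the same truncated time and dimension values."""
    totals = defaultdict(lambda: {"count": 0, "amount_sum": 0})
    for e in events:
        # Key = truncated timestamp + every dimension (hypothetical dimensions here).
        key = (truncate_to_hour(e["timestamp"]), e["method"], e["status"])
        totals[key]["count"] += 1
        totals[key]["amount_sum"] += e["amount"]
    return [
        {"__time": t, "method": m, "status": s, **metrics}
        for (t, m, s), metrics in sorted(totals.items())
    ]


# Hypothetical sample events, not real data.
events = [
    {"timestamp": "2020-09-01T10:05:00", "method": "card", "status": "captured", "amount": 100},
    {"timestamp": "2020-09-01T10:40:00", "method": "card", "status": "captured", "amount": 250},
    {"timestamp": "2020-09-01T10:59:00", "method": "upi", "status": "failed", "amount": 50},
    {"timestamp": "2020-09-01T11:01:00", "method": "card", "status": "captured", "amount": 75},
]

rolled = roll_up(events)
for row in rolled:
    print(row)
```

Four input events become three stored rows. The two card payments in the 10:00 hour are merged into one row, which is also why their individual timestamps and amounts can no longer be queried.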

The majority of our current ingestion tasks are batch workloads. A Spark job flattens the relational data, to work around the lack of joins in Druid, and writes it to S3 in Parquet format. Within Druid we run native index tasks to ingest this data from S3 into deep storage.

By setting maxNumConcurrentSubTasks to a value greater than 1, we can run an indexing task in parallel. But ingesting this way creates multiple segments for a time interval, because each sub-task writes its own segment and these are pushed to deep storage as-is instead of being merged into one segment per time chunk. The datasource would then require a compaction or reindex task to merge those segments and maintain an optimal number of segments.

To avoid the compaction or reindex task, we split the data on the __time column (we decide which column will be the primary timestamp column before loading data into Druid) into a number of splits, chosen based on the size of the data and the available MiddleManager workers, and ingest each split with maxNumConcurrentSubTasks set to 1.
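
The splitting step can be sketched roughly as follows. This is an illustrative sketch, not our production job: the `split_interval` helper and the date range are hypothetical, and the number of splits is simply passed in rather than derived from data size and worker count. Splits are kept day-aligned so that each one covers whole one-day time chunks.

```python
from datetime import datetime, timedelta


def split_interval(start: datetime, end: datetime, num_splits: int):
    """Split [start, end) into contiguous, day-aligned ISO-8601 intervals."""
    total_days = (end - start).days
    if total_days <= 0 or num_splits <= 0:
        raise ValueError("need a positive date range and a positive split count")
    # Never create more splits than there are days to ingest.
    num_splits = min(num_splits, total_days)
    base, extra = divmod(total_days, num_splits)
    intervals = []
    cursor = start
    for i in range(num_splits):
        # Spread the remainder over the first `extra` splits.
        days = base + (1 if i < extra else 0)
        nxt = cursor + timedelta(days=days)
        intervals.append(f"{cursor.date().isoformat()}/{nxt.date().isoformat()}")
        cursor = nxt
    return intervals


# Each split would be submitted as its own ingestion task with this tuning fragment.
tuning_config = {"type": "index_parallel", "maxNumConcurrentSubTasks": 1}

splits = split_interval(datetime(2020, 8, 1), datetime(2020, 8, 11), 3)
for interval in splits:
    print(interval, tuning_config)
```

Because no two splits share a day, the tasks never write to the same time chunk, so each day still ends up with the segments of a single task.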

For events, we connect directly to the Kafka cluster and subscribe to the topic. We currently maintain one datasource per topic, and the required transformations are done in the transformSpec (part of the ingestion spec) in Druid. We have been able to achieve significant roll-up ratios for events, reducing the storage footprint and improving query response times. We use Druid's built-in Kafka indexing service extension, which creates a supervisor for a Kafka topic. The supervisor is responsible for offset management and task management for a particular datasource. It spawns MiddleManager workers and sequentially submits Kafka indexing tasks to them with the specific configuration and ingestion spec.

Querying

Druid supports two query languages: Druid SQL and native JSON. Druid SQL is a built-in SQL layer that uses Apache Calcite to translate SQL queries into native JSON queries. Our internal microservices submit queries to the Druid router over HTTPS using the native query language. Our BI tool, Looker, uses the Avatica JDBC driver to send Druid SQL queries to the router process.
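
As a rough illustration of the two forms, the sketch below builds one request body for each. Druid accepts SQL as a JSON object posted to `/druid/v2/sql` and native queries posted to `/druid/v2`; the datasource name `payments`, the rolled-up `count` metric and the time range are hypothetical. The code only builds and serializes the bodies and does not contact a cluster.

```python
import json


def sql_payload(query: str) -> dict:
    """Body for a Druid SQL request (POST to /druid/v2/sql)."""
    return {"query": query}


def native_timeseries_payload(datasource: str, interval: str, granularity: str = "hour") -> dict:
    """Body for a native timeseries query (POST to /druid/v2)."""
    return {
        "queryType": "timeseries",
        "dataSource": datasource,
        "intervals": [interval],
        "granularity": granularity,
        # Sum a rolled-up "count" metric (hypothetical column name).
        "aggregations": [
            {"type": "longSum", "name": "payments", "fieldName": "count"}
        ],
    }


sql = sql_payload(
    'SELECT FLOOR(__time TO HOUR) AS "hour", SUM("count") AS payments '
    "FROM payments "
    "WHERE __time >= TIMESTAMP '2020-09-01 00:00:00' "
    "AND __time < TIMESTAMP '2020-09-02 00:00:00' "
    "GROUP BY 1"
)
native = native_timeseries_payload("payments", "2020-09-01/2020-09-02")

print(json.dumps(sql, indent=2))
print(json.dumps(native, indent=2))
```

Both bodies ask for hourly payment counts over one day; the SQL form is what a BI tool would typically issue, while a service can send the native form directly.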

Cluster Tuning

The basic cluster tuning guide in the official documentation was very helpful when tuning our Druid cluster. While tuning Druid for performance at scale, we need to be mindful of the type of queries made to the cluster. Managing merge buffers and the number of processing threads in the Historical and MiddleManager processes is key to achieving the desired query response times and multi-tenancy. The number of group-by queries that can run at any given time is limited by the number of merge buffers available in the process to which the query is made. The number of processing threads in a process (Historical or MiddleManager) limits the number of segments that can be processed in parallel. To achieve multi-tenancy, it is important to know how many segments need to be processed in parallel: too few processing threads leads to higher response times, while over-provisioning them means throwing more cores at the cluster, which leads to unnecessary cost.
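
The tuning guide also ties these two settings to memory: a process needs at least (druid.processing.numThreads + druid.processing.numMergeBuffers + 1) × druid.processing.buffer.sizeBytes of direct memory. A small sketch of that arithmetic follows; the thread, buffer and size values are made-up examples, not our production settings.

```python
MIB = 1024 * 1024


def required_direct_memory_bytes(num_threads: int, num_merge_buffers: int, buffer_size_bytes: int) -> int:
    """Minimum direct memory per the sizing rule in the basic cluster tuning guide."""
    return (num_threads + num_merge_buffers + 1) * buffer_size_bytes


# Hypothetical Historical settings, for illustration only.
num_threads = 15        # druid.processing.numThreads
num_merge_buffers = 4   # druid.processing.numMergeBuffers
buffer_size = 500 * MIB  # druid.processing.buffer.sizeBytes

needed = required_direct_memory_bytes(num_threads, num_merge_buffers, buffer_size)
print(f"direct memory needed: {needed / MIB:.0f} MiB")
```

Adding merge buffers to allow more concurrent group-by queries, or threads to scan more segments in parallel, raises this floor, which is one reason both settings have to be balanced against node size.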

Druid has a built-in metrics emitter. We push these metrics to Kafka and ingest them into a second Druid cluster, used only for monitoring, to build performance and monitoring dashboards in Looker. We monitor query metrics, ingestion metrics, the number of segments, segment size and so on at the service, datasource and segment level, so that we can optimize storage and manage resources to meet our SLAs.

Challenges of using Druid

  • Low-latency updates are a problem: because of the custom storage format Druid uses to achieve sub-second query response times, existing data can be updated only via batch jobs.
  • Druid doesn’t facilitate joins and has minimal support for complex data types such as arrays and records. Hence, a lot of effort goes into modelling the data into a flattened structure.
  • Since data is pre-aggregated on ingestion, data cannot be queried at a granularity finer than the configured level (for example, if the granularity is hourly, we lose minute-level detail and can access only the hourly aggregates). Finding the right balance between the required roll-up ratio and granularity can be challenging.

Conclusion

Druid is a big data analytics engine designed for scalability and performance. Its well-factored architecture allows easy management and scaling of the Druid cluster, and its optimised storage format enables low-latency analytics queries. We have successfully deployed Druid at Razorpay for our use cases and see continued growth in its footprint. We were able to achieve p90 and p95 query latencies of less than 5s and 10s respectively, and our dashboard performance has improved by at least 10x compared to Presto. We continue to work towards increasing the adoption of Druid to improve the analytics experience within Razorpay.

Future Enhancement

  • Middle Manager Autoscaling — To provision workers based on the number of pending/active index tasks. This would help us utilise the cluster resources optimally and save a few bucks.
  • Automate ingestion spec generation and enable platform users to develop the ingestion spec themselves.

References

https://druid.apache.org/docs/latest/design/architecture.html

https://druid.apache.org/docs/latest/operations/basic-cluster-tuning.html

https://netflixtechblog.com/how-netflix-uses-druid-for-real-time-insights-to-ensure-a-high-quality-experience-19e1e8568d06

https://medium.com/airbnb-engineering/druid-airbnb-data-platform-601c312f2a4c
