Data Platform @ Rapido (Part I) — efficient, scalable and cost-friendly analytics

Sabyasachi Nandy · Published in Rapido Labs · 7 min read · Jan 31, 2024


Before I start, I would like to give you a brief idea of what Rapido does as an organisation.

Rapido is an Indian ride-hailing service, which primarily operates as a bike taxi aggregator. It has now ventured into auto rickshaws and cabs too.

We currently do ~1.5 million rides per day with a presence in 100+ cities pan-India. With auto and cab services now on the platform alongside bikes, our scale keeps growing rapidly.

Our major growth happened post-COVID, when we scaled exponentially. This meant our tech and platform also had to scale with it.

In this series of articles, we will go through how the Data Platform team at Rapido rose to this enormous challenge of scaling our systems under a very tight cost cap to meet our business needs.

So, let’s start with a bird’s-eye view of our architecture.

Our warehouse is organised into three layers:

Raw: unflattened, immutable data
Canonical: flattened, immutable data along with some additional enriched columns
Datasets: datasets created from multiple canonical or raw tables to serve a very specific use case

We get millions of events every day through our event bus (Kafka), which are persisted in our warehouse with an “at least once” guarantee (our bronze/raw layer). This is then ingested, processed and cleaned by automated pipelines and written back as our silver layer of data.

After this, more curated pipelines run which slice, dice and aggregate multiple such silver-layer datasets to create our highest-quality gold-layer datasets. By the end of 2022 we were running nearly 1,000 such pipelines, serving about 3,000 such datasets and crunching 15+ petabytes of data monthly.
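To make the layering concrete, here is a minimal sketch of what a raw-to-canonical (bronze-to-silver) step could look like on Spark; the table names, the nested payload schema and the derived `event_date` column are illustrative assumptions, not our actual pipeline code.

```python
# Minimal sketch of a raw -> silver (canonical) step, assuming a Spark setup
# and a hypothetical `bronze.ride_events` table with a nested `payload` struct.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ride-events-canonical").getOrCreate()

raw = spark.read.table("bronze.ride_events")          # unflattened, immutable events

canonical = (
    raw
    # flatten the nested payload into top-level columns
    .select("event_id", "event_ts", "payload.*")
    # basic cleaning: drop duplicates introduced by at-least-once delivery
    .dropDuplicates(["event_id"])
    # enrichment: derive a partition-friendly date column
    .withColumn("event_date", F.to_date("event_ts"))
)

(canonical.write
    .mode("overwrite")
    .partitionBy("event_date")
    .saveAsTable("silver.ride_events_canonical"))
```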

With more data come more queries!

All the above datasets were being served through a single large Trino cluster via Metabase (our serving tool). Soon we realised this single Trino cluster was becoming a bottleneck for two main reasons:

  1. Treating all queries the same: All our querying use cases, whether analytical queries from organic users or the transformations, lookups and joins fired by Spark jobs in the silver/gold ingestion pipelines, were served through a single system. This meant a few large queries consumed most of the resources in the cluster. On top of this, any failure in a query fired from a Spark job could fail the job itself, affecting all downstream pipelines and requiring manual reruns or backfills.
  2. Treating all users the same: The same system served exploratory queries fired by data scientists (which on average scanned TBs of data) alongside queries from PMs and business stakeholders that only needed to scan a few MBs. As a result, most users had a degraded experience.

How did we address it?

The solution turned out to be pretty simple. We figured out that we could split our workload, based on users and query patterns, into four main groups:

  1. BI users: Our analysts, Data Analysts (DA) and Data Scientists (DS), who query vast amounts of data but are okay with an SLA of a few minutes.
  2. Business stakeholders and PMs: Users who look into daily reports on top of curated data. Serving them with the best SLA in terms of time was of utmost importance.
  3. Ingestion batch jobs: A few batch jobs query Trino directly to run complex aggregation queries. They are limited in number, but due to their sheer complexity they need large clusters and often run for tens of minutes, if not hours.
  4. Maintenance and operational queries: All other queries related to syncing and refreshing table and view metadata. These are high in number, but their resource usage is pretty limited.

With this categorisation in place, we broke our single cluster down into four different clusters:

  1. BI Trino: for user Group 1. This is our largest cluster.
  2. Prime Trino: for serving queries intended for Group 2.
  3. Processing Trino: for our batch jobs to use, i.e. Group 3.
  4. Maintenance Trino: for our operational queries, i.e. Group 4.

Now, this leads to a bunch of questions.

How do I know which cluster to use when?

Should end users even need to know about a differentiation that was done purely for optimisation?

How do we abstract these many clusters away from the end users?

This matters especially for Groups 1 and 2, as these users overlap: an analyst might want to look into the reports they have built, and vice versa, PMs might want to deep-dive into the data themselves. Asking them to connect to different clusters to fire a query would be an extreme case of bad user experience.

“Trino Gateway” comes to the rescue

To solve the above issues, we introduced an intelligent proxying layer between our serving layer (Metabase) and Trino.

With this proxy in place, we could parse the query and its metadata to figure out which cluster the query should be routed to. Some of the logic we use for this distinction is as follows (a simplified sketch follows the list):

  1. Is the query exploratory, or a pre-defined, repeated query?
  2. Which dataset is being queried by the user?
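
To give a flavour of the routing, here is a minimal sketch of the kind of rule such a proxy can apply; the cluster URLs, the `source` metadata field and the gold-schema heuristic are illustrative assumptions, not the actual gateway logic.

```python
# Minimal sketch of query routing; cluster names/URLs and the heuristics
# below are hypothetical, not our actual gateway code.
import re

CLUSTERS = {
    "bi": "http://bi-trino:8080",
    "prime": "http://prime-trino:8080",
    "processing": "http://processing-trino:8080",
    "maintenance": "http://maintenance-trino:8080",
}

def route(sql: str, source: str) -> str:
    """Pick a target cluster from the query text and its source metadata."""
    sql_lower = sql.lower().strip()
    # Metadata sync / refresh style queries go to the maintenance cluster.
    if sql_lower.startswith(("show ", "describe ", "refresh ")):
        return CLUSTERS["maintenance"]
    # Queries fired by batch ingestion jobs go to the processing cluster.
    if source.startswith("spark-job"):
        return CLUSTERS["processing"]
    # Pre-defined report queries on curated (gold) datasets go to the prime cluster.
    if re.search(r"\bgold\.\w+", sql_lower):
        return CLUSTERS["prime"]
    # Everything else is treated as exploratory BI work.
    return CLUSTERS["bi"]

print(route("SELECT city, count(*) FROM gold.daily_rides GROUP BY 1", "metabase"))
```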

There is a lot more detail to this than what’s mentioned above; perhaps we will go through it in a separate article.

With this in place, each cluster could serve the queries it was meant to serve.

Now with intelligent routing out of the way, we were faced with a new challenge.

Rapido truly believes in data democratisation.

Everyone in the org is given access to data based on their role, and hence their need. Everyone here is curious enough to explore the data as much as possible. But in practice, the queries that get fired are not always the most optimal or efficient ones. This meant a few bad queries had the potential to starve the entire system.

Enter query governance

So we needed a process to

  1. Govern all the queries that come into the system and filter out the bad ones.
  2. Ensure a fair share of resources by capping the maximum resources a particular user can consume in the system.

For query governance, we built an in-house query parser which checks whether a query follows basic hygiene, i.e. whether it uses partition filters, whether the result set is limited, and so on (a simplified version of such a check is sketched below).

On finding a bad query, the parser can either reject it outright or direct it to a lower-grade cluster.
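
As an illustration, a basic hygiene check along these lines could be written with an off-the-shelf SQL parser such as sqlglot; the partition column names and the two checks shown are assumptions for the sketch, not our in-house parser.

```python
# Minimal sketch of a query hygiene check (not our in-house parser), using the
# open-source sqlglot library; the partition column names are hypothetical.
import sqlglot
from sqlglot import exp

PARTITION_COLUMNS = {"event_date", "dt"}   # assumed partition columns

def check_hygiene(sql: str) -> list[str]:
    """Return a list of hygiene violations for a query."""
    violations = []
    tree = sqlglot.parse_one(sql, read="trino")

    # 1. The query should filter on at least one partition column.
    where = tree.find(exp.Where)
    where_columns = {c.name for c in where.find_all(exp.Column)} if where else set()
    if not where_columns & PARTITION_COLUMNS:
        violations.append("no partition filter")

    # 2. The result set should be bounded with a LIMIT.
    if tree.find(exp.Limit) is None:
        violations.append("no LIMIT clause")

    return violations

print(check_hygiene("SELECT * FROM silver.ride_events_canonical"))
# -> ['no partition filter', 'no LIMIT clause']
```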

Rationing resources was relatively simpler: we used Trino’s resource groups to create groups based on user emails and allocated resource caps to them (a minimal example below).
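
For reference, Trino’s file-based resource group manager reads a JSON config of root groups and selectors. Below is a minimal sketch that generates such a file; the group names, limits and email patterns are illustrative values, not our production setup.

```python
# Minimal sketch of a Trino resource-groups config (file-based resource group
# manager). Group names, limits and the user regexes are illustrative values.
import json

resource_groups = {
    "rootGroups": [
        {"name": "bi",    "softMemoryLimit": "50%", "hardConcurrencyLimit": 20, "maxQueued": 100},
        {"name": "prime", "softMemoryLimit": "30%", "hardConcurrencyLimit": 10, "maxQueued": 50},
        {"name": "adhoc", "softMemoryLimit": "20%", "hardConcurrencyLimit": 5,  "maxQueued": 20},
    ],
    "selectors": [
        # Selectors are evaluated in order; `user` is a regex on the user email.
        {"user": "ds-.*@example\\.com", "group": "bi"},
        {"user": "pm-.*@example\\.com", "group": "prime"},
        {"user": ".*", "group": "adhoc"},          # catch-all
    ],
}

with open("resource-groups.json", "w") as f:
    json.dump(resource_groups, f, indent=2)
```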

Phew! With all of this in place, and endless tuning of the JVM in our Trino clusters, we were able to serve 12k to 15k queries per day, scanning petabytes of data compared to terabytes before.

Wait, now our cost blew up!

At this scale, with multiple Trino clusters running 24x7, we saw a tremendous spike in our cost. Given our push towards a more sustainable business, cost is something we are very cautious about.

We now needed to look into clever but simpler ways to reduce cost.

For this, we started looking into various options. These are some of the options that came out after multiple deliberations:

  1. What if we could run Trino workers on less costly spot VMs?
  2. How could we reduce failures in the Trino clusters so that our cluster resources are better utilised?
  3. We also analysed query load patterns between peak and non-peak hours, and busy days vs weekends, and came up with automated, configurable scale-up/scale-down strategies (a sketch of such a schedule follows this list).
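
As an example of what a configurable strategy could look like, here is a minimal sketch of a time-based scaling schedule; the windows, worker counts and the `resize_cluster` helper are hypothetical, not our actual automation.

```python
# Minimal sketch of a time-based scaling schedule; the windows, worker counts
# and the `resize_cluster` helper are hypothetical, not our actual automation.
from datetime import datetime, time

# Desired worker count per cluster for different windows of the day.
SCHEDULE = [
    (time(8, 0),  time(23, 0), {"bi": 40, "prime": 10}),   # peak hours
    (time(23, 0), time(8, 0),  {"bi": 10, "prime": 4}),    # off-peak hours
]

def desired_workers(now: datetime) -> dict:
    """Return the target worker count per cluster for the current time."""
    t = now.time()
    for start, end, targets in SCHEDULE:
        in_window = start <= t < end if start < end else (t >= start or t < end)
        if in_window:
            return targets
    return {}

def resize_cluster(cluster: str, workers: int) -> None:
    # Placeholder: in practice this would call the infra API (e.g. resize a
    # Kubernetes deployment or an instance group) to reach `workers` nodes.
    print(f"scaling {cluster} to {workers} workers")

for cluster, workers in desired_workers(datetime.now()).items():
    resize_cluster(cluster, workers)
```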

Reducing cost by running Trino on spot VMs and systematically refreshing the clusters

Though this made running Trino cheaper, we knew we were sacrificing cluster availability: if the spot VMs were kept on for more than a day, we would see massive preemption across the clusters.

So we needed a hack around this. Taking inspiration from the web-service world, we designed a very simple HA-based solution for our Trino clusters.

Given we already had Trino Gateway in our architecture, we created standby clusters for all our individual clusters.

i.e. BI-Trino 1 and BI-Trino 2, and so on for the other clusters.

These clusters are switched automatically at random points throughout the day: BI-Trino 1 drains out all its queries, then the Trino Gateway makes BI-Trino 2 the active cluster and kills the BI-Trino 1 cluster.

This means we ourselves periodically preempt our workers before the infra provider can. The switching is graceful: it does not let any query fail or get stuck during the switch, and end users are not affected (a simplified sketch of the flow is below).
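
Conceptually, the switch could be sketched as below; the `Gateway` and `Infra` clients and their methods are hypothetical stand-ins, not the real APIs.

```python
# Minimal sketch of the drain-and-switch flow. The Gateway/Infra clients below
# are hypothetical stand-ins for our actual gateway and infra APIs.
import time

class Gateway:
    """Stand-in for the Trino Gateway admin API."""
    def wait_until_healthy(self, cluster: str) -> None: print(f"{cluster} is healthy")
    def set_active_backend(self, cluster: str) -> None: print(f"routing new queries to {cluster}")
    def running_query_count(self, cluster: str) -> int: return 0   # pretend it has drained

class Infra:
    """Stand-in for the infra provisioning API (spot VM node pools)."""
    def provision(self, cluster: str) -> None: print(f"provisioning {cluster} on spot VMs")
    def teardown(self, cluster: str) -> None: print(f"tearing down {cluster}")

def refresh_cluster_pair(gateway: Gateway, infra: Infra, active: str, standby: str) -> None:
    """Promote the standby cluster, drain the active one, then recycle it."""
    # 1. Bring up a fresh standby on spot VMs and wait for it to become healthy.
    infra.provision(standby)
    gateway.wait_until_healthy(standby)

    # 2. Route all new queries to the standby; the old cluster receives no new work.
    gateway.set_active_backend(standby)

    # 3. Let in-flight queries on the old cluster finish (graceful drain).
    while gateway.running_query_count(active) > 0:
        time.sleep(30)

    # 4. Tear down the drained cluster before the provider preempts its spot VMs.
    infra.teardown(active)

refresh_cluster_pair(Gateway(), Infra(), active="BI-Trino-1", standby="BI-Trino-2")
```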

This approach reduced the cost of our clusters by one-third, and query failures due to node preemption have become non-existent.

This is how we have been operating analytics at scale at Rapido. But this is just the beginning for us. With steep growth expected in the upcoming months, we are gearing up to build a system that can handle 2x to 3x the current scale while still keeping costs in check.

We have also started introducing newer components into our system, Alluxio being one of them, which will serve as a hot store for our most-queried data. More on that journey in the next set of blogs in this series. Do watch this space.

Till then, goodbye and cheers!
