BoulderDB: MakeMyTrip’s Personalization User Data Store
At MakeMyTrip, we are organized into verticals, the Lines of Business (LOBs), and horizontal capabilities such as the Data Platform Team (DPT). DPT plays a significant role in developing data products across all verticals and building services on top of data to enable personalization, customer segmentation, personalized rankings etc.
The data platform at MMT consists of 4 layers: Capture, Processing (batch and near real-time, NRT), Storage and Serving. To solve personalization use cases, data from every layer has to work well with the others. Data is captured using Pixel Data Tracking (PDT) and written to Kafka. Real-time processors (Glide) process this data and write it to databases. Serving APIs read from the databases and provide upstream APIs with users’ data: clickstream, transactional, preferences etc. Our primary tech stack is Scala, Spark (Batch + Streaming), Akka HTTP and Kafka.
Data Platform Complexities
The data model required for personalization use cases is typically key-value, where the key is some sort of user id and the value is the data being served. Driving use cases with data at their core has its own set of challenges. Solving them requires solutions that are tailored to the problems at hand and can evolve to solve the problems of the future. Let’s look at some of the problems we were facing:
Strict SLA with A-A data centers
All LOB APIs and consumer-facing products are powered by one or more data services. Since these services sit in the most downstream layer, they face strict latency requirements. We generally have a 99th-percentile SLA of 10–15 ms depending on the API. To add to the complexity, we have 2 Active-Active data centers. At any given point, a user’s request can land in either data center and has to be served with consistent data.
Near Real-time data updates
The majority of use cases involve processing data in (near) real-time (NRT). This means the data being served from the APIs has to be constantly updated with new data generated from users’ activities. Since front-end APIs have their own caching, separate from the back-end databases, data updates trickle down slowly to the consumer. It also means that in order to fetch more recent data, caches need to be refreshed quickly, leading to a poor cache hit rate.
Most of the data pipelines at MMT are based on the Lambda architecture. For this to work, we need bulk writes of huge volumes of data as well as low-latency random reads and writes. Getting a single database to support both workloads requires significant engineering effort.
In most architectures, we have a bunch of API nodes (each with its own caching) in front of a few database nodes (NoSQL/RDBMS). Since databases scale horizontally less easily than API nodes, they cap overall horizontal scalability. Even if the database can scale horizontally, it often brings maintenance overhead in terms of data distribution, partitioning, data consistency etc.
How did we solve it?
In early 2017, we were using ArangoDB as the NoSQL database and Couchbase as the caching layer on top of it. ArangoDB was great at multi-model queries, with its powerful ArangoDB Query Language (AQL) and indexes. But as our real-time data ingestion grew, latencies began to suffer because it acquired collection-level locks while writing data (this has been mitigated in the latest versions). Reads would suffer whenever data was being written, which quickly started to hurt the SLAs. Moreover, serving data from the cache required a network hop to Couchbase. ArangoDB is still in use, but for other use cases where indexes are required or for graph traversal applications.
During this time, we were in the process of putting the ‘My’ back in MakeMyTrip by creating a truly personalized, context-aware app redesign. One of the core requirements for making that experience seamless and relevant is showing the right information to the user at the right time and in the right context. To do that, we needed the concept of per-user databases, which required building new sets of tools and components.
We began to investigate a performant local cache and soon decided to develop our own in-house database named BoulderDB.
BoulderDB has the following components:
- RocksDB: the underlying database engine. RocksDB is a persistent key-value store developed by Facebook, built on Google’s LevelDB and borrowing ideas from Apache HBase. It is highly performant and stores keys and values as raw bytes. There can be multiple RocksDB instances within 1 BoulderDB, and every BoulderDB instance holds an identical replica of the RocksDB data.
- Spark Streaming: the real-time streaming layer used to stream data into RocksDB.
- Akka HTTP Serving API: serving layer for exposing data out of BoulderDB through a RESTful endpoint.
- Rule Engine: an MVEL-based mini rule engine, useful for plugging in dynamic code evaluation.
- Cluster Coordinator: this layer interacts with a Consul cluster for coordinating with the rest of the BoulderDB cluster.
- Database Version Registry: this serves as the DB registry for versioned snapshots of the database files. This allows atomic loading of DB at each node and supports rollbacks too.
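To make the idea of versioned snapshots with rollback more concrete, here is a minimal Python sketch. It is not BoulderDB’s actual registry: the class `VersionRegistry`, its methods, the version labels and the package paths are all hypothetical names chosen for illustration, and a real registry would persist its state rather than keep it in memory.

```python
class VersionRegistry:
    """Hypothetical registry of versioned database packages (illustration only)."""

    def __init__(self):
        self._packages = {}   # version -> package location
        self._history = []    # activation order, newest last

    def register(self, version, location):
        """Record a new package; registering does not make it active."""
        if version in self._packages:
            raise ValueError(f"version {version!r} already registered")
        self._packages[version] = location

    def activate(self, version):
        """Point readers at a registered version."""
        if version not in self._packages:
            raise KeyError(f"unknown version {version!r}")
        self._history.append(version)

    def rollback(self):
        """Drop the active version and return to the one activated before it."""
        if len(self._history) < 2:
            raise RuntimeError("no earlier version to roll back to")
        self._history.pop()
        return self._history[-1]

    @property
    def active(self):
        """Return (version, location) of the active package, or None."""
        if not self._history:
            return None
        version = self._history[-1]
        return version, self._packages[version]


registry = VersionRegistry()
registry.register("v1", "/packages/db-v1.tar.gz")  # made-up path
registry.register("v2", "/packages/db-v2.tar.gz")  # made-up path
registry.activate("v1")
registry.activate("v2")
registry.rollback()          # a bad nightly build: go back to v1
print(registry.active)
```

Keeping registration separate from activation is what lets a node download and verify a package ahead of time and switch to it in a single step.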
Let’s break down how BoulderDB works.
Consistent Hashing in HAProxy
DPT uses HAProxy as the load balancer for its components. By default, requests from the load balancer can be routed to any API node at random. Even though every BoulderDB instance is a copy of the others, random routing makes a local cache at each node hard to use effectively, because a given user’s requests keep landing on different nodes. It turns out we can have one if we use consistent hashing at the load balancer level. We used Lua scripts in HAProxy to add functionality such as consistent hashing based on users’ identifiers. A given user’s requests are now consistently routed to the same node, resulting in a consistent view of data.
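The routing idea can be sketched as a small hash ring. This is a Python illustration of consistent hashing in general, not the Lua script running in our HAProxy; the names `stable_hash` and `ConsistentHashRing`, the node names and the choice of 100 virtual nodes per server are assumptions made for the example.

```python
import bisect
import hashlib


def stable_hash(key: str) -> int:
    """64-bit hash that is stable across processes (built-in hash() is salted)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


class ConsistentHashRing:
    """Maps user ids to nodes; removing a node only moves that node's users."""

    def __init__(self, nodes, vnodes=100):
        self._vnodes = vnodes
        self._points = []          # sorted (hash, node) pairs on the ring
        for node in nodes:
            self.add(node)

    def add(self, node):
        # Each node gets several points so that load spreads more evenly.
        for i in range(self._vnodes):
            self._points.append((stable_hash(f"{node}#{i}"), node))
        self._points.sort()

    def remove(self, node):
        self._points = [p for p in self._points if p[1] != node]

    def route(self, user_id):
        """Return the node owning the first ring point at or after the user's hash."""
        if not self._points:
            raise LookupError("ring has no nodes")
        idx = bisect.bisect_left(self._points, (stable_hash(user_id), ""))
        return self._points[idx % len(self._points)][1]


ring = ConsistentHashRing(["node-1", "node-2", "node-3"])
print(ring.route("user-42"))   # the same user always lands on the same node
```

The property that matters here is that taking a node out of rotation reassigns only the users that were on that node; everyone else keeps hitting the instance that already serves them.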
Near Real-Time updates
The next piece of the puzzle is near real-time updates to the database. We have written a framework named Glide, a set of utilities and wrappers on top of Spark Streaming.
In Hadoop, computation was brought closer to the data to enable distributed MapReduce. Similarly, to enable low-latency reads and writes, we brought Spark Streaming closer to the database.
We coupled Glide and RocksDB together in a single JVM. This greatly reduced the latency of writing updates from the streaming layer. More importantly, most of our updates to the database are upserts (read, modify and write). When upserts don’t happen where the data resides, lots of problems can arise in terms of consistency and latency.
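A rough illustration of why a local upsert is attractive: the read, the modification and the write all happen inside one process, under one lock, with no network round trip in between. The sketch below is in Python and uses a plain dictionary as a stand-in for RocksDB; `LocalKVStore`, `append_search` and the profile layout are hypothetical and not part of Glide or BoulderDB.

```python
import json
import threading


class LocalKVStore:
    """In-memory stand-in for an embedded byte-oriented store such as RocksDB."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key: bytes):
        return self._data.get(key)

    def upsert(self, key: bytes, merge):
        """Read the current value, apply merge(), write the result back."""
        with self._lock:                      # the whole cycle is one local step
            updated = merge(self._data.get(key))
            self._data[key] = updated
            return updated


def append_search(event, keep=5):
    """Build a merge function that appends an event and keeps the newest few."""
    def merge(raw):
        profile = json.loads(raw) if raw is not None else {"recent_searches": []}
        profile["recent_searches"] = (profile["recent_searches"] + [event])[-keep:]
        return json.dumps(profile).encode("utf-8")
    return merge


store = LocalKVStore()
for city in ["DEL", "BOM", "GOI", "BLR", "MAA", "CCU", "HYD"]:
    store.upsert(b"user-42", append_search(city))   # e.g. one call per stream event

print(json.loads(store.get(b"user-42")))
```

If the same read-modify-write had to cross the network to a remote database, either every event would pay two round trips or concurrent writers would need some extra coordination to avoid losing updates.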
This brings us to the last piece: serving. We use Akka HTTP to develop most of our scalable serving APIs. To enable low-latency serving, we moved serving to the database as well. Bundling Akka HTTP serving inside the database JVM enabled us to serve the most up-to-date data from the database without any network hop. Since RocksDB has its own bloom filters and caches, we didn’t need any additional cache in BoulderDB. Of course, front-end APIs can still have their own cache if they want to.
One of the main problems we faced with ArangoDB and Couchbase is that batch updates really slow them down: while bulk writes are happening, reads suffer. Even after the writes have stopped, there is a lot of compaction, GC etc. This made us think of offloading the bulk writes elsewhere and atomically loading the bulk data into the database. Since we have a local RocksDB in each BoulderDB instance, we need to load new batch data into each instance. During the nightly jobs, RocksDB segments are written, compressed and compacted to create a database package. This package is registered in the Database Version Registry. From there, it is loaded into each of the BoulderDB instances using a custom-written 2-phase commit on top of Consul (detailed below).
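The general shape of a 2-phase commit for such a load can be sketched as follows. This is a simplified Python illustration, not our production protocol: the `Node` class, the `load_version` function and the key layout are hypothetical, and an ordinary dictionary stands in for Consul’s KV store.

```python
class Node:
    """Hypothetical BoulderDB instance taking part in a coordinated load."""

    def __init__(self, name, healthy=True):
        self.name = name
        self.healthy = healthy
        self.active_version = None
        self._staged = None

    def prepare(self, version):
        """Phase 1: stage the package and vote on whether it can be served."""
        if not self.healthy:
            return False
        self._staged = version
        return True

    def commit(self):
        """Phase 2: switch reads over to the staged package."""
        self.active_version = self._staged
        self._staged = None

    def abort(self):
        """Phase 2 (failure path): discard the staged package."""
        self._staged = None


def load_version(nodes, version, kv):
    """Switch every node to `version`, or leave every node untouched."""
    state_key = f"boulderdb/load/{version}/state"   # made-up key layout
    kv[state_key] = "preparing"
    votes = [node.prepare(version) for node in nodes]
    if all(votes):
        kv[state_key] = "committed"
        for node in nodes:
            node.commit()
        return True
    kv[state_key] = "aborted"
    for node in nodes:
        node.abort()
    return False


kv = {}                                   # stand-in for the Consul KV store
cluster = [Node("node-1"), Node("node-2"), Node("node-3")]
print(load_version(cluster, "v1", kv))    # all nodes vote yes
cluster[2].healthy = False
print(load_version(cluster, "v2", kv))    # one node votes no, nobody switches
```

The point of the two phases is that no instance starts serving the new package until every instance has confirmed it is able to, so users never see a mix of old and new batch data across nodes.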
Horizontal Scaling and Clustering Support
One might think that bundling streaming, data and serving together makes it a bit monolithic. On the contrary, this has enabled us to scale even more.
Instead of thinking of the database as a separate entity from serving and streaming, think of all of this as a fast, cohesive machine whose cogs and parts work together in close proximity. If we treat this machine as 1 unit, then all we need to do to scale is add more of these units.
This means adding as many BoulderDB instances as required. Each has its own serving layer, its own local RocksDB and its own streaming layer to keep the data up to date. Instead of having different BoulderDBs talk to each other, they talk to a central service named Consul. When databases talk to each other, they can become slow, and a lot of engineering effort is needed to keep them in sync. Therefore, we kept each database independent of the others, connected only through the central Consul service. Consul is a service discovery framework and has a data-center-aware KV store. All Consul does here is keep Spark Streaming and the bulk load of data atomic across different BoulderDB instances.
Does it scale?
I am not a fan of benchmarks, really! Each DB vendor comes out with its own benchmarks every month, only to be superseded by somebody else. Still, to give you a flavor of the performance: on our production hardware with a JVM heap of 5 GB, BoulderDB was able to achieve 100,000 requests per second with a 99th-percentile response time of under 10 ms, while streaming data writes were happening in real time.
When is it not suitable for me?
An important thing to keep in mind: since each BoulderDB instance is a copy of the others, each node needs to be able to hold all the data. If your data is too large to be stored on 1 node, the instances cannot be copies of one another. In that case, BoulderDB needs to be sharded, and requests for a user need to be routed to the instance that holds that user’s data by using the same consistent hashing in the DB as in HAProxy.
What is BoulderDB really? Is it a database because it has RocksDB? Is it a real-time Spark Streaming engine? Or is it an Akka HTTP based serving API? Think of it as Lego-like components: the actual use case dictates which components need to be used. We have many use cases which use only its database and streaming layers, many others which use its database and serving layers, and some which use all the components.
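A sharded setup only works if the ingestion side and the routing side agree on who owns each user. The Python sketch below shows that agreement with one shared `owner` function; it is a hypothetical illustration (we have not described a sharded BoulderDB deployment here), and for brevity it rebuilds the hash ring on every call, which a real implementation would not do.

```python
import bisect
import hashlib


def stable_hash(key: str) -> int:
    """64-bit hash that is stable across processes."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


def owner(nodes, user_id, vnodes=64):
    """Consistent-hash lookup shared by the load balancer and every shard."""
    if not nodes:
        raise LookupError("no nodes")
    points = sorted((stable_hash(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))
    idx = bisect.bisect_left(points, (stable_hash(user_id), ""))
    return points[idx % len(points)][1]


class ShardedInstance:
    """Hypothetical instance that keeps only the users it owns."""

    def __init__(self, name, all_nodes):
        self.name = name
        self.all_nodes = list(all_nodes)
        self.store = {}

    def ingest(self, user_id, value):
        """Streaming side: drop events for users owned by another shard."""
        if owner(self.all_nodes, user_id) != self.name:
            return False
        self.store[user_id] = value
        return True


names = ["node-1", "node-2", "node-3"]
shards = {name: ShardedInstance(name, names) for name in names}

# Every shard sees the full stream but stores only its own slice.
for i in range(100):
    for shard in shards.values():
        shard.ingest(f"user-{i}", {"visits": i})

# Routing side: the same function finds the shard that holds the user.
target = owner(names, "user-7")
print(target, shards[target].store["user-7"])
```

Because both sides call the same function with the same node list, a request for a user always reaches the one instance that ingested that user’s data.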