Data Insights Engine @ MakeMyTrip
At MakeMyTrip, for domestic hotels alone, we have line of sight to over 100k properties and 2 million rooms (and growing). We also operate on data metrics for over 350 airlines, with metrics for over 50k flights worldwide. Faced with a plethora of choices, customers usually need some help to identify relevant products and choose among them. This is where persuasions come into the picture: persuading users to book the right product with us, at the right (run)time and in the right place.
On a day-to-day basis, a user is surrounded by lots of persuasive text that generally fails to influence them. For a persuasion to be impactful, it needs to contain meaningful insights. We compute these insights from various static content as well as from user activity, e.g. searches, bookings, reviews and ratings, at the most granular level.
Following are a few categories of persuasions:
- 70% of hotels in Chennai are booked. Book soon!
- Flights Filling Fast: number of bookings made in the last 12 hours for this sector and date.
Social Trust:
- Great location! This hotel’s location has a much better rating compared to nearby properties
- Aloft Bengaluru is a top choice for family travellers: ranked in the top 10 by family travellers
- This property was viewed by 10 people in the last hour
- You are booking at a great price. Fares for DEL-BOM 15 days before departure are normally around 3105
At MakeMyTrip, the Persuasion Engine broadly consists of two components:
- Peitho: a template engine equipped with a rule engine that evaluates various rules to decide which persuasion is to be displayed at run-time
- Insights Engine: a platform that computes and serves the various data insights needed to generate meaningful persuasions at run-time (real time) in the Peitho layer
Here we will be focusing on the Data Insights Engine.
Data Insights Engine:
- Multidimensional Real-Time Timeseries, Aggregates as insights
- Clickstream, Inventory, Transactional data streams
- Flexibility with config-driven insights processing and serving
User Activity -> Events -> Signals -> Insights -> Persuasions
Real Time Aggregation: Building a real-time platform where insights can be added easily through configuration was a challenge because:
- Different insights might have different types of aggregations e.g. min, max, count, average, etc.
- Creating the insights alone is not enough; it is also important how frequently we update them, and how we keep our aggregates and insights accurate when no events are coming in.
- To keep the generated insights accurate, an important consideration is how we trigger their creation and update. Not every insight requires real-time updates, while some are critical enough to need near real-time updates. How do we utilise resources efficiently while still giving the best and most accurate outcome?
Data Modelling: Generating any insight requires querying and performing various aggregations on large amounts of data. Here are a few of the challenges we faced while designing our data model:
- Data is large and aggregation is to be performed efficiently in real-time.
- We cannot use a key-value based model, because many different types of aggregations have to be performed, and configuring a new insight should not be a time-consuming process.
- Sometimes data needs to be split (normalised) so that insights can be produced from the most granular data points.
- Sometimes data coming from different sources needs to be joined.
How did we solve it?
After identifying the challenges, we decided to take a step back and look again at the main use case we were trying to solve, i.e. creating a config-driven platform for generating insights. Hence, we thought about how we could classify our insights into different types. They were:
- Real Time Last Insights: These insights are very critical from an update point of view, as they depend on the last snapshot of the data. Any change in the data directly impacts them, so they have to be updated as soon as we receive data that affects them. E.g. seats remaining on a flight.
- Real Time Aggregate Insights: These insights depend on real-time data as well as on historical data already stored in the database. They involve aggregations over a sliding interval of time. E.g. hotel listing pageviews in the last 12 hours.
- Batch Insights: These insights are compute heavy and their value is not impacted by real-time data. They involve large amounts of data spanning multiple data sources, and can produce valuable insights by taking the bigger picture into account. E.g. hotel bookings in the last 6 months.
So, after identifying the categories of insights, we designed our architecture:
The basic system comprises the following components:
- Data sources: Data arriving in Kafka topics is the input for our realtime pipelines. After cleanup and modelling, this data is stored in various Druid datasources to be used for aggregations. The same data is also dumped onto HDFS via DUMDAR, which is then used by our batch pipeline.
- Processing: Data processing is handled by various Spark jobs. For realtime jobs we use a config-driven Spark Streaming application, and for batch we use a similar config-driven Spark Batch application running on our YARN cluster. There is also an application called the Insights Engine, another Spark Streaming application, which acts as an orchestrator for the various Realtime Aggregate Insights: it queries the Druid datasources and sends the computed insights to the Serving Store.
- Serving Store: We use BoulderDB as our serving store for both batch and realtime insights. Here it is backed by two different RocksDB databases, one for batch and one for realtime, since realtime and batch insights are mutually exclusive.
Now let’s revisit in depth how we solved each category of insights:
- Real Time Last Insights: Since these insights require real-time updates and depend only on the last snapshot of data, our processors use Spark Streaming to read data from various Kafka topics. Based on the configuration of each insight, the processor knows exactly which topic and which data point the insight has to be generated from. It directly updates the values in our database, BoulderDB (a key-value store), which is later read by our API layer. Example of configuration:
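A hypothetical sketch of such a configuration, and of the processor step that applies it, follows. The field names (`topic`, `key_fields`, `value_field`) and the key layout are assumptions rather than the actual schema, and a plain dict stands in for BoulderDB. Because every matching event simply overwrites the stored value, the store always holds the last snapshot.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

Event = Dict[str, Any]


@dataclass(frozen=True)
class LastInsightConfig:
    """Hypothetical definition of a Real Time Last insight."""
    name: str                    # insight name, e.g. "seats_remaining"
    topic: str                   # Kafka topic the insight is computed from
    key_fields: Tuple[str, ...]  # event fields identifying the entity
    value_field: str             # event field whose latest value is served


def insight_key(cfg: LastInsightConfig, event: Event) -> str:
    # Illustrative key layout (assumption): "<insight>|<key1>|<key2>..."
    return "|".join([cfg.name] + [str(event[f]) for f in cfg.key_fields])


def apply_batch(configs: List[LastInsightConfig], topic: str,
                events: List[Event], store: Dict[str, Any]) -> None:
    """Apply one micro-batch of events (in arrival order) read from `topic`.

    Each matching event overwrites the stored value, so the store ends up
    holding the value carried by the most recent event.
    """
    relevant = [c for c in configs if c.topic == topic]
    for event in events:
        for cfg in relevant:
            needed = (*cfg.key_fields, cfg.value_field)
            if all(f in event for f in needed):
                store[insight_key(cfg, event)] = event[cfg.value_field]


# Usage: a dict stands in for BoulderDB; the flight data is made up.
seats = LastInsightConfig("seats_remaining", "flight_inventory",
                          ("flight_no", "date"), "seats_left")
store: Dict[str, Any] = {}
apply_batch([seats], "flight_inventory", [
    {"flight_no": "FL-101", "date": "2019-06-01", "seats_left": 12},
    {"flight_no": "FL-101", "date": "2019-06-01", "seats_left": 9},
], store)
print(store)  # {'seats_remaining|FL-101|2019-06-01': 9}
```

Keying the store on the insight name plus entity fields keeps lookups in the API layer a single get, which is the property a key-value serving store is good at.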
- Real Time Aggregate Insights: These insights require aggregates in near real time and also depend on our data store for calculating aggregates over an interval of time.
Here we needed a data store and an orchestrator. For the data store, we needed the following features:
- Realtime ingestion of data without much impact on the read latencies
- It should be queryable, as insights tend to use the same data over different time windows for calculating aggregates, e.g. the average over 30 minutes, 1 hour and 6 hours. Maintaining separate counters for each insight would be wasteful and hard to keep correct.
- It should be able to handle, i.e. ingest and aggregate, large volumes of data, and scale out easily.
- Support for batch updates of data, to support a lambda architecture.
Taking all of this into consideration, we ruled out:
- NoSQL DBs like Cassandra and HBase, as they did not let us freely query data on any of the needed dimensions and aggregate it. Maintaining counters is also very tedious, since the counters need to be updated over a sliding window. Moreover, if there is an issue in a pipeline, i.e. certain events are missed, the counters get corrupted, and correcting them would require recomputing everything and writing it back to the NoSQL DB, rather than just replaying the missed events.
- RDBMSs, as they are not very good at aggregating large volumes of data efficiently.
Hence, we finally came to use Druid.
Apache Druid (incubating) is a high performance analytics distributed data store for event-driven data.
It is a timeseries database consisting of dimensions (for querying and filtering) and metrics (for aggregations). It optimises aggregation by performing a rollup during ingestion over a time window called the query granularity, which reduces the size of the data and pre-computes certain metrics.
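Rollup is easiest to see in miniature. The sketch below imitates in plain Python what ingestion-time rollup does conceptually: each timestamp is floored to the query granularity, and events sharing a time bucket and the same dimension values collapse into one row with pre-aggregated metrics. It illustrates the idea only, not Druid's implementation; the field names `hotel_id` and `amount` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple

EPOCH = datetime(1970, 1, 1)


def floor_to(ts: datetime, granularity: timedelta) -> datetime:
    """Floor a (naive, UTC) timestamp to the start of its granularity bucket."""
    return EPOCH + ((ts - EPOCH) // granularity) * granularity


def rollup(events: List[Dict[str, Any]], dimensions: List[str],
           granularity: timedelta) -> Dict[Tuple, Dict[str, float]]:
    """Collapse raw events into one row per (time bucket, dimension values),
    pre-computing a row count and a sum of the 'amount' metric."""
    rows: Dict[Tuple, Dict[str, float]] = defaultdict(
        lambda: {"count": 0, "amount_sum": 0.0})
    for e in events:
        key = (floor_to(e["ts"], granularity),) + tuple(e[d] for d in dimensions)
        rows[key]["count"] += 1
        rows[key]["amount_sum"] += e["amount"]
    return dict(rows)


events = [
    {"ts": datetime(2019, 6, 1, 10, 3), "hotel_id": "H1", "amount": 100.0},
    {"ts": datetime(2019, 6, 1, 10, 14), "hotel_id": "H1", "amount": 50.0},
    {"ts": datetime(2019, 6, 1, 10, 16), "hotel_id": "H1", "amount": 70.0},
]
for key, metrics in rollup(events, ["hotel_id"], timedelta(minutes=15)).items():
    print(key, metrics)
# Three raw events become two stored rows: the 10:00 bucket (count 2, sum 150.0)
# and the 10:15 bucket (count 1, sum 70.0).
```

The trade-off is visible here: once rolled up, the individual 10:03 and 10:14 events can no longer be told apart, so the query granularity sets the finest time resolution any insight can use.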
We use Druid’s Kafka Indexing Service, which uses a supervisor to manage the state of the indexing tasks: retrying tasks on failure, tracking Kafka offsets in its metadata store, maintaining replicas of the indexing tasks to increase availability, and rejecting late and early messages.
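For reference, a Kafka supervisor is submitted to Druid as a JSON spec, POSTed to the Overlord at `/druid/indexer/v1/supervisor`. The sketch below builds a minimal spec as a Python dict in the newer layout (Druid 0.18 and later, where `timestampSpec` and `inputFormat` replace the older `parser`). The datasource, topic, column names and broker address are hypothetical, and the replica count and rejection periods are illustrative values rather than MakeMyTrip's settings.

```python
import json
from typing import Any, Dict, List


def kafka_supervisor_spec(datasource: str, topic: str, brokers: str,
                          dimensions: List[str]) -> Dict[str, Any]:
    """Minimal Druid Kafka supervisor spec (Druid 0.18+ layout)."""
    return {
        "type": "kafka",
        "spec": {
            "dataSchema": {
                "dataSource": datasource,
                "timestampSpec": {"column": "timestamp", "format": "auto"},
                "dimensionsSpec": {"dimensions": dimensions},
                # Rollup metric: number of raw events folded into each row.
                "metricsSpec": [{"type": "count", "name": "count"}],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "HOUR",
                    "queryGranularity": "FIFTEEN_MINUTE",  # rollup bucket
                    "rollup": True,
                },
            },
            "ioConfig": {
                "topic": topic,
                "inputFormat": {"type": "json"},
                "consumerProperties": {"bootstrap.servers": brokers},
                "taskCount": 1,
                "replicas": 2,  # two replica sets of tasks, for availability
                "taskDuration": "PT1H",
                # Reject events timestamped more than 1h before task creation
                # (late) or more than 1h after the task's duration ends (early).
                "lateMessageRejectionPeriod": "PT1H",
                "earlyMessageRejectionPeriod": "PT1H",
            },
            "tuningConfig": {"type": "kafka"},
        },
    }


spec = kafka_supervisor_spec("hotel_bookings", "bookings", "kafka-1:9092",
                             ["hotel_id", "city", "traveller_type"])
# This JSON is the request body for the Overlord's supervisor endpoint.
print(json.dumps(spec, indent=2))
```

Offsets and task retries are then the supervisor's responsibility, which is what lets the pipeline recover by replaying from Kafka instead of patching counters.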
Druid is a distributed store with support for data replication and automatic rebalancing, making it fault tolerant. It has a configurable deep storage, e.g. HDFS or S3, which makes the data persistent.
It supports batch ingestion of data from HDFS and can leverage YARN resources for batch indexing jobs, enabling it to index large volumes of data in batch and hence support a lambda architecture.
Currently we re-index older days' data into Druid in batch, coarsening its granularity as per our needs to further improve performance. We index data in realtime with a query granularity of 15 minutes to power our realtime insights, such as average bookings in the last 30 minutes. We then re-index data for day D-1 (in batch) with a granularity of 1 hour to support insights computed over hours, such as average bookings in the last 24 hours, and finally re-index data for day D-2 (in batch) with a granularity of 1 day to power insights over days, such as average bookings in the last 7 days.
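The tiering above can be sketched as a small planning function. The tier boundaries follow the text (today at 15 minutes, D-1 at one hour, D-2 and older at one day); the function names are hypothetical, and the granularity strings are Druid's simple granularity names. The last function shows why coarsening pays off: it bounds the number of rolled-up rows per dimension combination per day.

```python
from datetime import date, timedelta
from typing import List, Tuple

# Seconds per bucket for the Druid simple granularities used in each tier.
BUCKET_SECONDS = {"FIFTEEN_MINUTE": 15 * 60, "HOUR": 60 * 60, "DAY": 24 * 60 * 60}


def granularity_for_day(day: date, today: date) -> str:
    """Target query granularity for a day's data, by its age in days."""
    age = (today - day).days
    if age <= 0:
        return "FIFTEEN_MINUTE"  # realtime tier (streaming ingestion)
    if age == 1:
        return "HOUR"            # D-1: re-indexed in batch to hourly buckets
    return "DAY"                 # D-2 and older: re-indexed to daily buckets


def daily_reindex_plan(today: date) -> List[Tuple[date, str]]:
    """Batch re-index jobs for today: coarsen D-1 to HOUR and D-2 to DAY."""
    days = [today - timedelta(days=n) for n in (1, 2)]
    return [(d, granularity_for_day(d, today)) for d in days]


def max_rows_per_day(granularity: str) -> int:
    """Upper bound on rolled-up rows per dimension combination per day."""
    return BUCKET_SECONDS["DAY"] // BUCKET_SECONDS[granularity]


for g in ("FIFTEEN_MINUTE", "HOUR", "DAY"):
    print(g, max_rows_per_day(g))  # 96, then 24, then 1
```

Since an insight's time window is much longer than the buckets of the older tiers, the coarser buckets lose little precision while cutting the rows scanned per day from up to 96 to 24 or 1.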
Now moving on to the orchestrator: since we have to perform aggregations over an interval of time, we do not want to waste resources calculating insights when it is not required. So we trigger insights based on events, using trigger expressions written in MVEL. Our rule engine is fed with these MVEL expressions, and as new events from Kafka are processed, the rule engine tells us which insights need to be calculated for each event. A Druid query containing the filters, aggregations and time interval is then generated at run-time, which performs the aggregation and produces the result.
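A minimal sketch of this trigger-then-query flow follows. MVEL is a JVM expression language, so plain Python predicates stand in for the MVEL trigger expressions here; the insight definition fields (`trigger`, `filter_dims`, `aggregations`, `window`) and the datasource and dimension names are hypothetical. The generated query follows the shape of Druid's native `timeseries` query, with `granularity` set to `all` so the whole window collapses into a single value.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


@dataclass
class AggregateInsight:
    """Hypothetical definition of a Real Time Aggregate insight."""
    name: str
    datasource: str
    trigger: Callable[[Event], bool]    # stands in for an MVEL trigger expression
    filter_dims: List[str]              # dimensions whose values come from the event
    aggregations: List[Dict[str, Any]]  # Druid aggregator specs
    window: timedelta                   # sliding interval, e.g. the last 6 hours


def triggered(insights: List[AggregateInsight], event: Event) -> List[AggregateInsight]:
    """Rule-engine step: the insights this event should (re)compute."""
    return [i for i in insights if i.trigger(event)]


def druid_query(insight: AggregateInsight, event: Event, now: datetime) -> Dict[str, Any]:
    """Build a native Druid timeseries query over [now - window, now)."""
    start = now - insight.window
    return {
        "queryType": "timeseries",
        "dataSource": insight.datasource,
        "granularity": "all",  # one result for the whole interval
        "filter": {
            "type": "and",
            "fields": [{"type": "selector", "dimension": d, "value": str(event[d])}
                       for d in insight.filter_dims],
        },
        "aggregations": insight.aggregations,
        # Naive datetimes are treated as UTC here.
        "intervals": [f"{start:%Y-%m-%dT%H:%M:%S}Z/{now:%Y-%m-%dT%H:%M:%S}Z"],
    }


couple_bookings_6h = AggregateInsight(
    name="couple_bookings_6h",
    datasource="hotel_bookings",
    trigger=lambda e: e.get("event") == "booking" and e.get("traveller_type") == "couple",
    filter_dims=["hotel_id", "traveller_type"],
    # Summing an ingestion-time "count" metric gives the number of raw events.
    aggregations=[{"type": "longSum", "name": "bookings", "fieldName": "count"}],
    window=timedelta(hours=6),
)

event = {"event": "booking", "hotel_id": "H42", "traveller_type": "couple"}
for insight in triggered([couple_bookings_6h], event):
    print(druid_query(insight, event, now=datetime(2019, 6, 1, 10, 0)))
```

Taking filter values from the triggering event means one definition serves every hotel: only the hotel that received a booking gets re-queried.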
Now the problem arises: how do we keep an insight's value accurate when no events are coming in? Since the insight is based on an interval of time, its value keeps changing. For example, suppose we want to calculate how many couple travellers have booked a particular hotel in the last 6 hours, and we compute the insight only when events arrive. If events for this hotel stop coming, the true count of couple travellers keeps decreasing and eventually reaches 0, yet nothing would trigger a recomputation. So we have to keep track of the different states of insights:
- Triggered
- In Refresh Cycle
- Expired
For this we keep the insights in a local cache (RocksDB), where we track the states of insights based on their different refresh frequencies. After each micro-batch completes, we store in RocksDB the insights that were triggered in that batch. This lets us identify, in each micro-batch, which insights are to be refreshed and which have expired.
The insight processor creates a single query for each triggered or in-cycle insight, and the results are then updated in BoulderDB. Similarly, expired insights are deleted from BoulderDB.
Here is an example of our configuration for these types of insights:
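A hypothetical sketch of the refresh settings such a configuration might carry, and of the per-micro-batch state tracking described above, follows. A dict stands in for the local RocksDB cache, and the parameters `window` and `refresh_every` are assumptions. The sketch also assumes an insight expires once it has gone a full window without being triggered, because its windowed aggregate has then decayed to zero.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Tuple


@dataclass
class InsightState:
    last_triggered: datetime
    last_refreshed: datetime


class InsightStateTracker:
    """Tracks triggered / in-refresh-cycle / expired insights across micro-batches.

    A dict stands in for the local RocksDB cache (assumption).
    """

    def __init__(self, window: timedelta, refresh_every: timedelta):
        self.window = window
        self.refresh_every = refresh_every
        self.cache: Dict[str, InsightState] = {}

    def end_of_batch(self, triggered_keys: Iterable[str], now: datetime
                     ) -> Tuple[List[str], List[str], List[str]]:
        """Return (triggered, to_refresh, expired) insight keys for this batch."""
        triggered = set(triggered_keys)
        for key in triggered:
            self.cache[key] = InsightState(last_triggered=now, last_refreshed=now)
        to_refresh, expired = [], []
        for key, state in list(self.cache.items()):
            if key in triggered:
                continue
            if now - state.last_triggered >= self.window:
                expired.append(key)     # delete from the serving store
                del self.cache[key]
            elif now - state.last_refreshed >= self.refresh_every:
                to_refresh.append(key)  # re-query even without new events
                state.last_refreshed = now
        return sorted(triggered), sorted(to_refresh), sorted(expired)


tracker = InsightStateTracker(window=timedelta(hours=6), refresh_every=timedelta(minutes=30))
t0 = datetime(2019, 6, 1, 10, 0)
print(tracker.end_of_batch(["couples|H42"], t0))             # (['couples|H42'], [], [])
print(tracker.end_of_batch([], t0 + timedelta(minutes=30)))  # ([], ['couples|H42'], [])
print(tracker.end_of_batch([], t0 + timedelta(hours=6)))     # ([], [], ['couples|H42'])
```

Triggered and to-refresh keys would each get one Druid query whose result is written to the serving store, while expired keys are deleted, so a quiet hotel's count steps down on schedule instead of freezing at its last triggered value.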
- Batch Insights: These insights do not depend on real-time data but involve huge amounts of data spanning multiple tables. For these, our processors use Spark SQL, since it gives us the flexibility to run different types of queries at run-time. Configuration for these types of insights looks like this:
Alerting & Monitoring:
Each component is individually monitored on Zabbix via its respective health/status APIs. All vital metrics are collected via a JmxTrans process, which sends them to OpenTSDB, from where they are plotted on our Grafana dashboards. For Druid, all metrics are published to Kafka using Druid’s kafka-emitter, and this data is again sent to OpenTSDB for monitoring. SLAs for BoulderDB response times and error codes are maintained on Zabbix via OpenTSDB queries.
We also have a setup in place to test our complete realtime pipeline. Certain test messages are introduced into our input data sources; the Insights Engine generates insights for them, which are finally saved to BoulderDB. Another process fetches these insights from BoulderDB and checks whether all the needed insights were created correctly from the test data, and on time (i.e. within the insight's update frequency). This approach is used to test the Realtime Last and Realtime Aggregate Insights data pipelines.
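The checking side of that end-to-end test can be sketched as below. It assumes, hypothetically, that each test message maps to an expected insight key and value, and that the serving store returns each value together with the time it was written; a check passes only if the insight exists, was written after the test message, has the expected value, and arrived within the insight's update frequency. A dict stands in for BoulderDB and all keys are made up.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Tuple


@dataclass
class Expectation:
    """What one injected test message should produce (hypothetical shape)."""
    key: str                     # insight key in the serving store
    value: Any                   # expected insight value
    produced_at: datetime        # when the test message was injected
    update_frequency: timedelta  # how soon the insight must be updated


# key -> (value, written_at); a dict stands in for reads from BoulderDB.
Store = Dict[str, Tuple[Any, datetime]]


def check_pipeline(expectations: List[Expectation], store: Store) -> List[str]:
    """Return one failure message per missing, stale, wrong or late insight."""
    failures = []
    for exp in expectations:
        hit = store.get(exp.key)
        if hit is None:
            failures.append(f"{exp.key}: missing")
            continue
        value, written_at = hit
        lag = written_at - exp.produced_at
        if written_at < exp.produced_at:
            failures.append(f"{exp.key}: stale (written before the test message)")
        elif value != exp.value:
            failures.append(f"{exp.key}: expected {exp.value!r}, got {value!r}")
        elif lag > exp.update_frequency:
            failures.append(f"{exp.key}: late by {lag - exp.update_frequency}")
    return failures


t0 = datetime(2019, 6, 1, 10, 0)
freq = timedelta(minutes=5)
store: Store = {
    "seats|TEST-1": (7, t0 + timedelta(minutes=1)),  # on time
    "views|TEST-H": (3, t0 + timedelta(minutes=9)),  # 4 minutes past its SLA
}
expectations = [
    Expectation("seats|TEST-1", 7, t0, freq),
    Expectation("views|TEST-H", 3, t0, freq),
    Expectation("couples|TEST-H", 1, t0, freq),
]
for failure in check_pipeline(expectations, store):
    print(failure)
```

Returning a list of messages rather than failing on the first problem makes it easy to feed the result into the same alerting path used for the other health checks.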
Insights batch jobs are scheduled on Airflow at a daily frequency, with retries and failure emails set up in the Airflow DAG.
Since no system is perfect, the Insights Engine too has certain limitations.
- The first and foremost of these is Druid. As of now, Druid doesn't support joins and has very minimal support for complex data types, e.g. arrays and records. Hence, a lot of effort needs to be put into modelling the data into a flattened structure.
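A minimal sketch of that flattening follows, assuming a hypothetical booking event that carries a nested hotel record and a list of rooms. Nested records become dotted column names, and the array is exploded into one row per element, so every row is a flat set of dimensions and metrics that can be ingested and rolled up.

```python
from typing import Any, Dict, List


def flatten(record: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into dotted column names: {'a': {'b': 1}} -> {'a.b': 1}."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, name + "."))
        else:
            flat[name] = value
    return flat


def explode(record: Dict[str, Any], array_field: str) -> List[Dict[str, Any]]:
    """Emit one flat row per element of record[array_field], repeating parent fields."""
    parent = flatten({k: v for k, v in record.items() if k != array_field})
    rows = []
    for item in record.get(array_field, []):
        row = dict(parent)
        if isinstance(item, dict):
            row.update(flatten(item, array_field + "."))
        else:
            row[array_field] = item  # scalar array element becomes a plain column
        rows.append(row)
    return rows


booking = {
    "booking_id": "B1",
    "hotel": {"id": "H1", "city": "Chennai"},
    "rooms": [{"type": "deluxe", "nights": 2}, {"type": "suite", "nights": 1}],
}
for row in explode(booking, "rooms"):
    print(row)
```

One design caution: exploding repeats every parent field on each row, so a parent-level metric such as a booking's total amount would be double-counted if summed across the exploded rows. Such metrics either need to stay in a separate, unexploded datasource or be split across the child rows.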