Quicksilver: Near Real Time Platform at Myntra
Myntra’s vision is to provide a personalised shopping experience to every user by matching our deep understanding of user behavior with our market-leading expertise in fashion trends. Introducing Quicksilver, Myntra’s near real-time processing platform, which enables this vision by streaming and processing tens of billions of events every day.
The ability to process data in near real time ensures a great user experience by personalising the user journey in-session. The Quicksilver platform ingests tens of millions of events every minute, computes features in near real time, and makes them available to machine learning models, which enable hyper-personalization of the shopping experience based on current session activity and intent.
Quicksilver is part of the overall Myntra Data Platform (referred to internally as MDP), which manages the entire lifecycle of data for analytics, machine learning, and AI use cases.
Requirements for Quicksilver
While building the near real-time platform, we had to cater to the following requirements:
High data volume
The near real-time system should be capable of ingesting and processing tens of millions of events every minute. This volume will increase over time with new personalization use cases and the ever-growing footprint of users on our platform.
Data freshness
For the user to experience seamless in-session personalisation, data freshness needs to be on the order of a few seconds from the point of user action to the point where the experience can be refreshed with the inferred in-session intent.
Support for a large number of attributes
With the platform serving teams across multiple domains (product, users, payments, etc.), the data can have a large number of attributes.
Stateful processing
While there are a few use cases that don’t require any state, most stream processing use cases at Myntra require state, with the duration of state varying from days to months depending on the use case. Fetching state, combining it with the current event’s data, and processing at scale is a key requirement.
High Serving Throughput
Myntra serves millions of users daily, which translates to our Quicksilver platform serving millions of requests per minute.
Low Serving Latency
The p99 latency of feature requests should be on the order of a few milliseconds, since higher latencies here directly affect the user experience. For a typical ML model, the expected p99 latency is 40 to 50 ms.
Resiliency
The platform needs to be highly available and resilient, running 24x7 and recovering from failures quickly without any data loss.
Scalability
Data is ever increasing; as the volume grows, the system should be able to scale to handle the increased data volume and variety.
Multitenancy
Since Quicksilver is a company-wide platform enabling multiple teams to leverage real-time data, it should be able to support multiple business functions, with resources isolated per business function.
Flexible design
The platform and its design need to be flexible enough to evolve from near real-time processing to real-time processing use cases in the future.
It also needs the flexibility to support multiple stream processing frameworks to cater to diverse sets of use cases.
Need for a Dedicated Platform for Near Real-Time Processing
A dedicated platform for near real-time processing ensures that common capabilities are built for diverse stream processing use cases at Myntra’s current and future scale. It also enables operational efficiency and cost optimization as we onboard more business use cases.
Quicksilver design
Quicksilver was built keeping the above requirements in mind.
The platform has 3 main modules.
Streaming Data ingestion
Data ingestion can happen from multiple sources. Here we look at how events coming from the app and web are ingested and then used for near real-time processing.
Heimdall (Event Collector) is an in-house, high-throughput, low-latency service that provides an HTTP endpoint for clickstream event ingestion from the app, web, and other sources, and publishes these events to Kafka. The incoming volume is on the order of tens of millions of events every single minute.
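To make the ingestion path concrete, here is a minimal sketch of a collector-style endpoint that accepts a clickstream event over HTTP and publishes it to Kafka. It assumes Flask and kafka-python, and the broker address, topic, and field names are illustrative; Heimdall’s actual implementation and event schema are in-house.

```python
# Illustrative event-collector endpoint: accept a clickstream event over HTTP
# and publish it to Kafka. Broker, topic, and field names are assumptions.
import json
from flask import Flask, request, jsonify
from kafka import KafkaProducer

app = Flask(__name__)
producer = KafkaProducer(
    bootstrap_servers="kafka-primary:9092",               # assumed broker address
    value_serializer=lambda e: json.dumps(e).encode("utf-8"),
    acks=1,                                                # favour latency over strict durability
)

@app.route("/events", methods=["POST"])
def collect_event():
    event = request.get_json(force=True)
    # Key by session so all events of one session land in the same partition.
    producer.send(
        "clickstream-events",                              # assumed topic name
        key=str(event.get("sessionId", "")).encode("utf-8"),
        value=event,
    )
    return jsonify({"status": "accepted"}), 202

if __name__ == "__main__":
    app.run(port=8080)
```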
Refer to the Sourcerer Ingestion blog to learn more about how data ingestion is done at Myntra.
Near Real Time Processing Layer
Apache Spark is used for near real-time processing of the data, and Kafka is used to transfer data between the Spark streaming jobs.
Data from the primary Kafka is consumed, preprocessed, and written to the Quicksilver Kafka. Preprocessing ensures that the data follows a uniform structure, which simplifies downstream near real-time processing.
Depending on the use case, various other near real-time processing jobs can be written to process the data coming out of the Quicksilver Kafka.
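As an illustration of what such a preprocessing job could look like, here is a minimal Structured Streaming sketch that reads JSON events from the primary Kafka, projects them into a uniform schema, and writes them to the Quicksilver Kafka. The topic names, schema, and checkpoint path are assumptions, not the actual job definitions.

```python
# Sketch of a preprocessing job: primary Kafka -> uniform schema -> Quicksilver Kafka.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("quicksilver-preprocess").getOrCreate()

# Assumed uniform event schema; the real schema is defined by the platform.
event_schema = StructType([
    StructField("userId", StringType()),
    StructField("sessionId", StringType()),
    StructField("eventType", StringType()),
    StructField("eventTime", TimestampType()),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "kafka-primary:9092")      # assumed broker
       .option("subscribe", "clickstream-events")                    # assumed topic
       .load())

# Parse the raw payload and keep only events with a session identifier.
uniform = (raw
           .select(F.from_json(F.col("value").cast("string"), event_schema).alias("e"))
           .select("e.*")
           .filter(F.col("sessionId").isNotNull()))

query = (uniform
         .select(F.col("sessionId").alias("key"),
                 F.to_json(F.struct("userId", "sessionId", "eventType", "eventTime")).alias("value"))
         .writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "kafka-quicksilver:9092")  # assumed broker
         .option("topic", "quicksilver-events")                        # assumed topic
         .option("checkpointLocation", "/checkpoints/preprocess")      # assumed path
         .start())

query.awaitTermination()
```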
In order to simplify the creation of Spark jobs, another in-house platform called Janus is used. It was primarily built to create and orchestrate batch jobs; as part of the Quicksilver platform, it was enhanced to support near real-time jobs as well.
To learn more about how data processing is done at Myntra, refer to the Janus blog.
In use cases where additional data aggregations are needed, the data is sent to Centaurus, a Myntra service that performs aggregations. In scenarios where massive aggregation and low-latency serving are both needed, we use Redis as both the state store and the serving layer.
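As a sketch of that pattern, and assuming a simple per-session event count as the aggregation, the snippet below uses Redis hashes as the state store and bounds the state with a TTL. The key layout, TTL, and hostname are illustrative; Centaurus and the actual feature definitions are internal.

```python
# Redis as state store and serving layer for a simple in-session aggregation:
# count events per type for each session, with a bounded lifetime.
import redis

r = redis.Redis(host="redis-features", port=6379)   # assumed host

SESSION_TTL_SECONDS = 30 * 60                        # assumed in-session state duration

def update_session_counts(session_id: str, event_type: str) -> None:
    key = f"session:{session_id}:counts"
    pipe = r.pipeline()
    pipe.hincrby(key, event_type, 1)                 # update aggregate state
    pipe.expire(key, SESSION_TTL_SECONDS)            # bound the state's lifetime
    pipe.execute()

def get_session_counts(session_id: str) -> dict:
    raw = r.hgetall(f"session:{session_id}:counts")
    return {k.decode(): int(v) for k, v in raw.items()}
```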
Serving Layer
Once all the processing is complete, the final processed result needs to be served quickly, efficiently, and at high throughput. To achieve this, the final result is stored in an online feature store (Redis) and served using Tesla, a GoLang service. Redis and GoLang were chosen since they can serve very high throughputs of millions of requests per minute with latencies of a few milliseconds. The serving layer supports millions of requests per minute at p99 latencies of 20-30 ms.
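The production serving service (Tesla) is written in GoLang; the sketch below only illustrates its read path in Python: fetching all requested feature groups for a user in a single pipelined round trip to keep tail latency low. The key names and host are assumptions.

```python
# Illustrative feature-store read path: one pipelined round trip per request.
import redis

feature_store = redis.Redis(host="redis-features", port=6379)   # assumed host

def get_features(user_id: str, feature_groups: list) -> dict:
    pipe = feature_store.pipeline()
    for group in feature_groups:
        pipe.hgetall(f"features:{group}:{user_id}")              # assumed key layout
    results = pipe.execute()
    return {
        group: {k.decode(): v.decode() for k, v in values.items()}
        for group, values in zip(feature_groups, results)
    }

# Example usage:
# features = get_features("u123", ["session_intent", "brand_affinity"])
```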
Resiliency and Scale
Since the system needs to be up 24/7 and must also handle organic growth, it has to be highly scalable and resilient. The following factors were kept in mind to ensure this:
Checkpointing
Any system is expected to fail at some point. In such a scenario, the stream processing jobs must be able to resume from the exact point where processing failed. This is a key capability every streaming system needs. We use Spark’s checkpointing mechanism to store the offsets up to which the stream has been read.
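In Structured Streaming terms, this simply means restarting a query with the same checkpointLocation: the offsets stored in the checkpoint take precedence over startingOffsets, so the job resumes exactly where the failed run stopped. The sketch below uses an illustrative topic, checkpoint path, and a placeholder sink.

```python
# Restarting this job with the same checkpointLocation resumes from the
# offsets recorded in the checkpoint directory.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("quicksilver-job").getOrCreate()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka-quicksilver:9092")   # assumed broker
          .option("subscribe", "quicksilver-events")                     # assumed topic
          # startingOffsets only applies to the first run; afterwards the
          # checkpointed offsets take precedence.
          .option("startingOffsets", "latest")
          .load())

query = (stream.writeStream
         .format("console")                                              # placeholder sink
         .option("checkpointLocation", "/checkpoints/quicksilver-job")   # durable checkpoint dir
         .start())

query.awaitTermination()
```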
Restartability
Since a system is bound to fail at some point, it should have the ability to self-heal; otherwise, manual intervention would be needed and the data pipeline would stay broken. Hence, restart scripts powered by Airflow are set up for all the real-time jobs, checking through YARN whether each job is still in the running state.
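A hedged sketch of such a restart check, assuming Airflow 2.x: every few minutes, ask YARN whether the streaming application is still running and resubmit it if not. The application name, schedule, and spark-submit arguments are illustrative and not the actual in-house scripts.

```python
# Illustrative Airflow DAG that resubmits a streaming job if YARN no longer
# reports it as RUNNING.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

RESTART_CMD = (
    'yarn application -list -appStates RUNNING | grep -q "quicksilver-preprocess" '
    "|| spark-submit --master yarn --deploy-mode cluster "
    "--name quicksilver-preprocess "
    "--conf spark.yarn.submit.waitAppCompletion=false "   # return after submission
    "/jobs/preprocess.py"                                 # assumed job path
)

with DAG(
    dag_id="quicksilver_preprocess_restart",
    start_date=datetime(2023, 1, 1),
    schedule_interval="*/10 * * * *",                     # check every 10 minutes
    catchup=False,
) as dag:
    ensure_running = BashOperator(
        task_id="ensure_job_running",
        bash_command=RESTART_CMD,
    )
```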
Monitoring
Every job has to be monitored across multiple metrics, and alerts on the datastores used in the pipeline must reach the right people so that we are always aware of what is happening to the pipeline. For this purpose, StatsD is used for metrics collection from the Spark jobs, visualized in Grafana, while Prometheus is used for scraping metrics from the datastores.
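One way to wire this up, shown here only as a sketch, is through Spark’s built-in StatsD sink configured via spark.metrics.conf.* properties; the StatsD host, port, and metric prefix below are assumptions about the setup, not the actual in-house configuration.

```python
# Sketch: ship Spark job metrics to a StatsD endpoint via the built-in sink.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("quicksilver-preprocess")
         .config("spark.metrics.conf.*.sink.statsd.class",
                 "org.apache.spark.metrics.sink.StatsdSink")
         .config("spark.metrics.conf.*.sink.statsd.host", "statsd.internal")   # assumed host
         .config("spark.metrics.conf.*.sink.statsd.port", "8125")
         .config("spark.metrics.conf.*.sink.statsd.prefix", "quicksilver")     # assumed prefix
         .getOrCreate())
```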
Data store Resiliency
Every component in the pipeline should be able to scale horizontally and handle millions of events, a number that keeps growing over time. With this in mind, the data stores are tuned to the needs of the business functions.
Kafka
For Kafka, we had two things in mind: replication and stability. We set the replication factor to 3, the industry-wide standard. For stability, we have ensured that all alerts are set up in Grafana; common alerts include under-replicated partitions, fetch latency, and disk space usage. Apart from that, we have scripts that continuously check whether the Kafka brokers are available and alert the developers in case of issues.
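For illustration, creating a topic with that replication factor using kafka-python’s admin client could look like the following; the topic name, partition count, and broker address are assumptions.

```python
# Sketch: create a topic with replication factor 3.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="kafka-quicksilver:9092")   # assumed broker

topic = NewTopic(
    name="quicksilver-events",     # assumed topic name
    num_partitions=48,             # assumed sizing for tens of millions of events/minute
    replication_factor=3,          # tolerate the loss of up to two brokers
)
admin.create_topics([topic])
```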
Redis
Similar to Kafka, we needed a distributed Redis setup that can scale for the multiple use cases needing real-time insights. We set up Redis with a master and two replica nodes, with reads from replicas enabled so that very high read throughput can be sustained.
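A minimal sketch of the read/write split this setup implies: writes go to the master while high-volume reads are spread across the replicas. Hostnames are illustrative, and the real topology and failover handling are managed by the platform.

```python
# Illustrative read/write split across a Redis master and two replicas.
import random
import redis

master = redis.Redis(host="redis-master", port=6379)        # assumed hosts
replicas = [
    redis.Redis(host="redis-replica-1", port=6379),
    redis.Redis(host="redis-replica-2", port=6379),
]

def write_feature(key: str, field: str, value: str) -> None:
    master.hset(key, field, value)                           # all writes hit the master

def read_feature(key: str) -> dict:
    replica = random.choice(replicas)                        # naive spreading of reads
    return replica.hgetall(key)
```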
Multitenancy
Redis and GoLang Service
Each use case needs to support throughputs of millions of requests per minute, so the decision was taken to give each use case its own isolated Redis and GoLang service instances.
Spark Clusters
Queues in YARN are used to ensure that the resources of Spark jobs are isolated across tenants.
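For example, a tenant’s Spark job can be pinned to its own YARN queue via the spark.yarn.queue setting (or the equivalent --queue flag on spark-submit); the queue name below is illustrative.

```python
# Sketch: pin a tenant's Spark job to a dedicated YARN queue.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("quicksilver-personalization")
         .config("spark.yarn.queue", "quicksilver_personalization")   # assumed queue name
         .getOrCreate())
```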
This article gave an overview of the various components involved in the near real time platform.
We are also excited to hear from the rest of the community, in the comments section below, about the challenges they’ve faced and how they’ve overcome them.
Stay tuned for future articles which will cover the components at a much more granular level…
Co-author: Dyanesh D