Sanjeev Kumar Dangi
Feb 16 · 12 min read

Background

Myntra has an in-house service named Yantra. It is an event processing engine that manages a state machine: it triggers actions and changes state in response to events.

Yantra handles multiple event-based state machine workflows, such as sending notifications, generating feeds and sending mailers in response to events. Examples include a user who has added items to the cart but hasn’t made a purchase, and conversion flows for a new user such as activation and deactivation of coupons.

Earlier, some of these use cases were fulfilled by another system, Comex, which ran hourly or daily cron jobs. That system was constrained because its actions were not real time, which led to fewer conversions from the notification funnel. Batch jobs also cannot run against live service instances, because the sudden spike in throughput when a batch job runs impacts live traffic. So in some cases you run these batch jobs during non-peak hours, integrate with the analytics system, or use a dedicated service instance.

The Yantra service, which was built to make the flows real time, solved most of these issues, but it threw up another set of challenges. Take the abandoned cart use case: the earlier system sent notifications every hour and also at the end of the day, so there were 2 cron schedules, one every hour and one every day. A total of 13 schedules was enough.

With the real-time use case on Yantra, every add-to-cart event generates a new schedule or updates the existing one, and a delete-item event may delete the schedule if the cart has become empty. During Myntra EORS, there are a few million add-to-cart events, i.e. millions of schedules are created, updated or deleted.

Initial days — RabbitMQ exchange plugin

Some components in Myntra were heavily using the RabbitMQ delayed message exchange plugin for delayed queues: a real-time event can be scheduled with a delay, and a consumer that subscribes to the queue gets the event when the delay expires. Teams inside Myntra had a very good experience with the plugin, and it could handle decent scale if given a lot of RAM.

The use cases worked well except during the EORS sale. On sale days the RabbitMQ cluster would go down. The required throughput was very high, while consumers were throttled to match downstream pipeline capacity. RabbitMQ RAM would fill up with hot data that had not yet been consumed, leading to very high sustained disk I/O. The disk I/O slowed consumers down further, which led to even more disk I/O, RAM usage and CPU load, and eventually brought down the RabbitMQ processes.

Need for custom scheduler service

As the RabbitMQ exchange plugin was not working for us, we evaluated a couple of existing scheduling solutions that were available as open source or were already used in Myntra. Each of them was used for specific use cases, and none of them solved ours.

Existing scheduling tools

  • Quartz Scheduler

Quartz is a library and works very well for many scheduling needs. It also has bookkeeping. It can be run in cluster mode and is fault tolerant.

But it doesn’t scale beyond hundreds of schedules per second due to MySQL database locking. To use Quartz, you have to embed the library and create the schema that maintains persistence. It is not a managed service, and it still requires some work to integrate and manage.

  • Linux native crontab

Simple and easy to use. Available everywhere, as it is built into Linux.

But it is a single point of failure: if the machine goes down, schedules will not trigger. It also has no bookkeeping, i.e. no record of which schedules fired, at what time and with what status.

  • RabbitMQ delayed message exchange plugin

Many services already had an integration with RabbitMQ for their messaging needs, so it was pretty easy to integrate and onboard.

But it requires a separate RabbitMQ cluster for scheduling needs. For high reliability of schedules, you may not want inter-service or intra-service pub-sub flows on the scheduling RabbitMQ cluster. The plugin stores schedule state on a single node of the RabbitMQ cluster, so it is a single point of failure: if the node persisting the delayed messages goes down, your schedules are lost. You also cannot cancel a schedule once it has been created. The authors of the plugin also advise against using it for very high throughput use cases, i.e. hundreds of thousands or millions of schedules: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange#limitations

Bootstrapping scheduler service

So we decided that there was a need for a managed scheduler service that could be used across Myntra by all services. It should have the following features.

  • It should be a managed service that should have defined contracts for interaction.
  • It should be multi tenant.
  • It should have per client resource management and fairness.
  • Clients should be able to increase or decrease their resource quota.
  • It should have bookkeeping — status of schedules.
  • It should have support to cancel or reschedule a schedule.
  • HTTP and message bus should be supported as callback channels when a schedule is triggered.
  • It should support scheduling at per-minute granularity at least.
  • It should be horizontally scalable. We can add more nodes as throughput requirements increase.
  • It should be distributed and fault tolerant i.e no single point of failure.

Architecture

Tech Stack

The scheduler service has 3 major components: the HTTP service layer, the poller and the datastore.

Golang: We decided to use Golang for the service layer and the poller layer. Golang was already used in a couple of services in Myntra and is well suited to high-throughput, low-latency services that require concurrency. It has a light memory footprint and great support for concurrency through goroutines. Each goroutine starts with a stack of only a few KB, while a Java thread typically reserves around 512KB. Scheduler workers and pollers are goroutines, and millions of goroutines can be created on a single 12-core, 24 GB machine.

Cassandra: We decided to use Cassandra as the datastore. It is horizontally scalable, distributed and fault tolerant, and is heavily used inside Myntra. It is great for very high write throughput but not so great for read throughput.

Airbus: Airbus is a managed messaging platform built over open source messaging pieces and is used for async inter-service message communication inside Myntra.

Service

The service layer is the HTTP web layer that handles all REST traffic. A few important endpoints are:

  • Register client: only admins can register a new client.

A client is a tenant, i.e. another Myntra service that runs its use case on the scheduler service. Every client registers with its own unique id.

  • Update client: update the configuration of a client
  • Create Schedule
  • Update Schedule
  • Reschedule
  • Delete Schedule

A client can call the schedule endpoints only if it is registered; otherwise its requests are rejected.

Datastore

The datastore stores:

  • schedule state data: payload, callback details, and success or failure after the trigger
  • client config metadata
  • poller instance statuses and poller node membership.

Poller Cluster

Poller nodes use Uber’s ringpop-go library https://github.com/uber/ringpop-go. Ringpop provides application-level sharding. It builds a consistent hash ring out of the available poller cluster nodes; keys are distributed on the ring, and each poller cluster node owns parts of the ring.

Every client owns a fixed number of poller instances. Let X be the total number of poller instances assigned to all clients across all nodes. If there are Y clients owning C1, C2, ..., CY poller instances respectively, then C1 + C2 + ... + CY = X. With N poller cluster nodes, each node runs around X/N poller instances, i.e. X/N = C1/N + C2/N + ... + CY/N, where C1/N, C2/N, ..., CY/N are the shares of each client's poller instances on that node. If a node goes down, its X/N poller instances shift to the remaining N-1 nodes. If a node is added, X/(N+1) poller instances shift to the new node, with each existing node giving away X/N - X/(N+1) poller instances.
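To make the redistribution concrete, here is a minimal sketch of a consistent hash ring that maps poller instance ids to nodes. It is a hand-rolled toy and not the ringpop-go API; the names `Ring`, `NewRing`, `Owner`, `PollerID` and `CountMoved`, the FNV hash and the number of virtual points per node are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Ring is a toy consistent-hash ring (hypothetical, not ringpop-go).
type Ring struct {
	points []uint32          // sorted hash points on the ring
	owner  map[uint32]string // hash point -> node name
}

func hashKey(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// NewRing places `replicas` virtual points per node on the ring.
func NewRing(nodes []string, replicas int) *Ring {
	r := &Ring{owner: make(map[uint32]string)}
	for _, n := range nodes {
		for i := 0; i < replicas; i++ {
			p := hashKey(fmt.Sprintf("%s#%d", n, i))
			r.points = append(r.points, p)
			r.owner[p] = n
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// Owner returns the node that owns the first ring point at or after the key's hash.
func (r *Ring) Owner(key string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashKey(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0 // wrap around the ring
	}
	return r.owner[r.points[i]]
}

// PollerID builds ids in the "<client>.<n>" style, for example "C1.7".
func PollerID(client string, n int) string { return fmt.Sprintf("%s.%d", client, n) }

// CountMoved reports how many poller instances change owner between two rings.
func CountMoved(before, after *Ring, client string, instances int) int {
	moved := 0
	for i := 1; i <= instances; i++ {
		id := PollerID(client, i)
		if before.Owner(id) != after.Owner(id) {
			moved++
		}
	}
	return moved
}

func main() {
	three := NewRing([]string{"node-a", "node-b", "node-c"}, 64)
	two := NewRing([]string{"node-a", "node-b"}, 64)
	fmt.Println("instances that moved after node-c left:", CountMoved(three, two, "C1", 50))
}
```

The property that matters is that only the poller instances owned by the departed node change owner; instances on surviving nodes stay where they are.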

How does it work?

A client registers with a unique client id and a poller instance quota. The quota is calculated from the client's callback throughput requirement. Each poller instance fires every minute and can fetch a maximum of 1000 schedules from the datastore, with a maximum payload size of 1KB each.

Suppose a client with id C1 has a callback requirement of 50000 RPM. The client will be assigned 50 poller instances during registration (actually 50 + x, where x is a safety buffer). The poller instance ids of this client will be C1.1, C1.2, ..., C1.50.
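The quota arithmetic above can be written down as a small function. This is only a sketch: `PollerQuota` and its `buffer` parameter are hypothetical names, and the article does not say how large the safety buffer x actually is.

```go
package main

import "fmt"

// Each poller instance fetches at most 1000 schedules per minute (from the text).
const schedulesPerPollerPerMinute = 1000

// PollerQuota returns the number of poller instances needed for a target
// callback RPM, plus a caller-chosen safety buffer (the "x" in the text).
func PollerQuota(callbackRPM, buffer int) int {
	// Ceiling division, so 50001 RPM needs 51 instances rather than 50.
	base := (callbackRPM + schedulesPerPollerPerMinute - 1) / schedulesPerPollerPerMinute
	return base + buffer
}

func main() {
	fmt.Println(PollerQuota(50000, 0)) // 50
	fmt.Println(PollerQuota(50000, 5)) // 55
}
```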

When this client creates a schedule, the schedule is tied to one of the client's poller instance ids, chosen by a random function that is uniformly distributed across all of the client's poller instance ids. Suppose 50000 schedules are created within a minute or a couple of minutes, all with a fire time of 5:00PM. With random distribution, each of the 50 poller instances of this client will be assigned approximately 1000 schedules to trigger at 5:00PM. Every minute, every poller instance fetches the schedules tied to it that fire in the current minute. At 5:00PM, each poller instance of this client fetches all the schedules tied to it (approximately 1000) and fires them on their respective callback channels.

If clients require a higher peak RPM for create, update or delete, we can add a service node, a datastore node, or both. If clients require a higher peak callback RPM, we increase the poller instances for that client, which may in turn require a new node in the poller cluster, the datastore, or both.

So the service can be scaled horizontally as throughput requirements keep increasing, by adding nodes to the service layer, the poller cluster and the datastore.
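A minimal sketch of the uniform random assignment, assuming Go's `math/rand` as the random function (the article does not name the one actually used). `AssignPoller` and `Distribute` are hypothetical helpers; `Distribute` takes a seed only so that the run is repeatable.

```go
package main

import (
	"fmt"
	"math/rand"
)

// AssignPoller picks one of the client's poller instance ids uniformly at random.
func AssignPoller(rng *rand.Rand, clientID string, quota int) string {
	return fmt.Sprintf("%s.%d", clientID, rng.Intn(quota)+1)
}

// Distribute assigns `schedules` schedules and counts how many land on each poller instance.
func Distribute(seed int64, clientID string, quota, schedules int) map[string]int {
	rng := rand.New(rand.NewSource(seed))
	counts := make(map[string]int, quota)
	for i := 0; i < schedules; i++ {
		counts[AssignPoller(rng, clientID, quota)]++
	}
	return counts
}

func main() {
	counts := Distribute(1, "C1", 50, 50000)
	// Each count should be close to 1000, but not exactly 1000.
	fmt.Println(len(counts), counts["C1.1"], counts["C1.50"])
}
```

Because the assignment is random rather than round-robin, individual poller instances can land slightly above 1000 schedules, which is one reason a quota buffer is useful.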

Cassandra data model

The data layout is designed so that, every minute, each poller instance request goes to a single Cassandra shard only, which keeps reads fast.

The schedule table has the primary key (client id, poller count id, schedule minute).

So for the above example, the data will look like this:

C1, 1, 5:00PM, uuid1, payload

C1, 2, 5:00PM, uuid2, payload

.

.

C1, 50, 5:00PM, uuid50000, payload

where uuid1 to uuid50000 are unique schedule ids.

For the above use case, we did 50000 Cassandra writes and 50 Cassandra reads.

Every schedule creation does one Cassandra write, but a schedule fire does not require a read of its own. The number of Cassandra reads in a minute equals the number of poller instances running for all clients, so reads are always far fewer than writes. Cassandra is well suited to high write throughput with lower read throughput, and this data model and poller design need exactly that, so it works well with Cassandra as the datastore layer.
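The write and read counts can be reproduced with an in-memory stand-in for the schedule table. This sketch is not a Cassandra client; `PartitionKey`, `Store` and `Simulate` are hypothetical names, and schedules are spread round-robin here (instead of randomly) only to keep the output exact.

```go
package main

import "fmt"

// PartitionKey mirrors the primary key from the text:
// (client id, poller count id, schedule minute).
type PartitionKey struct {
	ClientID string
	PollerID int
	Minute   string
}

type Schedule struct {
	ID      string
	Payload string
}

// Store is an in-memory stand-in for the schedule table.
type Store struct {
	partitions map[PartitionKey][]Schedule
	Writes     int
	Reads      int
}

func NewStore() *Store { return &Store{partitions: make(map[PartitionKey][]Schedule)} }

// Put appends one schedule to its partition: one write per schedule.
func (s *Store) Put(k PartitionKey, sc Schedule) {
	s.partitions[k] = append(s.partitions[k], sc)
	s.Writes++
}

// Fetch returns a whole partition: one read per poller instance per minute.
func (s *Store) Fetch(k PartitionKey) []Schedule {
	s.Reads++
	return s.partitions[k]
}

// Simulate creates the schedules for 5:00PM, then lets every poller instance fetch once.
func Simulate(pollers, schedules int) (writes, reads, fired int) {
	s := NewStore()
	for i := 0; i < schedules; i++ {
		k := PartitionKey{ClientID: "C1", PollerID: i%pollers + 1, Minute: "5:00PM"}
		s.Put(k, Schedule{ID: fmt.Sprintf("uuid%d", i+1), Payload: "payload"})
	}
	for p := 1; p <= pollers; p++ {
		fired += len(s.Fetch(PartitionKey{ClientID: "C1", PollerID: p, Minute: "5:00PM"}))
	}
	return s.Writes, s.Reads, fired
}

func main() {
	w, r, f := Simulate(50, 50000)
	fmt.Println(w, r, f) // 50000 50 50000
}
```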

Callback Processing

There are two types of callbacks: HTTP and message bus. There are two Go channels, one for each type. Pollers push the schedule payloads for the current minute onto the HTTP channel if the callback type is HTTP, and onto the message bus channel if the type is message bus.
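A minimal sketch of this routing, with one Go channel per callback type, a worker pool draining the HTTP channel and a single consumer on the message bus channel. The type and function names (`CallbackType`, `Schedule`, `Dispatch`) are hypothetical, and the workers only count schedules where a real worker would invoke the callback.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type CallbackType int

const (
	HTTP CallbackType = iota
	MessageBus
)

type Schedule struct {
	ID   string
	Type CallbackType
}

// startWorkers launches n goroutines that drain ch until it is closed.
func startWorkers(wg *sync.WaitGroup, ch <-chan Schedule, n int, done *int64) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range ch {
				atomic.AddInt64(done, 1) // a real worker would fire the callback here
			}
		}()
	}
}

// Dispatch plays the poller's role: it pushes each schedule onto the channel
// matching its callback type, then waits for the consumers to finish.
// httpWorkers must be at least 1.
func Dispatch(schedules []Schedule, httpWorkers int) (httpDone, busDone int64) {
	httpCh := make(chan Schedule)
	busCh := make(chan Schedule)
	var wg sync.WaitGroup
	startWorkers(&wg, httpCh, httpWorkers, &httpDone)
	startWorkers(&wg, busCh, 1, &busDone) // single message bus producer
	for _, s := range schedules {
		if s.Type == HTTP {
			httpCh <- s
		} else {
			busCh <- s
		}
	}
	close(httpCh)
	close(busCh)
	wg.Wait()
	return httpDone, busDone
}

func main() {
	h, b := Dispatch([]Schedule{
		{ID: "s1", Type: HTTP},
		{ID: "s2", Type: MessageBus},
		{ID: "s3", Type: HTTP},
	}, 4)
	fmt.Println(h, b) // 2 1
}
```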

Schedules on the HTTP channel are consumed by a goroutine worker pool, which invokes the target HTTP endpoint with the payload. Currently there is a worker pool of 15000 goroutines for HTTP callback processing. Suppose the average timeout of an HTTP endpoint is 500ms and there can be at most 3 retries. A single schedule is then processed in at most 2 seconds (500ms * 3, plus 500ms for fetching the schedule and processing overhead), so each worker handles at least 30 schedules per minute. In the worst case, where all HTTP callbacks fail and 3 retries happen, the worker pool can still process 15000 * 30 = 450K HTTP callback schedules in a minute.
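The worst-case capacity estimate can be expressed as a small calculation. `WorstCasePerMinute` is a hypothetical helper; its parameters are the figures quoted in the text, in milliseconds.

```go
package main

import "fmt"

// WorstCasePerMinute estimates how many callbacks a worker pool completes per
// minute when every callback times out on each of its tries.
func WorstCasePerMinute(workers, timeoutMs, tries, overheadMs int) int {
	perScheduleMs := timeoutMs*tries + overheadMs // 500*3 + 500 = 2000ms in the text
	perWorker := 60000 / perScheduleMs            // schedules one worker finishes per minute
	return workers * perWorker
}

func main() {
	fmt.Println(WorstCasePerMinute(15000, 500, 3, 500)) // 450000
}
```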

Currently there is a single producer that listens on the message bus channel. It buffers schedules until the buffer is full or 200ms have elapsed, then compresses the batch and publishes it on the message bus.

Going forward, the plan is to have one HTTP channel and one message bus channel per client, with only that client's pollers producing on them. Each client will then have a dedicated worker pool for each callback type instead of a global one. Worker pool sizes can be tuned to each client's throughput, and schedule processing for one client will no longer impact other clients.
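The buffer-or-timeout behaviour can be sketched with a `select` over the input channel and a timer. This is an illustration only: `Batch`, `BatchAll` and the `flush` callback are hypothetical, the timer here starts when the first payload enters an empty buffer (the article does not say when its 200ms window starts), and compression and the actual publish call are left out.

```go
package main

import (
	"fmt"
	"time"
)

// Batch buffers payloads from in and calls flush when maxSize payloads are
// buffered or maxWait has passed since the first payload entered the buffer.
func Batch(in <-chan string, maxSize int, maxWait time.Duration, flush func([]string)) {
	var buf []string
	var timer <-chan time.Time // nil while the buffer is empty, so it never fires
	emit := func() {
		if len(buf) > 0 {
			flush(buf)
			buf = nil
		}
		timer = nil
	}
	for {
		select {
		case p, ok := <-in:
			if !ok {
				emit() // flush whatever is left when the channel closes
				return
			}
			if len(buf) == 0 {
				timer = time.After(maxWait)
			}
			buf = append(buf, p)
			if len(buf) >= maxSize {
				emit()
			}
		case <-timer:
			emit()
		}
	}
}

// BatchAll runs Batch over a fixed slice with a wait long enough that only
// the size limit and channel close trigger a flush.
func BatchAll(payloads []string, maxSize int) [][]string {
	in := make(chan string, len(payloads))
	for _, p := range payloads {
		in <- p
	}
	close(in)
	var out [][]string
	Batch(in, maxSize, time.Hour, func(b []string) { out = append(out, b) })
	return out
}

func main() {
	in := make(chan string)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- fmt.Sprintf("schedule-%d", i)
		}
		close(in)
	}()
	Batch(in, 2, 200*time.Millisecond, func(b []string) { fmt.Println("publish", b) })
}
```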

Fault tolerance

  • Node goes down

Every poller instance persists the node that owns it in Cassandra once it has started successfully. If a poller node goes down, every remaining node receives a node-down event. On this event, each node loops through all the poller instances that were assigned to the failed node and checks whether ownership of the ring slice containing the hash key of that poller instance has moved to itself. If so, the node starts the poller instance and updates the ownership information in Cassandra; otherwise it ignores the poller instance.

  • Node comes up

When a poller node comes up, it loops through all the poller instances and checks whether ownership of the ring slice containing the hash key of each poller instance has moved to itself. If so, the node sends a synchronous poller instance shutdown call to the current owning node and, on success, starts the poller instance locally and updates its ownership information. If the shutdown request fails, it retries, and if all retries fail, the node bootstrap fails. This ensures that no poller instance id ever runs on two nodes of the poller cluster at the same time.

Resiliency of pollers

Pollers never die while they are running unless they are asked to shut down. If a runtime error happens, the poller instance catches it, logs it and recovers.
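In Go, this kind of resiliency is usually built on `defer` and `recover`. The sketch below assumes that is the mechanism meant here; `SafeTick` and `RunPoller` are hypothetical names, and the loop runs a fixed number of cycles instead of ticking every minute.

```go
package main

import (
	"fmt"
	"log"
)

// SafeTick runs one poll cycle and turns a panic into an error, so the
// poller loop can log it and carry on with the next cycle.
func SafeTick(tick func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("poller tick panicked: %v", r)
		}
	}()
	tick()
	return nil
}

// RunPoller runs `ticks` cycles and returns how many completed without error.
func RunPoller(ticks int, tick func(n int)) (ok int) {
	for n := 0; n < ticks; n++ {
		n := n // capture the current cycle number for the closure
		if err := SafeTick(func() { tick(n) }); err != nil {
			log.Println(err) // log and recover; the poller keeps running
			continue
		}
		ok++
	}
	return ok
}

func main() {
	completed := RunPoller(5, func(n int) {
		if n == 2 {
			panic("datastore read failed")
		}
	})
	fmt.Println(completed) // 4
}
```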

Graceful shutdown

When a poller cluster node gets a shutdown signal, all poller instances running on that node finish processing the schedules that are currently in flight. After processing is done, the worker pools and poller instances clean up their resources and shut down.
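One common way to get this behaviour in Go is a stop channel combined with a `sync.WaitGroup`. The sketch below assumes that approach; `Node`, `Submit` and `Shutdown` are hypothetical names, and `Submit` and `Shutdown` are expected to be called from the same goroutine, with `Shutdown` called once.

```go
package main

import (
	"fmt"
	"sync"
)

// Node tracks in-flight schedule processing on one poller cluster node.
type Node struct {
	stop chan struct{}
	wg   sync.WaitGroup
	mu   sync.Mutex
	done int
}

func NewNode() *Node { return &Node{stop: make(chan struct{})} }

// Submit starts processing a schedule unless shutdown has already begun.
func (n *Node) Submit(process func()) bool {
	select {
	case <-n.stop:
		return false // shutting down: accept no new work
	default:
	}
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()
		process()
		n.mu.Lock()
		n.done++
		n.mu.Unlock()
	}()
	return true
}

// Shutdown stops intake, waits for in-flight schedules to finish, and
// returns how many schedules were processed in total.
func (n *Node) Shutdown() int {
	close(n.stop)
	n.wg.Wait()
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.done
}

func main() {
	node := NewNode()
	for i := 0; i < 3; i++ {
		node.Submit(func() {})
	}
	fmt.Println(node.Shutdown())          // 3
	fmt.Println(node.Submit(func() {})) // false
}
```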

Rolling out to production

This service has been in production for the last year, and we have onboarded a couple of high-throughput use cases. To support Myntra EORS (End of Reason Sale), we benchmarked it. With a 1 KB payload size, a 3-node poller cluster (12 cores, 24 GB RAM each), a 3-node Cassandra cluster (12 cores, 24 GB RAM, 1 TB disk each) and message bus callbacks, we were able to benchmark a throughput of around 350K RPM for create schedule and around 1 million RPM for message bus callbacks.

We generally used to disable the abandoned cart notification use case on Yantra during sale days, because the scheduling throughput was too high for the RabbitMQ delayed exchange plugin to handle. This time we kept the use case on during EORS by using the scheduler service instead of the RabbitMQ exchange plugin, and every add-to-cart event triggered a create or update schedule call to the scheduler service.

During the 5-day EORS sale, around 180 million schedules were created, with the peak hitting around 100–120K RPM. The scheduler service worked without any issues during the sale.

A few scheduling use cases

  • SLAs: Schedules for the SLAs of different stages in the order fulfilment pipeline, such as packing, shipping, delivered to hub, delivered to sorting centre, picked by SDA and delivered by SDA. If an SLA breach happens, the schedule fires and mitigation action can be taken. If the activity is completed within the SLA, the schedule is cancelled.
  • Retries: A lot of asynchronous processing has failure handling with a backoff strategy. If there is a failure, the service can schedule the retry in the scheduler service as per its backoff policy.
  • Scheduling discounts: Discounts are generally active from a start time to an end time. A service can create two schedules, one for enabling the discount and one for disabling it.
  • Scheduled notifications: Many marketing and engagement campaigns are scheduled for a target segment of users. A service can create one schedule for every campaign notification trigger.
  • Payment reconciliation: Payment gateways may have issues where the transaction is successfully processed by the bank but the gateway times out. On such payment runtime failures, a service can create a schedule for reconciliation at a later time. When this schedule fires, the service fetches the status of the transaction from the bank, and if the payment was successful, a refund is initiated.
  • Scheduling product listing activation: After products are onboarded in the catalogue, a service can create a schedule for activation. When the schedule fires, it activates the style in Myntra and the product starts appearing in listing and search.
  • Scheduling feedback mailers after an order is delivered.
  • Scheduling a nudge email to buy accessories or bought-together recommendations related to a product once that product is delivered.
  • There can be any number of other use cases across different services.

Future Work

We are building a self-serve dashboard with ACL controls for easy onboarding of clients. The dashboard will offer common functionality such as creating a schedule, updating a schedule, rescheduling a schedule and checking the status of a schedule.

We will also be adding cron support to the schedule API.

MySS will integrate with BI systems so that users can generate reports on historical data by status and time window for different clients. The integration will use a fixed event contract and pluggable publishers. Different publisher implementations can publish the events to the message bus, an HTTP layer or S3, or can simply log to the console.

Myntra Engineering

Stories of code.
