Cronus-Platform: powered by Akka

Summary: Cronus is a scalable, resilient distributed application that provides a platform to schedule activities. A client can define and register an activity through its service layer, then publish messages to RMQ, which Cronus uses as its messaging system. Cronus consumes and processes each message to identify which activities need to be performed. Its fault-tolerant, recoverable scheduling mechanism is based on the Akka actor system model, with ScyllaDB as persistent storage. An actor is created a few hours before an event's scheduled time, sleeps until its wake-up time arrives, finishes its work, and dies.

Reading tech debt: It is recommended to pay off your reading tech debt here before going further. Likely gaps include the Akka actor system (shard actors, singleton actors, persistent actors, Akka Cluster), Alpakka for RMQ event streaming, Akka Persistence with Cassandra, Groovy scripting, and data modelling in Cassandra.

Let’s start with requirement analysis. We at MakeMyTrip focus first on customer trust, and we promise to provide the best service through continuous improvement using cutting-edge technologies.

Use cases:

Typical Customer Booking Journey

Above is one example where multiple events are performed progressively along a booking timeline. From booking creation to completion, delivering every notification and activity without fail keeps the customer's experience great and the journey hassle-free. There could be many more such use cases, and whenever a new business requirement like this arrives, someone has to start over or extend an existing application to cater to it. Going through development, QA, and deployment phases again and again is painful and costly.

So we asked: why can't we build a platform that reduces the SDLC drastically, where clients have full control to define and register or de-register activities? After registering an activity, all they have to do is publish messages to RMQ. The beautiful result is Cronus-Platform.

Cronus-Platform: Now the question would be what makes it a platform. It is not meant for any specific use case: business logic is kept out of the platform, which only provides an event-based scheduling mechanism. It places no restrictions on the input (a POJO) and offers wide flexibility in defining an activity.

Let’s go through the self-explanatory activity config example below.


{
  "tenant": "Test", // to support multi-tenancy
  "activityName": "BNPL_TEST",
  "version": 1, // supports multiple versions
  "entityId": "define groovy script to get entity id from pojo",
  "validationScript": "define groovy script to validate pojo",
  "dataRefScript": "define groovy script to extract reference date and metadata from pojo",
  "states": [ // define list of states and transition flow
    {
      "name": "debit_state",
      "executionType": "HARD", // event will never be skipped
      "startTime": { // tells when to start trying debit
        "scheduleType": "RELATIVE", // relative to reference datetime
        "relativeTime": "-1d01h30m" // d=day, h=hour, m=minute
      },
      "endTime": { // last allowed time for execution
        "scheduleType": "ABSOLUTE",
        "absoluteDiffDate": 2, // 2 days after the reference datetime
        "absoluteTime": "15:00" // at 15:00
      },
      "retryCount": 15, // max number of retries on failure
      "retryInterval": "3h", // retry interval
      "dependentStateName": null, // not dependent on any state
      "activityStateDescription": "debit money from card"
    },
    {
      "name": "email_state",
      "executionType": "SOFT", // event will be skipped if its time has elapsed
      "startTime": {
        "scheduleType": "RELATIVE",
        "relativeTime": "-2d0h0m"
      },
      "endTime": null,
      "retryCount": 10,
      "retryInterval": "10m",
      "dependentStateName": null,
      "activityStateDescription": "send email to customer"
    }
  ],
  "api": "http://<localhost>:<port>/<business/api/v1/BNPL>",
  "maxRPS": 10, // throughput restriction for API calls
  "deleted": false,
  "suspended": false // temporarily enable/disable the activity
}
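The relativeTime strings in the config (d=day, h=hour, m=minute) resolve to offsets from the reference datetime. Below is a minimal sketch of how such a value might be parsed; the class and method names are hypothetical, and the exact grammar is an assumption based on the comments in the config.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for Cronus relativeTime strings such as "-1d01h30m".
class RelativeTime {
    private static final Pattern P = Pattern.compile("(-?)(\\d+)d(\\d+)h(\\d+)m");

    static Duration parse(String s) {
        Matcher m = P.matcher(s);
        if (!m.matches()) throw new IllegalArgumentException("bad relativeTime: " + s);
        Duration d = Duration.ofDays(Long.parseLong(m.group(2)))
                .plusHours(Long.parseLong(m.group(3)))
                .plusMinutes(Long.parseLong(m.group(4)));
        return m.group(1).equals("-") ? d.negated() : d;
    }

    // Resolve an event's fire time against the reference datetime (epoch millis).
    static long resolve(long referenceMillis, String relative) {
        return referenceMillis + parse(relative).toMillis();
    }
}
```

So "-1d01h30m" means 25.5 hours before the reference datetime, which matches the debit_state example above.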

This is how a client defines an activity and registers it with the platform via a REST call. An activity can be modified, enabled, disabled, or deleted at any time, and its version is bumped where applicable. The platform supports multiple activity versions: anything scheduled against a previous version continues to run according to that version's config. The platform also provides APIs for cancellation.

When a client publishes a message to RMQ, the Cronus consumer tags the message against all active activities to determine which ones apply. Once tagging is done, the message is broken down further into events on a timeline according to each activity's definition. For any event, the actor is created 6 hours before the scheduled time. The actor uses its own scheduler property to schedule itself; when the time arrives, it wakes up and runs validation. Validation consists of a number of checks: the activity must not be deleted or suspended, the dependent state must be completed (for an event of SOFT execution type), the event must not be cancelled, and its end time must not have passed. If all checks succeed, the actor makes a POST REST call to the API given in the activity config, with the request and response contracts below.
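The validation step can be sketched as a single guard that returns the first failing reason, or null when the event may run. All names, the signature, and the exact check order are illustrative assumptions, not the platform's actual code.

```java
// Sketch of the pre-execution validation: first failing reason, or null
// when the event may execute. Names and ordering are assumptions.
class EventValidator {
    static String validate(boolean deleted, boolean suspended, boolean cancelled,
                           String executionType, boolean dependentStateDone,
                           Long endTimeMillis, long nowMillis) {
        if (deleted) return "activity deleted";
        if (suspended) return "activity suspended";
        if (cancelled) return "event cancelled";
        // the text ties the dependent-state check to SOFT execution type
        if ("SOFT".equals(executionType) && !dependentStateDone)
            return "dependent state not completed";
        if (endTimeMillis != null && endTimeMillis < nowMillis)
            return "end time passed";
        return null;
    }
}
```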

Request body:

{
  "entityId": "string",
  "tenant": "string",
  "activityName": "string",
  "state": "string",
  "version": 0,
  "timestamp": 0,
  "metaData": "string"
}

Response body:

{
  "message": "string",
  "success": true
}

The platform stores every execution response in the data store for audit purposes. If success is false, the actor reschedules the state after the retry interval given in the activity config, but only if it passes the retry validation checks.

If success is true, the event is marked successful. If nothing else remains scheduled in the actor, it kills itself by consuming a poison pill.
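The retry decision described above might look like the sketch below; nextAttempt and its parameters are hypothetical names, and the end-time guard is my reading of the retry validation checks.

```java
// Hedged sketch of the retry decision: reschedule a FAILED event only while
// retries remain and the activity's end time (if any) has not passed.
class RetryPolicy {
    // Returns the next attempt's epoch millis, or -1 when no retry is allowed.
    static long nextAttempt(int attemptsSoFar, int retryCount, long retryIntervalMillis,
                            long nowMillis, Long endTimeMillis) {
        long next = nowMillis + retryIntervalMillis;
        boolean withinEnd = endTimeMillis == null || next <= endTimeMillis;
        return (attemptsSoFar < retryCount && withinEnd) ? next : -1L;
    }
}
```

With the debit_state config above (retryCount 15, retryInterval 3h), a failed attempt at time t would be retried at t + 3h until attempts or the end time run out.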

There is always exactly one actor in the system serving all work for a given entityId. These are shard actors and persistent actors, distributed over all nodes in the Akka cluster. Each actor persists its state to the data store on every state transition. All nodes in the cluster subscribe to member-exit events, so if any node goes down, the leader recreates all the actors that were running on that machine.

There may be situations where thousands of actors wake up around the same time and hit the client service, which could choke it. For this, the platform lets the client define a max RPS in the activity config so that their service can gracefully handle all HTTP calls from Cronus. Cronus manages throughput internally by ensuring that not all actors make HTTP calls at once; the throttled ones come back later to try again.
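One simple way to realise such a maxRPS cap is a fixed-window counter: an actor that fails to acquire a slot backs off and retries later instead of calling the client API. This is only a sketch of the idea, not Cronus's actual throttle.

```java
// Fixed-window sketch of a per-activity maxRPS throttle (illustrative only).
class RpsLimiter {
    private final int maxRps;
    private long windowStart = -1_000L; // forces a reset on first call
    private int used = 0;

    RpsLimiter(int maxRps) { this.maxRps = maxRps; }

    // nowMillis is injected for testability; real code would read the clock.
    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStart >= 1000L) { windowStart = nowMillis; used = 0; }
        if (used < maxRps) { used++; return true; }
        return false;
    }
}
```

With "maxRPS": 10 as in the config above, at most 10 calls per second would reach the client API; the rest reschedule themselves.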

Architecture: The platform exposes a service to clients for CRUD operations on activity configs, along with tenancy and versioning. It has smart RMQ consumers provided by the Alpakka library. The consumer is attached to a singleton actor and consumes messages from the queue in batches with a configurable prefetch count (sliding window). A singleton actor's property is that it is activated on another node if the current node fails, so the consumer remains highly available as long as at least one node in the Akka cluster is healthy.

High level diagram

The consumer collects a batch of messages and sends them synchronously to a supervisor actor. The supervisor passes them one by one to a router actor, which fronts a pool of worker actors. Routing uses consistent hashing to combine ordering with parallelism: messages for the same entity id go to the same worker actor and are processed sequentially, while messages for different entity ids are processed in parallel by different workers. The system therefore scales horizontally as the prefetch size and worker count are increased.
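The ordering invariant can be illustrated with a toy hash router. In practice the routing is handled inside Akka; this standalone sketch, with an assumed worker count and hash choice, only shows why same-entity messages stay ordered.

```java
// Toy sketch: every message with the same entityId maps to the same worker,
// preserving per-entity ordering while different entities run in parallel.
class EntityRouter {
    static int workerFor(String entityId, int workerCount) {
        int h = entityId.hashCode() % workerCount;
        return h < 0 ? h + workerCount : h; // normalise negative hashes
    }
}
```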

Akka Cluster of 4 nodes
Flow diagram

Worker actors do the actual work. A worker passes the message to the ActivityTaggingService, which validates it and, with the help of the Groovy engine, tags the activities applicable to the message. Once tagging is done, the activities are broken down further into events on a linear timeline. Any event due within the next 6 hours is sent to the shard region for scheduling, after being persisted to the data store.

Event life cycle

An event has a definite state-transition life cycle. It starts in the INITIAL state, moves to SCHEDULED, and then to SUCCESS, FAILED, or CANCELLED. It can move from FAILED back to SCHEDULED if a retry applies. As soon as an event is created, it is persisted to the data store with INITIAL status if it is not eligible for immediate scheduling.
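The life cycle just described can be written down as an explicit transition table. The state names follow the text; the table itself is a sketch, not the platform's code.

```java
import java.util.Map;
import java.util.Set;

// Event life cycle as a transition table (state names per the text).
class EventLifecycle {
    static final Map<String, Set<String>> TRANSITIONS = Map.of(
        "INITIAL",   Set.of("SCHEDULED"),
        "SCHEDULED", Set.of("SUCCESS", "FAILED", "CANCELLED"),
        "FAILED",    Set.of("SCHEDULED"), // retry path
        "SUCCESS",   Set.of(),
        "CANCELLED", Set.of()
    );

    static boolean canMove(String from, String to) {
        return TRANSITIONS.getOrDefault(from, Set.of()).contains(to);
    }
}
```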

The shard region holds information about all shard actors. If a given shard actor already exists in the cluster, the region simply forwards the message; otherwise the shard actor is created on first send. Shard actors are spread over the nodes in the cluster. Once a shard actor receives a message for scheduling, it marks the event's status as SCHEDULED in the data store and schedules itself for the specified timestamp. At that timestamp it wakes up and makes an HTTP call to the third-party service configured in the activity. Based on the HTTP response it marks the event SUCCESS or FAILED. On failure, it checks whether the event qualifies for a retry; if so, it reschedules itself after the retry interval specified in the activity. If no schedules remain active in the shard actor, it kills itself.

Actor Recovery: To recover an actor's state after a service outage or restart, the actor must be a persistent actor; in our case, all shard actors are also persistent actors. We evaluated a few other data stores for persistence, such as LevelDB and MySQL, but we were already looking for a time-series database for everything else, and Akka Persistence with Cassandra is highly recommended by the community, so we decided to go with Cassandra. On further exploration we found that ScyllaDB is a drop-in replacement for Cassandra with several advantages over it, so we replaced Cassandra with ScyllaDB and it worked without any issues. The actor persistence library is easy to integrate and takes care of everything; one just defines what to persist and when. Whenever the service restarts, passing the persistent ids to the shard region recreates all existing shard actors, with their state recovered from the data store. But what if only one node goes down? Do we need to try recovering all actors from all nodes, given that the Akka Cassandra persistence library does not support retrieving persistent ids per node? Akka provides pub-sub events for node state, i.e. when a node is leaving, joining, or exiting the cluster, and this comes to the rescue as a performance improvement.

Fault-tolerance: To improve recovery performance when a node exits, a custom mapping of node address to persistent id is created and stored in the DB. An actor adds an entry to this mapping on creation; if an actor receives a poison pill, it removes its entry. This way, if any node goes down, the singleton actor, which has subscribed to node-exit events, receives a message. It then fetches all the persistent ids recorded against that node address and passes them to the shard region to recover the actors. It also updates the mapping with the actors' new node addresses.
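An in-memory sketch of that bookkeeping is shown below. The real mapping lives in the DB and is keyed by the actual Akka node address; all names here are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the node-address -> persistent-id mapping used for targeted recovery.
class RecoveryIndex {
    private final Map<String, Set<String>> byNode = new HashMap<>();

    void onActorCreated(String node, String persistentId) {
        byNode.computeIfAbsent(node, n -> new HashSet<>()).add(persistentId);
    }

    void onActorStopped(String node, String persistentId) {
        byNode.getOrDefault(node, new HashSet<>()).remove(persistentId);
    }

    // Called when the singleton sees a node-exit event: hand back the ids to
    // recreate elsewhere and drop the dead node's entry.
    Set<String> drain(String node) {
        Set<String> ids = byNode.remove(node);
        return ids == null ? new HashSet<>() : ids;
    }
}
```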

Another dedicated actor keeps selecting all qualifying events from the data store and sends them to the shard region for scheduling. It runs hourly and picks every event whose execution time lies within the upcoming 5th-to-6th-hour window (configurable).
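The hourly window check might look like the sketch below, where the 5-to-6-hour bounds are the configurable defaults mentioned above and the names are hypothetical.

```java
// Sketch of the hourly sweep: select events whose execution time falls in
// the half-open window [now + 5h, now + 6h).
class SweepWindow {
    static final long HOUR_MILLIS = 3_600_000L;

    static boolean inWindow(long execMillis, long nowMillis) {
        long lo = nowMillis + 5 * HOUR_MILLIS;
        long hi = nowMillis + 6 * HOUR_MILLIS;
        return lo <= execMillis && execMillis < hi;
    }
}
```

A half-open window keeps consecutive hourly runs from selecting the same event twice.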

Monitoring and alerting should be in place when such a system is built. We use Grafana; ScyllaDB also provides a Grafana integration tool, which is really cool. Here are a few metrics from Cronus.

Cronus Metrics

Conclusion: Cronus has become an integral part of our infrastructure since its rollout and will fuel many of our core workflows. It can be particularly useful for teams looking for a reliable, general-purpose scheduling framework that is easy to integrate with existing infrastructure.