DATA is the New OIL: Mine it Well to Tap your Customers in Real-Time

Dream11 Engineering
Mar 25 · 6 min read

By Neha Sood

Data is the new oil…but only if you can extract meaningful insights from it. Getting the most relevant insights as fast as possible makes a business stand out from the crowd. In other words, reliability and speed are the two key metrics for assessing the quality of insights. However, as the user base and the resulting data grow, supporting deep analytics at scale becomes a challenge.

At Dream11, our analytics use-cases include real-time customer targeting based on users’ current actions, detecting behavioural anomalies, and powering journey nudges based on each user’s action stage, all of which enable the business to react without delay.

At our scale, i.e., over 50 million registered users and 1 billion events per day, designing a suitable architecture is challenging, not to mention that we are growing 4x YoY.

One such use-case was to identify fraudulent users and their transactions in near-real-time. The objective was to enable instant withdrawals for all our users. This required running a decision engine on each user's entire transaction history in near-real-time, which our existing architecture could not support.

Run to the hills!

Our transactional data currently resides in Aurora, which could not scale for this use-case. Because we follow a microservices-based architecture, the required data is also distributed across different clusters, so running analytics that join across them was not possible either.

Below is the broad architecture to kick start the project:

While looking for a target NoSQL datastore, we focussed on the following prerequisites:

  • Low response time for data at scale
  • Highly available and future-proof, in line with our growth targets
  • Ability to handle request spikes and high write volumes in peak seasons such as the IPL and the World Cup
  • Horizontally scalable

For the above requirements, Cassandra seemed a good fit. Modelling your data correctly for your use-cases is the key to making it work. Since Cassandra does not support joins, data enrichment and joins have to be done in the streaming layer.

The Pipeline from Aurora (AWS RDS) to Kafka

With our previous pipeline, we already had the data flowing from Aurora to Kafka.

However, from our experience with maintaining Confluent Connectors, we knew they were not the right fit for a near-real-time use-case.

Adios Confluent connectors (why)

  • Long-running transactions cause data sourcing delays.

To source data from Aurora to Kafka through Confluent connectors, we used the “Timestamp + Incrementing” mode. To ensure that a transaction with an earlier timestamp has completed before the offset advances, a delay has to be added via “timestamp.delay.interval.ms”, sized to the maximum transaction duration per table.

  • Continuously updated records are not sourced until their updates pause for a while.

A record is sourced only when its timestamp falls within the window from <LastSourcedOffset> to <Current timestamp - Delay interval>. If the data is updated continuously, chances are the record will keep missing this window until the updates slow down or pause for a while.
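To make the window behaviour concrete, here is a minimal plain-Python simulation. It is not the connector's code: the `Row`, `poll` and `simulate` names are hypothetical, time is modelled as integer milliseconds, and the incrementing-column half of the mode is left out for brevity. It only illustrates why a row that keeps getting updated stays invisible until its updates pause for at least the delay interval.

```python
from dataclasses import dataclass


@dataclass
class Row:
    row_id: int
    updated_at: int  # last-modified timestamp, in ms


def poll(rows, last_offset_ms, now_ms, delay_ms):
    """Return rows whose timestamp lies in (last_offset_ms, now_ms - delay_ms]."""
    upper_bound = now_ms - delay_ms
    picked = [r for r in rows if last_offset_ms < r.updated_at <= upper_bound]
    # The offset only advances to the newest timestamp that was actually sourced.
    new_offset = max((r.updated_at for r in picked), default=last_offset_ms)
    return picked, new_offset


def simulate(update_every_ms, stop_updates_at_ms, poll_every_ms, delay_ms, end_ms):
    """Return the poll times at which a single, repeatedly updated row is sourced."""
    row = Row(row_id=1, updated_at=0)
    offset = -1
    sourced_at = []
    for now in range(0, end_ms + 1, poll_every_ms):
        # The row's timestamp is its latest update at or before `now`.
        last_update = min(now, stop_updates_at_ms)
        row.updated_at = (last_update // update_every_ms) * update_every_ms
        picked, offset = poll([row], offset, now, delay_ms)
        if picked:
            sourced_at.append(now)
    return sourced_at
```

With a row updated every 500 ms until t = 5000 ms, a poll every 1000 ms and a 2000 ms delay, `simulate(500, 5000, 1000, 2000, 10000)` sources the row only once, at t = 7000 ms, i.e. two seconds after the updates stop.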

For more details on our usage of Confluent Connectors, refer to our previous blog post here.

Bula Binlog replicators

Thinking of CDC on transactional systems and not considering Binlog replicators is a cardinal sin. We ran a comparison across all the known and less-known pieces. Here is what we have to show:

Comparisons (as of May’18)

Voila!

Maxwell, Streamsets and Debezium came out as the best suited for our requirements. However, with Maxwell lacking HA and Streamsets supporting HA only in its enterprise version, Debezium was the ‘chosen’ one. Internal load tests comparing the Streamsets community version with Debezium also favoured the latter, with a performance difference of ~15–20% under heavy load. Being built on Kafka Connect, it also supports Schema Registry out-of-the-box.

Gotchas in Debezium

  • Beware, Debezium is not a horizontally scalable replicator: a single connector runs on a single node, so you may run into performance bottlenecks. In our load tests, we started seeing lag once a sustained load of 6k DMLs/sec or more lasted for over 5 minutes. Under consistently high or increasing load this will be a bottleneck, although it catches up within a couple of seconds as soon as the load fluctuates or goes down.
  • With binlog files shared across nodes, MySQL master failover can be handled with a small operational overhead. All you need to do is restart the Debezium connector using its HTTP API.
  • Use the Avro data format rather than JSON to save network I/O. With Avro, we saw a performance improvement of more than 50%.
  • To make Debezium suit our scale, we dug into its open-source code and tried the optimizations below. The first one is running in production for us:
      • Skip deserializing delete (or any other) row operations, if they are not relevant to your streaming pipeline.
      • Serialize date(Time) fields as long (https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java)
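As an illustration of the restart step after a failover, the sketch below builds a request against the Kafka Connect REST API, which exposes a `POST /connectors/{name}/restart` endpoint for connectors such as Debezium. The host name and connector name used here are placeholders, not values from our setup, and depending on the Kafka Connect version, failed tasks may need to be restarted separately.

```python
import urllib.request


def build_restart_request(connect_url, connector_name):
    """Build (but do not send) a POST to Kafka Connect's connector restart endpoint."""
    url = f"{connect_url.rstrip('/')}/connectors/{connector_name}/restart"
    # An empty body is enough; the endpoint takes the connector name from the path.
    return urllib.request.Request(url, data=b"", method="POST")


def restart_connector(connect_url, connector_name, timeout_s=10):
    """Send the restart request and return the HTTP status code."""
    request = build_restart_request(connect_url, connector_name)
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return response.status


# Hypothetical worker address and connector name, for illustration only.
request = build_restart_request("http://connect.example.internal:8083/", "mysql-connector")
```

Keeping the request construction separate from the network call makes the helper easy to unit test and easy to wire into a failover runbook or script.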

Streaming: Kafka to Cassandra

Industry benchmarks favoured Spark Structured Streaming as a low-latency, high-throughput micro-batch engine. Our POC showed good results too. With easy support for late-arriving data, joins across multiple streams, and out-of-the-box mechanisms for fault tolerance and state management, it looked like the perfect fit. The icing on the cake: the code is easily unit testable.
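For readers new to late-data handling, the idea behind a watermark can be sketched in a few lines of plain Python. This is a conceptual illustration only, not the Spark API and not our pipeline code: the `Watermark` class is hypothetical, and it evaluates the threshold per event, whereas a micro-batch engine advances its watermark between batches.

```python
class Watermark:
    """Tracks the maximum event time seen and rejects events that arrive too late."""

    def __init__(self, allowed_lateness_s):
        self.allowed_lateness_s = allowed_lateness_s
        self.max_event_time = None

    def accept(self, event_time):
        """Return True if the event is still within the lateness threshold."""
        if self.max_event_time is not None:
            if event_time < self.max_event_time - self.allowed_lateness_s:
                return False  # older than the watermark: dropped
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


watermark = Watermark(allowed_lateness_s=10)
decisions = [watermark.accept(t) for t in [100, 105, 96, 120, 109, 111]]
# decisions == [True, True, True, True, False, True]
```

The event at t = 96 is late but still inside the 10-second allowance, so it is kept; the event at t = 109 arrives after the watermark has moved to 110 and is dropped.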

We chose EMR to run these jobs.

Why EMR?

  • AWS managed service
  • Supports dynamic resource allocation: your streaming jobs can automatically handle spikes in data volume and scale up as your data grows
  • Auto-scaling of core and task nodes based on CloudWatch metrics
  • Automatic node failure handling

Gotchas in EMR

  • The master node stores the fsimage and the subsequent edit logs. EMR does not merge them automatically, which eventually fills up the disk.
  • As the above point implies, EMR doesn’t have a secondary name node (aka checkpoint node).
  • Task nodes do not have any local HDFS, unlike core nodes. So, any tasks running on these nodes need to fetch their data from remote core nodes for processing, which has performance implications.

Beefeaters & ravens

  • With both Debezium and Spark Streaming, you can expose metrics via JMX. This makes it easy to integrate with centralized dashboards that give you a fair end-to-end picture of delays and possible bottlenecks.
  • Make sure you also set up alerts for when the binlog replicator or streaming latency exceeds the expected threshold.

Is my toast done, honey?

What we have now is an in-house, generic streaming framework that can read from and write to configurable Kafka and Cassandra clusters, with automatic schema mapping via Schema Registry. Apart from joining across streams, it supports dynamic filtering rules and data deduplication, and can even rename, drop or derive new fields from existing ones.
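The kind of rule-driven reshaping described above can be sketched in plain Python. This is not the in-house framework: the `apply_rules` function, the layout of the rules dictionary and the sample field names are all hypothetical, and a real streaming job would keep deduplication state bounded (for example with a watermark) rather than in an ever-growing set.

```python
def apply_rules(records, rules):
    """Filter, deduplicate and reshape records according to a rules dict."""
    seen_keys = set()
    output = []
    for record in records:
        # 1. Dynamic filtering: every predicate must hold.
        if not all(predicate(record) for predicate in rules.get("filters", [])):
            continue
        # 2. Deduplication on a configurable key.
        if rules.get("dedup_on"):
            key = tuple(record[field] for field in rules["dedup_on"])
            if key in seen_keys:
                continue
            seen_keys.add(key)
        # 3. Derive, drop and rename fields, in that order.
        row = dict(record)
        for name, derive in rules.get("derive", {}).items():
            row[name] = derive(record)
        for field in rules.get("drop", []):
            row.pop(field, None)
        for old, new in rules.get("rename", {}).items():
            if old in row:
                row[new] = row.pop(old)
        output.append(row)
    return output


rules = {
    "filters": [lambda r: r["status"] == "SUCCESS"],
    "dedup_on": ["txn_id"],
    "derive": {"amount_inr": lambda r: r["amount_paise"] / 100},
    "drop": ["amount_paise"],
    "rename": {"txn_id": "transaction_id"},
}
events = [
    {"txn_id": 1, "status": "SUCCESS", "amount_paise": 5000},
    {"txn_id": 1, "status": "SUCCESS", "amount_paise": 5000},  # duplicate
    {"txn_id": 2, "status": "FAILED", "amount_paise": 700},    # filtered out
]
result = apply_rules(events, rules)
# result == [{"status": "SUCCESS", "amount_inr": 50.0, "transaction_id": 1}]
```

Expressing the rules as data rather than code is what lets one generic job serve many pipelines: a new use-case needs a new rules entry, not a new deployment.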

The pipeline involving a multi-stream join, with large join and watermarking windows on the streams, takes at most 2 sec end-to-end, while single-stream pipelines with some filtering rules and column add/drop/rename run with a delay of less than 100 ms.

We have just begun our journey of real-time deep analytics. The next frontier is identifying and alerting on anomalies and tracking near-real-time user behaviour patterns.

Follow us on our journey and keep learning with us.

