Sourcerer: Data Ingestion at Myntra

Abhinav Dangi
Myntra Engineering
Published in
5 min readSep 6, 2021

Rapid increase in data in terms of velocity and volume has put new demands on how we capture and provision data for analytics. Myntra being a premier fashion retailer in India is no different.

Being one of the leading e-commerce companies in India and rapid adoption of e-commerce over the last decade we have seen exponential rise in traffic and data volumes. This along with advancements in Data Analytics and Machine learning needed us to enable a reliable means to store all data available in any system into a central place for democratizing the use of data. This requires Myntra to have a Data Analytics Platform which will integrate data from different teams and different sources, perform different kinds of processing and generate insights.

Over the years there has been a huge increase in both transactional data that is recorded in data stores as well as clickstream data recorded user behaviour data. We soon had a problem of plenty and realised we needed to make long term investment in building a Data Ingestion Platform (Sourcerer) to serve as a central gateway for all data ingestion coming in the Data Platform.

Guiding Principles

We aimed at solving the following problems.

  1. Ingesting from diverse data stores being employed to store transactional data — Mysql, Mongo, Cassandra, Aerospike etc.
  2. Handling large volumes of data at scale maintaining high accuracy and quality while managing data freshness SLA guarantee.

Platform Evolution

Rome was not built in a day and so was our ingestion platform. We had the Data Ingestion Platform built in three phases.

Extract based ingestion

To start with our ingestion into our Data warehouse we created Pentaho ETL based pipelines and leverage several out of the box capabilities like performing data enrichment, joins between datasets, single solution for several sources etc.

As the number of pipelines grew and data volume increased potential challenges regarding stream data ingestion, dependency on last modified timestamp in source, developer bandwidth required for creating and maintaining ingestion pipelines (one per table), etc started to emerge.

Change Data Capture (CDC) based ingestion

We used Debezium to capture change data from MySQL sources. Debezium solved several problems like reduction in the number of pipelines as one Debezium connector is required per database, no dependency on the last modified timestamp and supporting stream ingestion. Data ingestion strategy was also changed from ETL to ELT model (to separate Ingestion and Processing) which made use of Pentaho tool redundant.

Debezium pushed the data to Kafka. From there, we used Gobblin to dump data to Data-lake. We chose Gobblin as a one-stop solution for all ingestion related requirements. It had the capability to ingest large volumes of data, and from a variety of data sources, e.g., MySQL, Mongo, and other databases, Rest APIs, Kafka etc.

But, we faced several challenges like autoscaling, poor resource utilization, poor documentation and community support on cluster mode, pull based monitoring not supported, special cluster requirement and several other bugs like scheduling errors, stuck jobs etc. We also tried to use Gobblin to onboard other sources but without any success.

Upgraded CDC based ingestion

Since we used Gobblin to simply dump data to Data-lake, we decided to replace it with a Kafka Connector. Sink Connector was built inhouse to push data from Kafka to Data-lake. It supports autoscaling (using Kafka Connect cluster), pull based monitoring, different file formats (extendable to support others) etc.

Current Architecture

At present, the Sourcerer Ingestion Pipeline supports MySQL, MongoDB, API and Kafka based ingestions. The architectural diagram is shown below:

The database ingestions are pulled using change data capture connectors like Debezium. API Ingestions are captured using an Event Collector. All the data goes into the central message queue — Kafka. From Kafka, data is sinked into the Data-lake or the Data warehouse. We have Kafka Connect clusters for supporting source and sink connectors. Realtime processing framework processes data from Kafka and makes it available for real-time use cases.

Data Veracity and Data Validation plugins are provided in the framework to maintain high accuracy and data quality.

Capabilities

The main purpose of Sourcerer is to allow in-flow of data from different data sources and make it available for further analytical use-cases. The framework capabilities include:

  1. Supports different types of sources. The framework supports pull based ingestions from relational sources like MySQL database, non-relational sources like MongoDB, and message queues like Kafka topics. It also supports push based ingestions, taking in data by providing an API, for Clickstream data, Server-side events and handling other point use cases.
  2. Supports different kinds of sinks. Writing to Data-lake and loading data into Data warehouse are supported by Sourcerer. Both Batch processing and Realtime processing can be performed on this data.
  3. Extendable. The framework is extendable to support other kinds of sources and sinks in the future.
  4. Supports schema validation. Where schema adherence is important, data is rejected upfront. Data with non-conforming schema is separated out into invalid buckets, and appropriate actions are taken on it.
  5. Supports schema evolution. The framework detects changes in source structures and automates backward-compatible schema evolution till the sink without any human intervention.
  6. Data veracity plugin. This plugin enables the user to compare data between two queryable data stores, and generate the report based on provided configurations. It ensures data completeness. Counts across dimensions and schema comparisons are supported by this plugin.
  7. Data Validation plugin. This plugin is required in order to ensure that the data present in the Data Platform conforms to the validations exposed by the stakeholder (source team/consumer team/analyst). Data validations are supported on data sets present in different locations like Kafka, Data-lake, Data warehouse.

Other Non-functional capabilities include:

  1. Highly scalable. The framework handles 10s of Billions of events per day for Clickstream data and Billions of CDC events per day from database sources.
  2. Robust and fault-tolerant architecture. All components are working in distributed clustered mode having proper restart ability, monitoring and alerting set in place.

Future Work

We are building a Self-serve platform for Sourcerer for easy on-boarding of datasets to Data Analytics Platform. Also looking at expanding Sourcerer for different sources like File Ingestions; SFTP, G-Sheet, .

At the same time, we are surveying newer open source/paid solutions and capabilities.

Continue reading:

  1. MySQL Ingestion Deep Dive
  2. Janus: Data Processing Framework
  3. QuickSilver: Near Realtime Processing Framework

Credits: Thanks to Amit Rana and Shruti Mantri for their review and support.

--

--