Designing an nRT Read- and Write-Optimized CDC Framework for a Hadoop Data Lake — Part 1

Tarun Kumar · Published in Airtel Digital
Apr 22, 2022 · 4 min read

Background: We at Airtel wanted to perform analytical (OLAP) queries on data stored in transactional databases like Oracle, MySQL and Postgres, or in key-value data sources like Aerospike. Since these databases either don't serve analytical queries efficiently or don't support them at all, we needed to bring this data into our data lake/warehouse through a CDC (change data capture) process and ingest it into a table format that provides very fast insert/update/merge/delete functionality as well as very fast query performance.

We explored multiple frameworks available in the market and finally ended up designing our own system from the ground up.

Existing System Evaluation:

Hive ACID: Hive Transactions/ACID is a similar effort, which implements merge-on-read style storage on top of the ORC file format. Understandably, this feature is heavily tied to Hive and related efforts like LLAP. Hive transactions do not offer a read-optimized storage option or incremental pulls. In terms of implementation choices, the Hive transactions feature is driven underneath by Hive tasks/queries kicked off by the user or by the Hive metastore.

Cons of Using Hive ACID / Why we are not going with Hive ACID:

  1. When an application or query reads the ACID table, the reader provides the list of committed transactions to include. This list is produced by the Hive metastore when a query starts. The task then does a merge sort: each of the files is sorted by (original transaction ascending, bucket ascending, rowId ascending, currentTransaction descending), and only the first record with a currentTransaction that is in the list of transactions to read is returned, which corresponds to the last visible update to a row (a simplified sketch follows this list). This is an expensive operation in terms of both time complexity and system resource usage, and it is a big hit on query-time performance.
  2. It requires all key columns to be present while updating, so views where different tables are joined and exposed in the federation layer will not be possible.
  3. The sort-based record comparison makes filter push-down impossible, even on the base files.
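
To make point 1 concrete, here is a minimal, hypothetical Scala sketch of that sort-merge read path. The class and field names are illustrative only and are not Hive's actual internals:

    // Hypothetical, simplified model of Hive ACID's sort-merge read path.
    case class AcidRow(
      originalTxn: Long,  // transaction that first wrote the row
      bucket: Int,
      rowId: Long,
      currentTxn: Long,   // transaction that produced this version of the row
      payload: String
    )

    def readVisibleRows(files: Seq[Seq[AcidRow]], committedTxns: Set[Long]): Seq[AcidRow] = {
      // Each delta/base file is sorted by (originalTxn asc, bucket asc, rowId asc,
      // currentTxn desc); flatten and re-sort to simulate the k-way merge.
      val merged = files.flatten
        .sortBy(r => (r.originalTxn, r.bucket, r.rowId, -r.currentTxn))

      // For each logical row, keep only the first version written by a transaction
      // visible to this reader, i.e. the last visible update to that row.
      merged
        .groupBy(r => (r.originalTxn, r.bucket, r.rowId))
        .values
        .flatMap(_.find(r => committedTxns.contains(r.currentTxn)))
        .toSeq
    }

Every query pays for this merge across all delta and base files before it can return a single row, which is where the query-time cost comes from.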

Apache Kudu: Apache Kudu is a storage system with similar goals, namely bringing real-time analytics to petabytes of data via first-class support for upserts. A key differentiator is its attempt to also serve as a datastore for OLTP workloads, something we do not aspire to. Kudu also does not support incremental pulls.

Cons of Using Apache Kudu / Why we are not going with Apache Kudu:

  1. Kudu diverges from a distributed file system abstraction and from HDFS altogether, with its own set of storage servers talking to each other via Raft. Our storage, on the other hand, is designed to work with an underlying Hadoop-compatible filesystem (HDFS, S3 or Ceph) and does not have its own fleet of storage servers, relying instead on Apache Spark to do the heavy lifting. Thus, it can be scaled easily, just like other Spark jobs, while Kudu would require the hardware and operational support typical of datastores like HBase or Vertica.
  2. There is an upper limit on the number of columns Kudu supports in a table.
  3. There are also limitations around column type support: Map and Array columns are not supported, the cell size limit is 64 KB, and a row can only be a few hundred KB. Performance degrades as the number of columns in a table increases, which is why Kudu imposes a limit of 300 columns. Kudu also does not support all the types supported by Spark SQL; for example, Date and complex types are not supported.
  4. Moreover, it is untested (by us) for data with a very high upsert volume, and doing a POC would require setting up a Kudu cluster, which we do not have handy.
  5. Please note that these limitations listed for Apache Kudu are based on information we could gather online or from documents published by the project; they might have been updated/resolved in the latest releases.

Apache Hudi: Apache Hudi (Hudi for short, from here on) allows you to store vast amounts of data on top of existing Hadoop-compatible storage. Hudi provides support for updating/deleting records using fine-grained file/record-level indexes, while providing transactional guarantees for the write operation. Queries process the last such committed snapshot to produce results. Hudi also provides first-class support for obtaining an incremental stream of all the records that were updated/inserted/deleted in a given table from a given point in time.
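
For reference, this is roughly how Hudi's upsert and incremental pull are wired up from Spark. This is a hedged sketch: the paths, table name and columns are illustrative, and option names can vary slightly across Hudi versions:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("hudi-cdc-sketch").getOrCreate()

    // Upsert a batch of CDC records into a Hudi table (illustrative paths/columns).
    spark.read.parquet("/staging/orders_cdc").write
      .format("hudi")
      .option("hoodie.table.name", "orders")
      .option("hoodie.datasource.write.recordkey.field", "order_id")
      .option("hoodie.datasource.write.precombine.field", "updated_at")
      .option("hoodie.datasource.write.operation", "upsert")
      .mode(SaveMode.Append)
      .save("/lake/orders")

    // Incremental pull: only the records that changed after a given commit time.
    val incremental = spark.read
      .format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20220401000000")
      .load("/lake/orders")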

Cons of Using Apache Hudi / Why we are not going with Apache Hudi:

  1. It requires all key columns to be present while updating, so views where different tables are joined and exposed in the federation layer will not be possible.
  2. Delta files are loaded as a file-based hash map: if a key is present in the map, the value is returned from the map, otherwise from the base file (see the sketch after this list). This implementation requires a file-based hash map to be computed for each query, which is very time-consuming. We would not be able to support OLAP queries with second-level latency on Hudi.
  3. The hash-map-based record comparison makes filter push-down impossible, even on the base files.
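
Below is a simplified, hypothetical sketch of that hash-map style merge-on-read (not Hudi's actual code), to show where the per-query cost and the loss of push-down come from:

    // Hypothetical model of a hash-map based merge-on-read.
    case class Record(key: String, payload: String)

    def mergeOnRead(baseRows: Seq[Record], deltaRows: Seq[Record]): Seq[Record] = {
      // Materialising the delta into a map is the per-query cost called out above.
      val deltaByKey: Map[String, Record] = deltaRows.map(r => r.key -> r).toMap

      // Each base row is resolved through the map: a newer delta version wins,
      // otherwise the base version is returned. Since a row's final value is only
      // known after this lookup, filters cannot be pushed down to the base files.
      val merged = baseRows.map(base => deltaByKey.getOrElse(base.key, base))

      // Records that exist only in the delta (fresh inserts) are appended.
      val baseKeys = baseRows.map(_.key).toSet
      merged ++ deltaRows.filterNot(d => baseKeys.contains(d.key))
    }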

Features supported by our Framework:

  1. Very fast upserts and deletes, and very fast queries, for key-value databases with a fixed dataset size (e.g., customer data).
  2. Support for upserts and deletes in ever-growing transactional data, like order data in e-commerce.
  3. Partial record updates.
  4. Automatic file sizing, data organisation, compaction and cleaning.
  5. Schema evolution support.
  6. Support for time-series data and deduplication on huge data volumes.
  7. Highly available / no single point of failure.
  8. Easy to plug into existing Hadoop systems.
  9. Filter push-down at query time for fast queries (illustrated after this list).
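
To illustrate point 9, here is a generic Spark example (not our framework's API) of the kind of filter push-down we rely on, where the engine prunes columnar files using their statistics instead of scanning and merging everything:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("pushdown-sketch").getOrCreate()

    // Illustrative path and column names.
    val orders = spark.read.parquet("/lake/orders")
    val recent = orders.filter("order_date >= '2022-04-01'")

    // The physical plan lists the predicate under PushedFilters on the scan node,
    // so Parquet row groups whose min/max statistics cannot match are skipped.
    recent.explain(true)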

Stay tuned for the next posts, which will cover the design in further detail.
