Designing nRT Read and write optimized CDC Framework for Hadoop Data Lake — Part 2

Tarun Kumar · Published in Airtel Digital · May 19, 2022

System Architecture:

Data Design and Organisation:

Base Data: Data is partitioned on a hash computed from the key. The hash function depends on cluster topology, data size per partition, available resources, and any other partitioning requirements of the data. There is one file per hash value (or multiple ordered files per hash, preferably one), and all data within a file is sorted on the key. Files are stored in a columnar data format.

Hash Function → f(maximum cores available (n), table size, data partitioning requirements) = H

H takes a value from 1 to n, where n is the maximum number of cores available for the system to use.
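As a rough illustration, here is a minimal sketch of such a hash function in Scala; deriving n from the available processors and hashing the key with a simple modulo are assumptions for this sketch, not the exact production function:

```scala
object KeyHasher {
  // n: number of hash buckets; here derived from the cores available to the
  // system. A real deployment would also factor in table size and other
  // partitioning requirements (both are assumptions for this sketch).
  val n: Int = Runtime.getRuntime.availableProcessors()

  // H = f(key): maps a record key to a bucket in [1, n].
  def bucketFor(key: String): Int = ((key.hashCode % n) + n) % n + 1
}
```

With this bucket id, base data can be laid out as one sorted columnar file per bucket, as described above.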

Incremental Data: Updates happen in micro-batches. A batch can be defined by a time window or a maximum record count. For a given hash value there can be multiple update batches.

Meta Data: For every delta batch there is a bit vector data structure, stored as a binary file. It is an array of bits, where the bit at a given index tells whether the corresponding key is present in the base file or not.

Every base file has an index column, added after sorting on the key. The index of a key in the base file tells us which array location in the meta file to check for its status.

Meta Function → f(key) = M

f queries the meta file and returns the status of the bit corresponding to the key.

M = True or False

There is another data structure, a map, which tells us, for a key that has been modified, which delta version file it should be picked from.
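A minimal sketch of this per-batch metadata in Scala; the class and member names, and backing the bit vector with java.util.BitSet, are illustrative assumptions:

```scala
import java.util.BitSet

// Per-delta-batch metadata: a bit vector over base-file row indexes, plus a map
// from modified keys to the delta version file holding their latest record.
// Names and structure are illustrative assumptions, not the real implementation.
class DeltaMeta(bits: BitSet, deltaVersionByKey: Map[String, String]) {

  // Meta function f(key) = M: given the key's index in the sorted base file,
  // return whether that key is present in the base file or not.
  def presentInBase(baseRowIndex: Int): Boolean = bits.get(baseRowIndex)

  // For a modified key, which delta version file to pick it from.
  def deltaFileFor(key: String): Option[String] = deltaVersionByKey.get(key)
}
```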

Data Flow Architecture:

Batch Update Process:

Users push data to our system in micro-batches. A batch is defined by a time interval, a data size, or a hybrid of both. We acknowledge data only at batch boundaries; this simplifies failure handling and helps clients build retry logic. The updated data is merged with existing data to create a full record and stored in the form of delta files, as discussed above. When a batch is written, we update the meta of existing files and create new versions. This does not break any query that is already running or in progress.
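A hedged sketch of what one micro-batch write could look like with Spark; the paths, the single value column, and the merge logic are assumptions for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Sketch: merge one micro-batch of updates with the existing data to build full
// records, then write them as a new delta batch. Paths and column names are
// illustrative assumptions.
def writeDeltaBatch(spark: SparkSession, updates: DataFrame, batchId: Long): Unit = {
  val existing = spark.read.parquet("/lake/orders/base")

  // Build full records: prefer updated values, fall back to the base values.
  val full = updates.as("u")
    .join(existing.as("b"), Seq("key"), "left")
    .select(col("key"), coalesce(col("u.value"), col("b.value")).as("value"))

  // Persist the delta batch as columnar files, sorted on key within each bucket.
  full.repartition(col("key"))
    .sortWithinPartitions("key")
    .write
    .parquet(s"/lake/orders/delta/batch_id=$batchId")
}
```

The acknowledgement back to the client would go out only after this write and the corresponding meta update succeed.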

Major Compaction:

This is an asynchronous process, and atomicity is maintained through the Hive metastore. Major compaction merges the updates with the base data at regular intervals in the background.

Apart from that, there is minor compaction as well, which merges multiple delta files into a new, larger delta file.
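A sketch of such a major compaction with Spark, assuming the layout from the earlier sketches; the paths, column names, and the final ALTER TABLE swap through the metastore are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Sketch of major compaction: fold all delta batches into the base data and
// publish the new base version atomically via the metastore.
def majorCompact(spark: SparkSession, newVersion: Long): Unit = {
  val base = spark.read.parquet("/lake/orders/base")
    .withColumn("batch_id", lit(0L))
  val deltas = spark.read.parquet("/lake/orders/delta")   // batch_id comes from the partition path
    .withColumn("batch_id", col("batch_id").cast("long"))

  // Keep only the latest record per key: the highest batch_id wins.
  val latest = base.unionByName(deltas)
    .withColumn("rn", row_number().over(Window.partitionBy("key").orderBy(col("batch_id").desc)))
    .filter(col("rn") === 1)
    .drop("rn", "batch_id")

  latest.repartition(col("key"))
    .sortWithinPartitions("key")
    .write
    .parquet(s"/lake/orders/base_v$newVersion")

  // Atomic publish: repoint the table location through the Hive metastore.
  spark.sql(s"ALTER TABLE lake.orders SET LOCATION '/lake/orders/base_v$newVersion'")
}
```

Because the new base is written to a fresh path and published with a single metastore update, queries already in flight keep reading the old version until the swap completes.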

Query Process:

We have data in different buckets. The first bucket is a base bucket and the second bucket is a delta updates bucket. A given key can be present in both the base bucket and the updates bucket.

We define a function which reads the meta files and tells us whether a given key is present in the base bucket.

Fn(key, bucket) → true or false. The key is optional; by default the full key set is assumed. It queries both the bit vector and the hash map meta, as explained earlier, and returns a flag.

So a query is split into two sub-queries.

Q(data, filter pushdown expression, other SQL constructs) → QG(QI(base bucket, filter pushdown) Union QI(updates bucket, filter pushdown), other SQL constructs)

QI(bucket, filter) → select * from bucket_data where Fn(key, bucket) and filter condition

QG(data, other SQL constructs) → select * from data + other SQL constructs
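A sketch of the rewritten query as Spark SQL; the table names, the fn_valid UDF standing in for Fn, and the sample filter are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the split: a user query Q over the logical table becomes QG applied
// to the union of QI(base bucket) and QI(updates bucket).
def runSplitQuery(spark: SparkSession): Unit = {
  spark.sql(
    """
      |SELECT key, COUNT(*) AS cnt                        -- QG: other SQL constructs
      |FROM (
      |  SELECT key, value FROM base_bucket
      |  WHERE fn_valid(key, 'base') AND value > 100      -- QI(base bucket, filter pushdown)
      |  UNION ALL
      |  SELECT key, value FROM updates_bucket
      |  WHERE fn_valid(key, 'updates') AND value > 100   -- QI(updates bucket, filter pushdown)
      |) unioned
      |GROUP BY key
    """.stripMargin
  ).show()
}
```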

Schema Evolution:

Schema evolution can be supported through ALTER TABLE statements applied when a major compaction job runs.
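For example, given a SparkSession, a column addition could be applied alongside a compaction run roughly like this (table and column names are hypothetical):

```scala
// Sketch: schema change applied as part of a major compaction run; the
// compaction job then rewrites base files with the evolved schema.
spark.sql("ALTER TABLE lake.orders ADD COLUMNS (discount_pct DOUBLE)")
```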

Full Data Read Support:

Full data reads are supported from Spark/Hive using a custom InputFormat.

Near Real-Time Query:

OLAP querying on near real-time data is a well-known problem, and efforts are continually made to lower both the data refresh interval and the query time. Since most queries are filter and projection queries on the base data, filter pushdown and projection pushdown are essential optimizations when serving OLAP queries on real-time data. While Apache Hudi and ORC ACID both fail to do filter pushdown, relying on file-based hash map lookups and sorting respectively, we achieve it by maintaining a bitset corresponding to the base file in each delta file.

Part 1 Link
