Building a Data Lake for Analytics using Apache Hudi

Giridhar Munagala
NoBroker Engineering
6 min read · Jun 17, 2022

Information is the oil of the 21st century, and analytics is the combustion engine.

- Peter Sondergaard, Gartner Research.

To grow organically, organisations everywhere are rapidly adopting analytics. Analytical processes give product teams feedback from users, letting them deliver new features at a much faster pace. With a better understanding of the user, marketing teams can tune their campaigns to target specific audiences. All of this is possible only when we can deliver analytics at scale.

The need for a Data lake

At NoBroker.com, for operational purposes, transactional data is stored in SQL-based databases & event data is stored in NoSQL databases. These application databases are not tuned for analytical workloads. Furthermore, getting the bigger picture of the customer & the business usually requires joining transactional & event data. These limitations slow down the analytical process considerably.

To address these issues, we developed a data platform named STARSHIP, which provides a centralized repository of all of NoBroker’s data & can be accessed through SQL.

STARSHIP delivers analytics across 40TB+ of rapidly evolving data. Any event or transaction that happens on NoBroker is available for analysis in STARSHIP within 30 minutes.

An integral part of this is building a data storage layer optimised for analytics. The Parquet & ORC data formats provide this, but they lack update & delete functionality.

Apache Hudi

Apache Hudi is an open-source data management framework that provides record-level insert, update & delete functionality on columnar data formats. We use Apache Hudi extensively in all of the ETL pipelines that bring data to STARSHIP. We ingest data incrementally using Apache Hudi’s Delta-streamer utility, which we have enhanced to accommodate our business logic & data characteristics.

Delta-Streamer

Data is processed through multiple interconnected modules in Apache Hudi before reaching distributed cloud storage. These modules can be used independently or through the Delta-streamer utility, which streamlines the entire ETL process. Though the default functionality is limited, it allows customisation through extendable Java classes.

Figure: Major modules in Delta-streamer. Green: data stores; yellow: enhanced modules; blue: untouched modules.

Source reader

The source reader is the first module in Hudi data processing; it reads the data from upstream. Hudi provides supporting classes that can read from local files in formats such as JSON & Avro, & also from Kafka streams.

In our data pipeline, CDC events are published to Kafka in Avro format. We have extended the source class to read incrementally from Kafka, each time reading a specific number of messages starting from the stored checkpoints. We also added functionality to append the Kafka offset as a column in the data.

// Read data from Kafka for the given offset ranges,
// dropping null records & records without a value
baseConsumerRDD = KafkaUtils.createRDD(
        sparkContext,
        kafkaParams,
        offsetRanges,
        consistent_location_strategy)
    .filter(x -> x != null)
    .filter(x -> x.value() != null);

// Wrap each message in a JSON envelope that carries its Kafka offset.
// x.value().toString() is assumed to render the record as JSON,
// as an Avro GenericRecord does.
baseRDD = baseConsumerRDD.map(x ->
    "{\"starship_offset\": " + x.offset()
        + ", \"starship_value\": " + x.value().toString()
        + "}");

// Read into a Spark data frame, apply the schema & flatten the envelope
table_df = sparkSession.read()
    .schema(table.getIncomingSchema())
    .json(baseRDD)
    .select("starship_value.*", "starship_offset");

After the initial data read, we also enforce the schema, which is either obtained from the Kafka schema registry or provided by the user as a custom schema.
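The bounded incremental read described above can be sketched in plain Java. This is a minimal illustration, not our actual source class: `OffsetPlanner`, `nextRanges` & the greedy way the message budget is spent across partitions are all assumptions made for the example.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: plans the next bounded Kafka read from stored checkpoints.
final class OffsetPlanner {
    /**
     * checkpoint: partition -> next offset to read (saved by the last successful batch)
     * latest:     partition -> current end offset of that partition
     * Returns partition -> {fromOffset (inclusive), untilOffset (exclusive)}.
     */
    static Map<Integer, long[]> nextRanges(Map<Integer, Long> checkpoint,
                                           Map<Integer, Long> latest,
                                           long maxMessages) {
        Map<Integer, Long> ordered = new TreeMap<>(latest); // stable partition order
        Map<Integer, long[]> ranges = new TreeMap<>();
        long budget = Math.max(0L, maxMessages);
        for (Map.Entry<Integer, Long> end : ordered.entrySet()) {
            // Partitions without a checkpoint are read from offset 0 (assumption)
            long from = checkpoint.getOrDefault(end.getKey(), 0L);
            long available = Math.max(0L, end.getValue() - from);
            long take = Math.min(available, budget); // never exceed the batch budget
            ranges.put(end.getKey(), new long[] {from, from + take});
            budget -= take;
        }
        return ranges;
    }
}
```

Each planned range would then be turned into a Spark `OffsetRange` for the read, & the `untilOffset` values would be stored as the checkpoint for the next batch.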

Business logic processor

The data brought into Spark data frames by the source reader is in raw format. To make it usable for analytics, we need to clean & standardize it & apply business logic. Every data point in STARSHIP goes through the following transformations to ensure data quality.

  • Case standardization : Lower/upper casing.
  • Date format conversion : Converting various string date formats to epoch milliseconds.
  • Time zone standardization : Converting data from all time zones to UTC.
  • Phone number standardization : Formatting phone numbers to the “Country code - Phone number” format.
  • Data type casting : Converting quoted numbers to Int/Long, converting to text formats, etc.
  • Masking & hashing : Masking sensitive information using hashing algorithms.
  • Custom SQL query processing : In case custom filters need to be applied on specific columns, they can be passed as SQL clauses.
  • Geo point data processing : Converting geo point data to Parquet-supported formats.
  • Column standardization : Converting all column names to snake case & flattening any nested columns.
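Two of the transformations listed above, column-name standardization & date conversion to UTC epoch milliseconds, can be sketched with the Java standard library alone. The class & method names are hypothetical, & the date pattern & source time zone are passed in by the caller because the real formats vary per table.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helpers illustrating two standardization steps.
final class Standardizers {
    // camelCase, spaced, dashed or dotted column names -> snake_case
    static String toSnakeCase(String column) {
        return column.trim()
                .replaceAll("([a-z0-9])([A-Z])", "$1_$2") // split lower->upper boundaries
                .replaceAll("[\\s\\-.]+", "_")            // separators become underscores
                .toLowerCase(Locale.ROOT);
    }

    // Parses a local date-time string in the given zone & returns UTC epoch milliseconds
    static long toEpochMillisUtc(String text, String pattern, String zoneId) {
        LocalDateTime local = LocalDateTime.parse(text, DateTimeFormatter.ofPattern(pattern));
        return local.atZone(ZoneId.of(zoneId)).toInstant().toEpochMilli();
    }
}
```

For example, `toSnakeCase("propertyType.id")` gives `property_type_id`, & a timestamp of `2022-06-17 05:30:00` in `Asia/Kolkata` maps to midnight UTC on the same day.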

Key generator

Hudi identifies every row by a set of keys in order to provide updates & deletes at row level. Hudi requires every data point to have a primary key, an ordering key &, if the data is partitioned, a partitioning key.

  • Primary key : To identify if a row is an update or a new insert.
  • Ordering key : To identify the latest event for every primary key in the current batch, in case multiple events appear for the same row in the same batch.
  • Partitioning key : To write data in partitioned format.

Ordering events that come from CDC pipelines becomes tricky, especially when multiple types of streams are processed by the same logic. For this purpose, we have written a key generator class that handles the ordering logic based on the input data stream source & supports multiple columns as the primary key.
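To make the roles of the primary key & the ordering key concrete, here is a minimal sketch in plain Java. It is not our key generator class: `KeySketch`, the `field:value` key layout & the rule that later events win ties are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: composite primary keys & per-batch de-duplication.
final class KeySketch {
    static final class Event {
        final Map<String, String> row;
        final long orderingKey; // e.g. an update timestamp or a Kafka offset
        Event(Map<String, String> row, long orderingKey) {
            this.row = row;
            this.orderingKey = orderingKey;
        }
    }

    // Builds one primary key from several columns, e.g. "id:1,city:blr"
    static String primaryKey(Map<String, String> row, List<String> keyFields) {
        StringBuilder key = new StringBuilder();
        for (String field : keyFields) {
            if (key.length() > 0) {
                key.append(',');
            }
            key.append(field).append(':').append(row.get(field));
        }
        return key.toString();
    }

    // Keeps only the event with the highest ordering key per primary key
    static Map<String, Event> latestPerKey(List<Event> batch, List<String> keyFields) {
        Map<String, Event> latest = new LinkedHashMap<>();
        for (Event event : batch) {
            latest.merge(primaryKey(event.row, keyFields), event,
                    (old, cur) -> cur.orderingKey >= old.orderingKey ? cur : old);
        }
        return latest;
    }
}
```

With several CDC stream types, the interesting part is how `orderingKey` is derived for each source; the sketch simply takes it as given.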

Parquet writer

Once the data is in its final transformed format, the Hudi writer takes care of the writing process. Each new data ingestion cycle is termed a commit & is associated with a commit number.

  • Start of commit : Ingestion starts with a “<commit_no>.commit.requested” file being created in cloud storage.
  • Commit inflight : Once all transformations are processed & the writing process starts, a “<commit_no>.inflight” file is created.
  • End of commit : Once data is written to disk successfully, a final “<commit_no>.commit” file is created.

The ingestion is considered successful only if the final .commit file is created. In case of failure, the Hudi writer rolls back any changes made to the Parquet files & the next ingestion picks up from the latest available .commit file.
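The recovery rule above can be sketched as a scan over the marker files. This is a simplified illustration rather than Hudi's timeline code: `TimelineSketch` is a hypothetical name, the file names in any listing passed to it are illustrative, & commit numbers are assumed to be fixed-width timestamps so that string order matches time order.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: derive commit state from marker file names.
final class TimelineSketch {
    private static final String COMPLETED_SUFFIX = ".commit";

    // Latest commit whose final ".commit" marker exists
    static Optional<String> latestCompletedCommit(List<String> markerFiles) {
        return markerFiles.stream()
                .filter(name -> name.endsWith(COMPLETED_SUFFIX))
                .map(name -> name.substring(0, name.length() - COMPLETED_SUFFIX.length()))
                .max(String::compareTo);
    }

    // Commits that started but never produced a ".commit" marker (rollback candidates)
    static Set<String> pendingCommits(List<String> markerFiles) {
        Set<String> started = new TreeSet<>();
        Set<String> completed = new TreeSet<>();
        for (String name : markerFiles) {
            int dot = name.indexOf('.');
            if (dot < 0) {
                continue; // not a marker file
            }
            String commitNo = name.substring(0, dot);
            started.add(commitNo);
            if (name.endsWith(COMPLETED_SUFFIX)) {
                completed.add(commitNo);
            }
        }
        started.removeAll(completed);
        return started;
    }
}
```

A commit that has only its requested or inflight marker shows up in `pendingCommits`, which is the set a writer would need to roll back before starting the next ingestion.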

If we wrote new Parquet files on every commit, we would end up with a large number of small files, which slows down analytical processes. To avoid this, whenever new inserts arrive, the Hudi writer checks for existing small files & adds the new inserts to them instead of rolling out a new file.

At NoBroker, we make sure every one of our Parquet files is at least 100MB in size to optimize the speed of analytics.
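The small-file handling described above is essentially a packing problem, which the following sketch illustrates. It is a simplification under stated assumptions, not Hudi's writer: `SmallFileSketch`, the byte-based accounting & the `new-file-N` names are hypothetical. In Hudi itself, the thresholds are controlled by the `hoodie.parquet.small.file.limit` & `hoodie.parquet.max.file.size` settings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: route incoming inserts to small files before creating new ones.
final class SmallFileSketch {
    /**
     * existingFileSizes: file id -> current size in bytes
     * Returns file id -> bytes of new inserts assigned to it.
     */
    static Map<String, Long> assignInserts(Map<String, Long> existingFileSizes,
                                           long smallFileLimitBytes,
                                           long maxFileSizeBytes,
                                           long incomingBytes) {
        if (maxFileSizeBytes <= 0) {
            throw new IllegalArgumentException("maxFileSizeBytes must be positive");
        }
        Map<String, Long> plan = new LinkedHashMap<>();
        long remaining = incomingBytes;
        // First top up files that are below the small-file limit
        for (Map.Entry<String, Long> file : existingFileSizes.entrySet()) {
            if (remaining <= 0) {
                break;
            }
            if (file.getValue() < smallFileLimitBytes) {
                long take = Math.min(maxFileSizeBytes - file.getValue(), remaining);
                if (take > 0) {
                    plan.put(file.getKey(), take);
                    remaining -= take;
                }
            }
        }
        // Whatever is left goes into new files, each filled up to the maximum size
        int newFiles = 0;
        while (remaining > 0) {
            long take = Math.min(maxFileSizeBytes, remaining);
            plan.put("new-file-" + (++newFiles), take);
            remaining -= take;
        }
        return plan;
    }
}
```

With a 100MB small-file limit & a 120MB maximum, a 20MB file would absorb 100MB of a 150MB batch & only the remaining 50MB would start a new file.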

Data Indexing

Apart from writing data, Hudi also keeps track of where a particular row is stored, to speed up updates & deletes. This information is stored in specialized data structures called indexes. Hudi provides several index implementations, such as bloom filters, simple indexes & HBase indexes (see the Indexing page of the Apache Hudi documentation).

We started with bloom filters, but as data increased & use cases evolved we shifted to HBase indexes, which provide very fast retrieval of row metadata.

HBase indexes reduced the resource requirements of our ETL pipeline by 30%.
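Conceptually, every index answers one question: which file currently holds the row with this key? The sketch below uses an in-memory map to show that idea. It is a toy stand-in, not a bloom filter or an HBase-backed index, & `IndexSketch` & its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: an index maps a record key to the file that stores the row.
final class IndexSketch {
    private final Map<String, String> keyToFile = new HashMap<>();

    // Called after a write, once the location of the row is known
    void recordLocation(String recordKey, String fileId) {
        keyToFile.put(recordKey, fileId);
    }

    Optional<String> locate(String recordKey) {
        return Optional.ofNullable(keyToFile.get(recordKey));
    }

    // Known keys are updates to a specific file; unknown keys are new inserts
    String tag(String recordKey) {
        return locate(recordKey).map(file -> "UPDATE in " + file).orElse("INSERT");
    }
}
```

The index implementations differ mainly in where this mapping lives & how cheaply it can be queried, which is why the choice affects the resources an ETL pipeline needs.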

Schema writer

Once the data is written to cloud storage, it should be discoverable on our platform automatically. For this purpose, Hudi provides a schema writer, which updates a user-specified schema repository with new databases, tables & column additions to the data lake.

We use Hive as our centralised schema repository. By default, Hudi adds all the columns in the source data, as well as all of its metadata fields, to the schema repository. Since our data platform is exposed to business users, we ensure the metadata fields are skipped while writing the schema. This has no effect on performance but provides a much better experience for analytics users.
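Skipping the metadata fields can be sketched as a simple filter on column names. The sketch assumes that Hudi's metadata columns are recognisable by their `_hoodie_` prefix (for example `_hoodie_commit_time` & `_hoodie_record_key`); `SchemaSketch` & `businessColumns` are hypothetical names, & how the filtered list reaches Hive is left out.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: hide Hudi metadata columns from the published schema.
final class SchemaSketch {
    static final String HUDI_META_PREFIX = "_hoodie_";

    // Keeps only the columns that came from the source data, in their original order
    static List<String> businessColumns(List<String> allColumns) {
        return allColumns.stream()
                .filter(column -> !column.startsWith(HUDI_META_PREFIX))
                .collect(Collectors.toList());
    }
}
```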

With the help of the schema writer, the business can add a new feature to upstream data & it becomes available on our data platform without any manual intervention.

Cleaner

During the ingestion process, lots of metadata files & temporary files get created. If left untouched, they degrade analytics performance. Hudi ensures that all unnecessary files are archived, & deleted if needed.

Every time a new ingestion happens, new versions of a few of the existing Parquet files get rolled out. The older versions are useful for tracking event timelines & for letting long-running queries finish, but they slowly fill up storage space. To address this, the cleaner offers two ways to reduce storage space:

  1. KEEP_LATEST_FILE_VERSIONS : A certain no. of latest file versions are kept, while older ones are deleted.
  2. KEEP_LATEST_COMMITS : Keep only the file versions written by n latest commits.
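The two policies above can be sketched for a single Parquet file that has several versions. This is a simplified reading of the policies as described here, not Hudi's cleaner: `CleanerSketch` is a hypothetical name, each version is identified by the commit number that wrote it, & the sketch always keeps the newest version so that a file is never removed entirely.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: decide which versions of one file survive cleaning.
final class CleanerSketch {
    // KEEP_LATEST_FILE_VERSIONS: keep the newest N versions (returned newest first)
    static List<String> keepLatestFileVersions(List<String> versionCommits, int versionsToKeep) {
        List<String> sorted = new ArrayList<>(versionCommits);
        sorted.sort(Comparator.reverseOrder());
        int keep = Math.max(1, Math.min(versionsToKeep, sorted.size()));
        return new ArrayList<>(sorted.subList(0, Math.min(keep, sorted.size())));
    }

    // KEEP_LATEST_COMMITS: keep versions written by the newest N commits of the table
    static List<String> keepLatestCommits(List<String> versionCommits,
                                          List<String> allCommits,
                                          int commitsToKeep) {
        List<String> commits = new ArrayList<>(allCommits);
        commits.sort(Comparator.reverseOrder());
        int retainedCount = Math.max(0, Math.min(commitsToKeep, commits.size()));
        List<String> retained = commits.subList(0, retainedCount);
        List<String> sorted = new ArrayList<>(versionCommits);
        sorted.sort(Comparator.reverseOrder());
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            // The newest version is always kept, even if its commit is old
            if (i == 0 || retained.contains(sorted.get(i))) {
                kept.add(sorted.get(i));
            }
        }
        return kept;
    }
}
```

In Hudi, the policy is selected with `hoodie.cleaner.policy`, & the counts with `hoodie.cleaner.fileversions.retained` & `hoodie.cleaner.commits.retained`.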

Our data platform is tuned to deliver interactive queries & reports in under 1 minute. At the same time, we ensure older file versions are retained for up to 1 hour, to support long-running data science workloads.

Apache Hudi is one of the most important parts of the STARSHIP data platform. We have many more components that provide other functionality, such as visualization & an interactive querying engine. Watch this space and follow us for more on the amazing things we do at NoBroker.com.
