Building Data Lakehouse to unleash the power of Data at apna

Jan 25, 2023

By Sarfaraz Hussain, Ravi Singh

At apna, we are on a mission to connect every Indian to life-changing opportunities through our job marketplace and community. In this blog, we will share our experience with building a Data Platform based on the Lakehouse architecture to unleash the power of data to drive our data science and analytics use cases.

Background of Data at apna

The current data architecture at apna resembles a data warehouse built on top of BigQuery.

  1. We use BigQuery for both storage and computation.
  2. Table schema is defined upfront and enforced at the time of writing.
  3. We ingest CDC and Clickstream data into BigQuery via third-party systems (Mixpanel, Hevo) in hourly/daily batches.
    i. Transactional service data in Postgres/MongoDB is ingested by Hevo.
    ii. Clickstream data is collected by Mixpanel.
  4. This extra ingestion layer has cost implications and affects data freshness and consistency.

Opportunities of improvement with current Data Ecosystem

  1. BigQuery costs can climb steeply due to the complex nature of the queries written by our internal users: lots of joins, sub-queries, CTEs, and queries that are not written with cost in mind.
  2. Time travel is limited to 7 days.
  3. Maintaining near real-time sync of CDC data via Hevo was costly due to the complex ETL (UPSERT) operation, so we had to set the sync frequency to 1–3 hours.
  4. When a large batch of change streams needs to be processed, many partitions are created for Hevo’s temporary external BigQuery table; the Merge/Upsert then takes a long time and sometimes fails.

At apna, we strongly believe that every limitation is a new opportunity to create something better.

Users of our Data

Most users of the data platform use the SQL interface provided by BigQuery. There is a growing demand from engineering users for Spark, both for building data/ML pipelines and for ad-hoc analysis.

  1. Product Analysts and Product Managers derive product feature performance using BigQuery (joining both CDC and Clickstream data). This mainly involves ad-hoc SQL queries for product-level insights.
  2. Data Scientists derive features to train models using BigQuery (joining both CDC and Clickstream data). This mostly consists of writing SQL to extract datasets for model training and validation, and requires point-in-time queries.
  3. Services built on top of BigQuery data, e.g. the Job Feed service (mostly using CDC data), Growth Campaigns, etc.

Introduction to AAG

Due to the aforementioned challenges and limitations, we decided to build a data platform based on the Data Lakehouse architecture and named it “Apna Analytics Ground”, or AAG for short (aag means “fire” in Hindi).

Data Lakehouse architecture

Data Lakehouse: A new generation of open platforms that unify Data Warehousing and the advanced analytics capabilities of a Data Lake.

As part of this blog, we will not discuss the “what and why” of the Data Lakehouse, but we recommend going through this reference blog if you are not familiar with it.

Data Lakehouse Architecture

The idea is to build a decoupled architecture with infinite storage (Google Cloud Storage) and infinite compute (Spark, Presto) that can handle our data volume at scale. Separating the compute and storage layers allows for a separation of concerns: running compute clusters for the analytic jobs while storing data at the lowest cost per terabyte.

Further reading —
1. Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics
2. Apache Hudi — The Data Lake Platform

Table Format

Table formats allow us to interact with data lakes as easily as we interact with databases, using our favorite tools and languages. A table format allows us to abstract different data files as a singular dataset i.e. a table.

We chose Apache Hudi as our table format for building the Lakehouse.

A few prominent out-of-the-box features of Apache Hudi are:

  1. Provides a consistent view under frequent updates and inserts (for Spark and Presto).
  2. Offers tooling for compaction/vacuuming to improve read performance, which makes file management easier.
  3. Maintains schema metadata which can be automatically inferred from datasets and can also provide versioning support.
  4. Lots of functionalities like compaction, cleaning, clustering, and file sizing can be tuned as per the query pattern.
  5. Point-in-time queries via Apache Spark.
  6. Out-of-the-box de-duplication of events using a Record Key and a Pre-combine Key (see the sketch after this list).
  7. GCS + Hudi is the most open architecture: it works well with most processing/compute tools and provides schema versioning and point-in-time capabilities, along with many out-of-the-box features like Cleaning, Compaction, Savepoints, etc.
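To make a couple of these capabilities concrete, here is a minimal sketch of a Hudi upsert that de-duplicates on a Record Key / Pre-combine Key, followed by a point-in-time read. It assumes a spark-shell/notebook session with the Hudi bundle on the classpath; the table name, columns, paths, and commit instant are illustrative, not our production values.

```scala
import org.apache.spark.sql.SaveMode

// Hypothetical raw input; duplicate events for the same user_id may exist here.
val users = spark.read.json("gs://example-bucket/raw/users/")

users.write.format("hudi")
  .option("hoodie.table.name", "users")
  .option("hoodie.datasource.write.recordkey.field", "user_id")      // duplicates collapse on this key
  .option("hoodie.datasource.write.precombine.field", "updated_at")  // the latest version of a key wins
  .option("hoodie.datasource.write.operation", "upsert")
  .mode(SaveMode.Append)
  .save("gs://example-bucket/lake/users")

// Point-in-time query: read the table as of an earlier Hudi commit instant.
val snapshot = spark.read.format("hudi")
  .option("as.of.instant", "20230101000000")
  .load("gs://example-bucket/lake/users")
```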

AAG Ecosystem

AAG Ecosystem

Starting from the left of the diagram, we have our transactional databases (Postgres and MongoDB) and we pull the change logs via Confluent Kafka Connectors.

To bring change logs from the transactional sources we use the following connectors:

- Postgres: Debezium PostgreSQL CDC Source Connector

- MongoDB: Atlas MongoDB Connector

These connectors create a dedicated Kafka topic for each table present in the transactional databases and publish the change logs for each table to its topic. The retention period of the Kafka topics is set to 7 days.

We also have an in-house ingestion service that ingests clickstream data generated by the Mobile App and the Web directly into Kafka topics.

As the next step, we create dedicated HoodieMultiTable Streams to capture the data from the Kafka topics and land it as-is into our Bronze layer in Hudi format. Each stream creates a separate Hudi table in GCS for each Kafka topic.
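For illustration, below is a simplified, single-topic sketch (using Spark Structured Streaming) of what one of these Bronze capture streams does. The actual pipelines run HoodieMultiTableStreamer across many topics; the broker, topic, paths, and trigger interval here are placeholder values.

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// Read the raw change log of one table from its Kafka topic.
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "pg.public.users")          // one CDC topic per source table
  .option("startingOffsets", "earliest")
  .load()

// Land the change log as-is into the Bronze Hudi table, partitioned by processing date.
val query = raw
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "timestamp")
  .withColumn("processing_date", current_date())
  .writeStream
  .format("hudi")
  .option("hoodie.table.name", "bronze_users")
  .option("hoodie.datasource.write.recordkey.field", "key")
  .option("hoodie.datasource.write.precombine.field", "timestamp")
  .option("hoodie.datasource.write.partitionpath.field", "processing_date")
  .option("hoodie.datasource.write.operation", "insert")   // Bronze keeps the raw change logs
  .option("checkpointLocation", "gs://example-bucket/checkpoints/bronze_users")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .start("gs://example-bucket/bronze/cdc/users")
```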

Once the data is present in the Bronze layer, we create corresponding HoodieMultiTable Streams to transform the data and move it to the Silver layer in Hudi format. Like the Bronze streams, each Silver stream produces a separate Hudi table in GCS, and each table is registered in Dataproc Metastore.

We perform schema validation, timestamp validation and many transformations (like flattening complex struct fields, deriving new columns, adding current timestamps, custom transformations, etc.) while moving data from Bronze to Silver. We store the schema for each stream/table in the Schema Registry, which is used by each Capture Stream.

If any record of a table/stream does not comply with the defined schema, we move those records to a different location in GCS. So each stream creates two tables: one with valid records and the other with invalid records. We call the table with invalid records the quarantine table.
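A minimal sketch of that valid/quarantine split during the Bronze-to-Silver hop is shown below. The schema, columns, and paths are illustrative; in the real pipeline the schema comes from the Schema Registry and both outputs are written as Hudi tables.

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Illustrative schema; in production it is resolved from the Schema Registry.
val expectedSchema = StructType(Seq(
  StructField("event_id",   StringType,    nullable = false),
  StructField("event_type", StringType,    nullable = false),
  StructField("event_ts",   TimestampType, nullable = false)
))

val bronze = spark.read.format("hudi").load("gs://example-bucket/bronze/clickstream")

val parsed = bronze.withColumn("parsed",
  from_json(col("value").cast("string"), expectedSchema))

// Records that fail to parse, or are missing required fields, are quarantined.
val isValid = col("parsed").isNotNull &&
  col("parsed.event_id").isNotNull && col("parsed.event_ts").isNotNull

val valid      = parsed.filter(isValid).select("parsed.*")
val quarantine = parsed.filter(!isValid).select("value")

valid.write.format("hudi")
  .option("hoodie.table.name", "events_silver")
  .option("hoodie.datasource.write.recordkey.field", "event_id")
  .option("hoodie.datasource.write.precombine.field", "event_ts")
  .mode("append")
  .save("gs://example-bucket/silver/events")

// Simplified here as Parquet; the real quarantine output is also a Hudi table.
quarantine.write.mode("append").parquet("gs://example-bucket/quarantine/events")
```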

There is also a separate BigQuery Sync process handled by the HoodieMultiTableStreamer, which creates an external BigQuery table for each Hudi table. This lets us query Hudi data quickly via BigQuery to validate the data and explore it without having to write and deploy any Spark application.

Storage

Storage Architecture

Bronze Layer

  • Written from Streaming CDC pipeline and Clickstream ingestion service.
  • Stores raw Clickstream events and Transactional data.
  • Very high write throughput.
  • Data is written in very small batches and partitioned by -
    - Clickstream: processing date
    - CDC: processing date
  • Maintain separate paths for CDC and Clickstream.
  • Read by Silver pipelines every 5 minutes to 1 hour.
  • For Clickstream, we use Hudi’s BULK_INSERT mode of operation with sorting/clustering on “event_type” enabled for ingesting the Bronze table. This ensures that records belonging to the same event type are packed into the fewest number of Parquet files, so the Silver pipelines read only the rows (pages) of the Parquet files that contain the records for the associated event (see the sketch after this list).
  • For CDC, we use the regular INSERT mode so that we get evenly sized files, since not all CDC tables are expected to have enough data to benefit from sorting. Bronze tables are ingested as the raw change logs.
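Here is a rough sketch of the Clickstream Bronze write described above: BULK_INSERT with inline clustering sorted on event_type, so that records of the same event type are packed into as few Parquet files as possible. The table name, columns, paths, and clustering cadence are illustrative, not our production values.

```scala
import org.apache.spark.sql.functions._

// Hypothetical clickstream micro-batch, tagged with the processing date used for partitioning.
val clickstream = spark.read.json("gs://example-bucket/raw/clickstream/")
  .withColumn("processing_date", current_date())

clickstream.write.format("hudi")
  .option("hoodie.table.name", "bronze_clickstream")
  .option("hoodie.datasource.write.recordkey.field", "event_id")
  .option("hoodie.datasource.write.precombine.field", "event_ts")
  .option("hoodie.datasource.write.partitionpath.field", "processing_date")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  // Inline clustering co-locates rows with the same event_type into fewer files.
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "4")
  .option("hoodie.clustering.plan.strategy.sort.columns", "event_type")
  .mode("append")
  .save("gs://example-bucket/bronze/clickstream")
```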

Silver Layer

  • Contains the Clickstream events and Transactional data in a queryable form.
  • Data is flattened and fanned out (for Clickstream) & materialized (for CDC).
  • For CDC: all filtering and flattening happens in the Silver table before we upsert/delete (materialize). That way, if the pipeline messes anything up, we can backfill from the Bronze table instead of dealing with backfilling from Debezium/Postgres or MongoDB.
  • Schema validation and data quality checks kick in at this layer.
  • Data is split into:
    - Clickstream: multiple paths, one per event name, partitioned by event-generated time.
    - CDC: multiple paths, one per table name, partitioned by event time.
  • Compaction is done every 1–4 hours depending on the business requirement of how quickly updates and deletes need to be made available for BI and reporting.
  • Tables are of Merge-on-Read (MoR) type, and we run Cleaning every 2 days to improve read performance on the Silver tables.
  • For any reprocessing that might be needed due to the addition of new column(s), we can delete the Hudi checkpoint and process the data again from any partition of the Bronze layer.
  • Read by Gold pipelines every 24 hours; can also be used for ad-hoc analysis using Spark/Presto/Trino. (A sketch of an incremental Bronze-to-Silver upsert follows this list.)
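As referenced above, the sketch below shows the general shape of a Silver pipeline: an incremental read of new commits from a Bronze table, followed by an upsert into a Merge-on-Read Silver table with inline compaction and a bounded cleaner policy. The instant time, paths, column names, and the exact compaction/cleaning cadence are illustrative.

```scala
// Incremental query: only commits after the given instant are read from Bronze.
val increment = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20230120000000")
  .load("gs://example-bucket/bronze/cdc/users")

// Flattening, derived columns, and validations would happen here; kept trivial for the sketch.
val silver = increment.selectExpr(
  "CAST(key AS STRING) AS user_id",
  "timestamp AS updated_at",
  "CAST(value AS STRING) AS payload")

silver.write.format("hudi")
  .option("hoodie.table.name", "users_silver")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.recordkey.field", "user_id")
  .option("hoodie.datasource.write.precombine.field", "updated_at")
  .option("hoodie.datasource.write.operation", "upsert")
  // Compact after a few delta commits and retain a bounded commit history for read performance.
  .option("hoodie.compact.inline", "true")
  .option("hoodie.compact.inline.max.delta.commits", "4")
  .option("hoodie.cleaner.commits.retained", "48")
  .mode("append")
  .save("gs://example-bucket/silver/cdc/users")
```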

Gold Layer

  • Feature stores and aggregated datasets.
  • Make it simpler and faster for data users to derive insights.
  • Ensure correctness, reliability and discoverability.
  • Reduce overall compute costs and curb misuse.

Compute

When we were designing the above architecture, we decided to pick Apache Hudi to improve the latency of our analytics dashboards by adopting Hudi’s incremental processing rather than doing batch ETL, which was both time- and cost-intensive.

While exploring Hudi, we also met Onehouse.ai, a new company founded by Hudi’s original creator. Onehouse is building a managed service around Hudi, and we decided to use their product to speed up the development of our Lakehouse foundation.

Onehouse offers managed ingestion from Postgres, MongoDB, Kafka, GCS, and BigQuery, and automatically streams data from these sources into managed Hudi tables on GCS. We modernized complex batch ETL pipelines on BigQuery into low-code incremental pipelines with Onehouse, which reduced the effort and engineering resources required. In terms of resources, for Postgres and MongoDB CDC we use 12 GKE nodes with 4 cores and 16 GB memory each to manage the Bronze and Silver layers, which together hold ~250 tables.

Old Architecture
New Architecture

We replaced Hevo integration while improving data freshness! Instead of waiting 1–3 hours for data to be synced into BigQuery, we now have data freshness of minutes. Since all of our Hudi tables are stored in our own GCS buckets, we now have interoperability to use BigQuery for BI, while at the same time unlocking Presto, Pinot, and other technologies needed for our Lakehouse analytics.

By updating our infrastructure, we not only got rid of redundant technologies but also gained the flexibility to use different data ecosystems depending on our needs. It has simplified our architecture and improved compute efficiency. Now our users don’t have to wait for hours to get the latest recommendations; they get them in minutes.

Schema Management

We use Confluent Schema Registry to maintain the schema for every table.

Clickstream:
- Bronze: one single schema.
- Silver: two schemas per event/table. One source schema is used to flatten the array of events, and the other is used as the target schema during fanout. We need two schemas because a few source and target columns differ (i.e. Silver tables have derived columns).

CDC:
- Bronze: one single schema.
- Silver: ’n’ schemas, one per collection/table.
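For reference, here is a small sketch of how a pipeline can pull the latest registered schema for a subject from the Confluent Schema Registry before validating records; the registry URL and subject name are placeholders.

```scala
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Client with a small identity-map cache; the URL is a placeholder.
val registry = new CachedSchemaRegistryClient("http://schema-registry:8081", 100)

// One subject per stream/table, e.g. "<topic>-value" for the source schema.
val meta = registry.getLatestSchemaMetadata("pg.public.users-value")

println(s"schema id=${meta.getId}, version=${meta.getVersion}")
println(meta.getSchema)   // the schema string used to validate incoming records
```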

Acknowledgements:

To begin with, if you have been reading this blog till now, we would like to acknowledge you. Kudos to you! You are a champ, and dedicated too!

Next, we would like to acknowledge the people behind this:

Data: Piyush Mujavadiya, Sarfaraz Hussain, Ravi Singh, Vinod Adwani

DevOps and Platform: Aswin Mullenchira, Harish Srinivas, Prabhu Raja, Nirmaljeet Singh

Design: Kratika Singh

Leaders: Ronak Shah, Suresh Khemka

Let me know if you have any questions or suggestions,
Until Next time,
Sarfaraz Hussain
Email: sarfaraz.h@apna.co


Sarfaraz Hussain is a Big Data fan working as a Data Engineer with 4+ years of experience. His core competencies are Spark, Scala, Kafka, Hudi, etc.