Building Analytical Apps on the Lakehouse using Apache Hudi, Daft & Streamlit

Dipankar Mazumdar
apache-hudi-blogs
Published in
9 min readMay 10, 2024

Building user-facing analytical apps and dashboards is critical for organizations that want to make decisions actionable. While traditional BI tools have been predominantly used for such tasks, they often lack integration with the Python ecosystem and require a dedicated effort and specialized skill set. Also, the need to quickly prototype and build data apps is becoming essential at different stages of the analytics lifecycle (not just last mile), including exploratory data analysis, ML model evaluation, data pipeline monitoring, and LLM-based apps.

To address these challenges, low-code tools like Streamlit, which serve as a thin wrapper around the Python ecosystem, allow you to bring APIs, models, and business logic to life. Streamlit enables easy data consumption from a variety of sources such as databases, APIs, and file systems, allowing for easy integration into applications. In this blog, our focus would be on building a data app using data directly from an open lakehouse platform.

Open Lakehouse Platform

Lakehouse architectures are gaining traction as more organizations transition to using open table formats for transactions on data lakes. Open lakehouse platforms such as Apache Hudi allows organizations to build flexible architectures, enabling them to choose the best compute engine for their workloads without locking data into proprietary storage formats.

Fig 1: Apache Hudi’s Open Lakehouse Platform. Highlighted component in orange is the table format

At its core, a lakehouse integrates the transactional capabilities of traditional databases (such as OLAP) with the scalability and cost-efficiency of data lakes. Data files are stored in accessible, open table formats in cloud-based object stores like Amazon S3, Azure Blob, or Google Cloud Storage, with metadata managed by the ‘table format’ component. This modular approach creates a future-proof architecture where new compute engines can be added to the stack as needed. For example, you might use Hudi with Apache Flink today to build low-latency pipelines, and later add Presto or Trino or anything else for ad-hoc analytics.

Hudi + Daft Integration

Fig 2: Apache Hudi-Daft Read workflow

One important thing to note is that lakehouse platforms today are predominantly distributed to effectively handle large-scale, complex, and varied data workloads. This means you may need to spin up clusters with Spark, JVM, and other necessary configurations to interact with the data stored in underlying storage systems. However, the need to use data directly from lakehouses in a single node architecture is becoming critical, especially for doing ad hoc analytics and building analytical apps, which accelerates the time-to-insight process.

For such use cases, you don’t always need to go through the cumbersome process of setting up the infrastructure. This is where Python-based dataframes such as Daft come into the picture. Daft is a distributed query engine designed for ETL, analytics, and ML/AI at scale. It offers a familiar Python DataFrame API that aims to surpass Spark in both performance and ease of use. Daft operates locally with a lightweight, multithreaded backend. So, works well on your local development environment, but when the local machine’s capacity is exceeded, it can transition to run out-of-core on a distributed cluster.

The recent release of Daft introduces support for reading Apache Hudi Copy-on-Write (CoW) tables. This means, users can now consume Hudi tables directly from an object store using pure Python. Daft’s query optimizer also enables partition pruning and file pruning (via file-level statistics) to skip non-relevant data files to return faster results.

Hands-on Dashboard

The goal of this hands-on example is to show how you can use Daft as the query engine to read Hudi tables, and then build a user-facing analytical application in Python. The specific dataset and use case are not the main focus of this blog. Instead, the goal is to demonstrate the practical workings of the Hudi-Daft integration.

Before we start working on the code, let’s outline a minimalistic lakehouse architecture as the foundation for our dashboard. This will also introduce the tools we are using for this exercise. All the tools used here are open source. Amazon S3 operates on a pay-as-you-go model with costs based on storage and API usage.

Fig 3: Lakehouse architecture for the exercise

Architecture Stack:

  • Data Lake Storage — Amazon S3
  • File Format — CSV, Parquet
  • Table Format — Apache Hudi
  • Compute Engine — Apache Spark (writes), Daft (reads)
  • User Interface — Streamlit

Libraries to install: Streamlit, Plotly, Daft, Pandas, boto3

We will use Amazon S3 as the data lake storage, where all data files will be securely stored after the ingestion job completes. The source data will be a CSV file, and we will write the records as Parquet when creating the lakehouse table. Apache Hudi will be used as the table format and Hudi’s lakehouse platform including table services (clustering, indexing, file sizing, etc.) will be utilized for keeping the storage layout optimized.

For our compute needs, Apache Spark will handle the write tasks during the ingestion phase, while Daft will be the primary engine for reads and analytics, offering optimized performance for these operations. In the end, we will use Streamlit to create an interactive dashboard using the data directly from the lakehouse.

If you want to follow this example, clone the GitHub repo. Let’s get started!

Create Hudi Table & Ingest Records

The first step is to create the Hudi table using Spark. Below are all the configurations required to use PySpark with Apache Hudi. This is an optional step if you already have a Hudi table in the data lake.

** Note that in a real-world setting, if you are a data analyst/scientist working on the analytics layer, these tasks will typically be handled by the data engineering team.

from typing import *

from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
.appName("Hudi Table") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.2.4,com.amazonaws:aws-java-sdk:1.12.262") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()

print("Spark Running")

s3_path = "s3a://my-bucket/sandbox/daft_hudi/"

# Access SparkContext
sc = spark.sparkContext

Now, let’s ingest the records.

# Read Source Data for Ingestion
TABLE_NAME = 'aldi_data'
INPUT = 's3a://my-bucket/input/retail/All_Data_Aldi.csv'
df_cow = spark.read.csv(INPUT, header=True, inferSchema=True)

# Minor Transformation
df_cow = df_cow.withColumnRenamed('prices_(£)', 'prices')
df_cow = df_cow.withColumnRenamed('prices_unit_(£)', 'prices_unit')

# Write the Records
PATH = 's3a://my-bucket/sandbox/daft_hudi/'

hudi_options = {
'hoodie.table.name': TABLE_NAME,
'hoodie.table.keygenerator.class' : "org.apache.hudi.keygen.SimpleKeyGenerator",
'hoodie.datasource.write.hive_style_partitioning' : "false",
'hoodie.datasource.write.partitionpath.field' : "category"
}

spark.sql("DROP TABLE IF EXISTS " + TABLE_NAME)
df_cow.write.format("hudi").mode("overwrite").options(**hudi_options).mode("overwrite").save(PATH)

This will create a Hudi table named aldi_data in the S3 data lake and will be partitioned by the category field. Here is how the table looks like in the data lake.

Fig 4: Hudi Table partitioned by Category in S3 data lake

Read Hudi Table using Daft

Now that we have the records written to a Hudi table, we should be good to start reading the data using Daft to build our downstream analytical app. Like mentioned before, Daft provides high performance I/O reads from cloud data lakes.

Here is a snippet that shows how to read Hudi table using Daft’s query engine.

 df = daft.read_hudi("s3://my-bucket/sandbox/daft_hudi")
df_analysis = df.select("supermarket", "prices", "names", "date", "own_brand", "category")
df_analysis.collect()

Let’s understand the methods from the Daft API.

  • read_hudi() — used to read the Hudi table. You can specify the table location URI here
  • select() — this creates a new dataframe from the provided expression (similar to SQL SELECT)
  • collect() — this method executes the entire dataframe and materializes the results

We begin by reading the Hudi table from the S3 bucket where the records were previously ingested. Next, we use the select() method to choose the fields necessary for our analysis. Since Daft dataframes are lazy — meaning they do not evaluate until explicitly instructed — results are not immediately displayed after these operations.

In this example, we just use Daft to defer the tasks of reading the data and selecting columns. Practically, this approach of laziness allows Daft to optimize your queries more effectively before they are executed. Finally, we can tell Daft to execute our DataFrame and cache the results by using df_analysis.collect(). The important thing to note is that any subsequent operations on df_analysis will avoid recomputations and simply utilize this materialized result.

All of these query plans can be examined by calling the explain() method. To see the plan after Daft applies its optimizations, we can use explain(show_all=True). Here is what we have for our case.

Fig 5: Logical & Physical Plans in Daft

We can see the Unoptimized, Optimized and Physical Plans for our dataframe. The Optimized Logical Plan (highlighted) presents the projection pushdown based on our query. When these query engine optimizations are combined with Hudi’s storage optimization capabilities (such as clustering, indexing, file sizing, etc.), they can deliver excellent performance for handling large datasets.

Build the Streamlit Dashboard

As of now, we have the Hudi table stored as a Daft dataframe df_analysis. To build the dashboard, we will use a combination of Python-based libraries, including Pandas and Plotly Charts, alongside Daft. While visualization libraries today predominantly work with Pandas — hence its use for this specific purpose — we will employ Daft in scenarios that benefit from its performance optimization capabilities.

For example, some charts in the dashboard require aggregated values (such as Product Variety per Category). In these instances, rather than performing aggregations in Pandas, we utilize Daft’s capabilities to aggregate the data first, then pass the results to the visualization library. This method proves especially effective when handling very large datasets, which are typical in lakehouse workloads. Here is a snippet that shows the same.

distinct_names_per_category = df_analysis.select("category", "names").distinct()
category_diversity_daft = distinct_names_per_category.groupby("category").agg(
daft.col('names').count()
).to_pandas()
category_diversity_daft.columns = ['Category', 'Number of Unique Products']

We first select distinct names and categories from the dataframe, then group by category, and count the unique product names in each category. The results are then converted to a Pandas dataframe for use with the visualization chart.

From the dashboard’s design perspective, we will have four charts that answer some business questions and a filter to analyze data by category.

# Charts 1 & 2
col1, col2 = st.columns(2, gap="large")
with col1:
st.subheader('Price Distribution by Category')
fig1 = px.box(df_filtered, x='category', y='prices', title='Price Distribution by Category', color_discrete_sequence=modern_mint)
st.plotly_chart(fig1, use_container_width=True)

with col2:
st.subheader('Product Variety per Category')
filtered_category_diversity = category_diversity_pandas[category_diversity_pandas['Category'].isin(df_filtered['category'].unique())]
fig_category_diversity = px.bar(filtered_category_diversity, x='Category', y='Number of Unique Products', title='Product Variety per Category', color_discrete_sequence=modern_mint)
st.plotly_chart(fig_category_diversity, use_container_width=True)

# Charts 3 & 4
col3, col4 = st.columns(2, gap="large")
with col3:
st.subheader('Share of Own Brand Products')
own_brand_count = df_filtered['own_brand'].value_counts(normalize=True).reset_index(name='proportion')
fig4 = px.pie(own_brand_count, values='proportion', names='own_brand', title='Share of Own Brand Products', labels={'own_brand': 'Brand Type'}, color_discrete_sequence=modern_mint)
st.plotly_chart(fig4, use_container_width=True)

with col4:
st.subheader('Average Price by Brand Type and Category')
# Group by both 'own_brand' and 'category' for a more detailed breakdown
brand_category_price_comparison = df_filtered.groupby(['own_brand', 'category'])['prices'].mean().unstack().fillna(0)
fig5 = px.bar(brand_category_price_comparison, title='Average Price by Brand Type and Category', color_discrete_sequence=modern_mint, labels={'value':'Average Price', 'variable':'Category'})
fig5.update_layout(barmode='stack')
st.plotly_chart(fig5, use_container_width=True)

Putting all of these together, here is the final application ready for analysis.

Fig 6: Analytics Dashboard built with Apache Hudi Table & Daft

The dashboard also allows filtering the dataset by category and renders the relevant visualizations as per the filter. This allows users for a more granular analysis.

Fig 7: Dashboard in action

Conclusion & Future Work

Building the dashboard directly on an open lakehouse brings several advantages.

  • Faster Insights: Direct access to the lakehouse (gold layer in a medallion architecture) accelerates the time-to-insight process, ensuring that analyses are timely and relevant.
  • Reduced data redundancy: Traditional reporting often involves moving data across multiple systems (lakes to warehouses for BI), which can result in numerous copies and versions of data. This is avoided with an open data architecture that enables direct access to data.
  • Cost Efficiency: Using an open lakehouse architecture reduces costs by eliminating the need for complex ETL pipelines and frequent data extracts, while cloud storages like Amazon S3 allows scaling as required.

In this blog, we presented how quickly and seamlessly we can build user-facing analytical applications on open lakehouse platforms such as Apache Hudi, using high-performance query engines like Daft. This marks our first experience dealing with Hudi tables using pure Python, without the need to set up Spark in a Java-based environment.

The integration of Daft, which provides a familiar Python API while delivering superior performance, opens up interesting avenues for running analytical workloads on Hudi without the need for distributed computing like Spark. Some of the items we are working on for the immediate future are:

--

--

Dipankar Mazumdar
apache-hudi-blogs

Dipankar is currently a Staff Data Engineering Advocate at Onehouse.ai where he focuses on open source projects in the data lakehouse space.