Data Engineering: Getting Started with Delta Lake
In the realm of data lakes, Delta Lake has been gaining popularity alongside Apache Hudi and Apache Iceberg. Let’s begin with a straightforward approach: using a minimum of code on Apache Spark, written in Scala and run in spark-shell, we will explore its features in the simplest way possible. This gentle introduction will let you gradually extend your knowledge towards the design and development of complex ETL pipelines.
Popularity and engagement can be judged by the following stats on GitHub and X.
On GitHub: Apache Iceberg, Apache Hudi & Delta Lake
On X: Apache Iceberg, Apache Hudi & Delta Lake
This post is mainly about getting started with Delta Lake using Apache Spark and the Scala programming language on spark-shell, so let’s start.
I am using Apache Spark 3.5.0, Scala 2.12, and Delta Lake 3.1.0. If you don’t have these versions on your local machine, install them first, then launch spark-shell with the command below.
spark-shell \
--packages io.delta:delta-spark_2.12:3.1.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Now you have an active spark-shell with the Delta Lake libraries included.
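As a quick optional sanity check (a minimal sketch; the values simply echo the --conf flags passed above), you can confirm from inside the shell that the Delta settings took effect:
// Optional sanity check inside spark-shell
spark.version                                      // e.g. 3.5.0
spark.conf.get("spark.sql.extensions")             // io.delta.sql.DeltaSparkSessionExtension
spark.conf.get("spark.sql.catalog.spark_catalog")  // org.apache.spark.sql.delta.catalog.DeltaCatalog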
Let’s create a small dataset and explore how to ingest it into a data lake table with ACID properties. This approach is applicable across cloud providers such as AWS, GCP, or Azure. Have you ever wondered how open-source storage frameworks like Apache Hudi, Delta Lake, and Apache Iceberg efficiently handle petabyte-scale data stored in AWS S3 or GCP Cloud Storage buckets? And how should we efficiently use the concept of prefixes in AWS S3 or GCP Cloud Storage? The AWS documentation summarizes it well:
Best practices design patterns: optimizing Amazon S3 performance
Your applications can easily achieve thousands of transactions per second in request performance when uploading and retrieving storage from Amazon S3. Amazon S3 automatically scales to high request rates. For example, your application can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per partitioned Amazon S3 prefix.
In this context, I will demonstrate creating Delta Lake tables for the same data in three distinct ways: a non-partitioned table, a table partitioned by an existing column, and a table partitioned by a derived column created specifically to serve as the partition column.
A non-partitioned Delta Lake table
val data = Seq(("1", "One", "category1"), ("2", "Two", "category2"), ("3", "Three", "category1"), ("4", "Four", "category3"), ("5", "Five", "category2"))
val schema = Seq("id", "value", "category")
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF(schema:_*)
// Let's store it in Delta Lake storage format
val basePath = "file:///Users/krishna/Downloads/deltalakedb/no-partition/example1"
df.write.format("delta").mode("overwrite").save(basePath)
After the write, the base path contains the generated Parquet files along with the _delta_log directory. However, this non-partitioned layout may not be efficient if your data is expected to scale into terabytes soon.
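To double-check the write, you can read the table back and inspect its commit history via the DeltaTable API; a small sketch reusing the basePath defined above:
// Read the table back and look at its transaction history
import io.delta.tables.DeltaTable
spark.read.format("delta").load(basePath).show()
DeltaTable.forPath(spark, basePath).history().select("version", "timestamp", "operation").show(false)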
A partitioned table with an existing column
So, let’s apply the concept of prefixes discussed above. I will use the same DataFrame created earlier; only the way it is stored in Delta Lake changes.
// Store it partitioned by category
val basePathPartition = "file:///Users/krishna/Downloads/deltalakedb/no-partition/partition_example1"
df.write.format("delta").partitionBy("category").mode("overwrite").save(basePathPartition)
Focus on .partitionBy("category"): it creates a prefix for each category, so the data is distributed across different prefixes.
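If you are curious what this looks like on disk, a quick listing of the output directory shows one sub-directory (prefix) per category value next to _delta_log; a small sketch using the local basePathPartition from above (on S3 or GCS these would be object key prefixes):
// List the top-level entries of the partitioned table's directory
new java.io.File(basePathPartition.stripPrefix("file://"))
  .listFiles()
  .map(_.getName)
  .sorted
  .foreach(println)
// Expected, roughly: _delta_log, category=category1, category=category2, category=category3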
Imagine a dataset with a small, fixed number of categories, where each category could grow into terabytes of data; the partitioning above would then become a challenge. This is where the magic of a derived column comes into play. You can treat this column as metadata and use it for partitioning when ingesting data into the Delta Lake storage format in the data lake.
Creating a derived column that will serve as the partition column
// Derive the partition column from an existing column
// e.g. you may want only 100 partitions (here I will create 3) based on the id
// Cast column "id" to long
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType
val partitionLabel = "_partition"
val cdf = df.withColumn("id", col("id").cast(LongType))
val fdf = cdf.withColumn(partitionLabel, col("id") % 3)
// Store into Delta Lake format
val basePathCustomPartition = "file:///Users/krishna/Downloads/deltalakedb/no-partition/custom_partition_example1"
fdf.write.format("delta").partitionBy(partitionLabel).mode("overwrite").save(basePathCustomPartition)
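Reading the table back with a filter on the derived column lets Spark prune down to a single prefix; a small sketch reusing the variables and the col import from above:
// Query a single derived partition; Delta reads only that prefix
val readBack = spark.read.format("delta").load(basePathCustomPartition)
readBack.filter(col(partitionLabel) === 1).show()
// The _partition column is stored with the data, so drop it if you don't want it in results
readBack.filter(col(partitionLabel) === 1).drop(partitionLabel).show()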
With these approaches, it is now up to you how many partitions you create and how much read and write throughput you want for your table in the near future. That is the beauty of a data lake built on open source and cloud providers: there is effectively no limit on data storage or on read and write throughput. This can be achieved through open storage frameworks like Apache Hudi, Apache Iceberg, or Delta Lake, each offering table-level ACID properties.
This concept can be replicated in Apache Hudi and Apache Iceberg; if you are exploring those, check out my other posts on Medium.
The entire codebase for the above demonstrations is in my GitHub repo: https://github.com/krishnaiitd/datalake/tree/main/deltalake-getting-started-part1/OnSparkShell
Woohoo!!! We have successfully walked through the different partitioning strategies for Delta Lake tables.
Stay tuned and connect with me on
Medium: https://medium.com/@krishnaiitd
LinkedIn: https://www.linkedin.com/in/krishna-prasad-b8a86223/
GitHub: https://github.com/krishnaiitd