Demystifying Delta Lake with AWS EMR: A CDC Use Case

Amar_Kumar
9 min read · Sep 15, 2023


In the ever-evolving landscape of data storage and processing, three distinct solutions have emerged as game-changers: Data Lakes, Data Lakehouses, and Delta Lake. While each of these technologies holds its own promise, the choice between them can significantly impact how you handle your data. To shed light on the matter, we will explore these data storage paradigms and dive into a specific use case related to Change Data Capture (CDC).

Table of contents:

· The Journey So Far
· Delta Lake Architecture: A Bridge Between Data Lakes & Data Warehouses
    Delta Lake Architecture
    Capabilities
· Implementing CDC using AWS EMR and Delta Lake
    Prerequisite
    Generating a dummy dataset
    Checking for memory usage
    Writing and reading data from target
    Creating Delta table Instance
    Let's perform batch update and insert operations on the Delta table
    Arbitrary selective overwrite with replaceWhere
    Delta Lake time travel
    Delta Lake Delete Operation
    Code
· Conclusion

The Journey So Far

Before we dive into our next topic, let’s take a moment to recap our journey. In our previous posts, we ventured into the realms of Data Lakes, Data Lakehouses, and Apache Hudi, each offering unique capabilities in the data management landscape.

We explored Data Lakes, understanding their flexibility and raw data storage potential. Then, we delved into Data Lakehouses, witnessing how they blend the best of Data Lakes and data warehouses for structured querying and governance.

Our journey also led us to Apache Hudi, where we harnessed its power for Change Data Capture (CDC) operations. We discussed crucial features like ACID compliance, incremental processing, and time travel that make Apache Hudi a robust data management framework.

Now, as we embark on the Delta Lake chapter, we bring with us a wealth of knowledge and insights from our previous adventures. Delta Lake will unveil a new dimension of data storage, integrity, and real-time processing, adding to our ever-expanding arsenal of data management tools. Stay with us as we explore the intricacies and capabilities of Delta Lake!

Delta Lake Architecture: A Bridge Between Data Lakes & Data Warehouses

Delta Lake is an open-source storage layer built atop a data lake that confers reliability and ACID (Atomicity, Consistency, Isolation, and Durability) transactions. It enables a continuous and simplified data architecture for organizations.

Delta Lake Architecture

Delta Lake improves on the lambda architecture, in which streaming and batch processing run in parallel and their results are merged to answer a query.

The stored data is organized into three layers, with the data becoming more refined as it moves downstream through the dataflow:

  1. Bronze tables: This table contains the raw data ingested from multiple sources like the Internet of Things (IoT) systems, CRM, RDBMS, and JSON files.
  2. Silver tables: This layer contains a more refined view of our data after undergoing transformation and feature engineering processes.
  3. Gold tables: This final layer is often made available for end users in BI reporting and analysis or use in machine learning processes.

Figure: Delta Lake multi-hop architecture
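
As a rough illustration of the three hops, the sketch below uses plain Python lists in place of Delta tables. The sample records, the cleaning rule, and the function names to_silver and to_gold are assumptions made up for this example; they are not part of the pipeline built later in this post.

```python
from collections import Counter

# Bronze: raw records exactly as ingested (hypothetical sample data).
bronze = [
    {"trip_id": "1", "destination": " seattle "},
    {"trip_id": "2", "destination": "Miami"},
    {"trip_id": None, "destination": "Tucson"},  # malformed: no key
    {"trip_id": "3", "destination": "miami"},
]

def to_silver(rows):
    """Drop rows without a key, cast the key to int, and normalise text."""
    return [
        {"trip_id": int(r["trip_id"]),
         "destination": r["destination"].strip().title()}
        for r in rows
        if r["trip_id"] is not None
    ]

def to_gold(rows):
    """Aggregate to a reporting-friendly shape: trips per destination."""
    return dict(Counter(r["destination"] for r in rows))

silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)  # {'Seattle': 1, 'Miami': 2}
```

In a real deployment each layer would be its own Delta table, and each hop would be a Spark batch or streaming job rather than a list comprehension.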

Capabilities:

  1. Atomic, consistent, isolated, durable (ACID) transactions on Spark. Readers see a consistent view of the table during a Spark job.
  2. Scalable metadata handling with distributed processing by Spark.
  3. Combines streaming and batch use cases on the same Delta table.
  4. Automatic schema enforcement to avoid bad records during data ingestion.
  5. Time travel with data versioning.
  6. Supports merge, update, and delete operations for complex use cases like change data capture (CDC), streaming upserts.
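
To make the schema enforcement point more concrete, here is a minimal pure-Python sketch of the idea: a batch is checked against the table schema before anything is written, and a mismatch rejects the whole batch. The TABLE_SCHEMA dictionary and the enforce_schema function are hypothetical, and the rule shown is deliberately stricter and simpler than what Delta Lake actually applies at write time.

```python
# Hypothetical table schema: column name -> expected Python type.
TABLE_SCHEMA = {"trip_id": int, "route_id": str, "destination": str, "ts": str}

def enforce_schema(batch, schema=TABLE_SCHEMA):
    """Return the batch unchanged if every row matches the schema, else raise."""
    for row in batch:
        if set(row) != set(schema):
            raise ValueError(f"column mismatch: {sorted(row)}")
        for name, expected in schema.items():
            if not isinstance(row[name], expected):
                raise ValueError(f"{name} should be {expected.__name__}")
    return batch

good_batch = [{"trip_id": 1, "route_id": "B", "destination": "Miami",
               "ts": "2023-09-15 06:00:00.000"}]
bad_batch = [{"trip_id": "one", "route_id": "B", "destination": "Miami",
              "ts": "2023-09-15 06:00:00.000"}]

enforce_schema(good_batch)          # passes silently
try:
    enforce_schema(bad_batch)       # trip_id is a string, so the batch is rejected
except ValueError as err:
    print("rejected:", err)
```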

Implementing CDC using AWS EMR and Delta Lake

Prerequisite:

Configure the EMR notebook to use Delta Lake:

%%configure -f
{
    "conf": {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.jars": "hdfs:///apps/delta/lib/delta-core_2.12-2.0.0.jar",
        "spark.jars.packages": "io.delta:delta-core_2.12:2.0.0"
    }
}

import pyspark
from delta import *

# Build a session with the Delta Lake SQL extension and catalog enabled
builder = pyspark.sql.SparkSession.builder.appName("Delta_App") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip adds the Delta package that matches the pip-installed delta-spark version
spark = configure_spark_with_delta_pip(builder).getOrCreate()

delta_version = spark.conf.get("spark.jars.packages")
print("Delta Lake version:", delta_version)

## OUTPUT:
Delta Lake version: io.delta:delta-core_2.12:2.4.0
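
The value printed above is a Maven coordinate rather than a bare version number. If only the version is needed, for example to log it or to compare it against a minimum, a small helper can split the coordinate. This is a sketch that assumes the setting holds a single group:artifact:version coordinate; parse_maven_coordinate is a hypothetical name.

```python
def parse_maven_coordinate(coordinate):
    """Split 'group:artifact_scalaVersion:version' into its parts."""
    group, artifact, version = coordinate.split(":")
    name, _, scala = artifact.rpartition("_")
    return {"group": group, "artifact": name, "scala": scala, "version": version}

info = parse_maven_coordinate("io.delta:delta-core_2.12:2.4.0")
print(info["version"])  # 2.4.0
```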

Generating a dummy dataset

from delta.tables import *
from pyspark.sql.functions import *
from datetime import datetime
from delta import DeltaTable

config = {
"table_name": "delta_trips_table",
"target": "/tmp/delta-table",
"primary_key": "trip_id"
}

# Generate Dummy Dataset

dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson", "Washington DC", "Philadelphia", "Miami", "San Francisco"]

def get_json_data(start, count, dest):
    # All rows in one batch share the same millisecond-precision timestamp
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    data = [{"trip_id": i, "ts": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i % 10]} for i in range(start, start + count)]
    return data

# Create a Spark DataFrame
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))

# Insert a large dataset (2 million rows)
df = create_json_df(spark, get_json_data(0, 2000000, dest))

Checking for memory usage

# Checking the memory
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

def _to_java_object_rdd(rdd):
    """ Return a JavaRDD of Object by unpickling
    It will convert each Python object into Java object by Pyrolite, whenever the
    RDD is serialized in batch or not.
    """
    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    return rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)

JavaObj = _to_java_object_rdd(df.rdd)
nbytes = spark._jvm.org.apache.spark.util.SizeEstimator.estimate(JavaObj)
print("Memory usage of DataFrame: {:.2f} MB".format(nbytes / 1000000))

## OUTPUT
Memory usage of DataFrame: 80.26 MB

Writing and reading data from target


import time
df.write.format('delta').mode("overwrite").save(config['target'])

# Store the table location in the Spark config so that SQL queries can refer to it
spark.conf.set('table.location', config['target'])

# Read the Delta table back and display the first 20 rows
df_delta = spark.read.format("delta").load(config['target'])
df_delta.show()

+-------------+--------+-------+--------------------+
|  destination|route_id|trip_id|                  ts|
+-------------+--------+-------+--------------------+
|      Seattle|       A|      0|2023-09-15 06:00:...|
|     New York|       B|      1|2023-09-15 06:00:...|
|   New Jersey|       C|      2|2023-09-15 06:00:...|
|  Los Angeles|       D|      3|2023-09-15 06:00:...|
|    Las Vegas|       E|      4|2023-09-15 06:00:...|
|       Tucson|       F|      5|2023-09-15 06:00:...|
|Washington DC|       G|      6|2023-09-15 06:00:...|
| Philadelphia|       H|      7|2023-09-15 06:00:...|
|        Miami|       I|      8|2023-09-15 06:00:...|
|San Francisco|       J|      9|2023-09-15 06:00:...|
|      Seattle|       A|     10|2023-09-15 06:00:...|
|     New York|       B|     11|2023-09-15 06:00:...|
|   New Jersey|       C|     12|2023-09-15 06:00:...|
|  Los Angeles|       D|     13|2023-09-15 06:00:...|
|    Las Vegas|       E|     14|2023-09-15 06:00:...|
|       Tucson|       F|     15|2023-09-15 06:00:...|
|Washington DC|       G|     16|2023-09-15 06:00:...|
| Philadelphia|       H|     17|2023-09-15 06:00:...|
|        Miami|       I|     18|2023-09-15 06:00:...|
|San Francisco|       J|     19|2023-09-15 06:00:...|
+-------------+--------+-------+--------------------+
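
The same table can also be queried with SQL directly by path, using Delta Lake's delta.`<path>` syntax. The helper below only builds the SQL string; delta_path_query and its parameters are hypothetical names for this sketch, and the resulting string would be passed to spark.sql in the notebook.

```python
def delta_path_query(path, columns="*", where=None, limit=None):
    """Build a Spark SQL statement that reads a Delta table by its path."""
    cols = columns if isinstance(columns, str) else ", ".join(columns)
    sql = f"SELECT {cols} FROM delta.`{path}`"
    if where:
        sql += f" WHERE {where}"
    if limit is not None:
        sql += f" LIMIT {int(limit)}"
    return sql

query = delta_path_query("/tmp/delta-table", ["trip_id", "destination"],
                         where="route_id = 'A'", limit=5)
print(query)
# In the notebook: spark.sql(query).show()
```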

Creating Delta table Instance

# Create a DeltaTable instance from the path of the Delta table.
# This handle gives access to the operations used for CDC (merge, update, delete, history).
deltaTable = DeltaTable.forPath(spark, config['target'])

Let's perform batch update and insert operations on the Delta table

Delta Lake has excellent documentation covering these statements and how to implement them. Refer to the link below.

Link: https://docs.delta.io/latest/delta-update.html

# Changing data for upsert: 100 rows whose trip_id values already exist in the table
upsert_dest_delta = ["mexico" for i in range(0, 100)]
df3_delta = create_json_df(spark, get_json_data(1000000, 100, upsert_dest_delta))
df3_delta.show(20, False)

+-----------+--------+-------+-----------------------+
|destination|route_id|trip_id|ts                     |
+-----------+--------+-------+-----------------------+
|mexico     |A       |1000000|2023-09-15 06:04:14.833|
|mexico     |B       |1000001|2023-09-15 06:04:14.833|
|mexico     |C       |1000002|2023-09-15 06:04:14.833|
|mexico     |D       |1000003|2023-09-15 06:04:14.833|
|mexico     |E       |1000004|2023-09-15 06:04:14.833|
|mexico     |F       |1000005|2023-09-15 06:04:14.833|
|mexico     |G       |1000006|2023-09-15 06:04:14.833|
|mexico     |H       |1000007|2023-09-15 06:04:14.833|
|mexico     |I       |1000008|2023-09-15 06:04:14.833|
|mexico     |J       |1000009|2023-09-15 06:04:14.833|
|mexico     |A       |1000010|2023-09-15 06:04:14.833|
|mexico     |B       |1000011|2023-09-15 06:04:14.833|
|mexico     |C       |1000012|2023-09-15 06:04:14.833|
|mexico     |D       |1000013|2023-09-15 06:04:14.833|
|mexico     |E       |1000014|2023-09-15 06:04:14.833|
|mexico     |F       |1000015|2023-09-15 06:04:14.833|
|mexico     |G       |1000016|2023-09-15 06:04:14.833|
|mexico     |H       |1000017|2023-09-15 06:04:14.833|
|mexico     |I       |1000018|2023-09-15 06:04:14.833|
|mexico     |J       |1000019|2023-09-15 06:04:14.833|
+-----------+--------+-------+-----------------------+

# PERFORM THE UPSERT OPERATION
(deltaTable
    .alias('t')
    .merge(df3_delta.alias('u'), 't.trip_id = u.trip_id')
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# Inserting new data: 100 rows whose trip_id values are not yet in the table
insert_dest = ["Bengaluru" for i in range(0, 100)]
df5_mor = create_json_df(spark, get_json_data(2000000, 100, insert_dest))

## Perform the Insert Operation (no trip_id matches, so every row is inserted)
(deltaTable
    .alias('t')
    .merge(df5_mor.alias('u'), 't.trip_id = u.trip_id')
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
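
To see why the same merge statement acts as an update in the first case and as an insert in the second, the sketch below mimics the matched/not-matched logic on plain Python dictionaries. It is a simplified model rather than Delta Lake's implementation, merge_upsert is a hypothetical name, and it assumes each key appears at most once in the incoming rows.

```python
def merge_upsert(target, updates, key="trip_id"):
    """Mimic whenMatchedUpdateAll + whenNotMatchedInsertAll on lists of dicts."""
    merged = {row[key]: row for row in target}
    stats = {"updated": 0, "inserted": 0}
    for row in updates:
        # A key already present in the target counts as a match (update);
        # otherwise the row is new (insert).
        stats["updated" if row[key] in merged else "inserted"] += 1
        merged[row[key]] = row
    return list(merged.values()), stats

target = [{"trip_id": 1, "destination": "Seattle"},
          {"trip_id": 2, "destination": "Miami"}]
updates = [{"trip_id": 2, "destination": "mexico"},
           {"trip_id": 3, "destination": "Bengaluru"}]

result, stats = merge_upsert(target, updates)
print(stats)  # {'updated': 1, 'inserted': 1}
```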

Arbitrary selective overwrite with replaceWhere

This sample code writes out the data in replace_data, validates that all rows match the predicate, and performs an atomic replacement using overwrite semantics. If any values in the operation fall outside the constraint, the operation fails with an error by default. Note that replace_data and the start_date and end_date columns are illustrative; the trips table created above has no such columns.

(replace_data.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
    .save(config['target'])
)
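
For the trips table used in this post, a comparable selective overwrite could be keyed on trip_id instead of dates. The helper below is a sketch under that assumption: overwrite_trip_range and its parameters are hypothetical names, and it relies on a Delta Lake release recent enough to accept replaceWhere predicates on non-partition columns.

```python
def overwrite_trip_range(df, path, first_id, last_id):
    """Atomically replace rows with first_id <= trip_id <= last_id by the rows in df.

    Every row in df must itself satisfy the predicate, otherwise the write
    fails by default.
    """
    predicate = f"trip_id >= {int(first_id)} AND trip_id <= {int(last_id)}"
    (df.write
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", predicate)
        .save(path))
    return predicate

# Example call in the notebook, replacing the 100 rows that were upserted earlier:
# overwrite_trip_range(df3_delta, config['target'], 1000000, 1000099)
```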

Delta Lake time travel

Delta Lake stores data in Parquet files and information about transactions in the _delta_log metadata folder.

The _delta_log metadata folder tracks the Parquet data files that are added and removed from the Delta table for each transaction.

  • For Version 0, Delta Lake only needs to read File A.
  • For Version 1, Delta Lake sees that both File A and File B should be read.
  • For Version 2, Delta Lake sees that File A, File B, and File C were added, but File A and File B were removed, so it reads only File C and skips the other files.

Figure: Delta table time travel

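
The File A / File B / File C example can be reproduced with a few lines of Python that replay the log. This is a simplified model of the transaction log: the commits list and the files_at_version function are assumptions for illustration, not the actual on-disk format.

```python
# Each commit lists the data files it adds and removes (hypothetical names).
commits = [
    {"add": ["A"], "remove": []},          # version 0
    {"add": ["B"], "remove": []},          # version 1
    {"add": ["C"], "remove": ["A", "B"]},  # version 2
]

def files_at_version(commits, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for commit in commits[: version + 1]:
        live |= set(commit["add"])
        live -= set(commit["remove"])
    return live

for v in range(len(commits)):
    print(v, sorted(files_at_version(commits, v)))
# 0 ['A']
# 1 ['A', 'B']
# 2 ['C']
```

Reading an older version is therefore just a matter of stopping the replay early, which is what the versionAsOf option used below asks Delta Lake to do.
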
# Time travel 
deltaTable.history(100).show()

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
| 2|2023-05-18 11:32:33| null| null| MERGE|{predicate -> (t....|null| null| null| 1| Serializable| false|{numTargetRowsCop...| null|Apache-Spark/3.3....|
| 1|2023-05-18 11:29:55| null| null| MERGE|{predicate -> (t....|null| null| null| 0| Serializable| false|{numTargetRowsCop...| null|Apache-Spark/3.3....|
| 0|2023-05-18 11:27:31| null| null| WRITE|{mode -> Overwrit...|null| null| null| null| Serializable| false|{numFiles -> 2, n...| null|Apache-Spark/3.3....|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+

## Get a clearer picture of the table history by selecting only the most relevant columns
deltaTable.history(100).select("version", "timestamp", "operation", "operationParameters").show(truncate=False)


+-------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------+
|version|timestamp |operation|operationParameters |
+-------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------+
|2 |2023-05-18 11:32:33|MERGE |{predicate -> (t.trip_id = u.trip_id), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}]}|
|1 |2023-05-18 11:29:55|MERGE |{predicate -> (t.trip_id = u.trip_id), matchedPredicates -> [{"actionType":"update"}], notMatchedPredicates -> [{"actionType":"insert"}]}|
|0 |2023-05-18 11:27:31|WRITE |{mode -> Overwrite, partitionBy -> []} |
+-------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------+

## Read the data as of a specific version (time travel)
## Selecting version 1
df_time_travel = spark.read.format("delta").option("versionAsOf", 1).load(config['target'])
df_time_travel.show()

+-------------+--------+-------+--------------------+
|  destination|route_id|trip_id|                  ts|
+-------------+--------+-------+--------------------+
|        Miami|       I|     28|2023-05-18 11:23:...|
| Philadelphia|       H|    107|2023-05-18 11:23:...|
|San Francisco|       J|    139|2023-05-18 11:23:...|
| Philadelphia|       H|    157|2023-05-18 11:23:...|
|    Las Vegas|       E|    174|2023-05-18 11:23:...|
|        Miami|       I|    178|2023-05-18 11:23:...|
|        Miami|       I|    208|2023-05-18 11:23:...|
| Philadelphia|       H|    227|2023-05-18 11:23:...|
|   New Jersey|       C|    242|2023-05-18 11:23:...|
|  Los Angeles|       D|    253|2023-05-18 11:23:...|
|       Tucson|       F|    255|2023-05-18 11:23:...|
|Washington DC|       G|    266|2023-05-18 11:23:...|
|        Miami|       I|    268|2023-05-18 11:23:...|
| Philadelphia|       H|    307|2023-05-18 11:23:...|
|Washington DC|       G|    316|2023-05-18 11:23:...|
|      Seattle|       A|    320|2023-05-18 11:23:...|
|Washington DC|       G|    326|2023-05-18 11:23:...|
|  Los Angeles|       D|    353|2023-05-18 11:23:...|
|       Tucson|       F|    365|2023-05-18 11:23:...|
|      Seattle|       A|    370|2023-05-18 11:23:...|
+-------------+--------+-------+--------------------+

Note: Similarly, we can also perform time travel based on a timestamp.
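
A minimal sketch of the timestamp variant, using the timestampAsOf reader option. The wrapper read_delta_as_of is a hypothetical helper and the timestamp in the example call is an assumption; it should fall within the range covered by the table history.

```python
def read_delta_as_of(spark, path, timestamp):
    """Load the Delta table at `path` as it existed at the given timestamp string."""
    return (spark.read
            .format("delta")
            .option("timestampAsOf", timestamp)
            .load(path))

# Example call in the notebook. With the history shown earlier, a timestamp
# between the commits of version 1 and version 2 should resolve to version 1:
# df_ts = read_delta_as_of(spark, config['target'], "2023-05-18 11:30:00")
# df_ts.show()
```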

Delta Lake Delete Operation

Delta Lake makes it easy to efficiently delete rows from a Delta table. A delete operation adds a new transaction that tombstones any files containing rows to be deleted and adds new files with those rows filtered out.

Delta Lake Delete Limitations because of Parquet

A Delta table consists of a transaction log and data stored in Parquet files. Parquet files are immutable, which means that they don’t allow you to add or delete rows. If you want to “delete” rows from a Parquet file, you need to read the data into memory, filter out the rows you don’t want, and create a new Parquet file.

Delta Lake has to write new files when deleting rows because of the immutable nature of Parquet files.
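
The copy-on-write behaviour described above can be modelled in a few lines of Python. In this sketch a "file" is just a named list of rows; delete_rows, the file names, and the "-rewritten" suffix are all hypothetical and stand in for what Delta Lake does with Parquet files and the transaction log.

```python
def delete_rows(files, predicate):
    """Copy-on-write delete over a {file_name: [rows]} mapping.

    Returns (new_files, tombstoned): files without matching rows are reused
    as-is, files with matching rows are tombstoned and rewritten without them.
    """
    new_files, tombstoned = {}, []
    for name, rows in files.items():
        kept = [r for r in rows if not predicate(r)]
        if len(kept) == len(rows):
            new_files[name] = rows          # no match: the file is reused
        else:
            tombstoned.append(name)         # the old file is marked as removed
            if kept:
                new_files[name + "-rewritten"] = kept
    return new_files, tombstoned

files = {
    "part-0": [{"trip_id": 1}, {"trip_id": 2}],
    "part-1": [{"trip_id": 2000051}, {"trip_id": 3}],
}
new_files, tombstoned = delete_rows(files, lambda r: r["trip_id"] > 2000050)
print(tombstoned)         # ['part-1']
print(sorted(new_files))  # ['part-0', 'part-1-rewritten']
```

Because the tombstoned file is only marked as removed, earlier versions that reference it remain readable through time travel.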

Delete Operation

## Delete every row whose trip_id is greater than 2000050
condition = "trip_id > 2000050"
deltaTable.delete(condition)
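
Since a delete cannot be undone without time travel, it can be useful to check how many rows a condition matches before running it. The wrapper below is a sketch: delete_and_report is a hypothetical helper that combines DeltaTable.toDF with DeltaTable.delete.

```python
def delete_and_report(delta_table, condition):
    """Delete rows matching a SQL condition and return how many rows matched."""
    matched = delta_table.toDF().filter(condition).count()
    delta_table.delete(condition)
    return matched

# Example call in the notebook:
# removed = delete_and_report(deltaTable, "trip_id > 2000050")
# print("Rows deleted:", removed)
```

With the data inserted earlier (trip_id values 2000000 to 2000099), this condition should match the 49 rows with IDs 2000051 to 2000099.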

Code: You can find the full code on my GitHub repository

Conclusion

As we conclude our exploration of Delta Lake, it’s clear that Delta Lake isn’t just another data storage solution; it’s an indispensable tool for organizations striving for excellence in data handling. Incorporating Delta Lake into your data architecture empowers your organization to unlock the full potential of your data, enabling data-driven decision-making, advanced analytics, and real-time insights.

So, as you embark on your own data management journey, consider the transformative power of Delta Lake. It’s more than a technology; it’s a partner on your path to data excellence, and it’s ready to elevate your data management practices to new heights.

Thank you for joining us in unraveling the capabilities of Delta Lake, and we look forward to continuing this data-driven adventure together in our future blog posts. Stay tuned!
