Using Apache Spark streaming to populate a Delta Lake

A practical tutorial with executable code snippets

Carlos Sanmartin
Globant
15 min read · May 13, 2024


AI-generated image of Spark Streaming

1. Introduction

In the ever-evolving landscape of big data processing, Apache Spark Structured Streaming and Delta Lake have emerged as transformative technologies, reshaping the way organizations handle real-time data analytics and maintain data integrity. Combining the power of Apache Spark’s Structured Streaming API with the reliability and efficiency of Delta Lake’s transactional capabilities, this dynamic duo offers a robust solution for building scalable, fault-tolerant data pipelines.

While this article has a technical orientation, it’s important to emphasize the benefits of using Apache Spark, Structured Streaming, and Delta Lake together. Some of these benefits are:

  • Real-Time Empowerment: Opting for streaming over batch processes ensures that crucial data swiftly reaches business stakeholders, empowering quicker and more informed decision-making.
  • Cost-Effective Flexibility: Delta’s use of open-source formats facilitates seamless transitions between processing frameworks, reducing costs in potential migrations to different providers.
  • Scalability for Growth: Apache Spark’s horizontal scaling capability allows effortless expansion of processing power as business data volumes increase. In the cloud, this scalability is particularly straightforward, minimizing staffing needs and associated costs.
  • Widespread Accessibility: Apache Spark’s compatibility with Python, a widely used programming language, eases the hiring process for the team, ensuring a broader pool of qualified engineers.

2. Architecture

In this article, we are going to build a simple yet complete example of what would be a Delta Lake system using the medallion lakehouse architecture. For a more comprehensive read about it, please refer to this Microsoft article. Also, for reference, we have here a diagram of this architecture, courtesy of Databricks.

Architecture diagram of a multi-hop system

This architecture, combined with the Delta Lake technology, brings many benefits, including but not limited to:

  • ACID transactions.
  • Time travel over the data (see the small example after this list).
  • Keeping the original data for future requirements.
  • Schema enforcement and SQL access to the data.
  • Data is stored in open-source format (parquet).
  • Stream or batch (or hybrid) data sources.
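As a small illustration of the time travel feature (a sketch, assuming the bronze table that we create later in this article already has some committed history), an older version of a Delta table can be read like this:

# Read version 0 of a Delta table (Delta time travel)
df_v0 = (spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("abfss://bronze@temptraining.dfs.core.windows.net/training_csvdata")
)
df_v0.display()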

3. Setting up the infrastructure

We are going to build an example using Azure Databricks. However, since these programs are built on Apache Spark and Delta Lake (both open source), the example should apply to other platforms as well.

The first thing that you need is an Azure account. There is a free option that gives you 200 USD of credit and 12 months of free services to run experiments and learn. For more information, refer to this page.

Once we have an Azure account, we need to create an Azure Databricks workspace and a storage account with the hierarchical namespace enabled (that is, Azure Data Lake Storage Gen2). Those two resources should be enough to proceed.

For this tutorial, we created a storage account named temptraining, which we will use throughout all our steps. Inside it we created four containers (raw, bronze, silver, and gold), one for each layer of our lakehouse, as shown below:

Each container represents a different layer from the Delta Lake

In the Databricks workspace, it’s very important to set up the permissions so that the Spark cluster can access the storage account we just created. We can do that by heading into the cluster’s Spark configuration and adding a new entry, as shown below:
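For reference, the entry typically looks like the line below (illustrative: the key name depends on your storage account name, and in a real project you would reference a Databricks secret scope, e.g. {{secrets/<scope>/<key>}}, instead of pasting the key in plain text):

fs.azure.account.key.temptraining.dfs.core.windows.net <your-storage-account-access-key>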

The value for this entry can be found on the storage account page in the Azure portal, under the “Access keys” section:

After these configurations have been done, we can start coding!

4. Ingestion

Ingestion depends on how the data source works. We could subscribe to a Kafka topic, listen to files being dropped into a storage account, or even just run a data extraction on a schedule (yes, a scheduled ingestion can still be part of a streaming solution).

Despite the variability of the source data, the result of the ingestion should always be the same: the data sitting in the bronze layer of Delta Lake. The main purpose of bronze is to have the data in a tabular format and also access the features of Delta.

For this example, we are going to simulate a few files being dropped into an Azure storage account. We are going to consider these files and that storage account as an external system that we are subscribing to.

The first step, then, is building a process that watches that storage account and, as soon as new files arrive, picks them up and processes the data by itself. For that purpose, Databricks offers a tool named Auto Loader, which is accessed through the Spark API.

Let’s start creating our solution!

First, we need to simulate the external system that we talked about earlier. We can create a container in our storage account and put a folder in it, where we will simulate the arrival of new data as many CSV files. These CSV files represent invoices, each row having the name of a product, the unit price, and the quantity. Also, each CSV file is named with a timestamp that represents when the transaction took place.
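For example, a file named 20240101T120000Z.csv (both the name and the values here are illustrative; the name follows the yyyyMMdd'T'HHmmssX pattern that we will parse later in the silver stream) could look like this:

Item,UnitPrice,Amount
Broccoli,1.50,3
Tomato,0.80,12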

When dealing with external files, we need to define the expected format (if there is one). For this example, we are going to tell Spark that we expect the incoming data to have a certain structure. However, there is always the risk of bad data coming in anyway. If we want to salvage as much good data as possible, we can fish out the bad rows that don’t comply with our expected structure. Let’s see an example:

Our incoming data will have these three columns:

  • Item: STRING
  • UnitPrice: FLOAT
  • Amount: INT

Now, let’s build a CSV file with many possible problems:
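Something along these lines would do (the exact values are illustrative; the item names match the results we will look at later):

Item,UnitPrice,Amount
Broccoli,1.50,3
Mushroom,2.10,5,oops
Lettuce,0.99
Cucumber,expensive,2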

Let’s go through each one of the lines:

  • Line one is the header.
  • Line two is fine.
  • Line three has one additional column at the end.
  • Line four is missing a column. Missing columns are assumed to be the rightmost ones.
  • In line five, the second column is a string where a float is expected.

Important: counting data rows (excluding the header), rows two and three are malformed due to extra or missing columns, while row four has a data type issue. These two kinds of problems are dealt with in different ways.

We need to place our CSV file in our storage account. For this example, we placed it in a folder named csv inside of a container named raw.

Now, let’s build a readStream with a display to load the data and show it on the screen. We can use the following code block to do this:

##########################################################
# stream reader for the CSV files in the storage account #
##########################################################

# Required imports
from pyspark.sql.types import *

# Dictionary with the configuration for the readStream
readStream_options = {
    "cloudFiles.format": "csv",                  # Format expected in the raw data
    "cloudFiles.useNotifications": False,        # Detect new files via directory listing instead of event notifications
    "cloudFiles.schemaEvolutionMode": "rescue",  # Don't evolve the schema; unexpected data goes to the rescued data column
    "multiline": True,                           # A single record in the raw files may span multiple lines
    "rescuedDataColumn": "_rescued_data",        # Name of the column that collects the rescued data
    "mode": "DROPMALFORMED",                     # Drop the malformed rows
    "header": True                               # The files have a header
}

# Declare the expected schema of the data
readStream_schema = StructType([
    StructField("Item", StringType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("Amount", IntegerType(), True)
])

# readStream declaration
df = (spark
    .readStream
    .format("cloudFiles")           # This is the format to use Databricks Auto Loader
    .options(**readStream_options)  # Options declared above
    .schema(readStream_schema)      # Schema declared above
    .load("abfss://raw@temptraining.dfs.core.windows.net/csv/")
)

# Display the dataframe on the screen
df.display()

If we run this code, we are going to get the following output:

CSV data plus the _rescued_data column

Note that rows two and three (Mushroom and Lettuce) were dropped, while row four (Cucumber) made it into the dataframe, but its type-noncompliant values went into the special _rescued_data column. If we only want data that fully complies with the schema, we can simply add a .where("_rescued_data IS NULL") to our dataframe.
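A minimal sketch of that filter, continuing from the readStream above (df_clean is just an illustrative name):

# Keep only the rows that fully matched the expected schema
df_clean = df.where("_rescued_data IS NULL")
df_clean.display()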

So far, we are only reading. To complete the ingestion phase, we need to sink the data into bronze. For that, we need to create the bronze table where we are going to sink our data. Bronze tables should be as close as possible to the raw data that we are ingesting into them.

Find below the DDL for a proposed bronze table named bronze.training_csvdata that we are going to use:

###################################
# DDL for bronze.training_csvdata #
###################################

spark.sql("""
CREATE TABLE IF NOT EXISTS bronze.training_csvdata
(
    Id STRING NOT NULL,
    Item STRING,
    UnitPrice FLOAT,
    Amount INT,
    InputFileName STRING,
    CreatedDateTime TIMESTAMP NOT NULL
)
USING DELTA
LOCATION 'abfss://bronze@temptraining.dfs.core.windows.net/training_csvdata'
;
""")

If we run this code, we will create an external Delta table. The good thing about external tables is that we have full access to the data by going directly to the storage account where it lives. That way, we don’t depend on Databricks at all to interact with the data; we could perfectly well switch to Synapse or any other service that can read the Delta format.
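Note that the DDL above assumes that the bronze schema (database) already exists in the metastore. If it doesn’t, a quick one-time setup for the three schemas used in this tutorial could be:

# One-time setup: create the schemas (databases) used throughout this tutorial
for schema_name in ["bronze", "silver", "gold"]:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")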

After creating the table, we are going to take the readStream that we used earlier for the display and sink its data into the table using a writeStream. This is how the code would look:

##################################################
# raw CSV file => bronze.training_csvdata stream #
##################################################

# Required imports
from pyspark.sql.types import *
from pyspark.sql.functions import col, expr

# Dictionary with the configuration for the readStream
readStream_options = {
    "cloudFiles.format": "csv",                  # Format expected in the raw data
    "cloudFiles.useNotifications": False,        # Detect new files via directory listing instead of event notifications
    "cloudFiles.schemaEvolutionMode": "rescue",  # Don't evolve the schema; unexpected data goes to the rescued data column
    "multiline": True,                           # A single record in the raw files may span multiple lines
    "rescuedDataColumn": "_rescued_data",        # Name of the column that collects the rescued data
    "mode": "DROPMALFORMED",                     # Drop the malformed rows
    "header": True                               # The files have a header
}

# Expected schema of the data. We need to pass this to the readStream as a parameter
readStream_schema = StructType([
    StructField("Item", StringType(), True),
    StructField("UnitPrice", FloatType(), True),
    StructField("Amount", IntegerType(), True)
])

# readStream declaration
df = (spark
    .readStream
    .format("cloudFiles")           # This is the format to use Databricks Auto Loader
    .options(**readStream_options)  # Options declared above
    .schema(readStream_schema)      # Schema declared above
    .load("abfss://raw@temptraining.dfs.core.windows.net/csv/")
    .select([
        expr("uuid()").alias("Id"),                            # Random UUID for every row
        col("Item"),
        col("UnitPrice"),
        col("Amount"),
        expr("input_file_name()").alias("InputFileName"),      # File each row came from
        expr("CURRENT_TIMESTAMP()").alias("CreatedDateTime")   # Ingestion timestamp
    ])
)

# Dictionary with the configuration for the writeStream
writeStream_options = {
    "checkpointLocation": "abfss://bronze@temptraining.dfs.core.windows.net/training_csvdata/_checkpoint"  # writeStream always requires a checkpoint
}

# writeStream declaration
(
    df
    .writeStream
    .format("delta")                       # Format required to deal with delta tables
    .trigger(processingTime="10 seconds")  # We are running a stream iteration every 10 seconds
    .outputMode("append")                  # Append data to the destination only
    .options(**writeStream_options)        # Options declared above
    .start("abfss://bronze@temptraining.dfs.core.windows.net/training_csvdata")
)

Some highlights here:

  • We changed the readStream by adding a select with the specific columns we need so that it matches our bronze schema exactly. We added a randomly generated UUID and the current timestamp as a good practice.
  • We defined a checkpoint location for the writeStream. The checkpoint is very important for the streaming process because, if the stream fails and is restarted, the files that have already been processed won’t be processed again.
  • We set processingTime to 10 seconds because, in this example, we don’t need ultra-low latency. This means that every 10 seconds the process will pick up all of the newly arrived files and form a micro-batch with them.
  • Our outputMode is "append", which just adds new data to the target table.

We start this process and then we drop some files in the storage account. Databricks provides a live chart that is updated when new data goes through the stream. When we drop two sets of files, we can see the two spikes in the chart.

Charts showing data flowing through

When we check our bronze table, we can see that the data has been appended. By checking the InputFileName column, we can see that there is data from several different files.

Data has been persisted in bronze.training_csvdata

5. Transformation

After we have ingested our data into bronze, we can build another process to take it and refine it. Some of the things that we could do are the following:

  • We could filter out data depending on its quality or some business logic.
  • We could join the data to other tables.
  • We could calculate additional columns.

These are just ideas; what to do depends mainly on our requirements and the architecture proposal. Here, in our example, we are just going to calculate two new fields:

  • TotalPrice: UnitPrice times Amount.
  • TransactionDateTime: The timestamp encoded in the name of the physical CSV file (see the quick check after this list).
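As a quick sanity check of that file-name parsing (assuming names like 20240101T120000Z.csv, as in our simulated files), the same format string that we will use in the silver stream can be tried on a literal value:

# Verify that the timestamp format parses a sample file name as expected
from pyspark.sql.functions import expr

sample = spark.createDataFrame([("20240101T120000Z.csv",)], ["FileName"])
sample.select(
    expr("""TO_TIMESTAMP(SUBSTRING(FileName, 1, 16), "yyyyMMdd'T'HHmmssX")""").alias("TransactionDateTime")
).display()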

Those new fields, alongside any other improvements that we choose to do over the data, should be persisted on a new table in the silver layer. Find below the DDL for a proposed silver table named silver.training_transactions that we are going to use for this purpose:

########################################
# DDL for silver.training_transactions #
########################################

spark.sql("""
CREATE TABLE IF NOT EXISTS silver.training_transactions
(
    Id STRING NOT NULL,
    TransactionDateTime TIMESTAMP,
    Item STRING,
    UnitPrice FLOAT,
    Amount INT,
    TotalPrice FLOAT,
    InputFileName STRING,
    CreatedDateTime TIMESTAMP NOT NULL
)
USING DELTA
LOCATION 'abfss://silver@temptraining.dfs.core.windows.net/training_transactions'
;
""")

After that, we will need another stream to move the data from bronze to silver. Something important to note here: when dealing with streaming, there is always the risk that the checkpoint data might get corrupted or even deleted. If that happens, Spark will lose track of what data has already been processed from the source of the stream and will try to reprocess everything. Therefore, we should account for that risk and build logic to prevent problems around it.

For that reason, it’s a good practice to make our process idempotent, meaning that if we rerun the same process with the same data, we don’t get duplicates downstream. There are many ways to implement this, but the overall idea here is to validate that the source data doesn’t already exist on the destination structures, and if it does, skip it.

For our specific scenario, we can implement this technique by taking the data from bronze and doing a LEFT JOIN with the distinct list of InputFileName values that already exist in silver. Every row that has a match is then discarded. This is the proposed code to do that:

##################################################################
# bronze.training_csvdata => silver.training_transactions stream #
##################################################################

# Required imports
from pyspark.sql.functions import expr, col

# readStream declaration
df = (spark
    .readStream
    .format("delta")  # Format required to deal with delta tables
    .load("abfss://bronze@temptraining.dfs.core.windows.net/training_csvdata/")
    .select([
        expr("uuid()").alias("Id"),  # Random UUID for every row
        # The transaction timestamp is encoded in the first 16 characters of the file name
        expr("""TO_TIMESTAMP(SUBSTRING(REVERSE(SPLIT(InputFileName, '/'))[0], 1, 16), "yyyyMMdd'T'HHmmssX")""").alias("TransactionDateTime"),
        col("Item"),
        col("UnitPrice"),
        col("Amount"),
        expr("UnitPrice * Amount").alias("TotalPrice"),
        col("InputFileName"),
        expr("CURRENT_TIMESTAMP()").alias("CreatedDateTime")
    ]).alias("bronze")
    .join(  # Join to the destination to ensure no duplicates are being inserted
        spark.read.format("delta").table("silver.training_transactions").select("InputFileName").distinct().alias("silver"),
        (col("bronze.InputFileName") == col("silver.InputFileName")),
        "left"
    )
    .where("silver.InputFileName IS NULL")  # Only keep the data that doesn't exist on the destination yet
    .select(["bronze.*"])
)

# Dictionary with the configuration for the writeStream
writeStream_options = {
    "checkpointLocation": "abfss://silver@temptraining.dfs.core.windows.net/training_transactions/_checkpoint"  # writeStream always requires a checkpoint
}

# writeStream declaration
(
    df
    .writeStream
    .format("delta")                       # Format required to deal with delta tables
    .trigger(processingTime="10 seconds")  # We are running a stream iteration every 10 seconds
    .outputMode("append")                  # Append data to the destination only
    .options(**writeStream_options)        # Options declared above
    .start("abfss://silver@temptraining.dfs.core.windows.net/training_transactions")
)

With this code in place, we can run the stream from bronze to silver, delete the checkpoint, and run the stream again: the process will discard all the rows that have already been processed, so no duplicates end up in silver.

6. Serving

The gold layer is used to build consumption-ready models to serve the data directly to the user. Tables in gold should be optimized to be read, usually by being de-normalized and pre-filtered. We could build many different tables of gold, all coming from the same silver set of tables, but each gold table could serve a different business purpose.

For example, it is very common that business reports require the data to be summarized for easier consumption. In this case, even as bronze and silver keep receiving more rows, gold does not necessarily get more rows; the values of its metrics are updated instead.

For our scenario, we are going to simulate two summarized tables that could serve two different business requirements.

In silver, we are keeping these three metrics:

  • UnitPrice.
  • Amount.
  • TotalPrice.

In gold, we’re going to build six aggregated metrics by applying a SUM and an AVG to each one of them. Also, as we are going to maintain two different tables, we will group the data by date on one of them and by item on the other. Both tables are going to be fed from the same silver data.

These are the proposed DDLs for our two gold tables:

#############################################
# DDL for gold.training_aggregation_by_item #
#############################################

spark.sql("""
CREATE TABLE IF NOT EXISTS gold.training_aggregation_by_item
(
    Id STRING NOT NULL,
    Item STRING,
    SUM_UnitPrice FLOAT,
    AVG_UnitPrice FLOAT,
    SUM_Amount FLOAT,
    AVG_Amount FLOAT,
    SUM_TotalPrice FLOAT,
    AVG_TotalPrice FLOAT,
    CreatedDateTime TIMESTAMP NOT NULL
)
USING DELTA
LOCATION 'abfss://gold@temptraining.dfs.core.windows.net/training_aggregation_by_item'
;
""")



#############################################
# DDL for gold.training_aggregation_by_date #
#############################################

spark.sql("""
CREATE TABLE IF NOT EXISTS gold.training_aggregation_by_date
(
    Id STRING NOT NULL,
    TransactionDate DATE,
    SUM_UnitPrice FLOAT,
    AVG_UnitPrice FLOAT,
    SUM_Amount FLOAT,
    AVG_Amount FLOAT,
    SUM_TotalPrice FLOAT,
    AVG_TotalPrice FLOAT,
    CreatedDateTime TIMESTAMP NOT NULL
)
USING DELTA
LOCATION 'abfss://gold@temptraining.dfs.core.windows.net/training_aggregation_by_date'
;
""")

Now, to populate these two gold tables, we are going to need two different streams (both coming from silver). This is the code to populate the date aggregated table:

############################################################################
# silver.training_transactions => gold.training_aggregation_by_date stream #
############################################################################

# Required imports
from pyspark.sql.functions import *

# readStream declaration
df = (spark
    .readStream
    .format("delta")  # Format required to deal with delta tables
    .load("abfss://silver@temptraining.dfs.core.windows.net/training_transactions/")
    .select([
        col("Id"),
        expr("CAST(TransactionDateTime AS DATE)").alias("TransactionDate"),
        col("UnitPrice"),
        col("Amount"),
        col("TotalPrice"),
        col("CreatedDateTime")
    ])
    .groupBy("TransactionDate")  # We will group the data by the TransactionDate column
    .agg(  # Create some aggregated metrics over our basic metrics
        sum("UnitPrice").cast('float').alias("SUM_UnitPrice"),
        avg("UnitPrice").cast('float').alias("AVG_UnitPrice"),
        sum("Amount").cast('float').alias("SUM_Amount"),
        avg("Amount").cast('float').alias("AVG_Amount"),
        sum("TotalPrice").cast('float').alias("SUM_TotalPrice"),
        avg("TotalPrice").cast('float').alias("AVG_TotalPrice")
    )
    .withColumn("Id", expr("uuid()"))  # Random UUID for every row
    .withColumn("CreatedDateTime", expr("CURRENT_TIMESTAMP()"))
)

# Dictionary with the configuration for the writeStream
writeStream_options = {
    "checkpointLocation": "abfss://gold@temptraining.dfs.core.windows.net/training_aggregation_by_date/_checkpoint"  # writeStream always requires a checkpoint
}

# writeStream declaration
(
    df
    .writeStream
    .format("delta")                       # Format required to deal with delta tables
    .trigger(processingTime="10 seconds")  # We are running a stream iteration every 10 seconds
    .outputMode("complete")                # Fully replace the destination on every batch
    .options(**writeStream_options)        # Options declared above
    .start("abfss://gold@temptraining.dfs.core.windows.net/training_aggregation_by_date")
)

Similarly, here we have the code for the stream that maintains the table aggregated by item:

############################################################################
# silver.training_transactions => gold.training_aggregation_by_item stream #
############################################################################

# Required imports
from pyspark.sql.functions import *

# readStream declaration
df = (spark
    .readStream
    .format("delta")  # Format required to deal with delta tables
    .load("abfss://silver@temptraining.dfs.core.windows.net/training_transactions/")
    .groupBy("Item")  # We will group the data by the Item column
    .agg(  # Create some aggregated metrics over our basic metrics
        sum("UnitPrice").cast('float').alias("SUM_UnitPrice"),
        avg("UnitPrice").cast('float').alias("AVG_UnitPrice"),
        sum("Amount").cast('float').alias("SUM_Amount"),
        avg("Amount").cast('float').alias("AVG_Amount"),
        sum("TotalPrice").cast('float').alias("SUM_TotalPrice"),
        avg("TotalPrice").cast('float').alias("AVG_TotalPrice")
    )
    .withColumn("Id", expr("uuid()"))  # Random UUID for every row
    .withColumn("CreatedDateTime", expr("CURRENT_TIMESTAMP()"))
)

# Dictionary with the configuration for the writeStream
writeStream_options = {
    "checkpointLocation": "abfss://gold@temptraining.dfs.core.windows.net/training_aggregation_by_item/_checkpoint"  # writeStream always requires a checkpoint
}

# writeStream declaration
(
    df
    .writeStream
    .format("delta")                       # Format required to deal with delta tables
    .trigger(processingTime="10 seconds")  # We are running a stream iteration every 10 seconds
    .outputMode("complete")                # Fully replace the destination on every batch
    .options(**writeStream_options)        # Options declared above
    .start("abfss://gold@temptraining.dfs.core.windows.net/training_aggregation_by_item")
)

Note that in these two blocks of code, the outputMode is "complete" (instead of "append", which we have been using for the previous two streams). This is the difference between the two modes:

  • "append" only inserts into the target table the new rows produced since the last batch.
  • "complete" rewrites the target table with the full result of the query defined on the readStream. Think of it as pressing the refresh button on a report when the underlying data changes.

Now, if we turn on all of our streams and create some CSV files in the storage account, after a few seconds we automatically get the summarized, ready-to-consume data in both gold tables. Let’s query our first gold table, where we can see the data aggregated by each one of our food items. The SUM_Amount column holds the sum of all the values coming from the Amount column on our silver table (which would represent the number of Broccoli pieces sold), whereas SUM_TotalPrice represents the value of all the Broccoli together.
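For reference, a simple way to inspect it from a notebook (a SQL cell or any BI tool pointed at the table works just as well):

# Query the item-level gold table
spark.sql("SELECT * FROM gold.training_aggregation_by_item ORDER BY Item").display()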

This information could be very useful for business areas looking for a high-level view of sales. Typically, we could plug a Power BI report into it and create some nice charts and indicators from the data.

Our second gold table provides a different projection of the data. Instead of grouping by item, here we group by day. In this example we can see the progress of the sales over the days and note that sales have been trending slightly upwards:

Now we can create more CSV files in our storage account to simulate more raw data arriving into our systems. Let’s wait a few more seconds and re-check our date-aggregated table:

We can see that the contents changed automatically as the stream detected more raw data arriving. The streams autonomously processed the new data into bronze and silver and eventually refreshed both gold tables. In this specific example, we can appreciate that the gaps in the sales between the days have grown bigger.

7. Conclusions

In an era when businesses depend increasingly on data for their operations, the evolution towards real-time data processing becomes imperative. The powerful combination of Apache Spark Structured Streaming and the Delta layer emerges as a formidable solution, poised to meet the demands of this transformative shift in data architecture.

This article presents a straightforward proposal for constructing streaming pipelines over Delta Lake. The intention is to provide readers with a practical example that can expedite the initiation of their projects.
