Streaming data in a Delta table using Spark Structured Streaming

A solution for real-time data processing

Sudhakar Pandhare
Globant
7 min readDec 14, 2022

--

As a Data Engineer, you must deal with Big Data technologies, such as Spark Streaming, Kafka, and Delta Lake. However, when combining these technologies at a large scale, you can find yourself searching for a solution that covers all the complicated production use cases. In this blog post, I will share how you can leverage the analytical power of data platforms by combining those three technologies to build real-time data processing and analytics.

Problem Statement

It is a widespread problem where we need low latency analytics in your data platform. Typically data is processed in the Delta Lake via batch jobs to generate landing and final processed data layers. This increases the processing time because of the batch nature, and the data arrival happens with the next scheduled jobs.

Batch processing creates delays which impact business in taking quick decisions. Also, the batch process execution depends on an orchestrator like Airflow.

Batch processing architecture

Proposed Solution

The idea of the Spark Streaming job is that it is always running. It constantly reads events from the Kafka Topic, processes them, and writes the output into the Delta Lake table. The job should never stop.

Just as an event-driven architecture processes the data in near real-time via Kafka topics, we will use Delta Lake tables to bridge the near real-time data processing pipeline.

The Architecture

Real-time analytics system based on Kafka events. The first entry point of data in the below architecture is Kafka, consumed by the Spark Streaming job and written in the form of a Delta Lake table. Let's see each component one by one.

Event-driven Architecture

Event Store (Kafka)

We are using Kafka as our event store to handle real-time data. Kafka is a community-distributed streaming platform capable of handling trillions of events daily. It publishes and subscribes to a stream of records and is also used for fault-tolerant storage. It stores, reads, and analyses the streaming data. It is used for messaging, website activity tracking, and logs.

A Kafka cluster consists of one or more servers running called Kafka Brokers, and within these Brokers, there are Kafka Topics. Kafka Topics hold data that is being produced and consumed.

Spark Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. The Spark SQL engine will run it incrementally and continuously and update the final result as streaming data continues to arrive.

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 milliseconds and exactly once fault-tolerance guarantees. However, since Spark 2.3, a new low-latency processing mode called Continuous Processing has been introduced, which can achieve end-to-end latencies as low as one millisecond with at-least-once guarantees. Without changing the Dataset/Data Frame operations in our queries, we can choose the mode based on our application requirements.

Delta Lake

Delta Lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. We can store our data as-is, without having to first structure the data, and run different types of analytics from dashboards and visualization to big-data processing and real-time analytics.

Why do we need Delta Lake? There are several reasons:

  • Data "overwrites" on the same path causing delta loss in case of job Failure.
  • Data "overwrites" on the same path causing delta loss in case of job Failure.
  • Plain Spark code writing the AVRO and PARQUET files does not have ACID transactions.
  • In Spark, there is no support for schema evolution. If we try to merge two data frames with different schemas, the job will fail.
  • Data versioning is not maintained; we cannot roll back to the older data once data is deleted.

The following are the key features of Delta Lake:

  • ACID transaction- With Delta Lake, We do not need to write any code — It is automatic that transactions are written to the blog. This transaction log is the key and represents the single truth source.
  • Scalable Metadata Handling- Spark distributed processing power to handle all the metadata for petabyte-scale tables with billions of files at ease.
  • Unified Batch and Streaming- No longer a need to have a different architecture for reading stream of data vs. Batch of data. Delta Lake table is a batch and streaming source and sink.
  • Schema Enforcement- If you put a schema on a Delta Lake table and try to write data to that table that is not conformant with the schema, it will give you an error and not allow you to write that, preventing you from bad writing.
  • Schema Evolution- Schema evolution is a feature that allows users to easily change a table's current schema to accommodate data that is changing over time. Schema evolution is activated by adding .option("mergeSchema", "true") to our .write or .writeStream Spark command.
  • Upsert and Delete-Delta allow us to do upsert or merge very easily. We can merge data from another data frame into your table and do updates, inserts, and deletes.
  • Compaction in Delta Lake-IF we continuously write data to a Delta Lake table, it will, over time, accumulate many files, especially if we add data in small batches. We can compact a table by repartitioning it to a smaller number of large files.

Implementing the proposed solution

The following steps are needed.

1. Create a Topic on the Kafka cluster. --bootstrap.server is a comma-separated list of host and port pairs that are the addresses of the Kafka Brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself. Before we can write our first events, we need to create a Topic. Let's create employee_topic. Open a terminal session and run the below command:

sh kafka-topics.sh –create –topic topic_name –bootstrap-server 
{bootstrap-server}

2. Publish data to the Kafka Topic. A Kafka client communicates with the Kafka Brokers via the network for writing (or reading) events. Once received, the Brokers will store the events in a durable and fault-tolerant manner for as long as we need — even forever.

Run the console producer client on the terminal session to write a few events into your Topic. By default, each line you enter will result in a separate event written to the Topic.

sh kafka-console-producer.sh -broker-list {bootstrap-server} -topic topic_name

Let's publish data to the employee_topic.

JSON events in the Kafka topic

3. Schema. The Kafka Topic events have JSON format. Before we read the Kafka Topic events in a streaming way, we just need to provide a schema.json file. This file contains metadata information of events data available in the Kafka Topic. Let's start with some imports, create Spark sessions and load the schema.json into a schema.

from delta. tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

spark = SparkSession.builder.appName('job_name').getOrCreate()
schema_file = "schema.json"
data = open(schema_file)
schema = StructType.fromJson(json.load(data))

4. Spark Streaming job reads data from Kafka Topics. We need to explicitly state from where we are streaming with format("kafka") and should provide the Kafka servers and subscribe to the Topic we are streaming from using the option.

Spark Streaming uses readStream() on SparkSession to load a streaming dataset from Kafka. The option kafka.bootstrap.servers will provide Kafka bootstrap server details and in subscribe will provide the Kafka Topic name. The option startingOffsets earliest is used to read all data available in Kafka at the start of the query; we may not use this option that often, and the default value startingOffsets is latest which reads only new data that's not been processed.

data_df=spark.readstream.format("kafka")\
.option("kafka.bootstrap.servers","{bootstrap-server}")\
.option("subscribe","{topic_name}")\
.option("failOnDataLoss","False")\
.option("startingOffsets", "earliest")\
.load()\
.selectExpr(schema)

data_df.printSchema()

5. Write final data_df as Delta Lake table. Using structured streaming, we can write a data stream into a Delta Lake table. Delta's transaction log guarantees that each message will be processed exactly once. It also supports concurrent writing from multiple streams or batch jobs into the same table. By default, the messages in the stream will be appended to the Delta Lake table.

data_df.writeStream.format("delta")\
.outputMode("append")\
.option("checkpoint_location","{checkpoint-location}")\
.trigger("processing=30 seconds")\
.start("{Delta table path}")

6. Read the Delta Lake table as Stream. When we load a Delta Lake table as a stream source and use it in a streaming query, the query processes all of the data present in the table and any new data that arrives after the stream is started.

We can also control the maximum size of any micro-batch Delta Lake gives to a streaming by setting the maxFilesPerTriggeroption. This specifies the maximum number of new files to be considered in every trigger. The default is 1000. Option ignoreChanges in the stream will not be disrupted by deleting or updating the source table.

delta_df=spark.readStream.format("delta")\
.option("maxFilesPerTrigger",5)\
.option("ignoreChanges","true")\
.load("{Delta table path}")

7. Write data to the delta table. We can write a data stream into a delta table using structured streaming. Delta's transaction log guarantees that each message will be processed exactly once. It also supports concurrent writing from multiple streams or batch jobs into the same table. By default, the messages in the stream will be appended to the delta table.

delta_df.writeStream.format("delta")\
.outputMode("append")\
.option("checkpoint_location","{checkpoint-location}")\
.trigger("processing=30 seconds")\
.start("{Delta table

Conclusion

I have covered real-time data processing with Delta Lake features in this article. With the above event-driven architecture solving the integration problem between Spark Streaming and Kafka was an important milestone for building our real-time analytics dashboard.

--

--