Stream and Batch: where do these worlds meet? Let's figure it out

Gonzalo Lencina
Published in DataPebbles
Feb 19, 2024 · 8 min read

In the realm of real-time data processing, Apache Spark Structured Streaming stands out as one of the most powerful and versatile tools available. This library enables easy and efficient analysis and manipulation of data in real time. In this article, we'll explore how to use Apache Spark Structured Streaming to perform a join between a batch process and a streaming one. Before getting into the join itself, it's important to grasp some key concepts and features of Structured Streaming.

Databricks

Databricks is a unified analytics platform that integrates seamlessly with Apache Spark for efficient data processing. With its Structured Streaming capabilities, users can easily perform real-time stream processing tasks. By leveraging Spark’s built-in connectors, such as Kafka or Delta Lake, Databricks enables smooth integration with streaming data sources. Its unified notebook interface facilitates the writing and execution of streaming queries using Spark SQL, Python, Scala, or R, streamlining the development of streaming data pipelines. In summary, Databricks provides a comprehensive solution for building and deploying streaming applications with Spark, empowering organizations to gain insights from their streaming data in real-time.

What is Apache Spark Structured Streaming?

Apache Spark Structured Streaming is a real-time data processing library built upon Apache Spark’s distributed data processing engine. It provides a high-level API for defining real-time data streams, enabling developers to write queries similarly to how they would with Spark SQL for batch processing.

One of the main features of Structured Streaming is its ability to handle data continuously and with fault tolerance. It employs the concept of micro-batches, where data is processed in small batches, so processing operations are performed incrementally on the data instead of all at once. The progress and state of the process are stored in checkpoints managed by the driver during processing.
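
To make this concrete, here is a minimal sketch of a streaming query using Spark's built-in rate source and a hypothetical checkpoint path. It only illustrates the micro-batch-plus-checkpoint pattern, not this article's pipeline:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("micro_batch_demo").getOrCreate()

# The built-in "rate" source generates rows continuously, which is handy for testing.
stream_df = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load())

# Each micro-batch is processed incrementally; progress is tracked in the checkpoint,
# so the query can resume from where it left off after a restart.
query = (stream_df
    .writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/rate_demo")  # hypothetical path
    .start())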

Materializing streaming data into tables:

In Structured Streaming, streaming data can be materialized into tables, facilitating their manipulation and analysis. These tables can be of different quality and processing levels, such as bronze, silver, and gold.

Bronze Tables: contain raw or unprocessed data as received from data sources. This data may have duplicates, errors, or inconsistencies and is generally in the original format of the data source.

Silver Tables: contain cleaned and transformed data ready for analysis. This is where initial transformations and cleaning are applied to raw data. Data in silver tables is typically in a more structured and normalized format than in bronze tables.

Gold Tables: contain processed and enriched data ready for use in production applications and analysis. This data is the most refined and has undergone multiple transformations and aggregations to meet specific business requirements.

To understand how streaming processes work, we must define two concepts:

Stateless: A type of operation in which each record is processed individually and does not maintain any state or relationship with the rest. Each record is processed in isolation, and no context or information from previous events is taken into account. Simple transformations and stream-static joins fall into this category. This means that in a stream-static join we cannot buffer unmatched records as streaming state to be matched later; the query cannot wait for new records to appear on the static side. To recover such records, we could configure a separate batch job that finds and inserts them (a sketch of such a job follows these two definitions).

Stateful: A type of operation that considers previous records to calculate current results. This category includes deduplication, aggregation, and stream-stream joins, since their output depends on previously processed records.
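
As a rough sketch of such a backfill job (the table names orders_silver, actual_bikes_silver, and bike_sales are the ones used later in this article, and it assumes the result table keeps the order_id from the streaming side), a periodic batch run could re-try the join for orders that never produced a result row:

def backfill_unmatched_orders():
    # Periodic batch job: re-try the stream-static join for orders that did not
    # find a match when their micro-batch ran (assumes bike_sales keeps order_id).
    orders = spark.read.table("orders_silver")
    bikes = spark.read.table("actual_bikes_silver")
    matched = spark.read.table("bike_sales").select("order_id")

    # Orders with no row in the result table yet.
    missed = orders.join(matched, "order_id", "left_anti")

    # Join them against the latest static data and append any new matches.
    (missed.join(bikes, "bike_id", "inner")
        .write.mode("append")
        .saveAsTable("bike_sales"))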

Performing a join between a batch process and a streaming one:

One of the most powerful features of Structured Streaming is its ability to join real-time data with historical or batch data. This allows combining the richness and freshness of streaming data with the integrity and coherence of historical data. A stream-static join is an operation that combines data from a streaming table with data from a static table in a join process. Let’s see how we can perform this type of join.

Let’s introduce a scenario in which we start with a source from Kafka, which we ingest through a streaming process into a bronze table. The information we are ingesting contains real-time data on bicycles and bicycle sales. From this bronze table, we create two silver tables through streaming: orders and bikes. Some of these bikes are unused, broken, or otherwise out of operation. So, through a batch process that runs every 48 hours, we filter the bikes that are currently operational in the store. Our goal is to create a real-time table with the sales of bikes that are currently operational in the store, by joining the live table of orders with the static table of current bikes.

It is worth noting that the streaming portion of the join is what drives the join process: only new data appearing on the streaming side will trigger it. So adding new records to the static table will not, by itself, trigger the join or update results that were already produced.

Batch and streaming data preparation:

  • Firstly, we need to have a batch process that generates historical data and stores it in a Spark table.
  • We also need to have a real-time data flow that generates streaming data and stores it in another Spark table.

Implementation of bronze table:

Instead of directly connecting our system to Kafka, we’ll connect it to cloud object storage, where Kafka will be depositing files in JSON format. This allows us to use Auto Loader to read the files incrementally and detect new files as they arrive. To set up Auto Loader, we specify the format as cloudFiles and then indicate that these files are in JSON format. In addition to adding the timestamp and year_month columns, note that in the trigger we specify availableNow=True. This causes the Spark Structured Streaming process to start working with the data available at the moment the query is initiated, instead of waiting for the next trigger interval defined by a processing schedule. This option is useful when processing needs to start immediately without any additional delay related to a scheduled trigger interval, for example when you have critical data that needs to be processed right away, or when you need to initiate a query synchronously with some external action.

However, it’s important to note that this option may result in higher resource usage, as processing starts immediately rather than at the optimal time according to the configured trigger schedule. You should therefore evaluate carefully whether this option is appropriate for your specific use case.

from pyspark.sql import functions as F

def process_bronze():
    # Kafka messages are landed as JSON files in cloud object storage;
    # this schema mirrors the raw Kafka record layout.
    schema = "key BINARY, value BINARY, topic STRING, partition LONG, offset LONG, timestamp LONG"

    query = (spark.readStream
        .format("cloudFiles")                       # Auto Loader
        .option("cloudFiles.format", "json")
        .schema(schema)
        .load(f"{dataset_biketore}/kafka-raw")      # path where Kafka drops the JSON files
        .withColumn("timestamp", (F.col("timestamp") / 1000).cast("timestamp"))
        .withColumn("year_month", F.date_format("timestamp", "yyyy-MM"))
        .writeStream
        .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/bronze")
        .option("mergeSchema", True)
        .partitionBy("topic", "year_month")
        .trigger(availableNow=True)                 # process what is available, then stop
        .table("bronze"))

    query.awaitTermination()

process_bronze()
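
If the bronze ingestion should instead run continuously on a schedule, the availableNow trigger can be swapped for a processingTime trigger. Below is a stripped-down sketch of that alternative (the 5-minute interval is arbitrary, and the column transformations from process_bronze() are omitted for brevity):

# Same source and sink as process_bronze(), but as a long-lived query that
# checks for new files every 5 minutes instead of stopping once it has caught up.
schema = "key BINARY, value BINARY, topic STRING, partition LONG, offset LONG, timestamp LONG"

query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(f"{dataset_biketore}/kafka-raw")
    .writeStream
    .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/bronze")
    .trigger(processingTime="5 minutes")
    .table("bronze"))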

Ingest the orders silver table:

from pyspark.sql import functions as F

# Schema of the JSON payload stored in the Kafka "value" field for order events.
json_schema = "order_id STRING, order_timestamp TIMESTAMP, customer_id STRING, quantity BIGINT, total BIGINT, bike ARRAY<STRUCT<bike_id STRING, quantity BIGINT, subtotal BIGINT>>"

query = (spark.readStream.table("bronze")
    .filter("topic = 'orders'")
    .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
    .select("v.*")
    .filter("quantity > 0")                         # drop empty or cancelled orders
    .writeStream
    .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/orders_silver")
    .trigger(availableNow=True)
    .table("orders_silver"))

query.awaitTermination()

Ingest the bikes silver table:

def process_bikes():
    # "current" flags whether the bike is currently operational in the store.
    schema = "bike_id STRING, brand STRING, size STRING, price DOUBLE, current BOOLEAN, updated TIMESTAMP"

    query = (spark.readStream
        .table("bronze")
        .filter("topic = 'bikes'")
        .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
        .select("v.*")
        .writeStream
        .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/bikes_silver")
        .trigger(availableNow=True)
        .table("bikes_silver"))

    query.awaitTermination()

process_bikes()

def process_actual_bikes():
    # Batch job (scheduled to run every 48 hours) that keeps only the bikes
    # currently operational in the store. This becomes the static side of the join.
    (spark.read
        .table("bikes_silver")
        .filter("current == True")
        .write
        .mode("overwrite")
        .saveAsTable("actual_bikes_silver"))

process_actual_bikes()

Joining tables:

First option:

import dlt

@dlt.table(
    table_properties={"quality": "silver"}
)
def bike_sales():
    return spark.sql("""
        SELECT a.bike_id, b.quantity, b.price, b.date
        FROM STREAM(LIVE.orders) b
        INNER JOIN LIVE.actual_bikes a
        ON a.bike_id = b.bike_id
    """)

As we can see, we use the @dlt.table decorator to create the live table that will store the query result. This decorator is specific to Databricks and marks a function as a dataset definition within a Delta Live Tables (DLT) pipeline. DLT is designed to work with tables in a data lakehouse, facilitating the creation and maintenance of real-time or near-real-time data flows. The table is associated with the bike_sales function, which reads from the streaming table and performs the join with the static table. It’s a simple piece of code that reflects one way to accomplish the join. To indicate the streaming side of the join, we wrap the table in STREAM(), which tells DLT to read it as a stream.
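
For reference, the same join can also be expressed with the DLT Python API rather than embedded SQL, using dlt.read_stream for the streaming side and dlt.read for the static side. A minimal sketch, keeping the same table names as the SQL version and assuming both tables expose a bike_id column:

import dlt

@dlt.table(table_properties={"quality": "silver"})
def bike_sales():
    # dlt.read_stream reads a live table incrementally (the streaming side of the join),
    # while dlt.read reads it as a static snapshot (the batch side).
    orders_df = dlt.read_stream("orders")
    bikes_df = dlt.read("actual_bikes")
    return orders_df.join(bikes_df, "bike_id", "inner")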

Second option:

from pyspark.sql import functions as F

def process_bike_sales():
    # Streaming side: new orders drive the join.
    orders_df = spark.readStream.table("orders_silver")

    # Static side: the latest snapshot of currently operational bikes.
    bikes_df = spark.read.table("actual_bikes_silver")

    query = (orders_df
        .join(bikes_df, orders_df.bike_id == bikes_df.bike_id, "inner")
        .writeStream
        .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/bike_sales")
        .table("bike_sales"))

    query.awaitTermination()

process_bike_sales()

In this latter case, the result of the join between the streaming table and the static table is written to the bike_sales table without using the @dlt decorator. In both cases, the result is the same.

  • Using the Spark SQL or DataFrame API, we can perform the join between the batch table and the streaming table with the appropriate join type (e.g., INNER JOIN, LEFT JOIN, etc.), as shown in the sketch below. During the join, Spark Structured Streaming handles the continuous nature of the streaming data and keeps the results updated as new data arrives.
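
As an illustration of the SQL route (the bike_sales_sql table and checkpoint path are hypothetical, and the column references follow the simplified schema used throughout this article), a streaming DataFrame can be registered as a temporary view and joined with the static table directly in SQL; the result is itself a streaming DataFrame:

# Register the streaming orders table as a temporary view so it can be used in Spark SQL.
orders_stream = spark.readStream.table("orders_silver")
orders_stream.createOrReplaceTempView("orders_stream")

# A LEFT JOIN keeps every order, even when the bike is not (yet) in the static table.
joined = spark.sql("""
    SELECT o.*, b.brand, b.price
    FROM orders_stream o
    LEFT JOIN actual_bikes_silver b
    ON o.bike_id = b.bike_id
""")

query = (joined.writeStream
    .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/bike_sales_sql")
    .trigger(availableNow=True)
    .table("bike_sales_sql"))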

Handling limitations:

  • Late arriving data: streaming data may arrive with some delay compared to historical data. To address late arriving data on the streaming side of a join, one solution is to use watermarks. Watermarking in Structured Streaming is a technique used to constrain state in stateful streaming operations by specifying how much late data to consider. Essentially, a watermark is a moving threshold in event time that trails behind the maximum event time observed by the query in the processed data. The trailing gap, also known as the watermark delay, determines how long the engine waits for late data and is specified in the query using withWatermark.
  • Unbounded state: as the streaming process progresses, the streaming state of a stream-stream join grows indefinitely, since all previous inputs must be retained because any new input could potentially match any previous one. To prevent the state from growing without bound, additional join conditions must be specified so that inputs from the indefinite past cannot match future inputs and can therefore be purged from the state. These additional conditions can take one of two forms (a sketch follows this list):
  1. Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
  2. Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow).
  • Unmatched data: Delta Lake does not enforce foreign key constraints, so it’s possible that the joined data is unmatched. If a change is made to the live table, existing results are not recomputed; for that, a backfill through a full refresh would be necessary.
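
To see how watermarks and a time-range condition combine in practice for a stream-stream join, here is a short sketch adapted to hypothetical impressions and clicks streams (not part of this article's bike pipeline):

from pyspark.sql import functions as F

# Illustrative stream-stream join; each side declares how much late data to tolerate.
impressions = (spark.readStream.table("impressions")
    .withWatermark("impression_time", "2 hours"))

clicks = (spark.readStream.table("clicks")
    .withWatermark("click_time", "3 hours"))

# The time-range condition bounds how long an impression waits for a matching click,
# so state older than the watermark plus this interval can be purged.
joined = impressions.join(
    clicks,
    F.expr("""
        click_ad_id = impression_ad_id AND
        click_time BETWEEN impression_time AND impression_time + INTERVAL 1 HOUR
    """),
    "inner")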

Conclusion:

The join between two streaming tables is more suitable when both data streams are continuous and don’t have a historical component. In our case, however, we need to keep a history of the currently operational bikes, which is why we perform a stream-static join.

In this article, we’ve explored how to use Apache Spark Structured Streaming to perform a join between a batch process and a streaming one. We’ve discussed key concepts of Structured Streaming, materializing streaming data into tables, and the steps to perform a join between batch and streaming processes. Additionally, we’ve compared this approach with the join between two streaming tables. By understanding these concepts and techniques, DataOps engineers can make the most of Apache Spark’s capabilities for efficient and effective real-time data analysis.
