Executing Raw and Aggregated Join in Spark Structured Streaming

Shubham Bathwal
Published in Subex AI Labs
Jun 5, 2023

Introduction to Streaming

What is Stream Processing?

Stream processing refers to a method of handling and analyzing data in real-time as it is continuously generated or received. It involves the processing of data streams in a sequential and continuous manner, rather than storing and processing data in batches or as static datasets.

Stream processing systems are designed to handle high-volume, high-velocity data streams from various sources such as sensors, social media feeds, logs, financial transactions, and more. These systems allow for the immediate processing, transformation, and analysis of the data as it flows, enabling real-time insights and decision-making.

Stream processing involves several key components, including data ingestion, data processing, and data output. Data is ingested from the stream source, processed using algorithms, filters, aggregations, or other operations, and then delivered to one or more destinations, such as databases, dashboards, or other applications.

Why Stream Processing?

Processing streaming data offers several advantages and opportunities compared to traditional batch processing. Here are some reasons why processing streaming data is beneficial:

Real-time insights: Streaming data processing enables real-time or near-real-time analysis of data as it is generated. This allows organizations to gain immediate insights and make timely decisions based on the most up-to-date information.

Rapid detection and response: Streaming data processing facilitates the quick identification of critical events or anomalies. By continuously analyzing incoming data in real-time, organizations can promptly detect and respond to issues, such as fraud, security breaches, system failures, or any other events that require immediate attention. This proactive approach can help mitigate risks and minimize the impact of potential problems.

Improved scalability: Stream processing systems can be designed to scale horizontally, meaning they can handle increasing data volumes by distributing the workload across multiple processing units. This scalability allows organizations to accommodate growing data streams and handle high data throughput effectively.

Enhanced situational awareness: By processing streaming data in real-time, organizations can gain a better understanding of current situations and trends. They can monitor and analyze data as it unfolds, allowing for more accurate and timely decision-making. This situational awareness is valuable in various domains, such as finance, logistics, cybersecurity, IoT, and many others.

Personalized and real-time experiences: Streaming data processing enables organizations to deliver personalized and dynamic experiences to users in real-time. For example, in e-commerce or digital advertising, real-time analysis of user behavior and preferences can be used to deliver personalized recommendations or targeted advertisements instantly.

Overall, processing streaming data offers the advantage of real-time insights, rapid detection and response, continuous processing, scalability, enhanced situational awareness, and the ability to provide personalized and real-time experiences. These benefits make stream processing invaluable in numerous use cases across industries.

Streaming Use Cases:

· Fraud detection

· Real-time stock trades

· Marketing, sales, and business analytics

· Customer/user activity

· Log Monitoring: Troubleshooting systems, servers, devices, and more

· Ride share matching

Real-Life Example

When a passenger requests a Lyft, real-time streams of data join to create a seamless user experience. The application pieces together real-time location tracking, traffic data, and pricing to simultaneously match the rider with the best possible driver, calculate pricing, and estimate the time to destination based on both real-time and historical data.

Challenges in Stream Processing:

· Scalability

· Ordering

· Consistency and Durability

· Fault Tolerance & Data Guarantees

Spark Structured Streaming

Structured Streaming is built on top of the Spark SQL engine, which is a distributed SQL query engine that allows you to query large datasets using SQL syntax. Structured Streaming extends the Spark SQL engine to support real-time streaming data processing by providing a unified API for batch and streaming data processing.

Spark Structured Streaming works by dividing the data stream into micro-batches and processing each batch using Spark’s batch processing engine. This allows Structured Streaming to process data streams in near real-time without having to write complex streaming code.
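As a minimal sketch of what this unified API looks like, here is a small PySpark streaming word count. The local Spark session and the socket source on localhost:9999 are purely illustrative assumptions, not part of the original article:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StreamingSketch").getOrCreate()

# Read lines from a socket as an unbounded DataFrame.
lines = (spark.readStream.format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# The same DataFrame API used for batch jobs works on the stream.
words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

# Each micro-batch updates the running counts printed to the console.
query = (counts.writeStream
         .outputMode("complete")
         .format("console")
         .start())
query.awaitTermination()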

Why Spark Structured Streaming?

· Ease of use: Developers can use SQL-like queries and the DataFrame API to process real-time data streams.

· Scalability: Spark Structured Streaming is built on top of the Spark SQL engine, a distributed query engine that can scale to handle large datasets.

· Fault tolerance: Spark Structured Streaming can recover from failures without losing data, using checkpointing and write-ahead logs.

· Integration: Spark Structured Streaming integrates with a wide range of sources and sinks, such as Kafka, file systems, and JDBC databases.

Join Raw and Aggregated Streams

Raw Stream: the input data stream.

Aggregated Stream: a window-based aggregation over the input stream.

Why join Raw and Aggregated Streams?

When using Spark Structured Streaming or similar stream processing frameworks, it is common to aggregate data based on a look-back window or sliding window interval. This aggregation helps in summarizing or condensing the data to derive useful statistics or metrics. However, the aggregated data alone may lack context or details about individual transactions or records.

By joining the raw data stream with the aggregated data stream, we can combine the summarized information with the corresponding original data. This allows us to obtain transaction or record-level results that provide a comprehensive understanding of the data.

Therefore, the relationship between the raw and aggregated streams is crucial for obtaining transaction-level details and gaining a more comprehensive understanding of the data.

Example Use Case:

Let’s consider a scenario where we have a raw stream of sales transactions and an aggregated stream that provides the total sales amount per product category for a specific time window. If we only had the aggregated data, we would know the total sales amount for each category, but we wouldn’t have insights into which individual transactions contributed to those totals. By joining the raw and aggregated streams, we can determine which specific sales transactions contributed to the aggregated amounts, enabling us to analyze the performance of individual products or identify patterns in customer behavior.
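As an illustrative sketch, the two streams for this use case could be defined like this in PySpark. The schema, paths, window size, and watermark are assumptions for illustration, not values from the original article:

from pyspark.sql import functions as F

# Hypothetical raw stream of sales transactions.
raw_sales = (spark.readStream
             .schema("event_time TIMESTAMP, category STRING, amount DOUBLE")
             .parquet("s3a://sales/transactions/"))  # hypothetical path

# Aggregated stream: total sales per category over 10-minute windows.
agg_sales = (raw_sales
             .withWatermark("event_time", "5 minutes")
             .groupBy(F.window("event_time", "10 minutes"), "category")
             .agg(F.sum("amount").alias("total_amount")))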

How to join the input and aggregated streams in Spark Structured Streaming

In Spark Structured Streaming, the input and aggregated streams can be joined only in append output mode.

Understanding the output modes:

Spark Structured Streaming provides three output modes for writing the results of a streaming query:

Complete: the entire result table is written to the sink after every trigger.

Append: only the new rows added since the last trigger are written to the sink.

Update: only the rows updated since the last trigger are written to the sink.
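Illustratively, the mode is chosen when the stream is written, continuing the hypothetical agg_sales example from above:

query = (agg_sales.writeStream
         .outputMode("update")   # or "complete" / "append"
         .format("console")
         .start())

Note that append mode on an aggregation additionally requires a watermark, since rows can only be emitted once their window is finalized.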

Shortcoming of Append Output Mode:

Joining two streams is only available in append mode, but append mode has certain limitations.

In append mode, the output for a record is emitted only once, when Spark is sure that the record's lifetime is complete and it will not be updated in the future. That point is reached only after the sliding interval plus the watermark duration has elapsed, so the actual data is delayed by this duration.
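To make this delay concrete, here is a worked example using the hypothetical 10-minute window and 5-minute watermark from the agg_sales sketch above:

# A transaction with event_time 12:03 falls in the window [12:00, 12:10).
# In append mode its aggregated row is emitted only after the watermark
# passes the window end: max(event_time seen) >= 12:10 + 5 min = 12:15.
# So the result is available roughly 15 minutes after the window opened.
agg_sales.writeStream.outputMode("append").format("console").start()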

Our Approach:

Terminology:

foreachBatch: The foreachBatch operation lets you apply arbitrary operations and writing logic to the output of a streaming query. It allows you to define a function that is executed on the output data of every micro-batch of the streaming query.
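A minimal foreachBatch sketch, assuming the hypothetical agg_sales stream and output path from above:

def process_batch(batch_df, batch_id):
    # batch_df is a regular (batch) DataFrame, so any batch API can be used.
    batch_df.write.mode("append").parquet(f"s3a://output/batch={batch_id}/")

query = (agg_sales.writeStream
         .outputMode("update")
         .foreachBatch(process_batch)
         .start())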

Checkpoint: Checkpointing helps build fault-tolerant and resilient Spark applications. In Spark Structured Streaming, it maintains intermediate state on HDFS-compatible file systems to recover from failures.

Checkpoint Directory Folder Structure:

1. Commits — maintains batch-wise commit markers, one file per successfully completed micro-batch.

2. Offsets — the offset files record the position of the last processed record in each partition of the input data source. This information is used to resume processing from the last checkpoint in case of a failure or restart. Applicable to Kafka sources.

3. Sources — the source files record which input files were read in each batch. This information is used to resume processing from the last checkpoint in case of a failure or restart. Applicable to file sources.

4. State — the state files contain the state of the streaming job, such as the state of aggregations or window operations.

5. Metadata — the metadata file contains metadata about the streaming query itself, such as its unique id.
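Put together, a checkpoint directory for a single query typically looks like this (illustrative layout; the file names are batch numbers):

checkpoint/
├── commits/
│   ├── 0
│   └── 1
├── offsets/
│   ├── 0
│   └── 1
├── sources/
│   └── 0/
│       ├── 0
│       └── 1
├── state/
│   └── 0/
└── metadata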

Architecture Diagram:

Join Raw and Aggregated Stream — Architectural Diagram

Flow:

After aggregating the values, we call foreachBatch with the output mode set to update. Inside the foreachBatch function we do the following (a condensed code sketch follows the list):

  1. Get the batch number from the foreachBatch function. By default, foreachBatch passes two arguments to the user function: the batch DataFrame and the batch number.
  2. Based on the batch number, get the list of files that need to be read again. From the checkpoint folder we can get the list of files read in the current batch, from either the sources or the offsets folder depending on the data source.
    Sample file:
    {"path":"s3a://simulated-simbox/simulated-simbox/part-13037.parquet","timestamp":1670486999000,"batchId":665}
    {"path":"s3a://simulated-simbox/simulated-simbox/part-13038.parquet","timestamp":1670487000000,"batchId":665}
  3. Parse the file list and read the data from the source as a batch DataFrame inside the foreachBatch function.
  4. Re-apply the transformations that were applied to the streaming data before the aggregations.
  5. Join the raw DataFrame with the aggregated streaming DataFrame and drop the duplicates.
  6. Apply transformations on the joined DataFrame and write it to the sink.
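Here is a condensed sketch of the whole flow in PySpark, continuing the hypothetical sales example. The paths, schema, join key, and the transform step are assumptions for illustration; the sources log location follows the checkpoint layout described above for a file source:

import json
from pyspark.sql import functions as F

CHECKPOINT = "s3a://checkpoints/sales-agg"      # hypothetical
SOURCES_DIR = f"{CHECKPOINT}/sources/0"         # file-source batch logs

def transform(df):
    # Pre-aggregation transformations; these must be re-applied to the
    # re-read raw batch so it lines up with the aggregated side.
    return df.withColumn("category", F.upper("category"))

def join_raw_and_agg(agg_batch_df, batch_id):
    # Steps 1-2: use the batch number to look up the files read in this
    # micro-batch from the checkpoint's sources log.
    log_lines = [r.value for r in
                 spark.read.text(f"{SOURCES_DIR}/{batch_id}").collect()]
    files = [json.loads(line)["path"]
             for line in log_lines if line.startswith("{")]
    if not files:
        return
    # Steps 3-4: re-read those files as a plain batch DataFrame and
    # re-apply the pre-aggregation transformations.
    raw_df = transform(spark.read.parquet(*files))
    # Step 5: join raw records with the aggregated rows and de-duplicate.
    # (The join key is illustrative; a real job would also match the window.)
    joined = raw_df.join(agg_batch_df, on="category", how="inner").dropDuplicates()
    # Step 6: final transformations, then write to the sink (hypothetical path).
    joined.write.mode("append").parquet("s3a://sales/joined-output/")

(agg_sales.writeStream
    .outputMode("update")
    .foreachBatch(join_raw_and_agg)
    .option("checkpointLocation", CHECKPOINT)
    .start()
    .awaitTermination())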

Using HyperSense AI for stream operations:

HyperSense AI, a powerful AI platform, offers advanced functionality designed specifically for processing streaming data and joining streams. With its comprehensive set of features, HyperSense AI enables businesses to effortlessly incorporate real-time insights into their decision-making processes. By integrating HyperSense AI into the flow of streaming data processing, organizations can efficiently aggregate, transform, and analyze data streams in real time. HyperSense AI also provides robust capabilities for joining streams, allowing businesses to combine multiple data sources, perform complex join operations, and extract valuable insights from a unified view of the data. This empowers organizations to gain immediate insights, make timely decisions, and uncover actionable intelligence from streaming data, unlocking new opportunities and a competitive advantage in the dynamic landscape of data-driven decision-making.

About Hypersense AI:

HyperSense AI is a cutting-edge AI orchestration platform from Subex that provides the following benefits to our customers:

  • Orchestrate end-to-end machine learning model building, deployment, and MLOps at least 3X faster than traditional methods
  • No-code platform requiring no knowledge of any programming language to build ML models
  • Fully automated AutoML capability that lets even business users do simple AI/prediction tasks, thereby democratising AI adoption within enterprises
  • Explainability, what-if analysis, and counterfactual analysis to ensure model transparency and remove bias, so you can operationalize AI models with full confidence
  • Can be deployed on any hyperscaler cloud platform or on-prem; built on a distributed computing architecture and microservices for high scalability and support for huge volumes of batch and streaming data processing and analytics
  • Subex professional services with deep telco knowledge to solve complex problems using AI/ML
