Apache Structured Streaming for End-to-End Real-Time Application

Shuhana
intelligentmachines
6 min read · Jun 6, 2020

Many applications today need to process data in real time and make decisions based on it, such as fraud detection, network intrusion detection, and IoT applications. Apache Spark Structured Streaming is a distributed, fault-tolerant streaming engine that enables processing data in real time. It provides a declarative, unified, high-level API for building real-time applications, so users don’t have to reason about complex streaming logic, fault tolerance, state management, or how the streaming application interacts with other applications. Structured Streaming also makes it easy to integrate streaming applications with batch and interactive analysis.

In comparison to other stream processing systems, Structured Streaming provides a far more declarative, high-level functional API. In most systems, such as Google Dataflow, Apache Flink, and Apache Spark Streaming (DStreams), users have to build a DAG of physical operators and think in terms of complex execution concerns such as consistency, at-least-once delivery, out-of-order data, and triggering modes. Structured Streaming makes it simpler to write streaming applications through its incremental query model. Many streaming engines focus only on the streaming computation, but in the real world streaming systems are part of larger continuous applications: end-to-end applications that react to data in real time. These applications include running interactive queries on streaming data, ETL jobs that transform data from one storage system to another, and online machine learning. Developers are then responsible for making the streaming system interact with these external systems. Structured Streaming addresses these problems by integrating its API with batch and interactive APIs.

Overview

Structured Streaming supports a variety of input sources and output sinks such as Amazon Kinesis, Apache Kafka, network sockets, HDFS, and Flume. To ensure fault tolerance, an input source must be replayable, meaning recent input data can be re-read, and an output sink must support idempotent writes. Structured Streaming also supports input and output from tables in Spark SQL.
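As a sketch, reading from a Kafka source might look like the following. The broker address and topic name are placeholder values, an existing SparkSession named spark is assumed, and the spark-sql-kafka connector package must be on the classpath.

# Read a Kafka topic as an unbounded streaming DataFrame.
# "broker:9092" and "transactions" are placeholder values.
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "transactions") \
    .load()

# Kafka records arrive as binary key and value columns; cast them to strings to work with them.
events = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")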

The user provides a query over the input streaming data, written just like a batch query, that computes a result table to be written to an output sink. Structured Streaming automatically converts this batch-like query into a streaming execution plan.

Structured Streaming makes a strong consistency guarantee called prefix consistency: given that the input data is relatively ordered within each source, the result table always corresponds to a prefix of all the input sources, i.e. it is always consistent with a prefix of the data. Prefix consistency also makes it easy to recover from failure, since the system can roll back to a specific prefix of the data and recompute outputs from that point.

Components of Apache Structured Streaming

There are two execution modes in Structured Streaming.

  • Micro-batch mode: This is the default mode of execution. Data streams are processed as a series of micro-batch jobs, achieving end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance guarantees.
  • Continuous mode: This mode achieves very low latency (~1 ms) with at-least-once fault-tolerance guarantees, at the cost of less operational flexibility. Both modes are selected through the query trigger, as shown in the sketch after this list.
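A minimal sketch contrasting the two modes through the trigger setting, assuming an existing streaming DataFrame df; the intervals and the console sink are illustrative choices, and continuous mode only supports a restricted set of operations, sources, and sinks.

# Micro-batch execution (the default): a new micro-batch fires every 10 seconds.
micro_batch_query = df.writeStream \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

# Continuous execution: rows are processed as they arrive,
# with a checkpoint written every second.
continuous_query = df.writeStream \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()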

Programming Model

The input stream is treated as an unbounded table in Structured Streaming. New data is interpreted as rows appended to the input table. This allows us to treat both batch and streaming data as tables.

Users specify triggers to control how often the engine will attempt to compute a new result and update the output sink. Users can mark a column as denoting event time (a timestamp set at the data source), and set a watermark policy to determine when enough data has been received to output a result for a specific event time.

The sink’s output mode specifies how the result table is written to the output system. The engine supports three distinct modes:

Complete: The whole result table is written to the sink on every update.

Append: Only the new rows appended to the result table are written to the sink.

Update: Only the rows modified since the last trigger are written to the sink.

As an example, let’s say we want to count the number of transactions by customer country for an online shopping application. We receive transaction data (invoice-no, product-name, price, time, customer-id, country) from an S3 bucket. First, we have to create a SparkSession, the entry point of a Spark application.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Structured Streaming Example") \
    .getOrCreate()

Then create a DataFrame from the input source and transform the data to count transactions.

# Streaming file sources require an explicit schema
schema = "invoice_no STRING, product_name STRING, price DOUBLE, time TIMESTAMP, customer_id STRING, country STRING"
df = spark.readStream.schema(schema).json("s3://logs")
counts = df.groupBy("country").count()

The counts DataFrame is our result table, which will be updated whenever new input data arrives. Now we want to write the complete output to an S3 bucket in Parquet format on every result table update.

# The checkpoint location (example path) is required for the file sink and fault tolerance.
counts.writeStream \
    .format("parquet") \
    .outputMode("complete") \
    .trigger(processingTime="1 minute") \
    .option("checkpointLocation", "s3://checkpoints") \
    .start("s3://counts")

The output mode parameter specifies how the output is written to the sink on every update; here we want the complete result. The trigger specifies when the streaming engine starts an incremental computation and updates the result table; here we set it to one minute. If no trigger is specified, the engine computes results as fast as possible.

Structured Streaming will automatically optimize and incrementalize our batch-like query and execute it in a continuous manner. To compute an aggregation such as a count, intermediate state needs to be stored. Structured Streaming tracks this state in a write-ahead log and a state store.

Structured Streaming also supports windowed operations through event-time processing. Event time is a timestamp associated with the data itself. This allows window-based operations to be expressed as grouping and aggregation on the event-time column in the data. For instance, we can count transactions in one-hour windows that slide every five minutes.

from pyspark.sql.functions import window
windowedCounts = df.groupBy(window(df.time, "1 hour", "5 minutes")).count()

Event time allows data to arrive out of order. Structured Streaming can maintain intermediate state for a long period of time and update the corresponding window in the result table when late data arrives. But if there is no bound on the size of that intermediate state, the system will eventually run out of memory. Watermarking solves this problem: a watermark specifies how late data is allowed to arrive and still be accepted. The system drops the intermediate state for a window, and stops accepting late data for it, once the window’s end time falls more than the given threshold behind the maximum event time seen by the system.
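A minimal sketch of adding a watermark to the windowed count above, reusing df and its time column; the ten-minute lateness threshold is just an illustrative choice.

from pyspark.sql.functions import window

# Accept records up to 10 minutes later than the maximum event time seen so far;
# state for older windows is dropped and later records for them are ignored.
windowedCounts = df \
    .withWatermark("time", "10 minutes") \
    .groupBy(window(df.time, "1 hour", "5 minutes")) \
    .count()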

State Management and Failure Recovery

Structured Streaming uses two forms of durable storage to track state and achieve fault tolerance. First, a write-ahead log keeps track of which data has been processed and reliably written to the output sink from each input source. Second, any operators that require state, such as aggregation operators or custom stateful operators, checkpoint their state periodically and asynchronously to a state store. Both the log and the state store can run over pluggable storage systems such as HDFS or S3.

Upon recovery, the system consults the log to find the last epoch of input data that has not been committed to the sink; an epoch is defined by the start and end offsets of the data it contains. It then initializes its in-memory state from the last epoch that was committed to the sink and reruns the uncommitted epoch.
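In code, recovery is driven entirely by the checkpoint location: restarting a query against the same checkpoint directory resumes from the last committed epoch. A minimal sketch, writing the watermarked windowedCounts from above to Parquet files; the output and checkpoint paths are placeholders, and append mode is used because the file sink only supports it.

# The checkpoint directory holds the write-ahead log and operator state.
# Restarting the same query with the same checkpointLocation resumes from the
# last epoch committed to the sink instead of reprocessing all input.
query = windowedCounts.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/windowed-counts") \
    .start("s3://windowed-counts")

query.awaitTermination()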
