Introduction to structured streaming with apache-spark

krishnaprasad k
Nerd For Tech
Published in
5 min readNov 28, 2021

What is structured streaming

Structured streaming is a stream processing framework built on top of apache spark SQL engine, as it uses existing dataframe APIs in spark almost all of the familiar operations are supported in streaming. Structured streaming is fault tolerance streaming mechanism implemented with check-pointing and write ahead logs.

Spark structured streaming

The main idea behind structured streaming is that data continuously being appended to a table, that is in simplest ways structured streaming is your dataframe “but streaming”.

Streaming enables us to build continuous application, those are applications interacting with data in real time.

Structured streaming as streaming dataframe

Core concepts of structured streaming

Transformations

Almost all of the transformations available in spark dataframe API (aggregations,joins) with a few exceptions are also available in structured streaming. There is only one action available that is start of the stream.

/*Prepare the schema for the data by reading the corresponding json file
* */
val static = spark.read.json("/data/activity-data/")
val dataSchema = static.schema
/*maxFilesPerTrigger is given a low value to specify how quickly spark is trying
to read the data in the file.
* */
val streaming = spark.readStream.schema(dataSchema)
.option("maxFilesPerTrigger", 1).json("/data/activity-data")

In the above example we are trying to stream from a JSON file with a give schema. The maFilePerTrigger indicates how many records you want to stream per trigger.

/*Just like batch data all the operation in spark streaming are lazy only evaluated at the time of action
* */
val activityCounts=streaming.groupBy("gt").count()

/*Write result to memory
queryName: activity_counts
outputMode: complete(All the rows will be written to the output sink)

* */
val activityQuery = activityCounts.writeStream.queryName("activity_counts")
.format("memory").outputMode("complete")
.start()

After streaming from the JSON file a groupBy operation is applied to the streaming data, and that query is written to the memory, we will be looking into the various output locations for writing(sinks) and modes later in this article.

/*wait for the termination of the query
* */
activityQuery.awaitTermination()

for( i <- 1 to 5 ) {
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}

The query will continue to stream to the memory until termination is called, awaitTermination() will wait for the termination of the query. The streaming process will not really start untill the awaitTermination function is called, but for simplicity we will be skipping the awaitTermination() in rest of our code snippets.

Aggregations and joins

Spark structured streaming supports aggregations and join operation similar to spark dataframe API let us see one example for each.

/*Aggregations
Cube:
Create a multidimensional cube for the current Dataset using the specified columns,
so we can run aggregation on them.
* */
val deviceModelStats = streaming.cube("sample1").avg()
.writeStream.queryName("device_counts").format("memory").outputMode("complete")
.start()

The cube function creates a multidimensional cube with given column names, so we can run aggregations on them.

/*Joins

* */
val historicalAgg = static.groupBy("sample1").avg()
val deviceModelStatsJoins = streaming
.cube("sample1").avg()
.join(historicalAgg, Seq("sample1") .writeStream.queryName("device_counts").format("memory").outputMode("complete")
.start(

Data sources

In structured streaming data sources are the sources where data is read from the following are some examples of data sources.

  • Apache kafka
  • File in a distributes file system HDFS or s3 spark will continuously read the new files.
  • A socket for testing

Data sink

Sinks are the destination of structured stream, the following are the examples of sinks.

  • Apache kafka
  • Files of almost any format(csv,parquet,json)
  • A memory sink

Output modes

How to write data into the output sink. The supported modes are.

Append: Append the incoming data to the existing table. Append mode is similar to appending new rows to the existing table.

Complete: Full output will be re-written in the complete mode.

Update: Similar to complete mode but only those rows having change will be affected in the update mode.

Triggers

Triggers specify when to look for new set of records, by default spark will look for new set of records when it is finished with new set of data. Spark also supports triggers based on time(event based triggers).

Event time based processing

Spark supports event time based processing that is processing data based on the event timestamp in the record.

Watermark

watermark specifies how late spark streaming expect to see the data in event time.

Watermark length indicates how long they need to remember the length of the data.

Reading and writing from the kafka source

Apache kafka is a distributed publish and subscribe system for the streams of data it lets you to publish and subscribe records like a message queue in a fault tolerant way. kafka store records in categories that are referred to as topics, each record in a topic consists of key, value and a timestamp. The position in which the records stored are called offsets.

Reading data from kafka topics

To read data from the kafka topic must choose one of the following patterns, assign,subscribe and subscribePattern.

  • Assign: It is a fine grained way of reading from kafka topic by not just specifying topics but also the topic partitions. This can be given as a json string.
{"topicA": [0,1],"topicB":[2,4]}
  • Other two methods are reading from kafka topics by specifying the list of topics

Starting and ending offsets

The start point where the query is started, this can be specified using a json string.

{"topicA": {"0":23,"1":-1},"topicB":{"0":-2}}

In the above json string -2 specifies the earlisest and -1 specifies the latest offsets. This applies only when new streaming query is started, rest will pick up from where the query left off.

failOnDataLoss

Whether to fail the query when the data is lost. Query can fail due to topics are lost or when the offsets are out of range. The default value will be true.

maxOffsetperTrigger

The number of offsets to read per trigger.

val df1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()

The output will consists of following format

  • key: binary
  • value: binary
  • topic: string
  • partition: int
  • offset: long
  • timestamp: long

The output from kafka will be serialized in some way use spark dataframe API or user defined functions to convert it into required format.

val df2=df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Write data into kafka topics

Writing data to kafka topics are also known as publishing.

df2.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("checkpointLocation", "/to/HDFS-compatible/dir")
.start()

When to output the data

Triggers can be used to provide intervals to when to output the data to the sink.

import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
.format("console").outputMode("complete").start()

Trigger once can be used to output the data only once.

activityCounts.writeStream.trigger(Trigger.Once())
.format("console").outputMode("complete").start()

--

--

krishnaprasad k
Nerd For Tech

Data engineer | Deep learning enthusiast | Back end developer |