DATA PROCESSING IN “REAL TIME” WITH APACHE SPARK STRUCTURED STREAMING: PART 2
This post is part of a series of articles on the Apache Spark use case for real-time data processing; check out part 1. Written by Eiti Kimura, IT Manager at Wavy.
Apache Spark Structured Streaming
Among the features provided by Apache Spark, the one that draws the most attention for the problem of real-time data processing is its data stream processing API, known as Structured Streaming.
Using the stream processing API, we can filter, count, aggregate, project and group the data passing through a stream in real time. The stream source can be a streaming server such as Kafka, for example, or even a distributed file system (DFS).
To better illustrate the use of the Structured Streaming API, I will reproduce a very simple example that compares a batch processing implementation with the processing of a stream. This example is the ABC of this technology.
In this example, a directory, represented by “source-path”, is read in CSV format; the data is selected, a simple filter is applied and the result is written in Parquet format.
The first example uses the batch processing API, the way a program for Apache Spark is usually written:
val input = spark.read
  .format("csv")
  .load("source-path")

val result = input
  .select("device", "signal")
  .where("signal > 15")

result.write
  .format("parquet")
  .save("dest-path")
Fragment 1. Scala code snippet for data processing using the Batch API.
In the second example, I will show how to transcribe the previous code snippet so that it processes a data stream. The same program can be rewritten with very little effort, as follows:
val input = spark.readStream
  .format("csv")
  .load("source-path")

val result = input
  .select("device", "signal")
  .where("signal > 15")

result.writeStream
  .format("parquet")
  .start("dest-path")
Fragment 2. Scala code snippet for data processing using the Structured Streaming API.
Note that, compared to the previous program, the modifications were minimal and the business rule applied to the data, in this case the filtering of information, was completely preserved. Instead of the read command we use readStream; likewise, instead of write we use writeStream, and instead of calling save we call start to begin processing.
Bear in mind that a job using the streaming API runs as a continuous process, since conceptually a stream has no beginning or end, only a continuous flow of data that varies over time. Conceptually, we can start consuming information from a stream at any point in time, even simultaneously from different points (offsets). Figure 1 illustrates the production and consumption of information in a data stream.
It is worth mentioning that Apache Spark Structured Streaming is not a streaming server, that is, it is not equivalent to Apache Kafka. Structured Streaming was designed to process information to and from data streams.
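To make the last two points more concrete, the sketch below reads from a stream server starting at a chosen offset. It is a minimal sketch, not part of the original example: the broker address localhost:9092 and the topic name events are hypothetical, and it assumes the spark-sql-kafka connector is on the classpath.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")
  .appName("kafka-offset-sketch")
  .getOrCreate()

// startingOffsets controls where consumption of the stream begins:
// "earliest", "latest", or an explicit JSON map of per-partition offsets.
val kafkaInput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
  .option("subscribe", "events")                       // hypothetical topic
  .option("startingOffsets", "earliest")
  .load()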
Figure 2 illustrates the processing of streams by Structured Streaming: there is a data stream to be processed and a timeline representation (from left to right). As time passes, new records arrive and are processed continuously.
Spark reads the data into a structure called the Input Table, responsible for reading information from the stream and exposing the operations of the platform’s DataFrame API. All business rules, processing, transformations and filtering are then applied to the data and consolidated into a structure called the Result Table, which holds the result of the computation before it is written to the configured Output. The Output, in turn, can be a distributed file system, a database or even another stream.
Operations on the data streams are performed continuously as the data arrives for reading and processing. The frequency at which these small processing batches run can be defined by configuring a Trigger, which fires the processing of the stream. This configuration depends on your processing capacity and also on the needs of your application.
Illustration of continuous data processing by Apache Spark.
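As an illustration of the Trigger configuration mentioned above, here is a minimal sketch of the main trigger variants; streamingDf is a hypothetical streaming DataFrame, not part of the example developed later in this article.

import org.apache.spark.sql.streaming.Trigger

// Fire a micro-batch at a fixed interval of 30 seconds.
streamingDf.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Alternatively, Trigger.Once() processes everything currently available in a single batch and stops:
// streamingDf.writeStream.format("console").trigger(Trigger.Once()).start()

If no trigger is set, Spark starts a new micro-batch as soon as the previous one finishes.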
The most notable features of Structured Streaming are:
• New high-level API: the old stream processing API, DStreams, was discontinued; the new Structured Streaming API was designed to have the least possible impact on existing code, in addition to correcting the problems of the previous version;
• Continuous data joining: it is possible to continuously join a streaming data input with static content, for example another DataFrame loaded from files (see the sketch after this list);
• Integration with different data sources: several file formats are supported for distributed file systems that can be mapped as both input and output streams: CSV, Parquet and JSON. We can also connect Spark directly to a streaming server such as Apache Kafka;
• Fault tolerance (checkpoints): this point addresses the application’s resilience. After performing a set of operations on the data, Spark always saves the current state of the operations in the form of checkpoints, markers that record the progress of processing. These markers are written to the DFS itself and are therefore shared with the other nodes in the cluster. If one of the processing workers fails, the checkpoint files are used to determine exactly where processing stopped and from which point it should continue (see the sketch after this list). Checkpoints increase the overall availability of the application. Observe in Figure 2 that, before triggering a new processing task, the previous state is read to know where the previous run stopped;
• Handling of out-of-order events (watermark): this is one of the great differentials of this tool. Imagine an application based on events over time, grouping how many events arrived within a given time window, for example a count of the events received every 5 minutes. What would happen if an event arrived late for processing? We could not count it as if it had been generated at processing time, otherwise the data would become inconsistent. The implementation of watermarks addresses exactly this kind of case: a time window is maintained and, if an event arrives late but still within that window, the data is retroactively consolidated. For example, with a 10-minute window, if the data arrives up to 10 minutes late it is consolidated in the correct time range in which the event actually occurred, keeping the operations consistent (a code sketch follows Figure 3 below).
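Before moving on to the watermark illustration, here is a minimal sketch of the joining and checkpointing items above. It is not part of the article’s sample project: the lookup file devices.csv, the stream-in directory, the column names and the checkpoint path are all hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder
  .master("local[*]")
  .appName("join-checkpoint-sketch")
  .getOrCreate()

// Static side of the join, loaded once (hypothetical lookup file).
val devices = spark.read
  .format("csv")
  .option("header", true)
  .load("devices.csv")

// Streaming side: file streams need an explicit schema (hypothetical columns).
val readingsSchema = StructType(Array(
  StructField("device", StringType, true),
  StructField("signal", IntegerType, true)
))

val readings = spark.readStream
  .format("csv")
  .option("header", true)
  .schema(readingsSchema)
  .load("stream-in/*")

// Stream-static join: each micro-batch of the stream is enriched with the static data.
val enriched = readings.join(devices, Seq("device"))

// checkpointLocation records offsets and state so a restarted query resumes where it stopped.
enriched.writeStream
  .format("parquet")
  .option("checkpointLocation", "checkpoints/enriched") // hypothetical DFS path
  .start("dest-path")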
Figure 3 illustrates the case of an event that arrived late (see the notes in red): instead of being discarded or consolidated in the wrong time window, the data was identified as late and consolidated correctly in its own time window.
Processing of data that arrived late using Watermarking
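The watermarking behaviour described above boils down to very little code. Here is a minimal sketch, assuming a hypothetical streaming DataFrame named events with an event_time timestamp column:

import org.apache.spark.sql.functions.{col, window}

// Count events in 5-minute windows, accepting records that arrive up to 10 minutes late.
// Late records within the watermark are added to the window in which they actually occurred;
// records later than that are dropped.
val counts = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(window(col("event_time"), "5 minutes"))
  .count()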
A practical example with Structured Streaming
Now that the theoretical part has been explained together with the main features of the tool, it is time to develop a practical example.
In this example, we will read data as if it were arriving in a file system; for didactic purposes, that file system will be the local disk. The data read from the CSV files has the fields described in the following table:
Sample data contained in CSV files.
The files are organized as follows:
The first step is to connect to the Spark cluster and configure the data input. This example was prepared to run locally, so there is no need to create an Apache Spark cluster to run the code; everything runs in a self-contained manner. All the code snippets shown below are written in Scala.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val DIR = new java.io.File(".").getCanonicalPath + "/dataset/stream_in"

// cluster definition: run locally, using all available cores
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Spark Structured Streaming Job")

// initiate the Spark session
val spark = SparkSession
  .builder
  .config(conf)
  .getOrCreate()

//1. == DATA INPUT ==
// read data from the data source, in this particular case a directory
val reader = spark.readStream
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("latestFirst", "true")
  .schema(SchemaDefinition.csvSchema)
  .load(DIR + "/*")
Fragment 1: Establishing the session and defining the data input.
In this example, the files are stored in the directory dataset/stream_in.
It is important to note that in the readStream definition the CSV schema was passed to Spark; at this point it is necessary to tell Spark how the data should be read. The schema follows:
import org.apache.spark.sql.types._

object SchemaDefinition {
  // the schema of the CSV data
  def csvSchema: StructType = StructType(Array(
    StructField("name", StringType, true),
    StructField("country", StringType, true),
    StructField("city", StringType, true),
    StructField("phone", StringType, true),
    StructField("age", IntegerType, true),
    StructField("carrier", StringType, true),
    StructField("marital_status", StringType, true)
  ))
}
Fragment 2: Definition of the Data Schema.
By reading the input with readStream, we tell Spark that the directory should be treated as a data stream; that way, Spark knows that new files will keep arriving in that directory and that they must be processed continuously.
The next step is to process the data as it arrives. In this example, I will group the records by the fields carrier and marital_status, simply counting the number of occurrences.
//2. == DATA PROCESSING ==
reader.createOrReplaceTempView("user_records")

val transformation = spark.sql(
  """
    SELECT carrier, marital_status, COUNT(1) AS num_users
    FROM user_records
    GROUP BY carrier, marital_status
  """)
Fragment 3: Processing the input stream with the Dataframe API.
Since Structured Streaming works with the same API used to process DataFrames, we can express the business rule by writing it in SQL instead of using the DataFrame operations directly.
To do this, the DataFrame must be registered as a temporary view, and from then on we can execute SQL commands on that view. This was done with the command createOrReplaceTempView("user_records").
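For comparison, the same aggregation can be written with the DataFrame operations instead of SQL. The sketch below is equivalent to the query in Fragment 3 (the variable is named transformationDsl here only to avoid clashing with the SQL version):

// group by carrier and marital_status and count the occurrences
val transformationDsl = reader
  .groupBy("carrier", "marital_status")
  .count()
  .withColumnRenamed("count", "num_users")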
We now have two well-defined steps, the input and the processing; next, we need to define the output. For didactic purposes, the output will simply be printed on the console:
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

//3. == DATA OUTPUT ==
val consoleStream = transformation
  .writeStream
  .option("truncate", false)
  .outputMode(OutputMode.Complete)
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .format("console")
  .start()

consoleStream.awaitTermination()
Fragment 4: Definition of the output for the processing results.
A few notes on the writeStream definition: OutputMode.Complete indicates that the entire result must be dumped to the configured output, which in this example simply prints the result via the instruction format("console"). The execution frequency was defined by the trigger: Trigger.ProcessingTime("2 seconds").
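OutputMode.Complete is only one of the available modes. As a hedged sketch, the same query could also run in Update mode, in which only the rows whose aggregates changed since the last trigger are emitted; Append mode, which emits only new rows, requires a watermark when aggregations are involved and so does not apply to this query as written.

import org.apache.spark.sql.streaming.OutputMode

// Update mode: print only the aggregates that changed in the last micro-batch.
val updateStream = transformation.writeStream
  .outputMode(OutputMode.Update)
  .format("console")
  .start()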
The program does not actually start processing until start() is invoked. The following output shows the result of three executed batches:
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------------+---------+
|carrier|marital_status|num_users|
+-------+--------------+---------+
|VIVO   |Single        |21       |
|TIM    |Single        |22       |
|OI     |Single        |31       |
|NEXTEL |Single        |23       |
|NEXTEL |Married       |30       |
|CLARO  |Single        |19       |
|CLARO  |Divorced      |22       |
|TIM    |Divorced      |30       |
|VIVO   |Divorced      |34       |
|OI     |Married       |24       |
|VIVO   |Married       |27       |
|NEXTEL |Divorced      |22       |
|CLARO  |Married       |33       |
|TIM    |Married       |29       |
|OI     |Divorced      |33       |
+-------+--------------+---------+
-------------------------------------------
Batch: 2
-------------------------------------------
+-------+--------------+---------+
|carrier|marital_status|num_users|
+-------+--------------+---------+
|VIVO   |Single        |28       |
|TIM    |Single        |29       |
|OI     |Single        |43       |
|NEXTEL |Single        |28       |
|NEXTEL |Married       |41       |
|CLARO  |Single        |26       |
|CLARO  |Divorced      |25       |
|TIM    |Divorced      |33       |
|VIVO   |Divorced      |44       |
|OI     |Married       |29       |
|VIVO   |Married       |33       |
|NEXTEL |Divorced      |25       |
|CLARO  |Married       |41       |
|TIM    |Married       |33       |
|OI     |Divorced      |42       |
+-------+--------------+---------+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+--------------+---------+
|carrier|marital_status|num_users|
+-------+--------------+---------+
|VIVO   |Single        |36       |
|TIM    |Single        |35       |
|OI     |Single        |50       |
|NEXTEL |Single        |38       |
|NEXTEL |Married       |50       |
|CLARO  |Single        |33       |
|CLARO  |Divorced      |32       |
|TIM    |Divorced      |38       |
|VIVO   |Divorced      |50       |
|OI     |Married       |34       |
|VIVO   |Married       |38       |
|NEXTEL |Divorced      |32       |
|CLARO  |Married       |46       |
|TIM    |Married       |39       |
|OI     |Divorced      |49       |
+-------+--------------+---------+
For each batch executed, the data is consolidated and aggregated; the values in the num_users column are updated on every iteration.
To reproduce the output above, run the program and, while it is running, gradually move files from the raw_data directory into stream_in to simulate the arrival of new files in the data stream, then watch the processing happen.
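If you prefer to script this simulation, here is a minimal sketch in Scala that copies the files one by one with a pause between copies; the dataset/raw_data path is an assumption based on the project layout described above.

import java.nio.file.{Files, Paths, StandardCopyOption}

object StreamSimulator extends App {
  val source = new java.io.File("dataset/raw_data")   // assumed location of the raw files
  val target = Paths.get("dataset/stream_in")         // directory watched by the stream

  // Copy each raw file into the watched directory, pausing between copies
  // so that Spark picks them up in separate micro-batches.
  for (file <- source.listFiles() if file.isFile) {
    Files.copy(file.toPath, target.resolve(file.getName), StandardCopyOption.REPLACE_EXISTING)
    Thread.sleep(5000)
  }
}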
The entire file structure shown above, together with the sample project, can be found in this repository. You can use this project to run everything locally on your computer, with no need for a large configured infrastructure environment.
Conclusion
This article covered the workings of the Apache Spark stream processing API in depth, along with its main characteristics and a brief comparison between batch processing code and continuous processing code.
As highlights, I would single out the ability to handle records that arrive late for processing (watermarking) and the fault tolerance of the streaming mechanism (checkpoints).
I also walked through a practical stream processing program, commenting on it step by step and showing how to run it in your own development environment. In the next article in this series, I show how we use the Spark stream processor to process data in real time, solving a production problem at Wavy!
About the author
Eiti Kimura is a high-performance IT Coordinator and Distributed Systems Architect at Wavy. Eiti has 17 years of experience in software development. He is an enthusiast of open-source technologies, an Apache Cassandra MVP since 2014, and has extensive experience with back-end systems, in particular billing and messaging platforms for the main telephone carriers in Brazil.