DATA PROCESSING IN “REAL TIME” WITH APACHE SPARK: PART 3
This post is part of a series of articles on the Apache Spark use case for real-time data processing, check out part 1 and part 2. Written by — Eiti Kimura
We have reached the final step of this journey, we previously saw an introduction and a detailed description of the streams processing API, Structured Streaming.
Use Case: A “real-time” data grinder
At Sinch we work with large volumes of transactions every day, monthly the company sends more than 1 billion messages to our customers, by messages, understand as being communication that impacts customers such as: SMS and Whatsapp, for example.
In addition to offering a new communication experience to our customers, we have a business line with cell phone operators, which offer product development services and distribution of media and digital content, bringing new subscribers to the operators’ platforms.
In this context, the subscriber and billing management platform processes the billing of users for their services at the operators and manages the life cycle of product subscriptions, frequency of billing, new subscriptions, renewal and so on.
These more than 110 million transactions that we have daily, reflect the health of the platforms and the business, show the flow of new users and the cancellation of products. This information is the basis for taking very important business and technical actions.
In our initial solution, prior to the one I will cover here, the entire process for consolidating this mass of data, processing to extract information and make it available on our dashboards ranged from 30 to 60 minutes, in the worst case more than 90 minutes. In this approach, the data generated in our applications, distributed on several servers, were persisted in a relational database, then the information was transferred to an analytics database in the cloud, with the ability to process ad-hoc queries on the data volume. mentioned.
We needed to process this data more quickly, as events were happening, consolidate and show this information more quickly, giving more visibility to the business, television media campaigns need to be tracked in “real time”, for example.
The initial solution started not to serve us with the speed we needed, so we started to work with other strategies and tools. In our case, we chose to work with the Apache Spark stream processor, Structured Streaming. In part 2 of this series of articles I explain how technology works in detail, now I’m going to show you how to apply this problem in practice.
How about processing data in “real time”?
To achieve the goal of processing data in “real time”, we had to implement a series of changes also on the platforms. The first change was to stop centralizing the data in the relational database and assume the strategy of dividing to conquer, instead of centralizing. On the systems side, the persistence of transactions would no longer be done in the database, but rather generating small files on local machines, with that, we even gained more performance in applications. From there, the files were transferred to our HDFS distributed file system. These files are copied from local machines to HDFS via transfer scripts maintained by our internal teams. The diagram in Figure 1 illustrates the generation of the files:
The continuous processing job runs on the Apache Spark cluster connected to HDFS to access the files to be processed. HDFS directories have been mapped as a data stream, new files arrive continuously every minute.
The consolidation of information performed by Structured Streaming is persisted in a relational bank to feed our dashboards. The main difference here is that the data is persisted pre-aggregated in the database and processed continuously. The diagram in Figure 2 shows the schema with Apache Spark as part of the solution.
After this presentation of the solution from an architectural point of view, I will focus more on coding, but before going into code, I will show a little about the structure of the files generated by the applications. All files are written in CSV format and compressed with Gzip. Here is a sample of the file:
1
2
3
838,2,5500000000,100015,”{“”authCode””:””3215"”,””transactionIdAuth””:””1011706220428374938"”}”,SUBSCRIPTION_050,0.5,11,0,1,14,Subscription renew.,2017–07–18 13:22:59.518,,,19,PRE,false,Charge Fail. CTN[31984771092][PRE][GSM]: Without Credit.,,,,0,458,,engine2dc2,23,3,5,2017–07–18 13:22:59.544,,FE1952B0–571D-11E7–8A17-CA2EE9B22EAB,NT0359
…
A data line is made up of all information regarding the transaction that was carried out, billing price, product identifier, customer, operator responses and so on.
Following the steps exemplified in part 2 of this series, we need to define an input for reading information from the stream, for that it is necessary to connect to the Apache Spark cluster and configure the readStream parameters.
Note that all the code snippets presented are written in Scala language.
1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf().setAppName(“Structured Streaming”)
val spark = SparkSession.builder()
.config(conf).getOrCreate()
val streamReader = spark.readStream
.format(“csv”)
.option(“header”, false)
.option(“mode”, “DROPMALFORMED”)
.schema(ReadSchemas.csvTransactionSchema)
.load(“hdfs://YOUR_PATH/20*/*/*/*.gz”)
To read the file in CSV format, in the case of readStream, it is necessary to inform the schema, that is, the data types of each of the fields in the file, as well as the following code snippet:
1
2
3
4
5
6
7
8
9
10
11
12
// the csv data schema
def csvTransactionLogSchema = StructType {
StructType(Array(
StructField(“id”, StringType, true),
StructField(“application_id”, IntegerType, true),
StructField(“carrier_id”, IntegerType, true),
StructField(“phone”, StringType, true),
StructField(“price”, DoubleType, true),
StructField(“origin_id”, IntegerType, true),
. . .
))
}
The next step is to define the information processing, the transformations and aggregations that will be applied continuously on the data.
In this first processing version, all transformations were made using the Dataframe API methods:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
al query = streamReader
.withColumn(“date”, $”creation_date”.cast(“date”))
.withColumn(“successful_charges”, when($”transaction_status_id” === 2, 1))
.withColumn(“no_credit”, when($”transaction_status_id” === 0, 1).otherwise(0))
.withColumn(“error”, when($”transaction_status_id” === 3).otherwise(0))
.filter(“carrier_id IN (1,2,4,5)”)
.filter(“transaction_status_id NOT IN (5, 6)”)
.filter(“transaction_action_id IN (0, 1)”)
.withWatermark(“creation_date”, “3 hour”)
.groupBy($”carrier_id”, window($”creation_date”, “5 minutes”).as(“window”))
.agg($”carrier_id”,
avg($”response_time”).as(“avg_response_time”),
sum($”successful_charges”).as(“successful_charges”),
sum($”no_credit”).as(“no_credit”),
sum($”error”).as(“error”),
count($”carrier_id”).as(“total_attempts”))
The content of this code snippet is dense, it is worth explaining some parts in more detail. I start with some transformations using the <pre> withColumn </pre> command creating new columns that will be aggregated below, then we have some filtering, through the <pre> filter </pre> statement. In this part, some operators and transaction status referring to user charges are filtered.
It is worth mentioning that the watermark was configured for the processing of disordered events in the stream, this feature allows handling events that arrive late and out of order for processing, in this case I am using the <pre> creation_date </pre> record creation field as reference and saying that I accept a processing window of up to 3 hours, in case the records arrive late. More details on how the watermark mechanism works in the official documentation.
Following the flow of the previous code, we reach the final part, in which some aggregations are made, consolidating the numbers and counts of the records.
After a while working with the Dataframes functional API, it is a little more intuitive to read and understand the code. However, the operations described are very similar to a SQL query. So, why not simplify the understanding of data transformation using the SQL language itself? Apache Spark supports the use of the SQL API in conjunction with processing streams.
The SQL language is often the standard for data manipulation for both development teams and teams of database administrators. We made the decision to write all the data transformation logic in SQL as a way to involve more people in the project, both other development analysts and data, using a common knowledge language, our good old SQL.
All data manipulation and transformation logic was rewritten using SQL for continuous processing of data contained in the Dataframe. For that, we first have to register the Dataframe as a temporary view, in this case called <pre> transaction_temp_table </pre>:
1
2
3
treamReader
.withWatermark(“creation_date”, “3 hour”)
.createOrReplaceTempView(“transaction_temp_table”)
This is version 2 of the program shown previously, note that the transformations on the data are the same, that is, the final output of the information processing is the same for both versions of the programs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val query : DataFrame = spark.sql(
“””
SELECT carrier_id, TO_DATE(creation_date) as record_date,
HOUR(creation_date) as hour_of_day,
WINDOW(creation_date, “5 minutes”).start as start_date,
AVG(response_time) as avg_response_time ,
SUM(CASE
WHEN transaction_status_id = 2 THEN 1
ELSE 0
END) as successful_charges,
SUM(CASE
WHEN transaction_status_id = 0 THEN 1
ELSE 0
END) as no_credit,
count(carrier_id) as total_attempts
FROM transaction_temp_table
WHERE carrier_id IN (1,2,4,5)
AND transaction_action_id IN (0, 1)
AND transaction_status_id NOT IN (5, 6)
GROUP BY carrier_id, TO_DATE(creation_date),
HOUR(creation_date),
WINDOW(creation_date, “5 minutes”).start
“””)
For debugging purposes, a writeStream was configured to print only the result of the information processing. This is the result of the consolidated information within a 5 minute window, the aggregated numbers of transactions with the operators:
1
2
3
4
5
6
7
8
+ — + — — — — — -+ — — -+ — — — + — — — + — — — — — — + — — — — + — — — — -+ — — -+ — — — -+
|id|record_date|hour |start |end |avg_resp_tm |success |no_credit|error|tot_att|
+ — + — — — — — -+ — — -+ — — — + — — — + — — — — — — + — — — -+ — — — — -+ — — -+ — — — -+
|1 |2017–07–18 |13 |13:20 |13:25 |618.8061297 | 4 |2607 |195 |2806 |
|2 |2017–07–18 |13 |13:20 |13:25 |1456.424283 | 13 |10912 |1503 |12428 |
|5 |2017–07–18 |13 |13:20 |13:25 |1161.730896 | 9 |2796 |532 |3337 |
|4 |2017–07–18 |13 |13:20 |13:25 |2950.642105 | 4 |1364 |54 |1425 |
+ — + — — — — — -+ — — -+ — — — + — — — + — — — — — — + — — — — + — — — — -+ — — -+ — — — -
Now that we are processing the data, we need to decide what to do with this output. Structured Streaming offers some integrations to direct the processing result in the pipeline, they are: distributed file systems, streaming server, such as Kafka, console for debugging and an extension called foreach, in which it is possible to implement a customized integration.
In the use case presented, the result of the aggregation is directed to a relational database, which was implemented using foreach type output. Here is the output code:
1 val jdbcWriter = new JDBCSink(resource, username, password)
2 val foreachStream = query
3 .select($”carrier_id”, $”date”, $”hour_of_day”, $”start_date”, $”end_date”)
4 .writeStream
5 .foreach(jdbcWriter)
6 .outputMode(OutputMode.Update())
7 .trigger(Trigger.ProcessingTime(“2 minute”))
8 .option(“checkpointLocation”, “hdfs://YOUR_PATH/checkpoint-complete/”)
9 .start
10 foreachStream.awaitTermination()
Some highlights are the settings: checkpoint where the directory informed must be in a DFS so that all other Workers know where the processing is; the frequency that the processing will be performed, in the case defined by the 2-minute trigger; the Update type output mode, in which only the records that have undergone changes are directed to the output; finally, the type of foreach output, in which case an extension was implemented to record the data in the jdbcWriter relational database.
Calling the <pre> awaitTermination () </pre> method on a WriteStream will cause the processing to start and the application to start running continuously!
We arrived here at the end of the implementation of our use case, implementing a “real-time” stream processor. Now comes the deployment phase, which consists of generating an artifact and submitting the job to the Apache Spark cluster, which is outside the scope of this article.
Results obtained
As results obtained, we have data being pre-aggregated, that is, there is also an impact on the volume of information stored in addition to the speed of processing and executing queries:
Approximately 30 times faster is the time the transactions took place until they appear consolidated in our dashboards for consultation. It is a very expressive result!
Final considerations and lessons learned
To make the evolution of the reading schema whether CSV or JSON, for example, it is not trivial, a new version of the application must be generated and redeployed. The good news is that adding new fields to the schemas will not generate errors or problems in the applications. However, removing fields that already exist in the schema will cause errors when reading data, when mapping files to the Dataframes structure.
To better work with application resilience, Spark provides some additional settings when reading data, when defining your ReadStream
1
2
val input = spark.readStream
.option(“mode”, “DROPMALFORMED”)
When reading files, if one of the records is malformed, Spark usually interrupts the stream’s processing with an error. To make the application more robust, we can inform at the time of reading that in case of records with problems, they will simply be ignored and the next records will be processed correctly.
There are some pros and cons of working with file processing as a stream in Spark’s default configuration, if a file being processed is corrupted, the program will stop with an I / O error. To prevent this from happening, simply configure it so that corrupted files are simply ignored as follows:
1
2
spark.sqlContext
.setConf(“spark.sql.files.ignoreCorruptFiles”,”true”)
Another important point is to understand if the cluster has enough capacity to process the information at the specified frequency (Trigger). The best way to find out if the cluster needs to be scaled is by following Spark’s processing logs. Here is an example:
WARN ProcessingTimeExecutor:66
- Current batch is falling behind. The trigger interval is 1000
milliseconds, but spent 19455
milliseconds
This is a log advising that the processing should have occurred with an interval of 1 second, however Spark is taking 19 seconds to process, when observing the resources of the Workers during the processing it was possible to notice that they were completely saturated
This is a clear sign that more resources need to be added to the cluster to increase its processing capacity. Another alternative would be to decrease the processing frequency, as shown in the log previously, there is no way to have information processing in less than 19 seconds, so this time could be defined as the Trigger, but this will depend on the requirements of the application.
A question I am often asked and worth discussing: Why didn’t you use Apache Kafka as a stream server? It is really a very pertinent question.
The answer is quite simple, in the architecture of our platforms we did not have Kafka as part of the solution. This would require changes in all applications that, instead of simply registering the data in a file, would have to start producing data in Kafka, just as it would be necessary to raise a whole new infrastructure to support this cluster. For the moment of the solution the choice that demanded less changes in the applications and also the lowest cost was to use Apache Spark’s ability to map DFS directories as a data stream, so we continue using the files.
We reached the end of this series of articles on data processing in “real time” with Apache Spark, it was a very deep series from the fundamentals of processing streams to an application solving real world problems in production environments.
About the author
Eiti Kimura is a high performance IT Coordinator and Distributed Systems Architect at Wavy. Eiti has 17 years of experience in software development. He is an enthusiast of open-source technologies, MVP of Apache Cassandra since 2014 and has extensive experience with back-end systems, in particular charging and messaging platforms for the main telephone operators in Brazil.