Exactly Once Mechanism in Apache Spark

Structured Streaming state mechanism.

Iqbal Singh
6 min read · Mar 11, 2020

Data Delivery Guarantee

All next-generation data processing and streaming frameworks, such as Kafka, Spark, and Apache Flink, come with different levels of data delivery guarantees:

  • Exactly Once (each record is delivered exactly once)
  • At Most Once (each record is delivered once or not at all; records can be lost)
  • At Least Once (each record is delivered one or more times; records can be duplicated)

However, everything has a price. Exactly-once is the most expensive guarantee, as the job needs to make sure all the data is processed exactly once, with no duplicate or missing records.

Spark Structured Streaming

Before we dive in, if you don't know how to run a Structured Streaming application, you can read about it here.

https://medium.com/@Iqbalkhattra85/running-a-spark-structured-streaming-job-4d69422a0d2f?source=friends_link&sk=fd64fe14622ad4a197098e55243a6a5c

State Mechanism

The spark.sql.streaming.checkpointLocation parameter registers an HDFS directory on the Spark session object, and Spark uses this directory to store checkpointing information for Structured Streaming queries.

In our example query, we are running a stateless transformation that converts CSV files to Parquet files. This is a basic pre-staging data cleansing job with data scrubbing and data structuring; we are not performing any aggregations or watermarking.

We created a Structured Streaming query named parserQuery that reads all the files from the input directory and generates a Parquet output dataset based on the user-defined micro-batch size and trigger interval.

(Try to run the job for better understanding)
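The query described above can be sketched as a minimal PySpark configuration. This is an illustrative sketch, not the article's actual code: the paths, schema, queryName, batch size and trigger interval are all assumptions chosen to match the directories shown later in the article.

```python
# Hypothetical sketch of the stateless CSV-to-Parquet query described above.
# Paths, schema, queryName and trigger interval are illustrative assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = (SparkSession.builder
         .appName("Parsing_Query")
         # checkpoint base dir; the query adds a Parsing_Query subdirectory
         .config("spark.sql.streaming.checkpointLocation",
                 "/tmp/checkpoint_streaming_poc")
         .getOrCreate())

schema = StructType([StructField("id", StringType()),
                     StructField("payload", StringType())])

df = (spark.readStream
      .schema(schema)
      .option("maxFilesPerTrigger", 100)   # micro-batch size: files per batch
      .csv("/tmp/input_dir"))              # assumed input directory

query = (df.writeStream
         .queryName("Parsing_Query")
         .format("parquet")
         .option("path", "/tmp/streaming_query_poc")  # output dir from the article
         .trigger(processingTime="60 seconds")        # assumed batch interval
         .start())
query.awaitTermination()
```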

Code Link: here

Overview

Source Data Checkpointing

Source data checkpointing is a state mechanism needed to make sure that the job is,

  • Processing all the input data files.
  • Processing all the input files only once to avoid any data duplicates.
  • Able to recover from failed runs by itself.

For source data checkpointing, our Structured Streaming query creates a new directory named after queryName under the checkpointing location. This directory contains three subdirectories with checkpointing information about each micro-batch, plus one metadata file with the query details.

The job creates one file per streaming micro-batch, named with the batch number (the same number shown in the Spark UI), under the commits, offsets and sources directories.

Job Output Dir: /tmp/checkpoint_streaming_poc

Query Base Dir: /tmp/checkpoint_streaming_poc/Parsing_Query

The first line of each file contains the value v1. This is the data source version Spark is using for reading input. I have used Spark 2.2, which uses data source version v1; data source version v2 is available in newer Spark versions.

  1. commits: Each commit file contains the commit information and batch metrics for a micro-batch.
  2. offsets: Each offset file contains the offset information for a particular batch: the batch watermark, batch timestamp and Spark configurations.
  3. sources: Each source file contains information about every input file processed in the particular micro-batch, with the file name, timestamp and the batch it was processed in.
  4. metadata: The metadata file contains metadata about the query itself, such as the query ID. This is the only file that covers query-level information.

{"id":"caccdf75-85e4-4f68-9841-9a842efe5c9b"}

Target Data Commits

Target data commits are important in the case of a restart: the job needs to know the last successfully completed micro-batch of data before starting a new one.

For output data commits, the streaming job creates a new _spark_metadata directory under the output directory for committing output micro-batches. The job creates one file per micro-batch under this commit directory.

The output directory of the Structured Streaming job contains the output data and a Spark-internal _spark_metadata directory. Spark uses this directory internally when downstream jobs read the data (explained later in this article).

hadoop fs -ls /tmp/streaming_query_poc/_spark_metadata

Every micro-batch output commit file contains the batch output metrics, with an entry for every output file: the file path, file name, file size, isDir flag, modification time, block size and the action.

cat /tmp/streaming_query_poc/_spark_metadata/2
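Each line in such a commit file is one JSON entry describing a single output file. A sketch with an illustrative entry (the path, size and timestamps are made up; the field names match the list above):

```python
import json

# Illustrative entry from a _spark_metadata batch file; the real files start
# with a version header followed by one JSON entry per output file.
entry = json.loads(
    '{"path":"hdfs://tmp/streaming_query_poc/part-00000.parquet",'
    '"size":4521,"isDir":false,"modificationTime":1583900060000,'
    '"blockReplication":1,"blockSize":134217728,"action":"add"}'
)

print(entry["action"])  # add   -- the file was added by a committed batch
print(entry["isDir"])   # False
```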

The .compact File

If we look closely at both the input checkpointing and output commit directories, we can see that every 10th file (9, 19, 29, 39, …) has a ".compact" extension. A compact file aggregates the entries of the previous files into a single file. This avoids the creation of too many small files, as a Structured Streaming job can run for years and process millions of batches. By default, Spark only keeps the last 100 batch files in the checkpointing directories (one .compact file per 10 commit files).

We can configure this behaviour using the parameters below:

"spark.sql.streaming.minBatchesToRetain"            // default 100
"spark.sql.streaming.fileSink.log.compactInterval"  // default 10
"spark.sql.streaming.fileSource.log.compactInterval" // default 10

By design, a compact file under the source checkpointing directory holds the list of all files the Spark Structured Streaming job has processed so far, and the compact file under the output commit metadata holds the list of all files the job has produced so far.
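The rollover above can be modelled in a few lines. This is a toy simulation of the compaction rule, not Spark's implementation: every compactInterval-th batch file absorbs all entries seen so far, so the older per-batch files can be aged out.

```python
# Toy model of the ".compact" rollover described above.
COMPACT_INTERVAL = 10  # mirrors spark.sql.streaming.fileSink.log.compactInterval

def is_compact_batch(batch_id, interval=COMPACT_INTERVAL):
    # Batches 9, 19, 29, ... carry the ".compact" extension.
    return batch_id % interval == interval - 1

log = {}           # file name in the log dir -> entries it holds
all_entries = []   # every entry seen so far
for batch_id in range(25):
    entries = [f"file_{batch_id}"]
    all_entries.extend(entries)
    if is_compact_batch(batch_id):
        # The compact file aggregates every entry processed so far.
        log[f"{batch_id}.compact"] = list(all_entries)
    else:
        log[str(batch_id)] = entries

print(sorted(k for k in log if k.endswith(".compact")))  # ['19.compact', '9.compact']
print(len(log["19.compact"]))                            # 20 (batches 0-19)
```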

Working

Spark's MicroBatchExecution class handles the checkpointing of source data. At a very high level, the steps are as below.

  • The job checks the last completed batch number and increments it by one to create the new batch number.
  • It lists the files in the source directory, ordered by HDFS file timestamp.
  • It checks the processed files in the source checkpointing directory and keeps only the unprocessed files.
  • It cuts a batch from the unprocessed file list, bounded by the max file batch threshold.
  • It creates a file named <batchNumber> under each of the three source-side directories mentioned above and starts processing the batch.
  • Once the batch completes successfully, a batch commit file is created with all the output file details under the _spark_metadata directory.
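The batch-planning steps above can be sketched as a small function. The names and data shapes here are illustrative, not Spark's internal API:

```python
# Toy walk-through of the batch-planning steps listed above.
def plan_next_batch(last_batch_id, source_files, processed, max_files_per_trigger):
    """Return (new_batch_id, files_for_batch) following the listed steps."""
    new_batch_id = last_batch_id + 1                      # last completed + 1
    listed = sorted(source_files, key=lambda f: f[1])     # order by HDFS timestamp
    unprocessed = [name for name, _ in listed if name not in processed]
    return new_batch_id, unprocessed[:max_files_per_trigger]  # cut the batch

# (file name, HDFS timestamp) pairs; a.csv was already processed in batch 2
files = [("a.csv", 100), ("b.csv", 200), ("c.csv", 300), ("d.csv", 400)]
batch_id, batch = plan_next_batch(2, files, processed={"a.csv"},
                                  max_files_per_trigger=2)
print(batch_id, batch)  # 3 ['b.csv', 'c.csv']
```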

Handling Failure

In case of a failure,

  • If the source checkpointing and output commit (_spark_metadata) directories have the same latest batch ID, the job starts with the above-mentioned steps.
  • If the job created a batch on the source side and failed while processing it, the job reuses the already-created source batch details from the checkpointing file and reprocesses that batch.
  • Streaming jobs are very sensitive to these state directories; if there is any mismatch between them, the job will keep on failing.
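The recovery decision above boils down to comparing the latest batch IDs on both sides. A toy sketch (function and return strings are illustrative, not Spark internals):

```python
# Toy recovery check: compare the latest batch id in the source checkpointing
# log with the latest batch id in the sink commit log (_spark_metadata).
def recovery_action(latest_offset_batch, latest_commit_batch):
    if latest_offset_batch == latest_commit_batch:
        return "plan a fresh batch"        # both sides agree: normal start
    if latest_offset_batch == latest_commit_batch + 1:
        return "re-run planned batch"      # batch was cut but never committed
    return "state mismatch"                # job will keep failing

print(recovery_action(7, 7))  # plan a fresh batch
print(recovery_action(8, 7))  # re-run planned batch
```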

Downstream Data Reads

Most important is the mechanism that makes sure downstream jobs get exactly one copy of the data. To achieve this, a Structured Streaming job only commits a batch once all of its input files are processed. If a batch fails after processing partial data, the job output directory will contain some output files for the failed batch, but there will be no commit file for that batch under the _spark_metadata directory.

  • While reading data, Spark checks the input location of a job and, if a _spark_metadata directory is present there, forces the job to read through it.
  • By doing this, jobs reading from a Structured Streaming output directory generate their input file list from _spark_metadata only and never read any partial batches.

This gives us an exactly-once data guarantee for downstream jobs.
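The contrast can be made concrete with a toy model: a raw directory listing sees the partial output of a failed batch, while a listing driven by the commit log does not. File names and the commit-log shape are illustrative:

```python
# Toy contrast: raw directory listing vs. a _spark_metadata-driven listing.
output_dir = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]  # raw ls

committed = {
    0: ["part-0.parquet"],
    1: ["part-1.parquet"],
    # batch 2 failed mid-write: part-2.parquet exists on disk
    # but was never committed, so it has no entry here
}

def files_for_downstream(commit_log):
    """Build the input file list from the commit log only."""
    return [f for batch in sorted(commit_log) for f in commit_log[batch]]

print(files_for_downstream(committed))                          # committed files only
print(sorted(set(output_dir) - set(files_for_downstream(committed))))  # ['part-2.parquet']
```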

This mechanism is expensive: it adds two more stages for listing files and defining a micro-batch, which is a big pain for large systems consuming thousands of files. I will list all the issues and workarounds for them in my next post.
Feel free to review and let me know if any changes are required.

Thanks !!
