Incremental Processing on Hadoop

Published in cdapio, Apr 23, 2019

March 1, 2016

Ali Anwar is a software engineer at Cask, where he is building on the Cask Data Application Platform (CDAP). Prior to Cask, Ali attained his undergraduate degree in Computer Science from the University of California, Berkeley.

Incremental data processing is a common challenge in data lakes across many organizations. For example, suppose a user has a continual stream of data arriving in their data lake and wants to run a validity check on the data set to filter out invalid records. With data in motion, it is much more efficient to run this check on only the latest, unprocessed data than on the entire data set. Other common Hadoop use cases that can benefit from this kind of incremental processing include building OLAP cubes and building indexes. Since the data arrives periodically in batches, MapReduce is a natural fit for such data-intensive applications.

To help manage this massive stream, CDAP introduced the PartitionedFileSet dataset, which is a collection of partitions, each corresponding to a unique key and a path in the file system. Using partitions allows developers to focus on the processing logic instead of the storage layer, i.e., how the input to the job will be mapped.

An example of a partition key is the epoch timestamp of when the file arrived in the system. This timestamp can then be used as a high-watermark of processed partitions in order to implement incremental processing. One downside of this approach is that some files are much larger than others and take longer to copy into the system, so their data may not yet be available for processing when the partition is created.
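The high-watermark idea can be sketched in a few lines of plain Java. This is a minimal illustration under stated assumptions, not CDAP code: the class HighWatermarkSketch and its methods addPartition and consumeNew are hypothetical names, and an in-memory TreeMap stands in for the partition index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration only; not part of the CDAP API. */
final class HighWatermarkSketch {
  // Partition key (arrival time in epoch millis) -> path of the partition's files.
  private final TreeMap<Long, String> partitions = new TreeMap<>();
  // Largest partition key that has been handed out for processing so far.
  private long highWatermark = Long.MIN_VALUE;

  void addPartition(long arrivalTimeMillis, String path) {
    partitions.put(arrivalTimeMillis, path);
  }

  /** Returns the paths of partitions newer than the watermark, then advances the watermark. */
  List<String> consumeNew() {
    // Strictly greater than the watermark, in ascending key order.
    Map<Long, String> unprocessed = partitions.tailMap(highWatermark, false);
    List<String> paths = new ArrayList<>(unprocessed.values());
    if (!unprocessed.isEmpty()) {
      highWatermark = partitions.lastKey();
    }
    return paths;
  }
}
```

In this sketch, a partition that is registered with a key below the current watermark is never returned by consumeNew, which is one reason a bare watermark is fragile when data becomes available out of order.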

Challenges in incremental processing

There are several additional challenges in processing partitions of data:

  • Ensuring that different runs of a scheduled job do not process any partitions more than once, and also that no partitions are missed.
  • Handling the large backlog of data that builds up when processing has not run periodically, for example because the system was down.
  • Reprocessing data that was originally assigned to a job that failed to complete successfully.
  • Robustly handling multiple, concurrent runs of a job processing the data.

Incremental processing in CDAP

To facilitate incremental data processing in CDAP, we have developed additional platform APIs that make it easy to process and manage partitions. Using the PartitionConsumer APIs, developers can focus on the processing logic while the CDAP platform manages which partitions have already been processed. We have solved the previously mentioned challenges and complexities of incremental data processing so you can focus on what’s important to your application.

Fig. 1 PartitionConsumer APIs ensure the available partitions get used as input for the MapReduce job

For incremental data processing, CDAP users can leverage PartitionedFileSet. All the user needs to do is define some simple configurations such as the input PartitionedFileSet and where to persist the processing state, and CDAP handles the rest. It will automatically keep track of which partitions to use as the input to each job.

Control flow during a MapReduce job

The PartitionConsumer APIs keep track of a working set of partitions, which includes partitions available for processing as well as partitions currently being processed by any running job. At the beginning of a MapReduce job, during the beforeSubmit phase, the available partitions are fetched transactionally and marked as in-progress. The in-progress state is then persisted to a user-defined location so that the partitions are not used by another MapReduce job. When the MapReduce job completes successfully, the partitions are marked as processed and removed from the working set. If the MapReduce job fails, the in-progress partitions are marked as available again so that subsequent runs of the MapReduce job will process them.
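The state transitions described above can be modeled with a small in-memory class. This is a sketch under stated assumptions rather than the PartitionConsumer implementation: WorkingSetSketch, claimAvailable, and onFinish are hypothetical names, and Java's synchronized keyword stands in for the transactional fetch and the persisted state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the working set; it does not use the CDAP PartitionConsumer classes. */
final class WorkingSetSketch {
  enum State { AVAILABLE, IN_PROGRESS }

  // Partition key -> current state. Processed partitions are removed entirely.
  private final Map<String, State> workingSet = new LinkedHashMap<>();

  synchronized void addPartition(String key) {
    workingSet.put(key, State.AVAILABLE);
  }

  /** Start of a run: claim every available partition by marking it in-progress. */
  synchronized List<String> claimAvailable() {
    List<String> claimed = new ArrayList<>();
    for (Map.Entry<String, State> entry : workingSet.entrySet()) {
      if (entry.getValue() == State.AVAILABLE) {
        entry.setValue(State.IN_PROGRESS);
        claimed.add(entry.getKey());
      }
    }
    return claimed;
  }

  /** End of a run: drop the claimed partitions on success, release them on failure. */
  synchronized void onFinish(List<String> claimed, boolean succeeded) {
    for (String key : claimed) {
      if (succeeded) {
        workingSet.remove(key);
      } else {
        workingSet.put(key, State.AVAILABLE);
      }
    }
  }

  synchronized int size() {
    return workingSet.size();
  }
}
```

A second run that calls claimAvailable while the first run is still in progress receives an empty list, which is how the sketch keeps two runs from processing the same partition.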

Fig. 2 The partitions’ process state changes through the various stages of the MapReduce job

One of the challenges we encountered while implementing this was how to handle job failures after a MapReduce job has marked partitions as in-progress. If the job is unable to mark these partitions as available again and no other action is taken, they would remain in-progress forever and never actually get processed. To resolve this issue, the PartitionConsumer implementation also marks each partition with a timestamp. If a job takes longer than a configurable timeout, its partitions are understood to have failed processing, which allows them to be reprocessed.
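The timeout check can be illustrated as follows. This is a minimal sketch, assuming each in-progress partition records the time at which it was claimed; InProgressTimeoutSketch, markInProgress, and expire are hypothetical names and are not taken from the CDAP API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of expiring stale in-progress partitions; not CDAP code. */
final class InProgressTimeoutSketch {
  // Partition key -> time (epoch millis) at which a run marked it in-progress.
  private final Map<String, Long> inProgressSince = new HashMap<>();
  private final long timeoutMillis;

  InProgressTimeoutSketch(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  void markInProgress(String key, long nowMillis) {
    inProgressSince.put(key, nowMillis);
  }

  /** Returns the partitions claimed longer ago than the timeout and releases them. */
  List<String> expire(long nowMillis) {
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, Long> entry : inProgressSince.entrySet()) {
      if (nowMillis - entry.getValue() > timeoutMillis) {
        expired.add(entry.getKey());
      }
    }
    // Released partitions are no longer in-progress and can be claimed by a later run.
    inProgressSince.keySet().removeAll(expired);
    return expired;
  }
}
```

The timeout in such a scheme needs to be longer than the slowest expected successful run. Otherwise a partition could be released and claimed a second time while the first run is still working on it.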

Another challenge is that when the system returns to a healthy state and jobs are running again, there may be a large backlog of data to process. This is handled by allowing a configurable limit on the number of partitions to be processed by each run of the MapReduce job, which helps keep the duration of each job short.
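A per-run limit of this kind can be sketched as below. The names BacklogLimitSketch, claim, and maxPartitions are hypothetical and chosen for illustration. The sketch only shows how a backlog drains over several bounded runs, not how CDAP exposes the setting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of a per-run partition limit; not CDAP code. */
final class BacklogLimitSketch {
  // Available partitions, oldest first.
  private final Deque<String> available = new ArrayDeque<>();

  void addPartition(String key) {
    available.addLast(key);
  }

  /** Claims at most maxPartitions of the oldest available partitions for one run. */
  List<String> claim(int maxPartitions) {
    List<String> claimed = new ArrayList<>();
    while (claimed.size() < maxPartitions && !available.isEmpty()) {
      claimed.add(available.pollFirst());
    }
    return claimed;
  }
}
```

With five partitions waiting and a limit of two, three consecutive runs claim two, two, and one partition respectively, so no single run has to work through the entire backlog.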

By abstracting the complex implementation of incremental data processing from the developer, CDAP makes it easy to focus on the critical processing logic rather than the bookkeeping of what needs to be processed.

Incremental data processing in action

We now showcase incremental data processing with an example data cleansing application. The DataCleansing application reads partitions of a rawRecords PartitionedFileSet dataset and writes each record to either cleanRecords or invalidRecords, depending on whether the input data matches a particular schema.

/**
 * A simple MapReduce that reads records from the rawRecords PartitionedFileSet and writes all records
 * that match a particular {@link Schema} to the cleanRecords PartitionedFileSet. It also keeps track of
 * which partitions it has processed, so that it only processes new partitions of data each time it runs.
 */
public class DataCleansingMapReduce extends AbstractMapReduce {

  ...

  private PartitionBatchInput.BatchPartitionCommitter partitionCommitter;

  ...

  @Override
  public void beforeSubmit(MapReduceContext context) throws Exception {
    // define where to persist the consuming state, and set the partitions as input to the MapReduce job
    partitionCommitter =
      PartitionBatchInput.setInput(context, DataCleansing.RAW_RECORDS,
                                   new KVTableStatePersistor(DataCleansing.CONSUMING_STATE, "state.key"));

    ...

  }

  ...

  @Override
  public void onFinish(boolean succeeded, MapReduceContext context) throws Exception {
    // persist the final state: on success the partitions are marked as consumed,
    // on failure they are made available again for a later run
    partitionCommitter.onFinish(succeeded);
  }
}

Conclusion

In this blog post we have highlighted the challenges developers face when dealing with incremental data processing. CDAP helps solve these problems by providing an easy-to-use abstraction called PartitionedFileSet and a set of APIs for consuming its partitions.

If you’re interested in working on this and other challenging problems with incredible engineers, join us! We are always looking for talented software engineers to work with here at Cask!
