Advanced use cases working with file-based sources and sinks in CDAP


August 24, 2015

Yaojie Feng interned at Cask for the summer of 2015 and is a Master's student at the University of Illinois at Urbana-Champaign, majoring in Computer Science.

This is my story about the interesting summer I had and how quickly it flew by! It was a fun-filled experience that helped me extend the frontiers of my knowledge and experience with amazing and fun-loving people.

This blog details a few of the exciting projects I worked on as an intern at Cask:

  1. An ETL Batch Sink for CDAP that allows one to store data in time-partitioned datasets in Parquet format; and
  2. An HBase Snapshot Dataset Sink that allows one to seamlessly replicate data from a database into HBase including changes such as insertions, updates, and deletions.

ETL Batch Sink for TimePartitionedFileSet Parquet

For the first project, I worked on building a batch sink for a TimePartitionedFileSet that writes data in Parquet format. I spent about three weeks finishing the work. During this project, I got a basic idea of what CDAP is and how the Extract, Transform, and Load (ETL) pipeline in CDAP works. I also learned about the different data formats in the Hadoop ecosystem, such as Avro and Parquet.

An ETL pipeline consists of a source (the first step), optional transforms (the middle steps), and a sink (the last step). A batch sink transforms a single input object into a key-value pair that the batch run writes out. My job was to make the sink write the data in Parquet format. The sink accepts the data schema as a JSON string. The biggest challenge was finding appropriate input and output formats for the dataset, since there is no easy way to convert a JSON string into a Parquet schema. I researched this and found that Avro supports parsing a schema directly from a JSON string, and that converting from Avro to Parquet in a MapReduce job is straightforward. Therefore, we use AvroParquetInputFormat and AvroParquetOutputFormat to first transform the data into Avro records and then write those records to a Parquet file. This way, a Parquet file is generated after each batch run and the results can be explored using Hive. This feature is available in CDAP as of version 3.1.0, and it was my first contribution to Cask!
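At a high level, the write side can be sketched as follows. This is a simplified illustration rather than the actual CDAP plugin code: the class name, the job setup, and the "name"/"value" fields are assumptions, but the core idea is parsing the JSON schema string with Avro's parser and letting AvroParquetOutputFormat write Avro records as Parquet.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.avro.AvroParquetOutputFormat;

    public class TpfsParquetSinkSketch {

      // Parse the schema that the sink receives as a JSON string directly into an Avro schema,
      // then configure the MapReduce output to write Avro records as Parquet files.
      public static Job configureJob(String schemaJson, String outputDir) throws Exception {
        Schema avroSchema = new Schema.Parser().parse(schemaJson);

        Job job = Job.getInstance(new Configuration(), "tpfs-parquet-sink");
        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        AvroParquetOutputFormat.setSchema(job, avroSchema);
        AvroParquetOutputFormat.setOutputPath(job, new Path(outputDir));
        return job;
      }

      // A sink's transform step would build one Avro record per input object, matching the schema.
      public static GenericRecord toAvroRecord(Schema avroSchema, String name, int value) {
        GenericRecord record = new GenericData.Record(avroSchema);
        record.put("name", name);   // assumes the schema has a "name" field
        record.put("value", value); // assumes the schema has a "value" field
        return record;
      }
    }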

HBase (Table) Snapshot Dataset and Batch Sink

My second project at Cask involved building a Snapshot Dataset and Snapshot Batch Sink. The motivation was to make the sink reflect corresponding changes in the source, such as insertions, updates, and deletions. I needed to come up with the algorithm and write the design document for this project. Initially, I did not anticipate that it would be such a complicated problem, but I spent almost five weeks finalizing the design. I went through six designs, each with its own limitations, for example, poor performance caused by deleting old data in every adapter run, or required changes to Tephra. During the design process, I learned a lot about HBase and transactions in CDAP.

After weighing the trade-offs among the different designs, we settled on a design aimed at batch applications. Like other datasets, we needed a SnapshotDataset class, which implements the data operations that can be performed on a dataset instance, and a SnapshotDefinition class, which defines how the dataset is configured. The SnapshotDataset has two underlying tables: a metadata table, which stores the current version of the snapshot, and a main table, which stores the data from the adapter run. In each adapter run, the data written to the main table is tagged with the transaction ID of the MapReduce job, and after the job finishes the metadata table is updated with that transaction ID to indicate the current version of the snapshot. When reading from the dataset, we obtain it through the SnapshotDefinition: we start a transaction, read the current version from the metadata table, and pass that version to the main table to filter the results.
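A rough sketch of this read/commit flow is below. The KeyValueStore and VersionedStore interfaces, the "version" row key, and the method names are hypothetical stand-ins for the real CDAP metadata and main tables, kept minimal to show the idea.

    // Hypothetical sketch of the snapshot read/commit flow described above; not actual CDAP classes.
    public class SnapshotDatasetSketch {

      // Minimal stand-ins for the two underlying tables.
      interface KeyValueStore {               // metadata table: stores the current snapshot version
        long getLong(String key);
        void putLong(String key, long value);
      }

      interface VersionedStore {              // main table: cells are tagged with a transaction ID
        byte[] get(byte[] rowKey, long version);
      }

      private final KeyValueStore metadataTable;
      private final VersionedStore mainTable;

      public SnapshotDatasetSketch(KeyValueStore metadataTable, VersionedStore mainTable) {
        this.metadataTable = metadataTable;
        this.mainTable = mainTable;
      }

      // Read a row from the snapshot committed by the last successful MapReduce run:
      // fetch the current version from the metadata table, then use it to filter the main table.
      public byte[] read(byte[] rowKey) {
        long currentVersion = metadataTable.getLong("version");
        return mainTable.get(rowKey, currentVersion);
      }

      // After a MapReduce run completes, point the metadata table at the new snapshot version
      // (the transaction ID that tagged all data written during the run).
      public void commitSnapshot(long transactionId) {
        metadataTable.putLong("version", transactionId);
      }
    }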

It took me about two weeks to implement this algorithm. To work around some limitations in the CDAP dataset framework, we had to pass all of the parameters needed to create the DatasetDefinition down to the SnapshotDataset. Then, in the SnapshotDataset, when we start the transaction we first read the metadata table to get the current version of the snapshot, and instantiate the main table with that snapshot version as a runtime argument. With this algorithm, the corresponding updates show up in the Snapshot sink.

Example of a Snapshot dataset

Suppose we update the dataset with this data at time = 10; the two tables will be:

Main data table

rowKey   value   timestamp
--------------------------
x        1       10
y        2       10
z        3       10

Metadata table

rowKey    value   timestamp
---------------------------
version   10      10

The current version is 10, so when a query happens at this time, all data with timestamp 10 is returned.

Next, suppose we update the dataset at time = 20: only x and z are updated, and y is deleted. Now the two tables will be:

Main data table

rowKey   value   timestamp   value   timestamp
----------------------------------------------
x        1       10          1       20
y        2       10
z        3       10          4       20

Metadata table

rowKey    value   timestamp   value   timestamp
-----------------------------------------------
version   10      10          20      20

The latest version in the metadata table is now 20, so all data with timestamp 20 will be returned.
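To make the filtering concrete, here is a toy illustration in plain Java (not CDAP code) of how the second snapshot behaves: only cells written at the current version are visible, so y, which was not rewritten at version 20, disappears from the results.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SnapshotFilterExample {

      // One cell of the main data table: a value and the version (timestamp) it was written at.
      static final class Cell {
        final int value;
        final long version;
        Cell(int value, long version) { this.value = value; this.version = version; }
      }

      public static void main(String[] args) {
        // Latest cell per row after the second adapter run (see the tables above).
        Map<String, Cell> latestCells = new LinkedHashMap<>();
        latestCells.put("x", new Cell(1, 20));
        latestCells.put("y", new Cell(2, 10)); // y was not rewritten at time 20 (deleted)
        latestCells.put("z", new Cell(4, 20));

        long currentVersion = 20; // read from the metadata table

        // Only rows whose latest cell was written at the current version belong to the snapshot.
        for (Map.Entry<String, Cell> entry : latestCells.entrySet()) {
          if (entry.getValue().version == currentVersion) {
            System.out.println(entry.getKey() + " = " + entry.getValue().value);
          }
        }
        // Prints x = 1 and z = 4; y is gone because it only exists at the old version.
      }
    }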

Conclusion

It was an amazing experience for me personally, and I learned about CDAP (the Cask Data Application Platform), Hadoop, and its ecosystem of components. I plan to follow CDAP updates and try to contribute to it. If you, too, would like to learn, do consider Cask for your future career.
