Crunching Parquet Files with Apache Flink

Nezih Yigitbasi
3 min read · Aug 15, 2015


Apache Flink is a fault-tolerant streaming dataflow engine that provides a generic distributed runtime with powerful programming abstractions. With those abstractions it supports both batch and stream computations, as well as machine learning and graph algorithms, making it a powerful unified processing engine. I have been working with the Apache Parquet file format for quite some time, and I thought the two would be a great fit: a powerful optimizing engine paired with a storage- and query-efficient columnar file format. So I decided to give them a try.

I downloaded the “Restaurant Score” data from the SF OpenData portal, which includes three datasets (businesses, violations, and inspections) in CSV format containing the scores the Health Department has given to businesses around the San Francisco area as a result of health inspections. The data also includes the health violations of these businesses, with the risk level and other details such as a description of the violation. This is definitely an interesting dataset to look at before visiting another restaurant in San Francisco!

Since the datasets are in CSV format, I converted them to Parquet with the Kite SDK before processing them with Flink. For that I first defined the corresponding Avro schemas and generated model classes from them using the Avro tools. With these classes in place, all that remains is to parse the CSV files, populate instances of the model classes from the parsed values, and finally use the Kite SDK to write those instances to the destination files in Parquet format. Telling the Kite SDK that I want Parquet output is very easy: I just set the format to PARQUET when creating the dataset descriptor, and that’s it.

Convert the CSV files to Parquet format [see the GitHub repo for the complete source code]
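The post originally embeds the conversion code as a gist; below is a minimal sketch of the idea instead. It assumes an Avro-generated Business class, a hypothetical three-column CSV layout, and an illustrative output path; the actual schemas and parsing logic are in the GitHub repo.

```java
// Sketch only: convert the businesses CSV to Parquet with the Kite SDK.
// "Business" is the Avro-generated model class; the CSV column layout,
// field names, and dataset URI are assumptions made for illustration.
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;

public class CsvToParquet {

  public static void main(String[] args) throws Exception {
    // Setting the format to PARQUET on the descriptor is all Kite needs
    // to produce Parquet output.
    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
        .schema(Business.getClassSchema())   // schema of the Avro-generated class
        .format(Formats.PARQUET)
        .build();

    Dataset<Business> dataset = Datasets.create(
        "dataset:file:/tmp/restaurants/businesses", descriptor, Business.class);

    DatasetWriter<Business> writer = dataset.newWriter();
    try (BufferedReader reader = Files.newBufferedReader(Paths.get("businesses.csv"))) {
      reader.readLine();                      // skip the CSV header
      String line;
      while ((line = reader.readLine()) != null) {
        String[] fields = line.split(",");    // naive split, good enough for a sketch
        writer.write(Business.newBuilder()    // hypothetical field names
            .setBusinessId(Long.parseLong(fields[0]))
            .setName(fields[1])
            .setAddress(fields[2])
            .build());
      }
    } finally {
      writer.close();
    }
  }
}
```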

Since instances of these Avro model classes are the entities stored in the Parquet files, they are also the elements that Flink’s dataflow will process. To set up this dataflow, I just had to define a DataSet for each file and configure an input format that reads the Avro records from it using AvroParquetInputFormat.

Create the Flink DataSets.
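The embedded snippet is again summarized here as a sketch. It assumes the converted Parquet files live under /tmp/restaurants and uses Flink’s Hadoop-compatibility wrapper (HadoopInputFormat) to plug in AvroParquetInputFormat; package names and paths may differ between Flink and Parquet versions.

```java
// Sketch only: expose a Parquet file to Flink as a DataSet via the Hadoop
// input format wrapper. The path and the Business class are assumptions;
// see the repo for the actual setup.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.avro.AvroParquetInputFormat;

public class DataSources {

  // Each Parquet file becomes a DataSet of (Void, record) pairs, because the
  // Hadoop ParquetInputFormat produces Void keys and Avro record values.
  static DataSet<Tuple2<Void, Business>> businesses(ExecutionEnvironment env)
      throws Exception {
    Job job = Job.getInstance();
    FileInputFormat.addInputPath(job, new Path("/tmp/restaurants/businesses"));
    AvroParquetInputFormat.setAvroReadSchema(job, Business.getClassSchema());

    return env.createInput(new HadoopInputFormat<Void, Business>(
        new AvroParquetInputFormat<Business>(), Void.class, Business.class, job));
  }
}
```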

With the data sources in place, all that remains is to use Flink’s operators to set up a dataflow and feed it with these sources. For example, the dataflow below finds 20 restaurants with “High Risk” health violations. It joins the business dataset with the violations dataset on the “businessId” column, keeps only the “High Risk” violations, projects the columns we care about, and takes the first 20 items. This works great, apart from a little verbosity caused by Java and type erasure, which requires the types in the map functions to be spelled out explicitly for Flink (although I could get rid of some of them with “returns()”).

Find 20 restaurants with High Risk violations.
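A sketch of that dataflow follows, assuming the (Void, record) DataSets from the previous step. The accessor names on the Avro classes (getBusinessId, getName, getRiskCategory, getDescription) are assumptions based on the schema description above; the anonymous function classes illustrate the type verbosity mentioned in the text.

```java
// Sketch only: join on businessId, keep "High Risk" violations, project a few
// columns, and take the first 20 results. Field accessors are hypothetical.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class HighRiskRestaurants {

  static void findHighRisk(DataSet<Tuple2<Void, Business>> businesses,
                           DataSet<Tuple2<Void, Violation>> violations) throws Exception {
    // (businessId, name)
    DataSet<Tuple2<Long, String>> businessTuples = businesses
        .map(new MapFunction<Tuple2<Void, Business>, Tuple2<Long, String>>() {
          @Override
          public Tuple2<Long, String> map(Tuple2<Void, Business> value) {
            Business b = value.f1;
            return new Tuple2<>(b.getBusinessId(), b.getName().toString());
          }
        });

    // (businessId, risk category, description)
    DataSet<Tuple3<Long, String, String>> violationTuples = violations
        .map(new MapFunction<Tuple2<Void, Violation>, Tuple3<Long, String, String>>() {
          @Override
          public Tuple3<Long, String, String> map(Tuple2<Void, Violation> value) {
            Violation v = value.f1;
            return new Tuple3<>(v.getBusinessId(), v.getRiskCategory().toString(),
                v.getDescription().toString());
          }
        });

    // Join on businessId and project (name, risk category, description).
    DataSet<Tuple3<String, String, String>> joined = businessTuples
        .join(violationTuples)
        .where(0)
        .equalTo(0)
        .projectFirst(1)        // business name
        .projectSecond(1, 2);   // risk category, description

    joined
        .filter(new FilterFunction<Tuple3<String, String, String>>() {
          @Override
          public boolean filter(Tuple3<String, String, String> value) {
            return "High Risk".equals(value.f1);
          }
        })
        .first(20)
        .print();
  }
}
```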

Overall, the integration was pretty smooth thanks to Flink’s support for the Hadoop APIs, which let me plug AvroParquetInputFormat into the DataSet definitions; the rest is just the dataflow specification using Flink’s API. The source code and the datasets are in my GitHub repo and ready to run. Give it a try and have fun!

Disclaimer: Opinions expressed are solely my own and do not express the views or opinions of my employer.
