Writing a map-reduce job to concatenate millions of small documents
Running Hadoop jobs on small files is usually discouraged, but sometimes you have no choice. Sometimes you even have millions of files and you’d like to run some analysis on them. Usually, you deal with the small-files issue by using distcp, but your files have to be concatenable to begin with, like JSON or similar. In our case the files are raw documents (e.g. PDF and DOC files), and we want to run analysis on each of them individually. It would still be nice to process them in larger chunks, so let's turn millions of small files into hundreds of bigger files.
The job
First we need a map-reduce job. We basically need to do what distcp does, which is concatenate files, but in a way that still lets us refer to individual documents. Here is something simple:
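As a rough illustration of the core idea, and not necessarily the job's actual code, the sketch below shows one way to pack binary documents into newline-delimited JSON records so that each document stays individually addressable after concatenation. The function names and the `path`/`content` field names are assumptions made for this example; it uses only the Python standard library.

```python
import base64
import json


def encode_document(path, raw_bytes):
    """Pack one document into a single JSON line that keeps its identity."""
    record = {
        # Hypothetical field names: the path lets us refer back to the document.
        "path": path,
        # Base64 keeps arbitrary binary content (pdf, doc) safe on one text line.
        "content": base64.b64encode(raw_bytes).decode("ascii"),
    }
    return json.dumps(record, sort_keys=True)


def decode_document(line):
    """Recover the (path, raw_bytes) pair from one JSON line."""
    record = json.loads(line)
    return record["path"], base64.b64decode(record["content"])


def concatenate(documents):
    """Join many (path, raw_bytes) pairs into one newline-delimited chunk."""
    return "\n".join(encode_document(path, raw) for path, raw in documents)


documents = [("a.pdf", b"%PDF-1.4\n\x00\xff"), ("b.doc", b"\xd0\xcf\x11\xe0")]
chunk = concatenate(documents)
restored = [decode_document(line) for line in chunk.split("\n")]
print(restored == documents)  # True
```

In a map-reduce setting, a function like `encode_document` would typically run in the map step, with the reduce step simply writing the resulting lines out in large batches.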
Here I’m using mrjob since it makes it easy to write one. You'll also notice that I'm using CombineTextInputFormat, which groups smaller files into bigger input splits. I'm using it along with mapreduce.input.fileinputformat.split.maxsize=800000000, which tells Hadoop to group the small files into chunks of approximately 800 MB. With this simple job and some big machines, we can process about 0.5 terabytes of small files in about 3 hours.
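To see why this setting turns millions of inputs into hundreds of chunks, here is a small back-of-the-envelope sketch. The helper name is hypothetical, and the result is only a lower bound: the real number of combined splits also depends on file boundaries and data locality.

```python
def estimate_combined_splits(total_bytes, split_max_bytes=800_000_000):
    """Rough lower bound on the number of input splits (and so map tasks)."""
    # Integer ceiling division avoids floating-point rounding on large sizes.
    return -(-total_bytes // split_max_bytes)


total_bytes = 500 * 10**9  # about 0.5 terabytes of small files
print(estimate_combined_splits(total_bytes))  # 625
```

Without combining, each small file would get at least one split of its own, so the number of map tasks would be in the millions rather than the hundreds.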
Production tips for huge loads
For cases where you have 500+ GB of small files, here are some useful tips:
- You will want to give the driver more memory via the HADOOP_HEAPSIZE environment variable. It can often take a good 10 minutes for the job to set up its input list in cases like this.
- The number of map tasks depends on the size of your input splits, so make sure the splits are big enough (controlled in this case by mapreduce.input.fileinputformat.split.maxsize) that you don't have to spawn thousands of mappers. This is especially important when you have tens of millions of small files.
- In cases like this it’s probably not necessary to set the number of reduce tasks, since that number is guaranteed to be small enough. A small number of reduce tasks leads to bigger chunks of data being written, which is what we want anyway.
- Expect to tweak the amount of memory assigned to map and reduce tasks.
- Our output is ultimately JSON compressed with gzip, but other formats also work well, such as Parquet compressed with Snappy.
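For downstream analysis, gzip-compressed JSON output can be read back one record at a time. The sketch below assumes one JSON object per line; the file name, the record fields, and the helper names are hypothetical and only serve the example.

```python
import gzip
import json
import os
import tempfile


def write_chunk(path, records):
    """Write an iterable of dicts as gzip-compressed, newline-delimited JSON."""
    with gzip.open(path, "wt", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")


def read_chunk(path):
    """Yield the records back one at a time, without loading the whole chunk."""
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        for line in handle:
            yield json.loads(line)


with tempfile.TemporaryDirectory() as tmp:
    chunk_path = os.path.join(tmp, "part-00000.json.gz")  # hypothetical name
    write_chunk(chunk_path, [{"path": "a.pdf", "size": 3},
                             {"path": "b.doc", "size": 5}])
    paths = [record["path"] for record in read_chunk(chunk_path)]

print(paths)  # ['a.pdf', 'b.doc']
```

Streaming the records this way keeps memory use flat even when a single chunk holds hundreds of megabytes of documents.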