Apache Spark and Amazon S3 — Gotchas and best practices

S3 is an object store and not a file system, hence the issues arising out of eventual consistency, non-atomic renames have to be handled in the application code. The directory server in a filesystem has been replaced by a hash algorithm of the filename. This is bad for listing things, directory operations, deleting and renaming(copying and deleting as technically there is no renaming in object stores)

Start using S3A (URI scheme: s3a://) — Hadoop 2.7+. S3a is the recommended S3 Client for Hadoop 2.7 and later S3a is more performant and supports larger files(upto 5TB) and has support for multipart upload. All objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.Most bug reports against S3N will be closed as WONTFIX

Making Spark 2.0.1 work with S3a For Spark 2.0.1 use hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar in your classpath; don’t forget to update spark-default.conf with the AWS keys and the S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Definitely use Dataframes as query reordering and predicate push down is available out of the box and hence less data is fetched eventually speeding up your queries

If you are reading the same data multiple times, try using the .cache or s3distcp to transfer the files to your local EMR cluster to benefit from the better file read performance of a real file system. The groupBy option of s3distcp is a great option to solve the small file problem by merging a large number of small files.

Which brings me the to the issue of reading a large number of small files. If merging the files using a tool is not an option try the following code which effectively works around the slow S3 directory listing bottleneck

import com.amazonaws.services.s3._, model._
import com.amazonaws.auth.BasicAWSCredentials

val request = new ListObjectsRequest()
request.setBucketName(bucket)
request.setPrefix(prefix)
request.setMaxKeys(pageLength)
def s3 = new AmazonS3Client(new BasicAWSCredentials(key, secret))

val objs = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.
sc.parallelize(objs.getObjectSummaries.map(_.getKey).toList)
.flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }

Ensure that spark.sql.parquet.filterPushdown option is true and spark.sql.parquet.mergeSchema is false (to avoid schema merges during writes which really slows down you write stage). Thankfully Spark 2.0 has the right default

Have you wondered why just at the time a job is about to complete, nothing is getting written to the logs and all spark operations seems to have stopped but the results are not yet in the output directory of S3 … what is going on ? Well every time the executors writes the result of the job, each of them write to a temporary directory outside the main directory where the files had to be written and once all the executors are done a rename is done to get atomic exclusivity. This is all fine in a standard filesystem like hdfs where renames are instantaneous but on an object store like S3, this is not conducive as renames on S3 are done at 6MB/s.

If possible write the output of the jobs to EMR hdfs (to leverage on the almost instantaneous renames and better file IO of local hdfs) and add a dstcp step to move the files to S3, to save yourself all the troubles of handling the innards of an object store trying to be a filesystem. Also writing to local hdfs will allow you to enable speculation to control runaway tasks without falling into the deadlock traps associated with DirectOutputCommiter.

If you must use S3 as the output directory ensure that the following Spark configurations are set

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Note: DirectParquetOutputCommitter is removed from Spark 2.0 due to the chance of data loss. Unfortunately until we have improved consistency from S3a we have to work with the workarounds. Things are improving with Hadoop 2.8

Avoid keynames in lexicographic order. One could use hashing/random prefixes or reverse date-time to get around.The trick is to name your keys hierarchically, putting the most common things you filter by on the left side of your key. And never have underscores in bucket names due to DNS issues.

Enabling fs.s3a.fast.upload upload parts of a single file to Amazon S3 in parallel

Well that was the brain dump of issues in production that I have been solving recently to make Spark work with S3. Stay tuned for more on this as I dig deeper in the next post …

One clap, two clap, three clap, forty?

By clapping more or less, you can signal to us which stories really stand out.