At Nielsen Identity Engine, we use Spark to process 10’s of TBs of raw data from Kafka and AWS S3.
Currently, all our Spark applications run on top of AWS EMR, and we launch 1000’s of nodes per day.
For a more detailed overview of how we use Spark, check out our Spark+AI Summit 2019 Europe session.
In part 1 of this blog series, we discussed what is Spark Partitioning, what is Dynamic Partition Inserts and how we leveraged it to build a Spark application that is both idempotent and efficiently utilizes the cluster resources.
In this part, we are going to deep dive into how Dynamic Partition Inserts works, why in some cases it can make your application run 2X slower, and how you can mitigate that.
Just as a reminder to part1, here’s a code snippet showcasing Dynamic Partition Inserts:
// NOTE: Setting the following is required, since the default is “static”
.set(“spark.sql.sources.partitionOverwriteMode”, “dynamic”)val bigDataFrame = spark.read.parquet(“s3://data-lake/date=2020–02–29”)
val transformedDataFrame = bigDataFrame.map(…).filter(…)transformedDataFrame.write
How does Dynamic Partition Inserts work behind the scenes?
To understand what happens behind the scenes, we can look at this comment in Spark GitHub repo:
- During the job execution, each task writes its output under a staging directory with a partition path, e.g. /path/to/staging/a=1/b=1/xxx.parquet.
Note that each partition path can contain 1 or more files. For instance, for the aforementioned application, we have hundreds of Parquet files in each partition path.
2. When committing the job (see here):
(a) We first clean up the corresponding partition directories at destination path, e.g. /path/to/destination/a=1/b=1;
(b) And then move (copy) files from staging directory to the corresponding partition directories under destination path
(c) Finally, we delete the staging directory
These operations (clean up, move, etc.) are filesystem operations, and each filesystem that spark supports (e.g local filesystem, HDFS, S3, etc.) has its own implementation of the operations, packaged in a filesystem connector.
For example, renaming a directory is an atomic and cheap action within a local filesystem or HDFS, whereas the same operation within object stores (like S3) is usually expensive and involves copying the entire data to the destination path and deleting it from the source path.
Specifically, step 2(b) above invokes
fs.rename() for each partition in the staging directory, which essentially invokes the
rename method of the filesystem connector in use.
So, if the HDFS connector is in use, the
rename operation will be atomic and cheap, whereas if an S3 connector is in use, the rename operation will be expensive (as explained above).
OK, so… Why is my Spark application 2X slower on Kubernetes vs EMR?
At the end of part 1, we mentioned that a job that used to take ~50 minutes to execute on EMR, took 90 minutes on Kubernetes — almost 2X the time.
The only (real) difference was the fact we used S3:// URI (e.g s3://spark-output) when running on EMR, and S3A:// URI (e.g s3a://spark-output) when running on Kubernetes (more on S3A in the next section).
As we dug deeper into the performance of that job, we realized the job was still in RUNNING state for about 40 minutes after all the stages were completed.
We then turned to S3, so see if the output reached its destination, and saw that the staging directory (e.g s3a://spark-output/.spark-staging-d95c63ee-d6dc-497c-9954-d274b0d1bca7/) exists during the entire time between stages completion and job completion.
As we examined this directory’s size and number of files, we saw both metrics are slowly decreasing until finally all files were moved to the destination directory.
On EMR, this phase took just a couple of minutes.
That made us suspect that the root cause is related to the specific filesystem implementation (i.e EMR S3 vs S3A or something of that sort).
Not all S3 connectors are created equal
As mentioned above, S3 is not really a filesystem, but rather an object store. Per Spark documentation:
Spark can read and write data in object stores through filesystem connectors implemented in Hadoop [e.g S3A] or provided by the infrastructure suppliers themselves [e.g EMRFS by AWS]. These connectors make the object stores look almost like file systems, with directories and files and the classic operations on them such as list, delete and rename.
When running Spark on an EMR cluster and using S3:// URI, the underlying implementation will default to AWS proprietary S3 connector named EMRFS.
Alternatively, if we are not running on EMR (e.g on-prem YARN cluster, Kubernetes or even Spark in local mode) and still need to access data on S3, we should use S3A:// URI (as s3:// and s3n:// URI’s will default to legacy connectors that have been removed from Hadoop).
For S3A:// URI, the underlying implementation will default to Hadoop’s open-source S3 connector named S3A.
So what do we mean by saying “not all S3 connectors are created equal”?
Every filesystem connector implements the filesystem operations (rename, copy, delete, read) in a different way (which may, or may not, be optimal).
So even if your application interacts with the same object store (S3 in this case), the actual implementation of the filesystem operations can be different (depending on which filesystem connector is being used, e.g EMRFS or S3A):
- If you look at the relevant S3A code (from the latest Hadoop release, which is currently 3.2.1), you’ll see that renaming a directory is done sequentially, i.e iterating the files within that directory, copying each file to the destination directory and periodically deleting the copied files (i.e issuing a delete operation for every 1000 files).
If, for example, we have a directory with 1000 files, and it takes 1 second to copy each file, it’ll take ~1000 seconds for the rename to complete.
Considering the fact that in our use-case, we have hundreds of files per directory (i.e partition path), you can understand why it took so long for our job to finish renaming all the partitions (after the stages were completed).
- EMRFS, on the other hand, renames a directory in a multi-threaded way, so files within that directory are handled in parallel (we concluded this based on our observations via stack traces, etc., since the actual code is not open-sourced).
Thus, the rename phase at the end of our job is much shorter (as opposed to the Hadoop 3.2.1 implementation we showed above).
What can I do to mitigate this issue?
During the investigation of this issue, we noticed some progress has been made in Hadoop’s trunk, and S3A rename will become a parallel operation in the upcoming Hadoop 3.3 release (expected this month), as part of HADOOP-13600 and HADOOP-15183 (see here).
However, since it was a serious issue for us, and we didn’t want to wait until the release, we’ve compiled Hadoop 3.3 from trunk on our own, and used that to run the aforementioned Spark job on Kubernetes.
- We built a Spark Docker image that contained:
(a) Spark 2.4.4 without Hadoop
(b) Our own compiled version of Hadoop 3.3-SNAPSHOT
- To let Spark use our Hadoop distribution, we added this environment variable to the Dockerfile:
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
This significantly shortened the rename phase when running the job with S3A:// URI on Kubernetes, and now it only takes a few minutes (similar to what we observed on EMR using EMRFS), so — problem solved!
In this post, we’ve described how Dynamic Partition Inserts works, the differences between EMRFS and S3A filesystem connectors, why in some cases those differences can make your application run much slower, and how you can mitigate that.
One thing worth noting is, that the parallel rename mentioned above only parallelize operations within each partition (i.e renaming files in the same directory in parallel) and not between partitions (i.e it does not rename multiple directories in parallel).
So, if you have many partitions and many files per partition (like us), this can help shorten your job’s execution time.
However, if you have many partitions but only 1 file per partition, this won’t help you (regardless of which filesystem connector you’re using — EMRFS or S3A).
In part 3, our plan is to discuss the S3A committers added in Hadoop 3.1, and how they can improve working with S3.
[March 22nd, 2020] update: note that since
spark.sql.sources.partitionOverwriteMode is set to
dynamic (i.e we use Dynamic Partition Inserts), EMRFS S3-Optimized Committer can’t be used (thanks, obogobo, for highlighting this point).