Apache Spark Shuffle Service — there is more than one option!
The purpose of this blog is to list the Shuffle Service implementations available for Apache Spark and the motivation behind their design choices. It also provides a high-level understanding of the Spark Shuffle service itself.
Credit Note: All credit goes to the blogs and YouTube videos already published for each Shuffle service implementation; the relevant links have been added in each section of this story.
Shuffling is one of the key operations in the distributed in-memory programming model adopted by Apache Spark. Although it is considered one of the costliest operations, something to be avoided wherever possible to improve application performance, it remains essential: it serves as the means of exchanging block data between different executors.
As the usage of Apache Spark has evolved, from multi-tenant Spark clusters running multiple applications, to managed cluster offerings providing Spark runtimes, to Spark on Kubernetes and serverless Spark, the usage and importance of the Shuffle service have grown with it.
The Shuffle service is a proxy through which Spark executors fetch shuffle blocks, so its lifecycle is independent of the lifecycle of the executors. Apache Spark provides an extensible framework that allows different implementations of the Shuffle service. A Shuffle service can run on a Spark worker node or even outside it, and Spark executors register with it. During registration, an executor tells the service where on disk it stores its shuffle files. The Shuffle service then watches those shuffle files and returns them when requested by other executors.
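To make the above concrete, here is a minimal config sketch for enabling the external shuffle service; `spark.shuffle.service.enabled` and `spark.shuffle.service.port` are standard Spark properties (7337 is Spark's default port), and pairing it with dynamic allocation is a common, but optional, choice:

```properties
# spark-defaults.conf — minimal settings to route block fetches
# through the external shuffle service
spark.shuffle.service.enabled    true
spark.shuffle.service.port       7337
# Often paired with dynamic allocation, which relies on the service
# to keep shuffle files available after executors are removed
spark.dynamicAllocation.enabled  true
```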
This blog explains the internal working of the Shuffle service in detail.
List of Shuffle Service implementations
- Built-in Shuffle service — i.e. NO external Shuffle service. In this case the Spark executor performs the task itself. Spark can function fine with this option, but its impact shows up in two ways. First, when an executor is stopped, it automatically removes the shuffle files it generated (whereas with the external service enabled, the files survive the executor's shutdown). So without a shuffle service there is a possibility of data loss when an executor is killed, which can degrade application performance through recomputation. Second, when executors are busy running tasks while other executors are requesting shuffle data, an executor that cannot respond to a block request within the configured time causes block fetch failure errors, which again degrade performance. Users can try to mitigate this by tuning the Spark configurations `spark.shuffle.io.maxRetries`, `spark.shuffle.io.serverThreads`, `spark.shuffle.io.clientThreads`, `spark.shuffle.io.threads`, `spark.rpc.io.serverThreads`, `spark.rpc.io.clientThreads` and `spark.rpc.io.threads`.
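As a starting point, the tuning knobs above might be set like this in `spark-defaults.conf`; the values shown are purely illustrative, not recommendations:

```properties
# Illustrative values for the fetch-failure mitigation knobs above
spark.shuffle.io.maxRetries      10
spark.shuffle.io.serverThreads   128
spark.shuffle.io.clientThreads   128
spark.rpc.io.serverThreads       64
spark.rpc.io.clientThreads       64
```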
- Standalone Shuffle Service: Executors communicate with the external shuffle service using an RPC protocol. They typically send messages of two types: RegisterExecutor and OpenBlocks. RegisterExecutor is used when an executor wants to register with its local shuffle server; OpenBlocks is used during the block fetch process. Both actions are handled by the BlockManager via its shuffleClient. Depending on whether the external shuffle service is enabled, the instance used for this field is either NettyBlockTransferService (no external shuffle) or org.apache.spark.network.shuffle.ExternalShuffleClient. Implementation details of the standalone service can be found here.
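The two-message exchange can be sketched as follows. This is a toy, in-process model of the protocol, not Spark's actual classes; the field names and the dictionary-based "service" are assumptions for illustration only:

```python
# Toy sketch of the executor <-> external shuffle service RPC exchange:
# RegisterExecutor announces where the executor's shuffle files live on
# local disk, and OpenBlocks asks the service to serve a set of block ids.
from dataclasses import dataclass


@dataclass
class RegisterExecutor:
    app_id: str
    executor_id: str
    local_dirs: list  # directories holding this executor's shuffle files


@dataclass
class OpenBlocks:
    app_id: str
    executor_id: str
    block_ids: list  # e.g. "shuffle_0_1_2" = shuffleId_mapId_reduceId


class ShuffleServiceSketch:
    """In-process stand-in for the external shuffle service."""

    def __init__(self):
        self.registered = {}  # (app_id, executor_id) -> local_dirs

    def handle(self, msg):
        if isinstance(msg, RegisterExecutor):
            self.registered[(msg.app_id, msg.executor_id)] = msg.local_dirs
            return "registered"
        if isinstance(msg, OpenBlocks):
            # A real service resolves each block id to a file segment in the
            # registered local_dirs and streams it back; here we just pair
            # each block id with the directory it would be read from.
            dirs = self.registered[(msg.app_id, msg.executor_id)]
            return [(bid, dirs[0]) for bid in msg.block_ids]


svc = ShuffleServiceSketch()
svc.handle(RegisterExecutor("app-1", "exec-1", ["/tmp/spark-local"]))
blocks = svc.handle(OpenBlocks("app-1", "exec-1", ["shuffle_0_1_2"]))
```

Note how the registration step is what lets the service outlive the executor: once the local directories are recorded, any later OpenBlocks request can be served without the original executor being alive.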
- YARN Shuffle Service: This is the shuffle service for Spark on YARN. To use it, the user needs to start the Spark Shuffle Service on each NodeManager in the YARN cluster. It is a separate implementation for YARN as the cluster manager, but the idea and approach are similar to the shuffle service used with the standalone cluster manager.
- Mesos Shuffle Service: This is the shuffle service for Spark on Mesos. The approach is similar to the standalone and YARN implementations. Implementation details can be found here.
- Kubernetes Shuffle Service: Officially, Kubernetes as a cluster manager does not support an external shuffle service at this time. However, some implementations are available that leverage a DaemonSet to run a shuffle-service pod on each node. Shuffle-service pods and executor pods that land on the same node share disk using hostPath volumes. Spark requires that each executor know the IP address of the shuffle-service pod that shares its disk. More details about this approach can be found here.
- Cosco: This is used by Facebook's warehouse to power Spark and Hive jobs. It is implemented as a scalable, reliable and maintainable distributed system. Its key differentiator is partial in-memory aggregation across a shared pool of distributed memory, which gives improved disk-usage efficiency compared to Spark's built-in shuffle. More details about Cosco can be found in this blog and these YouTube videos: link1 link2.
- Magnet: This is a push-based shuffle service implemented at LinkedIn. The key idea is that mapper-generated shuffle blocks are also pushed to remote shuffle services to be merged per shuffle partition. Magnet groups contiguous blocks in the shuffle file into chunks; shuffle blocks larger than a certain size are skipped, so it does not push blocks that are potentially coming from large, skewed partitions. The map task determines this grouping and the corresponding shuffle service destinations in a consistent way, such that blocks from different mappers belonging to the same shuffle partition are pushed to the same shuffle service. Once the grouping is done, the transfer of these blocks is handed off to a dedicated thread pool and the map task simply finishes. This decouples the task-execution threads from the block-transfer threads, achieving better parallelism between the I/O-intensive data transfer and the CPU-intensive task execution. More details about this can be found in this blog and this YouTube video link.
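The consistency requirement above can be sketched with a deterministic partition-to-service mapping. This is a hedged illustration of the idea, not LinkedIn's code; the service list, the hash choice and the size cutoff are all assumptions:

```python
# Sketch of Magnet's routing idea: every mapper derives the destination
# shuffle service from the partition id alone, so blocks of the same
# partition from different mappers land on the same service. Oversized
# blocks (likely from skewed partitions) are not pushed at all.
import hashlib

SHUFFLE_SERVICES = ["host-a:7337", "host-b:7337", "host-c:7337"]  # assumed fleet
MAX_PUSH_BLOCK_BYTES = 1 << 20  # illustrative skew cutoff (1 MiB)


def destination_for(partition_id: int, services=SHUFFLE_SERVICES) -> str:
    # Deterministic: any mapper computes the same service for a partition.
    digest = hashlib.md5(str(partition_id).encode()).hexdigest()
    return services[int(digest, 16) % len(services)]


def plan_pushes(blocks):
    """blocks: list of (partition_id, size_bytes).
    Returns {service: [partition_id, ...]} for the blocks worth pushing."""
    plan = {}
    for pid, size in blocks:
        if size > MAX_PUSH_BLOCK_BYTES:
            continue  # skip potentially skewed blocks; fetched the classic way
        plan.setdefault(destination_for(pid), []).append(pid)
    return plan
```

Because the mapping depends only on the partition id and the (shared) service list, no coordination between mappers is needed for their pushes to converge on the same merger.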
- Riffle: This shuffle service improves I/O efficiency and scales to process petabytes of data. Riffle efficiently merges fragmented intermediate shuffle files into larger block files, converting small, random disk I/O requests into large, sequential ones. It further improves performance and fault tolerance by mixing merged and unmerged block files to minimize merge-operation overhead. Using Riffle, Facebook production jobs on Spark clusters with over 1,000 executors see up to a 10x reduction in the number of shuffle I/O requests and a 40% improvement in end-to-end job completion time. More details about this can be found in this paper.
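The core transformation, many small per-mapper segments becoming one large sequential file, can be sketched as below. This is an illustrative model, not Riffle's actual implementation; in particular the in-memory byte buffers stand in for on-disk files:

```python
# Sketch of the Riffle-style merge: per-mapper shuffle segments for one
# reduce partition are concatenated into a single blob, with an index of
# (offset, length) entries so individual segments remain addressable.
# One sequential read then replaces many small random reads.
import io


def merge_shuffle_segments(segments):
    """segments: list of bytes objects (one per mapper, same partition).
    Returns (merged_blob, index) where index[i] = (offset, length)."""
    merged = io.BytesIO()
    index = []
    for seg in segments:
        index.append((merged.tell(), len(seg)))
        merged.write(seg)
    return merged.getvalue(), index


blob, idx = merge_shuffle_segments([b"map0-data", b"map1-data", b"map2"])
```

The index is what makes the "mixing merged and unmerged files" strategy possible: a reducer can read the whole merged blob sequentially, or seek to one segment if it only needs a single mapper's output.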
- Zeus: This is a distributed shuffle service implemented at Uber. Uber runs some of the largest Spark and Hive clusters on top of YARN in the industry, which leads to issues such as hardware failures and reliability and scalability challenges. Zeus supports hundreds of thousands of jobs and millions of containers, shuffling petabytes of shuffle data. You can get more details in this git repo and this YouTube video.
- Alibaba’s EMR Remote Shuffle Service: This shuffle service was developed at Alibaba Cloud for the serverless Spark use case. It has three main roles: Master, Worker, and Client. The Master and Worker constitute the server side, while the Client integrates into Spark in a non-intrusive manner. The Master is responsible for resource allocation and state management, the Worker processes and stores shuffle data, and the Client caches and pushes shuffle data. It adopts a push-style shuffle mode: each mapper has a cache delimited by partition, shuffle data is written to the cache first, and a PushData request is triggered when the cache of a partition is full. More details about this can be found in this post.
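The push-style client behavior described above can be sketched as a per-partition buffer that fires a push callback when full. This is a minimal model under assumed names and sizes, not Alibaba's API:

```python
# Sketch of a push-style shuffle client: records are buffered per
# partition, and a PushData request (modeled here as a callback) fires
# as soon as that partition's cache reaches the flush threshold.
class PushStyleShuffleCache:
    def __init__(self, flush_threshold_bytes, push_fn):
        self.flush_threshold = flush_threshold_bytes
        self.push_fn = push_fn  # e.g. would send a PushData RPC to a Worker
        self.cache = {}         # partition_id -> bytearray

    def write(self, partition_id, record: bytes):
        buf = self.cache.setdefault(partition_id, bytearray())
        buf.extend(record)
        if len(buf) >= self.flush_threshold:
            # This partition's cache is full: trigger PushData and reset.
            self.push_fn(partition_id, bytes(buf))
            self.cache[partition_id] = bytearray()


pushed = []
cache = PushStyleShuffleCache(8, lambda pid, data: pushed.append((pid, data)))
cache.write(0, b"abcd")  # below threshold, stays buffered
cache.write(0, b"efgh")  # reaches 8 bytes, PushData fires for partition 0
```

Buffering per partition is what lets the service receive fewer, larger writes instead of one tiny write per record, which is the point of the push-style mode.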
The purpose of this story has been to provide an extensive list of available Shuffle service implementations with a high-level overview of each. Please share your views on the different approaches, and add a comment if you know of any other implementation that exists.