Apache Spark! What Next? (Is it GPU..)

Anirban Ghosh
Walmart Global Tech Blog
Jul 30, 2021

Decision-making is key to any business. In today’s highly digitized world, access to a wealth of information helps businesses make these key decisions better. To unlock that information, it is pivotal to have an integrated platform that delivers valuable business intelligence for forecasting, logistics, inventory optimization and scenario planning, which in turn speeds up high-quality decision-making.

Building an accurate, optimized solution in this vast digitized world is an iterative process of discovering suitable results for decision-making. It is largely a game of repetition in which the fastest loop wins: the faster you can iterate, the easier it is to explore candidate solutions and arrive at good answers.

Churning through such a great ocean of data requires powerful computing tools. Tools keep getting faster, but their speed is ultimately limited by the physical constraints of electrons moving through matter. That’s where technologies like RAPIDS, powered by GPUs (graphics processing units), are emerging to break through this speed barrier.

RAPIDS is the umbrella for several open-source projects. It greatly increases the speed of each step of the iteration loop and provides a common format for data, thereby reducing the burden of exchanging data between different systems.

On the user side, RAPIDS mirrors familiar Python APIs through libraries such as cuDF, which simplifies the transition to a RAPIDS-based platform. It supports interfaces such as BlazingSQL and RAPIDS Spark for data processing. Here we discuss RAPIDS Spark, powered by GPUs, for large-scale data processing.

Here’s What We’re Excited About

  1. Emergence of RAPIDS with Spark
  2. Why do you need Spark RAPIDS?
  3. Acceleration of Spark with GPU
  4. Does architecture help SPARK RAPIDS perform better than SPARK-CPU?
  5. Key advancements of SPARK RAPIDS
  6. Tuning based on use-cases along with Limitations

Emergence of Spark with RAPIDS

The RAPIDS Accelerator for Apache Spark leverages GPUs to accelerate processing via the RAPIDS libraries.

The RAPIDS Accelerator for Apache Spark combines the power of the RAPIDS cuDF library with the scale of the Spark distributed computing framework. The RAPIDS Accelerator library also has a built-in accelerated shuffle based on UCX (Unified Communication X) that can be configured to leverage GPU-to-GPU communication and RDMA (remote direct memory access) capabilities.

Why do we need SPARK RAPIDS?

The growing adoption of AI in analytics has created the need for a new framework that processes data quickly and cost-efficiently on GPUs. RAPIDS helps ETL (Extract, Transform and Load) pipelines achieve this, and the resulting faster iteration and more frequent deployment make it possible to improve model accuracy. The following attributes also favor choosing RAPIDS Spark for an ETL pipeline:

  1. GPU-accelerated data processing
  2. Easy integration with existing Spark-CPU pipelines, with almost no code migration cost from Spark-CPU to Spark-RAPIDS.
  3. Overall reduction in query processing cost with Spark-RAPIDS.

We tested Spark-RAPIDS (using the nvidia-tesla-v100 GPU type) on one of our use cases and found a 66% performance improvement for ETL processing over almost 67 billion records residing on a cloud platform.

Acceleration of SPARK with GPU

RAPIDS is a set of software libraries created to accelerate data processing on GPUs. It uses low-level CUDA code for fast, GPU-optimized algorithms, while keeping the easy-to-use Spark SQL and DataFrame APIs on top.

What makes RAPIDS unique is that it is fully integrated with the Spark SQL API: supported operations are passed straight through to RAPIDS for GPU acceleration. The SQL plugin tries to produce results that are bit-for-bit identical to those of Apache Spark. There are several cases where specific functions behave slightly differently, but for the most part it returns the expected results.

Does architecture help SPARK RAPIDS perform better than SPARK-CPU?

RAPIDS runs on GPU machines, whereas Spark 2.0 runs on CPUs. A GPU has a highly parallel structure, which makes it more effective than a CPU on data that can be partitioned and processed in parallel. More specifically, a GPU is highly optimized for numerical work such as floating-point and matrix arithmetic.

The reason for the difference in compute capability between a CPU and a GPU is that the GPU is specialized for compute-intensive, highly parallel computation, which is exactly what is needed to apply a single instruction across a large amount of data. A GPU’s design devotes more of the chip to data processing than to data caching and flow control. If a problem can be processed in parallel, it usually means two things: first, the same operation is executed for each element, which requires less sophisticated flow control; second, the dataset is massive and the problem has high arithmetic intensity, which reduces the need for low-latency memory.

[Figure: structural comparison of a CPU and a GPU]

The diagram above shows the structural differences between a CPU and a GPU. The Cache is used for data caching, Control handles flow control, and the ALU (Arithmetic Logic Unit) performs data processing.
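To make "arithmetic intensity" concrete, the following minimal sketch estimates it as floating-point operations per byte moved for two textbook workloads. The function names are illustrative, and the byte counts assume an idealized case in which each input and output value crosses the memory bus exactly once; real kernels will differ.

```python
def arithmetic_intensity(flops: float, bytes_moved: float) -> float:
    """FLOPs performed per byte moved to or from memory."""
    return flops / bytes_moved


def vector_add_intensity(n: int, bytes_per_value: int = 4) -> float:
    # n additions; two input vectors are read and one output vector is written.
    return arithmetic_intensity(n, 3 * n * bytes_per_value)


def matmul_intensity(n: int, bytes_per_value: int = 4) -> float:
    # Roughly 2 * n^3 FLOPs (one multiply and one add per inner step);
    # two n-by-n inputs are read and one n-by-n output is written (idealized).
    return arithmetic_intensity(2 * n**3, 3 * n * n * bytes_per_value)


if __name__ == "__main__":
    print(f"vector add:       {vector_add_intensity(1_000_000):.3f} FLOP/byte")
    print(f"1024x1024 matmul: {matmul_intensity(1024):.1f} FLOP/byte")
```

Under these assumptions the element-wise addition stays at about 0.083 FLOP/byte regardless of size, while the matrix multiply grows with n (about 170.7 FLOP/byte at n = 1024), which is why the latter is the kind of workload that keeps many ALUs busy.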

Features in SPARK RAPIDS that benefit performance

There are key advancements in Apache Spark 3.0 that contribute to transparent GPU acceleration:

RAPIDS Accelerator for Apache Spark 3.0:

Built on top of NVIDIA CUDA and UCX, the RAPIDS Accelerator for Apache Spark enables applications to take advantage of GPU parallelism and high-bandwidth memory speed with no code changes, through the Spark SQL and DataFrame APIs and an accelerated Spark shuffle implementation.

RAPIDS-accelerated Spark DataFrame & Spark SQL:

RAPIDS offers GPU-accelerated, memory-efficient columnar data processing and query plans through Spark SQL and DataFrames. When a Spark query executes, it goes through the following steps.

[Figure: steps a Spark query goes through during execution]

Enabling GPU functionality with spark.rapids.sql.enabled=true improves SQL performance for queries that use aggregate functions and comparison operators.

* As shown in the above diagram, performance improved by up to 80% for a query that processed 30+ million records totaling approximately 160 GB.

** In the first SQL query, some operations are not 100% compatible. Many of these incompatibilities involve corner cases that most queries never encounter, or that would not make any meaningful difference to the output. Even so, the affected operations fall back to the CPU, which degrades performance in this scenario. Enabling these operations, either individually or all at once with the parameter spark.rapids.sql.incompatibleOps.enabled, can greatly improve query performance, but results vary by use case. Over time, the number of incompatible operators in RAPIDS Spark is expected to shrink. Further information can be found in the RAPIDS Accelerator documentation listed under References.
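As a rough illustration of how the settings discussed above might be passed to a job, the sketch below turns a dictionary of Spark properties into spark-submit arguments. The helper name to_submit_args and the script name etl_job.py are hypothetical; com.nvidia.spark.SQLPlugin and spark.rapids.sql.explain come from the RAPIDS Accelerator documentation rather than from this article, and the plugin jars are assumed to already be on the classpath.

```python
def to_submit_args(conf: dict) -> list:
    """Render Spark properties as repeated `--conf key=value` arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Illustrative settings only; tune them for your own cluster and workload.
rapids_conf = {
    "spark.plugins": "com.nvidia.spark.SQLPlugin",       # load the accelerator
    "spark.rapids.sql.enabled": "true",                   # run supported SQL ops on GPU
    "spark.rapids.sql.incompatibleOps.enabled": "true",   # opt in to not-100%-compatible ops
    "spark.rapids.sql.explain": "NOT_ON_GPU",             # log why an op stays on the CPU
}

# "etl_job.py" is a placeholder for an existing, unmodified Spark application.
command = ["spark-submit", *to_submit_args(rapids_conf), "etl_job.py"]

if __name__ == "__main__":
    print(" ".join(command))
```

Because everything is expressed as configuration, the application code itself does not change, which is the "almost no migration cost" point made earlier.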

RAPIDS-accelerated Spark shuffles:

The accelerated Spark shuffle implementation is built on the GPU-accelerated Unified Communication X (UCX) library to dramatically optimize data transfer between Spark processes. UCX exposes a set of abstract communication primitives that make the best use of available hardware resources and offloads, including RDMA, TCP (Transmission Control Protocol), GPUs, shared memory, and network atomic operations.

To accelerate shuffle operations, the following parameters can be used:

spark.rapids.sql.shuffle.spillThreads

spark.rapids.shuffle.transport.maxReceiveInflightBytes

spark.rapids.shuffle.ucx.useWakeup

With the above parameters, our recent experiments showed an additional 23–29% performance improvement from the accelerated shuffle in RAPIDS Spark. For this use case, the input had more than 35 million records and the data size was 167 GB.
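A minimal sketch of how these three shuffle parameters could be written out in spark-defaults.conf format (one "key value" pair per line) is shown below. The values are placeholders chosen for illustration, not the ones used in the experiment above, and render_spark_defaults is a hypothetical helper. Enabling the accelerated shuffle also requires setting a shuffle manager class that depends on the Spark version, which is left out here.

```python
# Placeholder values for illustration; they are not recommendations.
shuffle_conf = {
    "spark.rapids.sql.shuffle.spillThreads": 6,
    "spark.rapids.shuffle.transport.maxReceiveInflightBytes": 1024 * 1024 * 1024,
    "spark.rapids.shuffle.ucx.useWakeup": True,
}


def render_spark_defaults(conf: dict) -> str:
    """Render properties as spark-defaults.conf lines: `key value`."""
    lines = []
    for key, value in conf.items():
        # bool is checked explicitly so that True becomes "true", not "True".
        if isinstance(value, bool):
            value = str(value).lower()
        lines.append(f"{key} {value}")
    return "\n".join(lines)


if __name__ == "__main__":
    print(render_spark_defaults(shuffle_conf))
```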

GPU-aware scheduling in Spark:

GPUs are now a schedulable resource in Apache Spark 3.0. This allows Spark to schedule executors with a specified number of GPUs, and you can specify how many GPUs each task requires. Spark conveys these resource requests to the underlying cluster manager (Kubernetes, YARN, or standalone). You can also configure a discovery script to detect which GPUs were assigned by the cluster manager.
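The sketch below illustrates the two pieces described above under some assumptions: the Spark 3.0 resource properties for requesting GPUs, and the JSON shape a discovery script is expected to print to standard output. The script path is hypothetical, and discovery_output only formats the result; a real script would first query the node (for example with nvidia-smi) to find the GPU indices.

```python
import json

# Spark 3.0 resource-scheduling properties; the amounts and path are examples.
gpu_scheduling_conf = {
    "spark.executor.resource.gpu.amount": "1",   # GPUs requested per executor
    "spark.task.resource.gpu.amount": "1",       # GPUs required by each task
    # Hypothetical location of a discovery script on each node.
    "spark.executor.resource.gpu.discoveryScript": "/opt/spark/getGpusResources.sh",
}


def discovery_output(gpu_addresses) -> str:
    """Build the JSON a GPU discovery script prints for Spark to parse."""
    return json.dumps(
        {"name": "gpu", "addresses": [str(a) for a in gpu_addresses]}
    )


if __name__ == "__main__":
    # Pretend the node exposes two GPUs with indices 0 and 1.
    print(discovery_output([0, 1]))
```

For two GPUs the function prints {"name": "gpu", "addresses": ["0", "1"]}, which tells Spark the resource name and the addresses it may hand out to executors and tasks.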

Tuning based on use-cases along with Limitations

Along with the above features, there are a few points to consider during tuning.

Number of Executors:

The RAPIDS Accelerator plugin only supports a one-to-one mapping between GPUs and executors.

Number of Tasks per Executor:

For example, if the cluster nodes each have 24 CPU cores and 4 GPUs, then setting spark.executor.cores=6 runs each executor with 6 cores and 6 concurrent tasks per executor, assuming the default of one core per task (spark.task.cpus=1).
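The arithmetic behind this example can be sketched as a small helper. The function name executor_layout and its return keys are illustrative; the only rules it encodes are the ones stated above: one executor per GPU, cores split evenly across executors, and one task per spark.task.cpus cores.

```python
def executor_layout(cpu_cores_per_node: int, gpus_per_node: int, task_cpus: int = 1) -> dict:
    """Derive per-node executor settings from the one-executor-per-GPU rule."""
    executors_per_node = gpus_per_node                      # one executor per GPU
    executor_cores = cpu_cores_per_node // gpus_per_node    # split cores evenly
    concurrent_tasks = executor_cores // task_cpus          # tasks per executor
    return {
        "executors_per_node": executors_per_node,
        "spark.executor.cores": executor_cores,
        "concurrent_tasks_per_executor": concurrent_tasks,
    }


if __name__ == "__main__":
    # The 24-core, 4-GPU node from the text.
    print(executor_layout(24, 4))
```

For the 24-core, 4-GPU node this gives 4 executors per node, spark.executor.cores=6, and 6 concurrent tasks per executor.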

Number of Concurrent Tasks per GPU:

The number of concurrent tasks per executor can be further limited while tasks are sharing the GPU. This is useful for avoiding GPU out-of-memory errors while still allowing full concurrency for the portions of the job that are not executing on the GPU. On the other hand, setting this value too high can lead to GPU out-of-memory errors or poor runtime performance.

Configuration key: spark.rapids.sql.concurrentGpuTasks
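The effect of this setting can be pictured with a semaphore: all tasks run concurrently, but only a limited number may be inside the GPU portion at once. The following is a toy analogy using Python threads, not the plugin's actual implementation; run_tasks and its parameters are hypothetical names, and a short sleep stands in for GPU work.

```python
import threading
import time


def run_tasks(num_tasks: int, concurrent_gpu_tasks: int) -> int:
    """Run tasks in threads and return the peak number on the 'GPU' at once."""
    gpu_slots = threading.Semaphore(concurrent_gpu_tasks)
    lock = threading.Lock()
    state = {"on_gpu": 0, "peak": 0}

    def task():
        # CPU-side work (reading input, for instance) would happen here, unthrottled.
        with gpu_slots:  # wait for a free GPU slot
            with lock:
                state["on_gpu"] += 1
                state["peak"] = max(state["peak"], state["on_gpu"])
            time.sleep(0.01)  # stand-in for work executed on the GPU
            with lock:
                state["on_gpu"] -= 1

    threads = [threading.Thread(target=task) for _ in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


if __name__ == "__main__":
    # Six concurrent tasks per executor, at most two sharing the GPU.
    print("peak tasks on GPU:", run_tasks(num_tasks=6, concurrent_gpu_tasks=2))
```

However many tasks are started, the reported peak never exceeds the limit, which is how a lower value trades GPU concurrency for a smaller peak memory footprint.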

Pooled Memory:

Allocating memory on a GPU can be an expensive operation. RAPIDS uses a pooling allocator called RMM (RAPIDS Memory Manager) to mitigate this overhead. If the pool is exhausted, more memory is allocated and added to the pool. Most of the time this is a huge win, but if you need to share the GPU with other libraries that are not aware of RMM, this can lead to memory issues.

Configuration key: spark.rapids.memory.gpu.pooling.enabled
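Why pooling helps can be shown with a toy accounting model. The class below is only an illustration of the idea, assuming a pool that reserves memory up front and grows when exhausted; it is not how RMM is implemented, and ToyPool and its fields are hypothetical names. It counts how often the expensive underlying device allocation would be needed.

```python
class ToyPool:
    """Toy model of a pooled allocator that tracks costly device allocations."""

    def __init__(self, initial_bytes: int):
        self.capacity = initial_bytes
        self.in_use = 0
        self.device_allocations = 1  # the initial reservation

    def allocate(self, nbytes: int) -> None:
        shortfall = self.in_use + nbytes - self.capacity
        if shortfall > 0:
            # Pool exhausted: ask the device for more memory and grow the pool.
            self.capacity += shortfall
            self.device_allocations += 1
        self.in_use += nbytes

    def free(self, nbytes: int) -> None:
        # Memory goes back to the pool, not to the device.
        self.in_use -= nbytes


if __name__ == "__main__":
    pool = ToyPool(initial_bytes=100)
    for _ in range(1000):
        pool.allocate(64)
        pool.free(64)
    print("device allocations after 1000 cycles:", pool.device_allocations)
```

In this model a thousand allocate and free cycles are served from the single initial reservation. The flip side is also visible: the pool holds on to its capacity, so a library outside the pool sees less free GPU memory.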

Locality Wait:

This controls how long Spark should wait to obtain better locality for tasks. When tasks complete more quickly than this setting, the Spark scheduler can end up not using all the executors in the cluster during a stage. In that case, queries can see significant performance gains by setting this to 0.

Configuration key: spark.locality.wait

Columnar Batch Size:

This controls the upper limit on batch size when multiple columnar batches are concatenated during GPU processing. Setting this value too low can result in a large amount of GPU processing overhead and slower task execution. Setting this value too high can lead to GPU out-of-memory errors. If tasks fail due to GPU out-of-memory errors after the query input partitions have been read, try setting this to a lower value.

Configuration key: spark.rapids.sql.batchSizeBytes
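The trade-off described above can be illustrated with a simple greedy grouping of batches under a size limit. This is a sketch of the general idea only, not the plugin's actual concatenation logic; coalesce_batches is a hypothetical helper, and the sizes are arbitrary units.

```python
def coalesce_batches(batch_sizes, limit):
    """Greedily group consecutive batches so each group stays within `limit`."""
    groups, current, current_size = [], [], 0
    for size in batch_sizes:
        # Start a new group if adding this batch would exceed the limit.
        if current and current_size + size > limit:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


if __name__ == "__main__":
    sizes = [40, 30, 50, 20, 90]
    print(coalesce_batches(sizes, limit=100))
    print(coalesce_batches(sizes, limit=60))
```

With a limit of 100 the five input batches become three larger ones, while a limit of 60 leaves five separate batches, each paying the per-batch GPU overhead. Raising the limit further reduces the batch count but increases the memory needed for each concatenated batch.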

Conclusion

If you want to move compute-intensive data pipelines to GPUs, SPARK-RAPIDS can be a great option, as we have highlighted in this article. The one thing to verify is that the functions and operations used in the current ETL pipeline are compatible with SPARK-RAPIDS. If they are not, execution falls back to the CPU, overall performance may drop somewhat, and the expected savings may not materialize. In our next article, we will show when and how part of a SPARK-RAPIDS GPU execution falls back to the CPU and what options currently exist to handle that. SPARK-RAPIDS is evolving fast, so a function or operation that is not compatible today might be compatible tomorrow.

References:

https://rapids.ai/

https://nvidia.github.io/spark-rapids/
