Data Processing Support in Ray

Stephanie Wang
Feb 16 · 5 min read

Authors: Sang Cho, Alex Wu, Clark Zinzow, Eric Liang, Stephanie Wang

This blog post highlights two features in the latest Ray 1.2 release: native support for spilling to external storage, and support for libraries from the Python data processing ecosystem, including integrations for PySpark and Dask. This is the first blog post of a series on data processing support in Ray.

If you plan to try data processing on Ray, please reach out to us in the #ray-data-processing Slack channel (invitation to the public Slack channel link). We’d love to hear your feedback and experiences with Ray’s data processing support!

Over the past couple years, we’ve heard from many Ray users that they wish to incorporate parallel data processing more directly into their Python applications. These use cases range from processing input CSVs faster to shuffling hundreds of terabytes of ML input data (distributed ETL). The common desire is to stitch together an application with data processing and ML components in a single Python program, without needing to worry about setting up, maintaining, and gluing together separate clusters or services.

Rather than write another DataFrames library for Ray, we're focusing on supporting the integration of other frameworks so that you can:

  1. Break down compute silos: Invoke data processing APIs like Dask DataFrames directly from application code alongside ML libraries like Tune, XGBoost, and Horovod.
  2. Leverage distributed memory: Automatic object spilling enables data library developers and users to take advantage of Ray’s shared-memory object store.

A single substrate for distributed data processing and machine learning

However, in many cases the use of workflow orchestrators adds costs, due to the compute silo effect, in terms of system efficiency and operations.

System efficiency:

  1. The setup overhead of workflow tasks adds latency and reduces the cost efficiency of the job.
  2. Intermediate data must be materialized to external storage (e.g., HDFS or S3), also adding latency.


  1. There is high operational overhead maintaining separate distributed systems for data processing.
  2. The pipeline itself must be written and configured in a separate configuration language (vs. a single Python program). This also limits expressivity.

Our goal for supporting data processing in Ray is to enable distributed applications with less glue code, increasing expressivity and reducing system overheads.

In the remainder of this blog we’ll cover the new object spilling feature in Ray and library integrations.

Object Spilling

Object spilling enables libraries already using Ray’s object store to work with datasets that may not fit in memory. It also allows Ray programs to operate directly on big datasets. For example, you can now write a simple out-of-core distributed shuffle in just a few dozen lines of Python:

Simple shuffle example written in Ray with different object store memory limits on a single 32-core machine (AWS i3.8xlarge). The maximum working set size for this shuffle is 2x the input data size, so a memory limit less than that will require spilling. You can try running this yourself on Ray master with `python -m ray.experimental.shuffle — help`.

Here’s an example of using Ray’s object spilling to enable out-of-core workloads for Dask-on-Ray. Note that this code does a full sort of the dataset, so we will not compare it directly to the simple shuffle above.

Dask-on-Ray sort with Ray’s object spilling disabled vs. enabled. This is run on a 32-core machine (AWS i3.8xlarge) with a 30GB memory limit for Ray’s object store. With object spilling disabled, the application receives an out-of-memory error on 100GB.

Note that standalone systems like Spark already have native support for object spilling. However, this is a significant effort for libraries like Modin and Mars. This is where integration with Ray can reduce burden for library developers and enable new workloads for end users. We’ll cover how this works in the next section.

Note: To reproduce these benchmarks, you will need to run on the nightly wheels. You can also do this by installing Ray v1.2 (`pip install -U ray`) and running `ray install-nightly` from the command line.

Library Integrations

PySpark on Ray


$ pip install ray raydp


Under the hood, raydp.init_spark creates num_executors Ray Java actors that each launch a Spark executor. The actors communicate between each other using Spark’s internal IO layer. In the above example, we use the PySparkDriver actor class to wrap the Spark session so that it is callable from other parts of a Ray application. Note that this integration doesn’t use Ray’s object store, unlike the other integrations we’ll cover below.

If you like to learn more about RayDP, please check out the documentation.

Dask on Ray


$ pip install ray 'dask[dataframe]' pandas numpy

Note that you do not need to install dask.distributed even if you are using a cluster because Ray will handle the distribution.


Under the hood, Dask dispatches tasks to Ray for scheduling and execution. Task inputs and outputs get stored in Ray’s distributed, shared-memory object store. This means that you can seamlessly mix Dask and other Ray library workloads.

If you like to learn more about Dask on Ray, please check out the documentation.

Modin DataFrames


$ pip install 'modin[ray]' pandas numpy


If you like to learn more about Modin, please check out the documentation.

Mars on Ray

Here’s how you can try it out:


$ pip install -U pymars ray==1.0

Note that you do not need to install pymars[distributed] even if you are using a cluster because Ray will handle the distribution.


The Mars-on-Ray integration is experimental and under active development, but you can learn more from the documentation.


If you’re interested in learning more about Ray, check out the whitepaper or join us on the Ray Discourse.

Originally published at

Distributed Computing with Ray

Ray is a fast and simple framework for distributed…