How to scale Python multiprocessing to a cluster with one line of code

Edward Oakes
Distributed Computing with Ray
6 min read · Feb 18, 2020

TL;DR: This blog post shows how multiprocessing.Pool can be seamlessly scaled from a single machine to a cluster.

Comparing the scalability of three Python implementations of Monte Carlo Pi estimation — serial in a single process, parallel on a single AWS m4.4xlarge instance using multiprocessing.Pool, and distributed across a 10-node cluster of AWS m4.4xlarge instances using Ray. Going from one node to a cluster with Ray required changing only a single import statement and scaled the throughput of the application by an order of magnitude.

Programs start off small. Be it exploratory data analysis or building a machine learning model, it’s important to get something simple working as quickly as possible. However, as time goes on, requirements shift and once-small programs need to be scaled up to handle more data or take advantage of more compute. Unfortunately, modifying a program to scale to multiple cores or multiple machines often requires rewriting it from the ground up, not to mention handling the slew of complexities that come with parallelism and distributed systems.

Python multiprocessing offers one solution to this, providing a set of convenient APIs that enable Python programs to take advantage of multiple cores on a single machine. However, while this may help an application scale 10x or maybe even 50x, it’s still limited to the parallelism of a single machine and going beyond that would require rethinking and rewriting the application.

In this blog post, I’ll highlight how you can overcome this limitation by seamlessly scaling to a multi-node cluster using the ray.util.multiprocessing.Pool API released with Ray — without rewriting your program!

Supporting the multiprocessing.Pool API on Ray

Note: For a more detailed introduction to Ray and its programming model, please refer to Ray for the Curious.

Ray is a powerful open source platform that makes it easy to write distributed Python programs and seamlessly scale them from your laptop to a cluster. Ray is designed from the ground up for performance and scalability and offers a simple but expressive API.

While the Ray API is easy to work with, it still requires modifying a program to some extent. In the scenario described above, where an engineer or a data scientist wants to scale their existing program, this may be a nonstarter.

However, Ray comes with support for the multiprocessing.Pool API out of the box (a thin wrapper that’s only ~500 lines of code). When you import ray.util.multiprocessing and start a new Pool, Ray initializes on the local node or connects to a running cluster transparently. Then, each “process” in the pool is instantiated as a Ray actor in the cluster and functions are parallelized across them.

While for some workloads this may result in performance gains even when running on a single node, the real power is in enabling existing programs to easily scale from a single node to a cluster.

Case study: Monte Carlo Pi Estimation

Let’s walk through an example of scaling an application from a serial Python implementation, to a parallel implementation on one machine using multiprocessing.Pool, to a distributed implementation on a 10-node Ray cluster using the same API. All machines used in the following experiments are AWS m4.4xlarge instances running Ubuntu 18.04 (ami-06d51e91cea0dac8d). A full code sample to run each of the following implementations is available here (note that as of publishing this post, running the code samples requires installing the nightly Ray wheels).

Visual representation of the Monte Carlo Pi estimation method described below.

The goal of our example program is to estimate the value of π. We will accomplish this using a Monte Carlo method that works by randomly sampling points within a 2x2 square. We can use the proportion of the points that are contained within the unit circle centered at the origin to estimate the ratio of the area of the circle to the area of the square. Given that we know the true ratio to be π/4, we can multiply our estimated ratio by 4 to approximate the value of π. The more points that we sample to calculate this approximation, the closer the value should be to the true value of π (3.1415926…).
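The geometric argument above can be sanity-checked in a few lines (a standalone snippet for illustration, not code from the original post):

```python
import math
import random

random.seed(0)  # fixed seed so the run is reproducible
num_samples = 10_000

# Sample points uniformly from the 2x2 square centered at the origin.
points = [(random.uniform(-1, 1), random.uniform(-1, 1))
          for _ in range(num_samples)]

# Count how many land inside the unit circle (distance from origin <= 1).
num_inside = sum(1 for x, y in points if math.hypot(x, y) <= 1)

# The inside/total ratio approximates pi/4, so multiply by 4.
estimate = 4 * num_inside / num_samples
print(estimate)  # close to 3.14
```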

Step 1: Serial Python

Monte Carlo Pi estimation that runs in a single Python process.

Above is a simple Python implementation of the Monte Carlo method. The sample function randomly samples num_samples points within the 2x2 square and returns the number that were within the circle (using math.hypot to calculate the distance from the origin). We call this function from approximate_pi, which prints the estimated value of π using the ratio of samples inside the circle.
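The embedded gist isn't reproduced here, but the implementation described reads roughly as follows — a sketch using the function names from the description above, with argument parsing omitted:

```python
import math
import random
import time

def sample(num_samples):
    """Sample points uniformly in the 2x2 square centered at the origin
    and return how many fall within the unit circle."""
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi(num_samples):
    start = time.time()
    num_inside = sample(num_samples)
    # The circle-to-square area ratio is pi/4, so scale the sampled ratio by 4.
    print("pi ~= {}".format(4 * num_inside / num_samples))
    print("Finished in {:.2f}s".format(time.time() - start))
```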

> python monte_carlo_pi.py --num-samples 100_000_000
pi ~= 3.141646
Finished in 94.64s

Running this implementation, we see that it takes over a minute and a half to sample 100 million points. That may sound like a lot of samples, but the resulting estimate of π is only accurate to 3 decimal places — we’ll have to scale up our implementation to improve the accuracy.

Step 2: Parallel on one machine using multiprocessing.Pool

Monte Carlo Pi estimation running across the CPUs of one machine using multiprocessing.Pool.

To scale up the number of samples, let’s modify the code to parallelize the sampling across cores on one machine. We first instantiate a multiprocessing Pool that will start as many processes as there are CPUs on the machine by default. Then, instead of directly calling sample, we invoke it across the process pool using pool.map() and aggregate the results.
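Again substituting a sketch for the original gist (the batch size and helper name are assumptions; sample is the same helper as in Step 1):

```python
import math
import random
from multiprocessing import Pool

def sample(num_samples):
    """Same serial sampling helper as in Step 1."""
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_parallel(num_samples, batch_size=1_000_000):
    # By default, Pool() starts one worker process per CPU on the machine.
    with Pool() as pool:
        # Split the work into fixed-size batches and sum the per-batch counts.
        batches = [batch_size] * (num_samples // batch_size)
        num_inside = sum(pool.map(sample, batches))
    return 4 * num_inside / num_samples

if __name__ == "__main__":
    print("pi ~= {}".format(approximate_pi_parallel(10_000_000)))
```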

> python parallel_monte_carlo_pi.py --num-samples 100_000_000
pi ~= 3.141432
Finished in 12.52s
> python parallel_monte_carlo_pi.py --num-samples 1_000_000_000
pi ~= 3.141532
Finished in 124.02s

Now that we’ve parallelized the code across the cores of one machine, sampling 100 million points takes 12.52s where the serial implementation took 94.64s. We’re also able to scale to a billion samples in just over two minutes, bringing our estimate of π to 4 decimal places of accuracy. This is clearly an improvement, but if we want to achieve higher accuracy in any reasonable amount of time, we’ll have to distribute the computation across a cluster.

Step 3: Distributed on a 10-node cluster using Ray

Monte Carlo Pi estimation running across a Ray cluster using ray.util.multiprocessing.Pool. The only difference between this and the version that runs on Python’s multiprocessing.Pool is the import statement.

Modifying our script to run on a Ray cluster only requires changing the Pool import statement to point to the Ray implementation, which will parallelize our sample function in Ray actors across a cluster rather than local processes.
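In code, the change amounts to nothing more than the import. The full sample script gates it behind the --distributed flag seen in the shell session below; a sketch of that pattern:

```python
import sys

if "--distributed" in sys.argv:
    # Ray's drop-in Pool; with RAY_ADDRESS set, it connects to the
    # running cluster instead of starting Ray locally.
    from ray.util.multiprocessing import Pool
else:
    # The standard library Pool, limited to the cores of one machine.
    from multiprocessing import Pool
```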

To run the estimation on a cluster, we first use the Ray automatic cluster setup tool to start a 10 node cluster (using a minimal configuration file). Once the cluster is started, we copy the script to the head node of the cluster and then attach to a remote SSH session. Then, we run our modified script, setting the RAY_ADDRESS environment variable so that Ray connects to the existing multi-node cluster instead of starting a new one on the machine.
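The linked configuration file isn't reproduced here; a hypothetical minimal version might look like the following, where the field names follow the Ray autoscaler of this era and every value is an assumption rather than the original file:

```yaml
# Sketch of a minimal Ray cluster config (values are assumptions).
cluster_name: monte-carlo-pi
min_workers: 9   # 9 workers + 1 head node = 10 nodes
max_workers: 9
provider:
    type: aws
    region: us-west-2
auth:
    ssh_user: ubuntu
head_node:
    InstanceType: m4.4xlarge
    ImageId: ami-06d51e91cea0dac8d  # Ubuntu 18.04, as noted above
worker_nodes:
    InstanceType: m4.4xlarge
    ImageId: ami-06d51e91cea0dac8d
```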

# Start the cluster and copy the script to it.
> ray up -y ray-cluster.yaml
> ray rsync-up ray-cluster.yaml monte_carlo_pi.py monte_carlo_pi.py

# Attach to a remote shell in the head node and run the script.
> ray attach ray-cluster.yaml
[remote] > RAY_ADDRESS=auto python monte_carlo_pi.py --distributed --num-samples 100_000_000
pi ~= 3.141634
Finished in 1.92s
[remote] > RAY_ADDRESS=auto python monte_carlo_pi.py --distributed --num-samples 1_000_000_000
pi ~= 3.141541
Finished in 13.75s
[remote] > RAY_ADDRESS=auto python monte_carlo_pi.py --distributed --num-samples 10_000_000_000
pi ~= 3.141599
Finished in 131.37s

# Clean up the cluster.
> ray down -y ray-cluster.yaml

A screenshot from the Ray dashboard (accessible by default at localhost:8265 on the head node) while running the above example. Using the multiprocessing.Pool API on Ray allows us to fully utilize the CPUs of all 10 machines.

Now that our sampling is distributed across the 10-node cluster, we see significant speedups: sampling 100 million points (which originally took 94.64s) now takes just under 2 seconds, sampling 1 billion points (which took 2 minutes on one machine with multiprocessing.Pool) takes only 14 seconds, and we’re now able to scale up to 10 billion samples and reach 5 decimals of accuracy in just 2 minutes. While this order-of-magnitude speedup is already impressive, one of the great parts about using Ray is that we could easily continue to scale the program by increasing the number of nodes in the cluster if needed.

Conclusion

In this blog post, I explored how you can scale Python multiprocessing workloads to a cluster without rewriting your program, using Ray’s support for the multiprocessing.Pool API.

This is only one of many powerful libraries built to scale using Ray, including Tune, a scalable hyperparameter tuning library, RLlib, a scalable reinforcement learning library, and Modin, a scalable DataFrame engine. If you have any questions or thoughts about Ray, you can join our community through Discourse or Slack. If you would like to know how Ray is being used in industry, please consider attending Ray Summit.

If you’re interested in working on Ray and helping to shape the future of distributed computing, join us at Anyscale! We’re hiring.
