Lightning Fast ML Predictions with PySpark

Photo by Max Saeling on Unsplash

At HomeAway, we have several ways of deploying machine learning models depending on how quickly their predictions are needed. There are essentially three regimes for making ML predictions:

  1. Synchronous: The user needs a prediction right away before they can proceed.
  2. Asynchronous: The predictions can be executed as the input data arrives, but won’t be served until later.
  3. Batch: The predictions need to be made at a regular (long) interval.

We mostly use Flask and Kafka to serve models for synchronous and asynchronous use-cases, with scores being served either directly from the service or through a fast cache. For batch scoring we use Apache Spark. Since our primary ML language is Python, Spark’s Python bindings make it an ideal choice for deploying ML models for batch predictions.

Once a Python model is trained (which we do not usually use Spark for), PySpark allows several ways to execute it for batch predictions.

Method 1: UDF (Not Fast)

This is the most straightforward and most stable way to execute predictions, and also the slowest. Suppose I’ve got some function, predict, that looks like this:
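As a minimal sketch, predict could be an ordinary Python function that scores a single record. The feature names `size` and `price` and the weights below are assumptions for illustration; they stand in for whatever trained model is actually being deployed.

```python
import math

# Hypothetical weights standing in for a trained model.
WEIGHTS = {"size": 0.5, "price": -0.25}


def predict(size, price):
    """Score ONE record with a tiny logistic model over two numeric features."""
    z = WEIGHTS["size"] * size + WEIGHTS["price"] * price
    # Return a plain Python float for a single row.
    return 1.0 / (1.0 + math.exp(-z))
```

Nothing here knows about Spark: the function takes scalars and returns a scalar, which is exactly the shape a regular UDF expects.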

With this as a starting point, it’s straightforward to turn this into a PySpark user-defined function (UDF):
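A sketch of that wrapping, assuming a hypothetical two-feature `predict` and a data frame with `size` and `price` columns (both are illustrative, not the production code). The helper names `make_predict_udf` and `add_predictions` are also made up for this example.

```python
import math


def predict(size, price):
    """Hypothetical single-record scorer (stand-in for a trained model)."""
    return 1.0 / (1.0 + math.exp(-(0.5 * size - 0.25 * price)))


def make_predict_udf():
    """Wrap `predict` as a Spark UDF; requires pyspark to be installed."""
    # Imported lazily so `predict` stays usable and testable without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    # The return type is declared because Spark cannot infer what Python returns.
    return udf(predict, DoubleType())


def add_predictions(df):
    """Append a `prediction` column to a DataFrame with `size` and `price`."""
    from pyspark.sql.functions import col

    predict_udf = make_predict_udf()
    return df.withColumn("prediction", predict_udf(col("size"), col("price")))
```

Keeping the PySpark imports inside the helpers is a design choice for this sketch: the scoring logic can be unit tested locally, and Spark is only needed when the UDF is actually built.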

Basically, I wrap the function in PySpark’s udf function and declare the return type so PySpark knows how to translate it (udf can also be applied as a decorator). That’s it: I can now call predict_udf as though it were a regular Spark function.

So how does this magic work? Spark’s core runs in the Java virtual machine, which doesn’t easily share memory with Python. What happens is PySpark launches a Python interpreter on each executor and pipes the data in, one record at a time. Spark knows what datatypes go in, so it can tell Python what to expect to see. It can’t tell what Python’s going to give back, though, which is the reason for explicitly declaring the output type.

If you’re at all familiar with numeric computation in Python, something might have jumped out just now. One record at a time. This is the slowest possible way to execute the prediction. Nearly all numeric heavy lifting in Python (numpy, pandas, xgboost, sklearn, …) is implemented in compiled code such as C or C++, which is called from Python. Inside those implementations are extremely fast, statically compiled loops that process whole arrays per call. This is commonly known as vectorized computation. We get precisely zero vectorized computation benefits executing the model one record at a time.
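To make the difference concrete, here is a small sketch that scores the same data both ways. The logistic model and its weights are hypothetical; the point is only where the loop runs.

```python
import numpy as np

WEIGHTS = np.array([0.5, -0.25])  # hypothetical model weights


def predict_one(features):
    """Score one record; called once per row, so the loop runs in Python."""
    return float(1.0 / (1.0 + np.exp(-(features @ WEIGHTS))))


def predict_many(feature_matrix):
    """Score every row in one call; the loop runs inside numpy's compiled code."""
    return 1.0 / (1.0 + np.exp(-(feature_matrix @ WEIGHTS)))


rng = np.random.default_rng(0)
X = rng.normal(size=(10_000, 2))

# Same numbers either way; only the amount of Python-level work differs.
row_at_a_time = np.array([predict_one(row) for row in X])
vectorized = predict_many(X)
```

Both paths produce the same scores. Timing the two calls with `timeit` on your own hardware shows how much the per-row Python overhead costs.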

So is there a way to vectorize the calculation from within Spark?

Method 2: RDD Vectorization (Very Fast)

It is possible to manually write code that will perform the vectorization for us. It’s considerably more code, and it’s a little unsafe. It’s also significantly faster, up to an order of magnitude depending on the complexity of the model.

The general strategy is this: load each partition’s contents into a pandas data frame (meaning memory not managed by the JVM) and execute the model on that data frame. That way, within each partition, we get the benefit of vectorization.
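A minimal sketch of such a partition function follows. The column names, the `model_predict` stand-in, and its weights are assumptions for illustration; a real job would call the trained model’s own batch predict method instead.

```python
import numpy as np
import pandas as pd

FEATURES = ["size", "price"]       # hypothetical features, in training order
WEIGHTS = np.array([0.5, -0.25])   # hypothetical stand-in for a trained model


def model_predict(features: pd.DataFrame) -> np.ndarray:
    """Stand-in for model.predict: one vectorized call for the whole batch."""
    return features.to_numpy(dtype=float) @ WEIGHTS


def predict_partition(rows, columns=("id", "size", "price")):
    """Score one partition: iterator of rows in, iterator of tuples out."""
    # Materialize the partition as a pandas data frame. Spark Row objects are
    # tuples, so plain tuples behave the same way here.
    pdf = pd.DataFrame(list(rows), columns=list(columns))
    if pdf.empty:
        # Empty partitions do occur; skip the model call entirely.
        return iter([])
    # Select features by name so the column order matches training.
    pdf["prediction"] = model_predict(pdf[FEATURES])
    # Hand plain tuples (input columns + prediction) back to Spark.
    return pdf.itertuples(index=False, name=None)
```

Because the function only touches iterables and pandas, it can be tested locally with a list of tuples before it ever runs on a cluster.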

There’s a lot happening in this function.

  1. Convert the incoming iterable of rows into a pandas data frame.
  2. Execute the prediction, taking care that the feature order is correct.
  3. Convert the data frame with predictions back into an iterable of rows.

Note in this case we can no longer operate on one column at a time — this function has to receive and return all of the features in addition to the prediction. It’s also a little more complicated to call. Here’s what that looks like:
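One way the call could look is sketched below. The helper name `score_partitions` and the schema string are hypothetical; `mapPartitions` and `createDataFrame` are the PySpark calls doing the work.

```python
def score_partitions(spark, df, partition_fn, output_schema):
    """Run `partition_fn` once per partition and rebuild a Spark DataFrame.

    spark:         an active SparkSession
    df:            the input Spark DataFrame
    partition_fn:  iterator of rows -> iterator of tuples (inputs + prediction)
    output_schema: schema of the returned tuples, e.g. a schema string
    """
    # Drop down to the RDD API: Spark hands each partition's rows to
    # partition_fn as a plain Python iterator.
    scored_rdd = df.rdd.mapPartitions(partition_fn)
    # Spark cannot see inside the function, so the output schema is stated.
    return spark.createDataFrame(scored_rdd, schema=output_schema)


# Hypothetical output schema: the input columns plus the new prediction column.
OUTPUT_SCHEMA = "id: bigint, size: double, price: double, prediction: double"
```

A call would then read something like `score_partitions(spark, df.select("id", "size", "price"), predict_partition, OUTPUT_SCHEMA)`. Selecting the columns explicitly keeps the row layout aligned with what the partition function expects, and calling `df.repartition(n)` first is one way to keep each partition small.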

The predict_partition function exists outside of anything managed by Spark, including query planning, type checking, and memory management. That makes two things critical: being conservative with partition sizes (each partition must fit well under the executor memory limit) and testing predict_partition thoroughly. This is a lot more trouble than a standard UDF. Is it worth the trouble?

For machine learning models, the answer is definitely yes. This runs at least twice as fast as a Spark UDF, and far faster than that for complex models. To get a sense of the difference, I ran one ML model using a Spark UDF and it was so slow that I implemented the RDD-partition solution before the Spark UDF finished. The UDF solution for that model took so long I don’t actually know what the execution time would have been, while the RDD-partition solution executed on the same data and cluster in three minutes. The RDD-partition solution for that model has been happily running in production for a few months now.

That said, it’s an awful lot of work and the code isn’t the greatest. Is there a way to get the speed of vectorized predictions and keep the clean UDF syntax?

Method 3: Pandas UDF (Lightning Fast)

The biggest challenge we’ve faced when executing machine learning on Spark boils down to Python and Java not being able to share memory: either we use Spark SQL’s language and make a slow UDF, or we open the hood and mess with stuff that could get us into trouble. If there were a way to share the memory between the JVM and Python runtime, then we’d be able to keep the managed memory Spark provides and use the speed we get from Python’s vectorization. In 2016 the Apache Software Foundation announced the Apache Arrow project, which is an in-memory columnar data representation and interchange format. Its goal is exactly what we’d need: to share in-memory columnar data between different runtimes (such as R, Python, Java, and Julia).

About two years later, Spark 2.3 (released in February 2018) added support for the Pandas UDF in PySpark, which uses Arrow to bridge the gap between the Spark SQL runtime and Python. For an ML prediction, a Pandas UDF would be a function that takes a bunch of pandas Series (one per feature) and returns a Series, which is not too different from what we’d write for local runs anyway. You just need a decorator and the data type of the returned Series.
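A sketch of what that could look like, with a hypothetical linear model over `size` and `price` standing in for the real one. The helper `make_predict_pandas_udf` is a made-up name; it applies `pandas_udf` as a plain function call, which is equivalent to using it as a decorator.

```python
import pandas as pd


def predict_batch(size: pd.Series, price: pd.Series) -> pd.Series:
    """Score a whole batch: each argument is a pandas Series for one feature."""
    # Hypothetical linear model; a real job would call the trained model's
    # vectorized predict method on these columns instead.
    return 0.5 * size - 0.25 * price


def make_predict_pandas_udf():
    """Wrap `predict_batch` as a Pandas UDF; requires pyspark and pyarrow."""
    # Imported lazily so `predict_batch` can be tested with pandas alone.
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import DoubleType

    # Only the element type of the returned Series has to be declared.
    return pandas_udf(predict_batch, returnType=DoubleType())
```

The scoring function is the same one you would run locally on a pandas data frame, which makes it easy to check against the training code before deploying.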

That’s all there is to it. Calling it is straightforward too — just like a regular UDF:
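For illustration, a small helper along these lines would apply any scoring UDF to a data frame. The helper name, the default output column, and the feature column names in the usage line are all hypothetical.

```python
def with_predictions(df, scoring_udf, feature_columns, output_column="prediction"):
    """Append scoring_udf(*feature_columns) to df as a new column."""
    # A Pandas UDF is applied like any other column expression. Spark feeds it
    # the named columns in Arrow batches rather than one row at a time.
    return df.withColumn(output_column, scoring_udf(*feature_columns))
```

With a Pandas UDF bound to a name such as `predict_pandas_udf`, the call would be `with_predictions(df, predict_pandas_udf, ["size", "price"])`. The result stays a regular Spark data frame, so query planning and memory management remain with Spark.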

In addition to being safer and simpler than the RDD method, it’s also faster because there’s not as much serialization overhead when communicating between the Python and Java runtimes. For one model I clocked this as being about 20% faster than the RDD method and orders of magnitude faster than a normal UDF.

Tying it All Together

Now that I’ve walked through the three ways to execute a machine learning model on PySpark, how do they stack up quantitatively? That’s a tough question to answer generally, but I did run some timings on a specific model. Effectively the model performs a pandas lookup of some precomputed scores. In this case the ML inference is very light, but still benefits from vectorization. I cached the input data frame and timed all three methods. These are the results:

Figure: Timing comparison for a simple model.

For this model, the RDD-partition method is over twice as fast as the UDF, and the Pandas UDF method is over twenty percent faster than RDD-partition. If there were serious computation happening in the model, the differences would be even greater.

Not surprisingly, the CPU utilization also improves as we vectorize. Below is the CPU utilization of the cluster for each method (the first spike is the dataset being cached prior to running each method).

Figure: CPU utilization for the simple model.

TL;DR

There are three methods for executing predictions with PySpark:

  • UDF (slow),
  • RDD (faster),
  • and Pandas UDF (lightning fast).

CPU utilization improves greatly when leveraging vectorized calculations, with Pandas UDF being by far the most efficient.

Given the simplicity of implementing it, Pandas UDF is the best place to start in nearly every case, even without natively vectorized calculations (use np.vectorize( … )!). On an older version of Spark, the Pandas UDF might not be available — in that case, the RDD-partition method is a good choice for vectorizing, but you have to be really careful tuning and testing the job. And if speed and efficiency aren’t too much of a concern, the UDF is always available and easy to use.
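As a sketch of the np.vectorize suggestion, the hypothetical scalar-only scorer below is lifted to a Series-in, Series-out function of the shape a Pandas UDF expects. The model and feature names are assumptions for illustration.

```python
import math

import numpy as np
import pandas as pd


def score_one(size, price):
    """Hypothetical scalar-only scorer that cannot take arrays directly."""
    return 1.0 / (1.0 + math.exp(-(0.5 * size - 0.25 * price)))


# otypes fixes the output dtype, which also keeps empty batches working.
score_many = np.vectorize(score_one, otypes=[float])


def predict_batch(size: pd.Series, price: pd.Series) -> pd.Series:
    """Series-in, Series-out wrapper suitable for wrapping as a Pandas UDF."""
    scores = score_many(size.to_numpy(), price.to_numpy())
    return pd.Series(scores, index=size.index)
```

Note that numpy documents np.vectorize as a convenience that is essentially a for loop, so the scoring itself does not get faster. Any gain in this setup comes from moving data between the JVM and Python in Arrow batches.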