Prediction at Scale with scikit-learn and PySpark Pandas UDFs

By Michael Heilman

Civis Analytics
The Civis Journal
3 min read · Sep 17, 2018


scikit-learn is a wonderful tool for machine learning in Python, with great flexibility for implementing pipelines and running experiments (see, e.g., this Civis blog post series), but it’s not really designed for distributed computing on “big data” (e.g., hundreds of millions of records or more).

A common predictive modeling scenario, at least at Civis, is having a small or medium amount of labeled data to train a model on (e.g., 10,000 records), but a much larger unlabeled dataset to make predictions about. In this scenario, one might want to train a model on a laptop or single server with scikit-learn for ease of use and flexibility, and then apply that model to the large unlabeled dataset more quickly by distributing the computation with PySpark. Distributed prediction with PySpark can also make sense if your ETL task is already implemented with PySpark (or would benefit from being), since Spark is well suited to data transformations and ETL.

PySpark has functionality to pickle Python objects, including functions, and apply them to data distributed across processes, machines, etc. It also has a pandas-like syntax but separates the definition of a computation from its execution, similar to TensorFlow: transformations build up a computation graph that only runs when an action, such as a count or a write, is triggered.
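
For instance, here's a minimal sketch of that deferred execution model (the file path and column names are made up for illustration):

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("lazy_example").getOrCreate()

# These lines only define a computation graph; nothing runs yet.
df = spark.read.parquet("people.parquet")
df = df.withColumn("log_income", F.log(F.col("income") + 1))

# An action like count() (or writing output) triggers the actual execution.
print(df.count())
```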

One issue is that passing data between (a) the Java-based Spark execution processes, which move data between machines and perform transformations very efficiently, and (b) a Python process (e.g., for predicting with scikit-learn) incurs overhead from serialization and inter-process communication. One solution is User-Defined Functions (UDFs) in PySpark's DataFrame API. You can use the DataFrame API to perform most operations efficiently in Java (without having to write Java or Scala!) and then call Python UDFs, paying the Java-Python communication cost, only when necessary.
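
As an illustration, a regular Python UDF looks something like this (the column names are hypothetical); every value of the input column crosses the Java-Python boundary individually:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# A plain UDF: Spark serializes values to the Python worker one row at a time.
@udf(returnType=DoubleType())
def fahrenheit_to_celsius(temp_f):
    return (temp_f - 32.0) * 5.0 / 9.0

df = df.withColumn("temp_c", fahrenheit_to_celsius(df["temp_f"]))
```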

Normal PySpark UDFs operate one value at a time, which incurs a large amount of Java-Python communication overhead. Recently, PySpark added Pandas UDFs, which efficiently convert chunks of DataFrame columns to Pandas Series objects via Apache Arrow, avoiding much of the overhead of regular UDFs. Having UDFs receive Pandas Series also saves converting between Python and NumPy floating point representations for scikit-learn, as one would have to do for a regular UDF.
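
The equivalent scalar Pandas UDF receives a whole chunk of the column as a Pandas Series, so the arithmetic runs vectorized. A sketch using the PandasUDFType.SCALAR form introduced in Spark 2.3:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# A scalar Pandas UDF: Arrow ships column chunks as pandas Series,
# so this runs vectorized instead of once per row.
@pandas_udf("double", PandasUDFType.SCALAR)
def fahrenheit_to_celsius_vec(temp_f):
    return (temp_f - 32.0) * 5.0 / 9.0

df = df.withColumn("temp_c", fahrenheit_to_celsius_vec(df["temp_f"]))
```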

The PySpark documentation is generally good and there are some posts about Pandas UDFs (1, 2, 3), but maybe the example code below will help some folks who have the specific use case of deploying a scikit-learn model for prediction in PySpark. We've found this sort of workflow very promising for multiple use cases recently at Civis (see this SciPy talk), and hopefully you will, too.
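
Here's a minimal sketch of that pattern: train a scikit-learn model locally on the small labeled dataset, broadcast the fitted model to the executors, and score the large unlabeled dataset with a Pandas UDF. The file paths, feature names, and model choice below are placeholders for illustration.

```python
import pandas as pd
from sklearn.linear_model import LogisticRegression
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("sklearn_prediction").getOrCreate()

# 1. Train locally on the small labeled dataset (hypothetical columns).
labeled = pd.read_csv("labeled.csv")
features = ["x1", "x2", "x3"]
model = LogisticRegression()
model.fit(labeled[features], labeled["y"])

# 2. Broadcast the fitted model so each executor unpickles it once.
model_broadcast = spark.sparkContext.broadcast(model)

# 3. A scalar Pandas UDF that takes one Series per feature column,
#    reassembles them into a 2-D array, and returns predicted probabilities.
@pandas_udf("double", PandasUDFType.SCALAR)
def predict(*feature_cols):
    X = pd.concat(feature_cols, axis=1).values
    probs = model_broadcast.value.predict_proba(X)[:, 1]
    return pd.Series(probs)

# 4. Score the large unlabeled dataset in parallel.
unlabeled = spark.read.parquet("unlabeled.parquet")
scored = unlabeled.withColumn("score", predict(*[unlabeled[c] for c in features]))
scored.write.parquet("scores.parquet")
```

Broadcasting the model means it's pickled and shipped to each executor once and cached there, rather than being re-serialized with every task closure.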

As a final note, I'll mention that it's worth considering PySpark as an alternative to Pandas for a dataframe implementation and/or to Python's concurrent.futures for parallelization. PySpark works on small datasets with only a little extra effort and can be scaled up much more easily when needed.

Image: Walt Askew, Civis Analytics
