Apache Spark is a fantastic tool for data pipelines. It has improved a lot over the last few years and has become far more data scientist friendly. For example, PySpark is now about as quick as Scala for most tasks, and the data frame API is quite intuitive. Yet machine learning on Spark can be more complex. Spark ML works, but it is less mature and feature-complete than dedicated machine learning libraries.
In data science it is common to train your model on a small, labelled dataset - by "small" I mean "fits on your laptop". But predictions are often made at scale as part of a larger pipeline. If you use Spark ML, at least the prediction step slots into your existing codebase. Unfortunately, Spark's model development process can be cumbersome and slow.
Scikit-learn is a well-established, mature tool for machine learning. Wouldn't it be great if data scientists could train scikit-learn models and apply them at scale using Spark? Well, Pandas UDFs make this possible!
Before Pandas UDFs, the only option was standard Python UDFs. These are very slow because they apply your Python function one row at a time on each executor. A Pandas UDF instead applies your Python function to a whole batch of rows at once, passed as a Pandas Series/DataFrame on each executor. This greatly reduces the per-row overhead, most of which is serialization back and forth between Python and the JVM.
There is, of course, a big gotcha! You must install the UDF's dependencies (e.g. scikit-learn) on your executors. If the UDF requires a package that is not installed on the executors, you won't see the usual `ModuleNotFoundError: No module named 'X'` exception. Instead you will see `TypeError: unorderable types: NoneType() < int()`
This is because the failing executors return nothing to the driver, which treats the missing result as a null. In particular, you must ensure each executor has a Pandas version more recent than 0.23. If an executor has an older version you will get `AttributeError: module 'pandas' has no attribute 'RangeIndex'`
Below is an example function applying a scikit-learn classifier to a Spark data frame. You provide the function with three arguments:
- The Spark data frame to which you want predictions applied.
- A list of feature columns.
- The path to the trained model.
Note that the model must be loaded on the driver; it is then broadcast to the executors. The function adds a column containing the predictions to the input data frame.
I hope you found this helpful! If you have any comments/suggestions or know a better way of doing this please leave a response below.