Create and Deploy a Machine Learning Model Pipeline in Spark

Chao Li
Red Ventures Data Science & Engineering
7 min read · Apr 2, 2019

As data scientists at Red Ventures, we define ourselves as Type B data scientists, a role that differs from business intelligence or data analysts. Besides providing business insights that drive action, we also build models end to end and deploy them as services for business use, both internally and externally.

This post describes the general process of building a classification model pipeline in Spark and touches on deploying it as a service via a REST API. Code snippets cover the implementation end to end, from data import, preprocessing, feature engineering, and model tuning and training through deployment.

Note: The Spark code in this post is written in Scala and run on the Databricks platform.

Data Import
The first thing we need to do is import the data for training our model. In this example, our data is stored in a Snowflake cloud warehouse. To let our SQL queries read from the Snowflake database, we'll use the databricks-snowflake connector. Once the connection is configured, a Scala function queries and reads the data from Snowflake.

The readDataFromSnowflake function below (sketched after Fig. 1) does the following:

  • Configures the connection with Snowflake
  • Executes a query to select all data fields from the Snowflake table myTable under the schema mySchema for a particular date range, specified by the function arguments
  • Returns the entire dataset as a Spark DataFrame

Fig. 1: Defining a function to establish a connection with Snowflake and execute the SQL query to get data.
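
A minimal sketch of what readDataFromSnowflake might look like, assuming Databricks secrets hold the credentials; the account URL, database, warehouse, secret scope, and event_date column are illustrative placeholders.

```scala
import org.apache.spark.sql.DataFrame

def readDataFromSnowflake(startDate: String, endDate: String): DataFrame = {
  // Connection options for the databricks-snowflake connector
  // (credentials pulled from a Databricks secret scope named "snowflake" — an assumption)
  val sfOptions = Map(
    "sfURL"       -> "myaccount.snowflakecomputing.com",
    "sfUser"      -> dbutils.secrets.get("snowflake", "user"),
    "sfPassword"  -> dbutils.secrets.get("snowflake", "password"),
    "sfDatabase"  -> "myDatabase",
    "sfSchema"    -> "mySchema",
    "sfWarehouse" -> "myWarehouse"
  )

  // Select all fields from mySchema.myTable for the given date range
  val query =
    s"""SELECT * FROM mySchema.myTable
       |WHERE event_date BETWEEN '$startDate' AND '$endDate'""".stripMargin

  spark.read
    .format("snowflake")
    .options(sfOptions)
    .option("query", query)
    .load()
}
```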

To automate the model update process, the date range is extracted from the system datetime every time the job runs. The code below calls the function to obtain training data from a six-month (180-day) look-back window. A snapshot of this dataset shows that the features are of various types: categorical variables Var1, Var2, and Var3, the binary variable Var4, and the numeric variables Var5 and Var6. Here, label is the classification target that the model learns to predict.

Fig. 2a: Importing data within a specified date range.
Fig. 2b: A snapshot of our dataset indicates various feature types.
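
The date-range logic might look like the following sketch, which builds a 180-day window ending today and passes it to the function above; the date format and preview call are assumptions.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Look-back window: today minus 180 days through today
val fmt       = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val endDate   = LocalDate.now.format(fmt)
val startDate = LocalDate.now.minusDays(180).format(fmt)

val df = readDataFromSnowflake(startDate, endDate)
df.show(5)   // preview Var1 ... Var6 and label
```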

Var1–3 represent the device type, browser type, and time range respectively. To make these variable names more descriptive, you can rename the DataFrame columns using Spark DataFrame's withColumnRenamed() method. In this exercise, I found a really useful function, foldLeft(), that makes the process more scalable: you don't need to write 100 statements for 100 variable name changes. All that's needed is to store each old name/new name pair as a key-value entry in a Scala Map; no other code change is required. Another benefit of foldLeft() is that you don't have to loop over and overwrite a var (anything mutable is riskier): you can fold over a val and output a val.

Fig. 3a: Making more sense of our data by using the foldLeft() function to rename our columns.
Fig. 3b: Our dataset with renamed columns.
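
A sketch of the foldLeft() rename, assuming Var1–3 map to Device, Browser, and TimeRange as described above:

```scala
// Old name -> new name pairs; adding another rename is just another Map entry
val renameMap = Map(
  "Var1" -> "Device",
  "Var2" -> "Browser",
  "Var3" -> "TimeRange"
)

// Fold over an immutable val: each step returns a new DataFrame with one column renamed
val dfRenamed = renameMap.foldLeft(df) { case (acc, (oldName, newName)) =>
  acc.withColumnRenamed(oldName, newName)
}
```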

Because the subject of this article is building a model pipeline for deployment, the data exploration section is done offline and not described here. Next, we’ll look at pipeline creation for feature engineering and model training.

Fig. 4: Splitting training and testing datasets.
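
A typical split might look like the following; the 80/20 ratio and the seed are assumptions.

```scala
// Random 80/20 split of the renamed dataset into training and testing sets
val Array(trainDF, testDF) = dfRenamed.randomSplit(Array(0.8, 0.2), seed = 42)
```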

Feature Transformer Pipeline

Numeric Variables
For a model running in production, it’s always a good habit to set a defensive layer to handle any anomalies gracefully. In this example, we set an Imputer transformer in the pipeline to handle the missing values for numeric variables Var5 and Var6.

This process generates two additional columns, Var5Impute and Var6Impute, in which the NaN values of the original Var5 and Var6 columns are replaced by their respective medians (see the example outcome below).

Note: The choice of transformers is, to some extent, limited to what is available in MLeap, the tool we use to serialize Spark model pipelines (more on that later in this post). If the transformer you need doesn't exist on their list, follow the procedure here to create a custom transformer.

Fig. 5a: Setting an imputer transformer in the pipeline to handle numeric missing values.
Fig. 5b: Our updated dataset.
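
A minimal sketch of the imputer stage, assuming the median strategy:

```scala
import org.apache.spark.ml.feature.Imputer

// Replace missing Var5/Var6 values with the column medians,
// writing the results to Var5Impute and Var6Impute
val imputer = new Imputer()
  .setInputCols(Array("Var5", "Var6"))
  .setOutputCols(Array("Var5Impute", "Var6Impute"))
  .setStrategy("median")
```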

Next, we perform numeric operations on these variables, such as dividing one feature by another, taking a logarithmic transform, and scale normalization (Min-Max or Z-score). The columns created by these operations are appended to the table, as shown in the following example:

Fig. 6a: Setting other numeric operations, such as division, logarithmic, and normalization, in the pipeline.
Fig. 6b: A snapshot of the dataset displays the transformed result.
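
One way to express these stages is sketched below: a SQLTransformer for the division and log transform, and a MinMaxScaler, which operates on a vector column and therefore needs an intermediate assembler. The exact expressions and output column names are assumptions.

```scala
import org.apache.spark.ml.feature.{MinMaxScaler, SQLTransformer, VectorAssembler}

// Ratio and log features expressed as SQL over the pipeline's intermediate table
val numericOps = new SQLTransformer().setStatement(
  """SELECT *,
    |       Var5Impute / (Var6Impute + 1) AS Var5DivVar6,
    |       LOG(Var5Impute + 1)           AS Var5Log
    |FROM __THIS__""".stripMargin)

// Min-Max scaling works on a vector column, so assemble the numeric columns first
val numAssembler = new VectorAssembler()
  .setInputCols(Array("Var5Impute", "Var6Impute"))
  .setOutputCol("numericVec")

val scaler = new MinMaxScaler()
  .setInputCol("numericVec")
  .setOutputCol("numericScaled")
```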

Categorical Variables
The same goes for categorical variables. First, we set an imputation stage to handle missing values. MLeap doesn't provide such a transformer (you won't find one on this list), so we've written a custom transformer, StringImputer, by following the aforementioned MLeap documentation. It imputes missing data with a string value of your choice.

For categorical variables, there are instances when we might want to bucket two strings that represent similar concepts, such as “Mobile” and “SmartPhone”. The StringMapper transformer handles this. Note that I use a custom transformer here instead of the MLeap built-in StringMap, because StringMap doesn't allow a default value to be set for the map.

Next, we use StringIndexer as an additional defensive layer to handle values not seen during training. The categorical values are mapped to numeric indices based on frequency. For example, the dataset contains more Desktop than SmartPhone records, so their string indices are 0.0 and 1.0 respectively; an unseen device value like Console is mapped to 2.0.

See the following table with the columns Device_Impute, Device_Map, and Device_Index. For easier comparison, the numeric variable columns are not shown here.

Fig. 7a: Setting the StringImputer, StringMapper, and StringIndexer transformers for a categorical variable.
Fig. 7b: A snapshot of the dataset shows the transformed outcome.
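
A rough sketch of these three stages for the Device column follows. StringImputer and StringMapper stand in for the custom MLeap-compatible transformers described above, so their setter API (setImputeValue, setMap, setDefaultValue) is hypothetical; StringIndexer is the stock spark.ml transformer.

```scala
import org.apache.spark.ml.feature.StringIndexer

// Custom transformer (assumed API): fill missing device strings with "Unknown"
val deviceImputer = new StringImputer()
  .setInputCol("Device")
  .setOutputCol("Device_Impute")
  .setImputeValue("Unknown")

// Custom transformer (assumed API): bucket synonyms and provide a default value
val deviceMapper = new StringMapper()
  .setInputCol("Device_Impute")
  .setOutputCol("Device_Map")
  .setMap(Map("SmartPhone" -> "Mobile"))
  .setDefaultValue("Other")

// Frequency-based indexing; "keep" assigns unseen values (e.g. Console) the next index
val deviceIndexer = new StringIndexer()
  .setInputCol("Device_Map")
  .setOutputCol("Device_Index")
  .setHandleInvalid("keep")
```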

Repeat the procedure to apply transformers to the other categorical variables, TimeRange and Browser, and then set the one-hot encoding stage for all processed categorical variables. The one-hot encoded output is represented in a sparse format. For example, (2, [1], [1.0]) in Device_OHE indicates a vector of length 2 with 1.0 at position 1 and 0 elsewhere.

Fig. 8a: Applying the StringImputer and StringIndexer transformers to the other categorical variables, and setting the one-hot encoding stage in the pipeline.
Fig. 8b: Our transformed dataset.
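
The remaining categorical stages and the one-hot encoding stage could be sketched as follows; the imputed values are assumptions, and OneHotEncoderEstimator is the Spark 2.3/2.4 name (renamed OneHotEncoder in Spark 3.x).

```scala
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer}

// Analogous stages for Browser and TimeRange (StringImputer is the custom transformer above)
val browserImputer   = new StringImputer().setInputCol("Browser").setOutputCol("Browser_Impute").setImputeValue("Unknown")
val browserIndexer   = new StringIndexer().setInputCol("Browser_Impute").setOutputCol("Browser_Index").setHandleInvalid("keep")
val timeRangeImputer = new StringImputer().setInputCol("TimeRange").setOutputCol("TimeRange_Impute").setImputeValue("Unknown")
val timeRangeIndexer = new StringIndexer().setInputCol("TimeRange_Impute").setOutputCol("TimeRange_Index").setHandleInvalid("keep")

// One-hot encode all indexed categorical columns; output is a sparse vector per column
val ohe = new OneHotEncoderEstimator()
  .setInputCols(Array("Device_Index", "Browser_Index", "TimeRange_Index"))
  .setOutputCols(Array("Device_OHE", "Browser_OHE", "TimeRange_OHE"))
```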

The classification model in the spark.ml package is an estimator, which requires the feature data for each record to be assembled into a vector in the “feature” column. This is done by the VectorAssembler() transformer and, again, the output is represented in a sparse format, as shown in the following table.

Lastly, we stack all the aforementioned transformers, in order, into a pipeline object. As a result, every record in both the training and testing sets is guaranteed to go through the same feature engineering process without throwing exceptions on anomalous values.

Fig. 9a: Setting the VectorAssembler transformer to concatenate all variables into a single “feature” column, and creating the feature engineering pipeline object with all aforementioned stages.
Fig. 9b: Our transformed dataset.
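
Putting it together, a sketch of the assembler and the feature engineering pipeline; the input column list simply collects the outputs of the stages sketched above and is illustrative.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler

// Concatenate all engineered columns into a single "feature" vector
val assembler = new VectorAssembler()
  .setInputCols(Array("Device_OHE", "Browser_OHE", "TimeRange_OHE",
                      "Var4", "Var5DivVar6", "Var5Log", "numericScaled"))
  .setOutputCol("feature")

// Feature engineering pipeline: every record flows through the same stages, in order
val featurePipeline = new Pipeline().setStages(Array(
  imputer, numericOps, numAssembler, scaler,
  deviceImputer, deviceMapper, deviceIndexer,
  browserImputer, browserIndexer,
  timeRangeImputer, timeRangeIndexer,
  ohe, assembler
))
```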

Model Estimator

In this example, the gradient boosting classifier GBTClassifier is chosen for the predictive task because it achieves state-of-the-art performance. CrossValidator is employed for model tuning, and a parallelizable version of the cross validator can speed up the computation. After the model parameters are determined by tuning, the model estimator is appended to the pipeline as the last stage. The pipeline object's fit method then executes the entire workflow, both feature engineering and model training, on the dataset.

Fig. 10: Tuning the model and appending it to the pipeline.
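
A hedged sketch of the tuning and final pipeline fit; the parameter grid, fold count, and the chosen hyperparameters are assumptions, and CrossValidator's built-in setParallelism plays the role of the parallelizable cross validator mentioned above.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val gbt = new GBTClassifier()
  .setFeaturesCol("feature")
  .setLabelCol("label")

val paramGrid = new ParamGridBuilder()
  .addGrid(gbt.maxDepth, Array(3, 5))
  .addGrid(gbt.maxIter, Array(20, 50))
  .build()

val cv = new CrossValidator()
  .setEstimator(gbt)
  .setEvaluator(new BinaryClassificationEvaluator().setLabelCol("label"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
  .setParallelism(4)   // evaluate parameter combinations in parallel

// Tune on a featurized copy of the training set, then inspect cvModel.bestModel
val featurizedTrain = featurePipeline.fit(trainDF).transform(trainDF)
val cvModel = cv.fit(featurizedTrain)

// Suppose tuning picked maxDepth = 5 and maxIter = 50 (illustrative values):
// set them on the estimator, append it as the last stage, and fit the full workflow
val tunedGbt = gbt.setMaxDepth(5).setMaxIter(50)
val final_pipeline_gbt = new Pipeline()
  .setStages(featurePipeline.getStages :+ tunedGbt)
  .fit(trainDF)
```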

Model Evaluation
Because this is a binary classification task, the area under the ROC curve is calculated to measure model performance. In addition, the gradient boosting model is benchmarked against a vanilla logistic regression model.

Fig. 11: Evaluating the model performance against the vanilla version.
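
An evaluation sketch along these lines, with a logistic regression baseline built on the same feature pipeline; the baseline settings are assumptions.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")

// AUC of the gradient boosting pipeline on the held-out test set
val gbtAuc = evaluator.evaluate(final_pipeline_gbt.transform(testDF))

// Vanilla logistic regression baseline sharing the same feature engineering stages
val lr = new LogisticRegression().setFeaturesCol("feature").setLabelCol("label")
val final_pipeline_lr = new Pipeline()
  .setStages(featurePipeline.getStages :+ lr)
  .fit(trainDF)
val lrAuc = evaluator.evaluate(final_pipeline_lr.transform(testDF))

println(s"GBT AUC: $gbtAuc, LR AUC: $lrAuc")
```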

Logistic regression is considered a white box algorithm. The code below extracts the coefficients for the model.

Fig. 12: Extracting the logistic regression model’s coefficients.
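
A sketch of pulling the coefficients out of the fitted baseline pipeline from the previous step; the last stage is assumed to be the LogisticRegressionModel.

```scala
import org.apache.spark.ml.classification.LogisticRegressionModel

val lrModel = final_pipeline_lr.stages.last.asInstanceOf[LogisticRegressionModel]
println(s"Intercept: ${lrModel.intercept}")

// One coefficient per position in the assembled "feature" vector
lrModel.coefficients.toArray.zipWithIndex.foreach { case (coef, i) =>
  println(f"feature $i%3d -> $coef%+.4f")
}
```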

Model Serialization and Deserialization for Deployment

To deploy the trained model to production, we first serialize the final_pipeline_gbt object into a single JSON-format bundle file using MLeap. Serialization with MLeap is simple and straightforward, and it supports serializing models to a directory or a .zip file, in either JSON or Protobuf format.

Fig. 13: Serializing the pipeline object into a single JSON file using MLeap.
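
A serialization sketch following the MLeap Spark support pattern; the output path is a placeholder, and the transformed sample DataFrame is passed so MLeap can capture the schema.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._

// MLeap needs a DataFrame transformed by the fitted pipeline to record the schema
val sbc = SparkBundleContext().withDataset(final_pipeline_gbt.transform(trainDF))

for (bundle <- managed(BundleFile("jar:file:/tmp/final_pipeline_gbt.zip"))) {
  final_pipeline_gbt.writeBundle
    .format(SerializationFormat.Json)   // JSON-serialized bundle inside the .zip
    .save(bundle)(sbc)
    .get
}
```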

Model deployment is implemented as a service via a REST API. Simply take the saved JSON file and deserialize it inside a Scala web framework. This article demonstrates how to build an API using the Scala Play framework.

Fig. 14: Deserializing the JSON file back to pipeline object in preparation for REST API deployment.
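
A deserialization sketch using the lightweight MLeap runtime, which needs no SparkContext inside the web service; the bundle path is a placeholder.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// Load the serialized pipeline back into an MLeap transformer
val mleapBundle = (for (bf <- managed(BundleFile("jar:file:/tmp/final_pipeline_gbt.zip")))
  yield bf.loadMleapBundle().get).opt.get

val mleapPipeline = mleapBundle.root   // ready to score incoming LeapFrames in the Play API
```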

Conclusion

This post elaborates on the process of building a machine learning model pipeline in Spark, with code snippets covering the implementation from data import, preprocessing, feature engineering, and model tuning and training through deployment. This workflow enables us to build predictive models in production and serve our business. Hopefully it can also serve as a helpful tutorial for people who are new to machine learning in Spark.

Interested in solving complex problems and building products that drive value? Come check us out at RedVentures.com.
