Multiple CatBoost Models Prediction over Apache Spark

By Hen Ben Hemo & Ben Amir

Ben Amir
Riskified Tech
Published in
7 min readMar 19, 2020

--

Here at Riskified, we analyze millions of online transactions per day to detect and prevent eCommerce fraud. Our AI platform crunches these orders to predict their risk level and returns a final approve or decline decision. As Riskified’s R&D, we use our own machine learning-based pipeline to select data, train the models, validate, and replace them in production. We use several different models, according to industry and other parameters. To make sure we’re always using the right model for each case, our data science department validates the model using its predict component, which simulates our online flow.

It’s not just our data science teams that use the models’ predictions to check their accuracy. Our business operations teams also use it to forecast performance when onboarding new clients. As developers, it’s our job to provide those users with an internal tool that they can use to select the data, perform a prediction and get results from the model.

Recently, due to our increase in volume of data, we reconstructed our ML infrastructure. Our main goals were to have an infrastructure that could support our need for accuracy, scale and stability. In the end, we chose Apache Spark as the main computing engine that most of our platform is based on.

In this blog, alongside Hen Ben Hemo we’ll talk about our migration to Spark, and the specific challenges that arose from using the CatBoost algorithm for validation.

We’ll walk you through:

  1. What our prediction flow looks like and how we handled it in the past using Redshift
  2. The limitations imposed on us that caused us to look for an alternative
  3. Why we chose Apache Spark as our data processing platform
  4. The challenges we encountered when using CatBoost over Spark
  5. How we overcame these challenges and used Spark’s power of parallelism to decrease our running time and performance.

What does our prediction flow consist of?

Our offline flow for predicting the risk level of an order (meaning, giving it a score from the model) includes the flowing process:

  • Data selection — The user selects a population of orders that they want to classify, along with the relevant features, so that our model will be able to predict them.
  • Model loading — After the model is trained, it’s saved in our data stores. In order to serve it, the model needs to be loaded every time.
  • Prediction — Once we have the data and models, we can now iterate to get the score, and then run statistical testing to decide which model is better.
Prediction flow

In the past, when running the process above on Redshift, we performed the following steps:

  1. Running a dedicated EC2 machine for the orchestration above.
  2. Querying Redshift with the relevant data for prediction — order data and their relevant features.
  3. Saving the validation set that we used for later inquiry.
  4. Prediction over a single machine, one row at a time in succession.
  5. Saving the results to S3.

So basically, this is the code we used for this process (pseudocode):

orderFeaturesDataset = Redshift.sql("select * from orders_and_features where order_id in {selected_orders")
loadedModels = S3Loader.load(modelList)
results = []
orderFeaturesDataset.forEach {row =>
loadedModels.forEach { model =>
results.append(model.predict(row)
}
}
return results

Our main use case

In our case, we needed to process a huge amount of data (over 5 terabytes) that contains the information for orders along with their features (calculated by our data science department). Each row of orders can contain hundreds of features, meaning a lot of columns (around 800, more or less). Also, we wanted to be able to make predictions using several CatBoost models, meaning our basic logic should be:

each row in orderFeaturesDataset
each model in models
run CatBoost.predict(model, row)

Each prediction returns the score provided by the model for every order, based on the trained model.

Sounds good, so what isn’t working?

As we grow, not only is the number of users taking advantage of this tool rising, our sample size for prediction is increasing as well. We now have over one billion records of data that we want to classify. We wanted to be able to predict more than one million orders for each batch, using several models.

As you can see, our previous orchestration wasn’t built to the scale we’re aiming for:

  1. Multiple users — We needed support around 100 users, and we expect that number to grow. We didn’t want to have to load a new EC2 machine per run, so we were limited in our ability to run to several in parallel.
  2. Parallel predictionWe didn’t parallelize our component at all. Even if we did, we’d still have just one data store as our resource (Redshift).
  3. Redshift as a bottleneck — While our data was saved on Redshift, we have multiple rows per order, and each order has many columns. That means we had to work the database hard querying all that data and received a lot of failures because of that.
  4. Speed & Scale — Due to Redshift and our previous orchestration, we weren’t able to scale up the process. With several bottlenecks and resource issues, our users needed to wait for our pipeline to be free to use.
  5. Recovery from failures — Given all of the issues we mentioned, we were bound to experience failures in our process. With the previous architecture, it was difficult to track and recover from the issues we encountered.

Scaling it up!

We chose Apache Spark as our main data processing platform. Our main reason for choosing it is that Spark uses an in-memory computation engine, which makes it much faster than its competitors. Its support of Master-Slave Architecture makes it scalable when needed (by simply scaling up Spark executors as necessary).

Additional reasons for choosing Spark for Riskified:

  • Streaming and batch support — In some of our other use cases we wanted not only to use batch computation or analysis as explained above, but also to make use of Spark’s streaming functionality.
  • Easy to use API — Spark Dataframe and Dataset API are extremely easy to use and oriented towards our main goal of transforming raw data from our data lake (Amazon S3) to Dataframe, where it’s be much easier to access and analyze.
  • Support for multiple data sources — Spark supports multiple connectors, making it easy to change our data layer if we choose to do so. And, we’re not limited to one, such as JDBC, Kinesis, Kafka, DynamoDB or file formats such as CSV, parquets, etc.
  • Runs on Kubernetes — As we mentioned in this blog , we shifted our clusters to Kubernetes. Instead of managing our application over Amazon EMR, we now run it on Spark-on-K8s-operator in order to be able to trigger multiple containers.

So what’s the big deal?

We use the CatBoost library as the main machine learning algorithm in our analysis flow. Over the last six months, we’ve upgraded our ML analysis platform pipeline such that our core structure is based on Apache Spark, but when we began rewriting our pipeline we found that the current CatBoost library (0.2.4) still doesn’t have Spark integration.

Challenges with CatBoost over Spark

During our implementation we encountered a few issues:

  • CatBoost library classes are not serialized when working with Spark — When working with multiple processing components, we wanted to load all of our data and the relevant model before we start processing and passing around. CatBoost library doesn’t support this which is why Spark executors were failing to predict the data.
  • Predict a variety of models in one Spark job — We wanted to be able to use the power of Spark to run several models in parallel, in order to shorten our component run time.
  • Spark cluster memory specs for model loading — We had to define a large memory cluster in order to be able to load several models, where each model can weigh around 500MB.

The solution

Our solution was handled in two ways:

Singleton of models loading for resources

In order to create a single instance of our models in the Spark executor, we load the models in each executor. We created an abstract layer that loads the model, and wraps it with a layer that implements and uses the predict function of CatBoost (we call it ‘CatBoostModelEvaluator’):

This abstraction can support multiple algorithms and not only CatBoost (also LightGBM, XGBoost, etc.) so we can use it when needed

Using Spark ‘mapPartitions’ method

mapPartitions() Spark transformation is similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. “

After loading the data (in our case ‘orderFeaturesDataset’) we use mapPartitions for distribution so that each executor works on a different part of the dataset. Now when we are inside mapPartitions, we can run an atomic code inside the individual executors.

Each executor has an iterator (a part of the dataset). We then load all of the models using our CatBoostModelLoader, and iterate the rows in that executor using flatMap. Next, we can call the evaluate function (that performs the prediction) of each model that we loaded in order to get its score:

Our solution over Apache Spark

Summary

In this post, we went over our solution for using Spark as the main computation engine for our prediction component in our machine learning platform pipeline. We discussed the issues we encountered while using Spark & CatBoost and how we dealt with them, eventually succeeding to process over 1 million records in just 6 to 7 minutes! (for comparison, with our previous tool the time measure was around 30–40 minutes for each run). This gave us the ability to classify dozens of models a day (scale — yay!), compare them and decide which one gives us better results.

--

--

Ben Amir
Riskified Tech

Software Engineer Team Lead, Ball is life 🏀, #StrengthInNumbers