Scaling Machine Learning Algorithms (fbprophet/XGBoost) with PySpark on W-MLP

Walmart — Machine Learning Platform (W-MLP)

Shubham Agrawal
Walmart Global Tech Blog
Dec 18, 2019

Pyspark+xgboost/fbprophet. Source: https://miro.medium.com/max/1200/1*nPcdyVwgcuEZiEZiRqApug.jpeg

W-MLP is a machine learning platform that provides end-to-end capabilities to data scientists and data engineers, enabling them to develop and deploy models faster.

It also provides capabilities to connect to and ingest data from the different data sources used to train, test, and deploy models in production.

The key capabilities that W-MLP provides:

  • Access to data across different sources.
  • Procuring, managing, and maintaining owned VMs.
  • Managing and maintaining an environment (libraries and tools) for different projects.
  • Ability to collaborate and develop.
  • Reducing duplication, as multiple teams may be creating similar datasets and models and solving similar problems.
  • Avoiding compatibility issues when re-using components (different tools, versions, and so on).

Problem Statement

W-MLP has come across a variety of machine learning use cases where the data was huge and the libraries in use did not inherently provide a way to scale. We were able to support multiple orgs that wanted to scale their forecasting algorithms using PySpark on W-MLP. Its unique features helped these use cases scale and move to production with ease: it abstracts library installation and distribution on the Spark cluster, offers an easy way to interact with existing Hadoop clusters through Jupyter notebooks on the platform, and deploys the notebooks as workflows with scheduling capabilities.

Use-cases:

1) Run a 91-day forecast on a customer's data source after grouping the data at specific levels such as category/chamber/dept/date. The forecasting logic needs to run on approximately 90,000 such groups, where each group comprises more than 1,000 records. The preferred library for forecasting was fbprophet. Prophet is a procedure for forecasting time series data based on an additive model where non-linear trends are fit with yearly, weekly, and daily seasonality, plus holiday effects.

2) Perform forecasting where the data is grouped at the department and Fineline number level. The overall data size was close to 35 GB. The preferred library was XGBoost. Handling this volume and completing the forecasting within the SLA in a simple Python notebook was not feasible.

3) Get the right assortment, which is extremely important because it is one of the major ways in which customers perceive a store. The challenge here is scale: we have around 4,500 stores and around 1,700 categories. How do we scale the algorithm to give us assortments for all 4,500 stores in real time across all categories?

The above use cases share common patterns in how the data is rolled up and grouped before forecasting. Because the data can be grouped at specific levels, it can also be partitioned, which is the key to distributing the workload. The problems in these use cases therefore boil down to the following tasks:

1) Efficiently partition data.
2) Add logic for forecasting per partition.
3) Make the preferred libraries available on all spark worker nodes with all its dependencies.
4) Decide on the number of Spark executors/cores and the memory required to scale the use case.
5) Broadcast small data to all nodes.

Partitioning data

Data can be repartitioned based on specific columns of interest as follows:

sparkDataFrame.repartition(*cols)

Spark hash-partitions the rows on the specified columns, so all rows that share the same values for those columns land in the same partition. The number of partitions is not the number of unique values: unless you pass an explicit count, it comes from the spark.sql.shuffle.partitions property, which defaults to 200. If you expected fewer partitions and noticed 200 being created, this setting is the reason. With fewer unique values than partitions, only some partitions hold data and the remaining tasks run to completion quickly. Because the assignment is hash-based, two distinct values can also end up in the same partition.
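The relationship between key values, partitions, and the shuffle-partition count can be illustrated without a cluster. The sketch below is plain Python, not Spark: `assign_partition` and `simulate_repartition` are hypothetical helpers, and `zlib.crc32` is only a stand-in for Spark's internal hash function, so the partition ids will differ from what Spark actually produces. It only shows the behaviour: rows with the same key always land together, and the number of non-empty partitions is at most the number of distinct keys.

```python
import zlib
from collections import defaultdict

NUM_SHUFFLE_PARTITIONS = 200  # mirrors the default of spark.sql.shuffle.partitions


def assign_partition(key, num_partitions=NUM_SHUFFLE_PARTITIONS):
    """Map a key to a partition id; crc32 is a stand-in for Spark's own hash."""
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions


def simulate_repartition(rows, key_fn, num_partitions=NUM_SHUFFLE_PARTITIONS):
    """Return {partition_id: [rows]} containing only the non-empty partitions."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[assign_partition(key_fn(row), num_partitions)].append(row)
    return dict(partitions)


# Toy data: (department, sales) pairs for departments 10, 11 and 12.
rows = [(dept, sales) for dept in (10, 11, 12) for sales in range(4)]
non_empty = simulate_repartition(rows, key_fn=lambda r: r[0])

for pid, part in sorted(non_empty.items()):
    print(pid, sorted({dept for dept, _ in part}))
print(len(non_empty), "of", NUM_SHUFFLE_PARTITIONS, "partitions hold data")
```

If two departments happen to hash to the same partition id, they are printed on the same line, which is the case the per-partition logic has to be prepared for.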

In Spark, each partition is processed by one task, and tasks are the small units of work distributed across worker nodes. Repartitioning shuffles data between worker nodes so that all rows with the same values in the partitioning columns end up in the same partition. For example, if you partition on a department number with unique values 10, 11, and 12, Spark moves the data so that all rows for a given department sit together in one partition. One common mistake that slows down this step is loading unnecessary columns into the Spark DataFrame and not dropping them before partitioning. The above use cases had 300+ columns while only 150 were required for the actual computation. Keeping only the columns needed for forecasting reduces the data size significantly and speeds up the partitioning process.
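A minimal sketch of trimming columns before the shuffle. `columns_to_keep` and `prune_and_repartition` are hypothetical helpers, and the column names in the usage note are made up for illustration; `select` and `repartition` are the standard PySpark DataFrame methods.

```python
def columns_to_keep(group_cols, feature_cols):
    """Grouping columns first, then features, without duplicates, order preserved."""
    seen = set()
    ordered = []
    for col in list(group_cols) + list(feature_cols):
        if col not in seen:
            seen.add(col)
            ordered.append(col)
    return ordered


def prune_and_repartition(df, group_cols, feature_cols, num_partitions=None):
    """Drop unneeded columns first, then shuffle on the grouping columns.

    df is expected to be a pyspark.sql.DataFrame. When num_partitions is None,
    Spark takes the partition count from spark.sql.shuffle.partitions.
    """
    slim = df.select(*columns_to_keep(group_cols, feature_cols))
    if num_partitions is None:
        return slim.repartition(*group_cols)
    return slim.repartition(num_partitions, *group_cols)
```

With a DataFrame named `sales_df`, a call might look like `prune_and_repartition(sales_df, ["dept_nbr"], ["date", "units"], num_partitions=64)`. Selecting before repartitioning matters because the shuffle then moves only the columns the forecast actually uses.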

Shipping preferred libraries to executors

W-MLP abstracts the shipping of libraries to executors under the hood. All you need to do is install the libraries from the library wizard; W-MLP then makes them available to all executors when the Spark job initializes. W-MLP uses a Conda environment to package the libraries, so there is no need to worry about the dependencies or shared libraries associated with them.

Adding logic for forecasting per partition

Once the data has been repartitioned, the forecasting logic can be passed to the mapPartitions method. The logic for model fitting and prediction can remain the same as it was before using Spark. Spark takes care of executing the same logic on multiple partitions on multiple nodes in parallel.

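import pandas as pd

def forecasting_logic(partition_rows):
    # partition_rows is an iterator over the Row objects of one partition
    pdf = pd.DataFrame([row.asDict() for row in partition_rows])
    if pdf.empty:
        return []  # empty partitions produce no output
    model = your_preferred_lib.fit(pdf)  # placeholder for your library's fitting call
    predicted_pdf = model.predict()
    return predicted_pdf.values.tolist()

predictions_rdd = sparkDataFrame.rdd.mapPartitions(forecasting_logic)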
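Hash partitioning does not guarantee exactly one group per partition, so the per-partition function may need to split its rows by group key before fitting. The following is a minimal sketch under stated assumptions: the column names `dept` and `units` are hypothetical, rows are anything that supports `row["column"]` lookups (plain dicts here, Spark Row objects on a cluster), and `naive_mean_forecast` is a deliberately trivial stand-in for the real Prophet or XGBoost call, not either library's API.

```python
from collections import defaultdict


def naive_mean_forecast(history, horizon):
    """Stand-in model: repeat the mean of the history for `horizon` steps."""
    mean = sum(history) / len(history)
    return [mean] * horizon


def forecast_partition(rows, key_col="dept", value_col="units", horizon=3,
                       forecaster=naive_mean_forecast):
    """Group the rows of one partition by key and forecast each group separately.

    Written as a generator so it can be passed to mapPartitions, which expects
    a function from an iterator of rows to an iterable of results.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_col]].append(row[value_col])
    for key, history in groups.items():
        for step, value in enumerate(forecaster(history, horizon), start=1):
            yield (key, step, value)


# Local check with two groups sharing one "partition".
partition = iter([
    {"dept": 10, "units": 2.0},
    {"dept": 11, "units": 5.0},
    {"dept": 10, "units": 4.0},
])
result = list(forecast_partition(partition, horizon=2))
print(result)  # [(10, 1, 3.0), (10, 2, 3.0), (11, 1, 5.0), (11, 2, 5.0)]
```

Because the function only relies on `row[...]` lookups, the same function could be handed to `mapPartitions` on the cluster, with the stand-in forecaster swapped for the real model call.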

Deciding on spark executors/cores and memory

A common question when porting code to PySpark is how many resources are required. This depends entirely on the use case, but some common guidelines can be followed.

Running a simple Spark SQL query that counts records after a group-by on the partitioning columns gives a hint of how many records a single task will handle. This figure, together with an understanding of the data, helps arrive at a ballpark estimate of the memory a single task will need. Keep in mind that this memory cannot exceed the total memory available to an executor on a node in the Hadoop cluster. For example, if the maximum memory allotted to a single node in your Hadoop cluster is 16 GB, a task that needs more than that cannot be executed.
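The count-then-estimate step can be sketched as below. All three function names are hypothetical, and `overhead_factor` is an assumed multiplier for Python object and model overhead that would need tuning against real runs; it is not a measured constant. `largest_groups` uses only standard DataFrame methods (`groupBy`, `count`, `orderBy`, `limit`) and needs a live Spark DataFrame, while the arithmetic helpers run anywhere.

```python
def largest_groups(df, group_cols, top_n=10):
    """Row counts of the biggest groups; df is expected to be a pyspark.sql.DataFrame."""
    return (
        df.groupBy(*group_cols)
        .count()
        .orderBy("count", ascending=False)
        .limit(top_n)
    )


def estimate_task_memory_gb(max_rows, bytes_per_row, overhead_factor=3.0):
    """Rough upper bound for one task: raw size of the largest group times a safety factor."""
    return max_rows * bytes_per_row * overhead_factor / 1024 ** 3


def fits_on_executor(task_memory_gb, executor_memory_gb):
    """A task can only run if its estimate does not exceed the executor's memory."""
    return task_memory_gb <= executor_memory_gb


estimate = estimate_task_memory_gb(max_rows=1_048_576, bytes_per_row=1024, overhead_factor=2.0)
print(estimate, fits_on_executor(estimate, executor_memory_gb=16))  # 2.0 True
```

The largest group, not the average one, drives the estimate, because a single oversized group is enough to fail its task.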

Let's look at a use case example

Assume we are partitioning data on department id. If there are 20 unique department values, at most 20 partitions will hold data; assume exactly 20 here. If we can allot 20 cores, all 20 partitions can be computed in parallel. We can provide a configuration of 4 executors and 5 cores per executor (4*5=20). A maximum of 5 cores per executor is a recommended configuration to start with. However, this means 5 tasks will run at a time within one executor. If the data for all 5 tasks cannot fit in memory, use fewer cores per executor and more executors, for example 10 executors and 2 cores per executor (10*2=20).

Assigning as many executors and cores as there are partitions may not be feasible in all scenarios. In such cases, find the maximum number of executors and cores that can be allotted to this job from the dedicated queue without affecting other jobs in the same queue.
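The sizing arithmetic above can be written down as a small helper. This is a simplified sketch, not a tuning rule: `suggest_layout` is a hypothetical function, it treats executor memory as shared evenly by concurrently running tasks, and it ignores memory overhead settings. The memory figures in the calls are illustrative values chosen to reproduce the 4x5 and 10x2 layouts from the example.

```python
import math


def suggest_layout(num_partitions, task_memory_gb, executor_memory_gb,
                   max_cores_per_executor=5, max_total_cores=None):
    """Return (executors, cores_per_executor) for a job with num_partitions tasks."""
    if task_memory_gb > executor_memory_gb:
        raise ValueError("a single task does not fit in one executor")
    # Cores per executor are limited by how many tasks fit in memory at once.
    cores = min(max_cores_per_executor, int(executor_memory_gb // task_memory_gb))
    # Aim for one core per partition, capped by what the queue can spare.
    target_cores = num_partitions
    if max_total_cores is not None:
        target_cores = min(target_cores, max_total_cores)
    target_cores = max(1, target_cores)
    cores = min(cores, target_cores)
    executors = math.ceil(target_cores / cores)
    return executors, cores


print(suggest_layout(20, task_memory_gb=3, executor_memory_gb=16))  # (4, 5)
print(suggest_layout(20, task_memory_gb=8, executor_memory_gb=16))  # (10, 2)
print(suggest_layout(20, task_memory_gb=3, executor_memory_gb=16, max_total_cores=10))  # (2, 5)
```

The third call models a queue that can spare only 10 cores: the 20 partitions are then processed in two waves rather than all at once.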

Broadcast small data/variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication costs.

item_attribute_rows = item_attribute_data.collect()
item_attribute_bc = sc.broadcast(item_attribute_rows)
# The above broadcasts the collected rows to all nodes of the cluster; tasks read them through item_attribute_bc.value.
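A broadcast is only useful if the handle returned by `sc.broadcast` is kept and its `.value` is read inside the function that runs on the executors. The sketch below shows that pattern locally. `build_lookup` and `make_enricher` are hypothetical helpers, the column names `item_id`, `units`, and `category` are invented for illustration, and a `SimpleNamespace` with a `value` attribute stands in for the real Broadcast object so the example runs without a cluster.

```python
from types import SimpleNamespace


def build_lookup(rows, key_col="item_id"):
    """Turn collected rows into a dict keyed by item id for constant-time lookups."""
    return {row[key_col]: row for row in rows}


def make_enricher(broadcast_lookup, attr_col="category"):
    """Return a mapPartitions-style function that reads the broadcast value."""
    def enrich_partition(rows):
        lookup = broadcast_lookup.value  # resolved where the task runs, once per partition
        for row in rows:
            attrs = lookup.get(row["item_id"])
            yield (row["item_id"], row["units"], attrs[attr_col] if attrs else None)
    return enrich_partition


# Local stand-in: any object with a `.value` attribute behaves like a Broadcast here.
fake_broadcast = SimpleNamespace(value=build_lookup([
    {"item_id": 1, "category": "dairy"},
    {"item_id": 2, "category": "bakery"},
]))
enrich = make_enricher(fake_broadcast)
print(list(enrich(iter([{"item_id": 1, "units": 7}, {"item_id": 3, "units": 2}]))))
# [(1, 7, 'dairy'), (3, 2, None)]
```

On a cluster, the stand-in would be replaced by the object returned from `sc.broadcast(build_lookup(item_attribute_data.collect()))`, and the function returned by `make_enricher` would be passed to `mapPartitions`.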

Monitoring pyspark jobs

The Spark UI provides sufficient detail on how the job is progressing. To ensure your job is getting the specified resources (cores/memory), take a look at the Executors tab. You might have to adjust the memory, executors, or cores depending on your job. From the logs in the Spark UI you can figure out whether many tasks are failing with out-of-memory errors. In that case, provide more memory or reduce the cores per executor.

Sample Code Snippets:

Conclusion

While the use cases discussed here are mostly about forecasting and scaling the solution, this pattern can be followed for any ML algorithm where the data can be partitioned, with each partition fitting in memory, to perform model fitting or inference. Once the partitioning logic is figured out, Spark takes care of distributing the workload. The original forecasting code requires minimal to no changes when ported to Spark, and W-MLP's library management makes it easier to scale the algorithms.

Authors

  1. Shubham Agrawal, Software Engineer III, Walmart Labs, India
  2. Arun Nair, Software Engineer IV, Walmart Labs, India

References

1. Spark Documentation: https://spark.apache.org/docs/latest/
2. fbProphet: https://facebook.github.io/prophet/docs/quick_start.html
3. pyspark: https://spark.apache.org/docs/latest/api/python/index.html
