Benchmarking Performance and Scaling of Time Series Forecasting with Ray, Spark, Multiprocessing, and Concurrent Futures
This article introduces different approaches to scale time-series forecasting models and benchmarks their performance
In my previous job as a data scientist, I was tasked with building a time series forecasting model to forecast traffic for 1000 stores. As with any experiment, I started with the naive approach of building the time series models sequentially, looping over them one after another, which took around 7 hours to train and forecast. This led me to research the topic of large-scale forecasting. From my research, I found multiple ways to parallelize the training and forecasting of time series models, from using a single machine with multiple cores to using multiple machines. Having experimented with these approaches, I felt the need to benchmark them and find the best one. The inspiration to benchmark these approaches came from the Medium article forecasting-multiples-time-series-using-prophet-in-parallel by Matias Aravena Gamboa. In addition to using multiprocessing to scale Prophet time series forecasting as described in that article, I have introduced new approaches using concurrent.futures, Spark, and Ray, which performed significantly better than multiprocessing.
Different approaches to scale
For benchmarking, I used the same dataset, machine specs, forecasting procedure, and hyperparameters across all approaches.
The dataset contains historical daily sales data for 10 stores, each carrying 50 individual items, over the 5 years from 2013 to 2017. The task is to forecast sales for the next 90 days for each store-item combination, which requires training and forecasting 500 unique time series.
To benchmark multiprocessing, concurrent.futures, and Ray, I used my personal machine (MSI GL62MVR, i7, 16GB RAM, 8 cores).
For distributing on Spark, I provisioned a cluster with two worker nodes (n1-standard-4, 4 cores each) using GCP Dataproc.
Forecasting Procedure: FBProphet
FBProphet is a procedure for forecasting time series. It is used here as an example, but the function can be drop-in replaced with any forecasting model and scaled. One reason for using this procedure is its efficient implementation in Stan. When scaling such procedures, it is important to make sure they are performance-optimized, as this helps bring down the cost per forecast when scaling with cloud resources.
To distribute FBProphet across cores, I created a function called run_prophet. Its parameters are fixed for all store-item combinations, and it returns a result DataFrame with the predictions.
Approach 1: Sequential
For a baseline, the Prophet time series procedure was trained and forecast sequentially. The entire process took about 1 hour 20 minutes to forecast sales for each of the 500 store-item combinations.
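The baseline loop can be sketched as follows, with a trivial stand-in for run_prophet so the structure is self-contained (the real function fits Prophet and returns a forecast DataFrame):

```python
# Trivial stand-in for run_prophet, for illustration only.
def run_prophet(series):
    return {"points": len(series), "horizon": 90}

# One dummy daily series per store-item combination (500 in total).
all_series = [list(range(365)) for _ in range(500)]

# Sequential baseline: every series is trained one after another,
# so total wall time is the sum of all 500 individual fits.
results = [run_prophet(s) for s in all_series]
```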
Approach 2: Multiprocessing
Multiprocessing is a standard-library package that supports spawning multiple processes. The Pool object in the code parallelizes the execution of the run_prophet function. imap() applies run_prophet to each DataFrame in the iterable asynchronously across the pool, and iterating over the result blocks until each forecast is ready. The join() method waits for the worker processes to exit.
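The pattern can be sketched as below; a trivial stand-in replaces run_prophet so the example is self-contained, and the worker count matches the benchmark machine's 8 cores:

```python
import multiprocessing as mp

# Stand-in for run_prophet; the real function fits Prophet on the
# series and returns a 90-day forecast DataFrame.
def run_prophet(series):
    return sum(series) / len(series)

def forecast_all(all_series, workers=8):
    pool = mp.Pool(processes=workers)
    # imap applies run_prophet to each series across the pool;
    # iterating the result blocks until each forecast is ready,
    # and results come back in input order.
    results = list(pool.imap(run_prophet, all_series))
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit
    return results

if __name__ == "__main__":
    all_series = [list(range(1, 366)) for _ in range(500)]
    forecasts = forecast_all(all_series)
```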
Approach 3: Concurrent futures
ProcessPoolExecutor is an easier and more efficient way to run multiple processes. It is available in the concurrent.futures module and is best used with a context manager. The map() method runs the run_prophet function asynchronously for each DataFrame in the iterable.
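A self-contained sketch of the same pattern with concurrent.futures, again with a trivial stand-in for run_prophet:

```python
from concurrent.futures import ProcessPoolExecutor

# Stand-in for run_prophet (the real one fits Prophet per series).
def run_prophet(series):
    return max(series)

def forecast_all(all_series, workers=8):
    # The context manager creates the process pool and shuts it
    # down cleanly once all submitted work has finished.
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # map schedules run_prophet for every series and yields
        # the results in input order as they complete.
        return list(executor.map(run_prophet, all_series))

if __name__ == "__main__":
    all_series = [list(range(1, 366)) for _ in range(500)]
    forecasts = forecast_all(all_series)
```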
Approach 4: Ray
Ray provides a simple and universal API for building distributed systems. It lets you parallelize single-machine code with almost no code changes. As seen in the example, Ray is started with the ray.init() method, after which it can utilize all of the cores on your machine. Adding the @ray.remote decorator turns run_prophet into a remote function that can be executed asynchronously as a task on a worker process. The results can then be retrieved with the ray.get() method.
Approach 5: Spark
Apache Spark is a data processing engine that also helps distribute the training of a large number of models; the number of models it can process in parallel is limited only by the number of nodes that can be provisioned. For this benchmark, I provisioned a managed Spark cluster with 2 worker nodes (4 cores each) using GCP Dataproc.
Since each Spark worker needs its own subset of the data, the sales data is first grouped by store-item combination.
Using the power of pandas UDFs, declared with the @pandas_udf decorator, the model can be trained on each subset of data in the grouped DataFrame. The UDF also collects the per-group results into a single resulting DataFrame. The job ran in around 12 minutes.
Run the benchmarks
The results for these benchmarks can be reproduced using the Jupyter notebook.
Before running the notebook activate the conda environment.
The code was run on my personal laptop (MSI GL62MVR, i7, 16GB RAM, 8 cores).
For Spark, use GCP's free credits to create a Dataproc cluster with 2 worker nodes. The Spark code for the benchmark can be found here.
From the benchmarks, we can see that the Ray and Spark frameworks took less than half the time that the multiprocessing and concurrent.futures modules took to train and forecast.
The major reason is that Ray handles numerical data efficiently and can share variables between tasks. Similarly, Spark, with its distributed processing and ability to scale, does an equally impressive job.
I personally like Ray because it takes little to no code change to scale time series forecasting models on a single machine. Even with a larger number of models to train, Ray can scale effortlessly by provisioning a larger instance with more cores. In addition, Ray workloads can automatically recover from machine and process failures.
In some benchmarks
“Ray is 10–30x faster than serial Python, 5–25x faster than multiprocessing, and 5–15x faster than the faster of these two on a large machine.”
More info on this benchmark can be found in 10x-faster-parallel-python-without-python-multiprocessing by Robert Nishihara.
Spark does an equally good job with training and forecasting. The major advantage of Spark is its ability to scale from thousands to millions of models, limited only by the size of the cluster. So, depending on the use case, either Spark or Ray does an amazing job of scaling time series forecasting models.
Further reading:
- 10x Faster Parallel Python Without Python Multiprocessing, by Robert Nishihara
- Time Series Forecasting with Prophet & Spark (Databricks)
- Forecasting multiples time-series using Prophet in parallel: A short story about multiprocessing, by Matias Aravena Gamboa
- Corey Schafer's introductions to the multiprocessing and concurrent.futures modules: https://www.youtube.com/results?search_query=corey+shafer+multiprocessing