Using Ray for Time Series Forecasting At Scale

Erkin Sagiroglu
KoçDigital
Feb 4, 2022 · 9 min read

Distributed computing is no longer a “should” but a must for the majority of today’s machine learning workflows, most of which are developed in Python. Scaling an ML project is not simple work: designing a distributed application from scratch, or converting an existing project into a distributed setup, takes time, and keeping it efficient and fast brings challenges such as fault tolerance, orchestration, scheduling and utilization of the available resources (which may change from day to day). There are technologies that try to ease this process. Created by U.C. Berkeley’s RISELab and now developed by Anyscale, Ray tries to solve these challenges.

Ray is an open-source distributed computing technology that makes scaling Python applications a more intuitive and simpler process. It offers several libraries for scalable model training, model serving, hyper-parameter tuning, reinforcement learning and scaling any Python application. In this blog I will talk about Ray Core through a project that my colleagues and I worked on. It is about forecasting the storage capacities of disks across a wide variety of servers. Basically, it is a time series forecasting problem at scale that involves thousands of disks on different servers whose storage values (%) are to be forecasted daily, so that measures/actions can be taken beforehand for the disks that are likely to reach a critical threshold like 95% or 100%.

Use Case

Let’s go through the use case in a bit more detail. For each disk, the input is the daily storage values for the past month and the output is the forecasted utilization values for the upcoming months. These forecasts are written into a database, which alert systems then use as needed. Each disk is assigned one of five models for its daily forecasts, since each disk has a different “best model”: the recent behavior of a disk fits a particular model best.

We use five models in total, one of which is ARIMA. A code snippet for ARIMA’s forecast part looks like this:
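A simplified sketch of what such a function does could be the following (the 90-day horizon, the default ARIMA parameters and the output dictionary are illustrative rather than the exact production code):

    import pandas as pd
    from darts import TimeSeries
    from darts.models import ARIMA

    def infer_arima_disk(disk_id, disk_series: pd.Series, horizon: int = 90):
        # Convert the past month's pandas Series into a darts TimeSeries
        series = TimeSeries.from_series(disk_series)

        # Fit ARIMA on the recent data and forecast the upcoming months
        model = ARIMA()
        model.fit(series)
        forecast = model.predict(horizon)

        # Return the forecast in the output format expected downstream
        return {"disk_id": disk_id, "forecast": forecast.pd_series()}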

Inference main steps

The main steps of a disk’s model inference can be summed up as follows: a disk’s data (the past month’s values) is a pandas Series, which is converted into a darts series for inference, and the forecast is finally returned in the appropriate output format. It is not an expensive operation at first sight, but given the high number of disks, which can vary and increase over time, the computation cost becomes more and more noticeable.

These inference results are sent to the database batch by batch: whenever there is a sufficient number of inference results, they are written to the database. Since a problem can occur at any stage of the daily run, we do not want to lose the inferences that were already made:

Synchronous flow

Predictions are made one after another in a synchronous fashion. Whenever 1000 inferences have been made, the results are written to the database, and then the inference process continues.

Synchronous flow
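Put together, the synchronous flow is roughly the sketch below, where write_to_db() and all_disks stand in for our database helper and the collection of disks’ data:

    BATCH_SIZE = 1000

    results = []
    for disk_id, disk_series in all_disks.items():
        results.append(infer_arima_disk(disk_id, disk_series))

        # Flush every 1000 inferences so that a failure later in the daily
        # run does not lose the forecasts that were already computed
        if len(results) >= BATCH_SIZE:
            write_to_db(results)
            results = []

    # Write whatever is left in the last, possibly smaller batch
    if results:
        write_to_db(results)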

In this setting a daily run takes about 15 to 17 hours on an average machine, which is quite risky. The number of disks is likely to rise day by day, which will lead to even longer runs. So first of all, there is a need for better utilization of the resources of a single machine, let alone utilizing more than one machine (although the latter does not take much effort to achieve with Ray).

Main Components of Ray

At its core, Ray has two components, tasks and actors, and Ray’s higher-level libraries are built on top of them. While the documentation is clear and well organized (https://docs.ray.io/en/latest/walkthrough.html), I will briefly explain what a Ray task and an actor are, since Ray Core is what we use in the project.

Ray Task

A Ray task is a converted function that executes asynchronously. By placing the @ray.remote decorator on a function, it becomes a remote function that Ray runs in a worker process on each call. A remote function is invoked with the .remote() method, and its return value is a future, which can be thought of as an ID for the eventual result of the function call. These function calls (i.e. Ray tasks) are scheduled, computed and returned by Ray using the available resources. The real advantage, however, is not only that Ray utilizes the resources of the host machine it is running on: it can also utilize the resources of a whole cluster with nearly zero code changes. So a remote function may execute on any machine in the cluster as a worker process, which is why it is called remote.

Our synchronous infer_arima_disk() function can be converted into a remote function simply by adding the @ray.remote decorator.

a Ray Task definition
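Roughly, reusing the illustrative body from the earlier sketch:

    import ray
    from darts import TimeSeries
    from darts.models import ARIMA

    ray.init()  # starts Ray locally; pass an address to connect to a cluster

    @ray.remote
    def infer_arima_disk(disk_id, disk_series, horizon=90):
        # Same body as the synchronous sketch above
        series = TimeSeries.from_series(disk_series)
        model = ARIMA()
        model.fit(series)
        return {"disk_id": disk_id, "forecast": model.predict(horizon).pd_series()}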

And it can be called through Ray so that the available resources are utilized.

An example with a small subset of data of 50 disks
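Something along these lines, again with the illustrative all_disks collection:

    # Launch 50 inference tasks; .remote() returns immediately with a future
    futures = [
        infer_arima_disk.remote(disk_id, disk_series)
        for disk_id, disk_series in list(all_disks.items())[:50]
    ]

    # ray.get() blocks until every future is computed and returns the results
    results = ray.get(futures)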

Just by applying this logic to each model, the time it takes for a daily run drops from 15–17 hours to 1–2 hours on an average server. There are a couple of points to be careful about, though. A Ray task can actually be slower than an equivalent synchronous Python function. One factor that can lead to this is making the function too small: under the hood, Ray transfers objects and communicates with workers, which brings some overhead, and if the function is too small this overhead can cause a bigger delay than the function body itself. For this reason, in our case infer_arima_disks() and all the other inference functions are a little different from the implementation above: they take five disks’ data as input and return the corresponding five results, which are gathered later.

A single Ray task.
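A sketch of this batched version, assuming disks_batch holds five disks’ series:

    @ray.remote
    def infer_arima_disks(disks_batch, horizon=90):
        # Inference runs synchronously for the five disks inside the task
        results = []
        for disk_id, disk_series in disks_batch.items():
            series = TimeSeries.from_series(disk_series)
            model = ARIMA()
            model.fit(series)
            results.append({"disk_id": disk_id, "forecast": model.predict(horizon).pd_series()})
        return results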

Inference happens synchronously for the five disks inside the function. However, each function call runs in its own worker process, asynchronously alongside the rest of the worker processes, as resources become available.

ray.wait()

As mentioned above, the database transactions need to be done in a batch fashion: the results must be sent to the database as soon as some portion of them is ready. With Ray, we do not have to manually divide the data into batches for inference, hand these batches to Ray one after another, wait for each batch to finish (with ray.get), send it to the database and then give Ray the next batch. ray.wait() does that for us asynchronously. It returns a batch of results of the size we specify as soon as enough computations have finished, while the rest of the futures keep computing in the background. In a way, it waits for a number of futures to be ready, obtains their results and hands them to us.

Thus, the new structure becomes:

Scheduling and utilization of the available resources are handled by Ray.
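A sketch of this loop, assuming disk_batches splits the disks into groups of five and reusing the illustrative write_to_db() helper:

    # One task per batch of five disks
    futures = [infer_arima_disks.remote(batch) for batch in disk_batches]

    while futures:
        # Block until 200 tasks (i.e. 1000 disks) are done; the rest keep running
        ready, futures = ray.wait(futures, num_returns=min(200, len(futures)))

        # Each ready future holds five disks' results; flatten and write them
        batch_results = [res for task_results in ray.get(ready) for res in task_results]
        write_to_db(batch_results)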

Ray Actor

An actor is a stateful worker that executes its methods synchronously. This enables many use cases, one of which is using an actor as a service in a Ray cluster: an actor is essentially another worker process placed somewhere (on one of the nodes) in the cluster, and it can be called by any other Ray worker, stateless (a task) or stateful (another actor). The methods of an actor can be used by other actors or tasks either by passing the actor handle as an argument or by giving the actor instance a name and calling it by name from any other task or actor method.
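As a minimal illustration of the pattern (the Counter class is a toy example, not from our project):

    import ray

    @ray.remote
    class Counter:
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    # Creating the actor places it as a worker process somewhere in the cluster
    counter = Counter.remote()

    # Method calls also return futures; the actor executes them one by one
    print(ray.get(counter.increment.remote()))  # 1

    # Alternatively, the actor can be named and looked up from anywhere:
    # counter = Counter.options(name="counter").remote()
    # handle = ray.get_actor("counter")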

Now let’s look at some use cases for actors from our project.

ARIMA Issue

There is an issue with some of ARIMA’s inference results. It behaves differently from the other models we use and sometimes gives a constant forecast, a flat horizontal line. The reason is that the number of data points can be insufficient for ARIMA; in that case the inference result should be ignored and the disk’s data must be assigned to another model during the Ray task.

Bulk Change Issue

Another issue that affects the overall performance of all models is the bulk change problem. If there is an anomaly in a particular disk’s data, for example because of a disk cleaning operation or the installation of a large piece of software, the predictions of the model that the disk was assigned to start to behave strangely because of this anomaly. For a healthy inference in case of such an anomaly, the input data should be shortened with respect to the anomaly and the disk assigned to a specific model that takes a short input.

A Possible Solution

Due to these issues, there is a need to reassign a disk’s data to another model, and this should be done inside the Ray task. This is tricky because our Ray tasks (e.g. infer_arima_disks()) are not aware of which model’s data they have, which disks the other models were assigned or how to access the other models’ disk data in order to assign a troubled disk to another model. While there could be a variety of solutions, a quick workaround is to create an actor that holds each model’s assigned disks’ data and is able to update it, acting as a data service. Basically, it holds the assigned disks’ data of each model in a dictionary (specifically {model name: assigned disks’ data}), serves these data and appends to a model’s assigned disks on request.
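A sketch of such an actor; the class name ModelDataActor and its methods are illustrative, not the exact production code:

    @ray.remote
    class ModelDataActor:
        def __init__(self, disks_per_model):
            # {model name: {disk_id: that disk's data}} for all five models
            self.disks_per_model = disks_per_model

        def get_disks(self, model_name):
            # Serve the disks currently assigned to a model
            return self.disks_per_model[model_name]

        def append_disk(self, model_name, disk_id, disk_series):
            # Assign a troubled disk to another model
            self.disks_per_model[model_name][disk_id] = disk_series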

After the actor is created with the .remote() method, it is placed as a process somewhere in the cluster.

Creating the actor handle to pass into the Ray tasks.
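Continuing the sketch, with disks_per_model standing in for the initial assignment of disks to the models:

    # The actor is placed as a worker process on one of the nodes of the cluster
    model_data_actor = ModelDataActor.remote(disks_per_model)

    # Any task or actor holding this handle can now query or update the state
    arima_disks = ray.get(model_data_actor.get_disks.remote("arima"))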

Giving this actor handle to a Ray task as an argument makes it possible to call its methods inside the Ray task and update the state of the actor if needed, just like a stateful service.

Ray task update for disk data forwarding
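A sketch of how the ARIMA task could forward a troubled disk; the MIN_POINTS threshold and the fallback to LGBM are illustrative choices:

    MIN_POINTS = 15  # illustrative threshold for "enough data for ARIMA"

    @ray.remote
    def infer_arima_disks(disks_batch, model_data_actor, horizon=90):
        results = []
        for disk_id, disk_series in disks_batch.items():
            if len(disk_series) < MIN_POINTS:
                # Not enough points for ARIMA: hand the disk over to another model
                model_data_actor.append_disk.remote("lgbm", disk_id, disk_series)
                continue
            series = TimeSeries.from_series(disk_series)
            model = ARIMA()
            model.fit(series)
            results.append({"disk_id": disk_id, "forecast": model.predict(horizon).pd_series()})
        return results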

Once the ARIMA disks are finalized, we can proceed with another model.

And we may see that the number of disks assigned to LGBM has increased (if ARIMA handed some disks over to it).
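For instance, continuing the sketch:

    # After all ARIMA tasks have finished, fetch LGBM's (possibly enlarged) disk set
    lgbm_disks = ray.get(model_data_actor.get_disks.remote("lgbm"))
    print(len(lgbm_disks))  # larger than before if ARIMA handed some disks over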

Logging

Ray stores logs for any run, but I wanted a more centralized and quick solution for logging both Ray (tasks) and non-Ray processes in a single location. To be honest, I didn’t spend much time searching for one, and that’s one of the good parts of Ray: you don’t necessarily need every tool to exist already, since it gives you low-level flexibility. So I went for a log actor, a quick and simple solution for our case. It stores two types of logs, INFO and ERROR; it can take logs from Ray tasks, store them, then retrieve and remove them to prevent duplication on the next retrieval. The retrieve method is called from the non-Ray functions so that the logs are guaranteed to be written on the driver node (this may become a bottleneck if the application and the cluster grow too much).
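A sketch of such a log actor, following the description above (the method names are illustrative):

    @ray.remote
    class LogHandlerActor:
        def __init__(self):
            self.logs = []  # list of (level, message) tuples

        def log(self, level, message):
            # level is either "INFO" or "ERROR"
            self.logs.append((level, message))

        def retrieve(self):
            # Hand back the stored logs and clear them to prevent duplication
            logs, self.logs = self.logs, []
            return logs

    log_handler_actor = LogHandlerActor.remote()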

Ray tasks send their logs to the LogHandlerActor, since it can be passed to them as an argument:

Ray task update for logging
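For example, with infer_one_disk() as a hypothetical helper wrapping the inference body shown earlier:

    @ray.remote
    def infer_arima_disks(disks_batch, model_data_actor, log_handler_actor):
        results = []
        for disk_id, disk_series in disks_batch.items():
            try:
                # infer_one_disk() is a hypothetical helper for the body shown earlier
                results.append(infer_one_disk(disk_id, disk_series))
                log_handler_actor.log.remote("INFO", f"forecasted disk {disk_id}")
            except Exception as exc:
                # Fire-and-forget actor call: the task does not wait for the log actor
                log_handler_actor.log.remote("ERROR", f"disk {disk_id} failed: {exc}")
        return results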

Then the logs held by the actor are written into the log file on the driver:

Writing the logs of the Ray tasks into the file on driver.
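A sketch of the driver-side part, using Python’s standard logging module and an illustrative log file name:

    import logging

    logging.basicConfig(filename="daily_run.log", level=logging.INFO)

    def flush_ray_logs(log_handler_actor):
        # Pull the accumulated logs out of the actor (they are cleared on retrieve)
        for level, message in ray.get(log_handler_actor.retrieve.remote()):
            if level == "ERROR":
                logging.error(message)
            else:
                logging.info(message)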

Both the model holder actor and the log handler actor can be called by many Ray tasks at the same time. Thanks to the synchronous method execution of Ray actors, even if they receive many requests from Ray tasks simultaneously, they execute them one after another. This protects their state and prevents the kind of corruption that can happen when two programs write to the same location at the same time.

The Real Advantage

Maybe we could have found a workaround for our case with Python’s multiprocessing library, but the real advantage Ray gives us is flexibility and comfort in terms of scalability. Once the “code on the laptop” works as desired, moving it to a cluster is a rather straightforward process compared to other technologies. Ray has many deployment options: Anyscale, the company that develops Ray, offers a “laptop to cloud” solution; AWS, Azure and GCP are some of the provider options for deploying Ray apps in the cloud; Ray also has a Kubernetes operator, or you can go for an on-prem setup without k8s.
