Running distributed hyperparameter optimization with Optuna-distributed
Optuna is an automatic hyperparameter optimization software framework, particularly designed for machine learning. Parallelized hyperparameter optimization is a topic that appears quite frequently in Optuna issues and discussions. There are a few approaches to it, including builtin thread-based solutions. Today, however, I’d like to present a slightly different solution, one that’s based on process-based parallelism and can easily scale hyperparameter optimization tasks from one machine, to multiple machines in a cluster. Introducing Optuna-distributed! In this article I will present the architecture of Optuna-distributed and go over the details and design decisions behind each major component. We will also finish things off with a small example.
High level overview
Optuna-distributed is an extension to regular Optuna, providing tight integration with base library by wrapping Optuna’s
Study and introducing new implementation of
optimize method (other methods behave like original), which leverages process-based parallelism by default and is able to utilize Dask cluster to distribute computation among many machines. Architecture-wise, Optuna-distributed implements actor model and splits the environment into one main (user) process and many worker processes. Main process contains the event loop which has exclusive access to standard Optuna study and as a consequence to storage, sampler and pruner. Workers are tasked with executing user-defined objective functions asynchronously, and they communicate with the main process via a simple messaging system. Within a message, a worker may ask for hyperparameter suggestions, report a new objective or intermediate value. Communication is bidirectional, so the main process might respond to a worker with data required to continue the trial. Optimization state is tracked within the event loop by entities called managers, which can spawn additional workers, monitor progression on each trial, or request to end the optimization process altogether. Such design removes the need for lock-based synchronization at storage level, avoids sharing memory between tasks, enables usage of all standard Optuna components in distributed mode, and provides an easy way to scale asynchronous optimization process from single machine to many machines in cluster.
With all major components briefly introduced, let’s dive into the details.
We jump into the event loop right after
optimize was called, and stay there for the duration of the optimization process. The event loop has its own event queue and does one iteration every time a worker puts a message there. Messages are read and processed one by one, ensuring sequential access to study resources. The event loop provides these resources as a context when a message is being processed. Since messages are effectively synchronized at the event queue level, all following operations in the loop are basically equivalent to the Optuna synchronous optimization flow — with each message a progress bar is updated (when enabled of course!), exit conditions are checked and exceptions from objective function are raised. However, even though messages might wait a moment in the queue to be processed, workers who send them in many cases don’t have to wait for this to happen, so things like intermediate value or status updates don’t carry additional overhead.
Distributed trial and objective function
To provide all trial functionalities within an objective function without direct access to storage and sampler, a new type of
Trial is defined. On the surface,
DistributedTrial encapsulates exactly the same functionality and is exposing the same APIs as its well-known counterpart, making already existing objective functions usable within a distributed environment. Internally, however, instead of holding direct references to study resources, we only have access to inter-process communication primitives, which allows us to send messages back to the main process and receive responses if necessary. This way we can keep the code executed outside of the main process to a minimum, as only the instance of
DistributedTrial and user-defined objective function are sent over to a worker. This pair however needs some sort of small environment to run within, providing things such as error recovery, cleanup after a trial finishes, and in some cases nanny thread allowing it to interrupt the running objective function. Final setup looks like this:
This setup relaxes serializability requirements on custom storages or samplers users might want to use, as they will always stay in the main process, but still require objective functions to be serializable. The latter, however, will always be a limitation in the case of asynchronous optimization.
Messages and IPC
So far we’ve seen what happens in the main process event loop and in the workers. Now it’s time to discuss a bridge that connects the two — the messaging system. Messages consist of two parts — data and code. In the majority of the cases, a message is instantiated and populated with data by the worker, sent to the main process and picked up within an event loop, where message code is executed with full access to Optuna study. As an example, let’s follow a path of
SuggestMessage— a message used to request hyperparameter suggestions.
As usual in Optuna, a user may request hyperparameter suggestions by calling one of the
suggest_* APIs within the objective function. After doing so,
DistributedTrial instance will instantiate
SuggestMessage, providing information on the trial that is being executed by the worker, the name of the hyperparameter and the distribution we are sampling from. After that, the message is serialized, and
DistributedTrial uses its IPC connection to send it over to the main process. After being picked up inside the event loop,
process method is called on the message, with study and optimization manager provided as arguments. Since now we have all the data needed to make a hyperparameter suggestion, we can call regular Optuna APIs to do so. Now that we finally have our suggested value, there is one more important step before finishing the process — we have to send it back to the worker! As previously mentioned, communication between the main process and workers is bidirectional. All we need to do is ask the optimization manager to provide a return connection to the worker that has originally sent the message, and use it to send back the response. The worker will pick up the response data, and return it to the
suggest_* method caller, oblivious to the fact that his hyperparameter might just have traveled many kilometers to reach him.
This pattern of message and optional response is followed to provide all regular
Trial functionalities. Messages that don’t require responses, such as intermediate value reports, do not block trial progression, as
DistributedTrial can just send them and let the optimization process continue. This is in contrast to synchronization at the storage level, where each operation has to request and wait for a storage lock. However, the main benefit of this solution is that the IPC system is abstracted away. We don’t care if messages are sent between two processes running on the same machine or between multiple machines via a network. This makes Optuna-distributed easy to scale from simple process based asynchronous optimization on your local machine, to large scale multi machine cluster-based workloads.
The last component to discuss is the optimization manager. As you might have noticed by now, managers are a neat way to provide a unified API to optimization backend and carry any state associated with it. Currently in Optuna-distributed there are two types of managers — local and distributed. The first one allows to schedule and execute trials by spawning and controlling a local process for each one, using the standard
multiprocessing library. Second leverages Dask client to communicate with the scheduler, allowing it to run the optimization process in distributed mode on a Dask cluster. Each manager has its own flavor of
IPCPrimitive available to use as a task queue as well as a response communication method with workers. Since managers have a full overview of progress on each trial, it’s up to them to decide when the event loop should close, what should be executed before and after message processing, and which trials should be interrupted in cases where timeout is reached or the user manually interrupts the optimization process.
Let’s finish things off with an example! Before we start, if you would like to follow along, Optuna-distributed can be installed from PyPI by running
pip install optuna-distributed. If you are using a Dask cluster and want to run the example in distributed mode, remember that Optuna-distributed (and all other dependencies) has to be installed on each of the workers. You can read about cluster setup in Dask documentation. There are also different examples available in the Optuna-distributed examples section, showing how to use Dask client in distributed optimization cases, presenting usage of Optuna
RDBStorage or demonstrating optimization with pruning. In this example, however, we will locally optimize a simple quadratic function, but include a simulated “long” running process to show a difference in evaluation times. Full code to the example is available here. Our objective function is defined as follows:
As you can see,
trial behaves exactly like in the standard Optuna. In fact, to demonstrate this, we will run two rounds of optimization with the same study object — first by using regular
optimize, and second by wrapping the study with Optuna-distributed to run the optimization in asynchronous mode using all 4 CPU cores my machine has to offer. As a result, we should see an approximately 4x improvement in optimization times with similar best objective function values reached.
Running the above yields:
And that is the entire Optuna-distributed in a nutshell! For now, development will continue as there are still missing features, and awkward pieces of logic to improve on, so stick around for future releases! Thank you for reading, and if you have any questions or ideas, feel free to raise an issue or open a PR. Also, big thanks to the entire Optuna dev team for having me here!