Using actors to manage model simulations with Dask.distributed

Willi Rath
Jul 15, 2019

(This post is also available as a Gist on GitHub.com.)

During an E-CAM workshop, we discussed how best to use Dask.distributed to run dynamical model simulations (like the movement of a set of particles) and analyses (like statistics across all the particle positions at a given time) concurrently. As simulations often carry a lot of internal state that is expensive to tear down and set up repeatedly, we want to spawn analyses of the state of the simulation without interrupting it.

Dask.distributed has a relatively new feature for exactly this: actors. Actors are meant to manage state that lives on workers.
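Before turning to the simulation, here is a minimal self-contained sketch of the pattern (the State class and demo_client are made up for illustration and are not part of the original post): submitting a class with actor=True returns a handle whose method calls run on the worker and come back as ActorFuture objects.

from dask.distributed import Client

demo_client = Client(processes=False)  # small in-process cluster, for the demo only

class State:
    def __init__(self):
        self.value = 0

    def update(self, dx):
        self.value += dx
        return self.value

state = demo_client.submit(State, actor=True).result()  # an Actor handle
future = state.update(1)  # runs on the worker, returns an ActorFuture
print(future.result())    # -> 1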

The model simulation: A random walk for N particles

As a mock-up dynamical simulation, we chose N particles at positions (x, y) which are moved by normally distributed random steps without interacting with each other. As a simple analysis, we calculate the center of mass of all the N particles.

import numpy as np


class Particles:

    def __init__(self, N=100_000, seed=0):
        """Seed RNG and initialize positions."""
        np.random.seed(seed)
        self.xy = np.random.normal(size=(N, 2))

    def move(self, steps=1):
        """Move particles."""
        for step in range(steps):
            self.xy += np.random.normal(size=self.xy.shape)

    def calc_center_of_mass(self):
        """Find position of center of mass."""
        return self.xy.mean(axis=0)
[...]

Run without Dask

We can initialize the experiment and, in alternating order, move particles and diagnose the center of mass.

particles = Particles()

center_of_mass = []
for n in range(100):
    center_of_mass.append(particles.calc_center_of_mass())
    particles.move(2)
center_of_mass = np.stack(center_of_mass)
[...]

Plotting center_of_mass will give:

Figure: Centers of mass for 100_000 particles every 2 steps (the resulting plot of `center_of_mass`).
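The plotting code is not shown in the post; a minimal matplotlib sketch (the labels are guesses) that produces such a figure could look like this:

import matplotlib.pyplot as plt

plt.plot(center_of_mass[:, 0], label="x")
plt.plot(center_of_mass[:, 1], label="y")
plt.xlabel("diagnostic step (one per 2 model steps)")
plt.ylabel("center of mass")
plt.legend()
plt.show()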

Running with an actor

To run the experiment on a worker, we spin up a Dask cluster and submit the Particles class with actor=True, getting back an actor that can be used to change and inspect the state of the experiment living on a worker.

from dask.distributed import Client

client = Client(
    n_workers=1, threads_per_worker=1,
    memory_limit=400e6)

particles = client.submit(
    Particles, actor=True, pure=False
).result()
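What we get back is an Actor handle; calling its methods returns ActorFuture objects rather than plain values. A quick check (just for illustration, not in the original post):

print(type(particles))  # an Actor (distributed.actor.Actor)
print(type(particles.calc_center_of_mass()))  # an ActorFuture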

Then, we use the actor to do the same as above (move particles and get the center of mass), but this time with the state of the experiment living on a Dask worker.

center_of_mass_on_worker = []
for n in range(100):
    center_of_mass_on_worker.append(
        particles.calc_center_of_mass().result()
    )
    particles.move(2).result()  # .result() is important!
center_of_mass_on_worker = np.stack(center_of_mass_on_worker)
[...]
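Because both runs seed the RNG with the same value (seed=0) and the cluster uses a single worker thread, the on-worker trajectory should reproduce the local one exactly; a quick sanity check:

# both experiments start from seed=0, so the trajectories should agree
assert np.allclose(center_of_mass, center_of_mass_on_worker)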

In the end, we plot and get the same figure as above:

Figure: Centers of mass for 100_000 particles every 2 steps.

Beware of race conditions!

Above, we made sure to call particles.move(2).result() instead of just particles.move(2). While the latter works (the particles will be moved by two steps, but the result is never gathered), there is no guarantee that the next analysis (.calc_center_of_mass()) runs after, and not before, the move.

To show that this can actually happen, we create a counter that we increment without waiting for the result before querying its state again:

class Counter:

    def __init__(self):
        self.n = 0

    def get_n(self):
        return self.n

    def increment(self):
        self.n += 1


counter = client.submit(Counter, actor=True, pure=False).result()

numbers = []
for n in range(500):
    numbers.append(counter.get_n().result())
    counter.increment()  # fire and forget: we don't wait for the result
numbers = np.asarray(numbers)

We'll find that np.all(np.diff(numbers) == 1) is False.
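To see where this goes wrong, we can, for example, look at the positions where a read saw a stale value (difference 0) or several increments at once (difference > 1):

# positions where consecutive reads did not differ by exactly one increment
irregular = np.where(np.diff(numbers) != 1)[0]
print(len(irregular), "irregular steps")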

If we make sure to block until each increment is done, everything is fine:

counter = client.submit(Counter, actor=True, pure=False).result()

numbers = []
for n in range(500):
    numbers.append(counter.get_n().result())
    counter.increment().result()  # blocks until the increment is done
numbers = np.asarray(numbers)
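With the blocking .result() in place, every read sees exactly one more increment than the previous one:

# every step now differs by exactly one
assert np.all(np.diff(numbers) == 1)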
