Published in Analytics Vidhya

Understanding OpenAI baseline source code and making it do self-play! Part 3

In the previous articles, we went through how OpenAI Baselines builds its environments and some parts of it that I found interesting. You can check out part 1 and part 2 here and here!

SubprocVecEnv __init__: a dive into multiprocessing

The __init__ function is as follows

class SubprocVecEnv(VecEnv):
    """
    VecEnv that runs multiple environments in parallel in subproceses and communicates with them via pipes.
    Recommended to use when num_envs > 1 and step() can be a bottleneck.
    """
    def __init__(self, env_fns, spaces=None, context='spawn', in_series=1):
        """
        Arguments:
        env_fns: iterable of callables - functions that create environments to run in subprocesses. Need to be cloud-pickleable
        in_series: number of environments to run in series in a single process
        (e.g. when len(env_fns) == 12 and in_series == 3, it will run 4 processes, each running 3 envs in series)
        """
        self.waiting = False
        self.closed = False
        self.in_series = in_series
        nenvs = len(env_fns)
        assert nenvs % in_series == 0, "Number of envs must be divisible by number of envs to run in series"
        self.nremotes = nenvs // in_series
        env_fns = np.array_split(env_fns, self.nremotes)
        ctx = mp.get_context(context)
        self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.nremotes)])
        self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
                   for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]
        for p in self.ps:
            p.daemon = True  # if the main process crashes, we should not cause things to hang
            with clear_mpi_env_vars():
                p.start()
        for remote in self.work_remotes:
            remote.close()
        self.remotes[0].send(('get_spaces_spec', None))
        observation_space, action_space, self.spec = self.remotes[0].recv().x

        self.viewer = None
        VecEnv.__init__(self, nenvs, observation_space, action_space)

The first lines are used to initialize

self.waiting = False
self.closed = False
self.in_series = in_series
nenvs = len(env_fns)

Then,

assert nenvs % in_series == 0, "Number of envs must be divisible by number of envs to run in series"
self.nremotes = nenvs // in_series

Here, the in_series variable comes into play. nenvs is the total number of environments, and the assert makes sure it can be split evenly. At this point it is not yet obvious which way the split goes: either there are in_series processes with nenvs // in_series environments each, or the other way around!

env_fns = np.array_split(env_fns, self.nremotes)

Here, we see that it was the other way around. There are nenvs // in_series processes and each runs in_series environments, because np.array_split cuts env_fns, in order, into self.nremotes chunks of equal size. For example,

np.array_split(np.arange(6), 2)

returns [array([0, 1, 2]), array([3, 4, 5])]. Interestingly, the outer container is a plain Python list and not an np array. Then,
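To make the split concrete, here is a minimal sketch of how 12 environments with in_series == 3 end up in 4 chunks. The integer ids are hypothetical stand-ins for the env_fns callables, and the variable names are only for illustration.

```python
import numpy as np

# Hypothetical stand-ins for env_fns: 12 integer ids instead of 12 callables.
env_ids = list(range(12))
in_series = 3

nenvs = len(env_ids)
assert nenvs % in_series == 0
nremotes = nenvs // in_series  # number of worker processes

# One chunk per worker process, each holding in_series entries.
chunks = np.array_split(env_ids, nremotes)
chunk_lists = [chunk.tolist() for chunk in chunks]

for worker_index, chunk in enumerate(chunk_lists):
    print(worker_index, chunk)
```

So worker 0 gets the first three entries, worker 1 the next three, and so on.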

ctx = mp.get_context(context)

is run. The mp came from

import multiprocessing as mp

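According to the documentation, mp.get_context returns a context object that has the same API as the multiprocessing module itself but is bound to one particular start method (here 'spawn' by default). This way, this code can pick its own start method without changing the setting for the rest of the program. Let’s proceed!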
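As a quick sanity check of what a context is, the sketch below only creates one and inspects it, without starting any process. The variable names are mine and not from baselines.

```python
import multiprocessing as mp

# A context object exposes the same API as the multiprocessing module
# (Process, Pipe, Queue, ...) but is tied to one start method.
ctx = mp.get_context("spawn")

start_method = ctx.get_start_method()
print(start_method)

# Both factories used by SubprocVecEnv are available on the context.
has_process = hasattr(ctx, "Process")
has_pipe = hasattr(ctx, "Pipe")
```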

Next,

self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.nremotes)])

is run. First, what is ctx.Pipe()? Let’s check! According to the same documentation, Pipe() returns a pair of connection objects that represent the two ends of a pipe:

parent_conn, child_conn = Pipe()

From here on, the two ends can talk to each other!

Whatever is sent with parent_conn.send(...) can be read with child_conn.recv(), and vice versa, since the pipe is two-way by default.

Thus, here, self.remotes is the tuple of parent ends and self.work_remotes is the tuple of the corresponding child ends. The zip(*...) call is what turns the list of (parent, child) pairs into those two tuples.
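Here is a small sketch of the same pattern that runs inside a single process, so no worker is involved. n_pipes is a hypothetical stand-in for self.nremotes, and the messages are made up.

```python
import multiprocessing as mp

ctx = mp.get_context("spawn")
n_pipes = 3  # hypothetical stand-in for self.nremotes

# Each ctx.Pipe() call returns a (parent_end, child_end) pair.
pairs = [ctx.Pipe() for _ in range(n_pipes)]

# zip(*pairs) transposes the list of pairs into two tuples.
remotes, work_remotes = zip(*pairs)

# Messages sent on one end arrive on the other end, in both directions.
remotes[0].send(("step", [1, 2, 3]))
received_by_child = work_remotes[0].recv()

work_remotes[0].send("observation")
received_by_parent = remotes[0].recv()

# Clean up every connection end.
for conn in remotes + work_remotes:
    conn.close()
```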

Then,

self.ps = [ctx.Process(target=worker, args=(work_remote, remote, CloudpickleWrapper(env_fn)))
           for (work_remote, remote, env_fn) in zip(self.work_remotes, self.remotes, env_fns)]

is run.

target is the function that the process executes. Calling .run() would execute it in the current process, while .start() launches a separate process and calls run there, which is what is used here!

The args are simply the arguments passed to the target function, which in this case is worker.

The CloudpickleWrapper is defined as follows!

class CloudpickleWrapper(object):
    """
    Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
    """
    def __init__(self, x):
        self.x = x
    def __getstate__(self):
        import cloudpickle
        return cloudpickle.dumps(self.x)
    def __setstate__(self, ob):
        import pickle
        self.x = pickle.loads(ob)

What it basically does is store the env_fn functions in x. When multiprocessing sends the wrapper to the child process, it pickles it, and pickle calls __getstate__, which returns self.x serialized with cloudpickle. On the other side, unpickling calls __setstate__ with those bytes and restores self.x with pickle.loads. The reason cloudpickle is used for __getstate__ is, according to here, because it supports more things than plain pickle, such as lambdas and functions defined inside other functions.
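To see when these two hooks fire, here is a sketch that uses only the standard pickle module so it runs without cloudpickle installed. LoggingWrapper is a hypothetical class of mine that mimics the shape of CloudpickleWrapper; it is not part of baselines.

```python
import pickle

class LoggingWrapper(object):
    """Hypothetical wrapper: same hooks as CloudpickleWrapper, but stdlib pickle only."""
    calls = []  # records which hooks ran, for illustration

    def __init__(self, x):
        self.x = x

    def __getstate__(self):
        LoggingWrapper.calls.append("getstate")
        return pickle.dumps(self.x)

    def __setstate__(self, ob):
        LoggingWrapper.calls.append("setstate")
        self.x = pickle.loads(ob)

wrapped = LoggingWrapper({"env_id": "my_env"})
payload = pickle.dumps(wrapped)   # triggers __getstate__
restored = pickle.loads(payload)  # triggers __setstate__

# Plain pickle cannot serialize a lambda, which is where cloudpickle helps.
try:
    pickle.dumps(lambda: "make_env")
    lambda_pickled = True
except (pickle.PicklingError, AttributeError, TypeError):
    lambda_pickled = False
```

Since env_fns are often lambdas or closures, this failure is presumably why the wrapper swaps in cloudpickle for the serializing half.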

Then,

for p in self.ps:
    p.daemon = True  # if the main process crashes, we should not cause things to hang
    with clear_mpi_env_vars():
        p.start()

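is run. A daemon, from a computer science perspective, is a background process. In multiprocessing, a process flagged as daemon is terminated automatically when its parent process exits, so it makes sense that the workers do not keep things hanging if the main process goes away!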

For clear_mpi_env_vars, according to the comments,

"""
from mpi4py import MPI will call MPI_Init by default. If the child process has MPI environment variables, MPI will think that the child process is an MPI process just like the parent and do bad things such as hang.
This context manager is a hacky way to clear those environment variables temporarily such as when we are starting multiprocessing Processes.
"""

which is interesting. Then, finally,

p.start()

and the environments started!

for remote in self.work_remotes:
    remote.close()
self.remotes[0].send(('get_spaces_spec', None))
observation_space, action_space, self.spec = self.remotes[0].recv().x
self.viewer = None
VecEnv.__init__(self, nenvs, observation_space, action_space)

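At first I did not understand why the close calls are here. As far as I can tell, it is the usual pipe housekeeping: the child ends now belong to the worker processes, so the parent closes its own copies of them. That way, if a worker dies, its end of the pipe is really closed and recv() in the parent raises EOFError instead of waiting forever.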
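The sketch below shows, within a single process, what recv() does once the other end of a pipe is fully closed. It is only an illustration of the Connection behavior and not code from baselines; in the real class the two ends live in different processes.

```python
import multiprocessing as mp

parent_conn, child_conn = mp.Pipe()

# Queue one message, then close the child end completely.
child_conn.send("last message")
child_conn.close()

# Data that was already sent can still be read...
first = parent_conn.recv()

# ...but once nothing is left and the other end is closed, recv() raises EOFError.
try:
    parent_conn.recv()
    got_eof = False
except EOFError:
    got_eof = True

parent_conn.close()
```

If some process kept a stray copy of the child end open, the pipe would not count as closed and that second recv() would block.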

Then, the specs are retrieved from the environment and VecEnv’s init function is called.

But this is a new way of calling it for me! I’m used to seeing

class a:
    def __init__(self):
        self.a = "a"
    def hello(self):
        print(self.a)
class b(a):
    def __init__(self):
        super(b, self).__init__()

and not a direct call like a.__init__(self) inside the function. As far as I can tell, the two do the same thing here: both run the parent’s initializer on the new instance, so the attributes that a sets end up on the b object.
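A tiny sketch comparing the two styles side by side. The class names are hypothetical and chosen only for this illustration.

```python
class Base:
    def __init__(self, num_envs):
        self.num_envs = num_envs

class ViaSuper(Base):
    def __init__(self, num_envs):
        super().__init__(num_envs)

class ViaExplicitCall(Base):
    def __init__(self, num_envs):
        # Same style as VecEnv.__init__(self, ...) in SubprocVecEnv.
        Base.__init__(self, num_envs)

via_super = ViaSuper(4)
via_explicit = ViaExplicitCall(4)
print(via_super.num_envs, via_explicit.num_envs)
```

With a single parent class the result is the same either way; the difference only starts to matter with multiple inheritance.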

VecEnv

class VecEnv(ABC):
    """
    An abstract asynchronous, vectorized environment.
    Used to batch data from multiple copies of an environment, so that
    each observation becomes an batch of observations, and expected action is a batch of actions to
    be applied per-environment.
    """
    closed = False
    viewer = None
    metadata = {
        'render.modes': ['human', 'rgb_array']
    }
    def __init__(self, num_envs, observation_space, action_space):
        self.num_envs = num_envs
        self.observation_space = observation_space
        self.action_space = action_space

Since it’s setting attributes, I suspect I’m right! Now, let’s check out the reset, step, and render functions!

def step(self, actions):
    """
    Step the environments synchronously.
    This is available for backwards compatibility.
    """
    self.step_async(actions)
    return self.step_wait()
def render(self, mode='human'):
    imgs = self.get_images()
    bigimg = tile_images(imgs)
    if mode == 'human':
        self.get_viewer().imshow(bigimg)
        return self.get_viewer().isopen
    elif mode == 'rgb_array':
        return bigimg
    else:
        raise NotImplementedError
@abstractmethod
def reset(self):
    pass

The @abstractmethod is a decorator provided by the abc (abstract base classes) module, and what it basically says is that this method needs to be overridden by a subclass.
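Here is a minimal sketch of what that means in practice. ToyVecEnv and ToySubprocVecEnv are hypothetical names, not the real classes.

```python
from abc import ABC, abstractmethod

class ToyVecEnv(ABC):
    @abstractmethod
    def reset(self):
        pass

class ToySubprocVecEnv(ToyVecEnv):
    def reset(self):
        return "first observation"

# A class that still has abstract methods cannot be instantiated.
try:
    ToyVecEnv()
    base_instantiated = True
except TypeError:
    base_instantiated = False

# A subclass that overrides every abstract method can.
obs = ToySubprocVecEnv().reset()
```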

So, basically, for each step, the step_async function is run and the result of the step_wait function is returned. And these functions are given in SubprocVecEnv!

step_async

def step_async(self, actions):
    self._assert_not_closed()
    actions = np.array_split(actions, self.nremotes)
    for remote, action in zip(self.remotes, actions):
        remote.send(('step', action))
    self.waiting = True

Thus, the actions are split into one chunk per worker, each parent remote sends the string “step” together with its chunk of actions to its child, and the variable self.waiting is set to True.

step_wait

def step_wait(self):
    self._assert_not_closed()
    results = [remote.recv() for remote in self.remotes]
    results = _flatten_list(results)
    self.waiting = False
    obs, rews, dones, infos = zip(*results)
    return _flatten_obs(obs), np.stack(rews), np.stack(dones), infos

We see that each remote receives a message back from its child, which is the result of stepping that worker’s environments! Since each worker returns a list with one entry per environment, _flatten_list first merges these lists into one. Then zip(*results) regroups the entries into obs (observations), rews (rewards), dones (whether each episode is over), and infos, which I presume is extra information.
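To see the regrouping step in isolation, here is a sketch with made-up results for 4 environments. The values are hypothetical, and I use np.stack on the observations as a stand-in for _flatten_obs, which I assume behaves this way for plain array observations.

```python
import numpy as np

# Hypothetical, already flattened results: one (ob, reward, done, info)
# tuple per environment, in the format returned by step_env.
results = [
    (np.array([0.0, 0.1]), 1.0, False, {}),
    (np.array([1.0, 1.1]), 0.0, True, {"episode": 1}),
    (np.array([2.0, 2.1]), 0.5, False, {}),
    (np.array([3.0, 3.1]), 0.0, False, {}),
]

# zip(*results) regroups per-environment tuples into per-field tuples.
obs, rews, dones, infos = zip(*results)

batched_obs = np.stack(obs)      # one row per environment
batched_rews = np.stack(rews)    # one reward per environment
batched_dones = np.stack(dones)  # one done flag per environment
print(batched_obs.shape, batched_rews.shape, batched_dones.shape)
```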

Then, after some processing, it returns them! Next up is the worker, the function that actually runs everything (the target of the processes).

But, before continuing, let’s check out flatten_list!

flatten_list

I must say I was quite confused when I first saw this.

def _flatten_list(l):
    assert isinstance(l, (list, tuple))
    assert len(l) > 0
    assert all([len(l_) > 0 for l_ in l])
    return [l__ for l_ in l for l__ in l_]

So, let’s first look at l_. l_ iterates over the elements of l, as written in for l_ in l. Then, for each l_, l__ iterates over the elements of l_, as it says in for l__ in l_, and each l__ is put into the result. Thus, we can see that this is shorthand for a nested loop that flattens a 2D list into a 1D list by placing all the rows next to each other.
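Written out both ways, the comprehension and the explicit nested loop look like this. The function names and the sample data are hypothetical; only the comprehension itself is taken from _flatten_list.

```python
def flatten_with_comprehension(l):
    return [l__ for l_ in l for l__ in l_]

def flatten_with_loops(l):
    flat = []
    for l_ in l:          # outer loop: one sub-list per worker
        for l__ in l_:    # inner loop: one item per environment
            flat.append(l__)
    return flat

nested = [["a0", "a1"], ["b0", "b1"], ["c0", "c1"]]
flat = flatten_with_comprehension(nested)
print(flat)
```

The for clauses in the comprehension read left to right in the same order as the nested loops.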

worker

def worker(remote, parent_remote, env_fn_wrappers):
    def step_env(env, action):
        ob, reward, done, info = env.step(action)
        if done:
            ob = env.reset()
        return ob, reward, done, info
    parent_remote.close()
    envs = [env_fn_wrapper() for env_fn_wrapper in env_fn_wrappers.x]
    try:
        while True:
            cmd, data = remote.recv()
            if cmd == 'step':
                remote.send([step_env(env, action) for env, action in zip(envs, data)])
            elif cmd == 'reset':
                remote.send([env.reset() for env in envs])
            elif cmd == 'render':
                remote.send([env.render(mode='rgb_array') for env in envs])
            elif cmd == 'close':
                remote.close()
                break
            elif cmd == 'get_spaces_spec':
                remote.send(
                    CloudpickleWrapper((envs[0].observation_space, envs[0].action_space, envs[0].spec)))
            else:
                raise NotImplementedError
    except KeyboardInterrupt:
        print('SubprocVecEnv worker: got KeyboardInterrupt')
    finally:
        for env in envs:
            env.close()

First for,

def step_env(env, action):
    ob, reward, done, info = env.step(action)
    if done:
        ob = env.reset()
    return ob, reward, done, info

This calls the step function of an environment and returns the observation, the reward, whether the episode is done, and the extra info!

In addition, here, if the environment is done, the environment is reset and the first observation is returned and that replaces the observation which is quite cool!
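To watch the automatic reset happen, here is a sketch with a toy environment. CountdownEnv is a hypothetical class of mine whose episodes end after 3 steps; step_env is copied from the worker.

```python
class CountdownEnv:
    """Hypothetical toy environment: the episode ends after 3 steps."""
    def reset(self):
        self.t = 0
        return self.t

    def step(self, action):
        self.t += 1
        done = self.t >= 3
        return self.t, 1.0, done, {}

def step_env(env, action):
    ob, reward, done, info = env.step(action)
    if done:
        # Replace the last observation with the first one of a new episode.
        ob = env.reset()
    return ob, reward, done, info

env = CountdownEnv()
env.reset()
trajectory = [step_env(env, action=None) for _ in range(4)]
observations = [ob for ob, _, _, _ in trajectory]
dones = [done for _, _, done, _ in trajectory]
print(observations, dones)
```

Note that on the step where done is True, the caller sees the observation of the new episode, so the final observation of the finished episode is not returned.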

Then,

parent_remote.close()
envs = [env_fn_wrapper() for env_fn_wrapper in env_fn_wrappers.x]

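I initially thought this makes new environments all the time, but if this were the case, I do not think any form of game would be possible. Looking closer, this line runs only once, before the while loop: each env_fn_wrapper in env_fn_wrappers.x is called once to create an environment, and the resulting objects stay in envs for the whole life of the worker process. So the state is kept simply because the same environment objects are reused for every command. The __getstate__ and __setstate__ methods are only involved in getting env_fn_wrappers into the child process.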

Overall, the worker waits for a command, runs it on its environments, and sends the result back through remote! There are a few closes in this function. parent_remote.close() closes the worker’s copy of the parent end, which it never uses, and remote.close() under the 'close' command shuts down the worker’s own end before the loop exits. So, as far as I can tell, they are about cleaning up connections and not about holding back messages: recv() already reads one message at a time, and anything sent in the meantime simply waits in the pipe.
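Here is a simplified sketch of this command loop. To keep it runnable anywhere, a thread stands in for the subprocess and a counter stands in for the environments; toy_worker and its commands are hypothetical and much smaller than the real worker.

```python
import multiprocessing as mp
import threading

def toy_worker(remote):
    """Hypothetical, simplified worker: keeps one counter instead of real environments."""
    counter = 0
    while True:
        cmd, data = remote.recv()  # blocks until one full message arrives
        if cmd == "step":
            counter += data
            remote.send(counter)
        elif cmd == "reset":
            counter = 0
            remote.send(counter)
        elif cmd == "close":
            remote.close()
            break
        else:
            raise NotImplementedError

remote, work_remote = mp.Pipe()
# A thread stands in for the subprocess so the sketch runs anywhere.
thread = threading.Thread(target=toy_worker, args=(work_remote,))
thread.start()

# Two commands are sent back to back; the worker still handles them one by one.
remote.send(("step", 2))
remote.send(("step", 3))
replies = [remote.recv(), remote.recv()]

remote.send(("reset", None))
after_reset = remote.recv()

remote.send(("close", None))
thread.join()
remote.close()
```

The counter surviving between the two step commands is the same effect that keeps the environments alive between steps in the real worker.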

Another interesting thing for me though is that in the reset function, the observation needs to be sent back too! So I’ll do that!

Now, let’s go back to run.py’s build_env!

build_env

After that is done, build_env just returns the environment. Then let’s go up to train!

train

After build_env is done,

if args.save_video_interval != 0:
    env = VecVideoRecorder(env, osp.join(logger.get_dir(), "videos"), record_video_trigger=lambda x: x % args.save_video_interval == 0, video_length=args.save_video_length)

is run. This just records videos using gym.wrappers.monitoring’s video_recorder.

Then, after the network is set as follows,

if args.network:
    alg_kwargs['network'] = args.network
else:
    if alg_kwargs.get('network') is None:
        alg_kwargs['network'] = get_default_network(env_type)

model is set to

model = learn(
    env=env,
    seed=seed,
    total_timesteps=total_timesteps,
    **alg_kwargs
)

which is basically the learning function of the chosen algorithm!

And finally, model and env are returned

return model, env

Then, finally, let’s go into the learn function to see what we should do!

ppo2’s learn function

There are lots of algorithms, but since I presumed that what they expect from the environment should be similar, I decided to look at just ppo2’s learn function! I chose ppo2 because, well, it’s the one I know the most about.

Inside the learn function, the first place we see an interaction with the environment is

obs, returns, masks, actions, values, neglogpacs, states, epinfos = runner.run() #pylint: disable=E0632

Now, what is runner? It is defined as

runner = Runner(env=env, model=model, nsteps=nsteps, gamma=gamma, lam=lam)

and the Runner class is defined in baselines/ppo2/runner.py. In its run function, we see

self.obs[:], rewards, self.dones, infos = self.env.step(actions)

as well as

actions, values, self.states, neglogpacs = self.model.step(self.obs, S=self.states, M=self.dones)

before it.

Overall, we see that while it was an interesting journey, we have returned to the quintessential question: what should we do to make this a self-play environment?

Next

In the next article, I’ll discuss how I implemented this here!
