Published in Analytics Vidhya

Understanding OpenAI baseline source code and making it do self-play! Part 4

Here, we will finally implement the self-play functionality in the OpenAI baselines! To see part 1, part 2, and part 3, check out here, here, and here. I think this post will be a bit hard to follow without that foundation. I’ll try to implement the actual self-play functionality in baselines here, so I hope it is fun! I’ll also be talking about my own workflow, which I’m not used to doing, so if you find some parts hard to understand, please tell me!

To view the code and follow along, check out here!

One requirement

One requirement of my environment is that both sides need to be able to make decisions at the same time. That is, I want the model to act for both sides, or any number of sides, before the environment takes a step.

Thus, we won’t send the observation of one side per time step and update the environment each time. This was my original idea, but I don’t think it will work.

Then how about we collect the action of each side one by one, concatenate them, and only then run step to update the environment? For example, in the step function, if we loop through all the sides, take an action for each based on what it observes, and only then update the environment, we can effectively make it look like we have sides times as many environments!
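As a rough sketch of this buffering idea (toy code only, not the actual baselines API; the MultiSideEnv class and its method names are made up for illustration):

```python
class MultiSideEnv:
    """Toy illustration: buffer one action per side, then advance once."""

    def __init__(self, sides=2):
        self.sides = sides
        self.pending = {}

    def step(self, side, action):
        # Buffer this side's action; only advance when every side has acted.
        self.pending[side] = action
        if len(self.pending) < self.sides:
            return None  # still waiting on the other sides
        actions, self.pending = self.pending, {}
        return self._advance(actions)

    def _advance(self, actions):
        # Placeholder "physics": just echo which actions were applied.
        return {side: "applied " + a for side, a in sorted(actions.items())}


env = MultiSideEnv(sides=2)
assert env.step(0, "left") is None   # side 0 buffered, env not stepped yet
result = env.step(1, "right")        # side 1 arrives, so the env steps once
print(result)  # {0: 'applied left', 1: 'applied right'}
```

One step of the underlying environment thus consumes sides actions, which is exactly the bookkeeping the rest of this post wires into SubprocVecEnv.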

How are environments generated again?

So far, we saw that make_env takes the environment that we made and just wraps it with additional functionality. The original behavior doesn’t change, so we can still access the attributes of the underlying environment.

This explains why, to accomplish our goals, in SubprocVecEnv we need the following:

  1. In the worker function, it needs to send the actions of all sides at once to the remote/environment. Or, to be more specific, it needs a preparation-for-the-step phase and an actual step phase. This can be done either in the worker function or in the actual environment. I think I’ll do it in the actual environment!
  2. The observation space is based on the same environment for all sides players (the observations can differ, in that you may not be able to see some of the other players, etc.). Thus, the worker needs to send back the observations in the appropriate manner too!

Thus, my idea is to change it so that

  1. If the environment has an attribute “sides”, which denotes the number of sides the environment has, nenv is multiplied by this quantity.

I wasn’t sure how to do this, because it gets quite messy to first make an environment inside SubprocVecEnv and go from there: in the process of making the environments, we already need to know the number of environments.

I’m quite sure it’s possible, but every way I thought of became quite messy. So, I decided to add an argument in common_arg_parser called “no_self_play” that is initially set to False; if the user passes the flag, it will be True and thus a usual environment without sides is made.

For this, I went to the make_vec_env function and did

if not env_kwargs.get("no_self_play", True):
    num_env *= env_kwargs.get("sides", 2)  # the default number of sides is 2

where no_self_play is an argument the user (in this case, me!) provides. However, I found that the arguments the user sends don’t go inside the env_kwargs argument in build_env. Initially, env_kwargs is set to None, and

env_kwargs = env_kwargs or {}

is run to make it an empty dictionary.

Now, since I want the user’s arguments to go into the env_kwargs argument, we just need to convert the args in build_env from an argparse Namespace to a dictionary and then provide it as an argument!

Converting the argparse args to a dictionary can be easily done by just doing (thanks Raymond!)

args_dict = vars(args)

which is quite cool honestly!
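A quick self-contained demonstration of vars() on a parsed Namespace (the --sides and --no_self_play arguments below are just example definitions):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--sides', type=int, default=2)
parser.add_argument('--no_self_play', action='store_true')
args = parser.parse_args(['--sides', '4'])

# vars() exposes the Namespace's attribute dictionary,
# which can then be passed along as env_kwargs
args_dict = vars(args)
print(args_dict)  # {'sides': 4, 'no_self_play': False}
```

One subtlety worth knowing: vars() returns the Namespace’s own attribute dict, not a copy, so deleting a key from args_dict (as we do with “seed” later on) also removes that attribute from args itself.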

Then, I just called make_vec_env by

env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations, env_kwargs=args_dict)

Then, we have successfully increased the number of environments by sides!

2. In SubprocVecEnv, only have nenvs//sides environments actually running, and apply each consecutive chunk of sides actions to one environment.

This may not be the most elegant solution as I am quite a newcomer to all the fancy threading and multiprocessing so if you have a better one, please tell me!

In the __init__ function, I wrote

env = env_fns[0]()
if hasattr(env, 'sides'):
    self.sides = env.sides
else:
    self.sides = 1

Then, as nenvs//1 == nenvs, there should be no difference in what the code does. I’m still bug testing quite a bit so this might change!

Anyway then, before the array_split on the env_fns here

env_fns = np.array_split(env_fns, self.nremotes)

I did

env_fns = env_fns[:nenvs//self.sides]

to get only the first nenvs//self.sides environments, because the rest will simply not be used! Then, I changed the assertion and self.nremotes to be

assert nenvs // self.sides % in_series == 0, "Number of envs must be divisible by number of envs to run in series"
self.nremotes = nenvs // self.sides // in_series

Here, the reason I did not change the value of the nenvs variable is that in the actual learn function, which trains the algorithm on the environment (at least in ppo2), I saw

nenvs = env.num_envs

which is later used to compute the batch sizes! So, I can’t alter this number, because then I couldn’t account for the number of sides in the environment!
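To see why num_envs feeds into the batch size, here is a paraphrased sketch of the arithmetic ppo2 does with it (the nsteps and nminibatches values are just example numbers, not anything from the post):

```python
# nenvs comes from env.num_envs, which in our setup already includes
# the extra factor of `sides`
nenvs = 8
nsteps = 128           # rollout length collected per environment
nminibatches = 4

nbatch = nenvs * nsteps                # samples gathered per update
nbatch_train = nbatch // nminibatches  # minibatch size per gradient step
print(nbatch, nbatch_train)  # 1024 256
```

Since every quantity downstream is derived from num_envs, inflating num_envs by sides in make_vec_env keeps the learner’s bookkeeping consistent without touching ppo2 itself.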

Anyway, overall, this should make it so that out of the nenvs actions (which is the number of actual environments * self.sides), each consecutive chunk of self.sides actions goes to the correct environment!

Then, I went to step_async and changed it so that it sends sides actions to each environment, by sending action i to remote i//self.sides!

def step_async(self, actions):
    actions = np.array_split(actions, self.nremotes)
    for j in range(len(actions)):
        for action in actions[j]:
            self.remotes[j].send(('step', action))
    self.waiting = True

Update (2019/12/8): Sorry, this originally did not work because the index into self.remotes was wrong. It is fixed now.


However, here, there is one slight problem. There is no way for the environment to know which side the action it’s being sent corresponds to! It can be from the first side or the second side and overall, it’s confusing.

To solve this, I went to my environment and added an attribute called side which is initially set to 0 in the __init__ function as follows

My environment

self.side = 0

Then, I modified my environment’s step so that each incoming action corresponds to whichever side is up next, like

def step(self, action):
    side = self.side
    self.action[side] = action
    self.finished_sides[side] = 1
    self.side += 1
    self.side %= self.sides
    # Until every side has submitted an action, return placeholders
    if self.finished_sides[self.finished_sides == 0].shape[0] != 0:
        return None, None, None, None
    # All sides have acted, so actually advance the environment
    return self.update_step()

where self.finished_sides is initially all zeros and has size sides, as follows:

self.finished_sides = np.zeros(self.sides)

I’m sure that

self.finished_sides[self.finished_sides == 0].shape[0] == 0

is not the most beautiful way of doing this. I’ll think of a better one-liner later, but for now, it works!
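As an aside, assuming the finished flags live in a NumPy 0/1 array, np.all gives an equivalent but shorter check. A quick sanity check of the equivalence:

```python
import numpy as np

finished_sides = np.zeros(2)
# No side has acted yet: the original check and np.all agree
assert finished_sides[finished_sides == 0].shape[0] != 0
assert not np.all(finished_sides)

finished_sides[...] = 1
# Every side has acted: both checks flip together
assert finished_sides[finished_sides == 0].shape[0] == 0
assert np.all(finished_sides)
```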

The update_step method runs the environment and returns the observations, the rewards, whether it’s done, and an empty info dictionary for all the sides:

def update_step(self):
    done = False
    if self.t >= self.terminate_turn or self.end():
        done = True
    self.t += 1
    self.finished_sides[...] = 0
    dones = [done for _ in range(self.sides)]
    infos = [{} for _ in range(self.sides)]
    return self.obs, self.rewards, dones, infos

Here, I made it so that for all the returned values, such as self.obs, the first index indicates the side. So, for example, side 0’s observation can be found at self.obs[0]!

Next, in step_wait, since Nones are returned for some of the calls, we need to modify the code slightly so that the data is distributed across all the environments, so that the model sees the right observation and each action corresponds to the right side and environment.


Now, while the step_wait function initially started off with

results = [remote.recv() for remote in self.remotes]

we need to modify it slightly to be

results = [self.remotes[i//self.sides].recv() for i in range(len(self.remotes)*self.sides)]

which will get sides number of results from each remote!
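To make the i // self.sides indexing concrete, here is a toy version of that list comprehension (strings stand in for the real pipe objects):

```python
sides = 2
remotes = ['remote0', 'remote1']  # stand-ins for the actual pipes

# Each remote gets polled `sides` times in a row:
polled = [remotes[i // sides] for i in range(len(remotes) * sides)]
print(polled)  # ['remote0', 'remote0', 'remote1', 'remote1']
```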

Then, just to make things smooth with the reset functions later on, I decided to reorder the results array so that the first len(self.remotes) (i.e. nenvs // self.sides) entries hold the actual observations, rather than having the real observations occur only at every response where i % self.sides == self.sides - 1 (which is the case because a real response comes only once all the sides have recorded their actions).

So, after the _flatten_list function, which undoes the effect of in_series to a certain extent (at least that’s my understanding!), I did

data = results.copy()[self.sides-1::self.sides]
results = np.asarray(results)
results[:len(self.remotes)] = data
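Here is a toy run of that reordering, with two remotes and two sides, where each remote replied with a placeholder first and the real result second (strings stand in for the actual result tuples):

```python
import numpy as np

sides = 2
n_remotes = 2
results = ['none0', 'real0', 'none1', 'real1']

data = results.copy()[sides - 1::sides]  # every `sides`-th reply is the real one
results = np.asarray(results)
results[:n_remotes] = data               # move the real replies to the front
print(list(results))  # ['real0', 'real1', 'none1', 'real1']
```

The leftover placeholder entries past the first n_remotes slots are exactly what tactic_game_fix_results overwrites next.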

Now that that’s done, I passed the results variable through my horribly named method, tactic_game_fix_results, if self.sides is larger than one (meaning it is a self-play environment)!

def tactic_game_fix_results(self, results):
    for i in range(len(results) - 1, -1, -1):
        for j in range(len(results[i])):
            results[i][j] = results[i // self.sides][j][i % self.sides]
    return results

Basically, what happens here is that i iterates through the results from the back (where it’s all Nones) and moves forward, overwriting those Nones with the actual observation, reward, done, and info of agent i. I made it so that agent i sees side i % self.sides of environment i // self.sides, which I think is quite consistent! The j just loops through observation, reward, done, and info, so len(results[i]) here is 4!

Thus, overall, the step_wait function ended up as

def step_wait(self):
    # do recv on the same remote several times
    results = [self.remotes[i//self.sides].recv() for i in range(len(self.remotes)*self.sides)]
    results = _flatten_list(results)
    # push the real observations to the first portion of the results array
    data = results.copy()[self.sides-1::self.sides]
    results = np.asarray(results)
    results[:len(self.remotes)] = data
    if self.sides > 1:
        results = self.tactic_game_fix_results(results)
    self.waiting = False
    obs, rews, dones, infos = zip(*results)
    return _flatten_obs(obs), np.stack(rews), np.stack(dones), infos

I did not put the data reordering inside the if self.sides > 1 check, because if self.sides == 1 it causes no change anyway!


Since the reset function needs to return observations but doesn’t need any actions(since it’s the initial state), I can just do something like

def reset(self):
    return self.obs

in my environment and in SubprocVecEnv, I changed reset to be like

def reset(self):
    for i in range(len(self.remotes)):
        self.remotes[i].send(('reset', None))
    obs = [self.remotes[i].recv() for i, _ in enumerate(self.remotes)]
    obs = _flatten_list(obs)
    if self.sides > 1:
        obs += [[None] for _ in range(len(self.remotes)*(self.sides-1))]
        obs = self.tactic_game_fix_results(obs)
        obs = zip(*obs)
        obs = obs.__next__()
    return _flatten_obs(obs)

which is frankly not especially clean, and I’m not that proud of it aesthetically. But anyway, the main change is

if self.sides > 1:
    obs += [[None] for _ in range(len(self.remotes)*(self.sides-1))]
    obs = self.tactic_game_fix_results(obs)
    obs = zip(*obs)
    obs = obs.__next__()

The first part pads the observations so there are enough observations for all the models. The self.tactic_game_fix_results call should be self-explanatory too! Now, what happens in the next lines? Well, basically, I tried to do something like

obs, rews, dones, infos = zip(*results)

to get the observations. What basically happens here is that the shape changes from [num_agents, 1, rest of dims] to [num_agents, rest of dims] for reset’s obs.
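A toy version of this reshaping, with two agents (strings stand in for the actual observation arrays):

```python
obs = [['agent0_obs'], ['agent1_obs']]  # shape [num_agents, 1]

zipped = zip(*obs)
first = zipped.__next__()  # equivalent to next(zipped)
print(first)  # ('agent0_obs', 'agent1_obs') -- shape [num_agents]
```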

However, a problem occurs here: in the step function, 4 values are returned, so j iterates over 4 things; but for reset, only 1 value, the observation, is returned, so j starts to iterate over the sides within the observation itself, which broke everything and became a debugging nightmare for me. Basically, what I did was put self.obs inside a list in my environment, as follows!

def reset(self):
    return [self.obs]

Now, after a bit more debugging, I found another source of bugs: the step_env function inside the worker function. What this mainly does is, when a step is requested, get the observations and all the other nice outputs from the env’s step function and send them back, for all the environments in a process, like this!

if cmd == 'step':
    remote.send([step_env(env, action) for env, action in zip(envs, data)])

The actual implementation is

def step_env(env, action):
    ob, reward, done, info = env.step(action)
    if done:
        ob = env.reset()
    return ob, reward, done, info

Here, I was a bit confused initially, but it is important to note that this is the reset function of your environment, and not the reset function or step function of your SubprocVecEnv or any of the wrappers!

Anyway, here I have a problem. Precisely because I wrapped ob in a list like [ob] in the reset function, I cannot just assign it to ob directly; I need to take its 0th index.

Also, when we check the done condition, if we have a list, we need to check a particular index to see if it’s over. This is because, in Python (and maybe most languages),

bool([False, False])

returns True.
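A quick demonstration; any() and all() are the usual ways to actually inspect the elements:

```python
# A non-empty list is always truthy, regardless of its contents
assert bool([False, False]) is True
assert bool([]) is False

# To look at the elements themselves, use any()/all()
assert any([False, False]) is False
assert all([True, False]) is False
assert all([True, True]) is True
```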

In my case, I made my environment so that if one side is done, all of the sides are done. So, overall, I did

def step_env(env, action):
    ob, reward, done, info = env.step(action)
    if type(done) == list and done[0]:  # for custom environments
        ob = env.reset()
        ob = ob[0]
    if type(done) != list and done:  # for non-custom environments
        ob = env.reset()
    return ob, reward, done, info

And that’s it! I basically separated it into the case where it’s my game and a list is returned for done, and the case of a usual gym environment.

I’m quite sure I might have overcomplicated things around here, so if you think some places were badly implemented, please tell me, because I’d love to learn.

3. Rendering the game

Here, I just went to the render function of my environment and did the same thing as in reset (put the rendered images in a list), and changed the get_images function to

def get_images(self):
    for pipe in self.remotes:
        pipe.send(('render', None))
    imgs = [pipe.recv() for pipe in self.remotes]
    imgs = _flatten_list(imgs)
    if self.sides > 1:
        imgs += [[None] for _ in range(len(self.remotes)*(self.sides-1))]
        imgs = self.tactic_game_fix_results(imgs)
        imgs = zip(*imgs)
        imgs = imgs.__next__()
    return imgs

4. This is slightly unrelated, but since **kwargs are used to initialize the game, I made some modifications to my environment.

The environment is obtained through gym’s registration, where entry_point points to the module your environment is in! What this does, basically, is initialize the class with all the arguments in **_kwargs, in addition to the env_id parameter.

In addition, in the __init__ function of my class, I made it so that the kwargs become my attributes, because in my environment I was already doing something similar with argparse! The way you do this, which I took from here (all credit to Mike Lewis!), is

for k, v in kwargs.items():  # .iteritems() in Python 2
    setattr(self, k, v)

which is quite clever honestly!
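Putting the two ideas together, here is a self-contained toy of the whole mechanism (this is not gym’s real code; the registry dict, the make helper, and the TacticGameEnv name are all made up for illustration):

```python
# Toy stand-in for gym's registry: the entry-point class receives
# **kwargs at construction time
class TacticGameEnv:
    def __init__(self, **kwargs):
        # the setattr trick from above: every kwarg becomes an attribute
        for k, v in kwargs.items():
            setattr(self, k, v)


registry = {'TacticGame-v0': TacticGameEnv}  # hypothetical env id


def make(env_id, **kwargs):
    # look up the registered class and instantiate it with the kwargs
    return registry[env_id](**kwargs)


env = make('TacticGame-v0', sides=2, terminate_turn=100)
print(env.sides, env.terminate_turn)  # 2 100
```

The real gym.make does considerably more (wrappers, spec validation), but the kwargs-to-constructor flow is the part that matters for passing env_kwargs through.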

However, since the gym.Env class has an important method called seed, and that happened to be the same name as one of my arguments, in the build_env function I did

args_dict = vars(args)
del args_dict["seed"]
env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations, env_kwargs=args_dict)

before I ran make_vec_env.

This method worked because, previously, the seed argument had already been saved into the seed variable that is passed to make_vec_env, as

seed = args.seed

Some other minor details

  1. This might be a bit minor, but to be comprehensive I should include it. Remember the Monitor wrappers that we wrapped our environment in, as well as reward_scale? As it happens, every time we call step in them, these functions accumulate or multiply the reward. Since we returned None as the reward, this complicated matters slightly, in that it gave errors. I needed to check whether the reward was None, but overall, I was able to fix that, so it went nicely!
  2. I noticed that gym did not know my environment id, so I needed to manually import my custom environment, which registers it. It was kind of interesting! Maybe there’s a better way to go about it, but for now, I’ll go with this!

End results

In the end, I was able to start training in a decent manner. I haven’t run a full-fledged test yet so there still might be bugs but so far it looks nice!

After roughly 10 minutes of training

The above is the result of the render! To view the code, check out here!


