Understanding OpenAI baseline source code and making it do self-play! Part 2

In the previous section, we went all the way to the train function and stopped right before we actually build the environment. To check that out, click here!

build_env

In the next line, we see

env = build_env(args)

So, let us look at the build_env function!

def build_env(args):
    ncpu = multiprocessing.cpu_count()
    if sys.platform == 'darwin': ncpu //= 2
    nenv = args.num_env or ncpu
    alg = args.alg
    seed = args.seed

    env_type, env_id = get_env_type(args)

    if env_type in {'atari', 'retro'}:
        if alg == 'deepq':
            env = make_env(env_id, env_type, seed=seed, wrapper_kwargs={'frame_stack': True})
        elif alg == 'trpo_mpi':
            env = make_env(env_id, env_type, seed=seed)
        else:
            frame_stack_size = 4
            env = make_vec_env(env_id, env_type, nenv, seed, gamestate=args.gamestate, reward_scale=args.reward_scale)
            env = VecFrameStack(env, frame_stack_size)
    else:
        config = tf.ConfigProto(allow_soft_placement=True,
                                intra_op_parallelism_threads=1,
                                inter_op_parallelism_threads=1)
        config.gpu_options.allow_growth = True
        get_session(config=config)

        flatten_dict_observations = alg not in {'her'}
        env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)

        if env_type == 'mujoco':
            env = VecNormalize(env, use_tf=True)

    return env

First,

ncpu = multiprocessing.cpu_count()
if sys.platform == 'darwin': ncpu //= 2
nenv = args.num_env or ncpu
alg = args.alg
seed = args.seed

appears to initialize parameters. A quite common but interesting trick used here is setting the nenv variable with the or keyword. What it does is quite intuitive once you consider how "or" works in Python: it returns its first operand if that operand is truthy, and otherwise returns the second one. So if args.num_env was given, nenv is set to args.num_env; otherwise it falls back to ncpu (according to here, the number of online CPUs)! It's a neat trick to make your code shorter!
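As a quick, standalone sketch of the idiom (the values here are made up):

num_env = None          # e.g. not given on the command line
ncpu = 8

nenv = num_env or ncpu  # 'or' returns its first truthy operand
print(nenv)             # 8

nenv = 4 or ncpu        # if num_env had been 4 instead...
print(nenv)             # 4

One small caveat: if args.num_env were 0, it would also fall through to ncpu, since 0 is falsy.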

Next, we see

env_type, env_id = get_env_type(args)

if env_type in {'atari', 'retro'}:

As we can specify the env_type from the arguments, and since our environment is a custom one, we can skip the contents of this if statement and go straight to the else portion.

else:
    config = tf.ConfigProto(allow_soft_placement=True,
                            intra_op_parallelism_threads=1,
                            inter_op_parallelism_threads=1)
    config.gpu_options.allow_growth = True
    get_session(config=config)

    flatten_dict_observations = alg not in {'her'}
    env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)

    if env_type == 'mujoco':
        env = VecNormalize(env, use_tf=True)

First off, what is allow_soft_placement? According to here, it lets TensorFlow fall back to another available device when the one you ask for can't be used, which matters when you pin operations to specific GPUs. For example, you can do

with tf.device('/gpu:0'):

to allocate to the 0th GPU specifically, which is interesting.
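Here is a minimal sketch of how those two pieces fit together, assuming the old TF1-style API (ConfigProto, Session) that baselines uses:

import tensorflow as tf  # TF1-style API

config = tf.ConfigProto(allow_soft_placement=True)
config.gpu_options.allow_growth = True

with tf.device('/gpu:0'):     # request GPU 0 explicitly...
    x = tf.constant([1.0, 2.0])
    y = x * 2.0               # ...soft placement lets TF fall back to CPU
                              # if that device isn't actually available

with tf.Session(config=config) as sess:
    print(sess.run(y))        # [2. 4.]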

The next line which stands out is

env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)

Now, let’s check out what make_vec_env does!

make_vec_env

make_vec_env resides in cmd_util.py.

def make_vec_env(env_id, env_type, num_env, seed,
                 wrapper_kwargs=None,
                 env_kwargs=None,
                 start_index=0,
                 reward_scale=1.0,
                 flatten_dict_observations=True,
                 gamestate=None,
                 initializer=None,
                 force_dummy=False):
    """
    Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo.
    """
    wrapper_kwargs = wrapper_kwargs or {}
    env_kwargs = env_kwargs or {}
    mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0
    seed = seed + 10000 * mpi_rank if seed is not None else None
    logger_dir = logger.get_dir()

    def make_thunk(rank, initializer=None):
        return lambda: make_env(
            env_id=env_id,
            env_type=env_type,
            mpi_rank=mpi_rank,
            subrank=rank,
            seed=seed,
            reward_scale=reward_scale,
            gamestate=gamestate,
            flatten_dict_observations=flatten_dict_observations,
            wrapper_kwargs=wrapper_kwargs,
            env_kwargs=env_kwargs,
            logger_dir=logger_dir,
            initializer=initializer
        )

    set_global_seeds(seed)
    if not force_dummy and num_env > 1:
        return SubprocVecEnv([make_thunk(i + start_index, initializer=initializer) for i in range(num_env)])
    else:
        return DummyVecEnv([make_thunk(i + start_index, initializer=None) for i in range(num_env)])

We want multiple environments running in parallel (which is the case for me!), so we will only look at SubprocVecEnv. But I think it's fair to say that DummyVecEnv will be quite similar in terms of what it expects from the environment.

Before checking what that does, let us first look at make_thunk. This is an interesting function in that it returns a function with the arguments already baked in. It does this with the following pattern:

def a(args):
    return lambda: b(args)
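I think this is kind of cool. Here is a toy version of the same pattern (the names are made up), showing that each returned thunk remembers the arguments it was created with:

def make_thunk(rank):
    # a zero-argument function that still remembers 'rank'
    return lambda: print("building env with subrank", rank)

thunks = [make_thunk(i) for i in range(3)]
for thunk in thunks:
    thunk()   # prints subrank 0, 1, 2 in turn

Wrapping the lambda in a helper function like this also makes sure each thunk gets its own rank, rather than all of them sharing the loop variable.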

Anyway, let's not get distracted and look at the function that gets returned: make_env.

make_env

def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, env_kwargs=None, logger_dir=None, initializer=None):
    if initializer is not None:
        initializer(mpi_rank=mpi_rank, subrank=subrank)

    wrapper_kwargs = wrapper_kwargs or {}
    env_kwargs = env_kwargs or {}
    if ':' in env_id:
        import re
        import importlib
        module_name = re.sub(':.*', '', env_id)
        env_id = re.sub('.*:', '', env_id)
        importlib.import_module(module_name)
    if env_type == 'atari':
        env = make_atari(env_id)
    elif env_type == 'retro':
        import retro
        gamestate = gamestate or retro.State.DEFAULT
        env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate)
    else:
        env = gym.make(env_id, **env_kwargs)

    if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict):
        keys = env.observation_space.spaces.keys()
        env = gym.wrappers.FlattenDictWrapper(env, dict_keys=list(keys))

    env.seed(seed + subrank if seed is not None else None)

    env = Monitor(env,
                  logger_dir and os.path.join(logger_dir, str(mpi_rank) + '.' + str(subrank)),
                  allow_early_resets=True)

    if env_type == 'atari':
        env = wrap_deepmind(env, **wrapper_kwargs)
    elif env_type == 'retro':
        if 'frame_stack' not in wrapper_kwargs:
            wrapper_kwargs['frame_stack'] = 1
        env = retro_wrappers.wrap_deepmind_retro(env, **wrapper_kwargs)

    if isinstance(env.action_space, gym.spaces.Box):
        env = ClipActionsWrapper(env)

    if reward_scale != 1:
        env = retro_wrappers.RewardScaler(env, reward_scale)

    return env

For the first two lines,

if initializer is not None:
    initializer(mpi_rank=mpi_rank, subrank=subrank)

I'm not particularly sure what an initializer is, but in all the calls we have looked at so far, initializer was set to None, so let's skip this for now!

The next interesting piece of code is

if ':' in env_id:
    import re
    import importlib
    module_name = re.sub(':.*', '', env_id)
    env_id = re.sub('.*:', '', env_id)
    importlib.import_module(module_name)

Basically, this is where my custom environment gets imported. module_name is obtained by removing ':' and everything after it from env_id, leaving the part before the colon. env_id, on the other hand, is set to the part following ':'. The patterns work because a period matches basically every character except line breaks, and * means the preceding token (.) is matched 0 or more times. re.sub then replaces the section matched by these patterns with the second argument, '', i.e. nothing. I find it quite fun to look at regex because, as I am not especially familiar with it, figuring out what it does is like a little puzzle. Well, enough of that! Let's continue.
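As a quick check of what those two substitutions do, with a made-up env_id in the module:id form:

import re

env_id = 'my_envs.game:Game_Env-v0'       # hypothetical '<module>:<id>' string

module_name = re.sub(':.*', '', env_id)   # drop ':' and everything after it
env_id = re.sub('.*:', '', env_id)        # drop everything up to and including ':'

print(module_name)  # my_envs.game
print(env_id)       # Game_Env-v0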

if env_type == 'atari':
    env = make_atari(env_id)
elif env_type == 'retro':
    import retro
    gamestate = gamestate or retro.State.DEFAULT
    env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate)
else:
    env = gym.make(env_id, **env_kwargs)

As our environment is neither atari nor retro, the gym.make function is run. Now, what is this function?

make

It took me a while to find it, but I found it in gym's registration module (gym/envs/registration.py)!

registry = EnvRegistry()

def register(id, **kwargs):
    return registry.register(id, **kwargs)

def make(id, **kwargs):
    return registry.make(id, **kwargs)

And this make function calls the registry's make method:

def make(self, path, **kwargs):
    if len(kwargs) > 0:
        logger.info('Making new env: %s (%s)', path, kwargs)
    else:
        logger.info('Making new env: %s', path)
    spec = self.spec(path)
    env = spec.make(**kwargs)
    # We used to have people override _reset/_step rather than
    # reset/step. Set _gym_disable_underscore_compat = True on
    # your environment if you use these methods and don't want
    # compatibility code to be invoked.
    if hasattr(env, "_reset") and hasattr(env, "_step") and not getattr(env, "_gym_disable_underscore_compat", False):
        patch_deprecated_methods(env)
    if (env.spec.max_episode_steps is not None) and not spec.tags.get('vnc'):
        from gym.wrappers.time_limit import TimeLimit
        env = TimeLimit(env, max_episode_steps=env.spec.max_episode_steps)
    return env

First, what is kwargs? It is one of the nicer features in Python: if you pass arguments in the form variable_name=value when calling a function like this (credit to GeeksforGeeks),

def myFun(arg1, **kwargs):
    for key, value in kwargs.items():
        print("%s == %s" % (key, value))

# Driver code
myFun("Hi", first='Geeks', mid='for', last='Geeks')

It will print out

last == Geeks
mid == for
first == Geeks

That's where its name comes from: keyword arguments, kwargs. You can use *args to collect the usual positional arguments, but it has to come before **kwargs!
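A small sketch with both kinds of argument collectors together:

def myFun2(*args, **kwargs):
    print(args)    # positional arguments, collected into a tuple
    print(kwargs)  # keyword arguments, collected into a dict

myFun2('Hi', 'there', first='Geeks', last='Geeks')
# ('Hi', 'there')
# {'first': 'Geeks', 'last': 'Geeks'}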

Then, this function goes on to run the spec function on the path to the env.

spec

def spec(self, path):
    if ':' in path:
        mod_name, _sep, id = path.partition(':')
        try:
            importlib.import_module(mod_name)
        # catch ImportError for python2.7 compatibility
        except ImportError:
            raise error.Error('A module ({}) was specified for the environment but was not found, make sure the package is installed with `pip install` before calling `gym.make()`'.format(mod_name))
    else:
        id = path

    match = env_id_re.search(id)
    if not match:
        raise error.Error('Attempted to look up malformed environment ID: {}. (Currently all IDs must be of the form {}.)'.format(id.encode('utf-8'), env_id_re.pattern))

    try:
        return self.env_specs[id]
    except KeyError:
        # Parse the env name and check to see if it matches the non-version
        # part of a valid env (could also check the exact number here)
        env_name = match.group(1)
        matching_envs = [valid_env_name for valid_env_name, valid_env_spec in self.env_specs.items()
                         if env_name == valid_env_spec._env_name]
        if matching_envs:
            raise error.DeprecatedEnv('Env {} not found (valid versions include {})'.format(id, matching_envs))
        else:
            raise error.UnregisteredEnv('No registered env with id: {}'.format(id))

The spec function first looks at the path argument. If it includes a module (i.e. contains ':'), it will try to import that module and put the id portion into the id variable. Otherwise, it just sets the id variable to path.

Then,

match = env_id_re.search(id)

where

env_id_re = re.compile(r'^(?:[\w:-]+\/)?([\w:.-]+)-v(\d+)$')

Now, it's time for some regex! Read here to follow along! The ^ anchor matches the start of the string. Then (?:...) is a non-capturing group: here it matches one or more (+) word characters (\w), colons, and minus signs, followed by a /. The ? after the group means 0 or 1 repetitions of it, so this whole prefix is optional. Next comes a capturing group that matches one or more word characters, colons, periods, and minus signs, followed by a literal -v and one or more digits (\d+), and finally $ marks the end of the string. So, for example, if id has the pattern

env_name-v0

it will be a match! So the id is intended to be in the form of an environment name followed by a version.
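We can sanity-check this against a few made-up ids:

import re

env_id_re = re.compile(r'^(?:[\w:-]+\/)?([\w:.-]+)-v(\d+)$')

print(bool(env_id_re.search('env_name-v0')))          # True
print(bool(env_id_re.search('my_envs/Game_Env-v2')))  # True (optional prefix)
print(bool(env_id_re.search('env_name')))             # False (no version suffix)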

Thus we know what form the id must be in. The next line in the code tries to look up the id in the env_specs dictionary. That entry should have been added before make was ever called: the register function creates the key as follows,

def register(id, **kwargs):
    return registry.register(id, **kwargs)

And

def register(self, id, **kwargs):
    if id in self.env_specs:
        raise error.Error('Cannot re-register id: {}'.format(id))
    self.env_specs[id] = EnvSpec(id, **kwargs)

in the EnvRegistry class. Thus, the EnvSpec for the id is returned.
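So before gym.make can find my environment, it has to be registered. Here is a minimal sketch of what that looks like for a hypothetical custom env (the module and class names are made up):

from gym.envs.registration import register
import gym

register(
    id='Game_Env-v0',                     # must match the name-vN pattern above
    entry_point='my_envs.game:Game_Env',  # 'module:Class' entry point
    max_episode_steps=1000,
)

env = gym.make('Game_Env-v0')  # looks up the EnvSpec we just registered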

The EnvSpec class, according to the comments, stores "A specification for a particular instance of the environment. Used to register the parameters for official evaluations." We can see this in practice in the __init__ function of EnvSpec:

def __init__(self, id, entry_point=None, reward_threshold=None, kwargs=None, nondeterministic=False, tags=None, max_episode_steps=None):
    self.id = id
    # Evaluation parameters
    self.reward_threshold = reward_threshold
    # Environment properties
    self.nondeterministic = nondeterministic
    self.entry_point = entry_point

    if tags is None:
        tags = {}
    self.tags = tags
    tags['wrapper_config.TimeLimit.max_episode_steps'] = max_episode_steps

    self.max_episode_steps = max_episode_steps

    # We may make some of these other parameters public if they're
    # useful.
    match = env_id_re.search(id)
    if not match:
        raise error.Error('Attempted to register malformed environment ID: {}. (Currently all IDs must be of the form {}.)'.format(id, env_id_re.pattern))
    self._env_name = match.group(1)
    self._kwargs = {} if kwargs is None else kwargs

Back in the registry make function

The next line executes

env = spec.make(**kwargs)

So let’s go to Spec’s make function.

Spec make function

def make(self, **kwargs):
    """Instantiates an instance of the environment with appropriate kwargs"""
    if self.entry_point is None:
        raise error.Error('Attempting to make deprecated env {}. (HINT: is there a newer registered version of this env?)'.format(self.id))
    _kwargs = self._kwargs.copy()
    _kwargs.update(kwargs)
    if callable(self.entry_point):
        env = self.entry_point(**_kwargs)
    else:
        cls = load(self.entry_point)
        env = cls(**_kwargs)

    # Make the enviroment aware of which spec it came from.
    env.unwrapped.spec = self

    return env

First, the entry_point is checked to make sure it is valid. According to the comments, it should be in the form

The Python entrypoint of the environment class (e.g. module.name:Class)

Or it can just be module.name

Then, it gets the environment either by calling self.entry_point directly (if it is callable) or by loading the entry point with

def load(name):
    mod_name, attr_name = name.split(":")
    mod = importlib.import_module(mod_name)
    fn = getattr(mod, attr_name)
    return fn

and then calling whatever load returns.

Either way, the environment class gets instantiated with the keyword arguments. Then it sets env.unwrapped.spec = self; env.unwrapped gives you the base environment underneath any wrappers, so this makes the spec (things like max_episode_steps) reachable from the environment later on.
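A quick way to see env.unwrapped in action, assuming a standard gym install with the classic control environments registered:

import gym

env = gym.make('CartPole-v1')            # gym wraps this in TimeLimit
print(type(env).__name__)                # TimeLimit
print(type(env.unwrapped).__name__)      # CartPoleEnv
print(env.unwrapped.spec.id)             # CartPole-v1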

Registry make function

After that’s done

if hasattr(env, "_reset") and hasattr(env, "_step") and not getattr(env, "_gym_disable_underscore_compat", False):
    patch_deprecated_methods(env)

First, it checks whether the env has the methods _reset and _step (the old, underscore-prefixed gym API). It also checks getattr(env, "_gym_disable_underscore_compat", False), which I suspect defaults to False. We can check this by simply running

import gym
getattr(gym.Env, "_gym_disable_underscore_compat", False)

And it did return False! I used gym.Env because that is what our custom environments build on, since I inherited from gym.Env like this:

class Game_Env(gym.Env):
    def __init__(self):
        ...

If the whole condition were true, the next line, which is

patch_deprecated_methods(env)

would be run. This basically sets env.reset to env._reset, env.step to env._step, and so on. But wait, what would then happen to the step function we defined? To check whether this branch actually executes, I ran

hasattr(gym.Env, "_reset")

which returned False. Thus, the statement won't be run for my environment!

Finally, a time limit is added as follows

if (env.spec.max_episode_steps is not None) and not spec.tags.get('vnc'):
    from gym.wrappers.time_limit import TimeLimit
    env = TimeLimit(env, max_episode_steps=env.spec.max_episode_steps)

I find the way they did this quite interesting, as it abstracts the environment nicely. When you look into gym.wrappers.time_limit, the TimeLimit class's only job seems to be keeping track of which step it's on while delegating to the environment's own step, as follows!

class TimeLimit(gym.Wrapper):
    def __init__(self, env, max_episode_steps=None):
        super(TimeLimit, self).__init__(env)
        if max_episode_steps is None and self.env.spec is not None:
            max_episode_steps = env.spec.max_episode_steps
        if self.env.spec is not None:
            self.env.spec.max_episode_steps = max_episode_steps
        self._max_episode_steps = max_episode_steps
        self._elapsed_steps = None

    def step(self, action):
        assert self._elapsed_steps is not None, "Cannot call env.step() before calling reset()"
        observation, reward, done, info = self.env.step(action)
        self._elapsed_steps += 1
        if self._elapsed_steps >= self._max_episode_steps:
            info['TimeLimit.truncated'] = not done
            done = True
        return observation, reward, done, info

    def reset(self, **kwargs):
        self._elapsed_steps = 0
        return self.env.reset(**kwargs)

It's particularly nice because the step and reset functions just get enhanced by each wrapper! I think I'll do something like this from now on because it looks so clean. Currently, my environment's code isn't particularly nice at around 1,000 lines, so I plan to use this technique to make it cleaner.
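As a sketch of what I have in mind (this wrapper is hypothetical, not part of gym or baselines), a wrapper that just tracks the total reward of the current episode would look like this:

import gym

class EpisodeRewardTracker(gym.Wrapper):
    """Hypothetical wrapper: keeps a running total of the episode reward."""
    def __init__(self, env):
        super(EpisodeRewardTracker, self).__init__(env)
        self.episode_reward = 0.0

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.episode_reward += reward
        if done:
            info['episode_reward'] = self.episode_reward
        return obs, reward, done, info

    def reset(self, **kwargs):
        self.episode_reward = 0.0
        return self.env.reset(**kwargs)

env = EpisodeRewardTracker(gym.make('CartPole-v1'))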

Anyway, after this, the environment is returned!

Back to make_env

The next line of code is

if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict):

but since my observation spaces are all spaces.Box, I can skip this line. Still, as I am a bit curious about how dictionaries are "flattened", let's check! What the if statement does boils down to

env = FlattenObservation(env)

I found it in gym.wrappers. It is

import numpy as np
import gym.spaces as spaces
from gym import ObservationWrapper

class FlattenObservation(ObservationWrapper):
    r"""Observation wrapper that flattens the observation."""
    def __init__(self, env):
        super(FlattenObservation, self).__init__(env)

        flatdim = spaces.flatdim(env.observation_space)
        self.observation_space = spaces.Box(low=-float('inf'), high=float('inf'), shape=(flatdim,), dtype=np.float32)

    def observation(self, observation):
        return spaces.flatten(self.env.observation_space, observation)

So, basically, it changes the observation space to a Box using functions defined in spaces. That module imports utils, and inside there I found flatten and flatdim.

flatdim returns the flattened dimensionality as a single integer for every possible kind of observation space, as follows:

def flatdim(space):
    if isinstance(space, Box):
        return int(np.prod(space.shape))
    elif isinstance(space, Discrete):
        return int(space.n)
    elif isinstance(space, Tuple):
        return int(sum([flatdim(s) for s in space.spaces]))
    elif isinstance(space, Dict):
        return int(sum([flatdim(s) for s in space.spaces.values()]))
    elif isinstance(space, MultiBinary):
        return int(space.n)
    elif isinstance(space, MultiDiscrete):
        return int(np.prod(space.shape))
    else:
        raise NotImplementedError

So, self.observation_space will become one-dimensional after applying this neat wrapper! I'm not sure this is a good thing for convolutional neural networks and other cases where the fact that the observation is an image matters, but anyway, I find it interesting.

Likewise, the flatten function does exactly what its name says: it flattens an observation into a 1-D array.
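A small sketch of flatdim and flatten on a made-up Dict space:

import numpy as np
import gym.spaces as spaces

obs_space = spaces.Dict({
    'position': spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
    'goal': spaces.Discrete(4),
})

print(spaces.flatdim(obs_space))   # 3 (Box) + 4 (one-hot Discrete) = 7

sample = obs_space.sample()
flat = spaces.flatten(obs_space, sample)
print(flat.shape)                  # (7,)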

Then, let's look at the next lines:

env.seed(seed + subrank if seed is not None else None)

env = Monitor(env,
              logger_dir and os.path.join(logger_dir, str(mpi_rank) + '.' + str(subrank)),
              allow_early_resets=True)

I suspect env.seed just sets the seed for the environment's random number generation and the like. However, what does Monitor do? After looking into monitor.py in baselines/bench, it appears that this wrapper keeps track of the rewards as the environment runs and writes them out to files ending in "monitor.json" and "monitor.csv"!

Next,

if isinstance(env.action_space, gym.spaces.Box):
    env = ClipActionsWrapper(env)

is run. This ClipActionsWrapper’s action function was defined as

np.clip(action, self.action_space.low, self.action_space.high)

Thus it clips the action to the set maximum and minimum value. I like how they use wrappers for very minute details like this. It enhances comprehensibility while making code easier to debug.
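For example, with a made-up Box action space bounded in [-1, 1]:

import numpy as np

low, high = np.array([-1.0, -1.0]), np.array([1.0, 1.0])
action = np.array([2.5, -0.3])

print(np.clip(action, low, high))  # [ 1.  -0.3]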

Finally,

if reward_scale != 1:
    env = retro_wrappers.RewardScaler(env, reward_scale)

return env

What this wrapper does is simply multiply the reward by reward_scale:

def reward(self, reward):
    return reward * self.scale

Now, finally, the env is returned.

As we finished talking about make_env, let’s take a break and talk about SubprocVecEnv in the next article!

Next

In the next article, we will check out SubprocVecEnv and get exposed to multiprocessing!
