Understanding OpenAI baseline source code and making it do self-play! Part 2
In the previous section, we went all the way to the train function and stopped right before we actually build the environment. To check that out, click here!
build_env
In the next line, we see
env = build_env(args)
So, let us look at the build_env function!
def build_env(args):
    ncpu = multiprocessing.cpu_count()
    if sys.platform == 'darwin': ncpu //= 2
    nenv = args.num_env or ncpu
    alg = args.alg
    seed = args.seed

    env_type, env_id = get_env_type(args)

    if env_type in {'atari', 'retro'}:
        if alg == 'deepq':
            env = make_env(env_id, env_type, seed=seed, wrapper_kwargs={'frame_stack': True})
        elif alg == 'trpo_mpi':
            env = make_env(env_id, env_type, seed=seed)
        else:
            frame_stack_size = 4
            env = make_vec_env(env_id, env_type, nenv, seed, gamestate=args.gamestate, reward_scale=args.reward_scale)
            env = VecFrameStack(env, frame_stack_size)

    else:
        config = tf.ConfigProto(allow_soft_placement=True,
                                intra_op_parallelism_threads=1,
                                inter_op_parallelism_threads=1)
        config.gpu_options.allow_growth = True
        get_session(config=config)

        flatten_dict_observations = alg not in {'her'}
        env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)

        if env_type == 'mujoco':
            env = VecNormalize(env, use_tf=True)

    return env
First,
ncpu = multiprocessing.cpu_count()
if sys.platform == 'darwin': ncpu //= 2
nenv = args.num_env or ncpu
alg = args.alg
seed = args.seed
appears to initialize parameters. Here, a quite common but interesting trick is how nenv is set using the or keyword. What this does is quite intuitive once you consider how Python evaluates or: instead of just returning True or False, a or b evaluates to a if a is truthy, and to b otherwise. Thus, if args.num_env is set, nenv becomes args.num_env; otherwise it falls back to ncpu (according to the documentation, the number of CPUs available)! It's a neat trick to make your code shorter!
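Here is a tiny sketch of that behavior (my own example, not from baselines):
num_env = None
ncpu = 8

# `or` returns the first truthy operand, so None falls through to ncpu
print(num_env or ncpu)   # 8
print(4 or ncpu)         # 4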
Next, we see
env_type, env_id = get_env_type(args)

if env_type in {'atari', 'retro'}:
Since we can specify env_type from the arguments, and our environment is a custom one, we can skip the contents of this if statement and go straight to the else portion.
else:
    config = tf.ConfigProto(allow_soft_placement=True,
                            intra_op_parallelism_threads=1,
                            inter_op_parallelism_threads=1)
    config.gpu_options.allow_growth = True
    get_session(config=config)

    flatten_dict_observations = alg not in {'her'}
    env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)

    if env_type == 'mujoco':
        env = VecNormalize(env, use_tf=True)
First off, what is allow_soft_placement? According to the documentation, it lets TensorFlow fall back to another available device when an operation cannot run on the device you requested. For example, you can do
with tf.device('/gpu:0'):
to allocate operations to the 0th GPU specifically, which is interesting.
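A minimal sketch of how these two pieces fit together, assuming the TF1-style API that baselines uses (my own example):
import tensorflow as tf  # TF1-style API

config = tf.ConfigProto(allow_soft_placement=True)

with tf.device('/gpu:0'):
    # If no GPU is present, soft placement lets TensorFlow place this op
    # on the CPU instead of raising an error when the session runs it.
    x = tf.constant([1.0, 2.0]) * 2.0

with tf.Session(config=config) as sess:
    print(sess.run(x))  # [2. 4.]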
The next line which stands out is
env = make_vec_env(env_id, env_type, args.num_env or 1, seed, reward_scale=args.reward_scale, flatten_dict_observations=flatten_dict_observations)
Now, let’s check out what make_vec_env does!
make_vec_env
make_vec_env resides in cmd_util.py.
def make_vec_env(env_id, env_type, num_env, seed,
                 wrapper_kwargs=None,
                 env_kwargs=None,
                 start_index=0,
                 reward_scale=1.0,
                 flatten_dict_observations=True,
                 gamestate=None,
                 initializer=None,
                 force_dummy=False):
    """
    Create a wrapped, monitored SubprocVecEnv for Atari and MuJoCo.
    """
    wrapper_kwargs = wrapper_kwargs or {}
    env_kwargs = env_kwargs or {}
    mpi_rank = MPI.COMM_WORLD.Get_rank() if MPI else 0
    seed = seed + 10000 * mpi_rank if seed is not None else None
    logger_dir = logger.get_dir()
    def make_thunk(rank, initializer=None):
        return lambda: make_env(
            env_id=env_id,
            env_type=env_type,
            mpi_rank=mpi_rank,
            subrank=rank,
            seed=seed,
            reward_scale=reward_scale,
            gamestate=gamestate,
            flatten_dict_observations=flatten_dict_observations,
            wrapper_kwargs=wrapper_kwargs,
            env_kwargs=env_kwargs,
            logger_dir=logger_dir,
            initializer=initializer
        )

    set_global_seeds(seed)
    if not force_dummy and num_env > 1:
        return SubprocVecEnv([make_thunk(i + start_index, initializer=initializer) for i in range(num_env)])
    else:
        return DummyVecEnv([make_thunk(i + start_index, initializer=None) for i in range(num_env)])
If we want multiple environments running in parallel, which is the case for me, SubprocVecEnv is the branch that gets taken. So we will only look at SubprocVecEnv, but I think it's fair to say that it will be quite similar to DummyVecEnv in terms of what it expects from the environment.
Before checking what that does, let us first check make_thunk. This is an interesting function in that it returns a function with its arguments already baked in. The pattern looks like this:
def a(args):
    return lambda: b(args)
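One thing worth noting about this pattern (my own illustration, not from baselines): lambdas capture variables by reference, not by value, which is exactly why make_thunk takes rank as a parameter instead of closing over the loop variable directly.
# Late binding: every lambda sees the final value of i
thunks_buggy = [lambda: i for i in range(3)]
print([f() for f in thunks_buggy])   # [2, 2, 2]

# The factory-function trick binds a fresh `rank` per call
def make_thunk(rank):
    return lambda: rank

thunks_ok = [make_thunk(i) for i in range(3)]
print([f() for f in thunks_ok])      # [0, 1, 2]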
I think this pattern is kind of cool! Anyway, let's not get distracted and look at the function returned: make_env.
make_env
def make_env(env_id, env_type, mpi_rank=0, subrank=0, seed=None, reward_scale=1.0, gamestate=None, flatten_dict_observations=True, wrapper_kwargs=None, env_kwargs=None, logger_dir=None, initializer=None):
    if initializer is not None:
        initializer(mpi_rank=mpi_rank, subrank=subrank)

    wrapper_kwargs = wrapper_kwargs or {}
    env_kwargs = env_kwargs or {}
    if ':' in env_id:
        import re
        import importlib
        module_name = re.sub(':.*','',env_id)
        env_id = re.sub('.*:', '', env_id)
        importlib.import_module(module_name)
    if env_type == 'atari':
        env = make_atari(env_id)
    elif env_type == 'retro':
        import retro
        gamestate = gamestate or retro.State.DEFAULT
        env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate)
    else:
        env = gym.make(env_id, **env_kwargs)

    if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict):
        keys = env.observation_space.spaces.keys()
        env = gym.wrappers.FlattenDictWrapper(env, dict_keys=list(keys))

    env.seed(seed + subrank if seed is not None else None)

    env = Monitor(env,
                  logger_dir and os.path.join(logger_dir, str(mpi_rank) + '.' + str(subrank)),
                  allow_early_resets=True)

    if env_type == 'atari':
        env = wrap_deepmind(env, **wrapper_kwargs)
    elif env_type == 'retro':
        if 'frame_stack' not in wrapper_kwargs:
            wrapper_kwargs['frame_stack'] = 1
        env = retro_wrappers.wrap_deepmind_retro(env, **wrapper_kwargs)

    if isinstance(env.action_space, gym.spaces.Box):
        env = ClipActionsWrapper(env)

    if reward_scale != 1:
        env = retro_wrappers.RewardScaler(env, reward_scale)

    return env
For the first two lines,
if initializer is not None:
    initializer(mpi_rank=mpi_rank, subrank=subrank)
I'm not particularly sure what an initializer is, but in all the functions we have looked at so far, the initializer was set to None, so let's skip this for now!
The next interesting piece of code is
if ':' in env_id:
    import re
    import importlib
    module_name = re.sub(':.*','',env_id)
    env_id = re.sub('.*:', '', env_id)
    importlib.import_module(module_name)
Basically, this is where my custom environment's module gets imported. module_name is obtained by removing everything from the ':' onward in env_id, keeping only the part before it, while env_id itself is set to the part following the ':'. In the regex, the period matches basically every character except line breaks, and * means 0 or more of the preceding token (.). Calling re.sub then replaces the section matched by these patterns with the second argument, '', i.e. nothing. I find it quite fun to look at regex because, as I am not especially familiar with it, working out what it does is like a little puzzle.
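Here is a tiny worked example with a hypothetical env_id (the names are made up, not from my actual project):
import re

env_id = 'my_game_envs:SelfPlay-v0'
module_name = re.sub(':.*', '', env_id)  # drop ':' and everything after -> 'my_game_envs'
env_id = re.sub('.*:', '', env_id)       # drop everything up to ':'    -> 'SelfPlay-v0'
print(module_name, env_id)
Well, enough of that! Let's continue.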
if env_type == 'atari':
    env = make_atari(env_id)
elif env_type == 'retro':
    import retro
    gamestate = gamestate or retro.State.DEFAULT
    env = retro_wrappers.make_retro(game=env_id, max_episode_steps=10000, use_restricted_actions=retro.Actions.DISCRETE, state=gamestate)
else:
    env = gym.make(env_id, **env_kwargs)
As our environment is neither atari nor retro, the gym.make function is run. Now, what is this function?
make
It took me a while to find it, but I found it in gym/envs/registration.py!
registry = EnvRegistry()

def register(id, **kwargs):
    return registry.register(id, **kwargs)

def make(id, **kwargs):
    return registry.make(id, **kwargs)
And this make function calls the registry's make method:
def make(self, path, **kwargs):
    if len(kwargs) > 0:
        logger.info('Making new env: %s (%s)', path, kwargs)
    else:
        logger.info('Making new env: %s', path)
    spec = self.spec(path)
    env = spec.make(**kwargs)
    # We used to have people override _reset/_step rather than
    # reset/step. Set _gym_disable_underscore_compat = True on
    # your environment if you use these methods and don't want
    # compatibility code to be invoked.
    if hasattr(env, "_reset") and hasattr(env, "_step") and not getattr(env, "_gym_disable_underscore_compat", False):
        patch_deprecated_methods(env)
    if (env.spec.max_episode_steps is not None) and not spec.tags.get('vnc'):
        from gym.wrappers.time_limit import TimeLimit
        env = TimeLimit(env, max_episode_steps=env.spec.max_episode_steps)
    return env
First, what is kwargs? This is one of the nicer features in Python: any arguments passed in the form variable_name=value that don't match a named parameter get collected into a dictionary. For example (credit to GeeksforGeeks):
def myFun(arg1, **kwargs):
    for key, value in kwargs.items():
        print("%s == %s" % (key, value))

# Driver code
myFun("Hi", first='Geeks', mid='for', last='Geeks')
It will print out
last == Geeks
mid == for
first == Geeks
That's where its name comes from: keyword arguments, kwargs. You can use *args to collect the usual positional arguments too, but it should come before **kwargs in the signature!
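A quick sketch of both together (my own example, not from baselines or gym):
def my_fun(*args, **kwargs):
    print(args)    # positional arguments, as a tuple
    print(kwargs)  # keyword arguments, as a dict

my_fun(1, 2, first='Geeks', last='Geeks')
# (1, 2)
# {'first': 'Geeks', 'last': 'Geeks'}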
Then, this function goes on to run the spec function on the path argument.
spec
def spec(self, path):
    if ':' in path:
        mod_name, _sep, id = path.partition(':')
        try:
            importlib.import_module(mod_name)
        # catch ImportError for python2.7 compatibility
        except ImportError:
            raise error.Error('A module ({}) was specified for the environment but was not found, make sure the package is installed with `pip install` before calling `gym.make()`'.format(mod_name))
    else:
        id = path

    match = env_id_re.search(id)
    if not match:
        raise error.Error('Attempted to look up malformed environment ID: {}. (Currently all IDs must be of the form {}.)'.format(id.encode('utf-8'), env_id_re.pattern))

    try:
        return self.env_specs[id]
    except KeyError:
        # Parse the env name and check to see if it matches the non-version
        # part of a valid env (could also check the exact number here)
        env_name = match.group(1)
        matching_envs = [valid_env_name for valid_env_name, valid_env_spec in self.env_specs.items()
                         if env_name == valid_env_spec._env_name]
        if matching_envs:
            raise error.DeprecatedEnv('Env {} not found (valid versions include {})'.format(id, matching_envs))
        else:
            raise error.UnregisteredEnv('No registered env with id: {}'.format(id))
The spec function first looks at the path argument. If it includes a module (i.e. contains ':'), it will try to import that module and put the id portion into the id variable. Otherwise, it just sets the id variable to path.
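As a small aside, str.partition splits on the first occurrence of the separator and returns a 3-tuple (again with a hypothetical id):
mod_name, _sep, id = 'my_game_envs:SelfPlay-v0'.partition(':')
print(mod_name, _sep, id)   # my_game_envs : SelfPlay-v0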
Then,
match = env_id_re.search(id)
where
env_id_re = re.compile(r'^(?:[\w:-]+\/)?([\w:.-]+)-v(\d+)$')
Now, it's time for some regex! Read here to follow along! The ^ anchor matches the start of the string. Then (?:...) is a non-capturing group: here it matches one or more (+) word characters (\w), colons, and minus signs, followed by a /, and the ? after the group means the whole thing appears 0 or 1 times. After that, ([\w:.-]+) matches word characters, colons, periods, and minus signs again, followed by "-v" and one or more digits (\d+), finally ending with $, which anchors the end of the string. So, for example, if id has the pattern
env_name-v0
it will be a match! Thus, the id is intended to be in the form of an environment name followed by a version.
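A quick check of that pattern (using a hypothetical id):
import re

env_id_re = re.compile(r'^(?:[\w:-]+\/)?([\w:.-]+)-v(\d+)$')

match = env_id_re.search('SelfPlay-v0')
print(match.group(1), match.group(2))        # SelfPlay 0
print(env_id_re.search('SelfPlayNoVersion')) # None -- missing the -v<number> suffix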
Thus we know what form the id must be in. The next line in the code tries to look the id up in the env_specs dictionary. The entry should have been added before make was ever called, by the register function, which creates the key as follows:
def register(id, **kwargs):
    return registry.register(id, **kwargs)
And
def register(self, id, **kwargs):
    if id in self.env_specs:
        raise error.Error('Cannot re-register id: {}'.format(id))
    self.env_specs[id] = EnvSpec(id, **kwargs)
in the EnvRegistry class. Thus, the EnvSpec registered under that id is what gets returned.
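For a custom environment, this registration typically happens once, somewhere that gets imported before gym.make is called. A hypothetical example (the id, module, and class names are made up):
from gym.envs.registration import register

register(
    id='SelfPlay-v0',                     # must match the <name>-v<number> pattern
    entry_point='my_game_envs:Game_Env',  # module:Class that gym will instantiate
    max_episode_steps=1000,
)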
The EnvSpec class, according to the comments, stores “A specification for a particular instance of the environment. Used
to register the parameters for official evaluations." We can see this in practice in the __init__ function of EnvSpec:
def __init__(self, id, entry_point=None, reward_threshold=None, kwargs=None, nondeterministic=False, tags=None, max_episode_steps=None):
    self.id = id
    # Evaluation parameters
    self.reward_threshold = reward_threshold
    # Environment properties
    self.nondeterministic = nondeterministic
    self.entry_point = entry_point

    if tags is None:
        tags = {}
    self.tags = tags

    tags['wrapper_config.TimeLimit.max_episode_steps'] = max_episode_steps
    self.max_episode_steps = max_episode_steps

    # We may make some of these other parameters public if they're
    # useful.
    match = env_id_re.search(id)
    if not match:
        raise error.Error('Attempted to register malformed environment ID: {}. (Currently all IDs must be of the form {}.)'.format(id, env_id_re.pattern))
    self._env_name = match.group(1)
    self._kwargs = {} if kwargs is None else kwargs
Back in the registry make function
The next line executes
env = spec.make(**kwargs)
So let’s go to Spec’s make function.
Spec make function
def make(self, **kwargs):
    """Instantiates an instance of the environment with appropriate kwargs"""
    if self.entry_point is None:
        raise error.Error('Attempting to make deprecated env {}. (HINT: is there a newer registered version of this env?)'.format(self.id))
    _kwargs = self._kwargs.copy()
    _kwargs.update(kwargs)
    if callable(self.entry_point):
        env = self.entry_point(**_kwargs)
    else:
        cls = load(self.entry_point)
        env = cls(**_kwargs)

    # Make the environment aware of which spec it came from.
    env.unwrapped.spec = self

    return env
First, the entry_point is taken and checked to see if it's valid. According to the comments, it should be in the form
The Python entrypoint of the environment class (e.g. module.name:Class)
Or it can be a callable that builds the environment directly.
Then, it gets the environment either by calling self.entry_point or loading the entry point by calling
def load(name):
    mod_name, attr_name = name.split(":")
    mod = importlib.import_module(mod_name)
    fn = getattr(mod, attr_name)
    return fn
then calling that function!
And then running it with the keyword arguments. Finally, it sets env.unwrapped.spec to the spec itself, which, as the comment says, makes the underlying (unwrapped) environment aware of which spec it came from.
Registry make function
After that’s done
if hasattr(env, "_reset") and hasattr(env, "_step") and not getattr(env, "_gym_disable_underscore_compat", False):
patch_deprecated_methods(env)
First, it checks whether the env has the methods _reset and _step, which I thought might be built into gym.Env. It also checks getattr(env, "_gym_disable_underscore_compat", False), whose default output I suspect is False. We can check this by simply running
import gym
getattr(gym.Env, "_gym_disable_underscore_compat", False)
And it did return False! I used gym.Env because that is what our custom environments build on, as I inherited from gym.Env by doing
class Game_Env(gym.Env):
    def __init__(self):
        ...
Thus, if that condition were met, the next line, which is
patch_deprecated_methods(env)
would be run. This basically sets env.reset to be env._reset, env.step to env._step, and so on. But wait, then what happens to the step functions we defined? To check whether this branch actually executes, I ran
hasattr(gym.Env, "_reset")
which returned False. Thus, the statement won't be run for the purposes of my environment!
Finally, a time limit is added as follows
if (env.spec.max_episode_steps is not None) and not spec.tags.get('vnc'):
    from gym.wrappers.time_limit import TimeLimit
    env = TimeLimit(env, max_episode_steps=env.spec.max_episode_steps)
I find the way they did this quite interesting, as it helps keep the environment abstract. When you look into gym.wrappers.time_limit, the TimeLimit class's only job seems to be keeping track of which step it's on while delegating to the wrapped environment's step, as follows!
class TimeLimit(gym.Wrapper):
    def __init__(self, env, max_episode_steps=None):
        super(TimeLimit, self).__init__(env)
        if max_episode_steps is None and self.env.spec is not None:
            max_episode_steps = env.spec.max_episode_steps
        if self.env.spec is not None:
            self.env.spec.max_episode_steps = max_episode_steps
        self._max_episode_steps = max_episode_steps
        self._elapsed_steps = None

    def step(self, action):
        assert self._elapsed_steps is not None, "Cannot call env.step() before calling reset()"
        observation, reward, done, info = self.env.step(action)
        self._elapsed_steps += 1
        if self._elapsed_steps >= self._max_episode_steps:
            info['TimeLimit.truncated'] = not done
            done = True
        return observation, reward, done, info

    def reset(self, **kwargs):
        self._elapsed_steps = 0
        return self.env.reset(**kwargs)
It's particularly nice because the step and reset functions just get enhanced with each wrapper! I think I'll do something like this from now on because it looks so clean. Currently, my environment's code isn't particularly nice at around 1000 lines, so I plan to use this technique to make it cleaner.
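For instance, here is a minimal sketch of a custom wrapper in the same spirit (hypothetical, not part of my environment yet):
import gym

class EpisodeCounter(gym.Wrapper):
    """Adds one tiny feature on top of the wrapped env: counting episodes."""
    def __init__(self, env):
        super(EpisodeCounter, self).__init__(env)
        self.episodes = 0

    def step(self, action):
        # Delegate to the wrapped environment unchanged
        return self.env.step(action)

    def reset(self, **kwargs):
        self.episodes += 1
        return self.env.reset(**kwargs)

# Wrappers compose: env = EpisodeCounter(TimeLimit(my_env, max_episode_steps=500))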
Anyway, after this, the environment is returned!
Back to make_env
The next line of code is
if flatten_dict_observations and isinstance(env.observation_space, gym.spaces.Dict):
but since my observation spaces are all spaces.Box, I could skip this line. However, as I am a bit curious about how dictionaries are "flattened", let's check! The content of the if statement is
env = FlattenObservation(env)
I found it in gym.wrappers. It is
import numpy as np
import gym.spaces as spaces
from gym import ObservationWrapper

class FlattenObservation(ObservationWrapper):
    r"""Observation wrapper that flattens the observation."""
    def __init__(self, env):
        super(FlattenObservation, self).__init__(env)

        flatdim = spaces.flatdim(env.observation_space)
        self.observation_space = spaces.Box(low=-float('inf'), high=float('inf'), shape=(flatdim,), dtype=np.float32)

    def observation(self, observation):
        return spaces.flatten(self.env.observation_space, observation)
So, basically, it changes the observation space to a Box using the functions defined in spaces. That module imports utils, and inside it I found flatten and flatdim.
flatdim returns the size of the space as a single integer, for all of the possible observation space types, as follows
def flatdim(space):
    if isinstance(space, Box):
        return int(np.prod(space.shape))
    elif isinstance(space, Discrete):
        return int(space.n)
    elif isinstance(space, Tuple):
        return int(sum([flatdim(s) for s in space.spaces]))
    elif isinstance(space, Dict):
        return int(sum([flatdim(s) for s in space.spaces.values()]))
    elif isinstance(space, MultiBinary):
        return int(space.n)
    elif isinstance(space, MultiDiscrete):
        return int(np.prod(space.shape))
    else:
        raise NotImplementedError
So, self.observation_space will become 1-dimensional after applying this neat wrapper! I'm not sure if this is a good thing for convolutional neural networks, where the fact that the observation is an image matters, but anyway, I find it interesting.
Likewise, the flatten function does exactly what its name suggests: it flattens an observation into a 1d array.
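A small sketch of the two utilities on a toy Dict space (my own example; gym sorts Dict keys alphabetically, so 'mode' comes before 'position'):
import numpy as np
from gym import spaces

space = spaces.Dict({
    'mode': spaces.Discrete(3),                                               # flattens to a one-hot of length 3
    'position': spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32), # length 2
})
obs = {'mode': 1, 'position': np.array([0.5, -0.5], dtype=np.float32)}

print(spaces.flatdim(space))       # 5
print(spaces.flatten(space, obs))  # [ 0.   1.   0.   0.5 -0.5]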
Then, let’s go to the next line!
env.seed(seed + subrank if seed is not None else None)
env = Monitor(env,
logger_dir and os.path.join(logger_dir, str(mpi_rank) + '.' + str(subrank)),
allow_early_resets=True)
is run. Here, env.seed presumably just sets the seed for the environment's random number generation and the like. But what does Monitor do? After looking into monitor.py in baselines/bench, it appears that this wrapper keeps track of the rewards at each step and logs episode results to files ending in "monitor.json" and "monitor.csv"!
Next,
if isinstance(env.action_space, gym.spaces.Box):
    env = ClipActionsWrapper(env)
is run. This ClipActionsWrapper’s action function was defined as
np.clip(action, self.action_space.low, self.action_space.high)
Thus it clips the action to the set maximum and minimum value. I like how they use wrappers for very minute details like this. It enhances comprehensibility while making code easier to debug.
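A quick illustration of what that clipping does (my own example):
import numpy as np

action = np.array([1.7, -0.3, -2.5])
print(np.clip(action, -1.0, 1.0))  # [ 1.  -0.3 -1. ]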
Finally,
if reward_scale != 1:
    env = retro_wrappers.RewardScaler(env, reward_scale)

return env
What this wrapper does is simply multiply the reward by reward_scale:
def reward(self, reward):
    return reward * self.scale
Now, finally, the env is returned.
As we finished talking about make_env, let’s take a break and talk about SubprocVecEnv in the next article!
Next
In the next article, we will check out SubprocVecEnv and get exposed to multiprocessing!