Training OpenAI gym environments using REINFORCE algorithm in reinforcement learning

Policy gradient methods explained with codes

Mehul Gupta
Data Science in your pocket
8 min readMar 26, 2023

--

In my previous parts on reinforcement learning, we have covered many topics ranging from

Reinforcement Learning basics

Formulating Multi-Armed Bandits (MABs)

Monte Carlo with example

Temporal Difference learning with SARSA and Q Learning

Game dev using reinforcment learning and pygame

Contextual bandits with codes

My debut book “LangChain in your Pocket” is out now

The typical target in most of the above methods was to estimate the Value function for a given Action or (Action|State) pair. After estimating the Value function, using a combination of Value function + Policy (commonly e-greedy), we choose the next action.

Changing perspectives, can we directly devise methods that can choose Policy directly?

A few direct advantages I can see is

No Value function required

No need to decide on any policy

Hence, we are free from designing many things and the onus is on the network to handle everything except the reward function that we need to declare.

Policy Network

So, instead of using a pre-defined ideology of e-greedy or greedy policy, we can train Neural Network that can give an ideal policy as an output.

What does this even mean? Like Neural Network outputs probabilities, not algorithms/methods.

If you give a slight thought, a policy is nothing but setting probabilities for actions to be taken. So, a Policy Network outputs probabilities for Actions to be taken such that these probabilities add up to 1 (sort of multi-class classification). This is so because, at a time, the agent can take one action given multiple options.

Such methods that directly estimate actions eliminating Value functions completely are called policy gradient methods. The REINFORCE algorithm which we will be talking about soon is one such algorithm.

REINFORCE algorithm

  • Design a Neural network that intakes state and output action probabilities (multi-class)

Preparing training dataset

  • Run the environment simulation for N episodes where for

For each episode

Initialize multiple empty lists to save state, action & reward separately

Choose an initial state

Feed this to the Neural Network to get action probability

Choose action based on probabilities

Get reward

Save Action chosen, State, Reward to their respective list

Repeat the above steps until the terminal state is reached. This forms one episode.

Once you end one episode, calculate discounted rewards.

Discounted Rewards

The concept of discounted rewards is simple. For all the actions involved in an episode, while training, we would be adding a discounted (a fraction) value of the future reward alongside the actual reward received for the given action. This is done so that the model can have an idea of whether taking this action helped in success or failure in the end. How? we will soon see the codes

  • Define loss function to be a summation of the product of

log(probability) x discounted reward for each action chosen at particular states. Why? will explain later in the post

  • Apply gradients to train the Policy Network

Code Alert !!

We would be training LunarLander-v2 for this activity. What’s that?

It is a simulation of a lunar lander attempting to land on the moon’s surface. The objective of the environment is to successfully land the lunar lander on a designated landing pad while minimizing fuel consumption and avoiding crashes.

The lunar lander is represented by a spacecraft with two engines that can be controlled to adjust the spacecraft’s velocity and direction. The environment is simulated with a two-dimensional view of the moon’s surface, where the lunar lander must navigate through a variety of obstacles such as craters and mountains.

The state of the environment is represented by an array of 8 values, including the x and y position, velocity, angle, angular velocity, and information about the state of the legs of the lunar lander. The actions that can be taken by the agent include firing the engines, rotating the spacecraft, or doing nothing i.e 3 actions

The environment provides a reward signal to the agent at each time step based on its actions and the resulting state. The agent receives a positive reward for moving toward the landing pad and a negative reward for using fuel or crashing. The episode ends either when the lunar lander successfully lands on the designated pad or crashes.

  1. Import the required libraries
import tensorflow as tf
import numpy as np
import gym
import math
from PIL import Image
import pygame, sys
from pygame.locals import *
from tensorflow import keras

2. Initiate an OpenAI gym environment. We would be using LunarLander-v2 for training

env = gym.make('LunarLander-v2')

input_shape = env.observation_space.shape[0]
num_actions = env.action_space.n

input_shape= Array of values representing a unique state

num_actions= Total actions are possible in LunarLander-v2

3. Define the policy network

policy_network = tf.keras.models.Sequential([
tf.keras.layers.Dense(32, activation='relu', input_shape=(input_shape,)),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(num_actions, activation='softmax')
])

# Set up the optimizer and loss function
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()

So, we have used tensorflow 2 for coding this baseline neural network. Observe the output to be probabilities for each action in the action space

4. Training loop

# Set up lists to store episode rewards and lengths
episode_rewards = []
episode_lengths = []

num_episodes = 1000
discount_factor = 0.99

# Train the agent using the REINFORCE algorithm
for episode in range(num_episodes):
# Reset the environment and get the initial state
state = env.reset()
episode_reward = 0
episode_length = 0

# Keep track of the states, actions, and rewards for each step in the episode
states = []
actions = []
rewards = []

# Run the episode
while True:
# Get the action probabilities from the policy network
action_probs = policy_network.predict(np.array([state]))[0]

# Choose an action based on the action probabilities
action = np.random.choice(num_actions, p=action_probs)

# Take the chosen action and observe the next state and reward
next_state, reward, done, _ = env.step(action)

# Store the current state, action, and reward
states.append(state)
actions.append(action)
rewards.append(reward)

# Update the current state and episode reward
state = next_state
episode_reward += reward
episode_length += 1

# End the episode if the environment is done
if done:
print('Episode {} done !!!!!!'.format(episode))
break

# Calculate the discounted rewards for each step in the episode
discounted_rewards = np.zeros_like(rewards)
running_total = 0
for i in reversed(range(len(rewards))):
running_total = running_total * discount_factor + rewards[i]
discounted_rewards[i] = running_total

# Normalize the discounted rewards
discounted_rewards -= np.mean(discounted_rewards)
discounted_rewards /= np.std(discounted_rewards)

# Convert the lists of states, actions, and discounted rewards to tensors
states = tf.convert_to_tensor(states)
actions = tf.convert_to_tensor(actions)
discounted_rewards = tf.convert_to_tensor(discounted_rewards)

# Train the policy network using the REINFORCE algorithm
with tf.GradientTape() as tape:
# Get the action probabilities from the policy network
action_probs = policy_network(states)
# Calculate the loss
loss = tf.cast(tf.math.log(tf.gather(action_probs,actions,axis=1,batch_dims=1)),tf.float64)

loss = loss * discounted_rewards
loss = -tf.reduce_sum(loss)

# Calculate the gradients and update the policy network
grads = tape.gradient(loss, policy_network.trainable_variables)
optimizer.apply_gradients(zip(grads, policy_network.trainable_variables))

# Store the episode reward and length
episode_rewards.append(episode_reward)
episode_lengths.append(episode_length)

policy_network.save('keras/')

This requires some explanation I guess

  • The training starts with 1000 episodes where for each episode
  1. We reset the environment and take the initial state as a starting point
  2. Log in a sequence of activities alongside their reward and state as we move ahead in the episode till the episode ends.

How was the action chosen? Using the Policy Network we are trying to train and then depending upon the probabilities, choosing an action randomly

3. Calculate discounted reward for action in the sequence. The early the action is taken, the less the discount factor is hence lesser reward from the future is transferred

4. Separate State, Action, and Reward that we logged in as a tuple

5. Recalculate action probabilities (they would remain the same as while logging Action, State, and Reward as the model isn’t trained in between ) and get the probability for the action chosen (that we did use np.random)

6. Calculate loss function = -1 x Σlog(probability) x discounted_reward

Let’s understand the loss function before moving ahead:

So, if you give it a thought, we really don’t know what is the ideal action for a given state i.e. the ground truth for the Policy Network. It can be the case you consider action with a maximum reward for that state can be the ground truth but it might be the case you took an action with a max reward at timestamp_5 and eventually, you failed in the end hence the agent actually lost and at timestamp_5, you shouldn’t have taken that ‘best’ action.

Hence, in short, generating a ground truth in terms of actions is not possible. What to do?

We will try to train the models depending on the discounted_reward we get for the episode. So if you notice, in the loss function we are also using probabilities of actions that are dependent on the Policy Network weights hence backpropagation for weights update can happen.

Eventually, the goal of the architecture should be to increase the probability x discounted_reward and i.e. why we have negated it to decrease this value as we do for other loss functions also (we generally try to decrease and not increase loss value. No?)

Why a log()? Because probabilities for action would range from 0–1 which are very small values and lead to training instabilities hence log is used for probabilities.

7. Calculate gradients and apply them on the network for backpropagation

That’s it !!

Now, once the agent gets trained, we will render this whole environment using pygame animation following the below code snippet. First, declare a few constants and load a fresh environment and our trained network

#pygame essentials
pygame.init()
DISPLAYSURF = pygame.display.set_mode((500,500),0,32)
clock = pygame.time.Clock()
pygame.display.flip()

def print_summary(text,cood,size):
font = pygame.font.Font(pygame.font.get_default_font(), size)
text_surface = font.render(text, True, (255,255,255))
DISPLAYSURF.blit(text_surface,cood)

#openai gym env
env = gym.make('LunarLander-v2')
input_shape = env.observation_space.shape[0]
num_actions = env.action_space.n
state = env.reset()

done = False
count=0
done=False
steps = 0
#loading trained model
policy_network = tf.keras.models.load_model('keras')

Now the loop for running the simulation for 1 episode

while not done:
steps+=1

# Get the action probabilities from the policy network
action_probs = policy_network.predict(np.array([state]))[0]

# Choose an action based on the action probabilities
action = np.random.choice(num_actions, p=action_probs)

next_state, reward, done, info = env.step(action) # take a step in the environment
image = env.render(mode='rgb_array') # render the environment to the screen

#convert image to pygame surface object
image = Image.fromarray(image,'RGB')
mode,size,data = image.mode,image.size,image.tobytes()
image = pygame.image.fromstring(data, size, mode)

DISPLAYSURF.blit(image,(0,0))
print_summary('Step {}'.format(steps),(10,10),15)
pygame.display.update()
clock.tick(100)
count+=1
pygame.time.delay(10)
state = next_state

pygame.quit()

The explanation is simple

  • After resetting the env, starting off with the initial state, use the trained Policy Network to generate probabilities for 3 actions
  • Choose one of the actions depending on the probability
  • Render the environment as an RGB array (image) after the action is taken
  • Convert this array into a pygame surface to draw on the pygame window
  • Using displayburf.blit, draw the image
  • Add summary text like total steps using the print_summary function described earlier
  • Once the episode is done, display the end message and end the window.

Below video can give you an idea about the final agent that is trained for ~1000 episodes.

As was using CPU, it took me some 5–6 hours to get here. Also, I even tried my hands with more complex environments like Atari games but due to more complexity, the training would have taken an eternity on my age-old PC. Though, this code can be used to train any OpenAI agent that has a state space of 1d array (like CartPole, Hill Climbing, etc) in OpenAI gym environments. For Atari games, this state space is of 3D dimension hence minor tweaks in the policy network (addition of conv2d layers) are required.

That’s all for today, see you soon !!

--

--