Game development using Pygame & Reinforcement Learning with example

Finding an optimal path in pygame environment using Q Learning with code

Mehul Gupta
Data Science in your pocket
7 min read · Feb 18, 2022


Game development has always been a fascinating field, be it the graphics or the logic that runs behind the curtain. Also, being an avid gamer, developing something that at least looks like a small game has always been on my bucket list. So, this time I tried my hand at building something with Pygame (an open-source Python library for game development) combined with Q Learning, to help a guy stuck at the other end of the city reach his home amidst all the traffic!!

Let’s help him reach his home !

Note: Basics of reinforcement learning are a must before moving ahead. You can follow up here

Understanding the environment

The road network consists of the elements below, each with an associated reward:

  • Road: Reward = -3
  • Boost: Reward = 0
  • Traffic Signal: Reward = -20
  • Car Jam: Reward = -50
  • House: Reward = 500
  • Already visited: Reward = -10
  • Invalid move: Reward = -100

Why negative rewards for some elements?

Negative rewards act as a punishment so that the agent learns to avoid these states in the future. The reward attached to each element can easily be played around with.

  • A move is considered invalid if the agent goes out of the environment (e.g. taking a left at the top-left corner).
  • Already visited refers to cells/states that the agent has already visited within a given episode.

The pygame window is divided into rows & columns such that each cell in this grid represents a State, starting from the top-left corner (0) to the bottom-right corner (rows*columns - 1).
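As a quick illustration (a minimal sketch, assuming the 10x10 grid used later in the code), the state index is simply row * columns + column:

rows, columns = 10, 10

def to_state(row, col):
    # flatten (row, col) into a single state index
    return row * columns + col

print(to_state(0, 0))   # 0  -> top-left corner
print(to_state(9, 9))   # 99 -> bottom-right corner (rows*columns - 1)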

Why not OpenAI Gym?

As most of us have heard of OpenAI Gym for game development (from a Data Science perspective), I explored it as a first option, only to learn that creating a new environment (graphics and all) from scratch is really difficult in OpenAI Gym; you can only play around with the reinforcement learning part on existing environments. As I wished to develop everything from scratch, from conceptualization to graphics, I opted for Pygame.

Codes ahead !!

All codes are available at : https://github.com/mehulgupta2016154/Traffic_Turbo/blob/main/pygame.ipynb

Building the environment

As mentioned, the environment & any sort of motion is handled using Pygame. Our first step is to initialize pygame & create a game window object.

pygame.init()
DISPLAYSURF = pygame.display.set_mode((500,500),0,32)
clock = pygame.time.Clock()

Now that we have a window object in the form of DISPLAYSURF, we will write small functions to do different things on the screen using it. The clock object helps maintain the desired speed of transitions on the pygame window.

Before moving ahead, we must know a few frequently used code snippets in any pygame code:

  1. pygame.event.get(): This traces any action taken by the user on the game window, be it a click or a quit. Even if your environment doesn’t require any interaction with the user (as in this case, where the computer does all the work), this snippet is called from time to time in my code, otherwise the game window becomes unresponsive. It has no other major significance.
  2. clock.tick(1): This maintains the frame rate while transitioning on the pygame window. A low value means slow animation & vice-versa. Here, 1 as a parameter means 1 frame/second. This can be any number; the bigger the number, the faster the screen transitions.
  3. pygame.display.update(): Every time we draw anything on the pygame window, this function is required to refresh the screen and reflect the changes.
  4. DISPLAYSURF.blit(obj_to_draw, top_left_cood): Draws an object (like an image) at the given top-left coordinates.
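To see how these pieces fit together, here is a minimal, self-contained loop (a sketch for illustration, not part of the project code) that exercises all four calls:

import pygame

pygame.init()
DISPLAYSURF = pygame.display.set_mode((500, 500), 0, 32)
clock = pygame.time.Clock()

running = True
while running:
    # consume events so the window stays responsive
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
    DISPLAYSURF.fill((0, 0, 0))
    # DISPLAYSURF.blit(some_image, (0, 0)) would draw an image at the top-left
    pygame.display.update()   # refresh the screen
    clock.tick(1)             # cap the loop at 1 frame/second
pygame.quit()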

Done with the basics, time for some action

Moving onto declaring a few variables & importing essentials

import pygame, sys
from pygame.locals import *
import random
import numpy as np

class game_env:
    def __init__(self, suffix):
        self.q_table = np.zeros((100, 4))
        self.reward_map = {'traffic.png': -20, 'road.png': -3, 'jam.png': -50, 'fast.png': 0,
                           'house.png': 500, 'man.png': -500, 'already_visited': -10, 'invalid': -100}
        self.dir = {0: 'left', 2: 'right', 1: 'down', 3: 'up'}
        self.alpha = 0.75            # used in q-learning formula
        self.beta = 0.75             # used in q-learning formula
        self.greedy = 0.6            # epsilon-greedy, greedy
        self.random = 0.4            # epsilon-greedy, epsilon
        self.delta = 0.005           # rate of change for greedy & epsilon
        self.game_dim = (500, 650)   # window size
        self.text_space = 150        # window space reserved for printing stats
        self.initial_cood = (0, 0 + self.text_space)   # state 0 position
        self.rows, self.columns = 10, 10
        self.start_state = 0
        self.end_state = 99
        self.cell_dim = self.game_dim[0] / self.rows   # side of each cell in the grid
        self.final_cood = (self.game_dim[0] - self.cell_dim,
                           self.game_dim[1] - self.cell_dim)   # state 99 position
        self.game_grid = self.new_game_env()            # declared below
        self.suffix = suffix
        self.action_space = {0: {'x': -1*self.cell_dim, 'y': 0},
                             2: {'x': self.cell_dim, 'y': 0},
                             1: {'x': 0, 'y': self.cell_dim},
                             3: {'x': 0, 'y': -1*self.cell_dim}}
        try:
            with open('env_weights\\weights_{}.npy'.format(self.suffix), 'rb') as f:
                self.q_table = np.load(f)
            with open('env_weights\\env_{}.npy'.format(self.suffix), 'rb') as f:
                self.game_grid = np.load(f)
        except Exception as e:
            print('No such files pre-exists. Starting a new environment')
            with open('env_weights\\env_{}.npy'.format(self.suffix), 'wb') as f:
                np.save(f, self.game_grid)
            with open('env_weights\\weights_{}.npy'.format(self.suffix), 'wb') as f:
                np.save(f, self.q_table)

Quite a few things to grasp

  • game_env is our class that will include the code for creating an environment to train & test agents using Q Learning.
  • The constructor is passed a param, suffix, so as to load an existing environment & weights if they already exist.
  • Delta: the rate at which we wish to increase greedy & decrease random over time during training
  • text_space: Space reserved for printing on the game window. The game window starts below this region
  • game_grid: NumPy array with game elements as values
  • action_space: A dict storing how to increment/decrement the x or y coordinate given the action taken at a state. For example, x+=50, y+=0 when moving right if cell_dim=50; see the small sketch below.
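Here is that example worked out (a small sketch, assuming cell_dim = 50): applying an action to a coordinate is just an addition.

cell_dim = 50
action_space = {0: {'x': -cell_dim, 'y': 0},   # left
                2: {'x':  cell_dim, 'y': 0},   # right
                1: {'x': 0, 'y':  cell_dim},   # down
                3: {'x': 0, 'y': -cell_dim}}   # up

cood = (100, 250)                              # current top-left coordinate of the agent's cell
action = 2                                     # move right
new_cood = (cood[0] + action_space[action]['x'],
            cood[1] + action_space[action]['y'])
print(new_cood)                                # (150, 250)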

Print any text on the pygame game window.

def print_summary(self, text, cood, size):
    font = pygame.font.Font(pygame.font.get_default_font(), size)
    text_surface = font.render(text, True, (255, 255, 255))
    DISPLAYSURF.blit(text_surface, cood)

Initialize NumPy matrix to create environment later

def new_game_env(self):
    matrix = random.choices(['road.png', 'traffic.png', 'jam.png', 'fast.png'],
                            weights=[0.55, 0.15, 0.15, 0.15], k=self.rows*self.columns)
    matrix = np.asarray(matrix).reshape(self.rows, self.columns)
    matrix[0][0] = 'man.png'
    matrix[self.rows-1][self.columns-1] = 'house.png'
    return matrix

The above code block simply initializes the environment as a NumPy matrix of rows x columns dimensions, with the elements discussed above (road, traffic signal, etc.) placed randomly, & sets the initial & final positions for the ‘man’ & the ‘house’.

Draw the above-initialized NumPy matrix on the pygame DISPLAYSURF object

def image_loader(self, img_path):
    img = pygame.image.load('icons\\{}'.format(img_path))
    img = pygame.transform.scale(img, (self.cell_dim, self.cell_dim))
    return img

def initial_state(self):
    DISPLAYSURF.fill((0, 0, 0))
    for x in range(self.rows):
        for y in range(self.columns):
            img = self.image_loader(self.game_grid[x][y])
            cood = (y*self.cell_dim, x*self.cell_dim + self.text_space)
            DISPLAYSURF.blit(img, cood)
    self.print_summary('Traffic Turbo', (175, 25), 24)
    pygame.display.update()

Using the above-initialized matrix, the icons associated with the different elements are drawn at their positions on Pygame’s game window by iterating over the matrix, setting up the initial state of the environment. DISPLAYSURF.fill((0,0,0)) cleans the entire window after each episode & makes it available as a blank slate again.

Draw new footsteps after each action taken in an episode

def steps_visualizer(self, cood):
    img = pygame.image.load('icons\\feet.png')
    img = pygame.transform.scale(img, (self.cell_dim, self.cell_dim))
    DISPLAYSURF.blit(img, cood)
    pygame.display.update()
    clock.tick(1)

The trickiest bit of math in this entire setup is converting a State to coordinates in the pygame environment & vice-versa.

State-Cood conversion & vice-versa

def cood_state_calc(self, cood):
    state = int((self.rows*(cood[1]-self.text_space)/self.cell_dim) + (cood[0]/self.cell_dim))
    return state

def state_cood_calc(self, state):
    cood = (int((state % self.rows)*self.cell_dim),
            int((state // self.rows)*self.cell_dim + self.text_space))
    return cood

where cell_dim = the dimension of each cell of the grid. As each cell is square, only the length of one side (in terms of pygame window coordinates) is required.
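As a sanity check, here is a stripped-down round trip (a hypothetical snippet, assuming rows = 10, cell_dim = 50 and text_space = 150 as in the constructor): converting a state to coordinates and back should return the same state.

rows, cell_dim, text_space = 10, 50, 150

def state_to_cood(state):
    return (int((state % rows) * cell_dim),
            int((state // rows) * cell_dim + text_space))

def cood_to_state(cood):
    return int(rows * (cood[1] - text_space) / cell_dim + cood[0] / cell_dim)

state = 23
cood = state_to_cood(state)        # (150, 250): column 3, row 2
assert cood_to_state(cood) == state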

Checking move validity

def is_valid_move(self, cood, already_visited):
    if cood in already_visited:
        return False
    if self.initial_cood[0] <= cood[0] <= self.final_cood[0] and \
       self.initial_cood[1] <= cood[1] <= self.final_cood[1]:
        return True
    return False

This checks two conditions:

  1. Whether the move goes out of the environment
  2. Whether it leads to a state/coordinate that has already been visited

Q Table update

def q_table_update(self, state, action, already_visited):
    curr_cood = self.state_cood_calc(state)
    new_cood = (int(curr_cood[0] + self.action_space[action]['x']),
                int(curr_cood[1] + self.action_space[action]['y']))
    new_state = self.cood_state_calc(new_cood)
    is_valid = self.is_valid_move(new_cood, already_visited)

    if is_valid:
        reward = self.reward_map[self.game_grid[int(new_state//self.rows)][int(new_state%self.rows)]]
    elif new_cood in already_visited:
        reward = self.reward_map['already_visited']
    else:
        reward = self.reward_map['invalid']

    try:
        state_value_diff = max(self.q_table[new_state]) - self.q_table[state][action]
    except:
        state_value_diff = 0
    self.q_table[state][action] += self.alpha*(reward + self.beta*state_value_diff)

    return is_valid, new_state, new_cood, reward

This is the most crucial part & requires some explanation

  • Depending upon the current coordinates & the action taken, the new coordinates & the new state are calculated
  • Validity of the new state is checked
  • The reward is assigned accordingly
  • The Q Table entry for the given state-action pair is updated via (the crux of Q Learning)
self.q_table[state][action]+=self.alpha*(reward + self.beta*state_value_diff)
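For reference, the textbook Q-learning update is Q(s,a) ← Q(s,a) + α·(r + γ·max_a' Q(s',a') − Q(s,a)). The line above is a slight variation of this: beta plays the role of the discount factor γ, but it is applied to the difference max Q(s',·) − Q(s,a) rather than to max Q(s',·) alone. In textbook form, the same line would read roughly as follows (a comparison sketch, not the code used above):

# textbook Q-learning update, for comparison (gamma = discount factor)
q_table[state][action] += alpha * (reward + gamma * max(q_table[new_state]) - q_table[state][action])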

Each episode

def episode(self, current_state, is_valid):
    pygame.event.get()
    cood = self.state_cood_calc(current_state)
    already_visited = [cood]
    self.steps_visualizer(cood)

    while current_state != self.end_state and is_valid == True:
        pygame.draw.rect(DISPLAYSURF, (0, 0, 0), (0, 100, self.game_dim[0], 50))
        pygame.display.update()
        for event in pygame.event.get():
            if event.type == QUIT:
                pygame.quit()
                raise Exception('training ended')
        choice = random.choices([True, False], weights=[self.greedy, self.random], k=1)
        if choice[0]:
            action = np.argmax(self.q_table[current_state])
        else:
            action = random.choices([0, 1, 2, 3], weights=[0.25, 0.25, 0.25, 0.25], k=1)
            action = action[0]
        self.print_summary('State:{}'.format(current_state), (10, 100), 15)
        self.print_summary('Action:{}'.format(self.dir[action]), (110, 100), 15)
        is_valid, current_state, cood, reward = self.q_table_update(current_state, action, already_visited)

        self.print_summary('Reward:{}'.format(reward), (220, 100), 15)

        if is_valid == False and cood not in already_visited:
            self.print_summary('INVALID MOVE !!', (330, 100), 15)
        elif is_valid == False:
            self.print_summary('ALREADY VISITED', (330, 100), 15)
        else:
            self.print_summary('New State:{}'.format(current_state), (330, 100), 15)

        pygame.display.update()
        clock.tick(0.9)
        already_visited.append(cood)
        if is_valid:
            self.steps_visualizer(cood)
        else:
            break

Again, a crucial piece

  • Takes a random State as input to initialize the episode. The initial state is randomized & not fixed at state 0 (the top-left cell) for better training.
  • While the episode doesn’t end (either an invalid move is made or the agent reaches the house), each iteration:

    • Draws a black rectangle to erase the stats printed on the screen in the previous step
    • Chooses an action using the epsilon-greedy policy
    • Calls q_table_update()
    • If the action is valid, prints footsteps on the screen using steps_visualizer() declared earlier
    • Prints basic stats on the pygame screen using print_summary()
    • Adds the new coordinate to already_visited
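The epsilon-greedy choice in the loop above boils down to the following (a stripped-down sketch with illustrative values, not part of the class):

import random
import numpy as np

greedy, eps = 0.6, 0.4                     # exploit vs. explore probabilities
q_row = np.array([1.5, -0.2, 3.1, 0.0])    # Q values for the current state

if random.choices([True, False], weights=[greedy, eps], k=1)[0]:
    action = int(np.argmax(q_row))         # exploit: best known action (here, 2)
else:
    action = random.choice([0, 1, 2, 3])   # explore: a uniformly random action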

Training

def training(self, epoch):
    state = random.randint(self.start_state, self.end_state)
    self.initial_state()
    self.print_summary(' Episode:{}'.format(epoch), (200, 60), 20)
    self.episode(state, True)
    print('episode {} ---->'.format(epoch))
    pygame.display.set_caption('greedy={}, random={}'.format(round(self.greedy, 4), round(self.random, 4)))
    if epoch % 50 == 0:
        if self.random > 0:
            self.greedy += self.delta
            self.random -= self.delta
        self.greedy = min(self.greedy, 1)
        self.random = max(self.random, 0)

    if epoch % 2000 == 0:
        self.delta *= 2
        with open('env_weights\\weights_{}.npy'.format(self.suffix), 'wb') as f:
            np.save(f, self.q_table)

    clock.tick(1)

This code block

  • Resets the environment after each episode by calling initial_state()
  • Calls episode() with a random state (from 0–99) as the parameter
  • Updates greedy & random/epsilon as training progresses
  • Saves the Q Table from time to time as a NumPy array
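The article doesn’t show the driver loop explicitly, but a minimal way to run training could look roughly like this (the suffix 'v1' and the epoch count are just placeholders, and pygame is assumed to be initialized as shown at the top):

env = game_env('v1')           # 'v1' is an arbitrary suffix for the saved env/weight files
for epoch in range(1, 5001):   # number of episodes is up to you
    env.training(epoch)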

Testing

def testing(self,initial_state=0):
self.greedy = 1
self.random = 0
with open('env_weights\\env_{}.npy'.format(self.suffix),'rb') as f:
self.game_grid = np.load(f)

with open('env_weights\\weights_{}.npy'.format(self.suffix),'rb') as f:
self.q_table = np.load(f)

self.initial_state()
self.episode(initial_state,True)

First of all, it sets greedy=1 & random=0 so that we can observe what the agent has learned. It also loads the environment matrix (the NumPy matrix initialized earlier) & the weights on which testing has to be done.

It then draws the environment using initial_state() & calls episode() with initial_state=0, which can be changed.
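A testing run on the same environment could then look like this (again a sketch, with the same assumed suffix as above):

test_env = game_env('v1')      # loads env_v1.npy and weights_v1.npy if they exist
test_env.testing(initial_state=0)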

That’s it !!

The code shown above is just for reference; everything can be explored in the repo linked earlier.

