Pekiştirmeli öğrenmede seyrek ödül sorunu ve PER(Önceliklendirilmiş Deneyim Hafızası)

Yazar Mustafa Savran — Reinforcement Learning Turkiye

Pekiştirmeli öğrenme günden güne gelişmekte ve sorunlarına yeni yeni çözümler getirilmekte. Sorunlarından biri de seyrek ödül sorunu. Genel anlamda baktığımızda pekiştirmeli öğrenmede ajanımızın attığı her adım için bir ödül verilir eğer doğru bir adım attıysa pozitif yanlış ise negatif bir ödül verilir ve ajanımız deneme yanılma yöntemiyle ödülü en üst düzeye çıkarmaya çalışır. Eğitilirken geçmiş deneyimleri (durum , aksiyon, sonraki durum, ödül ,bitme durumu) formatıyla depoladığımız hafızadan bazı deneyimleri alarak eğitiriz. Bu yolla ajanımızın çevreyi iyi derecede tanımasını bekleriz.

Şöyle düşünelim eğer ajanımızın her harekette aldığı ödüller değişmezse ne olur?

Bir örnek vermek gerekirse gym kütüphanesinde bulunan MountainCar oyunu.

MountainCar
MountainCar
Mountain Car example

Burada ajanımız soldaki dağa tırmanmaya çalışıyor. Her bir adım için -1 ödül kazanıyor bunun amacı en kısa sürede hedefe varmasını sağlamak. Hedefe ulaştığında ise +0.5 ödül kazanarak oyun bitiyor ya da 200 adım atarsa oyun bitmiş oluyor. Böyle bir durumda rastgele hareket ederek hangi ödülden ne kadar kazandığını gözlemlersek sonuç ne olacak bakalım

import gymenv=gym.make(“MountainCar-v0”)done = False
rewards = {}
for _ in range(100):
env.reset()
while not done:
_, reward, done, _ = env.step(env.action_space.sample())
if reward in rewards:
rewards[reward] += 1
else:
rewards[reward] = 1
done = False
for reward in rewards:
print(“{} ödülü {} defa alındı.”.format(reward,rewards[reward]))

Çıktı

-1.0 ödülü 20000 defa alındı.

Gördüğümüz gibi 100 oyunda hiçbir şekilde hedefe ulaşamadı ve hep -1 aldı. Böyle bir durumda ajan nasıl öğrenebilir? Bu soruna çözüm olarak PER algoritması geliştirilmiş.

PER(Prioritized Experience Replay)

Bu algoritmamız kısaca şöyle bir şey yapıyor ajan eğitilirken kullandığımız hafızayı bir önceliklendirme yaparak bir şeyler öğrenme potansiyeli daha yüksek gördüğü deneyimi daha yüksek olasılıkla eğitime girmesini sağlıyor. Bunu şöyle sağlıyor her bir yeni deneyim geldiğinde kendisinin bu deneyimde ne kadar hata yaptığına bakıyor ve o hata yüksekliğine göre önceliklendirmeyi yapıyor.

Önceliklendirme Hesabı Nasıl Yapılır?

Önceliklendirme hesabı her bir yeni deneyim için hata hesabı yapılıyor ve bu hata ne kadar büyük olursa o kadar yüksek olasılığa sahip oluyor.

İki türlü hesap çeşidi var:

  • Rank based: p(i)=1/rank(i). Rank denilen kısım hatalar sıralandıktan sonra kaçıncı sırada yer aldığı.
  • proportional variant: pi=|δi|+ϵ. Bu kısımda δi sembol hatayı temsil ediyor. ϵ çok küçük bir sayıyı temsil ediyor. Bunun amacı her deneyim 0'dan farklı bir olasılık almasını sağlayarak eğitime her deneyimin girmesini sağlamak.

Sonrasında ise olasılık dağılımını çıkarmak için aşağıdaki formülü kullanıyoruz.

Buradaki alpha değeri ne kadar önceliklendirme yapacağını belirten bir hiper parametredir. Yani eğer alphayı 0'a eşitlenirse bütün deneyim olasılıkları eşit oluyor ve uniform bir dağılım elde etmiş oluyoruz.

Importance Sampling

Burada hesabı yaptıktan sonra herbir deneyim değişiminde ve eğitimden sonra hatalar yeniden hesaplanıp kaydediliyor ve durmadan değiştiğinden dolayı tamamen kontrolsüz bir durum ortaya çıkıyor ve bu da önyargı(bias) oluşturuyor. Bunun önüne geçmek için importance sampling denilen formülü kullanıyoruz.

Buradaki N bizim hafızamızın o anda ki hafıza boyutunu temsil ediyor. Beta değerimiz ne kadar önceliklendirme kullanacağımızı temsil ediyor ve bu beta değerimiz 0–1 arasında sayı olup belirli aralıklarla arttırılıyorve bir hiper parametre. Bu durumda daha dengeli bir hafıza modeli elde etmiş oluyoruz.

Implementasyon

Şimdi geldik bu algoritmayı nasıl implement edeceğimize. Burada biz deneyim hafızası olarak makalede de önerilen Sum tree kullanacağız.

Sum tree

Sum tree, her bir ata çocuklarının toplamına eşit olan bir ikili ağaçtır.

Sumtree basit yapısı

Sum tree bize şöyle bir şey sağlıyor: Bizim verimizi önceliklendirmeye yarıyor ve sıralama gerektirmeden bunu yapıyor. Mesela yuakrıdaki görsleden örnek vermek gerekirse biz bir tane rastgele önceliklendirilmiş bir sayı seçeceğiz.

  • Bunu seçerken 44 ile 0 arasından bir sayı seçiyoruz 15 olsun.
  • Seçtikten sonra sol tarafa bakıyoruz ve 9 var 15> 9 olduğu için 15–9 = 6 yapıyoruz ve sağ düğüme gidiyoruz.
  • Bir daha sağ düğümün sol çocuğuna bakıyoruz. 6 ≥6 sol düğüme gidiyoruz ve sonucumuz 6 oluyor.

Kodlaması ise şu şekilde öncelikle verimizi tutması için bir Node sınıfı oluşturuyoruz:

from collections import namedtuple
class Node(object):
def __init__(self, left, right, data = None):
self.left = left
self.right = right
self.parent = None
self.value = self.update_value(self.left, self.right)
self.make_relation(self.left, self.right)
self.data = data
def update_value(self, left, right) -> float:
value1, value2 = 0., 0.
if left is not None:
value1 = left.value
if right is not None:
value2 = right.value
return value1 + value2
def make_relation(self, left, right):
if left is not None:
left.parent = self
if right is not None:
right.parent = self

Sonrasında ise bu Node objelerini tutan SumTree sınıfımızı implemente ediyoruz.

from Node import Nodeclass SumTree:
def __init__(self, size):
self.writer = 0
self.size = size
self.exp_nodes, self.base_node = self.construct_tree(size)
self.n_filled_leaves = 0
self.w_cursor = 0
def _retrieve(self, value, node) -> Node:
if node.left is None:
return node
if node.left.value >= value:
return self._retrieve(value, node.left)
else:
return self._retrieve(value - node.left.value, node.right)
def retrieve(self, value) -> Node:
"""
:param value: value to search
:return: Node
"""
return self._retrieve(value, self.base_node) @staticmethod
def construct_tree(size):
"""
Construct tree from the amount of leaves
:param size: size of the nodes at the end of tree
:return: tree and root node
"""
nodes = [Node(None, None) for _ in range(size)] exp_nodes = nodes.copy()
while len(nodes) > 1:
nodes = [Node(nodes[idx1], nodes[idx2]) for idx1, idx2 in
zip(range(0, len(nodes), 2), range(1, len(nodes), 2))]
return exp_nodes, nodes[0] def update_node(self, value, node, experience=None):
change = value - node.value
node.value = value
if experience is not None:
node.data = experience
self.update_tree(node.parent, change)
def update_tree(self, node, change):
node.value = node.value + change
if node.parent is not None:
self.update_tree(node.parent, change)
def append(self, value, experience): self.update_node(value, self.exp_nodes[self.w_cursor], experience) self.w_cursor += 1 if self.w_cursor == self.size:
self.w_cursor = 0
if self.n_filled_leaves <= self.size:
self.n_filled_leaves += 1
def __len__(self):
return self.n_filled_leaves

Burada ikinci olarak herbir değişimde olasılıklar yeniden güncellenmeli ve yeni bir deneyim geldiğinde eklenmeli o fonksiyonları da yukarıda bulabilirsiniz.

Sıra geldi PER sınıfımıza öncelikle parametrelerimizi ekliyoruz.

class PER(object):
def __init__(self, size, min_prob=1e-4, alpha=0.8, beta=0.8, beta_inc=1e-5):
self.tree = SumTree(size)
self.min_prob = min_prob
self.alpha = alpha
self.beta = beta
self.beta_inc = beta_inc

Sonrasında ise önceliklendirme hesabı yapmamız gerekli.

def calculate_priority(self, error) -> float:
return pow(abs(error) + self.min_prob, self.alpha)

Bu fonksiyon alınan hatayı yukarıda bahsettiğimiz formülü kullanarak önceliklendirme hesabı yapıyor.

def update_beta(self):
self.beta = self.beta + self.beta_inc
def append(self, value, experience):
prior = self.calculate_priority(value)
self.tree.append(prior, experience)
def sample(self, batch_size=32):
states = []
next_states = []
rewards = []
dones = []
actions = []
values = []

sample_values = np.random.uniform(self.tree.base_node.value, size=batch_size)
nodes = []
for val in sample_values:
node = self.tree.retrieve(val)
states.append(node.data.state.tolist())
next_states.append(node.data.next_state.tolist())
rewards.append(node.data.reward)
dones.append(node.data.done)
actions.append(node.data.action)
values.append(node.value)
nodes.append(node)
samp_prob = np.array(values) / self.tree.base_node.value
is_weights = pow(1 / (self.tree.n_filled_leaves * samp_prob), self.beta)
is_weights = is_weights / max(is_weights)
self.update_beta() return (np.array(states), np.array(actions), np.array(next_states), np.array(
rewards), np.array(dones)), is_weights, nodes

Sonrasında ise örnekleme için öncelikle en üstteki ata düğümün değeriyle 0 arasında bir float değerler alıyoruz ve bu değerlerle yukarıda anlattığım arama algoritmasıyla düğümleri alıyoruz.

Daha sonrasında model ve ajanımızın hazırlığı var.

model.py:

import torch
import torch.nn as nn
import config as cf
class DQN(nn.Module):
def __init__(self, n_input, n_action):
super(DQN, self).__init__()
self.n_input = n_input
self.n_output = n_action
self.fc = nn.Sequential(
nn.Linear(n_input, 256),
nn.ReLU(),
nn.Linear(256, 512),
nn.ReLU(),
nn.Linear(512, n_action)
)
def forward(self, x):
x=x.to(cf.DEVICE)
return self.fc(x.float())

agent.py:

from model import DQN
import torch
import config as cf
import numpy as np
from collections import namedtuple
experience = namedtuple("Experience",["state","action","next_state","reward","done"])
class Agent:
def __init__(self, env, n_input, n_output,buffer):
self.env = env
self.epsilon = 1.0
self.epsilon_decay = 0
self.net = DQN(n_input, n_output).to(cf.DEVICE)
self.tgt_net = DQN(n_input, n_output).to(cf.DEVICE)
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=cf.LEARNING_RATE)
self.buffer = buffer
def action(self, state):
state = state.unsqueeze(0).unsqueeze(0)

q_value = self.net.forward(state)

_, action_ = torch.max(q_value, 2)
self.epsilon_decay += 1
self.update_epsilon()
if np.random.rand() <= self.epsilon:
return self.env.action_space.sample()
else:
return action_.item() def update_epsilon(self): if self.epsilon_decay > 1000:
self.epsilon = max(self.epsilon - 0.00005, 0.02)
def update_tgt(self): self.tgt_net.load_state_dict(self.net.state_dict()) def train_model(self, batch,is_weights,nodes):
states_v,actions_v,next_states_v, rewards_v, dones_v = batch
states_v = torch.tensor(states_v)
next_states_v = torch.tensor(next_states_v)
actions_v = torch.tensor(actions_v).long()
rewards_v = torch.tensor(rewards_v)
dones_v = torch.ByteTensor(dones_v)
states_v = states_v.view(cf.BATCH_SIZE, self.net.n_input)
next_states_v = next_states_v.view(cf.BATCH_SIZE, self.net.n_input)
rewards_v = rewards_v.view(cf.BATCH_SIZE, -1).to(cf.DEVICE)
dones_v = dones_v.view(cf.BATCH_SIZE, -1).to(cf.DEVICE)
state_action_values = self.net(states_v)
state_action_values = state_action_values.gather(1, actions_v.to(cf.DEVICE).unsqueeze(-1)).squeeze(-1)
next_state_values = self.tgt_net(next_states_v)
next_state_values = next_state_values.max(1, keepdim=True)[0] next_state_values = next_state_values.detach() expected_state_action_values = (1-dones_v) * cf.gamma * next_state_values + rewards_v
errors = torch.abs(state_action_values - expected_state_action_values)
for i in range(cf.BATCH_SIZE):
self.buffer.update(errors[0][i].item(), nodes[i])
self.optimizer.zero_grad()
loss = torch.nn.functional.mse_loss(state_action_values.double().view(cf.BATCH_SIZE,1), expected_state_action_values.double())
loss = (torch.DoubleTensor(is_weights) * loss).mean()
loss.backward()
self.optimizer.step()
return loss def append_exp(self, state, action, next_state, reward, done):
target = self.net(state)
old_val = torch.max(target)
target_val = self.tgt_net(next_state)
if not done:
expected_reward = reward
else:
expected_reward = reward + cf.gamma * torch.max(target_val)
error = abs(old_val - expected_reward) self.buffer.append(error.item(), experience(state, action, next_state, reward, done))

SONUÇLAR

Şimdi geldik MountainCar oyununda ajanımızın öğrenip öğrenemediğine.

loss fonksiyonu
ödüller

Kayıp ve ödül fonksiyonlarına baktığımızda gördüğünüz gibi 600 bölüm sonra öğrendiğini görüyoruz.

Son olarakta ajanımızın nasıl oynadığını kontrol edelim.

Görüldüğü üzere ajanımız ne kadar varış noktasına varmayı öğrenmiş olsa da daha iyi sonuçlar verebilir bunun için hiper parametreleri daha iyi ayarlanabilir.

Bu makalede yazdığımız kodlara bu github linkinden ulaşabilirsiniz.

Reinforcement Learning Turkiye Twitter : https://twitter.com/Turkiye_RL

YouTube : https://www.youtube.com/ReinforcementLearningTurkiye

Reinforcement Learning Turkiye

All things Reinforcement Learning

Reinforcement Learning Turkiye

All things Reinforcement Learning including model based , model free, robotics, genetic algorithms, custom environments, agents, RLtoSimulation and more !

Reinforcement Learning Türkiye

Written by

Reinforcement Learning (Pekiştirmeli Öğrenme) Turkiye TR/EN. A research/application group towards Reinforcement Learning. RL-TR | @Turkiye_RL

Reinforcement Learning Turkiye

All things Reinforcement Learning including model based , model free, robotics, genetic algorithms, custom environments, agents, RLtoSimulation and more !