In this article, the Cliff Walking algorithm is implemented in Python.
import random
import numpy as np
import matplotlib.pyplot as plt
import copy

np.set_printoptions(threshold=np.inf)


def display2D(array):
    for row in array:
        # Loop over columns.
        for column in row:
            print(column.getSign(), end=" ")
            # getSign() returns 'O' for a non-visited element,
            # 'C' for a cliff and 'X' for a visited element
        print(end="\n")


def displayRewards(array):
    for row in array:
        # Loop over columns
        for column in row:
            print(column.getReward(), end=" ")
        print(end="\n")


def handleRewards(array, last_row, column):
    # column - 2 equals the number of cliff elements
    for i in range(column - 2):
        array[last_row][i + 1].setReward(-100)
    # array[last_row][column - 1].setReward(50)


def shiftAndAddArray(arrayToShift, newElement):
    for i in range(len(arrayToShift) - 1):
        arrayToShift[i] = arrayToShift[i + 1]
    arrayToShift[len(arrayToShift) - 1] = newElement
    return arrayToShift
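# For example (hypothetical values): applying shiftAndAddArray to the window [0., -20., -35.]
# with a new total reward of -13. yields [-20., -35., -13.]; the oldest sample is dropped and the
# newest one appended, which is how the moving-average window in the training loops below is maintained.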
actionList = [0, 1, 2, 3]  # 0 for right, 1 for down, 2 for up, 3 for left


class States:
    # sign indicates the kind of position: if sign is C it is a cliff, S is the starting point, F is the finishing point,
    # O is an empty spot and X is a spot the agent has travelled through
    def __init__(self, sign, reward, row, column):
        self.sign = sign
        self.reward = reward
        self.row = row
        self.column = column

    # returns the reward of a particular state (note that states are enough for this problem for rewards)
    def getReward(self):
        return self.reward

    # returns the sign of a particular state
    def getSign(self):
        return self.sign

    # changes the sign of a particular state
    def changeSign(self, newSign):
        self.sign = newSign

    # changes the reward of a particular state
    def setReward(self, newReward):
        self.reward = newReward

    def eGreedy(self, Qvalue, epsilon, last_row_index, last_column_index):  # returns an action
        """Returns an action (encoded integer) by using an epsilon-greedy approach"""
        newActionList = actionList.copy()  # new action list built by eliminating prohibited actions
        if self.row == 0 and self.column == 0:  # no up or left
            newActionList.remove(3)  # remove left
            newActionList.remove(2)  # remove up
        elif self.row == last_row_index and self.column == 0:  # no down or left
            newActionList.remove(3)  # remove left
            newActionList.remove(1)  # remove down
        elif self.row == 0 and self.column == last_column_index:  # no up or right
            newActionList.remove(0)  # remove right
            newActionList.remove(2)  # remove up
        elif self.row == last_row_index and self.column == last_column_index:  # no down or right
            newActionList.remove(0)  # remove right
            newActionList.remove(1)  # remove down
        # More general cases for prohibiting actions
        elif self.row == 0:  # no up
            newActionList.remove(2)  # remove up
        elif self.row == last_row_index:  # no down
            newActionList.remove(1)  # remove down
        elif self.column == 0:  # no left
            newActionList.remove(3)  # remove left
        elif self.column == last_column_index:  # no right
            newActionList.remove(0)  # remove right
        probability = random.uniform(0, 1)  # random number between 0 and 1 for epsilon greedy
        # To ensure both exploration and exploitation, the epsilon-greedy approach is used
        if probability < 1 - epsilon:  # greedy action
            return self.findMaxIndex(Qvalue, newActionList, last_row_index, last_column_index)
        else:  # random action
            return random.choice(newActionList)
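        # For example, with epsilon = 0.2 (the value applySarsa uses below), the highest-Q valid action
        # is returned with probability 0.8 and a uniformly random valid action with probability 0.2.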
    def ePureGreedy(self, Qvalue, last_row_index, last_column_index):
        """Returns a pure greedy action without exploration"""
        newActionList = actionList.copy()  # copy action list again
        if self.row == 0 and self.column == 0:  # no up or left
            newActionList.remove(3)  # remove left
            newActionList.remove(2)  # remove up
        elif self.row == last_row_index and self.column == 0:  # no down or left
            newActionList.remove(3)  # remove left
            newActionList.remove(1)  # remove down
        elif self.row == 0 and self.column == last_column_index:  # no up or right
            newActionList.remove(0)  # remove right
            newActionList.remove(2)  # remove up
        elif self.row == last_row_index and self.column == last_column_index:  # no down or right
            newActionList.remove(0)
            newActionList.remove(1)
        # More general cases
        elif self.row == 0:  # no up
            newActionList.remove(2)  # remove up
        elif self.row == last_row_index:  # no down
            newActionList.remove(1)  # remove down
        elif self.column == 0:  # no left
            newActionList.remove(3)  # remove left
        elif self.column == last_column_index:  # no right
            newActionList.remove(0)  # remove right
        return self.findMaxIndex(Qvalue, newActionList, last_row_index, last_column_index)
    # change the state according to the action
    def takeAction(self, array, action, Qvalue, last_row_index, last_column_index, epsilon):
        # note that action is an integer value:
        # 0 right, 1 down, 2 up, 3 left
        # the following 4 checks could be discarded by passing the newActionList to this method;
        # instead, a recursive approach is used that checks again whether the motion is valid
        # row == 0 -> we can't go up - call eGreedy for a valid action
        if self.row == 0 and action == 2:  # if row is 0 then up is not valid - call eGreedy again
            newAction = self.eGreedy(Qvalue, epsilon, last_row_index, last_column_index)
            return self.takeAction(array, newAction, Qvalue, last_row_index, last_column_index, epsilon)
        # row == last_row_index -> can't go down
        elif self.row == last_row_index and action == 1:
            newAction = self.eGreedy(Qvalue, epsilon, last_row_index, last_column_index)
            return self.takeAction(array, newAction, Qvalue, last_row_index, last_column_index, epsilon)
        # column == last_column_index -> can't go right
        elif self.column == last_column_index and action == 0:
            newAction = self.eGreedy(Qvalue, epsilon, last_row_index, last_column_index)
            return self.takeAction(array, newAction, Qvalue, last_row_index, last_column_index, epsilon)
        # column == 0 -> can't go left
        elif self.column == 0 and action == 3:
            newAction = self.eGreedy(Qvalue, epsilon, last_row_index, last_column_index)
            return self.takeAction(array, newAction, Qvalue, last_row_index, last_column_index, epsilon)
        else:  # if everything is valid
            if action == 0:  # right
                next_state = array[self.row][self.column + 1]  # update the state
            elif action == 1:  # down
                next_state = array[self.row + 1][self.column]  # update the state
            elif action == 2:  # up
                next_state = array[self.row - 1][self.column]  # update the state
            elif action == 3:  # left
                next_state = array[self.row][self.column - 1]
            return next_state
    # returns the index of the maximum-value action among the allowed actions
    def findMaxIndex(self, Qvalue, actionList, last_row_index, last_column_index):
        """The 2D grid is modeled as a vector in such a way that the state number is incremented
        left-to-right and top-to-bottom, i.e.
             0 -  1 - ... - 11
            12 - 13 - ... - 23
            24 - 25 - ... - 35
            36 - 37 - ... - 47
        The state-action matrix holds the Q values in the following way:
                          Actions
            States     Right   Down    Up    Left
            ...         ...     ...   ...     ...
            State 11   -4.4    14.6   0.6     6.4
            ...         ...     ...   ...     ...
        If the grid has 4 rows and 12 columns, then state 11 is the top-right point, in which case
        the maximum-value action is going down. (A small indexing sketch is given after the full listing.)
        """
        state_index = self.row * (last_column_index + 1) + self.column  # flattened state number
        # restrict the choice to the allowed actions passed in actionList
        maxIndex = max(actionList, key=lambda a: Qvalue[state_index][a])
        return maxIndex
    # checks whether the current state is terminal or not, for ending the episode
    def isTerminal(self, last_row_index):
        # returns 1 if the current state is terminal - a cliff or the finish point
        if self.row == last_row_index and self.column > 0:
            return 1
        else:
            return 0
def applySarsa(array, row_number, column_number, episodes, steps, alpha, gama):
    """
    @params
    array = state grid we constructed
    row_number = #rows of state grid
    column_number = #columns of state grid
    episodes = #episodes
    steps = #steps within an episode
    alpha = learning rate
    gama = discount factor
    """
    state_number = row_number * column_number
    last_row_index = row_number - 1
    last_column_index = column_number - 1
    action_number = 4
    epsilon_sarsa = 0.2  # a hyperparameter to tune, actually
    # random initialization of the Q table
    q_table = np.random.rand(state_number, action_number)
    # terminal state initialization
    q_table[state_number - 1][0] = 0
    q_table[state_number - 1][1] = 0
    q_table[state_number - 1][2] = 0
    q_table[state_number - 1][3] = 0
    rewardArray = []  # initialize - for plotting purposes
    episodeArray = []  # initialize - for plotting purposes
    windowing_average_samples = 40
    # this number is the window size of the moving average used to smooth the graph
    averageRewardArray = np.zeros(windowing_average_samples)
    for i in range(episodes):  # loop for each episode
        # The following code segment can be used to try an epsilon-decaying strategy:
        # by decaying epsilon every few episodes we force the agent to only exploit,
        # not explore, after a point. For this problem I did not use this approach.
        # if i % some_number_of_episodes == 0 and epsilon > some_threshold:
        #     epsilon = epsilon - 0.01
        totalReward = 0
        episodeArray.append(i)
        current_state = array[last_row_index][0]  # initial state S
        # choose action A from S using the policy derived from Q
        action = current_state.eGreedy(q_table, epsilon_sarsa, last_row_index, last_column_index)
        count = 0
        while count in range(steps) and current_state.isTerminal(last_row_index) == 0:
            count = count + 1
            # take action A, observe R, S'
            next_state = current_state.takeAction(array, action, q_table, last_row_index, last_column_index, epsilon_sarsa)
            reward_got = next_state.getReward()
            totalReward = totalReward + reward_got
            # choose A' from S' using the policy derived from Q
            followingAction = next_state.eGreedy(q_table, epsilon_sarsa, last_row_index, last_column_index)
            q_table_next = q_table[(next_state.row * column_number) + next_state.column][followingAction]
            q_table_current = q_table[(current_state.row * column_number) + current_state.column][action]
            # update of the Q value
            q_table[(current_state.row * column_number) + current_state.column][action] = q_table_current + alpha * (
                reward_got + gama * q_table_next - q_table_current)
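            # The statement above is the textbook SARSA update, roughly
            #   Q(S, A) <- Q(S, A) + alpha * [R + gama * Q(S', A') - Q(S, A)],
            # where A' is the action that will actually be taken in S' by the same
            # epsilon-greedy policy (which is what makes SARSA on-policy).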
            # update the state
            current_state = next_state
            action = followingAction
        averageRewardArray = shiftAndAddArray(averageRewardArray, totalReward)
        valueToDraw = np.sum(averageRewardArray) / np.count_nonzero(averageRewardArray)
        rewardArray.append(valueToDraw)
    print("Sarsa TRAINING IS OVER")
    # draw the final path
    drawPath(array, q_table, last_row_index, last_column_index, epsilon_sarsa)
    # clip the first episodes because they corrupt the moving average
    episodeArray = episodeArray[10:]
    rewardArray = rewardArray[10:]
    # plotting
    plt.plot(episodeArray, rewardArray)
    plt.xlabel('episodes')
    plt.ylabel('Total reward (smoothed)')
    print(averageRewardArray)
def applyQlearning(array, row_number, column_number, episodes, steps, alpha, gama):
    """
    @params
    array = state grid we constructed
    row_number = #rows of state grid
    column_number = #columns of state grid
    episodes = #episodes
    steps = #steps within an episode
    alpha = learning rate
    gama = discount factor
    """
    epsilon_q = 0.1
    state_number = row_number * column_number
    last_row_index = row_number - 1
    last_column_index = column_number - 1
    action_number = 4
    q_table = np.random.rand(state_number, action_number)
    # terminal state initialization
    q_table[state_number - 1][0] = 0
    q_table[state_number - 1][1] = 0
    q_table[state_number - 1][2] = 0
    q_table[state_number - 1][3] = 0
    rewardArray = []
    episodeArray = []
    # averageRewardArray = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    windowing_average_samples = 40
    averageRewardArray = np.zeros(windowing_average_samples)
    for i in range(episodes):  # loop for each episode
        step = 0
        # if (i % 50 == 0 and epsilonNew > 0.01):
        #     epsilonNew = epsilonNew - 0.01
        current_state = array[last_row_index][0]  # initialize S
        count = 0
        totalReward = 0
        episodeArray.append(i)
        while count in range(steps) and current_state.isTerminal(last_row_index) == 0:
            step = step + 1
            count = count + 1
            # choose action A from S using the policy derived from Q
            action = current_state.eGreedy(q_table, epsilon_q, last_row_index, last_column_index)
            # take action A, observe R, S'
            next_state = current_state.takeAction(array, action, q_table, last_row_index, last_column_index, epsilon_q)
            reward_got = next_state.getReward()
            totalReward = totalReward + reward_got
            actionMax = next_state.ePureGreedy(q_table, last_row_index, last_column_index)
            q_table_max = q_table[(next_state.row * column_number) + next_state.column][actionMax]
            q_table_current = q_table[(current_state.row * column_number) + current_state.column][action]
            # update of the Q value
            q_table[(current_state.row * column_number) + current_state.column][action] = q_table_current + alpha * (
                reward_got + (gama * q_table_max) - q_table_current)
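            # The statement above is the textbook Q-learning update, roughly
            #   Q(S, A) <- Q(S, A) + alpha * [R + gama * max_a Q(S', a) - Q(S, A)],
            # which bootstraps from the greedy action in S' even though the behaviour
            # policy stays epsilon-greedy (which is what makes Q-learning off-policy).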
            current_state = next_state
        averageRewardArray = shiftAndAddArray(averageRewardArray, totalReward)
        valueToDraw = np.sum(averageRewardArray) / np.count_nonzero(averageRewardArray)
        rewardArray.append(valueToDraw)
    print("Q TRAINING IS OVER")
    episodeArray = episodeArray[10:]
    rewardArray = rewardArray[10:]
    drawPath(array, q_table, last_row_index, last_column_index, epsilon_q)
    plt.plot(episodeArray, rewardArray)
    plt.legend(['SARSA', 'Q learning'])
    print(averageRewardArray)
    plt.title(f'Smoothed by averaging {windowing_average_samples} successive episodes')
    # limiting the overall y range to get a nicer view
    plt.ylim((-120, 0))
    plt.show()
# this is for the test after the training
def drawPath(array, Qvalue, last_row_index, last_column_index, epsilon):
    current_state = array[last_row_index][0]  # initial state
    greedyAction = current_state.ePureGreedy(Qvalue, last_row_index, last_column_index)  # action is determined
    while current_state.isTerminal(last_row_index) == 0:
        next_state = current_state.takeAction(array, greedyAction, Qvalue, last_row_index, last_column_index, epsilon)
        next_state.changeSign("X")
        followingAction = next_state.ePureGreedy(Qvalue, last_row_index, last_column_index)
        current_state = next_state
        greedyAction = followingAction
    display2D(array)
def main():
    episode_number = 600
    steps = 500
    gama = 0.9
    alpha = 0.25
    row_number = 4
    column_number = 12
    # construct the environment: every state starts as 'O' with a reward of -1
    stateGrid = [[States('O', -1, y, x) for x in range(column_number)] for y in range(row_number)]
    # print(random.choice(actionList))  # this code segment chooses a random action from the list
    display2D(stateGrid)
    print("OK HERE WE STOP")
    stateGrid[row_number - 1][0].changeSign('S')
    stateGrid[row_number - 1][column_number - 1].changeSign('F')
    for i in range(10):
        stateGrid[row_number - 1][i + 1].changeSign('C')
    # display the 2D array in a neat format
    display2D(stateGrid)
    # construction stopped here
    handleRewards(stateGrid, row_number - 1, column_number)
    displayRewards(stateGrid)
    print("CONSTRUCTED")
    stateGrid2 = copy.deepcopy(stateGrid)
    applySarsa(stateGrid, row_number, column_number, episode_number, steps, alpha, gama)
    applyQlearning(stateGrid2, row_number, column_number, episode_number, steps, alpha, gama)


if __name__ == "__main__":
    main()
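As a quick, self-contained sketch of the state numbering described in the findMaxIndex docstring (it does not depend on the code above; the grid dimensions simply mirror those used in main()), the following lines print the flattened state indices of the 4 x 12 grid:

# Minimal sketch: flattened state indices for a 4 x 12 grid,
# matching the left-to-right, top-to-bottom numbering used in findMaxIndex.
row_number, column_number = 4, 12
for row in range(row_number):
    print([row * column_number + column for column in range(column_number)])
# The last printed row contains the start state (index 36), the cliff states (37 to 46) and the goal (47).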
That was the code for the Cliff Walking algorithm in Python, one of the classic examples used in reinforcement-learning courses.
Suggested article: "What Is the Q-Learning Algorithm in Reinforcement Learning?"