Reinforcement Learning: Value Iteration
I. The Value Iteration Algorithm
1. Use the idea of dynamic programming to iteratively improve the state-value function $v_{\pi}\left( s \right)$ (a code sketch of both steps follows this list):
$v_{\pi}\left( s \right)=\max_{a}\sum_{s',r}{p\left( s',r|s,a \right)\left( r+\gamma v_{\pi}\left( s' \right) \right)}$
2. Extract the optimal policy from the state-action value function:
$q_{\pi}\left( s,a \right)=\sum_{s',r}{p\left( s',r|s,a \right)\left( r+\gamma v_{\pi}\left( s' \right) \right)}$
$\pi\left( s \right)=\underset{a}{\arg\max}\,q_{\pi}\left( s,a \right)$
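These two steps map almost directly onto NumPy. Below is a minimal sketch for a generic tabular MDP, assuming the transition model is stored as an array `P[s, a, s']` and the reward as a vector `R[s']` indexed by the successor state; the array names, the discount `gamma`, and the tolerance are illustrative, and the full implementation for the snake game follows in the next section.

```python
import numpy as np

def value_iteration_sketch(P, R, gamma=0.8, tol=1e-6):
    """P: (S, A, S) transition probabilities; R: (S,) reward of each successor state."""
    S, A, _ = P.shape
    v = np.zeros(S)
    while True:
        # Step 1: Bellman optimality update of the state values
        # q[s, a] = sum_{s'} p(s'|s,a) * (r(s') + gamma * v(s'))
        q = np.einsum('sat,t->sa', P, R + gamma * v)
        v_new = q.max(axis=1)
        if np.linalg.norm(v_new - v) < tol:
            break
        v = v_new
    # Step 2: greedy policy extraction from the action values
    q = np.einsum('sat,t->sa', P, R + gamma * v)
    return v, q.argmax(axis=1)
```

The loop stops once the value estimates change by less than `tol` between sweeps, after which a single greedy pass over `q` yields the policy.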
II. A Value Iteration Example
1. Game background
Please refer to:
The game is a 100-square, snakes-and-ladders style board (the `SnakeEnv` below). The player starts on square 1 and on every turn chooses one of several dice (here a 3-sided and a 6-sided die), then moves forward by the rolled number. A move past square 100 bounces back (e.g., landing on 103 puts the player on 200 - 103 = 97), and certain squares are connected by ladders that teleport the player. Reaching square 100 ends the game with a reward of 100; every other move costs -1. A minimal sketch of one turn is given below.
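To make the movement rule concrete, here is a small sketch of a single turn; the function name `roll` and its arguments are illustrative, and the real environment in the next section implements the same logic inside `SnakeEnv.step`.

```python
import random

def roll(pos, die_size, ladders):
    """One turn: move by a uniform die roll, bounce back past 100, then take any ladder."""
    pos += random.randint(1, die_size)   # uniform roll in 1..die_size
    if pos > 100:                        # overshoot: reflect back from square 100
        pos = 200 - pos
    return ladders.get(pos, pos)         # a ladder square teleports the player

# e.g. from square 98 with a 6-sided die and ladders={98: 22}, the six equally
# likely rolls end on 99, 100, 99, 22, 97 and 96.
```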
2. Code implementation
```python
import numpy as np
import gym
from gym.spaces import Discrete


class SnakeEnv(gym.Env):
    # number of squares on the board
    SIZE = 100

    def __init__(self, dices):
        # list of dice (each entry is the maximum roll of one die)
        self.dices = dices
        # ladders: landing on a key square teleports the player to the value square
        self.ladders = {82: 52, 52: 92, 26: 66, 98: 22, 14: 22,
                        96: 63, 35: 12, 54: 78, 76: 57}
        # state space
        self.observation_space = Discrete(self.SIZE + 1)
        # action space
        self.action_space = Discrete(len(dices))
        # starting position
        self.pos = 1

    def reset(self):
        self.pos = 1
        return self.pos

    def step(self, a):
        step = np.random.randint(1, self.dices[a] + 1)
        self.pos += step
        # reached the goal: the game ends
        if self.pos == 100:
            return 100, 100, 1, {}
        # overshot the goal: bounce back
        elif self.pos > 100:
            self.pos = 200 - self.pos
        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
        return self.pos, -1, 0, {}

    def reward(self, s):
        if s == 100:
            return 100
        else:
            return -1

    def render(self):
        pass


class TableAgent():
    def __init__(self, env):
        # number of states
        self.s_len = env.observation_space.n
        # number of actions
        self.a_len = env.action_space.n
        # reward of each state
        self.r = [env.reward(s) for s in range(0, self.s_len)]
        # policy (initially every state takes the first action)
        self.pi = np.array([0 for s in range(0, self.s_len)])
        # state transition probabilities
        self.p = np.zeros([self.s_len, self.a_len, self.s_len], 'float')
        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)
        for src in range(1, 100):
            for i, dice in enumerate(env.dices):
                prob = 1 / dice
                step = np.arange(1, dice + 1)
                step += src
                step = np.piecewise(step, [step > 100, step <= 100],
                                    [lambda x: 200 - x, lambda x: x])
                step = ladder_move(step)
                for dst in step:
                    self.p[src, i, dst] += prob
        # state-value function
        self.value_pi = np.zeros((self.s_len))
        # state-action value function
        self.value_q = np.zeros((self.s_len, self.a_len))
        # discount factor
        self.gamma = 0.8

    def play(self, state):
        return self.pi[state]


def value_iteration(agent, max_iter=-1):
    iteration = 0
    # value iteration
    while True:
        iteration += 1
        new_value_pi = np.zeros_like(agent.value_pi)
        # sweep over states
        for i in range(1, agent.s_len):
            value_sas = []
            # sweep over actions
            for j in range(0, agent.a_len):
                value_sa = np.dot(agent.p[i, j, :], agent.r + agent.gamma * agent.value_pi)
                value_sas.append(value_sa)
            new_value_pi[i] = max(value_sas)
        diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
        if diff < 1e-6:
            break
        else:
            agent.value_pi = new_value_pi
        if iteration == max_iter:
            break
    print('Iter {} rounds converge'.format(iteration))
    # extract the optimal policy from the state-action value function
    for i in range(1, agent.s_len):
        for j in range(0, agent.a_len):
            agent.value_q[i, j] = np.dot(agent.p[i, j, :], agent.r + agent.gamma * agent.value_pi)
        max_act = np.argmax(agent.value_q[i, :])
        agent.pi[i] = max_act


def eval_game(env, policy):
    return_val = 0
    # average the return over 100 episodes, resetting the board before each one
    for epoch in range(100):
        state = env.reset()
        while True:
            if isinstance(policy, TableAgent):
                act = policy.play(state)
            elif isinstance(policy, list):
                act = policy[state]
            else:
                raise IOError('Illegal policy')
            state, reward, terminate, _ = env.step(act)
            return_val += reward
            if terminate:
                break
    return return_val / 100


def policy_iteration_demo():
    env = SnakeEnv([3, 6])
    agent = TableAgent(env)
    # always use the first die
    agent.pi[:] = 0
    print('0 avg = {}'.format(eval_game(env, agent)))
    # always use the second die
    agent.pi[:] = 1
    print('1 avg = {}'.format(eval_game(env, agent)))
    # use the first die on squares 97-99, the second elsewhere
    agent.pi[97:100] = 0
    print('opt avg = {}'.format(eval_game(env, agent)))
    value_iteration(agent)
    print('pi avg = {}'.format(eval_game(env, agent)))
    print(agent.pi)


policy_iteration_demo()
```
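As a quick sanity check, the sketch below (assuming the classes and functions above are already defined in the same session) verifies that every non-terminal square defines a proper probability distribution for each die, then inspects which die the learned policy picks near the goal:

```python
env = SnakeEnv([3, 6])
agent = TableAgent(env)

# Each reachable square's transition probabilities should sum to 1 for every die.
for s in range(1, 100):
    for a in range(agent.a_len):
        assert abs(agent.p[s, a, :].sum() - 1.0) < 1e-9

value_iteration(agent)
# Die chosen for squares 90-99 (0 = 3-sided die, 1 = 6-sided die).
print(agent.pi[90:100])
```

Near the goal the smaller die tends to be preferred, since a large roll is more likely to overshoot square 100 and bounce back; the hand-crafted "opt" policy in `policy_iteration_demo` encodes the same intuition.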
3. Results
Running the script prints the average return of each hand-crafted policy (`0 avg`, `1 avg`, `opt avg`), the number of sweeps value iteration needed to converge, the average return of the learned policy (`pi avg`), and finally the learned policy itself as an array of die choices (0 or 1) for each square.