Reinforcement Learning: Value Iteration


I. The Value Iteration Algorithm

1. Use the idea of dynamic programming to repeatedly improve the state-value function $v\left( s \right)$ with the Bellman optimality backup (note the discount factor $\gamma$, which the code below sets to 0.8):

$v\left( s \right)\leftarrow \underset{a}{\max}\sum_{s',r}{p\left( s',r|s,a \right)\left( r+\gamma v\left( s' \right) \right)}$
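A minimal sketch of this backup in vectorized form (the names `P`, `R`, `v`, and `gamma` here are illustrative, not taken from the implementation below: `P[s, a, s']` is the transition tensor and `R[s']` the reward for arriving in each state):

import numpy as np

def bellman_optimality_sweep(P, R, v, gamma):
    # Q[s, a] = sum over s' of P[s, a, s'] * (R[s'] + gamma * v[s'])
    q = P @ (R + gamma * v)
    # v(s) <- max over a of Q(s, a)
    return q.max(axis=1)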

2. Once the values have converged, recover the optimal policy from the state-action value function:

$q\left( s,a \right)=\sum_{s',r}{p\left( s',r|s,a \right)\left( r+\gamma v\left( s' \right) \right)}$

$\pi\left( s \right)=\underset{a}{\arg\max}\,q\left( s,a \right)$
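The greedy policy can then be read off with an argmax over the same quantity (a sketch using the same illustrative names as above):

def greedy_policy(P, R, v, gamma):
    # Q(s, a) for every state-action pair
    q = P @ (R + gamma * v)
    # pi(s) = argmax over a of Q(s, a)
    return q.argmax(axis=1)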

II. A Value Iteration Example

1. Game background

The environment is a Snakes-and-Ladders style board game: the board has 100 squares and the player starts at square 1. On each turn the player chooses one of several dice (here, one with faces 1-3 and one with faces 1-6) and advances by the roll. Landing on a ladder square teleports the player to the square it maps to, and overshooting square 100 bounces the player back by the excess. Reaching square 100 ends the game with a reward of 100; every other move yields a reward of -1.

2. Code implementation

import numpy as np
import gym
from gym.spaces import Discrete

class SnakeEnv(gym.Env):
    
    # number of squares on the board
    SIZE = 100
    
    def __init__(self, dices):
        
        # list of dice: each entry is the largest face value of that die
        self.dices = dices
        # ladders: landing on a key square teleports the player to its value
        self.ladders = {82: 52, 52: 92, 26: 66, 98: 22, 14: 22, 96: 63, 35: 12, 54: 78, 76: 57}
        # state space: positions 0..100 (position 0 is unused)
        self.observation_space = Discrete(self.SIZE + 1)
        # action space: which die to roll
        self.action_space = Discrete(len(dices))
        # initial position
        self.pos = 1
        
    def reset(self):
        
        self.pos = 1
        return self.pos
    
    def step(self, a):
        
        # roll the chosen die: a uniform integer in [1, dices[a]]
        step = np.random.randint(1, self.dices[a] + 1)
        self.pos += step
        
        # reached the final square: the episode ends with reward 100
        if self.pos == 100:
            return 100, 100, 1, {}
        # overshot the final square: bounce back by the excess
        elif self.pos > 100:
            self.pos = 200 - self.pos
            
        # landed on a ladder square: teleport to its destination
        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
            
        return self.pos, -1, 0, {}
    
    def reward(self, s):
        
        # reward for arriving at state s: 100 at the goal, -1 everywhere else
        if s == 100:
            return 100
        else:
            return -1
        
    def render(self):
        
        pass
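
A quick way to sanity-check the environment is a random rollout (a standalone usage sketch of SnakeEnv, separate from the main listing):

env = SnakeEnv([3, 6])
state = env.reset()
total = 0
while True:
    # pick one of the dice uniformly at random
    state, reward, done, _ = env.step(env.action_space.sample())
    total += reward
    if done:
        break
print('random rollout return:', total)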
    
class TableAgent:
    
    def __init__(self, env):
        
        # number of states
        self.s_len = env.observation_space.n
        # number of actions
        self.a_len = env.action_space.n
        
        # reward for arriving at each state (an array, so it broadcasts in the backup below)
        self.r = np.array([env.reward(s) for s in range(0, self.s_len)])
        # policy table (initially every state takes action 0)
        self.pi = np.array([0 for s in range(0, self.s_len)])
        # transition probabilities p[s, a, s']
        self.p = np.zeros([self.s_len, self.a_len, self.s_len], 'float')
        
        # apply ladder teleportation element-wise
        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)
        for src in range(1, 100):
            for i, dice in enumerate(env.dices):
                # each face of the die is equally likely
                prob = 1 / dice
                step = np.arange(1, dice + 1)
                step += src
                # bounce back any move that overshoots square 100
                step = np.piecewise(step, [step > 100, step <= 100], [lambda x: 200 - x, lambda x: x])
                step = ladder_move(step)
                for dst in step:
                    self.p[src, i, dst] += prob
                    
        # state-value function
        self.value_pi = np.zeros(self.s_len)
        # state-action value function
        self.value_q = np.zeros((self.s_len, self.a_len))
        # discount factor
        self.gamma = 0.8
        
    def play(self, state):
        
        # act according to the current policy table
        return self.pi[state]
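
Because the transition tensor is easy to get wrong, it is worth checking that the probabilities out of every non-terminal state-action pair sum to one (a standalone sanity-check sketch, assuming the classes above are defined):

agent = TableAgent(SnakeEnv([3, 6]))
# sum over destination states for every (state, action) pair on squares 1-99
row_sums = agent.p[1:100].sum(axis=2)
assert np.allclose(row_sums, 1.0)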
    
def value_iteration(agent, max_iter = -1):
    
    iteration = 0
    # value iteration: repeatedly apply the Bellman optimality backup
    while True:
        iteration += 1
        new_value_pi = np.zeros_like(agent.value_pi)
        # sweep over all states (square 0 is unused; square 100 is terminal)
        for i in range(1, agent.s_len):
            value_sas = []
            # evaluate every action: q(s, a) = sum_s' p(s'|s, a) (r(s') + gamma v(s'))
            for j in range(0, agent.a_len):
                value_sa = np.dot(agent.p[i, j, :], agent.r + agent.gamma * agent.value_pi)
                value_sas.append(value_sa)
            new_value_pi[i] = max(value_sas)
        
        # L2 distance between successive value estimates
        diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
        if diff < 1e-6:
            break
        else:
            agent.value_pi = new_value_pi
        
        if iteration == max_iter:
            break
        
    print('Converged after {} iterations'.format(iteration))
    
    # extract the greedy policy from the state-action value function
    for i in range(1, agent.s_len):
        for j in range(0, agent.a_len):
            agent.value_q[i, j] = np.dot(agent.p[i, j, :], agent.r + agent.gamma * agent.value_pi)
        max_act = np.argmax(agent.value_q[i, :])
        agent.pi[i] = max_act
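
Why the loop above terminates: the Bellman optimality backup is a $\gamma$-contraction in the sup norm,

$\left\| Tv-Tu \right\|_{\infty}\le \gamma \left\| v-u \right\|_{\infty}$

so with $\gamma = 0.8$ successive iterates converge geometrically to the unique fixed point $v_{*}$, and the $10^{-6}$ stopping threshold is eventually met.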
        
def eval_game(env, policy):
    
    return_val = 0
    
    # average the return over 100 episodes
    for epoch in range(100):
        # reset the environment at the start of every episode
        state = env.reset()
        while True:
            if isinstance(policy, TableAgent):
                act = policy.play(state)
            elif isinstance(policy, list):
                act = policy[state]
            else:
                raise TypeError('Illegal policy')
            
            state, reward, terminate, _ = env.step(act)
            return_val += reward
            
            if terminate:
                break
        
    return return_val / 100
    
def value_iteration_demo():
    
    env = SnakeEnv([3, 6])
    agent = TableAgent(env)
    
    # always roll the first die (faces 1-3)
    agent.pi[:] = 0
    print('0 avg = {}'.format(eval_game(env, agent)))
    # always roll the second die (faces 1-6)
    agent.pi[:] = 1
    print('1 avg = {}'.format(eval_game(env, agent)))
    # hand-crafted policy: the first die on squares 97-99, the second everywhere else
    agent.pi[97:100] = 0
    print('opt avg = {}'.format(eval_game(env, agent)))
    
    value_iteration(agent)
    print('pi avg = {}'.format(eval_game(env, agent)))
    print(agent.pi)
    
value_iteration_demo()

3. Results