Reinforcement Learning - Generalized Policy Iteration


I. The Generalized Policy Iteration Algorithm

Generalized policy iteration combines policy iteration and value iteration (both policy iteration and value iteration are special cases of generalized policy iteration).

For example: run several rounds of value iteration and then switch to policy iteration, or run several rounds of policy iteration and then switch to value iteration; the mix can be designed as needed.

When running policy iteration, the number of policy-evaluation sweeps can also be reduced appropriately, without waiting for the state-value function to fully converge; this speeds up the computation.
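
Concretely, the backups being combined are the following, written in the same convention as the code below (the reward r(s') is attached to the next state, and p(s' | s, a) is the tabulated transition probability):

Truncated policy evaluation (repeated for a fixed number of sweeps):

$v(s) \leftarrow \sum_{s'} p(s' \mid s, \pi(s)) \left[ r(s') + \gamma v(s') \right]$

Greedy policy improvement:

$\pi(s) \leftarrow \arg\max_{a} \sum_{s'} p(s' \mid s, a) \left[ r(s') + \gamma v(s') \right]$

Value iteration backup:

$v(s) \leftarrow \max_{a} \sum_{s'} p(s' \mid s, a) \left[ r(s') + \gamma v(s') \right]$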

II. A Generalized Policy Iteration Example

1. Game background

Please refer to:

In brief, the environment implemented below is a 100-square Snakes-and-Ladders-style board game: at each step the player chooses one of several dice and advances by a uniformly random number of squares; a move past square 100 bounces back; landing on a "ladder" square teleports the player to another square; every move yields a reward of -1, and reaching square 100 yields a reward of 100 and ends the game.

2. Code implementation

import numpy as np
import gym
from gym.spaces import Discrete
from contextlib import contextmanager
import time

class SnakeEnv(gym.Env):
    
    #number of squares on the board
    SIZE = 100
    
    def __init__(self, dices):
        
        #list of dice; each entry is the largest step that die can roll
        self.dices = dices
        #ladders: squares that teleport the player (source -> destination)
        self.ladders = {82: 52, 52: 92, 26: 66, 98: 22, 14: 22, 96: 63, 35: 12, 54: 78, 76: 57}
        #state space
        self.observation_space = Discrete(self.SIZE + 1)
        #action space
        self.action_space = Discrete(len(dices))
        #initial position
        self.pos = 1
        
    def reset(self):
        
        self.pos = 1
        return self.pos
    
    def step(self, a):
        
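        #roll the chosen die: a uniformly random step in {1, ..., dices[a]}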
        step = np.random.randint(1, self.dices[a] + 1)
        self.pos += step
        
        #reached the final square: end the episode
        if self.pos == 100:
            return 100, 100, 1, {}
        #overshot the final square: bounce back
        elif self.pos > 100:
            self.pos = 200 - self.pos
            
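        #landed on a ladder square: teleport to its destination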
        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
            
        return self.pos, -1, 0, {}
    
    def reward(self, s):
        
        if s == 100:
            return 100
        else:
            return -1
        
    def render(self):
        
        pass
    
class TableAgent():
    
    def __init__(self, env):
        
        #number of states
        self.s_len = env.observation_space.n
        #number of actions
        self.a_len = env.action_space.n
        
        #reward attached to each state
        self.r = [env.reward(s) for s in range(0, self.s_len)]
        #policy (initially every state takes the first action, action 0)
        self.pi = np.array([0 for s in range(0, self.s_len)])
        #state-transition probabilities p[s, a, s']
        self.p = np.zeros([self.s_len, self.a_len, self.s_len], 'float')
        
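        #build p[s, a, s']: for each starting square and each die, enumerate the
        #equally likely rolls, reflect positions past square 100, then apply the ladders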
        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)
        for src in range(1, 100):
            for i, dice in enumerate(env.dices):
                prob = 1 / dice
                step = np.arange(1, dice + 1)
                step += src
                step = np.piecewise(step, [step > 100, step <= 100], [lambda x: 200 - x, lambda x: x])
                step = ladder_move(step)
                for dst in step:
                    self.p[src, i, dst] += prob
                    
        #state-value function
        self.value_pi = np.zeros((self.s_len))
        #state-action value function Q(s, a)
        self.value_q = np.zeros((self.s_len, self.a_len))
        #discount factor
        self.gamma = 0.8
        
    def play(self, state):
        
        return self.pi[state]

class PolicyIteration():
    
    def __init__(self):
        
        pass
    
    def policy_evaluation(self, agent, max_iter = -1):
        
        iteration = 0
        while True:
            iteration += 1
            new_value_pi = agent.value_pi.copy()
            for i in range(1, agent.s_len):
                ac = agent.pi[i]
                transition = agent.p[i, ac, :]
                #iterate the Bellman expectation backup so the state values converge
                value_sa = np.dot(transition, agent.r + agent.gamma * agent.value_pi)
                new_value_pi[i] = value_sa
                
            diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
            if diff < 1e-6:
                break
            else:
                agent.value_pi = new_value_pi
                
            if iteration == max_iter:
                break
    
    def policy_improvement(self, agent):
        
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            #compute the state-action value function for every action
            for j in range(0, agent.a_len):
                agent.value_q[i, j] = np.dot(agent.p[i, j, :], agent.r + agent.gamma * agent.value_pi)
            #policy improvement: act greedily with respect to the Q values
            max_act = np.argmax(agent.value_q[i, :])
            new_policy[i] = max_act
        
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True
        
    def policy_iteration(self, agent):
        
        iteration = 0
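        #alternate full policy evaluation and greedy improvement until the policy stops changing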
        while True:
            iteration += 1
            self.policy_evaluation(agent)
            ret = self.policy_improvement(agent)
            if not ret:
                break
        
class ValueIteration():
    
    def __init__(self):
        
        pass
    
    def value_iteration(self, agent, max_iter = -1):
    
        iteration = 0
        #value iteration: back up each state with the max over actions
        while True:
            iteration += 1
            new_value_pi = np.zeros_like(agent.value_pi)
            #loop over all states
            for i in range(1, agent.s_len):
                value_sas = []
                #loop over all actions
                for j in range(0, agent.a_len):
                    value_sa = np.dot(agent.p[i, j, :], agent.r + agent.gamma * agent.value_pi)
                    value_sas.append(value_sa)
                new_value_pi[i] = max(value_sas)
            
            diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
            if diff < 1e-6:
                break
            else:
                agent.value_pi = new_value_pi
            
            if iteration == max_iter:
                break
        
        #extract the optimal policy by acting greedily on the state-action values
        for i in range(1, agent.s_len):
            for j in range(0, agent.a_len):
                agent.value_q[i, j] = np.dot(agent.p[i, j, :], agent.r + agent.gamma * agent.value_pi)
            max_act = np.argmax(agent.value_q[i, :])
            agent.pi[i] = max_act
    
def eval_game(env, policy):
    
    return_val = 0
    
    #average the return over 100 episodes
    for epoch in range(100):
        #start every episode from the initial square
        state = env.reset()
        while True:
            if isinstance(policy, TableAgent):
                act = policy.play(state)
            elif isinstance(policy, list):
                act = policy[state]
            else:
                raise ValueError('Illegal policy')
            
            state, reward, terminate, _ = env.step(act)
            return_val += reward
            
            if terminate:
                break
        
    return return_val / 100

@contextmanager
def timer(name):
    
    start = time.time()
    yield
    end = time.time()
    print('{} cost:{}'.format(name, end - start))

def policy_iteration_demo1():
    
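    #two dice are available: one rolling 1-3, the other rolling 1-6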
    env = SnakeEnv([3, 6])
    agent = TableAgent(env)
    
    pi_algo = PolicyIteration()
    iteration = 0
    with timer('time'):
        while True:
            iteration += 1
            #cap policy evaluation at 10 sweeps per improvement step
            pi_algo.policy_evaluation(agent, 10)
            res = pi_algo.policy_improvement(agent)
            if not res:
                break
    print('return_val={}'.format(eval_game(env, agent)))
    print(agent.pi)
    
def policy_iteration_demo2():
    
    env = SnakeEnv([3, 6])
    
    pi_algo = PolicyIteration()
    vi_algo = ValueIteration()
    #pure policy iteration
    with timer('PolicyIteration'):
        agent1 = TableAgent(env)
        pi_algo.policy_iteration(agent1)
    #pure value iteration
    with timer('ValueIteration'):
        agent2 = TableAgent(env)
        vi_algo.value_iteration(agent2)
    #generalized policy iteration (here simply running some rounds of value iteration before switching to policy iteration)
    with timer('GeneralizedPolicyIteration'):
        agent3 = TableAgent(env)
        vi_algo.value_iteration(agent3, 20)
        pi_algo.policy_iteration(agent3)
            
    print('return_val={}'.format(eval_game(env, agent1)))
    print(agent1.pi)
    print('return_val={}'.format(eval_game(env, agent2)))
    print(agent2.pi)
    print('return_val={}'.format(eval_game(env, agent3)))
    print(agent3.pi)

policy_iteration_demo1()
policy_iteration_demo2()

3. Running results