Implementing an RNN and LSTM in Code


This post involves a lot of code, so files that have already been published are not repeated here. The shared code can be found at the following link:

https://blog.csdn.net/qq_37402392/article/details/121468321?spm=1001.2014.3001.5501

bn_layers.py

cnn_layers.py

dropout_layers.py

layers.py

updater.py

The following files are new in this post.

captioning_trainer.py

The trainer, similar to the earlier Trainer class.

#-*- coding: utf-8 -*-
import numpy as np

from coco_utils import *
import updater

class CaptioningTrainer(object):
    """ 
    CaptioningTrainer大部分内容和前面的Trainer相同
    使用方法:
    data = load_coco_data()
    model = MyAwesomeModel(hidden_dim=100)
    trainer = CaptioningTrainer(model, data,
                                    update_rule='sgd',
                                    updater_config={
                                        'learning_rate': 1e-3,
                                    },
                                    lr_decay=0.95,
                                    num_epochs=10, batch_size=100,
                                    print_every=100)
    trainer.train()
    """

    def __init__(self, model, data, **kwargs):
        """
        初始化CaptioningTrainer
        所需参数:
        - model: RNN模型
        - data: coco数据集

        可选参数:
        - update_rule:更新规则,查看 updater.py.
            默认为 'sgd'.
        - updater_config: 更新器配置
        - lr_decay:学习率衰减因子
        - batch_size: 批量大小
        - num_epochs: 迭代次数
        - print_every:每训练多少次,打印训练结果
        - verbose:是否打印训练中间结果
        """
        self.model = model
        self.data = data
        
        self.update_rule = kwargs.pop('update_rule', 'sgd')
        self.updater_config = kwargs.pop('updater_config', {})
        self.lr_decay = kwargs.pop('lr_decay', 1.0)
        self.batch_size = kwargs.pop('batch_size', 100)
        self.num_epochs = kwargs.pop('num_epochs', 10)

        self.print_every = kwargs.pop('print_every', 10)
        self.verbose = kwargs.pop('verbose', True)

        if len(kwargs) > 0:
            extra = ', '.join('"%s"' % k for k in kwargs.keys())
            raise ValueError('Unrecognized arguments %s' % extra)

        if not hasattr(updater, self.update_rule):
            raise ValueError('Invalid update_rule "%s"' % self.update_rule)
        self.update_rule = getattr(updater, self.update_rule)

        self._reset()


    def _reset(self):
        self.epoch = 0
        self.best_val_acc = 0
        self.best_params = {}
        self.loss_history = []
        self.train_acc_history = []
        self.val_acc_history = []

        self.updater_configs = {}
        for p in self.model.params:
            d = {k: v for k, v in self.updater_config.items()}
            self.updater_configs[p] = d


    def _step(self):
        minibatch = sample_coco_minibatch(self.data,
                                    batch_size=self.batch_size,
                                    split='train')
        captions, features, urls = minibatch

        loss, grads = self.model.loss(features, captions)
        self.loss_history.append(loss)

        for p, w in self.model.params.items():
            dw = grads[p]
            config = self.updater_configs[p]
            next_w, next_config = self.update_rule(w, dw, config)
            self.model.params[p] = next_w
            self.updater_configs[p] = next_config

    def train(self):
        num_train = self.data['train_captions'].shape[0]
        # Use floor division: in Python 3 a plain / yields a float and breaks
        # the (t + 1) % iterations_per_epoch epoch check below
        iterations_per_epoch = max(num_train // self.batch_size, 1)
        num_iterations = self.num_epochs * iterations_per_epoch
        
        for t in range(num_iterations):
            self._step()


            if self.verbose and t % self.print_every == 0:
                print('(Iteration %d / %d) loss: %f' % (
                             t + 1, num_iterations, self.loss_history[-1]))

            epoch_end = (t + 1) % iterations_per_epoch == 0
            if epoch_end:
                self.epoch += 1
                for k in self.updater_configs:
                    self.updater_configs[k]['learning_rate'] *= self.lr_decay


coco_utils.py

Utilities for the COCO dataset: loading the data files, decoding captions, and sampling minibatches.

import os, json
import numpy as np
import h5py


def load_coco_data(base_dir='datasets/coco_captioning', max_train=None,
                         pca_features=True):
    '''
    Load the COCO captioning data.

    Parameters
    ----------
    base_dir : str, optional
        Directory holding the data files. The default is 'datasets/coco_captioning'.
    max_train : int, optional
        If given, subsample the training data to this many examples. The default is None.
    pca_features : bool, optional
        Whether to use the PCA-reduced features. The default is True.

    Returns
    -------
    data : dict
        The loaded data.

    '''
    # Container for the loaded data
    data = {}
    # Locate the captions file
    caption_file = os.path.join(base_dir, 'coco2014_captions.h5')
    # Copy everything in the file into the dict
    with h5py.File(caption_file, 'r') as f:
        for k, v in f.items():
            data[k] = np.asarray(v)
    
    # Whether to use the PCA-reduced features
    if pca_features:
        train_feat_file = os.path.join(base_dir, 'train2014_vgg16_fc7_pca.h5')
    else:
        train_feat_file = os.path.join(base_dir, 'train2014_vgg16_fc7.h5')
    with h5py.File(train_feat_file, 'r') as f:
        data['train_features'] = np.asarray(f['features'])

    if pca_features:
        val_feat_file = os.path.join(base_dir, 'val2014_vgg16_fc7_pca.h5')
    else:
        val_feat_file = os.path.join(base_dir, 'val2014_vgg16_fc7.h5')
    with h5py.File(val_feat_file, 'r') as f:
        data['val_features'] = np.asarray(f['features'])

    dict_file = os.path.join(base_dir, 'coco2014_vocab.json')
    with open(dict_file, 'r') as f:
        dict_data = json.load(f)
        for k, v in dict_data.items():
            data[k] = v

    train_url_file = os.path.join(base_dir, 'train2014_urls.txt')
    with open(train_url_file, 'r') as f:
        train_urls = np.asarray([line.strip() for line in f])
    data['train_urls'] = train_urls

    val_url_file = os.path.join(base_dir, 'val2014_urls.txt')
    with open(val_url_file, 'r') as f:
        val_urls = np.asarray([line.strip() for line in f])
    data['val_urls'] = val_urls

    # Optionally subsample the training data
    if max_train is not None:
        num_train = data['train_captions'].shape[0]
        mask = np.random.randint(num_train, size=max_train)
        data['train_captions'] = data['train_captions'][mask]
        data['train_image_idxs'] = data['train_image_idxs'][mask]

    return data


def decode_captions(captions, idx_to_word):
    singleton = False
    if captions.ndim == 1:
        singleton = True
        captions = captions[None]
    decoded = []
    N, T = captions.shape
    for i in range(N):
        words = []
        for t in range(T):
            word = idx_to_word[captions[i, t]]
            if word != '<NULL>':
                words.append(word)
            if word == '<END>':
                break
        decoded.append(' '.join(words))
    if singleton:
        decoded = decoded[0]
    return decoded
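
# A minimal illustration of decode_captions with a hypothetical five-word
# vocabulary (example only, not part of the original file):
#
#   idx_to_word = {0: '<NULL>', 1: '<START>', 2: '<END>', 3: 'a', 4: 'cat'}
#   caps = np.array([[1, 3, 4, 2, 0]])
#   decode_captions(caps, idx_to_word)   # -> ['<START> a cat <END>']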


def sample_coco_minibatch(data, batch_size=100, split='train'):
    split_size = data['%s_captions' % split].shape[0]
    mask = np.random.choice(split_size, batch_size)
    captions = data['%s_captions' % split][mask]
    image_idxs = data['%s_image_idxs' % split][mask]
    image_features = data['%s_features' % split][image_idxs]
    urls = data['%s_urls' % split][image_idxs]
    return captions, image_features, urls
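
Putting the three utilities together, a sketch like the following loads the data, draws a small validation minibatch, and decodes the captions back into text. It assumes the dataset files exist under datasets/coco_captioning and that the vocabulary file provides an 'idx_to_word' entry:

from coco_utils import load_coco_data, sample_coco_minibatch, decode_captions

data = load_coco_data(pca_features=True)
captions, features, urls = sample_coco_minibatch(data, batch_size=2, split='val')
for text in decode_captions(captions, data['idx_to_word']):
    print(text)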

image_utils.py

General-purpose helpers for displaying images along the way.

import urllib.request, urllib.error, os, tempfile

import numpy as np
# scipy.misc.imread was removed in SciPy >= 1.2; imageio provides a drop-in imread
from imageio import imread

from cnn_layers import conv_forward_fast


"""
Utility functions used for viewing and processing images.
"""


def blur_image(X):
    """
    A very gentle image blurring operation, to be used as a regularizer for image
    generation.
    
    Inputs:
    - X: Image data of shape (N, 3, H, W)
    
    Returns:
    - X_blur: Blurred version of X, of shape (N, 3, H, W)
    """
    w_blur = np.zeros((3, 3, 3, 3))
    b_blur = np.zeros(3)
    blur_param = {'stride': 1, 'pad': 1}
    for i in range(3):
        w_blur[i, i] = np.asarray([[1, 2, 1], [2, 188, 2], [1, 2, 1]], dtype=np.float32)
    w_blur /= 200.0
    return conv_forward_fast(X, w_blur, b_blur, blur_param)[0]


def preprocess_image(img, mean_img, mean='image'):
    """
    Convert to float, transpose, and subtract mean pixel
    
    Input:
    - img: (H, W, 3)
    
    Returns:
    - (1, 3, H, W)
    """
    if mean == 'image':
        mean = mean_img
    elif mean == 'pixel':
        mean = mean_img.mean(axis=(1, 2), keepdims=True)
    elif mean == 'none':
        mean = 0
    else:
        raise ValueError('mean must be image or pixel or none')
    return img.astype(np.float32).transpose(2, 0, 1)[None] - mean


def deprocess_image(img, mean_img, mean='image', renorm=False):
    """
    Add mean pixel, transpose, and convert to uint8
    
    Input:
    - (1, 3, H, W) or (3, H, W)
    
    Returns:
    - (H, W, 3)
    """
    if mean == 'image':
        mean = mean_img
    elif mean == 'pixel':
        mean = mean_img.mean(axis=(1, 2), keepdims=True)
    elif mean == 'none':
        mean = 0
    else:
        raise ValueError('mean must be image or pixel or none')
    if img.ndim == 3:
        img = img[None]
    img = (img + mean)[0].transpose(1, 2, 0)
    if renorm:
        low, high = img.min(), img.max()
        img = 255.0 * (img - low) / (high - low)
    return img.astype(np.uint8)


def image_from_url(url):
    """
    Read an image from a URL. Returns a numpy array with the pixel data.
    We write the image to a temporary file then read it back. Kinda gross.
    """
    try:
        f = urllib.request.urlopen(url)
        _, fname = tempfile.mkstemp()
        with open(fname, 'wb') as ff:
            ff.write(f.read())
        img = imread(fname)
        os.remove(fname)
        return img
    except urllib.error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first
        print('HTTP Error: ', e.code, url)
    except urllib.error.URLError as e:
        print('URL Error: ', e.reason, url)
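
As a quick sanity check of the preprocessing helpers above, the round trip below should recover the original image exactly when the mean is zero. It assumes a mean image of shape (3, H, W), matching the (N, 3, H, W) layout used throughout; the mean image here is hypothetical:

import numpy as np

H, W = 32, 32
img = np.random.randint(0, 256, size=(H, W, 3)).astype(np.uint8)
mean_img = np.zeros((3, H, W), dtype=np.float32)   # hypothetical mean image

pre = preprocess_image(img, mean_img, mean='image')    # (1, 3, H, W) float
post = deprocess_image(pre, mean_img, mean='image')    # (H, W, 3) uint8
assert np.array_equal(img, post)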

rnn.py

The RNN network class for the image-captioning task.

#-*- coding: utf-8 -*-
import numpy as np

from layers import *
from rnn_layers import *


class CaptioningRNN(object):
    """
    处理图片说明任务RNN网络
    注意:不使用正则化
    """
    
    def __init__(self, word_to_idx, input_dim=512, wordvec_dim=128,
                             hidden_dim=128, cell_type='rnn'):
        """
        初始化CaptioningRNN 
        Inputs:
        - word_to_idx: 单词字典,用于查询单词索引对应的词向量
        - input_dim: 输入图片数据维度
        - wordvec_dim: 词向量维度.
        - hidden_dim: RNN隐藏层维度.
        - cell_type: 细胞类型; 'rnn' 或 'lstm'.
        """
        # Validate parameters
        if cell_type not in {'rnn', 'lstm'}:
            raise ValueError('Invalid cell_type "%s"' % cell_type)
        
        # Initialize attributes
        self.cell_type = cell_type
        self.word_to_idx = word_to_idx
        self.idx_to_word = {i: w for w, i in word_to_idx.items()}
        self.params = {}
        
        vocab_size = len(word_to_idx)

        # Special tokens (the <NULL>, <START>, <END> markers in the vocabulary)
        self._null = word_to_idx['<NULL>']
        self._start = word_to_idx.get('<START>', None)
        self._end = word_to_idx.get('<END>', None)
        
        # Initialize the word embeddings
        self.params['W_embed'] = np.random.randn(vocab_size, wordvec_dim)
        self.params['W_embed'] /= 100
        
        # Initialize the CNN -> hidden-layer projection, which feeds the image features into the RNN
        self.params['W_proj'] = np.random.randn(input_dim, hidden_dim)
        self.params['W_proj'] /= np.sqrt(input_dim)
        self.params['b_proj'] = np.zeros(hidden_dim)

        # Initialize the RNN parameters
        dim_mul = {'lstm': 4, 'rnn': 1}[cell_type]
        self.params['Wx'] = np.random.randn(wordvec_dim, dim_mul * hidden_dim)
        self.params['Wx'] /= np.sqrt(wordvec_dim)
        self.params['Wh'] = np.random.randn(hidden_dim, dim_mul * hidden_dim)
        self.params['Wh'] /= np.sqrt(hidden_dim)
        self.params['b'] = np.zeros(dim_mul * hidden_dim)
        
        # Initialize the output-layer parameters
        self.params['W_vocab'] = np.random.randn(hidden_dim, vocab_size)
        self.params['W_vocab'] /= np.sqrt(hidden_dim)
        self.params['b_vocab'] = np.zeros(vocab_size)
            

    def loss(self, features, captions):
        """
        计算RNN或LSTM的损失值。
        Inputs:
        - features: 输入图片特征(N, D)。
        - captions: 图像文字说明(N, T)。 
            
        Returns 元组:
        - loss: 损失值。
        - grads:梯度。
        """
        # Split the captions in two: captions_in drops the last word and is fed
        # into the RNN; captions_out drops the first word and is what the RNN
        # is expected to produce at each step
        captions_in = captions[:, :-1]
        captions_out = captions[:, 1:]
        
        # Mask filtering out the <NULL> positions
        mask = (captions_out != self._null)

        # Affine projection of the image features
        W_proj, b_proj = self.params['W_proj'], self.params['b_proj']
        
        # Word-embedding matrix
        W_embed = self.params['W_embed']

        # RNN parameters
        Wx, Wh, b = self.params['Wx'], self.params['Wh'], self.params['b']

        # Hidden-to-vocabulary output matrix
        W_vocab, b_vocab = self.params['W_vocab'], self.params['b_vocab']
        
        loss, grads = 0.0, {}
        ############################################################################
        #               TASK: implement the CaptioningRNN forward pass             #
        #  (1) Use an affine transform (features, W_proj, b_proj) to map the       #
        #      image features to the initial hidden state h0 (N, H)                #
        #  (2) Use the word-embedding layer to turn the word indices in            #
        #      captions_in into word vectors (N, T, W)                             #
        #  (3) Process the word vectors with the RNN or LSTM (N, T, H)             #
        #  (4) Use temporal_affine_forward to compute per-word scores (N, T, V)    #
        #  (5) Use temporal_softmax_loss to compute the loss                       #
        ############################################################################
        # 1. Affine transform: map the image features to the initial hidden state h0 (N, H)
        h0, cache_h0 = affine_forward(features, W_proj, b_proj)
        # 2. Word embedding: convert the word indices in captions_in to word vectors (N, T, W)
        x, cache_embedding = word_embedding_forward(captions_in, W_embed)
        # 3. Process the word vectors with the RNN or LSTM (N, T, H)
        if self.cell_type == 'rnn':
            out_h, cache_rnn = rnn_forward(x, h0, Wx, Wh, b)
        elif self.cell_type == 'lstm':
            out_h, cache_rnn = lstm_forward(x, h0, Wx, Wh, b)
        else:
            raise ValueError('Invalid cell_type "%s"' % self.cell_type)
        # 4. Temporal affine layer: compute per-word scores (N, T, V)
        yHat, cache_out = temporal_affine_forward(out_h, W_vocab, b_vocab)
        # 5. temporal_softmax_loss: compute the loss
        loss, dy = temporal_softmax_loss(yHat, captions_out, mask, verbose=False)
        # Backward pass: backprop through the output layer
        dout_h, dW_vocab, db_vocab = temporal_affine_backward(dy, cache_out)
        # Backprop through the RNN or LSTM
        if self.cell_type == 'rnn':
            dx, dh0, dWx, dWh, db = rnn_backward(dout_h, cache_rnn)
        elif self.cell_type == 'lstm':
            dx, dh0, dWx, dWh, db = lstm_backward(dout_h, cache_rnn)
        else:
            raise ValueError('Invalid cell_type "%s"' % self.cell_type)
        # Backprop through the word-embedding layer
        dW_embed = word_embedding_backward(dx, cache_embedding)
        # Backprop through the image-feature projection
        dfeatures, dW_proj, db_proj = affine_backward(dh0, cache_h0)
        # Collect the gradients
        grads['W_proj'] = dW_proj
        grads['b_proj'] = db_proj
        grads['W_embed'] = dW_embed
        grads['Wx'] = dWx
        grads['Wh'] = dWh
        grads['b'] = db
        grads['W_vocab'] = dW_vocab
        grads['b_vocab'] = db_vocab
        ############################################################################
        #                               END OF CODE                                #
        ############################################################################
        return loss, grads


    def sample(self, features, max_length=30):
        """
        测试阶段的前向传播过程,采样一批图片说明作为输入
        Inputs:
        - features: 图片特征(N, D).
        - max_length:生成说明文字的最大长度

        Returns:
        - captions: 说明文字的字典索引串(N, max_length)
        """
        N = features.shape[0]
        captions = self._null * np.ones((N, max_length), dtype=np.int32)

        W_proj, b_proj = self.params['W_proj'], self.params['b_proj']
        W_embed = self.params['W_embed']
        Wx, Wh, b = self.params['Wx'], self.params['Wh'], self.params['b']
        W_vocab, b_vocab = self.params['W_vocab'], self.params['b_vocab']
        
        ###########################################################################
        #                     TASK: test-time forward pass                        #
        #  Hints: (1) the first word should be the <START> token:                 #
        #             captions[:, 0] = self._start                                #
        #         (2) each step's input word is the previous step's output       #
        #         (3) at each step, predict the next word: compute the scores     #
        #             of all words and take the argmax as the prediction          #
        #         (4) you cannot use rnn_forward or lstm_forward here; call       #
        #             rnn_step_forward or lstm_step_forward in a loop instead     #
        ###########################################################################
        # Set up the initial state
        N, D = features.shape
        affine_out, affine_cache = affine_forward(features, W_proj, b_proj)
        prev_word_idx = [self._start]*N
        prev_h = affine_out
        prev_c = np.zeros(prev_h.shape)
        # (1) The first word is the <START> token
        captions[:, 0] = self._start
        for i in range(1, max_length):
            # (2) The input word is the previous step's output
            prev_word_embed = W_embed[prev_word_idx]
            # (4) Call rnn_step_forward or lstm_step_forward one step at a time
            if self.cell_type == 'rnn':
                next_h, rnn_step_cache = rnn_step_forward(prev_word_embed, prev_h,
                                                          Wx, Wh, b)
            elif self.cell_type == 'lstm':
                next_h, next_c, lstm_step_cache = lstm_step_forward(prev_word_embed, prev_h,
                                                          prev_c, Wx, Wh, b)
                prev_c = next_c
            else:
                raise ValueError('Invalid cell_type "%s"' % self.cell_type)
            vocab_affine_out, vocab_affine_out_cache = affine_forward(next_h, 
                            W_vocab, b_vocab)
            # (3) Compute all word scores and take the argmax as the prediction
            captions[:, i] = np.argmax(vocab_affine_out, axis=1)
            prev_word_idx = captions[:, i]
            prev_h = next_h
        ############################################################################
        #                               END OF CODE                                #
        ############################################################################
        return captions
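
As a hedged smoke test for the class (the shapes and the toy vocabulary below are made up for illustration), the loss should come out finite and sample should return an (N, max_length) index array:

import numpy as np
from rnn import CaptioningRNN

word_to_idx = {'<NULL>': 0, '<START>': 1, '<END>': 2, 'a': 3, 'cat': 4}
N, D, T = 2, 20, 4
model = CaptioningRNN(word_to_idx, input_dim=D, wordvec_dim=8,
                      hidden_dim=16, cell_type='lstm')

features = np.random.randn(N, D)
captions = np.random.randint(len(word_to_idx), size=(N, T))
loss, grads = model.loss(features, captions)
print('loss:', loss)                      # a finite scalar

sampled = model.sample(features, max_length=10)
print(sampled.shape)                      # (2, 10)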

rnn_layers.py

The layer functions needed by the RNN hidden layers, including forward and backward passes for the vanilla RNN, the LSTM, and word embeddings.

#-*- coding: utf-8 -*-
import numpy as np

def rnn_step_forward(x, prev_h, Wx, Wh, b):
    """
    RNN单步前向传播,使用tanh激活单元
    Inputs:
    - x: 当前时间步数据输入(N, D).
    - prev_h: 前一时间步隐藏层状态 (N, H)
    - Wx: 输入层到隐藏层连接权重(D, H)
    - Wh:隐藏层到隐藏层连接权重(H, H)
    - b: 隐藏层偏置项(H,)

    Returns 元组:
    - next_h: 下一隐藏层状态(N, H)
    - cache: 缓存
    """
    next_h, cache = None, None
    ##############################################################################
    #              TASK: implement the single-step RNN forward pass.             #
    #              Store the output in next_h and everything needed              #
    #              for the backward pass in cache.                               #
    ##############################################################################
    # Pre-activation input to the hidden unit
    a = prev_h.dot(Wh)+x.dot(Wx)+b
    # Apply the tanh nonlinearity
    next_h = np.tanh(a)
    # Keep the intermediate values for the backward pass
    cache = (x, prev_h, Wh, Wx, b, next_h)
    ##############################################################################
    #                               END OF CODE                                  #
    ##############################################################################
    return next_h, cache


def rnn_step_backward(dnext_h, cache):
    """
    RNN单步反向传播。
    Inputs:
    - dnext_h: 后一时间片段的梯度。
    - cache: 前向传播时的缓存。
    
    Returns 元组:
    - dx: 数据梯度(N, D)。
    - dprev_h: 前一时间片段梯度(N, H)。
    - dWx: 输入层到隐藏层权重梯度(D,H)。
    - dWh:    隐藏层到隐藏层权重梯度(H, H)。
    - db: 偏置项梯度(H,)。
    """
    dx, dprev_h, dWx, dWh, db = None, None, None, None, None
    ##############################################################################
    #              TASK: implement the single-step RNN backward pass.            #
    #              Hint: the gradient of tanh(x) is 1 - tanh(x)*tanh(x)          #
    ##############################################################################
    # Unpack the cache
    x, prev_h, Wh, Wx, b, next_h = cache
    # Apply the chain rule to each variable in turn
    dscores = dnext_h*(1-next_h*next_h)
    dWx = np.dot(x.T, dscores)
    db = np.sum(dscores, axis=0)
    dWh = np.dot(prev_h.T, dscores)
    dx = np.dot(dscores, Wx.T)
    dprev_h = np.dot(dscores, Wh.T)
    ##############################################################################
    #                               END OF CODE                                  #
    ##############################################################################
    return dx, dprev_h, dWx, dWh, db
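
# A quick finite-difference sanity check of the step pair above (a sketch,
# not part of the original file):
#
#   N, D, H = 4, 5, 6
#   x, h = np.random.randn(N, D), np.random.randn(N, H)
#   Wx, Wh, b = np.random.randn(D, H), np.random.randn(H, H), np.random.randn(H)
#   out, cache = rnn_step_forward(x, h, Wx, Wh, b)
#   dout = np.random.randn(*out.shape)
#   dx, dprev_h, dWx, dWh, db = rnn_step_backward(dout, cache)
#   # Each analytic gradient should match a centered numeric estimate such as
#   # (f(w + eps) - f(w - eps)) / (2 * eps) to several decimal places.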


def rnn_forward(x, h0, Wx, Wh, b):
    """
    RNN前向传播。
    Inputs:
    - x: 完整的时序数据 (N, T, D)。
    - h0: 隐藏层初始化状态 (N, H)。
    - Wx: 输入层到隐藏层权重 (D, H)。
    - Wh:    隐藏层到隐藏层权重(H, H)。
    - b: 偏置项(H,)。
    
    Returns 元组:
    - h: 所有时间步隐藏层状态(N, T, H)。
    - cache: 反向传播所需的缓存。
    """
    h, cache = None, None
    ##############################################################################
    #                   TASK: implement the full RNN forward pass.               #
    #                   Hint: use the rnn_step_forward function above.           #
    ##############################################################################
    # Get the data dimensions
    N, T, D = x.shape
    (H, ) = b.shape
    # Allocate the output
    h = np.zeros((N, T, H))
    # Start from the initial hidden state
    prev_h = h0
    # Walk through all timesteps
    for t in range(T):
        # Slice out the current timestep
        xt = x[:, t, :]
        # One forward step
        next_h, _ = rnn_step_forward(xt, prev_h, Wx, Wh, b)
        # Carry the state forward
        prev_h = next_h
        # Record the result
        h[:, t, :] = prev_h
    # Cache the full sequence; rnn_backward rebuilds the per-step caches from h
    cache = (x, h0, Wh, Wx, b, h)
    ##############################################################################
    #                               END OF CODE                                  #
    ##############################################################################
    return h, cache


def rnn_backward(dh, cache):
    """
    RNN反向传播。
    Inputs:
    - dh: 隐藏层所有时间步梯度(N, T, H)。
    Returns 元组:
    - dx: 输入数据时序梯度(N, T, D)。
    - dh0: 初始隐藏层梯度(N, H)。
    - dWx: 输入层到隐藏层权重梯度(D, H)。
    - dWh: 隐藏层到隐藏层权重梯度(H, H)。
    - db: 偏置项梯度(H,)。
    """
    dx, dh0, dWx, dWh, db = None, None, None, None, None
    ##############################################################################
    #                   TASK: implement the full RNN backward pass.              #
    #                   Hint: use the rnn_step_backward function above.          #
    ##############################################################################
    # Unpack the cache
    x, h0, Wh, Wx, b, h = cache
    # Get the data dimensions
    N, T, H = dh.shape
    _, _, D = x.shape
    # The hidden state of the final timestep
    next_h = h[:, T-1, :]
    # Initialize the gradients
    dprev_h = np.zeros((N, H))
    dx = np.zeros((N, T, D))
    dh0 = np.zeros((N, H))
    dWx = np.zeros((D, H))
    dWh = np.zeros((H, H))
    db = np.zeros((H,))
    # Walk through the timesteps in reverse
    for t in range(T):
        # Current timestep (back to front)
        t = T-1-t
        # Slice out the corresponding input
        xt = x[:, t, :]
        # The "previous" state of the first timestep is h0
        if t == 0:
            prev_h = h0
        else:
            prev_h = h[:, t-1, :]
        # Rebuild the per-step cache
        step_cache = (xt, prev_h, Wh, Wx, b, next_h)
        # Shift the state backward for the next iteration
        next_h = prev_h
        dnext_h = dh[:, t, :]+dprev_h
        # One backward step
        dx[:, t, :], dprev_h, dWxt, dWht, dbt = rnn_step_backward(dnext_h, step_cache)
        # Accumulate the weight gradients
        dWx, dWh, db = dWx+dWxt, dWh+dWht, db+dbt
    # The gradient of h0
    dh0 = dprev_h
    ##############################################################################
    #                               END OF CODE                                  #
    ##############################################################################
    return dx, dh0, dWx, dWh, db


def word_embedding_forward(x, W):
    """
    词嵌入前向传播,将数据矩阵中的N条长度为T的词索引转化为词向量。
    如:W[x[i,j]]表示第i条,第j时间步单词索引所对应的词向量。
    Inputs:
    - x: 整数型数组(N,T),N表示数据条数,T表示单条数据长度,
        数组的每一元素存放着单词索引,取值范围[0,V)。
    - W: 词向量矩阵(V,D)存放各单词对应的向量。
    
    Returns 元组:
    - out:输出词向量(N, T, D)。 
    - cache:反向传播时所需的缓存。
    """
    out, cache = None, None
    ##############################################################################
    #              TASK: implement the word-embedding forward pass.              #
    ##############################################################################
    # Get the data dimensions
    N, T = x.shape
    V, D = W.shape
    # Allocate the output
    out = np.zeros((N, T, D))
    # Walk over every sequence and timestep
    for i in range(N):
        for j in range(T):
            # Look up the word vector
            out[i, j] = W[x[i, j]]
    cache = (x, W.shape)
    ##############################################################################
    #                               END OF CODE                                  #
    ##############################################################################
    return out, cache
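
# Note: the double loop above is equivalent to the single fancy-indexing lookup
# out = W[x], which NumPy vectorizes; the loop is kept for readability.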


def word_embedding_backward(dout, cache):
    """
    词嵌入反向传播
    
    Inputs:
    - dout: 上层梯度 (N, T, D)
    - cache:前向传播缓存
    
    Returns:
    - dW: 词嵌入矩阵梯度(V, D).
    """
    dW = None
    ##############################################################################
    #                          任务:实现词嵌入反向传播                               #
    #                     提示:你可以使用np.add.at函数                              #
    #            例如 np.add.at(a,[1,2],1)相当于a[1],a[2]分别加1                    #
    ##############################################################################
    x, W_shape = cache
    dW = np.zeros(W_shape)
    # np.add.at()是将传入的数组中制定下标位置的元素加上指定的值.
    np.add.at(dW, x, dout)
    ##############################################################################
    #                                                             结束编码                                                                         #
    ##############################################################################
    return dW
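
# Why np.add.at instead of dW[x] += dout: fancy-indexed += applies each
# duplicated index only once, while np.add.at accumulates every occurrence:
#   a = np.zeros(3); np.add.at(a, [0, 0, 1], 1)   # -> a == [2., 1., 0.]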


def sigmoid(x):
    """
    数值稳定版本的sigmoid函数。
    """
    pos_mask = (x >= 0)
    neg_mask = (x < 0)
    z = np.zeros_like(x)
    z[pos_mask] = np.exp(-x[pos_mask])
    z[neg_mask] = np.exp(x[neg_mask])
    top = np.ones_like(x)
    top[neg_mask] = z[neg_mask]
    return top / (1 + z)
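
# The masking avoids overflow in np.exp for large |x|: a naive
# 1 / (1 + np.exp(-x)) raises an overflow warning at x = -1000, whereas
# sigmoid(np.array([-1000., 0., 1000.])) returns array([0. , 0.5, 1. ]).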


def lstm_step_forward(x, prev_h, prev_c, Wx, Wh, b):
    """
    LSTM单步前向传播
    
    Inputs:
    - x: 输入数据 (N, D)
    - prev_h: 前一隐藏层状态 (N, H)
    - prev_c: 前一细胞状态(N, H)
    - Wx: 输入层到隐藏层权重(D, 4H)
    - Wh: 隐藏层到隐藏层权重 (H, 4H)
    - b: 偏置项(4H,)
    
    Returns 元组:
    - next_h:    下一隐藏层状态(N, H)
    - next_c:    下一细胞状态(N, H)
    - cache: 反向传播所需的缓存
    """
    next_h, next_c, cache = None, None, None
    #############################################################################
    #            TASK: implement the single-step LSTM forward pass.             #
    #            Hint: a stable sigmoid is implemented above; call it           #
    #            directly. Use np.tanh for the tanh function.                   #
    #############################################################################
    # Get the data dimensions
    N, D = x.shape
    N, H = prev_h.shape
    # Compute the input, forget, and output gates
    input_gate = sigmoid(np.dot(x, Wx[:, 0:H])+np.dot(prev_h, Wh[:, 0:H])+b[0:H])
    forget_gate = sigmoid(np.dot(x, Wx[:, H:2*H])+np.dot(prev_h, Wh[:, H:2*H])
                          +b[H:2*H])
    output_gate = sigmoid(np.dot(x, Wx[:, 2*H:3*H])+np.dot(prev_h, Wh[:, 2*H:3*H])
                          +b[2*H:3*H])
    # Compute the candidate cell input
    input_data = np.tanh(np.dot(x, Wx[:, 3*H:4*H])+np.dot(prev_h, Wh[:, 3*H:4*H])
                         +b[3*H:4*H])
    # Update the cell state
    next_c = forget_gate*prev_c+input_data*input_gate
    # Compute the hidden output
    next_scores_c = np.tanh(next_c)
    next_h = output_gate*next_scores_c
    cache = (x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate,
             prev_h, prev_c, next_scores_c)
    ##############################################################################
    #                               END OF CODE                                  #
    ##############################################################################
    return next_h, next_c, cache
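
# An equivalent, more vectorized formulation (a sketch; same i/f/o/g column
# order as above) computes all pre-activations at once and slices:
#
#   a = x.dot(Wx) + prev_h.dot(Wh) + b                     # (N, 4H)
#   i, f, o = (sigmoid(a[:, :H]), sigmoid(a[:, H:2*H]),
#              sigmoid(a[:, 2*H:3*H]))
#   g = np.tanh(a[:, 3*H:])
#   next_c = f * prev_c + i * g
#   next_h = o * np.tanh(next_c)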


def lstm_step_backward(dnext_h, dnext_c, cache):
    """
     LSTM单步反向传播
    
    Inputs:
    - dnext_h: 下一隐藏层梯度 (N, H)
    - dnext_c: 下一细胞梯度 (N, H)
    - cache: 前向传播缓存
    
    Returns 元组:
    - dx: 输入数据梯度 (N, D)
    - dprev_h: 前一隐藏层梯度 (N, H)
    - dprev_c: 前一细胞梯度(N, H)
    - dWx: 输入层到隐藏层梯度(D, 4H)
    - dWh:    隐藏层到隐藏层梯度(H, 4H)
    - db:    偏置梯度(4H,)
    """
    dx, dprev_h, dc, dWx, dWh, db = None, None, None, None, None, None
    #############################################################################
    #                      任务:实现LSTM单步反向传播                               #
    #       提示:sigmoid(x)函数梯度:sigmoid(x)*(1-sigmoid(x))                    #
    #             tanh(x)函数梯度:     1-tanh(x)*tanh(x)                         #
    #############################################################################
    # 获取数据
    x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate, prev_h,\
        prev_c, next_scores_c = cache
    N, D = x.shape
    N, H = prev_h.shape
    # 初始化变量
    dWx = np.zeros((D, 4*H))
    dxx = np.zeros((D, 4*H))
    dWh = np.zeros((H, 4*H))
    dhh = np.zeros((H, 4*H))
    db = np.zeros(4*H)
    dx = np.zeros((N, D))
    dprev_h = np.zeros((N, H))
    # 计算当前细胞的梯度
    dc_tem = dnext_c+dnext_h*(1-next_scores_c**2)*output_gate
    # 求解tanh层
    dprev_c = forget_gate*dc_tem
    dforget_gate = prev_c*dc_tem
    dinput_gate = input_data*dc_tem
    dinput = input_gate*dc_tem
    doutput_gate = next_scores_c*dnext_h
    # 求解sigmoid层
    dscores_in_gate = input_gate*(1-input_gate)*dinput_gate
    dscores_forget_gate = forget_gate*(1-forget_gate)*dforget_gate
    dscores_out_gate = output_gate*(1-output_gate)*doutput_gate
    dscores_in = (1-input_data**2)*dinput
    da = np.hstack((dscores_in_gate, dscores_forget_gate, dscores_out_gate, dscores_in))
    dWx = np.dot(x.T, da)
    dWh = np.dot(prev_h.T, da)
    db = np.sum(da, axis=0)
    dx = np.dot(da, Wx.T)
    dprev_h = np.dot(da, Wh.T)
    ##############################################################################
    #                           结束编码                                           #
    ##############################################################################

    return dx, dprev_h, dprev_c, dWx, dWh, db


def lstm_forward(x, h0, Wx, Wh, b):
    """
    LSTM前向传播
    Inputs:
    - x: 输入数据 (N, T, D)
    - h0:初始化隐藏层状态(N, H)
    - Wx: 输入层到隐藏层权重 (D, 4H)
    - Wh: 隐藏层到隐藏层权重(H, 4H)
    - b: 偏置项(4H,)
    
    Returns 元组:
    - h: 隐藏层所有状态 (N, T, H)
    - cache: 用于反向传播的缓存
    """
    h, cache = None, None
    #############################################################################
    #                    任务: 实现完整的LSTM前向传播                              #
    #############################################################################
    # 获取数据
    N, T, D = x.shape
    H = int(b.shape[0]/4)
    # 初始化信息
    h = np.zeros((N, T, H))
    cache = {}
    prev_h = h0
    prev_c = np.zeros((N, H))
    # 遍历所有时序数据
    for t in range(T):
        # 当前数据
        xt = x[:, t, :]
        # 进行单步LSTM前向传播
        next_h, next_c, cache[t] = lstm_step_forward(xt, prev_h, prev_c, Wx, Wh, b)
        # 更新状态
        prev_h = next_h
        prev_c = next_c
        h[:, t, :] = prev_h
    ##############################################################################
    #                          结束编码                                            #
    ##############################################################################

    return h, cache


def lstm_backward(dh, cache):
    """
    LSTM反向传播
    Inputs:
    - dh: 各隐藏层梯度(N, T, H)
    - cache: V前向传播缓存
    
    Returns 元组:
    - dx: 输入数据梯度 (N, T, D)
    - dh0:初始隐藏层梯度(N, H)
    - dWx: 输入层到隐藏层权重梯度 (D, 4H)
    - dWh: 隐藏层到隐藏层权重梯度 (H, 4H)
    - db: 偏置项梯度 (4H,)
    """
    dx, dh0, dWx, dWh, db = None, None, None, None, None
    #############################################################################
    #               任务:实现完整的LSTM反向传播                                     #
    #############################################################################
    # 获取数据
    N, T, H = dh.shape
    # 从最后一条开始更新
    x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate, prev_h, prev_c,\
        next_scores_c = cache[T-1]
    D = x.shape[1]
    # 初始化
    dprev_h = np.zeros((N, H))
    dprev_c = np.zeros((N, H))
    dx = np.zeros((N, T, D))
    dh0 = np.zeros((N, H))
    dWx = np.zeros((D, 4*H))
    dWh = np.zeros((H, 4*H))
    db = np.zeros((4*H,))
    # 遍历所有数据
    for t in range(T):
        # 选择当前时间(从后向前)
        t = T-1-t
        # 获取数据
        step_cache = cache[t]
        dnext_h = dh[:, t, :]+dprev_h
        dnext_c = dprev_c
        # 进行单步反向传播计算
        dx[:, t, :], dprev_h, dprev_c, dWxt, dWht, dbt = lstm_step_backward(dnext_h,
                        dnext_c, step_cache)
        # 更新参数
        dWx, dWh, db = dWx+dWxt, dWh+dWht, db+dbt
    # 更新h0梯度
    dh0 = dprev_h
    ##############################################################################
    #                            结束编码                                          #
    ##############################################################################
    
    return dx, dh0, dWx, dWh, db


def temporal_affine_forward(x, w, b):
    """
    时序隐藏层仿射传播:将隐藏层时序数据(N,T,D)重塑为(N*T,D),
    完成前向传播后,再重塑回原型输出。

    Inputs:
    - x: 时序数据(N, T, D)。
    - w: 权重(D, M)。
    - b: 偏置(M,)。
    
    Returns 元组:
    - out: 输出(N, T, M)。
    - cache: 反向传播缓存。
    """
    N, T, D = x.shape
    M = b.shape[0]
    # Affine层
    out = x.reshape(N * T, D).dot(w).reshape(N, T, M) + b
    cache = x, w, b, out
    return out, cache


def temporal_affine_backward(dout, cache):
    """
    时序隐藏层仿射反向传播。

    Input:
    - dout:上层梯度 (N, T, M)。
    - cache: 前向传播缓存。

    Returns 元组:
    - dx: 输入梯度(N, T, D)。
    - dw: 权重梯度 (D, M)。
    - db: 偏置项梯度 (M,)。
    """
    x, w, b, out = cache
    N, T, D = x.shape
    M = b.shape[0]
    # Affine层反向传播
    dx = dout.reshape(N * T, M).dot(w.T).reshape(N, T, D)
    dw = dout.reshape(N * T, M).T.dot(x.reshape(N * T, D)).T
    db = dout.sum(axis=(0, 1))

    return dx, dw, db


def temporal_softmax_loss(x, y, mask, verbose=False):
    """
    时序版本的Softmax损失和原版本类似,只需将数据(N, T, V)重塑为(N*T,V)即可。
    需要注意的是,对于NULL标记不计入损失值,因此,你需要加入掩码进行过滤。
    Inputs:
    - x: 输入数据得分(N, T, V)。
    - y: 目标索引(N, T),其中0<= y[i, t] < V。
    - mask: 过滤NULL标记的掩码。
    Returns 元组:
    - loss: 损失值。
    - dx: x梯度。
    """
    # 获取必备信息
    N, T, V = x.shape
    
    x_flat = x.reshape(N * T, V)
    y_flat = y.reshape(N * T)
    mask_flat = mask.reshape(N * T)
    
    # 和原有softmax类似,不足的部分使用NULL补充,计算的时候过滤
    probs = np.exp(x_flat - np.max(x_flat, axis=1, keepdims=True))
    probs /= np.sum(probs, axis=1, keepdims=True)
    loss = -np.sum(mask_flat * np.log(probs[np.arange(N * T), y_flat])) / N
    dx_flat = probs.copy()
    dx_flat[np.arange(N * T), y_flat] -= 1
    dx_flat /= N
    dx_flat *= mask_flat[:, None]
    
    # Optionally print the gradient shape
    if verbose: 
        print('dx_flat: ', dx_flat.shape)
    
    dx = dx_flat.reshape(N, T, V)
    
    return loss, dx
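
A small check of the masking behaviour (a sketch, not from the original post): positions where the mask is False should contribute neither to the loss nor to the gradient.

import numpy as np
from rnn_layers import temporal_softmax_loss

N, T, V = 2, 3, 5
x = np.random.randn(N, T, V)
y = np.random.randint(V, size=(N, T))
mask = np.ones((N, T), dtype=bool)
mask[:, -1] = False                  # pretend the last step is <NULL> padding

loss, dx = temporal_softmax_loss(x, y, mask)
print(np.abs(dx[:, -1, :]).max())    # 0.0: masked steps receive no gradient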

