编码实现RNN以及LSTM
本次涉及代码较多,部分重复代码不再重复发布。重复的代码详见以下链接:
https://blog.csdn.net/qq_37402392/article/details/121468321?spm=1001.2014.3001.5501
bn_layers.py
cnn_layers.py
dropout_layers.py
layers.py
updater.py
以下是本次添加的内容。
captioning_trainer.py
训练器,与之前的训练器Trainer类似。
#-*- coding: utf-8 -*-
import numpy as np
from coco_utils import *
import updater
class CaptioningTrainer(object):
"""
CaptioningTrainer大部分内容和前面的Trainer相同
使用方法:
data = load_coco_data()
model = MyAwesomeModel(hidden_dim=100)
trainer = CaptioningTrainer(model, data,
update_rule='sgd',
updater_config={
'learning_rate': 1e-3,
},
lr_decay=0.95,
num_epochs=10, batch_size=100,
print_every=100)
trainer.train()
"""
def __init__(self, model, data, **kwargs):
"""
初始化CaptioningTrainer
所需参数:
- model: RNN模型
- data: coco数据集
可选参数:
- update_rule:更新规则,查看 updater.py.
默认为 'sgd'.
- updater_config: 更新器配置
- lr_decay:学习率衰减因子
- batch_size: 批量大小
- num_epochs: 迭代次数
- print_every:每训练多少次,打印训练结果
- verbose:是否打印训练中间结果
"""
self.model = model
self.data = data
self.update_rule = kwargs.pop('update_rule', 'sgd')
self.updater_config = kwargs.pop('updater_config', {})
self.lr_decay = kwargs.pop('lr_decay', 1.0)
self.batch_size = kwargs.pop('batch_size', 100)
self.num_epochs = kwargs.pop('num_epochs', 10)
self.print_every = kwargs.pop('print_every', 10)
self.verbose = kwargs.pop('verbose', True)
if len(kwargs) > 0:
extra = ', '.join('"%s"' % k for k in kwargs.keys())
raise ValueError('Unrecognized arguments %s' % extra)
if not hasattr(updater, self.update_rule):
raise ValueError('Invalid update_rule "%s"' % self.update_rule)
self.update_rule = getattr(updater, self.update_rule)
self._reset()
def _reset(self):
self.epoch = 0
self.best_val_acc = 0
self.best_params = {}
self.loss_history = []
self.train_acc_history = []
self.val_acc_history = []
self.updater_configs = {}
for p in self.model.params:
d = {k: v for k, v in self.updater_config.items()}
self.updater_configs[p] = d
def _step(self):
minibatch = sample_coco_minibatch(self.data,
batch_size=self.batch_size,
split='train')
captions, features, urls = minibatch
loss, grads = self.model.loss(features, captions)
self.loss_history.append(loss)
for p, w in self.model.params.items():
dw = grads[p]
config = self.updater_configs[p]
next_w, next_config = self.update_rule(w, dw, config)
self.model.params[p] = next_w
self.updater_configs[p] = next_config
def train(self):
num_train = self.data['train_captions'].shape[0]
iterations_per_epoch = max(num_train / self.batch_size, 1)
num_iterations = int(self.num_epochs * iterations_per_epoch)
for t in range(num_iterations):
self._step()
if self.verbose and t % self.print_every == 0:
print('(Iteration %d / %d) loss: %f' % (
t + 1, num_iterations, self.loss_history[-1]))
epoch_end = (t + 1) % iterations_per_epoch == 0
if epoch_end:
self.epoch += 1
for k in self.updater_configs:
self.updater_configs[k]['learning_rate'] *= self.lr_decay
coco_utils.py
对coco数据集进行处理,包括了读取数据文件、解码数据以及小批量读取数据
import os, json
import numpy as np
import h5py
def load_coco_data(base_dir='datasets/coco_captioning', max_train=None,
pca_features=True):
'''
读取CoCo训练文件
Parameters
----------
base_dir : TYPE, optional
数据文件位置. The default is 'datasets/coco_captioning'.
max_train : TYPE, optional
是否对训练数据进行再抽样. The default is None.
pca_features : TYPE, optional
是否使用降维特征. The default is True.
Returns
-------
data : TYPE
读取的数据文件.
'''
# 保存数据
data = {}
# 获得文件
caption_file = os.path.join(base_dir, 'coco2014_captions.h5')
# 添加文件汇总的内容
with h5py.File(caption_file, 'r') as f:
for k, v in f.items():
data[k] = np.asarray(v)
# 是否使用降维特征
if pca_features:
train_feat_file = os.path.join(base_dir, 'train2014_vgg16_fc7_pca.h5')
else:
train_feat_file = os.path.join(base_dir, 'train2014_vgg16_fc7.h5')
with h5py.File(train_feat_file, 'r') as f:
data['train_features'] = np.asarray(f['features'])
if pca_features:
val_feat_file = os.path.join(base_dir, 'val2014_vgg16_fc7_pca.h5')
else:
val_feat_file = os.path.join(base_dir, 'val2014_vgg16_fc7.h5')
with h5py.File(val_feat_file, 'r') as f:
data['val_features'] = np.asarray(f['features'])
dict_file = os.path.join(base_dir, 'coco2014_vocab.json')
with open(dict_file, 'r') as f:
dict_data = json.load(f)
for k, v in dict_data.items():
data[k] = v
train_url_file = os.path.join(base_dir, 'train2014_urls.txt')
with open(train_url_file, 'r') as f:
train_urls = np.asarray([line.strip() for line in f])
data['train_urls'] = train_urls
val_url_file = os.path.join(base_dir, 'val2014_urls.txt')
with open(val_url_file, 'r') as f:
val_urls = np.asarray([line.strip() for line in f])
data['val_urls'] = val_urls
# 也许对训练数据进行再抽样
if max_train is not None:
num_train = data['train_captions'].shape[0]
mask = np.random.randint(num_train, size=max_train)
data['train_captions'] = data['train_captions'][mask]
data['train_image_idxs'] = data['train_image_idxs'][mask]
return data
def decode_captions(captions, idx_to_word):
singleton = False
if captions.ndim == 1:
singleton = True
captions = captions[None]
decoded = []
N, T = captions.shape
for i in range(N):
words = []
for t in range(T):
word = idx_to_word[captions[i, t]]
if word != '':
words.append(word)
if word == '':
break
decoded.append(' '.join(words))
if singleton:
decoded = decoded[0]
return decoded
def sample_coco_minibatch(data, batch_size=100, split='train'):
split_size = data['%s_captions' % split].shape[0]
mask = np.random.choice(split_size, batch_size)
captions = data['%s_captions' % split][mask]
image_idxs = data['%s_image_idxs' % split][mask]
image_features = data['%s_features' % split][image_idxs]
urls = data['%s_urls' % split][image_idxs]
return captions, image_features, urls
image_utils.py
通用的方法用于展示过程中的图片。
import urllib.request, os, tempfile
import numpy as np
from scipy.misc import imread
from cnn_layers import conv_forward_fast
"""
Utility functions used for viewing and processing images.
"""
def blur_image(X):
"""
A very gentle image blurring operation, to be used as a regularizer for image
generation.
Inputs:
- X: Image data of shape (N, 3, H, W)
Returns:
- X_blur: Blurred version of X, of shape (N, 3, H, W)
"""
w_blur = np.zeros((3, 3, 3, 3))
b_blur = np.zeros(3)
blur_param = {'stride': 1, 'pad': 1}
for i in range(3):
w_blur[i, i] = np.asarray([[1, 2, 1], [2, 188, 2], [1, 2, 1]], dtype=np.float32)
w_blur /= 200.0
return conv_forward_fast(X, w_blur, b_blur, blur_param)[0]
def preprocess_image(img, mean_img, mean='image'):
"""
Convert to float, transepose, and subtract mean pixel
Input:
- img: (H, W, 3)
Returns:
- (1, 3, H, 3)
"""
if mean == 'image':
mean = mean_img
elif mean == 'pixel':
mean = mean_img.mean(axis=(1, 2), keepdims=True)
elif mean == 'none':
mean = 0
else:
raise ValueError('mean must be image or pixel or none')
return img.astype(np.float32).transpose(2, 0, 1)[None] - mean
def deprocess_image(img, mean_img, mean='image', renorm=False):
"""
Add mean pixel, transpose, and convert to uint8
Input:
- (1, 3, H, W) or (3, H, W)
Returns:
- (H, W, 3)
"""
if mean == 'image':
mean = mean_img
elif mean == 'pixel':
mean = mean_img.mean(axis=(1, 2), keepdims=True)
elif mean == 'none':
mean = 0
else:
raise ValueError('mean must be image or pixel or none')
if img.ndim == 3:
img = img[None]
img = (img + mean)[0].transpose(1, 2, 0)
if renorm:
low, high = img.min(), img.max()
img = 255.0 * (img - low) / (high - low)
return img.astype(np.uint8)
def image_from_url(url):
"""
Read an image from a URL. Returns a numpy array with the pixel data.
We write the image to a temporary file then read it back. Kinda gross.
"""
try:
f = urllib.request.urlopen(url)
_, fname = tempfile.mkstemp()
with open(fname, 'wb') as ff:
ff.write(f.read())
img = imread(fname)
os.remove(fname)
return img
except urllib.request.URLError as e:
print('URL Error: ', e.reason, url)
except urllib.request.HTTPError as e:
print('HTTP Error: ', e.code, url)
rnn.py
图片说明任务RNN网络类
#-*- coding: utf-8 -*-
import numpy as np
from layers import *
from rnn_layers import *
class CaptioningRNN(object):
"""
处理图片说明任务RNN网络
注意:不使用正则化
"""
def __init__(self, word_to_idx, input_dim=512, wordvec_dim=128,
hidden_dim=128, cell_type='rnn'):
"""
初始化CaptioningRNN
Inputs:
- word_to_idx: 单词字典,用于查询单词索引对应的词向量
- input_dim: 输入图片数据维度
- wordvec_dim: 词向量维度.
- hidden_dim: RNN隐藏层维度.
- cell_type: 细胞类型; 'rnn' 或 'lstm'.
"""
# 参数检验
if cell_type not in {'rnn', 'lstm'}:
raise ValueError('Invalid cell_type "%s"' % cell_type)
# 初始化数据
self.cell_type = cell_type
self.word_to_idx = word_to_idx
self.idx_to_word = {i: w for w, i in word_to_idx.items()}
self.params = {}
vocab_size = len(word_to_idx)
self._null = word_to_idx['']
self._start = word_to_idx.get('', None)
self._end = word_to_idx.get('', None)
# 初始化词向量
self.params['W_embed'] = np.random.randn(vocab_size, wordvec_dim)
self.params['W_embed'] /= 100
# 初始化 CNN -> 隐藏层参数,用于将图片特征提取到RNN中
self.params['W_proj'] = np.random.randn(input_dim, hidden_dim)
self.params['W_proj'] /= np.sqrt(input_dim)
self.params['b_proj'] = np.zeros(hidden_dim)
# 初始化RNN参数
dim_mul = {'lstm': 4, 'rnn': 1}[cell_type]
self.params['Wx'] = np.random.randn(wordvec_dim, dim_mul * hidden_dim)
self.params['Wx'] /= np.sqrt(wordvec_dim)
self.params['Wh'] = np.random.randn(hidden_dim, dim_mul * hidden_dim)
self.params['Wh'] /= np.sqrt(hidden_dim)
self.params['b'] = np.zeros(dim_mul * hidden_dim)
# 初始化输出层参数
self.params['W_vocab'] = np.random.randn(hidden_dim, vocab_size)
self.params['W_vocab'] /= np.sqrt(hidden_dim)
self.params['b_vocab'] = np.zeros(vocab_size)
def loss(self, features, captions):
"""
计算RNN或LSTM的损失值。
Inputs:
- features: 输入图片特征(N, D)。
- captions: 图像文字说明(N, T)。
Returns 元组:
- loss: 损失值。
- grads:梯度。
"""
#将文字切分为两段:captions_in除去最后一词用于RNN输入
#captions_out除去第一个单词,用于RNN输出配对
captions_in = captions[:, :-1]
captions_out = captions[:, 1:]
# 掩码
mask = (captions_out != self._null)
# 图像仿射转换矩阵
W_proj, b_proj = self.params['W_proj'], self.params['b_proj']
# 词嵌入矩阵
W_embed = self.params['W_embed']
# RNN参数
Wx, Wh, b = self.params['Wx'], self.params['Wh'], self.params['b']
# 隐藏层输出转化矩阵
W_vocab, b_vocab = self.params['W_vocab'], self.params['b_vocab']
loss, grads = 0.0, {}
############################################################################
# 任务:实现CaptioningRNN传播 #
# (1)使用仿射变换(features,W_proj,b_proj), #
# 将图片特征输入进隐藏层初始状态h0(N,H) #
# (2)使用词嵌入层将captions_in中的单词索引转换为词向量(N,T,W) #
# (3)使用RNN或LSTM处理词向量(N,T,H) #
# (4)使用时序仿射传播temporal_affine_forward计算各单词得分(N,T,V) #
# (5)使用temporal_softmax_loss计算损失值 #
############################################################################
# 1 使用仿射变换(features,W_proj,b_proj),将图片特征输入进隐藏层初始状态h0(N,H)
h0, cache_h0 = affine_forward(features, W_proj, b_proj)
# 2 使用词嵌入层将captions_in中的单词索引转换为词向量(N,T,W)
x, cache_embedding = word_embedding_forward(captions_in, W_embed)
# 3 使用RNN或LSTM处理词向量(N,T,H)
if self.cell_type == 'rnn':
out_h, cache_rnn = rnn_forward(x, h0, Wx, Wh, b)
elif self.cell_type == 'lstm':
out_h, cache_rnn = lstm_forward(x, h0, Wx, Wh, b)
else:
raise ValueError('Invalid cell_type "%s"' % self.cell_type)
# 4 使用时序仿射传播temporal_affine_forward计算各单词得分(N,T,V)
yHat, cache_out = temporal_affine_forward(out_h, W_vocab, b_vocab)
# 5 使用temporal_softmax_loss计算损失值
loss, dy = temporal_softmax_loss(yHat, captions_out, mask, verbose=False)
# 计算梯度
dout_h, dW_vocab, db_vocab = temporal_affine_backward(dy, cache_out)
# 输出层到隐藏层的反向传播
if self.cell_type == 'rnn':
dx, dh0, dWx, dWh, db = rnn_backward(dout_h, cache_rnn)
elif self.cell_type == 'lstm':
dx, dh0, dWx, dWh, db = lstm_backward(dout_h, cache_rnn)
else:
raise ValueError('Invalid cell_type "%s"' % self.cell_type)
# 隐藏层到隐藏层自身的反向传播
dW_embed = word_embedding_backward(dx, cache_embedding)
# 隐藏层到输入层的反向传播
dfeatures, dW_proj, db_proj = affine_backward(dh0, cache_h0)
# 记录梯度
grads['W_proj'] = dW_proj
grads['b_proj'] = db_proj
grads['W_embed'] = dW_embed
grads['Wx'] = dWx
grads['Wh'] = dWh
grads['b'] = db
grads['W_vocab'] = dW_vocab
grads['b_vocab'] = db_vocab
############################################################################
# 结束编码 #
############################################################################
return loss, grads
def sample(self, features, max_length=30):
"""
测试阶段的前向传播过程,采样一批图片说明作为输入
Inputs:
- features: 图片特征(N, D).
- max_length:生成说明文字的最大长度
Returns:
- captions: 说明文字的字典索引串(N, max_length)
"""
N = features.shape[0]
captions = self._null * np.ones((N, max_length), dtype=np.int32)
W_proj, b_proj = self.params['W_proj'], self.params['b_proj']
W_embed = self.params['W_embed']
Wx, Wh, b = self.params['Wx'], self.params['Wh'], self.params['b']
W_vocab, b_vocab = self.params['W_vocab'], self.params['b_vocab']
###########################################################################
# 任务:测试阶段前向传播 #
# 提示:(1)第一个单词应该是标记,captions[:,0]=self._start #
# (2)当前单词输入为之前RNN的输出 #
# (3)前向传播过程为预测当前单词的下一个单词, #
# 你需要计算所有单词得分,然后选取最大得分作为预测单词 #
# (4)你无法使用rnn_forward 或 lstm_forward函数, #
# 你需要循环调用rnn_step_forward或lstm_step_forward函数 #
###########################################################################
# 获取数据
N, D = features.shape
affine_out, affine_cache = affine_forward(features, W_proj, b_proj)
prev_word_idx = [self._start]*N
prev_h = affine_out
prev_c = np.zeros(prev_h.shape)
# 1第一个单词应该是标记
captions[:, 0] = self._start
for i in range(1, max_length):
# 2当前单词输入为之前RNN的输出
prev_word_embed = W_embed[prev_word_idx]
# 4循环调用rnn_step_forward或lstm_step_forward函数
if self.cell_type == 'rnn':
next_h, rnn_step_cache = rnn_step_forward(prev_word_embed, prev_h,
Wx, Wh, b)
elif self.cell_type == 'lstm':
next_h, next_c, lstm_step_cache = lstm_step_forward(prev_word_embed, prev_h,
prev_c, Wx, Wh, b)
prev_c = next_c
else:
raise ValueError('Invalid cell_type "%s"' % self.cell_type)
vocab_affine_out, vocab_affine_out_cache = affine_forward(next_h,
W_vocab, b_vocab)
# 3计算所有单词得分,然后选取最大得分作为预测单词
captions[:, i] = list(np.argmax(vocab_affine_out, axis=1))
prev_word_idx = captions[:, i]
prev_h = next_h
############################################################################
# 结束编码 #
############################################################################
return captions
rnn_layers.py
RNN隐藏层需要使用到的方法,包括了RNN、LSTM以及词嵌入的前向传播和反向传播。
#-*- coding: utf-8 -*-
import numpy as np
def rnn_step_forward(x, prev_h, Wx, Wh, b):
"""
RNN单步前向传播,使用tanh激活单元
Inputs:
- x: 当前时间步数据输入(N, D).
- prev_h: 前一时间步隐藏层状态 (N, H)
- Wx: 输入层到隐藏层连接权重(D, H)
- Wh:隐藏层到隐藏层连接权重(H, H)
- b: 隐藏层偏置项(H,)
Returns 元组:
- next_h: 下一隐藏层状态(N, H)
- cache: 缓存
"""
next_h, cache = None, None
##############################################################################
# 任务:实现RNN单步前向传播 #
# 将输出值储存在next_h中, #
# 将反向传播时所需的各项缓存存放在cache中 #
##############################################################################
# 计算神经元输入
a = prev_h.dot(Wh)+x.dot(Wx)+b
# 神经元激活
next_h = np.tanh(a)
# 保留过程中的数据
cache = (x, prev_h, Wh, Wx, b, next_h)
##############################################################################
# 结束编码 #
##############################################################################
return next_h, cache
def rnn_step_backward(dnext_h, cache):
"""
RNN单步反向传播。
Inputs:
- dnext_h: 后一时间片段的梯度。
- cache: 前向传播时的缓存。
Returns 元组:
- dx: 数据梯度(N, D)。
- dprev_h: 前一时间片段梯度(N, H)。
- dWx: 输入层到隐藏层权重梯度(D,H)。
- dWh: 隐藏层到隐藏层权重梯度(H, H)。
- db: 偏置项梯度(H,)。
"""
dx, dprev_h, dWx, dWh, db = None, None, None, None, None
##############################################################################
# 任务:实现RNN单步反向传播 #
# 提示:tanh(x)梯度: 1 - tanh(x)*tanh(x) #
##############################################################################
# 获取缓存数据
x, prev_h, Wh, Wx, b, next_h = cache
# 根据链式求导法则依次计算各个变量的梯度
dscores = dnext_h*(1-next_h*next_h)
dWx = np.dot(x.T, dscores)
db = np.sum(dscores, axis=0)
dWh = np.dot(prev_h.T, dscores)
dx = np.dot(dscores, Wx.T)
dprev_h = np.dot(dscores, Wh.T)
##############################################################################
# 结束编码 #
##############################################################################
return dx, dprev_h, dWx, dWh, db
def rnn_forward(x, h0, Wx, Wh, b):
"""
RNN前向传播。
Inputs:
- x: 完整的时序数据 (N, T, D)。
- h0: 隐藏层初始化状态 (N, H)。
- Wx: 输入层到隐藏层权重 (D, H)。
- Wh: 隐藏层到隐藏层权重(H, H)。
- b: 偏置项(H,)。
Returns 元组:
- h: 所有时间步隐藏层状态(N, T, H)。
- cache: 反向传播所需的缓存。
"""
h, cache = None, None
##############################################################################
# 任务:实现RNN前向传播。 #
# 提示: 使用前面实现的rnn_step_forward 函数。 #
##############################################################################
# 获取数据维度
N, T, D = x.shape
(H, ) = b.shape
# 初始化h
h = np.zeros((N, T, H))
# 获取默认隐藏层状态
prev_h = h0
# 遍历所有时间
for t in range(T):
# 获取当前时间片段
xt = x[:, t, :]
# 计算每一个片段
next_h, _ = rnn_step_forward(xt, prev_h, Wx, Wh, b)
# 更新状态
prev_h = next_h
# 保留结果
h[:, t, :] = prev_h
# 数据缓存,
cache = (x, h0, Wh, Wx, b, h)
##############################################################################
# 结束编码 #
##############################################################################
return h, cache
def rnn_backward(dh, cache):
"""
RNN反向传播。
Inputs:
- dh: 隐藏层所有时间步梯度(N, T, H)。
Returns 元组:
- dx: 输入数据时序梯度(N, T, D)。
- dh0: 初始隐藏层梯度(N, H)。
- dWx: 输入层到隐藏层权重梯度(D, H)。
- dWh: 隐藏层到隐藏层权重梯度(H, H)。
- db: 偏置项梯度(H,)。
"""
dx, dh0, dWx, dWh, db = None, None, None, None, None
##############################################################################
# 任务:实现RNN反向传播。 #
# 提示:使用 rnn_step_backward函数。 #
##############################################################################
# 获取缓存数据
x, h0, Wh, Wx, b, h = cache
# 获取数据维度
N, T, H = dh.shape
_, _, D = x.shape
# 得到最后的细胞状态
next_h = h[:, T-1, :]
# 初始化
dprev_h = np.zeros((N, H))
dx = np.zeros((N, T, D))
dh0 = np.zeros((N, H))
dWx = np.zeros((D, H))
dWh = np.zeros((H, H))
db = np.zeros((H,))
# 遍历所有时间片段
for t in range(T):
# 当前处理的时间片段(从后往前)
t = T-1-t
# 获取对应的数据
xt = x[:, t, :]
# 最初时间片段的之前细胞状态默认为h0
if t == 0:
prev_h = h0
else:
prev_h = h[:, t-1, :]
# 获取缓存数据
step_cache = (xt, prev_h, Wh, Wx, b, next_h)
# 更新状态
next_h = prev_h
dnext_h = dh[:, t, :]+dprev_h
# 进行反向传播
dx[:, t, :], dprev_h, dWxt, dWht, dbt = rnn_step_backward(dnext_h, step_cache)
# 状态累加
dWx, dWh, db = dWx+dWxt, dWh+dWht, db+dbt
# 记录h0的梯度
dh0 = dprev_h
##############################################################################
# 结束编码 #
##############################################################################
return dx, dh0, dWx, dWh, db
def word_embedding_forward(x, W):
"""
词嵌入前向传播,将数据矩阵中的N条长度为T的词索引转化为词向量。
如:W[x[i,j]]表示第i条,第j时间步单词索引所对应的词向量。
Inputs:
- x: 整数型数组(N,T),N表示数据条数,T表示单条数据长度,
数组的每一元素存放着单词索引,取值范围[0,V)。
- W: 词向量矩阵(V,D)存放各单词对应的向量。
Returns 元组:
- out:输出词向量(N, T, D)。
- cache:反向传播时所需的缓存。
"""
out, cache = None, None
##############################################################################
# 任务:实现词嵌入前向传播。 #
##############################################################################
# 获取数据维度
N, T = x.shape
V, D = W.shape
# 初始化
out = np.zeros((N, T, D))
# 遍历所有数据
for i in range(N):
for j in range(T):
# 将其转化为词向量
out[i, j] = W[x[i, j]]
cache = (x, W.shape)
##############################################################################
# 结束编码 #
##############################################################################
return out, cache
def word_embedding_backward(dout, cache):
"""
词嵌入反向传播
Inputs:
- dout: 上层梯度 (N, T, D)
- cache:前向传播缓存
Returns:
- dW: 词嵌入矩阵梯度(V, D).
"""
dW = None
##############################################################################
# 任务:实现词嵌入反向传播 #
# 提示:你可以使用np.add.at函数 #
# 例如 np.add.at(a,[1,2],1)相当于a[1],a[2]分别加1 #
##############################################################################
x, W_shape = cache
dW = np.zeros(W_shape)
# np.add.at()是将传入的数组中制定下标位置的元素加上指定的值.
np.add.at(dW, x, dout)
##############################################################################
# 结束编码 #
##############################################################################
return dW
def sigmoid(x):
"""
数值稳定版本的sigmoid函数。
"""
pos_mask = (x >= 0)
neg_mask = (x < 0)
z = np.zeros_like(x)
z[pos_mask] = np.exp(-x[pos_mask])
z[neg_mask] = np.exp(x[neg_mask])
top = np.ones_like(x)
top[neg_mask] = z[neg_mask]
return top / (1 + z)
def lstm_step_forward(x, prev_h, prev_c, Wx, Wh, b):
"""
LSTM单步前向传播
Inputs:
- x: 输入数据 (N, D)
- prev_h: 前一隐藏层状态 (N, H)
- prev_c: 前一细胞状态(N, H)
- Wx: 输入层到隐藏层权重(D, 4H)
- Wh: 隐藏层到隐藏层权重 (H, 4H)
- b: 偏置项(4H,)
Returns 元组:
- next_h: 下一隐藏层状态(N, H)
- next_c: 下一细胞状态(N, H)
- cache: 反向传播所需的缓存
"""
next_h, next_c, cache = None, None, None
#############################################################################
# 任务:实现LSTM单步前向传播。 #
# 提示:稳定版本的sigmoid函数已经帮你实现,直接调用即可。 #
# tanh函数使用np.tanh。 #
#############################################################################
# 获取数据
N, D = x.shape
N, H = prev_h.shape
# 计算输入门、遗忘门、输出门
input_gate = sigmoid(np.dot(x, Wx[:, 0:H])+np.dot(prev_h, Wh[:, 0:H])+b[0:H])
forget_gate = sigmoid(np.dot(x, Wx[:, H:2*H])+np.dot(prev_h, Wh[:, H:2*H])
+b[H:2*H])
output_gate = sigmoid(np.dot(x, Wx[:, 2*H:3*H])+np.dot(prev_h, Wh[:, 2*H:3*H])
+b[2*H:3*H])
# 计算输出单元
input_data = np.tanh(np.dot(x, Wx[:, 3*H:4*H])+np.dot(prev_h, Wh[:, 3*H:4*H])
+b[3*H:4*H])
# 更新细胞记忆
next_c = forget_gate*prev_c+input_data*input_gate
# 计算细胞输出
next_scores_c = np.tanh(next_c)
next_h = output_gate*next_scores_c
cache = (x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate,
prev_h, prev_c, next_scores_c)
##############################################################################
# 结束编码 #
##############################################################################
return next_h, next_c, cache
def lstm_step_backward(dnext_h, dnext_c, cache):
"""
LSTM单步反向传播
Inputs:
- dnext_h: 下一隐藏层梯度 (N, H)
- dnext_c: 下一细胞梯度 (N, H)
- cache: 前向传播缓存
Returns 元组:
- dx: 输入数据梯度 (N, D)
- dprev_h: 前一隐藏层梯度 (N, H)
- dprev_c: 前一细胞梯度(N, H)
- dWx: 输入层到隐藏层梯度(D, 4H)
- dWh: 隐藏层到隐藏层梯度(H, 4H)
- db: 偏置梯度(4H,)
"""
dx, dprev_h, dc, dWx, dWh, db = None, None, None, None, None, None
#############################################################################
# 任务:实现LSTM单步反向传播 #
# 提示:sigmoid(x)函数梯度:sigmoid(x)*(1-sigmoid(x)) #
# tanh(x)函数梯度: 1-tanh(x)*tanh(x) #
#############################################################################
# 获取数据
x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate, prev_h,\
prev_c, next_scores_c = cache
N, D = x.shape
N, H = prev_h.shape
# 初始化变量
dWx = np.zeros((D, 4*H))
dxx = np.zeros((D, 4*H))
dWh = np.zeros((H, 4*H))
dhh = np.zeros((H, 4*H))
db = np.zeros(4*H)
dx = np.zeros((N, D))
dprev_h = np.zeros((N, H))
# 计算当前细胞的梯度
dc_tem = dnext_c+dnext_h*(1-next_scores_c**2)*output_gate
# 求解tanh层
dprev_c = forget_gate*dc_tem
dforget_gate = prev_c*dc_tem
dinput_gate = input_data*dc_tem
dinput = input_gate*dc_tem
doutput_gate = next_scores_c*dnext_h
# 求解sigmoid层
dscores_in_gate = input_gate*(1-input_gate)*dinput_gate
dscores_forget_gate = forget_gate*(1-forget_gate)*dforget_gate
dscores_out_gate = output_gate*(1-output_gate)*doutput_gate
dscores_in = (1-input_data**2)*dinput
da = np.hstack((dscores_in_gate, dscores_forget_gate, dscores_out_gate, dscores_in))
dWx = np.dot(x.T, da)
dWh = np.dot(prev_h.T, da)
db = np.sum(da, axis=0)
dx = np.dot(da, Wx.T)
dprev_h = np.dot(da, Wh.T)
##############################################################################
# 结束编码 #
##############################################################################
return dx, dprev_h, dprev_c, dWx, dWh, db
def lstm_forward(x, h0, Wx, Wh, b):
"""
LSTM前向传播
Inputs:
- x: 输入数据 (N, T, D)
- h0:初始化隐藏层状态(N, H)
- Wx: 输入层到隐藏层权重 (D, 4H)
- Wh: 隐藏层到隐藏层权重(H, 4H)
- b: 偏置项(4H,)
Returns 元组:
- h: 隐藏层所有状态 (N, T, H)
- cache: 用于反向传播的缓存
"""
h, cache = None, None
#############################################################################
# 任务: 实现完整的LSTM前向传播 #
#############################################################################
# 获取数据
N, T, D = x.shape
H = int(b.shape[0]/4)
# 初始化信息
h = np.zeros((N, T, H))
cache = {}
prev_h = h0
prev_c = np.zeros((N, H))
# 遍历所有时序数据
for t in range(T):
# 当前数据
xt = x[:, t, :]
# 进行单步LSTM前向传播
next_h, next_c, cache[t] = lstm_step_forward(xt, prev_h, prev_c, Wx, Wh, b)
# 更新状态
prev_h = next_h
prev_c = next_c
h[:, t, :] = prev_h
##############################################################################
# 结束编码 #
##############################################################################
return h, cache
def lstm_backward(dh, cache):
"""
LSTM反向传播
Inputs:
- dh: 各隐藏层梯度(N, T, H)
- cache: V前向传播缓存
Returns 元组:
- dx: 输入数据梯度 (N, T, D)
- dh0:初始隐藏层梯度(N, H)
- dWx: 输入层到隐藏层权重梯度 (D, 4H)
- dWh: 隐藏层到隐藏层权重梯度 (H, 4H)
- db: 偏置项梯度 (4H,)
"""
dx, dh0, dWx, dWh, db = None, None, None, None, None
#############################################################################
# 任务:实现完整的LSTM反向传播 #
#############################################################################
# 获取数据
N, T, H = dh.shape
# 从最后一条开始更新
x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate, prev_h, prev_c,\
next_scores_c = cache[T-1]
D = x.shape[1]
# 初始化
dprev_h = np.zeros((N, H))
dprev_c = np.zeros((N, H))
dx = np.zeros((N, T, D))
dh0 = np.zeros((N, H))
dWx = np.zeros((D, 4*H))
dWh = np.zeros((H, 4*H))
db = np.zeros((4*H,))
# 遍历所有数据
for t in range(T):
# 选择当前时间(从后向前)
t = T-1-t
# 获取数据
step_cache = cache[t]
dnext_h = dh[:, t, :]+dprev_h
dnext_c = dprev_c
# 进行单步反向传播计算
dx[:, t, :], dprev_h, dprev_c, dWxt, dWht, dbt = lstm_step_backward(dnext_h,
dnext_c, step_cache)
# 更新参数
dWx, dWh, db = dWx+dWxt, dWh+dWht, db+dbt
# 更新h0梯度
dh0 = dprev_h
##############################################################################
# 结束编码 #
##############################################################################
return dx, dh0, dWx, dWh, db
def temporal_affine_forward(x, w, b):
"""
时序隐藏层仿射传播:将隐藏层时序数据(N,T,D)重塑为(N*T,D),
完成前向传播后,再重塑回原型输出。
Inputs:
- x: 时序数据(N, T, D)。
- w: 权重(D, M)。
- b: 偏置(M,)。
Returns 元组:
- out: 输出(N, T, M)。
- cache: 反向传播缓存。
"""
N, T, D = x.shape
M = b.shape[0]
# Affine层
out = x.reshape(N * T, D).dot(w).reshape(N, T, M) + b
cache = x, w, b, out
return out, cache
def temporal_affine_backward(dout, cache):
"""
时序隐藏层仿射反向传播。
Input:
- dout:上层梯度 (N, T, M)。
- cache: 前向传播缓存。
Returns 元组:
- dx: 输入梯度(N, T, D)。
- dw: 权重梯度 (D, M)。
- db: 偏置项梯度 (M,)。
"""
x, w, b, out = cache
N, T, D = x.shape
M = b.shape[0]
# Affine层反向传播
dx = dout.reshape(N * T, M).dot(w.T).reshape(N, T, D)
dw = dout.reshape(N * T, M).T.dot(x.reshape(N * T, D)).T
db = dout.sum(axis=(0, 1))
return dx, dw, db
def temporal_softmax_loss(x, y, mask, verbose=False):
"""
时序版本的Softmax损失和原版本类似,只需将数据(N, T, V)重塑为(N*T,V)即可。
需要注意的是,对于NULL标记不计入损失值,因此,你需要加入掩码进行过滤。
Inputs:
- x: 输入数据得分(N, T, V)。
- y: 目标索引(N, T),其中0<= y[i, t] < V。
- mask: 过滤NULL标记的掩码。
Returns 元组:
- loss: 损失值。
- dx: x梯度。
"""
# 获取必备信息
N, T, V = x.shape
x_flat = x.reshape(N * T, V)
y_flat = y.reshape(N * T)
mask_flat = mask.reshape(N * T)
# 和原有softmax类似,不足的部分使用NULL补充,计算的时候过滤
probs = np.exp(x_flat - np.max(x_flat, axis=1, keepdims=True))
probs /= np.sum(probs, axis=1, keepdims=True)
loss = -np.sum(mask_flat * np.log(probs[np.arange(N * T), y_flat])) / N
dx_flat = probs.copy()
dx_flat[np.arange(N * T), y_flat] -= 1
dx_flat /= N
dx_flat *= mask_flat[:, None]
# 是否打印
if verbose:
print('dx_flat: ', dx_flat.shape)
dx = dx_flat.reshape(N, T, V)
return loss, dx