from torch import nn
import torch
from torch.utils.data import DataLoader
from torchvision import transforms, datasets
from torch import nn, optim
import matplotlib.pyplot as plt
plt.style.use("ggplot")
class AE(nn.Module):
def __init__(self):
# 调用父类方法初始化模块的state
super(AE, self).__init__()
# 编码器 : [b, 784] => [b, 20]
self.encoder = nn.Sequential(
nn.Linear(784, 256),
nn.ReLU(),
nn.Linear(256, 20),
nn.ReLU()
)
# 解码器 : [b, 20] => [b, 784]
self.decoder = nn.Sequential(
nn.Linear(20, 256),
nn.ReLU(),
nn.Linear(256, 784),
nn.Sigmoid() # 图片数值取值为[0,1],不宜用ReLU
)
def forward(self, x):
"""
向前传播部分, 在model_name(inputs)时自动调用
:param x: the input of our training model
:return: the result of our training model
"""
batch_size = x.shape[0] # 每一批含有的样本的个数
# flatten
# tensor.view()方法可以调整tensor的形状,但必须保证调整前后元素总数一致。view不会修改自身的数据,
# 返回的新tensor与原tensor共享内存,即更改一个,另一个也随之改变。
x = x.view(batch_size, 784) # 一行代表一个样本
# encoder
x = self.encoder(x)
# decoder
x = self.decoder(x)
# reshape
x = x.view(batch_size, 1, 28, 28)
return x
def main(epoch_num):
# 下载mnist数据集
mnist_train = datasets.MNIST('mnist', train=True, transform=transforms.Compose([
transforms.ToTensor()
]), download=True)
mnist_test = datasets.MNIST('mnist', train=False, transform=transforms.Compose([
transforms.ToTensor()
]), download=True)
# 载入mnist数据集
# batch_size设置每一批数据的大小,shuffle设置是否打乱数据顺序,结果表明,该函数会先打乱数据再按batch_size取数据
mnist_train = DataLoader(mnist_train, batch_size=32, shuffle=True)
mnist_test = DataLoader(mnist_test, batch_size=32, shuffle=True)
# 查看每一个batch图片的规模
x, label = iter(mnist_train).__next__() # 取出第一批(batch)训练所用的数据集
print(' img : ', x.shape) # img : torch.Size([32, 1, 28, 28]), 每次迭代获取32张图片,每张图大小为(1,28,28)
# 准备工作 : 搭建计算流程
device = torch.device('cuda')
model = AE().to(device) # 生成AE模型,并转移到GPU上去
print('The structure of our model is shown below: \n')
print(model)
loss_function = nn.MSELoss() # 生成损失函数
optimizer = optim.Adam(model.parameters(), lr=1e-3) # 生成优化器,需要优化的是model的参数,学习率为0.001
# 开始迭代
loss_epoch = []
for epoch in range(epoch_num):
# 每一代都要遍历所有的批次
for batch_index, (x, _) in enumerate(mnist_train):
# [b, 1, 28, 28]
x = x.to(device)
# 前向传播
x_hat = model(x) # 模型的输出,在这里会自动调用model中的forward函数
loss = loss_function(x_hat, x) # 计算损失值,即目标函数
# 后向传播
optimizer.zero_grad() # 梯度清零,否则上一步的梯度仍会存在
loss.backward() # 后向传播计算梯度,这些梯度会保存在model.parameters里面
optimizer.step() # 更新梯度,这一步与上一步主要是根据model.parameters联系起来了
loss_epoch.append(loss.item())
if epoch % (epoch_num // 10) == 0:
print('Epoch [{}/{}] : '.format(epoch, epoch_num), 'loss = ', loss.item()) # loss是Tensor类型
# x, _ = iter(mnist_test).__next__() # 在测试集中取出一部分数据
# with torch.no_grad():
# x_hat = model(x)
return loss_epoch