开发者

Pytorch实现Fashion-mnist分类任务全过程

目录
  • 数据概况
    • Fashion-mnist
    • 类别标注
  • 数据处理
    • 对输入进行归一化
    • 数据增强
  • 定义Resnet网络
    • 训练与测试
      • 完整代码
        • 总结

          数据概况

          Fashion-mnist

          经典的MNIST数据集包含了大量的手写数字。十几年来,来自机器学习、机器视觉、人工智能、深度学习领域的研究员们把这个数据集作为衡量算法的基准之一。

          你会在很多的会议,期刊的论文中发现这个数据集的身影。实际上,MNIST数据集已经成为算法作者的必测的数据集之一。

          类别标注

          在Fashion-mnist数据集中,每个训练样本都按照以下类别进行了标注:

          Pytorch实现Fashion-mnist分类任务全过程

          数据处理

          对输入进行归一化

          归一化时需要统一进行 x = (x - mean) / std

          train_trans = transforms.Compose([
                  transforms.RandomCrop(28, padding=2),#数据增强
                  transforms.RandomHorizontalFlip(),
                  transforms.ToTensor(),
                  normalize
              ])
          test_trans = transforms.Compose([
                  transforms.ToTensor(),
                  normalize
              ])
          mnist_train = torchvision.datasets.FashionMNIST(root='../data',train=True,download=True,transform=train_trans)
          mnist_test = torchvision.datasets.FashionMNIST(root='../data',train=False,download=True,transform=test_trans)
          train_iter = torch.utils.data.DataLoader(mnist_train, BATch_size=batch_size, shuffle=True)
          test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False)
          # 求整个数据集的均值
          temp_sum = 0
          cnt = 0
          for X, y in train_iter:
              if y.shape[0] != batch_size:
                  break   # 最后一个batch不足batch_size,这里就忽略了
              channel_mean = torch.mean(X, dim=(0,2,3))  # 按channel求均值(不过这里只有1个channel)
              cnt += 1   # cnt记录的是batch的个数,不是图像
              temp_sum += channel_mean[0].item()
          dataset_global_mean = temp_sum / cnt
          print('整个数据集的像素均值:{}'.format(dataset_global_mean))
          # 求整个数据集的标准差
          cnt = 0
          temp_sum = 0
          for X, y in train_iter:
              if y.shape[0] != batch_size:
                  break   # 最后一个batch不足batch_size,这里就忽略了
              residual = (X - dataset_global_mean) ** 2
              channel_var_mean = torch.mean(residual, dim=(0,2,3))
              cnt += 1   # cnt记录的是batch的个数,不是图像
              temp_sum += math.sqrt(channel_var_mean[0].item())
          dataset_global_std = temp_sum / cnt
          print('整个数据集的像素标准差:{}'.format(dataset_global_std))
          

          整个数据集的像素均值:0.2860366729433025

          整个数据集的像素标准差:0.35288708155778725

          数据增强

          加入随机裁剪和翻转

           ============================ step 1/6 数据 ============================
          batch_size = 64
          normalize = transforms.Normalize(mean=[0.286], std=[0.352])#对像素值归一化
          train_trans = transforms.Compose([
                  transforms.RandomCrop(28, padding=2),
                  transforms.RandomHorizontalFlip(),
                  transforms.ToTensor(),
                  normalize
              ])
          test_trans = transforms.Compose([
                  transforms.ToTensor(),
                  normalize
              ])
          mnist_train = torchvision.datasets.FashionMNIST(root='../data',train=True,download=True,transform=train_trans)
          mnist_test = torchvision.datasets.FashionMNIST(root='../data',train=False,download=True,transform=test_trans)
          train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True)
          test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False)
          

          定义Resnet网络

          class GlobalAvgPool2d(nn.Module):
              """
              全局平均池化层
              可通过将普通的平均池化的窗口形状设置成输入的高和宽实现
              """
          
              def __init__(self):
                  super(GlobalAvgPool2d, self).__init__()
          
              def forward(self, x):
                  return F.avg_pool2d(x, kernel_size=x.size()[2:])
          
          
          class FlattenLayer(torch.nn.Module):
              def __init__(self):
                  super(FlattenLayer, self).__init__()
          
              def forward(self, x):  # x shape: (batch, *, *, ...)
                  return x.view(x.shape[0], -1)
          
          
          class Residual(nn.Module):
              def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
                  """
                      use_11conv: 是否使用额外的1x1卷积层来修改通道数
                      stride: 卷积层的步幅, resnet使用步长为2的卷积来替代pooling的作用,是个很赞的idea
                  """
                  super(Residual, self).__init__()
                  self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride)
                  self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
                  if use_1x1conv:
                      self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
                  else:
                      self.conv3 = None
                  self.bn1 = nn.BatchNorm2d(out_channels)
                  self.bn2 = nn.BatchNorm2d(out_channels)
          
              def forward(self, X):
                  Y = F.relu(self.bn1(self.conv1(X)))
                  Y = self.bn2(self.conv2(Y))
                  if self.conv3:
                      X = self.conv3(X)
                  return F.relu(Y + X)
          
          
          def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
              '''
              resnet block
              num_residuals: 当前block包含多少个残差块
              first_block: 是否为第一个block
              一个resnet block由num_residuals个残差块组成
              其中第一个残差块起到了通道数的转换和pooling的作用
            js  后面的若干残差块就是完成正常的特征提取
              '''
              if first_block:
                  assert in_channels == out_channels  # 第一个模块的输出通道数同输入通道数一致
              blk = []
              for i in range(num_residuals):
                  if i == 0 and not first_block:
                      blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2))
                  else:
                开发者_JS教程      blk.append(Residual(out_channels, out_channels))
              return nn.Sequential(*blk)
          
          
          # 定义resnet模型结构
          net = nn.Sequential(
              nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),  # TODO: 缩小感受野, 缩channel
              nn.BatchNorm2d(32),
              nn.ReLU())
          # nn.ReLU(),
          # nn.MaxPool2d(kernel_size=2, stride=2))   # TODO:去掉maxpool缩小感受野
          
          # 然后是连续4个block
          net.add_module("resnet_block1", resnet_block(32, 32, 2, first_block=True))  # TODO: channel统一减半
          net.add_module("resnet_block2", resnet_block(32, 64, 2))
          net.add_module("resnet_block3", resnet_block(64, 128, 2))
          net.add_module("resnet_block4", resnet_block(128, 256, 2))
          # global average pooling
          net.add_module("global_avg_pool", GlobalAvgPool2d())
          # fc layer
          net.add_module("fc", nn.Sequential(FlattenLayer(), nn.Linear(256, 10)))
          

          训练与测试

          def evaLuate_accuracy(data_iter, net, device=None):
          	#评估模型在测试集的准确率
              if device is None and isinstance(net, torch.nn.Module):
                  # 如果没指定device就使用net的device
                  device = list(net.parameters())[0].device
              net.eval()
              acc_sum, n = 0.0, 0
              with torch.no_grad():
                  for X, y in data_iter:
                      acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
                      n += y.shape[0]
              net.train()  # 改回训练模式
              return acc_sum / n
          
          
          def train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs):
              net = net.to(device)
              print("training on ", device)
              loss = torch.nn.CrossEntropyLoss()
              best_test_acc = 0
              for epoch in range(num_epochs):
                  train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
                  for X, y in train_iter:
                      X = X.to(device)
                      y = y.to(device)
                      y_hat = net(X)
                      l = loss(y_hat, y)
                      optimizer.zero_grad()
                      l.backward()
                      optimizer.step()
                      train_l_sum += l.cpu().item()
                      train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
                      n += y.shape[0]
                      batch_count += 1
                  test_acc = evaluate_accuracy(test_iter, net)
                  print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
                        % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
                  if test_acc > best_test_acc:
                      print('find best! save at model/best.pth')
                      best_test_acc = test_acc
                      torch.save(net.state_dict(), 'model/best.pth')
          
          
          lr, num_epochs = 0.01, 10
          optimizer = torch.optim.Adam(net.parameters(), lr=lr)
          device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
          train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs)
          
          

          完整代码

          import os
          import sys
          import time
          import torch
          from torch import nn, optim
          import torch.nn.functional as F
          import torchvision
          from torchvision import transforms
          
          
          
          class GlobalAvgPool2d(nn.Module):
              """
              全局平均池化层
              可通过将普通的平均池化的窗口形状设置成输入的高和宽实现
              """
          
              def __init__(self):
                  super(GlobalAvgPool2d, self).__init__()
          
              def forward(self, x):
                  return F.avg_pool2d(x, kernel_size=x.size()[2:])
          
          
          class FlattenLayer(torch.nn.Module):
              def __init__(self):
                  super(FlattenLayer, self).__init__()
          
              def forward(self, x):  # x shape: (batch, *, *, ...)
                  return x.view(x.shape[0], -1)
          
          
          class Residual(nn.Module):
              def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1):
                  """
                      use_11conv: 是否使用额外的1x1卷积层来修改通道数
                      stride: 卷积层的步幅, resnet使用步长为2的卷积来替代pooling的作用,是个很赞的idea
                  """
                  super(Residual, self).__init__()
                  self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride)
                  self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
                  if use_1x1conv:
                      self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
                  else:
                      self.conv3 = None
                  self.bn1 = nn.BatchNorm2d(out_channels)
                  self.bn2 = nn.BatchNorm2d(out_channels)
          
              def forward(self, X):
                  Y = F.relu(self.bn1(self.conv1(X)))
                  Y = self.bn2(self.conv2(Y))
                  if self.conv3:
                      X = self.conv3(X)
                  return F.relu(Y + X)
          
          
          def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
              '''
              resnet block
              num_residuals: 当前block包含多少个残差块
              first_block: 是否为第一个block
              一个resnet block由num_residuals个残差块组成
              其中第一个残差块起到了通道数的转换和pooling的作用
              后面的若干残差块就是完成正常的特征提取
              '''
              if first_block:
                  assert in_channels == out_channels  # 第一个模块的输出通道数同输入通道数一致
              blk = []
              for i in range(num_residuals):
                  if i == 0 and not first_block:
                      blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2))
                  else:
                      blk.append(Residual(out_channels, out_channels))
              return nn.Sequential(*blk)
          
          
          # 定义resnet模型结构
          net = nn.Sequential(
              nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),  # TODO: 缩小感受野, 缩channel
              nn.BatchNorm2d(32),
              nn.ReLU())
          # nn.ReLU(),
          # nn.MaxPool2d(kernel_size=2, stride=2))   # TODO:去掉maxpool缩小感受野
          
          # 然后是连续4个block
          net.add_module("resnet_block1", resnet_block(32, 32, 2, first_block=True))  # TODO: channel统一减半
          net.add_module("resnet_block2", resnet_block(32, 64, 2))javascript
          net.add_module("resnet_block3", resnet_block(64, 128, 2))
          net.add_module("resnet_block4", resnet_block(128, 256, 2))
          # global average pooling
          net.add_module("global_avg_pool", GlobalAvgPool2d())
          # fc layer
          net.add_module("fc", nn.Sequential(FlattenLayer(), nn.Linear(256, 10)))
          
          def load_data_fashion_mnist(batch_size, root='../data'):
              """Download the fashion mnist dataset and then load into memory."""
          
              normalize = transforms.Normalize(mean=[0.28], std=[0.35])
              train_augs = transforms.Compose([
                  transforms.RandomCrop(28, padding=2),
                  transforms.RandomHorizontalFlip(),
                  transforms.ToTensor(),
                  normalize
              ])
          
              test_augs = transforms.Compose([
                  transforms.ToTensor(),
                  normalize
              ])
          
              mnist_train = torchvision.datasets.FashionMNIST(root=root, train=True, download=True, transform=train_augs)
              mnist_test = torchvision.datasets.FashionMNIST(root=root, train=False, download=True, transform=test_augs)
              if sys.platform.startswith('win'):
                  num_workers = 0  # 0表示不用额外的进程来加速读取数据
              else:
                  num_workers = 4
              train_iter = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True, num_workers=num_workers)
          编程    test_iter = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size, shuffle=False, num_workers=num_workers)
          
              return train_iter, test_iter
          
          
          print('训练...')
          batch_size = 64
          train_iter, test_iter = load_data_fashion_mnist(batch_size, root='../data')
          
          
          def编程 evaluate_accuracy(data_iter, net, device=None):
              if device is None and isinstance(net, torch.nn.Module):
                  # 如果没指定device就使用net的device
                  device = list(net.parameters())[0].device
              net.eval()
              acc_sum, n = 0.0, 0
              with torch.no_grad():
                  for X, y in data_iter:
                      acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
                      n += y.shape[0]
              net.train()  # 改回训练模式
              return acc_sum / n
          
          
          def train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs, lr, lr_period, lr_decay):
              net = net.to(device)
              print("training on ", device)
              loss = torch.nn.CrossEntropyLoss()
              best_test_acc = 0
              for epoch in range(num_epochs):
                  train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
          
                  if epoch > 0 and epoch % lr_period == 0:  # 每lr_period个epoch,学习率衰减一次
                      lr = lr * lr_decay
                      for param_group in optimizer.param_groups:
                          param_group['lr'] = lr
          
                  for X, y in train_iter:
                      X = X.to(device)
                      y = y.to(device)
                      y_hat = net(X)
               android       l = loss(y_hat, y)
                      optimizer.zero_grad()
                      l.backward()
                      optimizer.step()
                      train_l_sum += l.cpu().item()
                      train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
                      n += y.shape[0]
                      batch_count += 1
                  test_acc = evaluate_accuracy(test_iter, net)
                  print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
                        % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
                  if test_acc > best_test_acc:
                      print('find best! save at model/best.pth')
                      best_test_acc = test_acc
                      torch.save(net.state_dict(), 'model/best.pth')
                      # utils.save_model({
                      #    'arch': args.model,
                      #    'state_dict': net.state_dict()
                      # }, 'saved-models/{}-run-{}.pth.tar'.format(args.model, run))
          
          
          lr, num_epochs, lr_period, lr_decay = 0.01, 50, 5, 0.1
          #optimizer = optim.Adam(net.parameters(), lr=lr)
          optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
          device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
          train_model(net, train_iter, test_iter, batch_size, optimizer, device, num_epochs, lr, lr_period, lr_decay)
          
          print('加载最优模型')
          net.load_state_dict(torch.load('model/best.pth'))
          net = net.to(device)
          
          print('inference测试集')
          net.eval()
          id = 0
          preds_list = []
          with torch.no_grad():
              for X, y in test_iter:
                  batch_pred = list(net(X.to(device)).argmax(dim=1).cpu().numpy())
                  for y_pred in batch_pred:
                      preds_list.append((id, y_pred))
                      id += 1
          
          print('生成测试集评估文件')
          with open('result.csv', 'w') as f:
              f.write('ID,Prediction\n')
              for id, pred in preds_list:
                  f.write('{},{}\n'.format(id, pred))
          

          总结

          以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

          0

          上一篇:

          下一篇:

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          最新开发

          开发排行榜