Pytorch分布式训练

Pytorch

Posted by Shaozi on March 28, 2019

背景:需要在短时间内完成大数据量的训练,处理较大的Batch_size

基于以上的需求,有了使用Pytorch进行分布式计算的需求。百度并且Google了一下,发现可供参考的资料并不多。Pytorch的官方文档只给了一个在本地模拟的demo,而且似乎没有用到GPU这里,个人表示写的并不清楚。

其他的参考资料在这篇知乎文章中有较为详细的总结pytorch分布式计算配置,但在作者明显没有做完.

但是这篇文章并没有给出一个完整的解决方案,这里将依据这篇文章,以及其中提及的这一篇PyTorch分布式训练为基础,进行探究。

先说一下我的实验环境: 目前有3台虚拟机器,分别编号为01,02,03,每一台上面有4块P40 GPU, 安装了相同的开发环境,Pytorch的版本为0.4.0(纯粹是因为该版本对P40支持较好,推荐使用最新的1.0.0版本)。这三台机器共享“/home/Username/”下的文件,并且在同一个局域网下。IP地址分别为:

host ip
01 192.168.54.179
02 192.168.234.13
03 192.168.119.37

实验环境的搭建不是我完成的,所以具体的搭建方法这里没法详细的说明了。感觉只需要不同机器之间能够相互访问就可以进行分布式训练。

上面的链接中也给出了几个不同的实现代码,当然我们不能只测试CPU或者单机多卡的情况。我们的目标是同时使用所有机器上所有的GPU进行训练。经过一些简单的实验,选用PyTorch分布式训练这里提供的代码作为探究的基础。

由于代码小小修改了一点,所以还是先贴在下面,文件名为”mnsit.py”:

from __future__ import print_function
import argparse
import torch
import torch.nn as nn
import time

import torch.nn.parallel
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.utils.data 
import torch.utils.data.distributed
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable

# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=1024, metavar='N',
                    help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                    help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=20, metavar='N',
                    help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                    help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                    help='SGD momentum (default: 0.5)')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='how many batches to wait before logging training status')
parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')
parser.add_argument('--rank', type=int)
parser.add_argument('--world-size',type=int)

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

#初始化
dist.init_process_group(init_method=args.init_method,backend="gloo",world_size=args.world_size,rank=args.rank,group_name="pytorch_test")

torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)

train_dataset=datasets.MNIST('data', train=True, download=True,
               transform=transforms.Compose([
                   transforms.ToTensor(),
                   transforms.Normalize((0.1307,), (0.3081,))
               ]))
# 分发数据
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}

train_loader = torch.utils.data.DataLoader(train_dataset,
    batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('data', train=False, transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=args.test_batch_size, shuffle=True, **kwargs)


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)

model = Net()
if args.cuda:
    # 分发模型
    model.cuda()
    model = torch.nn.parallel.DistributedDataParallel(model)
    # model = torch.nn.DataParallel(model,device_ids=[0,1,2,3]).cuda()
    # model.cuda()

optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

def train(epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        data, target = Variable(data), Variable(target)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.data[0]))

def test():
    model.eval()
    test_loss = 0
    correct = 0
    for data, target in test_loader:
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        data, target = Variable(data, volatile=True), Variable(target)
        output = model(data)
        test_loss += F.nll_loss(output, target, size_average=False).data[0] # sum up batch loss
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        correct += pred.eq(target.data.view_as(pred)).cpu().sum()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

tot_time=0;

for epoch in range(1, args.epochs + 1):
    # 设置epoch位置,这应该是个为了同步所做的工作
    train_sampler.set_epoch(epoch)
    start_cpu_secs = time.time()
    #long running
    train(epoch)
    end_cpu_secs = time.time()
    print("Epoch {} of {} took {:.3f}s".format(
        epoch , args.epochs , end_cpu_secs - start_cpu_secs))
    tot_time+=end_cpu_secs - start_cpu_secs
    test()

print("Total time= {:.3f}s".format(tot_time))

先吧代码讲一遍吧,首先上来是读取各种输入的参数,由于是测试性质的,所以前面一律default就行了。主要要设置的是三个参数:

parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')
parser.add_argument('--rank', type=int)
parser.add_argument('--world-size',type=int)

这三个参数对应的是分布式计算的核心torch.distributed的设置:

dist.init_process_group(init_method=args.init_method,backend="gloo",world_size=args.world_size,rank=args.rank,group_name="pytorch_test")

这里详细介绍一下: 官方文档的输入如下:

torch.distributed.init_process_group(backend, init_method='env://', timeout=datetime.timedelta(0, 1800), **kwargs)
# Initializes the default distributed process group, and this will also initialize the distributed package

# Parameters:	
# backend (str or Backend) – The backend to use. Depending on build-time configurations, valid values include mpi, gloo, and nccl. This field should be given as a lowercase string (e.g., "gloo"), which can also be accessed via Backend attributes (e.g., Backend.GLOO).
# init_method (str, optional) – URL specifying how to initialize the process group.
# world_size (int, optional) – Number of processes participating in the job.
# rank (int, optional) – Rank of the current process.
# timeout (timedelta, optional) – Timeout for operations executed against the process group. Default value equals 30 minutes. This is only applicable for the gloo backend.
# group_name (str, optional, deprecated) – Group name.
# To enable backend == Backend.MPI, PyTorch needs to built from source on a system that supports MPI. The same applies to NCCL as well.

其中第一个设置的是通信库,分别可以选mpi,gloo和nccl,由于gloo是facebook自己家的,所以Pytorch对其的支持是最全的:

Pytoch支持情况

不过相比较而言,如果是基于GPU的训练,选择nccl更佳。

后面会详细探究这几个的差别。(写到这里的时候我也不清楚)

init_method分析

‘init_method’支持三种方式:

  1. TCP initialization

以TCP协议的方式进行不同分布式进程之间的数据交流,需要设置一个端口,不同进程之间公用这一个端口,并且设置host的级别和host的数量。设计两个参数rank和world_size。其中rank为host的编号,默认0为主机,端口应该位于该主机上。world_size为分布式主机的个数。

采用该方式,运行上面的代码可以使用如下指令:

在主机01上:

python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 0 --world-size 2

在主机02上:

python mnsit.py --init-method tcp://192.168.54.179:22225 --rank 1 --world-size 2

这里没有设置backend参数,所以默认是gloo。22225是端口号,用一个没有没占用的就行。这两句指令的先后顺序没有要求,只有两条指令都输入,程序才会运行起来。(实测两台机器的pytorch版本不一样也可以训练,但是会报警告)

此时上述代码训练完成后会出现错误,但是程序确实运行到了最后一行,运行完毕了。错误如下,关于这个错误的资料很少,因为不影响程序运行,所以就不深究了。

terminate called after throwing an instance of 'gloo::EnforceNotMet'
  what():  [enforce fail at /pytorch/third_party/gloo/gloo/cuda_private.h:40] error == cudaSuccess. 29 vs 0. Error at: /pytorch/third_party/gloo/gloo/cuda_private.h:40: driver shutting down

这里有一个简单的速度对比测试(不严谨,不同机器上还train着其他的训练代码,相互之间也有差别):

host数量 2 3
train time 175.325s 172.651s

惊奇的发现,速度并没有变快多少,可能是batch设小了。我们改为4096试试。

host数量 2 3
train time 166.262s 173.873s

居然还变慢了,我看了一下显存占用,直接将batch调为40960应该也可以跑。

host数量 2 3
train time 180.476s 173.212s

从上面的结果看,batch很大的时候更多的线程终于有点效果了,但是提升也不是很大。

从上述三组实验结果来看,并行运算除了可以使用更大的batch以外,在速度上并没有什么优势,注意到只用两台机器和用三台机器时,每一个epoch的训练时间基本是一致的。都是8点多秒。考虑瓶颈出现在数据的传输。

  1. Shared file-system initialization

提供的第二种方式是文件共享,由于我的三台机器有共享的文件系统,故可以采用这种方式,也避免了基于TCP的网络传输。这里使用方式是使用绝对路径在指定一个共享文件系统下不存在的文件。

在主机01上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2

在主机02上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2

这里相比于TCP的方式麻烦一点的是运行完一次必须更换共享的文件名,或者删除之前的共享文件,不然第二次运行会报错。

对于速度,就直接比较batch为40960的吧:

host数量 2 3
train time 176.041s 182.772s

好吧,这次又反过来了,所以。。。因为实验环境的文件系统小文件的读写速度很慢,可能是造成这种现象的原因,所以还是要依据不同的开发环境选择合适的方法。

  1. Environment variable initialization

这个文档中没有给详细的说明,所以我也不知道是什么原理,他要求提供以下参数:

MASTER_PORT - required; has to be a free port on machine with rank 0
MASTER_ADDR - required (except for rank 0); address of rank 0 node
WORLD_SIZE - required; can be set either here, or in a call to init function
RANK - required; can be set either here, or in a call to init function

但是前两个并没有在init_process_group的参数里。这里有一个官方文档中的用例:

Node 1: (IP: 192.168.1.1, and has a free port: 1234)

>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
Node 2:

>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

这里就不测试该方法了,可以看到相当于将distributed的初始化放到了代码外面。使用起来应该也是很清晰的。

backend分析

这里因为我们要用GPU所以就简单分析一下nccl和gloo的性能差异。下面就测试使用01和02机器的效果,通信方式选为TCP

backend gloo nccl
train time 180.476s 164.876s

这个差距就很大了,所以还是亲爹NB。 而且使用NCCL就没有一开始那个奇怪的报错了。

最后保存一下model。然后读取在输出,看一下这里和单机多卡使用的DataParallel有什么区别。

在保存时会出现下面的错误:

Traceback (most recent call last):
  File "mnsit.py", line 145, in <module>
    torch.save(model, 'model.pth')
  File "/usr/local/lib/python3.6/dist-packages/torch/serialization.py", line 209, in save
    return _with_file_like(f, "wb", lambda f: _save(obj, f, pickle_module, pickle_protocol))
  File "/usr/local/lib/python3.6/dist-packages/torch/serialization.py", line 134, in _with_file_like
    return body(f)
  File "/usr/local/lib/python3.6/dist-packages/torch/serialization.py", line 209, in <lambda>
    return _with_file_like(f, "wb", lambda f: _save(obj, f, pickle_module, pickle_protocol))
  File "/usr/local/lib/python3.6/dist-packages/torch/serialization.py", line 282, in _save
    pickler.dump(obj)
AttributeError: Can't pickle local object 'DistributedDataParallel._register_nccl_grad_hook.<locals>.allreduce_hook'

这里改为:

torch.save(model.state_dict(), 'model.pth')

就行了。

剩下的操作基本就是很标准的操作了,记住要分发数据集,和分发模型就行了。