Multi-GPU training in PyTorch with Horovod

Training a PyTorch model with Horovod breaks down into the following steps:

import torch
import torch.nn.functional as F
import torch.optim as optim
import horovod.torch as hvd

# Initialize Horovod
hvd.init()

# Pin each process to a single GPU based on its local rank (one GPU per process)
torch.cuda.set_device(hvd.local_rank())

# Define dataset...
train_dataset = ...

# Partition the dataset among workers using torch.utils.data.distributed.DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)

# Build model...
model = ...
model.cuda()

# Scale the learning rate by the number of workers (0.01 here is only an illustrative base value).
optimizer = optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Wrap the original optimizer with Horovod's DistributedOptimizer
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# Broadcast initial parameters from rank 0 to all other processes
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

for epoch in range(100):
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
                epoch, batch_idx * len(data), len(train_sampler), loss.item()))
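
With these pieces in place, the script is started through Horovod's launcher rather than plain python, with one process per GPU. Assuming the snippet above is saved as train.py (a placeholder name), a single machine with 4 GPUs would typically be launched as

    horovodrun -np 4 python train.py

and two machines with 4 GPUs each as

    horovodrun -np 8 -H server1:4,server2:4 python train.py

where server1 and server2 are placeholder host names; mpirun can also be used in place of horovodrun.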

The complete example code follows: training ResNet-50 on ImageNet.

from __future__ import print_function

import torch
import argparse
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import datasets, transforms, models
import horovod.torch as hvd
import os
import math
from tqdm import tqdm
from distutils.version import LooseVersion

# Training settings
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
                    help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
                    help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
                    help='checkpoint file format')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
parser.add_argument('--batches-per-allreduce', type=int, default=1,
                    help='number of batches processed locally before '
                         'executing allreduce across workers; it multiplies '
                         'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')

# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
                    help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
                    help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
                    help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
                    help='weight decay')

parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
                    help='random seed')

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

allreduce_batch_size = args.batch_size * args.batches_per_allreduce

hvd.init()
torch.manual_seed(args.seed)

if args.cuda:
    # Horovod: pin GPU to local rank.
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)

cudnn.benchmark = True

# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
    if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
        resume_from_epoch = try_epoch
        break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
                                  name='resume_from_epoch').item()

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

# Horovod: write TensorBoard logs on first worker.
try:
    if LooseVersion(torch.__version__) >= LooseVersion('1.2.0'):
        from torch.utils.tensorboard import SummaryWriter
    else:
        from tensorboardX import SummaryWriter
    log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None
except ImportError:
    log_writer = None

# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(4)

kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
train_dataset = \
    datasets.ImageFolder(args.train_dir,
                         transform=transforms.Compose([
                             transforms.RandomResizedCrop(224),
                             transforms.RandomHorizontalFlip(),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))
# Horovod: use DistributedSampler to partition data among workers. Manually specify
# `num_replicas=hvd.size()` and `rank=hvd.rank()`.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=allreduce_batch_size,
    sampler=train_sampler, **kwargs)

val_dataset = \
    datasets.ImageFolder(args.val_dir,
                         transform=transforms.Compose([
                             transforms.Resize(256),
                             transforms.CenterCrop(224),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))
val_sampler = torch.utils.data.distributed.DistributedSampler(
    val_dataset, num_replicas=hvd.size(), rank=hvd.rank())
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size,
                                         sampler=val_sampler, **kwargs)


# Set up standard ResNet-50 model.
model = models.resnet50()

# By default, Adasum doesn't need scaling up learning rate.
# For sum/average with gradient accumulation: scale learning rate by batches_per_allreduce.
lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1

if args.cuda:
    # Move model to GPU.
    model.cuda()
    # If using GPU Adasum allreduce, scale learning rate by local_size.
    if args.use_adasum and hvd.nccl_built():
        lr_scaler = args.batches_per_allreduce * hvd.local_size()

# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(),
                      lr=(args.base_lr *
                          lr_scaler),
                      momentum=args.momentum, weight_decay=args.wd)

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters(),
    compression=compression,
    backward_passes_per_step=args.batches_per_allreduce,
    op=hvd.Adasum if args.use_adasum else hvd.Average)

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast weights to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
    filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
    checkpoint = torch.load(filepath)
    model.load_state_dict(checkpoint['model'])
    optimizer.load_state_dict(checkpoint['optimizer'])

# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)


def train(epoch):
    model.train()
    train_sampler.set_epoch(epoch)
    train_loss = Metric('train_loss')
    train_accuracy = Metric('train_accuracy')

    with tqdm(total=len(train_loader),
              desc='Train Epoch     #{}'.format(epoch + 1),
              disable=not verbose) as t:
        for batch_idx, (data, target) in enumerate(train_loader):
            adjust_learning_rate(epoch, batch_idx)

            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            # Split data into sub-batches of size batch_size
            for i in range(0, len(data), args.batch_size):
                data_batch = data[i:i + args.batch_size]
                target_batch = target[i:i + args.batch_size]
                output = model(data_batch)
                train_accuracy.update(accuracy(output, target_batch))
                loss = F.cross_entropy(output, target_batch)
                train_loss.update(loss)
                # Average gradients among sub-batches
                loss.div_(math.ceil(float(len(data)) / args.batch_size))
                loss.backward()
            # Gradient is applied across all ranks
            optimizer.step()
            t.set_postfix({'loss': train_loss.avg.item(),
                           'accuracy': 100. * train_accuracy.avg.item()})
            t.update(1)

    if log_writer:
        log_writer.add_scalar('train/loss', train_loss.avg, epoch)
        log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)


def validate(epoch):
    model.eval()
    val_loss = Metric('val_loss')
    val_accuracy = Metric('val_accuracy')

    with tqdm(total=len(val_loader),
              desc='Validate Epoch  #{}'.format(epoch + 1),
              disable=not verbose) as t:
        with torch.no_grad():
            for data, target in val_loader:
                if args.cuda:
                    data, target = data.cuda(), target.cuda()
                output = model(data)

                val_loss.update(F.cross_entropy(output, target))
                val_accuracy.update(accuracy(output, target))
                t.set_postfix({'loss': val_loss.avg.item(),
                               'accuracy': 100. * val_accuracy.avg.item()})
                t.update(1)

    if log_writer:
        log_writer.add_scalar('val/loss', val_loss.avg, epoch)
        log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)


# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
def adjust_learning_rate(epoch, batch_idx):
    if epoch < args.warmup_epochs:
        epoch += float(batch_idx + 1) / len(train_loader)
        lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
    elif epoch < 30:
        lr_adj = 1.
    elif epoch < 60:
        lr_adj = 1e-1
    elif epoch < 80:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3
    for param_group in optimizer.param_groups:
        param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj


def accuracy(output, target):
    # get the index of the max log-probability
    pred = output.max(1, keepdim=True)[1]
    return pred.eq(target.view_as(pred)).cpu().float().mean()


def save_checkpoint(epoch):
    if hvd.rank() == 0:
        filepath = args.checkpoint_format.format(epoch=epoch + 1)
        state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        }
        torch.save(state, filepath)


# Horovod: average metrics from distributed training.
class Metric(object):
    def __init__(self, name):
        self.name = name
        self.sum = torch.tensor(0.)
        self.n = torch.tensor(0.)

    def update(self, val):
        self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
        self.n += 1

    @property
    def avg(self):
        return self.sum / self.n


for epoch in range(resume_from_epoch, args.epochs):
    train(epoch)
    validate(epoch)
    save_checkpoint(epoch)
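
Before committing to a long ImageNet run, it can be useful to sanity-check the Horovod setup with a tiny allreduce script. The sketch below is not part of the original example; the filename and tensor values are arbitrary. Each rank contributes its own rank index, and hvd.allreduce averages across workers by default, so every process should print the same value: (size - 1) / 2. Launch it the same way, e.g. horovodrun -np 4 python check_hvd.py.

import torch
import horovod.torch as hvd

hvd.init()
if torch.cuda.is_available():
    # Pin this process to its GPU, as in the training script above.
    torch.cuda.set_device(hvd.local_rank())

# Each rank contributes its own index; allreduce averages by default.
x = torch.tensor(float(hvd.rank()))
avg = hvd.allreduce(x, name='rank_average')
print('rank {}/{}: allreduce average = {:.2f}'.format(hvd.rank(), hvd.size(), avg.item()))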

Original post: https://www.cnblogs.com/ywheunji/p/12298518.html
