pytorch使用horovod多gpu训练的实现
pytorch在Horovod上训练步骤分为以下几步:
# Minimal Horovod training skeleton (the `...` placeholders are filled in by
# the full example below).  Reconstructed with proper whitespace — the blog
# extraction had stripped every space from the code.
import torch
import horovod.torch as hvd

# Initialize Horovod.
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process).
torch.cuda.set_device(hvd.local_rank())

# Define dataset...
train_dataset = ...

# Partition dataset among workers using DistributedSampler.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=..., sampler=train_sampler)

# Build model...
model = ...
model.cuda()

optimizer = optim.SGD(model.parameters())

# Horovod: wrap the original optimizer with the distributed optimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters())

# Broadcast parameters from rank 0 to all other processes.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)

for epoch in range(100):
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{}]\tLoss: {}'.format(
                epoch, batch_idx * len(data), len(train_sampler), loss.item()))
完整示例代码如下,在imagenet上采用resnet50进行训练
from __future__ import print_function

import argparse
import math
import os
from distutils.version import LooseVersion

import torch
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import datasets, transforms, models
import horovod.torch as hvd
from tqdm import tqdm
# Training settings.
# NOTE(review): the blog extraction dropped some values; defaults below are
# restored to match the official Horovod ImageNet example — confirm upstream.
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
                    help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
                    help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
                    help='checkpoint file format')
# Flag renamed from the garbled '--fp-allreduce': the code later reads
# args.fp16_allreduce, so the flag must be '--fp16-allreduce'.
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
parser.add_argument('--batches-per-allreduce', type=int, default=1,
                    help='number of batches processed locally before '
                         'executing allreduce across workers; it multiplies '
                         'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')

# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
                    help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
                    help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
                    help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
                    help='weight decay')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
                    help='random seed')

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

# Effective samples consumed per optimizer.step(): the loader yields this many
# at once and train() splits them into --batch-size sub-batches.
allreduce_batch_size = args.batch_size * args.batches_per_allreduce

hvd.init()
torch.manual_seed(args.seed)

if args.cuda:
    # Horovod: pin GPU to local rank (one GPU per process).
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)

cudnn.benchmark = True
# If set > 0, will resume training from a given checkpoint.
# Scan from the last epoch backwards for the newest existing checkpoint file.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
    if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
        resume_from_epoch = try_epoch
        break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
                                  name='resume_from_epoch').item()

# Horovod: print logs on the first worker only.
verbose = 1 if hvd.rank() == 0 else 0

# Horovod: write TensorBoard logs on first worker only.
try:
    if LooseVersion(torch.__version__) >= LooseVersion('1.2.0'):
        from torch.utils.tensorboard import SummaryWriter
    else:
        from tensorboardX import SummaryWriter
    log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None
except ImportError:
    # No tensorboard backend available — skip logging entirely.
    log_writer = None

# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(4)

kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
# Training data: random crop/flip augmentation, normalized with the standard
# ImageNet channel statistics.  The garbled mean=[.,.,.] is restored to the
# same ImageNet means used for the validation set below.
train_dataset = \
    datasets.ImageFolder(args.train_dir,
                         transform=transforms.Compose([
                             transforms.RandomResizedCrop(224),
                             transforms.RandomHorizontalFlip(),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))
# Horovod: use DistributedSampler to partition data among workers. Manually
# specify `num_replicas=hvd.size()` and `rank=hvd.rank()`.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=allreduce_batch_size,
    sampler=train_sampler, **kwargs)

# Validation data: deterministic resize + center crop, same normalization.
val_dataset = \
    datasets.ImageFolder(args.val_dir,
                         transform=transforms.Compose([
                             transforms.Resize(256),
                             transforms.CenterCrop(224),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))
val_sampler = torch.utils.data.distributed.DistributedSampler(
    val_dataset, num_replicas=hvd.size(), rank=hvd.rank())
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size,
                                         sampler=val_sampler, **kwargs)
# Set up standard ResNet-50 model.
model = models.resnet50()

# By default, Adasum doesn't need scaling up learning rate.
# For sum/average with gradient accumulation: scale learning rate by
# batches_per_allreduce and by the number of workers.
lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1

if args.cuda:
    # Move model to GPU.
    model.cuda()
    # If using GPU Adasum allreduce, scale learning rate by local_size.
    if args.use_adasum and hvd.nccl_built():
        lr_scaler = args.batches_per_allreduce * hvd.local_size()

# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(),
                      lr=(args.base_lr * lr_scaler),
                      momentum=args.momentum, weight_decay=args.wd)

# Horovod: (optional) fp16 gradient compression during allreduce.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters(),
    compression=compression,
    backward_passes_per_step=args.batches_per_allreduce,
    op=hvd.Adasum if args.use_adasum else hvd.Average)

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast weights to
# other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
    filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
    checkpoint = torch.load(filepath)
    model.load_state_dict(checkpoint['model'])
    optimizer.load_state_dict(checkpoint['optimizer'])

# Horovod: broadcast parameters & optimizer state from rank 0 so all workers
# start consistent (root_rank=0 restored — the value was dropped in the
# blog extraction).
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
def train(epoch):
    """Run one training epoch with gradient accumulation over sub-batches.

    Each loader batch holds allreduce_batch_size samples; it is split into
    --batch-size sub-batches whose gradients are accumulated locally before a
    single distributed optimizer.step().

    NOTE(review): `Metric` and `accuracy` are referenced but not defined in
    this excerpt — they come from the full Horovod example; confirm they are
    present before running.
    """
    model.train()
    # Horovod: reshuffle data differently on each epoch across workers.
    train_sampler.set_epoch(epoch)
    train_loss = Metric('train_loss')
    train_accuracy = Metric('train_accuracy')

    with tqdm(total=len(train_loader),
              desc='Train Epoch #{}'.format(epoch + 1),
              disable=not verbose) as t:
        for batch_idx, (data, target) in enumerate(train_loader):
            adjust_learning_rate(epoch, batch_idx)

            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            # Split data into sub-batches of size batch_size.
            for i in range(0, len(data), args.batch_size):
                data_batch = data[i:i + args.batch_size]
                target_batch = target[i:i + args.batch_size]
                output = model(data_batch)
                train_accuracy.update(accuracy(output, target_batch))
                loss = F.cross_entropy(output, target_batch)
                train_loss.update(loss)
                # Average gradients among sub-batches.
                loss.div_(math.ceil(float(len(data)) / args.batch_size))
                loss.backward()
            # Gradient is applied across all ranks.
            optimizer.step()
            t.set_postfix({'loss': train_loss.avg.item(),
                           'accuracy': 100. * train_accuracy.avg.item()})
            t.update(1)

    if log_writer:
        log_writer.add_scalar('train/loss', train_loss.avg, epoch)
        log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)
def validate(epoch):
    """Evaluate the model on the validation set and log averaged metrics.

    Restores `epoch + 1` in the progress-bar title (the `1` was dropped in
    the blog extraction, matching train()'s title).
    """
    model.eval()
    val_loss = Metric('val_loss')
    val_accuracy = Metric('val_accuracy')

    with tqdm(total=len(val_loader),
              desc='Validate Epoch #{}'.format(epoch + 1),
              disable=not verbose) as t:
        with torch.no_grad():
            for data, target in val_loader:
                if args.cuda:
                    data, target = data.cuda(), target.cuda()
                output = model(data)

                val_loss.update(F.cross_entropy(output, target))
                val_accuracy.update(accuracy(output, target))
                t.set_postfix({'loss': val_loss.avg.item(),
                               'accuracy': 100. * val_accuracy.avg.item()})
                t.update(1)

    if log_writer:
        log_writer.add_scalar('val/loss', val_loss.avg, epoch)
        log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)
# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to
# worse final accuracy. Scale the learning rate `lr = base_lr` ---> `lr =
# base_lr * hvd.size()` during the first five epochs.
# See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
def adjust_learning_rate(epoch, batch_idx):
    """Set the LR for the current step: linear warmup, then step decay.

    NOTE(review): the source article is truncated after `if epoch`; the body
    below is reconstructed from the schedule described in the comment above
    and the official Horovod ImageNet example — verify against upstream.
    """
    if epoch < args.warmup_epochs:
        # Fractional epoch for a smooth per-batch warmup ramp.
        epoch += float(batch_idx + 1) / len(train_loader)
        lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
    elif epoch < 30:
        lr_adj = 1.
    elif epoch < 60:
        lr_adj = 1e-1
    elif epoch < 80:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3
    for param_group in optimizer.param_groups:
        param_group['lr'] = (args.base_lr * hvd.size() *
                             args.batches_per_allreduce * lr_adj)
到此这篇关于 pytorch 使用 horovod 多 gpu 训练的实现的文章就介绍到这了,更多相关 pytorch horovod 多 gpu 训练内容请搜索毛票票以前的文章或继续浏览下面的相关文章,希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。