import torch
import torch_npu
from apex import amp
parser.add_argument('--device', default='npu', type=str, help='npu or gpu')
parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr')
parser.add_argument('--device_list', default='0,1,2,3,4,5,6,7', type=str, help='device id list')
parser.add_argument('--amp', default=False, action='store_true',
                    help='use amp to train the model')
parser.add_argument('--loss_scale', default=1024., type=float,
                    help='loss scale used in amp, default -1 means dynamic')
parser.add_argument('--opt_level', default='O2', type=str,
                    help='apex amp optimization level, e.g. O1 or O2')
parser.add_argument('--dist_backend', default='hccl', type=str,
                    help='distributed backend')
def device_id_to_process_device_map(device_list):
    devices = device_list.split(",")
    devices = [int(x) for x in devices]
    devices.sort()
    process_device_map = dict()
    for process_id, device_id in enumerate(devices):
        process_device_map[process_id] = device_id
    return process_device_map
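For illustration, a minimal usage sketch of the helper above; the device list string is a hypothetical example:

# Hypothetical example: ranks 0-3 are mapped to the NPU IDs given in --device_list
process_device_map = device_id_to_process_device_map("0,1,2,3")
print(process_device_map)  # {0: 0, 1: 1, 2: 2, 3: 3}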
Code location: the main() function in main.py.
Add the following code:
def main():
    args = parser.parse_args()
    os.environ['MASTER_ADDR'] = args.addr
    os.environ['MASTER_PORT'] = '**'  # ** is the port number; fill in an idle port chosen according to your environment
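For example, with a hypothetical idle port such as 29688, the assignment would look like the sketch below; the actual port must be chosen from the free ports on your master node:

os.environ['MASTER_ADDR'] = args.addr   # e.g. '127.0.0.1' for single-node training
os.environ['MASTER_PORT'] = '29688'     # hypothetical idle port, for illustration only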
Code location: the main() function in main.py.
The original code is as follows:
args.distributed = args.world_size > 1 or args.multiprocessing_distributed

if torch.cuda.is_available():
    ngpus_per_node = torch.cuda.device_count()
else:
    ngpus_per_node = 1
The modified code is as follows:
args.distributed = args.world_size > 1 or args.multiprocessing_distributed

args.process_device_map = device_id_to_process_device_map(args.device_list)

if args.device == 'npu':
    ngpus_per_node = len(args.process_device_map)
else:
    ngpus_per_node = torch.cuda.device_count()
The original code is as follows:
def main_worker(gpu, ngpus_per_node, args):
    global best_acc1
    args.gpu = gpu
The modified code is as follows:

def main_worker(gpu, ngpus_per_node, args):
    global best_acc1
    args.gpu = args.process_device_map[gpu]
The original code is as follows:

dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                        world_size=args.world_size, rank=args.rank)
The modified code is as follows:

if args.device == 'npu':
    # init_method is omitted here, so torch.distributed falls back to the default
    # 'env://' method and reads MASTER_ADDR / MASTER_PORT set in main()
    dist.init_process_group(backend=args.dist_backend,
                            # init_method=args.dist_url,
                            world_size=args.world_size,
                            rank=args.rank)
else:
    dist.init_process_group(backend=args.dist_backend,
                            init_method=args.dist_url,
                            world_size=args.world_size,
                            rank=args.rank)
Code location: main_worker() in main.py.
The original code is as follows:
# create model
if args.pretrained:
    print("=> using pre-trained model '{}'".format(args.arch))
    model = models.__dict__[args.arch](pretrained=True)
else:
    print("=> creating model '{}'".format(args.arch))
    model = models.__dict__[args.arch]()

if not torch.cuda.is_available():
    print('using CPU, this will be slow')
......
else:
    model = torch.nn.DataParallel(model).cuda()
The modified code is as follows:
# create model
if args.pretrained:
    print("=> using pre-trained model '{}'".format(args.arch))
    model = models.__dict__[args.arch](pretrained=True)
else:
    print("=> creating model '{}'".format(args.arch))
    model = models.__dict__[args.arch]()

# set the training device to the Ascend AI Processor
loc = 'npu:{}'.format(args.gpu)
torch_npu.npu.set_device(loc)

# compute the per-process batch_size and workers used for training
args.batch_size = int(args.batch_size / ngpus_per_node)
args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
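As a worked example of the split above, with hypothetical launch values shared across all processes:

# Hypothetical example: --batch_size 256 --workers 32 on 8 NPUs (ngpus_per_node = 8)
# per-process batch size : int(256 / 8)          = 32
# per-process workers    : int((32 + 8 - 1) / 8) = 4   (rounded up so the worker budget is not lost)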
Code location: main_worker() in main.py.
The original code that needs to be disabled is shown below, already commented out:
# define loss function (criterion), optimizer, and learning rate scheduler
# .....

# optionally resume from a checkpoint
# .....
# else:
#     print("=> no checkpoint found at '{}'".format(args.resume))
Then comment out the scheduler in the original code:
...
# train for one epoch
train(train_loader, model, criterion, optimizer, epoch, device, args)

# evaluate on validation set
acc1 = validate(val_loader, model, criterion, args)

# scheduler.step()
...
Code location: main_worker() in main.py.
The original code is as follows:
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
    num_workers=args.workers, pin_memory=True, sampler=train_sampler)

val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=args.batch_size, shuffle=False,
    num_workers=args.workers, pin_memory=True, sampler=val_sampler)
The modified code is as follows:
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
    num_workers=args.workers, pin_memory=False, sampler=train_sampler, drop_last=True)

val_loader = torch.utils.data.DataLoader(
    datasets.ImageFolder(valdir, transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ])),
    batch_size=args.batch_size, shuffle=True,
    num_workers=args.workers, pin_memory=False, drop_last=True)

model = model.to(loc)

# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().to(loc)

optimizer = torch.optim.SGD(model.parameters(), args.lr,
                            momentum=args.momentum,
                            weight_decay=args.weight_decay)

if args.amp:
    model, optimizer = amp.initialize(model, optimizer,
                                      opt_level=args.opt_level,
                                      loss_scale=args.loss_scale)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

# optionally resume from a checkpoint
if args.resume:
    if os.path.isfile(args.resume):
        print("=> loading checkpoint '{}'".format(args.resume))
        checkpoint = torch.load(args.resume, map_location=loc)
        args.start_epoch = checkpoint['epoch']
        best_acc1 = checkpoint['best_acc1']
        model.load_state_dict(checkpoint['state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        if args.amp:
            amp.load_state_dict(checkpoint['amp'])
        print("=> loaded checkpoint '{}' (epoch {})"
              .format(args.resume, checkpoint['epoch']))
    else:
        print("=> no checkpoint found at '{}'".format(args.resume))

cudnn.benchmark = True
The original code is as follows:
# remember best acc@1 and save checkpoint
is_best = acc1 > best_acc1
best_acc1 = max(acc1, best_acc1)

if not args.multiprocessing_distributed or (args.multiprocessing_distributed
        and args.rank % ngpus_per_node == 0):
    save_checkpoint({
        'epoch': epoch + 1,
        'arch': args.arch,
        'state_dict': model.state_dict(),
        'best_acc1': best_acc1,
        'optimizer' : optimizer.state_dict(),
        'scheduler' : scheduler.state_dict()
    }, is_best)
The modified code is as follows:

# remember best acc@1 and save checkpoint
is_best = acc1 > best_acc1
best_acc1 = max(acc1, best_acc1)

if not args.multiprocessing_distributed or (args.multiprocessing_distributed
        and args.rank % ngpus_per_node == 0):
    if args.amp:
        save_checkpoint({
            'epoch': epoch + 1,
            'arch': args.arch,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc1,
            'optimizer' : optimizer.state_dict(),
            'amp': amp.state_dict(),
        }, is_best)
    else:
        save_checkpoint({
            'epoch': epoch + 1,
            'arch': args.arch,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc1,
            'optimizer' : optimizer.state_dict(),
        }, is_best)
The original code is as follows:

for i, (images, target) in enumerate(train_loader):
    # measure data loading time
    data_time.update(time.time() - end)

    # move data to the same device as model
    images = images.to(device, non_blocking=True)
    target = target.to(device, non_blocking=True)
The modified code is as follows:

for i, (images, target) in enumerate(train_loader):
    # measure data loading time
    data_time.update(time.time() - end)

    loc = 'npu:{}'.format(args.gpu)
    # cast labels to int32 before moving them to the NPU device
    target = target.to(torch.int32)
    images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
The original code is as follows:

optimizer.zero_grad()
loss.backward()
optimizer.step()
The modified code is as follows:

optimizer.zero_grad()
if args.amp:
    with amp.scale_loss(loss, optimizer) as scaled_loss:
        scaled_loss.backward()
else:
    loss.backward()
optimizer.step()
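For reference, a minimal, self-contained sketch of the apex AMP pattern that the changes above rely on; it assumes the Ascend-ported apex package is installed and that model, optimizer, criterion, and train_loader are already defined as in the sample:

# Sketch only: amp.initialize wraps the model/optimizer once, before the training loop
model, optimizer = amp.initialize(model, optimizer,
                                  opt_level=args.opt_level,   # e.g. 'O2' (mixed precision)
                                  loss_scale=args.loss_scale) # static scale such as 1024.0

for images, target in train_loader:
    output = model(images)
    loss = criterion(output, target)
    optimizer.zero_grad()
    # scale_loss multiplies the loss so FP16 gradients do not underflow,
    # then unscales the gradients before optimizer.step()
    with amp.scale_loss(loss, optimizer) as scaled_loss:
        scaled_loss.backward()
    optimizer.step()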
The original code is as follows:

with torch.no_grad():
    end = time.time()
    for i, (images, target) in enumerate(val_loader):
        i = base_progress + i
        if args.gpu is not None and torch.cuda.is_available():
            images = images.cuda(args.gpu, non_blocking=True)
        # if torch.backends.mps.is_available():
        #     images = images.to('mps')
        #     target = target.to('mps')
        if torch.cuda.is_available():
            target = target.cuda(args.gpu, non_blocking=True)
The modified code is as follows:

with torch.no_grad():
    end = time.time()
    for i, (images, target) in enumerate(val_loader):
        loc = 'npu:{}'.format(args.gpu)
        # cast labels to int32 before moving them to the NPU device
        target = target.to(torch.int32)
        images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)