单机多卡训练手动迁移

  1. main.py增加头文件,以支持基于PyTorch框架的模型在昇腾910 AI处理器上训练,以及进行混合精度训练。

    import torch
    import torch_npu
    from apex import amp

  2. 参数设置增加以下参数,包括指定参与训练的昇腾910 AI处理器以及进行混合精度训练需要的参数。

    parser.add_argument('--device', default='npu', type=str, help='npu or gpu')                        
    parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr')                       
    parser.add_argument('--device_list', default='0,1,2,3,4,5,6,7', type=str, help='device id list')
    parser.add_argument('--amp', default=False, action='store_true', help='use amp to train the model')                    
    parser.add_argument('--loss_scale', default=1024., type=float,
                        help='loss scale using in amp, default -1 means dynamic')
    parser.add_argument('--opt_level', default='O2', type=str,
                        help='loss scale using in amp, default -1 means dynamic')
    parser.add_argument('--dist_backend', default='hccl', type=str,
                        help='distributed backend')

  3. 创建由device_id到process_id的映射函数,指定device进行训练(请指定相邻的device,如1、2号卡或2、3号卡)。在main.py文件中增加以下接口:

    def device_id_to_process_device_map(device_list):
        devices = device_list.split(",")
        devices = [int(x) for x in devices]
        devices.sort()
    
        process_device_map = dict()
        for process_id, device_id in enumerate(devices):
            process_device_map[process_id] = device_id
    
        return process_device_map

  4. 指定训练服务器的ip和端口。

    代码位置:main.py文件中的主函数main()。

    添加代码如下:

    def main():
        args = parser.parse_args()
        os.environ['MASTER_ADDR'] = args.addr 
        os.environ['MASTER_PORT'] = '**'        # **为端口号,请根据实际选择一个闲置端口填写

  5. 创建由device_id到process_id的映射参数,获取单节点昇腾910 AI处理器数量。

    代码位置:main.py文件中的主函数main()。

    原代码如下:

    args.distributed = args.world_size > 1 or args.multiprocessing_distributed
    if torch.cuda.is_available():
        ngpus_per_node = torch.cuda.device_count()
    else:
        ngpus_per_node = 1

    修改后代码如下:

    args.distributed = args.world_size > 1 or args.multiprocessing_distributed
    args.process_device_map = device_id_to_process_device_map(args.device_list)
    if args.device == 'npu':
        ngpus_per_node = len(args.process_device_map)
    else:
        ngpus_per_node = torch.cuda.device_count()

  6. 获取进程process_id对应的昇腾910 AI处理器编号,指定在对应的昇腾910 AI处理器上进行训练。

    原代码如下:

    def main_worker(gpu, ngpus_per_node, args):   
        global best_acc1
        args.gpu = gpu
    修改后代码如下:
    def main_worker(gpu, ngpus_per_node, args):   
        global best_acc1
        args.gpu = args.process_device_map[gpu]

  7. 初始化进程组,屏蔽掉初始化方式。

    原代码如下:
          dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                   world_size=args.world_size, rank=args.rank)
    修改后代码如下:
          if args.device == 'npu':
              dist.init_process_group(backend=args.dist_backend, #init_method=args.dist_url,
                                   world_size=args.world_size, rank=args.rank)
          else:
              dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,         
                                  world_size=args.world_size, rank=args.rank)

  8. 要进行分布式训练且需要引入混合精度模块,并且需要将模型迁移到昇腾AI处理器上,因此需要屏蔽掉原始代码中判断是否为分布式训练以及模型是否在GPU上进行训练的代码部分。

    代码位置:main.py文件中的main_worker()。

    原代码如下:

        # create model
        if args.pretrained:
            print("=> using pre-trained model '{}'".format(args.arch))
            model = models.__dict__[args.arch](pretrained=True)
        else:
            print("=> creating model '{}'".format(args.arch))
            model = models.__dict__[args.arch]()
        if not torch.cuda.is_available():
            print('using CPU, this will be slow')
            ......
            else:
                model = torch.nn.DataParallel(model).cuda()

    修改后代码如下:

        # create model
        if args.pretrained:
            print("=> using pre-trained model '{}'".format(args.arch))
            model = models.__dict__[args.arch](pretrained=True)
        else:
            print("=> creating model '{}'".format(args.arch))
            model = models.__dict__[args.arch]()
        # 指定训练设备为昇腾AI处理器
        loc = 'npu:{}'.format(args.gpu)
        torch_npu.npu.set_device(loc)
        # 计算用于训练的batch_size和workers
        args.batch_size = int(args.batch_size / ngpus_per_node)
        args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)

  9. 屏蔽掉损失函数、优化器和断点续训部分,将这部分在后面与混合精度训练结合起来。

    代码位置:main.py文件中的main_worker()。

    需要屏蔽的原代码如下,已注释:

        # define loss function (criterion), optimizer, and learning rate scheduler
        #     .....
        # optionally resume from a checkpoint
        #     .....
        #     else:
        #         print("=> no checkpoint found at '{}'".format(args.resume))

    再将原代码中的scheduler屏蔽:

    ...
    # train for one epoch
    train(train_loader, model, criterion, optimizer, epoch, device, args)
    
    # evaluate on validation set
    acc1 = validate(val_loader, model, criterion, args)
    
    # scheduler.step()
    ...

  10. 数据加载器结合了数据集和取样器,并且可以提供多个线程处理数据集。使用昇腾AI处理器进行训练,需要将pin_memory设置为False;由于当前仅支持固定shape下的训练,数据流中剩余的样本数可能小于batch大小,因此需要将drop_last设置为True;另外需要将验证部分数据集shuffle设置为True。

    代码位置:main.py文件中的main_worker()。

    原代码如下:

        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
            num_workers=args.workers, pin_memory=True, sampler=train_sampler)
    
        val_loader = torch.utils.data.DataLoader(
            val_dataset, batch_size=args.batch_size, shuffle=False,
            num_workers=args.workers, pin_memory=True, sampler=val_sampler)

    修改后代码如下:

        train_loader = torch.utils.data.DataLoader(
            train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
            num_workers=args.workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    
        val_loader = torch.utils.data.DataLoader(
            datasets.ImageFolder(valdir, transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                normalize,
            ])),
            batch_size=args.batch_size, shuffle=True,
            num_workers=args.workers, pin_memory=False, drop_last=True)

  11. 进行损失函数及优化器构建,将模型、损失函数迁移到昇腾AI处理器上;将优化器、模型与混合精度模块进行结合以支持混合精度训练;将断点训练部分与混合精度模块结合以支持混合精度训练。

    需要添加的代码如下:
        val_loader = torch.utils.data.DataLoader(
            datasets.ImageFolder(valdir, transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                normalize,
            ])),
            batch_size=args.batch_size, shuffle=True,
            num_workers=args.workers, pin_memory=False, drop_last=True)
    
        model = model.to(loc)
        # define loss function (criterion) and optimizer
        criterion = nn.CrossEntropyLoss().to(loc)
        optimizer = torch.optim.SGD(model.parameters(), args.lr,
                                    momentum=args.momentum,
                                    weight_decay=args.weight_decay)
    
        if args.amp:
            model, optimizer = amp.initialize(model, optimizer, opt_level=args.opt_level, loss_scale=args.loss_scale)
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    
        # optionally resume from a checkpoint
        if args.resume:
            if os.path.isfile(args.resume):
                print("=> loading checkpoint '{}'".format(args.resume))
                checkpoint = torch.load(args.resume, map_location=loc)
                args.start_epoch = checkpoint['epoch']
                best_acc1 = checkpoint['best_acc1']
                model.load_state_dict(checkpoint['state_dict'])
                optimizer.load_state_dict(checkpoint['optimizer'])
                if args.amp:
                    amp.load_state_dict(checkpoint['amp'])
                print("=> loaded checkpoint '{}' (epoch {})"
                      .format(args.resume, checkpoint['epoch']))
            else:
                print("=> no checkpoint found at '{}'".format(args.resume))
    
        cudnn.benchmark = True

  12. 断点checkpoint保存需要与混合精度训练结合。

    原代码如下:

    # remember best acc@1 and save checkpoint
    is_best = acc1 > best_acc1
    best_acc1 = max(acc1, best_acc1)
    
    if not args.multiprocessing_distributed or (args.multiprocessing_distributed
            and args.rank % ngpus_per_node == 0):
        save_checkpoint({
            'epoch': epoch + 1,
            'arch': args.arch,
            'state_dict': model.state_dict(),
            'best_acc1': best_acc1,
            'optimizer' : optimizer.state_dict(),
            'scheduler' : scheduler.state_dict()
        }, is_best)
    修改后代码如下:
            # remember best acc@1 and save checkpoint
            is_best = acc1 > best_acc1
            best_acc1 = max(acc1, best_acc1)
    
            if not args.multiprocessing_distributed or (args.multiprocessing_distributed
                    and args.rank % ngpus_per_node == 0):
                if args.amp:
                    save_checkpoint({
                        'epoch': epoch + 1,
                        'arch': args.arch,
                        'state_dict': model.state_dict(),
                        'best_acc1': best_acc1,
                        'optimizer' : optimizer.state_dict(),
                        'amp': amp.state_dict(),
                    }, is_best)
                else:
                    save_checkpoint({
                        'epoch': epoch + 1,
                        'arch': args.arch,
                        'state_dict': model.state_dict(),
                        'best_acc1': best_acc1,
                        'optimizer' : optimizer.state_dict(),
                    }, is_best)

  13. 训练时,需要将数据集迁移到昇腾AI处理器上。

    原代码如下:
        for i, (images, target) in enumerate(train_loader):
            # measure data loading time
            data_time.update(time.time() - end)
            # move data to the same device as model
            images = images.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True)
    修改后代码如下:
        for i, (images, target) in enumerate(train_loader):
            # measure data loading time
            data_time.update(time.time() - end)
            loc = 'npu:{}'.format(args.gpu)
            target = target.to(torch.int32)
            images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)

  14. 标记反向传播.backward()发生的位置,这样混合精度模块就可以进行Loss Scaling并清除每次迭代的状态。

    原代码如下:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    修改后代码如下:
            optimizer.zero_grad()
            if args.amp:
                with amp.scale_loss(loss, optimizer) as scaled_loss:
                    scaled_loss.backward()
            else:
                loss.backward()
            optimizer.step()

  15. 验证时,需要将验证数据集迁移到昇腾AI处理器上。

    原代码如下:
        with torch.no_grad():
            end = time.time()
            for i, (images, target) in enumerate(val_loader):
                i = base_progress + i
                if args.gpu is not None and torch.cuda.is_available():
                    images = images.cuda(args.gpu, non_blocking=True)
                #if torch.backends.mps.is_available():
                        #images = images.to('mps')
                        #target = target.to('mps')
                if torch.cuda.is_available():
                    target = target.cuda(args.gpu, non_blocking=True)
    修改后代码如下:
        with torch.no_grad():
            end = time.time()
            for i, (images, target) in enumerate(val_loader):
                loc = 'npu:{}'.format(args.gpu)
                target = target.to(torch.int32)
                images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)

  16. 参考模型训练,拉起分布式训练进程。