多卡场景样例
模块和参数设置
引入所需的模块,设置供用户自定义的参数。
import argparse import os import time import torch import torch_npu import torch.nn.parallel import torch.multiprocessing as mp import torch.distributed as dist import torch.optim import torch.utils.data import torch.utils.data.distributed import torchvision.transforms as transforms import torchvision.datasets as datasets import torchvision.models as models from apex import amp # 导入amp模块 model_names = sorted(name for name in models.__dict__ if name.islower() and not name.startswith("__") and callable(models.__dict__[name])) def parse_args(): """ 用户自定义数据集路径、模型路径 """ parser = argparse.ArgumentParser(description='PyTorch ImageNet Inferring') parser.add_argument('--data', metavar='DIR', default="/data/imagenet", help='path to dataset') parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet50', choices=model_names, help='model architecture: ' + ' | '.join(model_names) + ' (default: resnet18)') parser.add_argument('--epochs', default=100, type=int, metavar='N', help='number of total epochs to run') parser.add_argument('-b', '--batch_size', default=512, type=int, metavar='N', help='mini-batch size (default: 256), this is the total ' 'batch size of all GPUs on the current node when ' 'using Data Parallel or Distributed Data Parallel') parser.add_argument('--resume', default='', type=str, metavar='PATH', help='path to latest checkpoint (default: none)') parser.add_argument('--pretrained', dest='pretrained', action='store_true', help='use pre-trained model') parser.add_argument('-j', '--workers', default=32, type=int, metavar='N', help='number of data loading workers (default: 32)') parser.add_argument('--lr', '--learning_rate', default=0.1, type=float, metavar='LR', help='initial learning rate', dest='lr') parser.add_argument('--wd', '--weight_decay', default=1e-4, type=float, metavar='W', help='weight decay (default: 1e-4)', dest='weight_decay') parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr') 
parser.add_argument('--device_list', default='0,1,2,3,4,5,6,7', type=str, help='device id list') parser.add_argument('--dist_backend', default='hccl', type=str, help='distributed backend') parser.add_argument('--world_size', default=1, type=int, help='number of nodes for distributed training') parser.add_argument('--rank', default=0, type=int, help='node rank for distributed training') parser.add_argument('--amp', default=False, action='store_true', help='use amp to train the model') args, unknown_args = parser.parse_known_args() if len(unknown_args) > 0: for bad_arg in unknown_args: print("ERROR: Unknown command line arg: %s" % bad_arg) raise ValueError("Invalid command line arg(s)") return args ......
主函数
设置主函数入口。
......
def main():
    """Entry point: parse args, configure the rendezvous endpoint, then spawn
    one worker process per NPU device via torch.multiprocessing."""
    args = parse_args()
    os.environ['MASTER_ADDR'] = args.addr
    os.environ['MASTER_PORT'] = '**'  # ** is the port number; fill in an idle port for your environment
    # Map each process rank to a physical NPU device id.
    args.process_device_map = device_id_to_process_device_map(args.device_list)
    ngpus_per_node = len(args.process_device_map)
    # Total world size = devices per node * number of nodes.
    args.world_size = ngpus_per_node * args.world_size
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
......
创建模型
在main_worker中创建模型,设置device和优化器。
......
def main_worker(npu, ngpus_per_node, args):
    """Per-process worker: join the process group, build the model on this
    process's NPU, and create the optimizer.

    npu: local process index assigned by mp.spawn (0..ngpus_per_node-1).
    """
    global best_acc1
    # Translate the spawn index into the physical NPU device id.
    args.npu = args.process_device_map[npu]
    # Global rank = node rank * processes per node + local process index.
    args.rank = args.rank * ngpus_per_node + npu
    dist.init_process_group(backend=args.dist_backend,
                            world_size=args.world_size, rank=args.rank)
    # =========================================================================
    # Create the model
    # =========================================================================
    print("=> creating model '{}'".format(args.arch))
    model = models.__dict__[args.arch](zero_init_residual=True)
    # Select the Ascend AI processor as the inference device
    loc = 'npu:{}'.format(args.npu)
    torch_npu.npu.set_device(loc)
    # Split the global batch size and data-loader workers across processes
    args.batch_size = int(args.batch_size / ngpus_per_node)
    args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
    # Copy the model parameters onto the Ascend AI processor
    model = model.to(loc)
    # SGD with no weight decay on bias parameters; args.weight_decay elsewhere.
    optimizer = torch.optim.SGD([
        {'params': [param for name, param in model.named_parameters()
                    if name[-4:] == 'bias'], 'weight_decay': 0.0},
        {'params': [param for name, param in model.named_parameters()
                    if name[-4:] != 'bias'], 'weight_decay': args.weight_decay}],
        args.lr)
......
使能混合精度
在main_worker中初始化混合精度模型,使用后可加速运算,但结果的准确率可能会轻微降低。可根据实际场景选择使用。
......
    # Optionally initialize the AMP mixed-precision model (speeds up compute,
    # may slightly reduce accuracy); O2 mode with a fixed loss scale of 1024.
    if args.amp:
        model, optimizer = amp.initialize(model, optimizer, opt_level="O2",
                                          loss_scale=1024, verbosity=1)
    # Wrap the model for distributed data parallelism on this process's NPU.
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.npu])
......
加载模型参数
在main_worker中从模型文件中恢复训练好的模型参数并加载。
......
    # Restore trained model parameters (and the best accuracy metric) from the
    # checkpoint file given by --resume.
    if os.path.isfile(args.resume):
        print("=> loading checkpoint '{}'".format(args.resume))
        checkpoint = torch.load(args.resume)
        best_acc1 = checkpoint['best_acc1']
        # Move the stored metric tensor onto this process's NPU.
        best_acc1 = best_acc1.to("npu:{}".format(args.npu))
        model.load_state_dict(checkpoint['state_dict'])
        print("=> loaded checkpoint '{}' (epoch {})".format(args.resume, checkpoint['epoch']))
    else:
        print("=> no checkpoint found at '{}'".format(args.resume))
......
初始化数据集
在main_worker中对图像数据进行加载与预处理。
......
    # Build the validation dataset: resize + center-crop to 224x224 and
    # normalize with the standard ImageNet statistics.
    valdir = os.path.join(args.data, 'val')
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    val_dataset = datasets.ImageFolder(valdir, transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ]))
    # Each process reads its own shard of the validation set (no shuffling,
    # no dropped samples, so every image is evaluated exactly once).
    val_sampler = torch.utils.data.distributed.DistributedSampler(
        val_dataset, shuffle=False, drop_last=False)
    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=args.batch_size, num_workers=args.workers,
        pin_memory=False, sampler=val_sampler)
......
运行推理
在main_worker中运行推理。
......
    # Run inference over the validation set.
    validate(val_loader, model, args)
......
在线推理
在线推理的实现代码如下。
# ......
def validate(val_loader, model, args):
    """Run inference over ``val_loader`` and report top-1/top-5 accuracy.

    Args:
        val_loader: DataLoader yielding (images, target) batches.
        model: the model to evaluate.
        args: parsed arguments; ``args.npu`` selects the NPU device.

    Returns:
        Average top-1 accuracy (warm-up batches are excluded by AverageMeter).
    """
    batch_time = AverageMeter('Time', ':6.3f')
    top1 = AverageMeter('Acc@1', ':6.2f')
    top5 = AverageMeter('Acc@5', ':6.2f')
    progress = ProgressMeter(
        len(val_loader), [batch_time, top1, top5], prefix='Test: ')
    # =========================================================================
    # Switch to inference mode
    # =========================================================================
    model.eval()
    # =========================================================================
    # Execute the forward pass under torch.no_grad()
    # =========================================================================
    with torch.no_grad():
        end = time.time()
        for i, (images, target) in enumerate(val_loader):
            # Move the batch data onto the NPU
            loc = 'npu:{}'.format(args.npu)
            target = target.to(torch.int32)
            images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
            # Compute the model output
            output = model(images)
            # Accumulate accuracy statistics
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            top1.update(acc1[0], images.size(0))
            top5.update(acc5[0], images.size(0))
            # Measure elapsed time per batch
            batch_time.update(time.time() - end)
            end = time.time()
            # Print inference progress logs
            progress.display(i)
    print(' * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}'.format(top1=top1, top5=top5))
    return top1.avg


class AverageMeter(object):
    """Computes and stores the average and current value.

    The first ``start_count_index`` batches are treated as warm-up and are
    excluded from the running average.
    """

    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()
        self.start_count_index = 10  # number of warm-up batches to skip

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        if self.count == 0:
            # Remember the first batch size so the warm-up threshold can be
            # expressed in samples rather than batches.
            self.batchsize = n
        self.val = val
        self.count += n
        if self.count > (self.start_count_index * self.batchsize):
            self.sum += val * n
            self.avg = self.sum / (self.count - self.start_count_index * self.batchsize)

    def __str__(self):
        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
        return fmtstr.format(**self.__dict__)


class ProgressMeter(object):
    """Records and prints information about the inference run."""

    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print('\t'.join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = '{:' + str(num_digits) + 'd}'
        return '[' + fmt + '/' + fmt.format(num_batches) + ']'


def accuracy(output, target, topk=(1,)):
    """Compute the precision@k for each k in ``topk``.

    Returns:
        list of one-element tensors holding percentages in [0, 100].
    """
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)
        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))
        res = []
        for k in topk:
            # BUGFIX: use reshape(-1) instead of view(-1); view() raises a
            # RuntimeError on non-contiguous slices in recent PyTorch versions
            # (same fix as applied upstream in pytorch/examples).
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res


def device_id_to_process_device_map(device_list):
    """Map process rank -> physical device id.

    Args:
        device_list: comma-separated device ids, e.g. "0,1,2,3".

    Returns:
        dict mapping process index 0..n-1 to the sorted device ids.
    """
    devices = sorted(int(x) for x in device_list.split(","))
    return {process_id: device_id for process_id, device_id in enumerate(devices)}


if __name__ == '__main__':
    main()
推理完成
当出现推理结果精度的回显时,说明推理完成。样例回显截图如下。
父主题: 参考样例