多卡分布式训练拉起方式

有3种方式可拉起分布式训练,分别为mp.spawn方式、Python方式、shell脚本方式(推荐)。以下脚本中除备注内容为不同场景配置时需注意的要点外,其余代码均相同,着重说明不同方式启动时代码修改的重点。
  1. 导入依赖。
    import torch.nn.parallel
    import torch.distributed as dist
    import torch.multiprocessing as mp  #使用mp.spawn方式启动时配置
    • torch.nn.parallel用于调用模型并行接口。
    • torch.distributed用于调用初始化进程组接口。
    • torch.multiprocessing用于调用多个进程接口。
  2. 参数设置增加以下参数,包括指定参与训练的昇腾910 AI处理器需要的参数。

    若源码中已有该参数则不用添加。

    parser.add_argument("--local_rank", default=-1, type=int)   #使用mp.spawn方式与shell方式启动时需删除此项
    parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr')
    parser.add_argument('--port', default='**', type=str, help='master port')    # **为端口号,请根据实际选择一个闲置端口填写
    parser.add_argument('--world-size', default=1, type=int,
                        help='number of nodes for distributed training')
    parser.add_argument('--rank', default=0, type=int,
                        help='node rank for distributed training')
    parser.add_argument('--dist-url', default='env://', type=str,
                        help='url used to set up distributed training')
    parser.add_argument('--dist-backend', default='hccl', type=str,
                        help='distributed backend')
    parser.add_argument('--multiprocessing-distributed', action='store_true',
                        help='Use multi-processing distributed training to launch '
                             'N processes per node, which has N NPUs. This is the '
                             'fastest way to use PyTorch for either single node or '
                             'multi node data parallel training')
    • --local_rank用于自动获取device号。
    • --addr和--port用于多进程之间通信。
    • --multiprocessing-distributed用于判断是否使用分布式训练。
    • --world-size、--rank、--dist-url、--dist-backend为下面初始化进程组接口所需参数。
  3. 获取训练服务器可用device数、设置地址和端口号、拉起多进程(mp.spawn方式)。

    代码位置:main.py文件中的主函数main()(文件名以及函数名根据具体模型而定,下同)。

    由于昇腾AI处理器初始化进程组时initmethod只支持env:// (即环境变量初始化方式),所以在初始化前需要配置MASTER_ADDR、MASTER_PORT等参数。

    原代码如下:

    def main():
        args = parser.parse_args()
        ngpus_per_node = torch.cuda.device_count()
        main_worker(args.gpu, ngpus_per_node, args)
    • Python方式、shell脚本方式启动时,代码修改如下:
      def main():
          args = parser.parse_args()
          os.environ['MASTER_ADDR'] = args.addr 
          os.environ['MASTER_PORT'] = args.port
          ngpus_per_node = torch.npu.device_count()
          main_worker(args.gpu, ngpus_per_node, args)
    • mp.spawn方式启动时,代码修改如下:
      def main():
          args = parser.parse_args()
          os.environ['MASTER_ADDR'] = args.addr 
          os.environ['MASTER_PORT'] = args.port
          ngpus_per_node = torch.npu.device_count() 
          if args.multiprocessing_distributed:
              mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
          else:
              # Simply call main_worker function
              main_worker(args.gpu, ngpus_per_node, args)

      其中mp.spawn函数中第一个参数为模型主函数名称,根据具体模型具体修改。

  4. 添加分布式逻辑。
    1. 初始化进程组。

      代码位置:main.py文件中的函数main_worker()。

      原代码如下:

      def main_worker(gpu, ngpus_per_node, args):
          global best_acc1
          args.gpu = gpu
          if args.gpu is not None:
              print("Use GPU: {} for training".format(args.gpu))

      不同的拉起训练方式下,device号的获取方式不同:

      • Python方式:任务拉起后,local_rank自动获得device号。
      • mp.spawn方式:mp.spawn多进程拉起main_worker后,第一个参数GPU自动获得device号(0 ~ ngpusper_node - 1)。
      • shell脚本方式:在shell脚本中循环传入local_rank变量作为指定的device。

      用户需根据自己选择的方式对代码中args.gpu参数做不同的修改,然后添加具体代码以初始化进程组。修改如下:

      • Python方式
        args.gpu = args.local_rank
      • mp.spawn方式
        args.gpu = gpu 
      • shell脚本方式
        args.gpu = int(os.environ['LOCAL_RANK'])

      以shell脚本方式为例,修改、添加后完整代码如下:

      def main_worker(gpu, ngpus_per_node, args):
          global best_acc1
          args.gpu = int(os.environ['LOCAL_RANK']) #shell脚本方式
          if args.gpu is not None:
              print("Use NPU: {} for training".format(args.gpu))
          if args.multiprocessing_distributed:
              # For multiprocessing distributed training, rank needs to be the
              # global rank among all the processes
              args.rank = args.rank * ngpus_per_node + args.gpu
              args.world_size = ngpus_per_node * args.world_size
              args.batch_size = int(args.batch_size / ngpus_per_node)
              dist.init_process_group(backend=args.dist_backend, 
                                      init_method=args.dist_url,
                                      world_size=args.world_size, 
                                      rank=args.rank)

      在8P分布式情况下传入的batchsize一般为单P的8倍,所以需要对batchsize进行处理,以保证8P分布式每张卡的batchsize和单P保持一致;同样地,为了保证精度,8P分布式情况下传入的学习率也应该为单P时的8倍,但模型中不需要对学习率再做处理。

    2. 数据集切分和模型并行。

      数据加载器结合了数据集和取样器,并以提供多个线程处理数据集。由于当前仅支持固定shape下的训练,数据流中剩余的样本数可能小于batch大小,因此需要将drop_last设置为True;train_sampler存在时train_loader的shuffle参数不可为True,因此shuffle须设置为train_sampler is None。

      1. 找到代码位置:main.py文件中的main_worker()。

        原代码如下:

        train_loader = torch.utils.data.DataLoader(
        			train_dataset, 
        			batch_size=args.batch_size, 
        			num_workers=args.workers, 
        			pin_memory=True)

        修改后代码如下:

        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) if args.multiprocessing_distributed else None
        train_loader = torch.utils.data.DataLoader(
        			train_dataset, 
        			batch_size=args.batch_size, 
        			num_workers=args.workers, 
        			pin_memory=True,
        			shuffle=(train_sampler is None),
        			sampler=train_sampler,
        			drop_last=True)
      2. 找到模型定义处。

        修改前:

        print("=> creating model '{}'".format(args.arch))
        model = models.__dict__[args.arch]()

        修改后:

        print("=> creating model '{}'".format(args.arch))
        model = models.__dict__[args.arch]()
        model = model.to('npu:{}'.format(args.gpu))
        if args.multiprocessing_distributed:
            model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    3. 设置当前的epoch,为了让不同的结点之间保持同步。

      代码位置:main.py文件中的main_worker()。

      原代码如下:

      for epoch in range(args.start_epoch, args.epochs):
          adjust_learning_rate(optimizer, epoch, args)

      修改后代码如下:

      for epoch in range(args.start_epoch, args.epochs):
          if args.multiprocessing_distributed:
              train_sampler.set_epoch(epoch)
          adjust_learning_rate(optimizer, epoch, args)
  5. 拉起训练。
    • Python方式启动,其余所需参数未列举。
      python3 -m torch.distributed.launch --nproc_per_node 8 main.py
    • mp.spawn方式启动。
      python3 main.py
      	--rank 0
              --world-size 1 
              --dist-url 'env://'
              --dist-backend 'hccl'
              --multiprocessing-distributed
    • shell脚本方式启动新建shell脚本。
      RANK_ID_START=0
      WORLD_SIZE=8
      for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++));
      do
      	echo "Device ID: $RANK_ID"
      	export LOCAL_RANK=$RANK_ID
      	
      	python3 main.py
      		--rank 0
              --world-size 1 
              --dist-url 'env://'
              --dist-backend 'hccl'
              --multiprocessing-distributed
      done
      wait