import torch.nn.parallel
import torch.distributed as dist
import torch.multiprocessing as mp  # needed when launching with mp.spawn
parser.add_argument("--local_rank", default=-1, type=int) #使用mp.spawn方式与shell方式启动时需删除此项 parser.add_argument('--addr', default='127.0.0.1', type=str, help='master addr') parser.add_argument('--port', default='**', type=str, help='master port') # **为端口号,请根据实际选择一个闲置端口填写 parser.add_argument('--world-size', default=1, type=int, help='number of nodes for distributed training') parser.add_argument('--rank', default=0, type=int, help='node rank for distributed training') parser.add_argument('--dist-url', default='env://', type=str, help='url used to set up distributed training') parser.add_argument('--dist-backend', default='hccl', type=str, help='distributed backend') parser.add_argument('--multiprocessing-distributed', action='store_true', help='Use multi-processing distributed training to launch ' 'N processes per node, which has N NPUs. This is the ' 'fastest way to use PyTorch for either single node or ' 'multi node data parallel training')
Code location: the main() function in main.py (the file name and function name depend on the specific model; the same applies below).
Because the Ascend AI Processor only supports env:// (initialization via environment variables) as the init_method when initializing the process group, MASTER_ADDR, MASTER_PORT, and related parameters must be configured before initialization.
The original code is as follows:
def main():
    args = parser.parse_args()

    ngpus_per_node = torch.cuda.device_count()
    main_worker(args.gpu, ngpus_per_node, args)
The modified code is as follows (launch methods that do not use mp.spawn, such as a shell script or torch.distributed.launch):

def main():
    args = parser.parse_args()

    os.environ['MASTER_ADDR'] = args.addr
    os.environ['MASTER_PORT'] = args.port

    ngpus_per_node = torch.npu.device_count()
    main_worker(args.gpu, ngpus_per_node, args)
When launching with mp.spawn, the modified code is as follows:

def main():
    args = parser.parse_args()

    os.environ['MASTER_ADDR'] = args.addr
    os.environ['MASTER_PORT'] = args.port

    ngpus_per_node = torch.npu.device_count()
    if args.multiprocessing_distributed:
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
    else:
        # Simply call main_worker function
        main_worker(args.gpu, ngpus_per_node, args)
The first argument passed to mp.spawn is the name of the model's main worker function; change it according to the specific model.
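For reference, a minimal standalone sketch (not taken from the model code; the ngpus_per_node value below is only illustrative) of how mp.spawn invokes the worker function: it starts nprocs processes and passes each process its index as the first positional argument, which is why main_worker receives the device index as gpu under this launch method.

import torch.multiprocessing as mp

def main_worker(gpu, ngpus_per_node, args):
    # gpu is the process index (0 .. nprocs-1) injected by mp.spawn;
    # the remaining arguments come from the args tuple passed below.
    print("process index / device id:", gpu)

if __name__ == '__main__':
    ngpus_per_node = 8   # illustrative value only
    args = None          # placeholder for the parsed command-line arguments
    mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))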
Code location: the main_worker() function in main.py.
The original code is as follows:
def main_worker(gpu, ngpus_per_node, args):
    global best_acc1
    args.gpu = gpu

    if args.gpu is not None:
        print("Use GPU: {} for training".format(args.gpu))
The way the device ID is obtained differs depending on how training is launched.
Modify the args.gpu assignment in the code according to the chosen launch method, then add code to initialize the process group (see the consolidated sketch after this list). The modifications are as follows:
args.gpu = args.local_rank                # when launching with torch.distributed.launch
args.gpu = gpu                            # when launching with mp.spawn
args.gpu = int(os.environ['LOCAL_RANK'])  # when launching with a shell script
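The three assignments can be contrasted in one helper (a sketch for illustration only; the launch_mode label is hypothetical and does not exist in the original code):

import os

def get_device_id(gpu, args, launch_mode):
    # launch_mode is a hypothetical label used only to contrast the three methods
    if launch_mode == 'torch.distributed.launch':
        return args.local_rank                # injected through the --local_rank argument
    if launch_mode == 'mp.spawn':
        return gpu                            # process index passed in by mp.spawn
    return int(os.environ['LOCAL_RANK'])      # shell script: exported by the launch script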
Taking the shell script launch method as an example, the complete code after modification is as follows:
def main_worker(gpu, ngpus_per_node, args):
    global best_acc1
    args.gpu = int(os.environ['LOCAL_RANK'])  # shell script launch method

    if args.gpu is not None:
        print("Use NPU: {} for training".format(args.gpu))

    if args.multiprocessing_distributed:
        # For multiprocessing distributed training, rank needs to be the
        # global rank among all the processes
        args.rank = args.rank * ngpus_per_node + args.gpu
        args.world_size = ngpus_per_node * args.world_size
        args.batch_size = int(args.batch_size / ngpus_per_node)
        dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                world_size=args.world_size, rank=args.rank)
In 8-device (8P) distributed training, the batch size passed in is generally 8 times the single-device value, so the code above divides it by ngpus_per_node to keep each device's batch size identical to the single-device case. Likewise, to preserve accuracy, the learning rate passed in for 8P distributed training should be 8 times the single-device value, but no further adjustment of the learning rate is needed inside the model.
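As a worked example (the numbers below are illustrative, not from the original model): if single-device training uses a batch size of 256 and a learning rate of 0.1, then 8P distributed training would be launched with a batch size of 2048 and a learning rate of 0.8; the division in main_worker restores the per-device batch size, while the learning rate is used as passed.

# Illustrative numbers only, not taken from the original model
single_p_batch_size = 256
single_p_lr = 0.1
ngpus_per_node = 8

# values passed on the command line for 8P distributed training
args_batch_size = single_p_batch_size * ngpus_per_node    # 2048
args_lr = single_p_lr * ngpus_per_node                    # 0.8

# main_worker divides the batch size back down per device ...
per_device_batch_size = int(args_batch_size / ngpus_per_node)  # 256, same as single P
# ... but the learning rate is used as-is (no further division)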
The data loader combines a dataset and a sampler, and uses multiple worker processes to process the dataset. Because only fixed-shape training is currently supported, the number of samples remaining at the end of the data stream may be smaller than the batch size, so drop_last must be set to True. When train_sampler is provided, the shuffle parameter of train_loader must not be True, so shuffle should be set to train_sampler is None.
The original code is as follows:
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size,
    num_workers=args.workers, pin_memory=True)
The modified code is as follows:
train_sampler = (torch.utils.data.distributed.DistributedSampler(train_dataset)
                 if args.multiprocessing_distributed else None)
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size,
    num_workers=args.workers, pin_memory=True,
    shuffle=(train_sampler is None), sampler=train_sampler, drop_last=True)
print("=> creating model '{}'".format(args.arch)) model = models.__dict__[args.arch]()
After modification:
print("=> creating model '{}'".format(args.arch)) model = models.__dict__[args.arch]() model = model.to('npu:{}'.format(args.gpu)) if args.multiprocessing_distributed: model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
Code location: the main_worker() function in main.py.
The original code is as follows:
for epoch in range(args.start_epoch, args.epochs):
    adjust_learning_rate(optimizer, epoch, args)
The modified code is as follows:
for epoch in range(args.start_epoch, args.epochs):
    if args.multiprocessing_distributed:
        # reshuffle the data differently at each epoch when using DistributedSampler
        train_sampler.set_epoch(epoch)

    adjust_learning_rate(optimizer, epoch, args)
Launching with torch.distributed.launch:

python3 -m torch.distributed.launch --nproc_per_node 8 main.py
Launching with mp.spawn:

python3 main.py --rank 0 --world-size 1 --dist-url 'env://' --dist-backend 'hccl' --multiprocessing-distributed
Launching with a shell script (one process per device, with LOCAL_RANK exported for each):

RANK_ID_START=0
WORLD_SIZE=8
for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++));
do
    echo "Device ID: $RANK_ID"
    export LOCAL_RANK=$RANK_ID
    # run each rank in the background so that all processes start concurrently
    python3 main.py --rank 0 --world-size 1 --dist-url 'env://' --dist-backend 'hccl' --multiprocessing-distributed &
done
wait