拉起多卡分布式训练
在单机和多机场景下,有4种方式可拉起分布式训练,分别为shell脚本方式(推荐)、mp.spawn方式、Python方式、torchrun方式。其中torchrun方式仅在PyTorch 1.11.0及以上版本支持使用。以下内容以一个简单模型脚本为样例,展示前3种拉起方式分别需要对脚本代码进行的修改。torchrun方式的代码修改与shell脚本方式完全相同。
- 集合通信存在如下约束:
- 数据并行模式中不同device上执行的图相同。
- 针对Atlas 训练系列产品:allreduce和reduce_scatter仅支持int8、int32、float16和float32数据类型。
- 针对Atlas A2 训练系列产品:allreduce和reduce_scatter仅支持int8, int32, float16, float32和bfp16数据类型。
- 分布式训练场景下,HCCL会使用Host服务器的部分端口进行集群信息收集,需要操作系统预留该部分端口。默认情况下,HCCL使用60000-60015端口,若通过环境变量HCCL_IF_BASE_PORT指定了Host网卡起始端口,则需要预留以该端口起始的16个端口。
sysctl -w net.ipv4.ip_local_reserved_ports=60000-60015
- 若用户准备进行2卡训练,可将8卡训练脚本进行改写,改为2卡训练脚本。可参见以下修改方法:
- 若8卡脚本的batchsize是单卡脚本的batchsize的8倍,则将8卡训练时的batch size和learning rate同时除以4,作为2卡训练时的batch size和learning rate。
- 如果使用for循环启动训练入口脚本,则将for循环的次数改为2次。
- world size或者rank size修改为2,并确保训练脚本中dist.init_process_group()中world_size参数为2。
- 如果有指定device list参数,且取值取值范围为0-7,则将其改为0-1。
构建简单模型
我们先构建一个简单的神经网络。
# 导入依赖和库 import torch from torch import nn import torch_npu import torch.distributed as dist from torch.utils.data import DataLoader from torchvision import datasets from torchvision.transforms import ToTensor import time import torch.multiprocessing as mp import os torch.manual_seed(0) # 下载训练数据 training_data = datasets.FashionMNIST( root="./data", train=True, download=True, transform=ToTensor(), ) # 下载测试数据 test_data = datasets.FashionMNIST( root="./data", train=False, download=True, transform=ToTensor(), ) # 构建模型 class NeuralNetwork(nn.Module): def __init__(self): super().__init__() self.flatten = nn.Flatten() self.linear_relu_stack = nn.Sequential( nn.Linear(28*28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10) ) def forward(self, x): x = self.flatten(x) logits = self.linear_relu_stack(x) return logits def test(dataloader, model, loss_fn): size = len(dataloader.dataset) num_batches = len(dataloader) model.eval() test_loss, correct = 0, 0 with torch.no_grad(): for X, y in dataloader: X, y = X.to(device), y.to(device) pred = model(X) test_loss += loss_fn(pred, y).item() correct += (pred.argmax(1) == y).type(torch.float).sum().item() test_loss /= num_batches correct /= size print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
获取超参数
在主函数main中获取训练所需的超参数。
- shell脚本/torchrun方式
def main(world_size: int, batch_size = 64, total_epochs = 5,): # 用户可自行设置 ngpus_per_node = world_size main_worker(args.gpu, ngpus_per_node, args)
- mp.spawn方式
def main(world_size: int, batch_size = 64, total_epochs = 5,): # 用户可自行设置 ngpus_per_node = world_size mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args)) # mp.spawn方式启动
- Python方式
def main(world_size: int, batch_size, args): # 使用Python拉起命令中设置的超参数 ngpus_per_node = world_size args.gpu = args.local_rank # 任务拉起后,local_rank自动获得device号 main_worker(args.gpu, ngpus_per_node, args)
设置地址和端口号
由于昇腾AI处理器初始化进程组时initmethod只支持env:// (即环境变量初始化方式),所以在初始化前需要配置MASTER_ADDR、MASTER_PORT等参数。用户需根据自己实际情况配置。
- shell脚本方式、mp.spawn拉起方式和torchrun方式的配置代码相同,如下所示:
def ddp_setup(rank, world_size): """ Args: rank: Unique identifier of each process world_size: Total number of processes """ os.environ["MASTER_ADDR"] = "localhost" # 用户需根据自己实际情况设置 os.environ["MASTER_PORT"] = "***" # 用户需根据自己实际情况设置 dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
- Python方式需要把配置参数的命令放到拉起训练中。脚本中代码如下所示:
def ddp_setup(rank, world_size): """ Args: rank: Unique identifier of each process world_size: Total number of processes """ dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
添加分布式逻辑
不同的拉起训练方式下,device号的获取方式不同:
- shell脚本方式:在shell脚本中循环传入local_rank变量作为指定的device。
- mp.spawn方式:mp.spawn多进程拉起main_worker后,第一个参数GPU自动获得device号(0 ~ ngpusper_node - 1)。
- Python方式:任务拉起后,local_rank自动获得device号。
用户需根据自己选择的方式对代码做不同的修改。
- shell脚本/torchrun方式
def main_worker(gpu, ngpus_per_node, args): start_epoch = 0 end_epoch = 5 args.gpu = int(os.environ['LOCAL_RANK']) # 在shell脚本中循环传入local_rank变量作为指定的device ddp_setup(args.gpu, args.world_size) torch_npu.npu.set_device(args.gpu) total_batch_size = args.batch_size total_workers = ngpus_per_node batch_size = int(total_batch_size / ngpus_per_node) workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node) model = NeuralNetwork() device = torch.device("npu") train_sampler = torch.utils.data.distributed.DistributedSampler(training_data) test_sampler = torch.utils.data.distributed.DistributedSampler(test_data) train_loader = torch.utils.data.DataLoader( training_data, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True) val_loader = torch.utils.data.DataLoader( test_data, batch_size=batch_size, shuffle=(test_sampler is None), num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True) loc = 'npu:{}'.format(args.gpu) model = model.to(loc) criterion = nn.CrossEntropyLoss().to(loc) optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) for epoch in range(start_epoch, end_epoch): print("curr epoch: ", epoch) train_sampler.set_epoch(epoch) train(train_loader, model, criterion, optimizer, epoch, args.gpu) def train(train_loader, model, criterion, optimizer, epoch, gpu): size = len(train_loader.dataset) model.train() end = time.time() for i, (images, target) in enumerate(train_loader): # measure data loading time loc = 'npu:{}'.format(gpu) target = target.to(torch.int32) images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False) # compute output output = model(images) loss = criterion(output, target) # compute gradient and do SGD step optimizer.zero_grad() loss.backward() optimizer.step() end = time.time() if i % 100 == 0: loss, current = loss.item(), i * len(target) print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
- mp.spawn方式
不需要专门设置args.gpu,将shell脚本方式中main_worker里的args.gpu均替换为gpu。
def main_worker(gpu, ngpus_per_node, args): start_epoch = 0 end_epoch = 5 ddp_setup(gpu, args.world_size) torch_npu.npu.set_device(gpu) total_batch_size = args.batch_size total_workers = ngpus_per_node batch_size = int(total_batch_size / ngpus_per_node) workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node) model = NeuralNetwork() device = torch.device("npu") train_sampler = torch.utils.data.distributed.DistributedSampler(training_data) test_sampler = torch.utils.data.distributed.DistributedSampler(test_data) train_loader = torch.utils.data.DataLoader( training_data, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True) val_loader = torch.utils.data.DataLoader( test_data, batch_size=batch_size, shuffle=(test_sampler is None), num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True) loc = 'npu:{}'.format(gpu) model = model.to(loc) criterion = nn.CrossEntropyLoss().to(loc) optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) for epoch in range(start_epoch, end_epoch): print("curr epoch: ", epoch) train_sampler.set_epoch(epoch) train(train_loader, model, criterion, optimizer, epoch, gpu) ...... # train函数代码同shell脚本方式
- Python方式
def main_worker(gpu, ngpus_per_node, args): start_epoch = 0 end_epoch = 5 args.gpu = args.local_rank # 任务拉起后,local_rank自动获得device号 ddp_setup(args.gpu, args.world_size) ...... # 其余代码同shell脚本方式
设置参数
在模型脚本中,根据拉起方式不同,设置不同的参数。
- shell脚本/torchrun方式
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)') parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.') args = parser.parse_args() world_size = torch.npu.device_count() args.world_size = world_size main(args.world_size, args.batch_size)
- mp.spawn方式
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)') args = parser.parse_args() world_size = torch.npu.device_count() args.world_size = world_size main(args.world_size, args.batch_size)
- Python方式
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)') parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.') parser.add_argument("--local_rank", default=-1, type=int) # local_rank用于自动获取device号。使用mp.spawn方式与shell方式启动时需删除此项 args = parser.parse_args() world_size = torch.npu.device_count() args.world_size = world_size main(args.world_size, args.batch_size, args) # 需将Python拉起命令中设置的参数传入main函数
拉起训练
以下拉起训练的命令为示例,用户可根据实际情况自行更改。
- shell脚本方式
export HCCL_WHITELIST_DISABLE=1 RANK_ID_START=0 WORLD_SIZE=8 for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++)); do echo "Device ID: $RANK_ID" export LOCAL_RANK=$RANK_ID python3 ddp_test_shell.py & done wait
- mp.spawn方式
export HCCL_WHITELIST_DISABLE=1 python3 ddp_test_spwn.py
- Python方式
# master_addr和master_port参数需用户根据实际情况设置 export HCCL_WHITELIST_DISABLE=1 python3 -m torch.distributed.launch --nproc_per_node 8 --master_addr localhost --master_port *** ddp_test.py
- torchrun方式(PyTorch 1.11.0及以上版本支持)
export HCCL_WHITELIST_DISABLE=1 torchrun --standalone --nnodes=1 --nproc_per_node=8 ddp_test_shell.py
当屏幕打印类似下图中的Loss数值时,说明拉起训练成功。
父主题: 多卡分布式训练