拉起多卡训练脚本示例
构建模型脚本
# 导入依赖和库 import torch from torch import nn import torch_npu import torch.distributed as dist from torch.utils.data import DataLoader from torchvision import datasets from torchvision.transforms import ToTensor import time import torch.multiprocessing as mp import os torch.manual_seed(0) # 下载训练数据 training_data = datasets.FashionMNIST( root="./data", train=True, download=True, transform=ToTensor(), ) # 下载测试数据 test_data = datasets.FashionMNIST( root="./data", train=False, download=True, transform=ToTensor(), ) # 构建模型 class NeuralNetwork(nn.Module): def __init__(self): super().__init__() self.flatten = nn.Flatten() self.linear_relu_stack = nn.Sequential( nn.Linear(28*28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10) ) def forward(self, x): x = self.flatten(x) logits = self.linear_relu_stack(x) return logits def test(dataloader, model, loss_fn): size = len(dataloader.dataset) num_batches = len(dataloader) model.eval() test_loss, correct = 0, 0 with torch.no_grad(): for X, y in dataloader: X, y = X.to(device), y.to(device) pred = model(X) test_loss += loss_fn(pred, y).item() correct += (pred.argmax(1) == y).type(torch.float).sum().item() test_loss /= num_batches correct /= size print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
获取分布式超参数
在模型脚本中,构建主函数main,在其中获取分布式训练所需的超参数。
- shell脚本/torchrun/torch_npu_run方式
def main(world_size: int, batch_size = 64, total_epochs = 5,): # 用户可自行设置 ngpus_per_node = world_size main_worker(args.gpu, ngpus_per_node, args)
- mp.spawn方式
def main(world_size: int, batch_size = 64, total_epochs = 5,): # 用户可自行设置 ngpus_per_node = world_size mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args)) # mp.spawn方式启动
- Python方式
def main(world_size: int, batch_size, args): # 使用Python拉起命令中设置的超参数 ngpus_per_node = world_size args.gpu = args.local_rank # 任务拉起后,local_rank自动获得device号 main_worker(args.gpu, ngpus_per_node, args)
设置地址和端口号
在模型脚本中设置地址与端口号,用于拉起分布式训练。由于昇腾AI处理器初始化进程组时initmethod只支持env:// (即环境变量初始化方式),所以在初始化前需要根据实际情况配置MASTER_ADDR、MASTER_PORT等参数。
- shell脚本/mp.spawn/torchrun方式的配置代码相同,如下所示:
def ddp_setup(rank, world_size): """ Args: rank: Unique identifier of each process world_size: Total number of processes """ os.environ["MASTER_ADDR"] = "localhost" # 用户根据实际情况设置为rank0节点通信ip,单机时可设置为localhost os.environ["MASTER_PORT"] = "***" # 用户根据实际情况设置为rank0节点的端口号 dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
- torch_npu_run方式,使用时需切换通信方式,修改init_process_group接口的参数。
dist.init_process_group(..., init_method="tcp://xx:**", ......, ) # xx为rank0节点通信ip,**为端口号,根据实际选择一个闲置端口填写
修改后:init_method使用parallel方式
dist.init_process_group(..., init_method="parallel://xx:**", ......, ) # xx为rank0节点通信ip,**为端口号,根据实际选择一个闲置端口填写
- Python方式需要把配置参数的命令放到拉起训练中。脚本中代码如下所示:
def ddp_setup(rank, world_size): """ Args: rank: Unique identifier of each process world_size: Total number of processes """ dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
添加分布式逻辑
不同的拉起训练方式下,device号的获取方式不同:
- shell脚本/torchrun/torch_npu_run方式:在shell脚本中循环传入local_rank变量作为指定的device。
- mp.spawn方式:mp.spawn多进程拉起main_worker后,第一个参数GPU自动获得device号(0 ~ ngpus_per_node - 1)。
- Python方式:任务拉起后,local_rank自动获得device号。
用户需根据自己选择的方式对代码做不同的修改。下面是修改代码示例:
- shell脚本/torchrun/torch_npu_run方式
def main_worker(gpu, ngpus_per_node, args): start_epoch = 0 end_epoch = 5 args.gpu = int(os.environ['LOCAL_RANK']) # 在shell脚本中循环传入local_rank变量作为指定的device ddp_setup(args.gpu, args.world_size) torch_npu.npu.set_device(args.gpu) total_batch_size = args.batch_size total_workers = ngpus_per_node batch_size = int(total_batch_size / ngpus_per_node) workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node) model = NeuralNetwork() device = torch.device("npu") train_sampler = torch.utils.data.distributed.DistributedSampler(training_data) test_sampler = torch.utils.data.distributed.DistributedSampler(test_data) train_loader = torch.utils.data.DataLoader( training_data, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True) val_loader = torch.utils.data.DataLoader( test_data, batch_size=batch_size, shuffle=(test_sampler is None), num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True) loc = 'npu:{}'.format(args.gpu) model = model.to(loc) criterion = nn.CrossEntropyLoss().to(loc) optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) for epoch in range(start_epoch, end_epoch): print("curr epoch: ", epoch) train_sampler.set_epoch(epoch) train(train_loader, model, criterion, optimizer, epoch, args.gpu) def train(train_loader, model, criterion, optimizer, epoch, gpu): size = len(train_loader.dataset) model.train() end = time.time() for i, (images, target) in enumerate(train_loader): # measure data loading time loc = 'npu:{}'.format(gpu) target = target.to(torch.int32) images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False) # compute output output = model(images) loss = criterion(output, target) # compute gradient and do SGD step optimizer.zero_grad() loss.backward() optimizer.step() end = time.time() if i % 100 == 0: loss, current = loss.item(), i * len(target) print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
- mp.spawn方式
不需要专门设置args.gpu,将shell脚本方式中main_worker里的args.gpu均替换为gpu。
def main_worker(gpu, ngpus_per_node, args): start_epoch = 0 end_epoch = 5 ddp_setup(gpu, args.world_size) torch_npu.npu.set_device(gpu) total_batch_size = args.batch_size total_workers = ngpus_per_node batch_size = int(total_batch_size / ngpus_per_node) workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node) model = NeuralNetwork() device = torch.device("npu") train_sampler = torch.utils.data.distributed.DistributedSampler(training_data) test_sampler = torch.utils.data.distributed.DistributedSampler(test_data) train_loader = torch.utils.data.DataLoader( training_data, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True) val_loader = torch.utils.data.DataLoader( test_data, batch_size=batch_size, shuffle=(test_sampler is None), num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True) loc = 'npu:{}'.format(gpu) model = model.to(loc) criterion = nn.CrossEntropyLoss().to(loc) optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu]) for epoch in range(start_epoch, end_epoch): print("curr epoch: ", epoch) train_sampler.set_epoch(epoch) train(train_loader, model, criterion, optimizer, epoch, gpu) ...... # train函数代码同shell脚本方式
- Python方式
def main_worker(gpu, ngpus_per_node, args): start_epoch = 0 end_epoch = 5 args.gpu = args.local_rank # 任务拉起后,local_rank自动获得device号 ddp_setup(args.gpu, args.world_size) ...... # 其余代码同shell脚本方式
配置传参逻辑
在模型脚本中,根据拉起方式不同,需要传入不同的参数,传参配置逻辑如下(此处使用argparse逻辑):
- shell脚本/torchrun/torch_npu_run方式
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)') parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.') args = parser.parse_args() world_size = torch.npu.device_count() args.world_size = world_size main(args.world_size, args.batch_size)
- mp.spawn方式
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)') args = parser.parse_args() world_size = torch.npu.device_count() args.world_size = world_size main(args.world_size, args.batch_size)
- Python方式
if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='simple distributed training job') parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)') parser.add_argument('--gpu', default=None, type=int, help='GPU id to use.') parser.add_argument("--local_rank", default=-1, type=int) # local_rank用于自动获取device号。使用mp.spawn方式与shell方式启动时需删除此项 args = parser.parse_args() world_size = torch.npu.device_count() args.world_size = world_size main(args.world_size, args.batch_size, args) # 需将Python拉起命令中设置的参数传入main函数
拉起训练
我们给出了每种方式的拉起命令示例,用户可根据实际情况自行更改。
- shell脚本方式
export HCCL_WHITELIST_DISABLE=1 RANK_ID_START=0 WORLD_SIZE=8 for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++)); do echo "Device ID: $RANK_ID" export LOCAL_RANK=$RANK_ID python3 ddp_test_shell.py & done wait
- mp.spawn方式
export HCCL_WHITELIST_DISABLE=1 python3 ddp_test_spwn.py
- Python方式
# master_addr和master_port参数需用户根据实际情况设置 export HCCL_WHITELIST_DISABLE=1 python3 -m torch.distributed.launch --nproc_per_node 8 --master_addr localhost --master_port *** ddp_test.py
- torchrun方式
export HCCL_WHITELIST_DISABLE=1 torchrun --standalone --nnodes=1 --nproc_per_node=8 ddp_test_shell.py
- torch_npu_run方式
export HCCL_WHITELIST_DISABLE=1 export MASTER_IP_ADDR=** # 将**填写node_rank0的IP地址 export MASTER_PORT=** # 将**填写为一个空闲的tcp端口号 torch_npu_run --rdzv_backend=parallel --master_addr=$MASTER_IP_ADDR \ --master_port=$MASTER_PORT --nnodes=8 --nproc_per_node=8 ddp_test_shell.py
当屏幕打印/定向日志中出现模型加载、训练等正常运行日志时,说明拉起多卡训练成功。
父主题: 附录