文档
注册
评分
提单
论坛
小AI

拉起多卡分布式训练

在单机和多机场景下,有4种方式可拉起分布式训练,分别为shell脚本方式(推荐)、mp.spawn方式、Python方式、torchrun方式。其中torchrun方式仅在PyTorch 1.11.0及以上版本支持使用。以下内容以一个简单模型脚本为样例,展示前3种拉起方式分别需要对脚本代码进行的修改。torchrun方式的代码修改与shell脚本方式完全相同。

  1. 集合通信存在如下约束:
    • 数据并行模式中不同device上执行的图相同。
    • 针对Atlas 训练系列产品:allreduce和reduce_scatter仅支持int8、int32、float16和float32数据类型。
    • 针对Atlas A2 训练系列产品:allreduce和reduce_scatter仅支持int8, int32, float16, float32和bfp16数据类型。
    • 分布式训练场景下,HCCL会使用Host服务器的部分端口进行集群信息收集,需要操作系统预留该部分端口。默认情况下,HCCL使用60000-60015端口,若通过环境变量HCCL_IF_BASE_PORT指定了Host网卡起始端口,则需要预留以该端口起始的16个端口。

      操作系统端口号预留示例:

      sysctl -w net.ipv4.ip_local_reserved_ports=60000-60015
  2. 若用户准备进行2卡训练,可将8卡训练脚本进行改写,改为2卡训练脚本。可参见以下修改方法:
    1. 若8卡脚本的batchsize是单卡脚本的batchsize的8倍,则将8卡训练时的batch size和learning rate同时除以4,作为2卡训练时的batch size和learning rate。
    2. 如果使用for循环启动训练入口脚本,则将for循环的次数改为2次。
    3. world size或者rank size修改为2,并确保训练脚本中dist.init_process_group()中world_size参数为2。
    4. 如果有指定device list参数,且取值取值范围为0-7,则将其改为0-1。

构建简单模型

我们先构建一个简单的神经网络。

# 导入依赖和库
import torch
from torch import nn
import torch_npu
import torch.distributed as dist
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import time
import torch.multiprocessing as mp
import os

torch.manual_seed(0)
# 下载训练数据
training_data = datasets.FashionMNIST(
    root="./data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# 下载测试数据
test_data = datasets.FashionMNIST(
    root="./data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# 构建模型
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

获取超参数

在主函数main中获取训练所需的超参数。

  • shell脚本/torchrun方式
    def main(world_size: int,  batch_size = 64, total_epochs = 5,):    # 用户可自行设置
        ngpus_per_node = world_size
        main_worker(args.gpu, ngpus_per_node, args)
  • mp.spawn方式
    def main(world_size: int,  batch_size = 64, total_epochs = 5,):    # 用户可自行设置
        ngpus_per_node = world_size
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))    # mp.spawn方式启动
  • Python方式
    def main(world_size: int,  batch_size, args):    # 使用Python拉起命令中设置的超参数
        ngpus_per_node = world_size
        args.gpu = args.local_rank    # 任务拉起后,local_rank自动获得device号
        main_worker(args.gpu, ngpus_per_node, args)

设置地址和端口号

由于昇腾AI处理器初始化进程组时initmethod只支持env:// (即环境变量初始化方式),所以在初始化前需要配置MASTER_ADDR、MASTER_PORT等参数。用户需根据自己实际情况配置。

  • shell脚本方式、mp.spawn拉起方式和torchrun方式的配置代码相同,如下所示:
    def ddp_setup(rank, world_size):
        """
        Args:
            rank: Unique identifier of each process
            world_size: Total number of processes
        """
        os.environ["MASTER_ADDR"] = "localhost"    # 用户需根据自己实际情况设置
        os.environ["MASTER_PORT"] = "***"    # 用户需根据自己实际情况设置
        dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
  • Python方式需要把配置参数的命令放到拉起训练中。脚本中代码如下所示:
    def ddp_setup(rank, world_size):
        """
        Args:
            rank: Unique identifier of each process
            world_size: Total number of processes
        """
        dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)

添加分布式逻辑

不同的拉起训练方式下,device号的获取方式不同:

  • shell脚本方式:在shell脚本中循环传入local_rank变量作为指定的device。
  • mp.spawn方式:mp.spawn多进程拉起main_worker后,第一个参数GPU自动获得device号(0 ~ ngpusper_node - 1)。
  • Python方式:任务拉起后,local_rank自动获得device号。

用户需根据自己选择的方式对代码做不同的修改。

  • shell脚本/torchrun方式
    def main_worker(gpu, ngpus_per_node, args):
    
        start_epoch = 0
        end_epoch = 5
        args.gpu = int(os.environ['LOCAL_RANK'])    # 在shell脚本中循环传入local_rank变量作为指定的device
        ddp_setup(args.gpu, args.world_size)
    
        torch_npu.npu.set_device(args.gpu)
        total_batch_size = args.batch_size
        total_workers = ngpus_per_node
    
        batch_size = int(total_batch_size / ngpus_per_node)    
        workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node)
    
        model = NeuralNetwork()
    
        device = torch.device("npu")
    
        train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
        test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)
    
        train_loader = torch.utils.data.DataLoader(
            training_data, batch_size=batch_size, shuffle=(train_sampler is None),
            num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    
        val_loader = torch.utils.data.DataLoader(
            test_data, batch_size=batch_size, shuffle=(test_sampler is None),
            num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)
    
        loc = 'npu:{}'.format(args.gpu)
        model = model.to(loc)
        criterion = nn.CrossEntropyLoss().to(loc)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    
        for epoch in range(start_epoch, end_epoch):
            print("curr epoch: ", epoch)
            train_sampler.set_epoch(epoch)
            train(train_loader, model, criterion, optimizer, epoch, args.gpu)
    
    
    def train(train_loader, model, criterion, optimizer, epoch, gpu):
        size = len(train_loader.dataset)
        model.train()
    
        end = time.time()
        for i, (images, target) in enumerate(train_loader):
            # measure data loading time
    
            loc = 'npu:{}'.format(gpu)
            target = target.to(torch.int32)        
            images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
    
            # compute output
            output = model(images)
            loss = criterion(output, target)
    
    
            # compute gradient and do SGD step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
            end = time.time()
            if i % 100 == 0:
                loss, current = loss.item(), i * len(target)
                print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
  • mp.spawn方式
    不需要专门设置args.gpu,将shell脚本方式中main_worker里的args.gpu均替换为gpu。
    def main_worker(gpu, ngpus_per_node, args):
    
        start_epoch = 0
        end_epoch = 5
        ddp_setup(gpu, args.world_size)
    
        torch_npu.npu.set_device(gpu)
        total_batch_size = args.batch_size
        total_workers = ngpus_per_node
    
        batch_size = int(total_batch_size / ngpus_per_node)    
        workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node)
    
        model = NeuralNetwork()
    
        device = torch.device("npu")
    
        train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
        test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)
    
        train_loader = torch.utils.data.DataLoader(
            training_data, batch_size=batch_size, shuffle=(train_sampler is None),
            num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    
        val_loader = torch.utils.data.DataLoader(
            test_data, batch_size=batch_size, shuffle=(test_sampler is None),
            num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)
    
        loc = 'npu:{}'.format(gpu)
        model = model.to(loc)
        criterion = nn.CrossEntropyLoss().to(loc)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
    
        for epoch in range(start_epoch, end_epoch):
            print("curr epoch: ", epoch)
            train_sampler.set_epoch(epoch)
            train(train_loader, model, criterion, optimizer, epoch, gpu)
    ......    # train函数代码同shell脚本方式
  • Python方式
    def main_worker(gpu, ngpus_per_node, args):
    
        start_epoch = 0
        end_epoch = 5
        args.gpu = args.local_rank    # 任务拉起后,local_rank自动获得device号
    
        ddp_setup(args.gpu, args.world_size)
    ......    # 其余代码同shell脚本方式

设置参数

在模型脚本中,根据拉起方式不同,设置不同的参数。

  • shell脚本/torchrun方式
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)')
        parser.add_argument('--gpu', default=None, type=int,
                        help='GPU id to use.')
        args = parser.parse_args()
        world_size = torch.npu.device_count()
        args.world_size = world_size
        main(args.world_size, args.batch_size)
  • mp.spawn方式
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)')
        args = parser.parse_args()
        world_size = torch.npu.device_count()
        args.world_size = world_size
        main(args.world_size, args.batch_size)
  • Python方式
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 32)')
        parser.add_argument('--gpu', default=None, type=int,
                        help='GPU id to use.')
        parser.add_argument("--local_rank", default=-1, type=int)   # local_rank用于自动获取device号。使用mp.spawn方式与shell方式启动时需删除此项
        args = parser.parse_args()
        world_size = torch.npu.device_count()
        args.world_size = world_size
        main(args.world_size, args.batch_size, args)    # 需将Python拉起命令中设置的参数传入main函数

拉起训练

以下拉起训练的命令为示例,用户可根据实际情况自行更改。

  • shell脚本方式
    export HCCL_WHITELIST_DISABLE=1
    RANK_ID_START=0
    WORLD_SIZE=8
    for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++));
    do
        echo "Device ID: $RANK_ID"
        export LOCAL_RANK=$RANK_ID
        python3 ddp_test_shell.py &
    done
    wait 
  • mp.spawn方式
    export HCCL_WHITELIST_DISABLE=1
    python3 ddp_test_spwn.py
  • Python方式
    # master_addr和master_port参数需用户根据实际情况设置
    export HCCL_WHITELIST_DISABLE=1
    python3 -m torch.distributed.launch --nproc_per_node 8 --master_addr localhost  --master_port *** ddp_test.py
  • torchrun方式(PyTorch 1.11.0及以上版本支持)
    export HCCL_WHITELIST_DISABLE=1
    torchrun --standalone --nnodes=1 --nproc_per_node=8 ddp_test_shell.py

当屏幕打印类似下图中的Loss数值时,说明拉起训练成功。

搜索结果
找到“0”个结果

当前产品无相关内容

未找到相关内容,请尝试其他搜索词