
Example of Launching a Multi-Device Training Script

Building the Model Script

# Import dependencies and libraries 
import torch 
from torch import nn 
import torch_npu 
import torch.distributed as dist 
from torch.utils.data import DataLoader 
from torchvision import datasets 
from torchvision.transforms import ToTensor 
import time 
import torch.multiprocessing as mp 
import os 
 
torch.manual_seed(0) 
# Download the training data 
training_data = datasets.FashionMNIST( 
    root="./data", 
    train=True, 
    download=True, 
    transform=ToTensor(), 
) 
 
# Download the test data 
test_data = datasets.FashionMNIST( 
    root="./data", 
    train=False, 
    download=True, 
    transform=ToTensor(), 
) 
 
# Build the model 
class NeuralNetwork(nn.Module): 
    def __init__(self): 
        super().__init__() 
        self.flatten = nn.Flatten() 
        self.linear_relu_stack = nn.Sequential( 
            nn.Linear(28*28, 512), 
            nn.ReLU(), 
            nn.Linear(512, 512), 
            nn.ReLU(), 
            nn.Linear(512, 10) 
        ) 
 
    def forward(self, x): 
        x = self.flatten(x) 
        logits = self.linear_relu_stack(x) 
        return logits 
 
def test(dataloader, model, loss_fn): 
    size = len(dataloader.dataset) 
    num_batches = len(dataloader) 
    model.eval() 
    test_loss, correct = 0, 0 
    with torch.no_grad(): 
        for X, y in dataloader: 
            X, y = X.to(device), y.to(device) 
            pred = model(X) 
            test_loss += loss_fn(pred, y).item() 
            correct += (pred.argmax(1) == y).type(torch.float).sum().item() 
    test_loss /= num_batches 
    correct /= size 
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

Obtaining Distributed Hyperparameters

In the model script, build a main function that obtains the hyperparameters required for distributed training.

  • Shell script / torchrun / torch_npu_run mode
    def main(world_size: int,  batch_size = 64, total_epochs = 5,):    # users can adjust these values as needed
        ngpus_per_node = world_size
        main_worker(args.gpu, ngpus_per_node, args)
  • mp.spawn mode
    def main(world_size: int,  batch_size = 64, total_epochs = 5,):    # users can adjust these values as needed
        ngpus_per_node = world_size
        mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))    # launch via mp.spawn
  • Python mode
    def main(world_size: int,  batch_size, args):    # hyperparameters come from the Python launch command
        ngpus_per_node = world_size
        args.gpu = args.local_rank    # after the job is launched, local_rank automatically holds the device ID
        main_worker(args.gpu, ngpus_per_node, args)

Setting the Address and Port

Set the address and port in the model script; they are used to launch distributed training. Because the Ascend AI Processor only supports init_method="env://" (environment-variable initialization) when initializing the process group, parameters such as MASTER_ADDR and MASTER_PORT must be configured before initialization according to the actual environment.

  • The configuration code is the same for the shell script, mp.spawn, and torchrun modes, as shown below:
    def ddp_setup(rank, world_size):
        """
        Args:
            rank: Unique identifier of each process
            world_size: Total number of processes
        """
        os.environ["MASTER_ADDR"] = "localhost"    # set to the communication IP of the rank0 node; for single-node training this can be localhost
        os.environ["MASTER_PORT"] = "***"    # set to the port number of the rank0 node
        dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
  • For the torch_npu_run mode, the communication scheme must be switched by modifying the parameters of the init_process_group interface (a combined sketch follows this list).

    Before the change: init_method uses the tcp scheme

    dist.init_process_group(..., init_method="tcp://xx:**", ......, ) # xx is the communication IP of the rank0 node and ** is the port number; choose an idle port as required

    After the change: init_method uses the parallel scheme

    dist.init_process_group(..., init_method="parallel://xx:**", ......, ) # xx is the communication IP of the rank0 node and ** is the port number; choose an idle port as required
  • For the Python mode, the address and port are passed on the launch command line instead (see Launching the Training below). The code in the script is as follows:
    def ddp_setup(rank, world_size):
        """
        Args:
            rank: Unique identifier of each process
            world_size: Total number of processes
        """
        dist.init_process_group(backend="hccl", rank=rank, world_size=world_size)
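
For the torch_npu_run mode, the two snippets above can be combined into a single setup helper. The sketch below is only an illustration: the helper name ddp_setup_parallel and the way the address and port are passed in as arguments are assumptions, while the parallel:// init_method itself is what this section prescribes.

    def ddp_setup_parallel(rank, world_size, master_addr, master_port):
        # Hypothetical setup helper for torch_npu_run: instead of env://,
        # init_method uses the parallel:// scheme with the rank0 IP and an idle port.
        dist.init_process_group(backend="hccl",
                                init_method="parallel://{}:{}".format(master_addr, master_port),
                                rank=rank, world_size=world_size)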

Adding the Distributed Logic

The way the device ID is obtained differs depending on how the training is launched:

  • Shell script / torchrun / torch_npu_run mode: the shell script loops over the devices and passes the local_rank variable in as the assigned device.
  • mp.spawn mode: after mp.spawn launches main_worker as multiple processes, its first parameter gpu automatically receives the device ID (0 to ngpus_per_node - 1).
  • Python mode: after the job is launched, local_rank automatically holds the device ID.

Modify the code according to the launch mode you choose. Sample modifications are shown below:

  • Shell script / torchrun / torch_npu_run mode
    def main_worker(gpu, ngpus_per_node, args):
    
        start_epoch = 0
        end_epoch = 5
        args.gpu = int(os.environ['LOCAL_RANK'])    # the shell script loops over devices and passes local_rank in as the assigned device
        ddp_setup(args.gpu, args.world_size)
    
        torch_npu.npu.set_device(args.gpu)
        total_batch_size = args.batch_size
        total_workers = ngpus_per_node
    
        batch_size = int(total_batch_size / ngpus_per_node)    
        workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node)
    
        model = NeuralNetwork()
    
        device = torch.device("npu")
    
        train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
        test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)
    
        train_loader = torch.utils.data.DataLoader(
            training_data, batch_size=batch_size, shuffle=(train_sampler is None),
            num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    
        val_loader = torch.utils.data.DataLoader(
            test_data, batch_size=batch_size, shuffle=(test_sampler is None),
            num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)
    
        loc = 'npu:{}'.format(args.gpu)
        model = model.to(loc)
        criterion = nn.CrossEntropyLoss().to(loc)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
    
        for epoch in range(start_epoch, end_epoch):
            print("curr epoch: ", epoch)
            train_sampler.set_epoch(epoch)
            train(train_loader, model, criterion, optimizer, epoch, args.gpu)
    
    
    def train(train_loader, model, criterion, optimizer, epoch, gpu):
        size = len(train_loader.dataset)
        model.train()
    
        end = time.time()
        for i, (images, target) in enumerate(train_loader):
            # measure data loading time
    
            loc = 'npu:{}'.format(gpu)
            target = target.to(torch.int32)        
            images, target = images.to(loc, non_blocking=False), target.to(loc, non_blocking=False)
    
            # compute output
            output = model(images)
            loss = criterion(output, target)
    
    
            # compute gradient and do SGD step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
            end = time.time()
            if i % 100 == 0:
                loss, current = loss.item(), i * len(target)
                print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
  • mp.spawn mode
    There is no need to set args.gpu explicitly; replace every args.gpu in the shell-script version of main_worker with gpu.
    def main_worker(gpu, ngpus_per_node, args):
    
        start_epoch = 0
        end_epoch = 5
        ddp_setup(gpu, args.world_size)
    
        torch_npu.npu.set_device(gpu)
        total_batch_size = args.batch_size
        total_workers = ngpus_per_node
    
        batch_size = int(total_batch_size / ngpus_per_node)    
        workers = int((total_workers + ngpus_per_node - 1) / ngpus_per_node)
    
        model = NeuralNetwork()
    
        device = torch.device("npu")
    
        train_sampler = torch.utils.data.distributed.DistributedSampler(training_data)
        test_sampler = torch.utils.data.distributed.DistributedSampler(test_data)
    
        train_loader = torch.utils.data.DataLoader(
            training_data, batch_size=batch_size, shuffle=(train_sampler is None),
            num_workers=workers, pin_memory=False, sampler=train_sampler, drop_last=True)
    
        val_loader = torch.utils.data.DataLoader(
            test_data, batch_size=batch_size, shuffle=(test_sampler is None),
            num_workers=workers, pin_memory=False, sampler=test_sampler, drop_last=True)
    
        loc = 'npu:{}'.format(gpu)
        model = model.to(loc)
        criterion = nn.CrossEntropyLoss().to(loc)
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
    
        for epoch in range(start_epoch, end_epoch):
            print("curr epoch: ", epoch)
            train_sampler.set_epoch(epoch)
            train(train_loader, model, criterion, optimizer, epoch, gpu)
    ......    # the train function is identical to the shell script mode
  • Python mode
    def main_worker(gpu, ngpus_per_node, args):
    
        start_epoch = 0
        end_epoch = 5
        args.gpu = args.local_rank    # after the job is launched, local_rank automatically holds the device ID
    
        ddp_setup(args.gpu, args.world_size)
    ......    # the remaining code is identical to the shell script mode

Configuring the Argument-Passing Logic

In the model script, different arguments must be passed in depending on the launch mode. The argument-passing logic is as follows (argparse is used here):

  • Shell script / torchrun / torch_npu_run mode
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 512)')
        parser.add_argument('--gpu', default=None, type=int,
                        help='GPU id to use.')
        args = parser.parse_args()
        world_size = torch.npu.device_count()
        args.world_size = world_size
        main(args.world_size, args.batch_size)
  • mp.spawn mode
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 512)')
        args = parser.parse_args()
        world_size = torch.npu.device_count()
        args.world_size = world_size
        main(args.world_size, args.batch_size)
  • Python mode
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--batch_size', default=512, type=int, help='Input batch size on each device (default: 512)')
        parser.add_argument('--gpu', default=None, type=int,
                        help='GPU id to use.')
        parser.add_argument("--local_rank", default=-1, type=int)   # local_rank用于自动获取device号。使用mp.spawn方式与shell方式启动时需删除此项
        args = parser.parse_args()
        world_size = torch.npu.device_count()
        args.world_size = world_size
        main(args.world_size, args.batch_size, args)    # pass the arguments set in the Python launch command into main

Launching the Training

Example launch commands for each mode are given below; adjust them to your actual environment.

  • Shell script mode
    export HCCL_WHITELIST_DISABLE=1
    RANK_ID_START=0
    WORLD_SIZE=8
    for((RANK_ID=$RANK_ID_START;RANK_ID<$((WORLD_SIZE+RANK_ID_START));RANK_ID++));
    do
        echo "Device ID: $RANK_ID"
        export LOCAL_RANK=$RANK_ID
        python3 ddp_test_shell.py &
    done
    wait 
  • mp.spawn mode
    export HCCL_WHITELIST_DISABLE=1
    python3 ddp_test_spwn.py
  • Python mode
    # set master_addr and master_port according to the actual environment
    export HCCL_WHITELIST_DISABLE=1
    python3 -m torch.distributed.launch --nproc_per_node 8 --master_addr localhost  --master_port *** ddp_test.py
  • torchrun mode
    export HCCL_WHITELIST_DISABLE=1
    torchrun --standalone --nnodes=1 --nproc_per_node=8 ddp_test_shell.py
  • torch_npu_run mode
    export HCCL_WHITELIST_DISABLE=1
    export MASTER_IP_ADDR=** # replace ** with the IP address of the node_rank0 node
    export MASTER_PORT=** # replace ** with an idle TCP port number
    torch_npu_run --rdzv_backend=parallel --master_addr=$MASTER_IP_ADDR \
    --master_port=$MASTER_PORT --nnodes=8 --nproc_per_node=8 ddp_test_shell.py

When the console output or redirected logs show normal run-time messages such as model loading and training progress, the multi-device training has been launched successfully.
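
As an additional sanity check, you can have each process report its rank right after ddp_setup() returns in main_worker. The snippet below is an illustrative addition, not part of the scripts above; it assumes torch_npu.npu.current_device() mirrors the torch.cuda API.

    # Hypothetical check placed right after ddp_setup() in main_worker: every
    # process should print a distinct rank and the same world size.
    if dist.is_initialized():
        print("rank {}/{} ready on npu:{}".format(
            dist.get_rank(), dist.get_world_size(), torch_npu.npu.current_device()))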
