下载
中文
注册

支持数据并行(Allreduce)

AllReduce是主流的数据并行架构,各个节点按照算法协同工作,适用于对训练算力要求高、设备规模大的场景。本节介绍如何将TensorFlow训练脚本在昇腾AI处理器上通过AllReduce架构进行分布式训练。

Allreduce实现原理

大规模AI训练集群中,通常采用数据并行的方式完成训练。数据并行即每个设备使用相同的模型、不同的训练样本,每个device计算得到的梯度数据需要聚合之后进行参数更新。
图1 数据并行方式训练的示意图

如果按照梯度聚合方式进行分类,数据并行的主流实现有PS-Worker架构AllReduce架构两种。在AllReduce架构中,每个参与训练的Device形成一个环,没有中心节点来聚合所有计算梯度。AllReduce算法将参与训练的Device放置在一个逻辑环路(logical ring)中。每个Device从上行的Device接收数据,并向下行的Device发送数据,可充分利用每个Device的上下行带宽。

AllReduce架构是为了解决了PS-Worker架构无法线性扩展问题而提出的改良架构。各个节点按照算法协同工作,算法的目标是减少传输数据量,并充分利用硬件通信带宽。一般适合训练算力要求高、设备规模大的场景。AllReduce架构的实现原理如下图所示。

图2 Allreduce模式

以Ring算法为例介绍allreduce模式(称为Ring-allreduce),如图2所示,在Ring-allreduce架构下,每个设备都是worker,并且形成一个环,不需要中心节点来聚合所有worker计算的梯度。在一个迭代过程,每个worker完成一份mini-batch样本数据的前向计算、反向计算,得到梯度数据,然后使用Ring-allreduce算法完成梯度数据的同步。Ring-allreduce算法包括ReduceScatter和AllGather两部分,梯度数据分多个步骤传递给环中的下一个worker,同时它也多次接收上一个worker的梯度数据。对于一个包含N个worker的环,每个worker需要从其它worker接收2*(N-1)次梯度数据(每次接收1/N的数据),并向其他节点发送2*(N-1)次梯度数据(每次发送1/N的数据)。

使用的接口

在TensorFlow中,一般使用tf.distribute.Strategy进行分布式训练,具体请参考https://www.tensorflow.org/guide/distributed_training。而昇腾AI处理器暂不支持上述分布式策略,TF Adapter提供了分布式接口npu_distributed_optimizer_wrapper,对传入的optimizer梯度函数添加NPU的allreduce操作,最终返回输入的优化器,从而支持单机多卡、多机多卡等组网形式下,各个Device之间计算梯度后执行梯度聚合操作。用户调用该函数后,在生成的训练图中,梯度计算和更新算子之间插入了allreduce算子节点。

图3 使用的接口

因此,对于原始TensorFlow训练脚本,需要经过修改后,才可在昇腾AI处理器上支持分布式训练。

数据集切分

分布式训练时,用户可以使用TensorFlow接口进行数据集切分。如果数据集切分时需要获取处理器资源信息,用户可以通过集合通信接口get_rank_size获取昇腾AI处理器数量,通过get_rank_id获取处理器id,例如:

1
  dataset = dataset.shard(get_rank_size(),get_rank_id())

Estimator模式下脚本迁移

  1. TensorFlow会将策略对象传递到Estimator的Runconfig中,但是TF Adapter暂不支持这种方式,用户需要将相关代码删除。例如:
    迁移前:
    1
    2
    3
    4
    5
    6
    mirrored_strategy = tf.distribute.MirroredStrategy()
    config = tf.estimator.RunConfig(
      train_distribute=mirrored_strategy, 
      eval_distribute=mirrored_strategy,
      session_config=session_config,
      save_checkpoints_secs=60*60*24)
    

    迁移后:

    1
    2
    3
    config = tf.estimator.NPURunConfig(
      session_config=session_config,
      save_checkpoints_secs=60*60*24)
    
  1. 然后调用npu_distributed_optimizer_wrapper,对传入的optimizer梯度函数添加NPU的allreduce操作,最终返回输入的优化器,从而在昇腾AI处理器上实现分布式计算。具体方法为:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    def cnn_model_fn(features,labels,mode):    
      #搭建网络   
      xxx    
      #计算loss
      xxx    
    
      #Configure the TrainingOp(for TRAIN mode)    
      if mode == tf.estimator.ModeKeys.TRAIN:      
        optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001) # 使用SGD优化器
        optimizer = npu_distributed_optimizer_wrapper(optimizer) # 使用NPU分布式计算,更新梯度
        train_op=optimizer.minimize(loss=loss,global_step=tf.train.get_global_step()) # 最小化loss
        return tf.estimator.EstimatorSpec(mode=mode,loss=loss,train_op=train_op)
    
    • NPUDistributedOptimizer分布式优化器在当前版本依然兼容。
    • Estimator模式下,使用npu_distributed_optimizer_wrapper实现allreduce功能时,由于NPUEstimator中自动添加了NPUBroadcastGlobalVariablesHook,因此无需手写实现broadcast功能。

    如果原始脚本使用TensorFlow接口计算梯度,例如grads = tf.gradients(loss, tvars),需要在计算完梯度之后,调用npu_allreduce接口对梯度进行allreduce。

    迁移前:

    1
    grads = tf.gradients(a + b, [a, b], stop_gradients=[a, b])
    

    迁移后:

    1
    grads = npu_allreduce(tf.gradients(a + b, [a, b], stop_gradients=[a, b]))
    

sess.run模式下脚本迁移

Estimator模式下,使用npu_distributed_optimizer_wrapper实现allreduce功能时,由于NPUEstimator中自动添加了NPUBroadcastGlobalVariablesHook,因此无需手写实现broadcast功能。但sess.run模式的训练脚本还需要用户手写实现broadcast功能。具体方法为:

  1. 在变量初始化之后,训练之前,通过集合通信接口broadcast进行变量广播,关于broadcast接口的详细介绍请参见HCCL API(Python)
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    from npu_bridge.npu_init import *
    
    def broadcast_global_variables(root_rank, index):
      """Broadcasts all global variables from root rank to all other processes.
      Arguments:
      root_rank: rank of the process from which global variables will be broadcasted
      to all other processes. 
      index: rank_id
      """
      op_list = []
      for var in tf.trainable_variables():
        # the input and out tensor of HCOMBroadcast interface are list
        if "float" in var.dtype.name:
          inputs = [var]
          outputs=hccl_ops.broadcast(tensor=inputs,root_rank=root_rank)
        if outputs is not None:
          op_list.append(outputs[0].op)
          op_list.append(tf.assign(var, outputs[0]))
    
      return tf.group(op_list)
    
    ...
    bcast_op = broadcast_global_variables(root_rank, index)
    sess = tf.Session()
    ...
    sess.run(bcast_op)
    

    此外,broadcast接口中有改图的操作,如果图无法修改(例如冻结了图或者使用tf.train.Supervisor创建session等),则需要先取消图冻结:

    1
    2
    3
    with sv.managed_session() as sess:
      sess.graph._unsafe_unfinalize() # 取消冻结的Graph
      sess.run(bcast_op)
    
  2. 执行训练时,在使用梯度优化器计算完各Device数据后,直接调用npu_distributed_optimizer_wrapper进行梯度数据聚合:
    1
    2
    3
    from npu_bridge.npu_init import *
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001) # 使用SGD优化器
    distributedOptimizer=npu_distributed_optimizer_wrapper(optimizer) # 使用NPU分布式计算,更新梯度
    

    NPUDistributedOptimizer分布式优化器在当前版本依然兼容。

    如果原始脚本使用TensorFlow接口计算梯度,例如grads = tf.gradients(loss, tvars),需要在计算完梯度之后,调用npu_allreduce接口对梯度进行allreduce。

    迁移前:

    1
    grads = tf.gradients(a + b, [a, b], stop_gradients=[a, b])
    

    迁移后:

    1
    grads = npu_allreduce(tf.gradients(a + b, [a, b], stop_gradients=[a, b]))
    

Keras模式下脚本迁移

如需在Keras模式下进行分布式训练,需要在Keras模型编译时修改optimizer,调用npu_distributed_optimizer_wrapper,对传入的optimizer梯度函数添加NPU的allreduce操作;并且在keras_model.fit的callbacks参数中增加NPUBroadcastGlobalVariablesCallback。

迁移前:

1
2
3
4
5
6
7
8
from npu_bridge.npu_init import *

data = xxx
labels = xxx

opt = tf.keras.optimizers.Adam(learning_rate=0.001)
keras_model.compile(optimizer=opt,loss='sparse_categorical_crossentropy')
keras_model.fit(data, labels, epochs=10, batch_size=32) 
迁移后:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from npu_bridge.npu_init import *

data = xxx
labels = xxx

opt = tf.keras.optimizers.Adam(learning_rate=0.001)
opt = npu_distributed_optimizer_wrapper(opt)           # allreduce
keras_model.compile(optimizer=opt,loss='sparse_categorical_crossentropy')
callbacks = [NPUBroadcastGlobalVariablesCallback(0)]  # 变量进行广播
keras_model.fit(data, labels, epochs=10, batch_size=32, callbacks=callbacks) 

KerasDistributeOptimizer分布式优化器在当前版本依然兼容。