Horovod Script Migration

Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and MXNet, designed to improve the performance of distributed training. Unlike traditional TensorFlow distributed training, which uses the PS-Worker architecture, Horovod aggregates gradients with Allreduce, which makes better use of bandwidth and removes the PS-Worker bottleneck. This section describes how to migrate a distributed training script developed with Horovod so that it performs distributed training on the Ascend AI Processor.

For an introduction to Horovod, see the official Horovod website.
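
To illustrate the Allreduce-based gradient aggregation mentioned above, the sketch below shows what Horovod's DistributedOptimizer conceptually does. It is a minimal illustration written against Horovod's public allreduce API, not code taken from this guide, and the toy loss is only a placeholder:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Toy model: a single variable, used only to make the sketch self-contained.
w = tf.Variable(1.0)
loss = tf.square(w - 3.0)

# Conceptual view of hvd.DistributedOptimizer: each worker computes local
# gradients, the gradients are averaged across all workers with Allreduce,
# and every worker applies the same averaged update. No parameter server
# is involved, so there is no PS-Worker bottleneck.
opt = tf.train.GradientDescentOptimizer(0.01)
grads_and_vars = opt.compute_gradients(loss)
averaged = [(hvd.allreduce(g), v) for g, v in grads_and_vars]
train_op = opt.apply_gradients(averaged)

This is the aggregation that hvd.DistributedOptimizer performs automatically in the original code below.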

Original Horovod code:

import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)

Migrated code:

# Import the NPU library
import tensorflow as tf
from npu_bridge.npu_init import *

# This example calls HCCL's group management interfaces, so a separate session must be started for HCCL initialization. For details, see "Collective Communication Initialization".
npu_int = npu_ops.initialize_system()
npu_shutdown = npu_ops.shutdown_system()
config = tf.ConfigProto(allow_soft_placement=True)
# Add an NPU optimizer named "NpuOptimizer". During graph compilation, the NPU only traverses the session configuration under "NpuOptimizer".
custom_op = config.graph_options.rewrite_options.custom_optimizers.add()
custom_op.name =  "NpuOptimizer"
# The TensorFlow remapping and memory_optimization features must be explicitly disabled to avoid conflicts with the corresponding NPU features.
config.graph_options.rewrite_options.remapping = RewriterConfig.OFF
config.graph_options.rewrite_options.memory_optimization = RewriterConfig.OFF  
init_sess = tf.Session(config=config)
init_sess.run(npu_int)

# Pin GPU to be used to process local rank (one GPU per process)
config.gpu_options.visible_device_list = str(get_local_rank_id())  # "hvd.local_rank" is replaced with "get_local_rank_id"

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * get_rank_size())   # "hvd.size" is replaced with "get_rank_size"

# NPU allreduce
# Replace "hvd.DistributedOptimizer" with "npu_distributed_optimizer_wrapper"
opt = npu_distributed_optimizer_wrapper(opt)   
# Add hook to broadcast variables from rank 0 to all other processes during initialization.
hooks = [NPUBroadcastGlobalVariablesHook(0)]

# In session run mode, call the collective communication broadcast interface to broadcast variables:
input = tf.trainable_variables()
bcast_global_variables_op = hccl_ops.broadcast(input, 0)

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if get_rank_id() == 0 else None  # "hvd.rank" is replaced with "get_rank_id"

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  # Broadcast variables
  mon_sess.run(bcast_global_variables_op)  
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op) 
  
# After training, execute shutdown_system and close the session
init_sess.run(npu_shutdown)
init_sess.close()
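
For quick reference, the migration comes down to the following API substitutions. This recap is derived from the migrated script above and is not an exhaustive mapping:

hvd.init()                           ->  init_sess.run(npu_ops.initialize_system())
hvd.local_rank()                     ->  get_local_rank_id()
hvd.size()                           ->  get_rank_size()
hvd.rank()                           ->  get_rank_id()
hvd.DistributedOptimizer(opt)        ->  npu_distributed_optimizer_wrapper(opt)
hvd.BroadcastGlobalVariablesHook(0)  ->  NPUBroadcastGlobalVariablesHook(0), or hccl_ops.broadcast in session run mode
(no Horovod equivalent)              ->  init_sess.run(npu_ops.shutdown_system()) after training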

The NPUDistributedOptimizer distributed optimizer remains compatible in the current version.
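
As an illustration of that compatibility, the fragment below uses NPUDistributedOptimizer in place of npu_distributed_optimizer_wrapper. This is a sketch based on the statement above, not part of the official sample; the loss placeholder follows the convention of the listings in this section:

from npu_bridge.npu_init import *
import tensorflow as tf

# Build model...
loss = ...

# Wrap the original optimizer with NPUDistributedOptimizer instead of
# npu_distributed_optimizer_wrapper; both aggregate gradients with
# allreduce on the NPU.
opt = tf.train.AdagradOptimizer(0.01 * get_rank_size())
opt = NPUDistributedOptimizer(opt)
train_op = opt.minimize(loss)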