训练迭代循环下沉

概述

iterations_per_loop是针对一次session.run调用,在Device侧执行训练迭代的次数。Device侧会运行iterations_per_loop指定的迭代次数,然后再返回到Host侧,该参数可以减少Host与Device间的交互次数,缩短训练时长。

本节内容介绍通过迁移工具迁移后,如何使能训练迭代下沉。

使用约束

iterations_per_loop默认为1,配置该参数大于1即可使能此特性,使用该特性时需要注意:

Estimator模式修改

  1. 检查迁移后的脚本是否存在“init_resource”。
    • 如果存在,则需要参考下面示例进行修改;修改完后,执行下一步。
    • 如果不存在,则直接执行下一步。
    if __name__ == '__main__':
    
      session_config = tf.ConfigProto()
      custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add()
      custom_op.name = "NpuOptimizer"
      custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件
      custom_op.parameter_map["iterations_per_loop"].i = 10
    
      (npu_sess, npu_shutdown) = init_resource(config=session_config)
      tf.app.run()
      shutdown_resource(npu_sess, npu_shutdown)
      close_session(npu_sess)
  2. 在脚本中找到“npu_run_config_init”:
    session_config = tf.ConfigProto(allow_soft_placement=True)
    
    run_config = tf.estimator.RunConfig(
        train_distribute=distribution_strategy,
        session_config=session_config,
        save_checkpoints_secs=60*60*24)
    
    classifier = tf.estimator.Estimator(
        model_fn=model_function, model_dir=flags_obj.model_dir, config=npu_run_config_init(run_config=run_config))
  3. 配置“iterations_per_loop”:
    session_config = tf.ConfigProto(allow_soft_placement=True)
    custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add()
    custom_op.name = 'NpuOptimizer'
    custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件
    custom_op.parameter_map["iterations_per_loop"].i = 10
    
    run_config = tf.estimator.RunConfig(
        train_distribute=distribution_strategy,
        session_config=session_config,
        save_checkpoints_secs=60*60*24)
    
    classifier = tf.estimator.Estimator(
        model_fn=model_function, model_dir=flags_obj.model_dir, config=npu_run_config_init(run_config=run_config))
  4. 如果脚本中的运行配置函数,例如RunConfig中没有传入session_config参数,需要自行传入session_config:
    session_config = tf.ConfigProto()
    custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add()
    custom_op.name = 'NpuOptimizer'
    custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件
    custom_op.parameter_map["iterations_per_loop"].i = 10
    
    run_config = tf.estimator.RunConfig(
        train_distribute=distribution_strategy,
        session_config=session_config,
        save_checkpoints_secs=60*60*24)
    
    classifier = tf.estimator.Estimator(
        model_fn=model_function, model_dir=flags_obj.model_dir, config=npu_run_config_init(run_config=run_config))
  5. 增加“SetIterationsVarHook”:
    train_hooks = hooks_helper.get_train_hooks(
        flags_obj.hooks,
        model_dir=flags_obj.model_dir,
        batch_size=flags_obj.batch_size)
    train_hooks.append(SetIterationsVarHook(10))
  6. 在train_op中增加“IterationOp”:
    train_op = opt.apply_gradients( grad_var_list, global_step = global_step )
    train_op = tf.group(train_op, name="IterationOp")   #该name设置到梯度更新返回的op

session.run模式修改

session.run模式下,通过set_iteration_per_loop设置iterations_per_loop参数,并修改session.run调用次数为原调用次数除以iterations_per_loop。

  1. 检查迁移后的脚本是否存在“init_resource”。
    • 如果存在,则需要参考下面示例进行修改;修改完后,执行下一步。
    • 如果不存在,则直接执行下一步。
    if __name__ == '__main__':
    
      session_config = tf.ConfigProto()
      custom_op = session_config.graph_options.rewrite_options.custom_optimizers.add()
      custom_op.name = "NpuOptimizer"
      custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件
      custom_op.parameter_map["iterations_per_loop"].i = 10
    
      (npu_sess, npu_shutdown) = init_resource(config=session_config)
      tf.app.run()
      shutdown_resource(npu_sess, npu_shutdown)
      close_session(npu_sess)
  1. 在脚本中找到“npu_config_proto”,配置iterations_per_loop。
    from __future__ import print_function
    import input_data
    from npu_bridge.npu_init import *
     
    mnist = input_data.read_data_sets("/test/", one_hot=True)
     
    import tensorflow as tf
     
    # 设置模型
    # 学习率
    learning_rate = 0.01
    # 训练迭代次数
    training_epochs = 10
    # batch大小
    batch_size = 100
    # 每多少次迭代显示一次损失
    display_step = 1
     
    x = tf.placeholder(tf.float32, [None, 784])
    y = tf.placeholder(tf.float32, [None, 10])
     
    # 模型参数
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
     
    # 建立模型
    pred = tf.nn.softmax(tf.matmul(x, W) + b)
     
    # 定义损失函数:交叉熵
    cost = tf.reduce_mean(-tf.reduce_sum(y*tf.log(pred), reduction_indices=1))
     
    # 梯度下降
    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
     
    # 初始化所有变量
    init = tf.global_variables_initializer()
     
    config = tf.ConfigProto()
    custom_op =  config.graph_options.rewrite_options.custom_optimizers.add()
    custom_op.name =  "NpuOptimizer"
    custom_op.parameter_map["mix_compile_mode"].b = False  #关闭混合计算,根据实际情况配置,默认关闭
    custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件
    custom_op.parameter_map["iterations_per_loop"].i = 10 #此处设置的值和set_iteration_per_loop设置的iterations_per_loop值保持一致,用于判断是否进行训练迭代下沉
    config = npu_config_proto(config_proto=config)
     
    # 训练模型
    with tf.Session(config=config) as sess:
        sess.run(init)
        # sess.run模式下设置小循环次数为10
        train_op = util.set_iteration_per_loop(sess, optimizer, 10)
     
        for epoch in range(training_epochs):
            avg_cost = 0
            total_batch = int(mnist.train.num_examples / batch_size)
     
            for i in range(total_batch):
                batch_xs, batch_ys = mnist.train.next_batch(batch_size)
                _, c = sess.run([train_op, cost], feed_dict={x: batch_xs, y: batch_ys})
     
                avg_cost += c / total_batch

    由于上述接口中有改图的操作,如果图无法修改(例如冻结了图或者使用tf.train.Supervisor创建session等),则无法使用set_iteration_per_loop接口设置大小循环。此种情况下请使用create_iteration_per_loop_var和load_iteration_per_loop_var接口设置小循环次数,调用示例:

    from __future__ import print_function
    import input_data
    from npu_bridge.npu_init import *
     
    mnist = input_data.read_data_sets("/test/", one_hot=True)
     
    import tensorflow as tf
     
    # 设置模型
    # 学习率
    learning_rate = 0.01
    # 训练迭代次数
    training_epochs = 10
    # batch大小
    batch_size = 100
    # 每多少次迭代显示一次损失
    display_step = 1
     
    x = tf.placeholder(tf.float32, [None, 784])
    y = tf.placeholder(tf.float32, [None, 10])
     
    # 模型参数
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
     
    # 建立模型
    pred = tf.nn.softmax(tf.matmul(x, W) + b)
     
    # 定义损失函数:交叉熵
    cost = tf.reduce_mean(-tf.reduce_sum(y*tf.log(pred), reduction_indices=1))
     
    # 梯度下降
    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
     
    # 初始化所有变量
    init = tf.global_variables_initializer()
     
    config = tf.ConfigProto()
    custom_op =  config.graph_options.rewrite_options.custom_optimizers.add()
    custom_op.name =  "NpuOptimizer"
    custom_op.parameter_map["mix_compile_mode"].b = False  #关闭混合计算,根据实际情况配置,默认关闭
    custom_op.parameter_map["enable_data_pre_proc"].b = True # getnext算子下沉是迭代循环下沉的必要条件
    custom_op.parameter_map["iterations_per_loop"].i = 10 #此处设置的值和set_iteration_per_loop设置的iterations_per_loop值保持一致,用于功能校验
    config = npu_config_proto(config_proto=config)
     
    # 训练模型
    with tf.Session(config=config) as sess:
        sess.run(init)
        # sess.run模式下设置小循环次数为10
        iteration = util.IterationPerLoop() 
        train_op = iteration.create_iteration_per_loop_var(optimizer) #修改图
        tf.train.Supervisor(logdir="/home/xxxx",init_op=init)  #冻结图
        iteration.load_iteration_per_loop_var(sess, 10)  #设置小循环次数
    
        for epoch in range(training_epochs):
            avg_cost = 0
            total_batch = int(mnist.train.num_examples / batch_size)
     
            for i in range(total_batch):
                batch_xs, batch_ys = mnist.train.next_batch(batch_size)
                _, c = sess.run([train_op, cost], feed_dict={x: batch_xs, y: batch_ys})
     
                avg_cost += c / total_batch

检查iterations_per_loop生效

通过Host日志关键字Insert op success检查iterations_per_loop生效,如图1所示。

图1 日志信息(iterations_per_loop大于1)

对比iterations_per_loop为默认1时,相关日志信息,如图2所示。

图2 日志信息(iterations_per_loop等于1)