下载
中文
注册

数据预处理

数据预处理流程与原始模型一致,部分位置经改造以适配昇腾AI处理器并提升计算性能,展示的示例代码包含改动位置。

定义输入函数input_fn

这里以imagenet数据集数据预处理为例,其数据预处理部分涉及到的适配昇腾AI处理器改造的py文件及相关函数接口介绍如下:

表1 数据预处理API

接口

简介

位置

input_fn()

输入函数,用于处理数据集用于Estimator训练,输出真实数据。

“official/r1/resnet/imagenet_main.py”

resnet_main()

包含数据输入、运行配置、训练以及验证的主接口。

“official/r1/resnet/resnet_run_loop.py”

  1. “official/r1/resnet/imagenet_main.py”文件中增加以下头文件:
    1
    2
    from hccl.manage.api import get_rank_size
    from hccl.manage.api import get_rank_id
    
  2. 在数据读取时,获取昇腾AI处理器数量以及昇腾AI处理器 id,用于支持数据并行。

    代码位置:“official/r1/resnet/imagenet_main.py”的input_fn()函数(修改部分为字体加粗部分):

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    def input_fn(is_training, data_dir, batch_size, num_epochs=1, dtype=tf.float32,
                  datasets_num_private_threads=None, parse_record_fn=parse_record,
                  input_context=None, drop_remainder=False, tf_data_experimental_slack=False):
         """提供训练和验证batches的函数。
         参数解释:
           is_training: 表示输入是否用于训练的布尔值。
           data_dir: 包含输入数据集的文件路径。
           batch_size: 每个batch的大小。
           num_epochs: 数据集的重复数。
           dtype: 图片/特征的数据类型。
           datasets_num_private_threads: tf.data的专用线程数。
           parse_record_fn: 解析tfrecords的入口函数。
           input_context: 由'tf.distribute.Strategy'传入的'tf.distribute.InputContext'对象。
           drop_remainder: 用于标示对于最后一个batch如果数据量达不到batch_size时保留还是抛弃。设置为True,则batch的维度固定。
           tf_data_experimental_slack: 是否启用tf.data的'experimental_slack'选项。
    
         Returns:
           返回一个可用于迭代的数据集。
         """
         # 获取文件路径
         filenames = get_filenames(is_training, data_dir)
         # 按第一个维度切分文件
         dataset = tf.data.Dataset.from_tensor_slices(filenames)
         if input_context:
             # 获取昇腾AI处理器数量以及id,用于支持数据并行
             ############## npu modify begin #############
             dataset = dataset.shard(get_rank_size(),get_rank_id())
             ############## npu modify end ###############
    
             # tf.compat.v1.logging.info(
             #     'Sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d' % (
             #         input_context.input_pipeline_id, input_context.num_input_pipelines))
             # dataset = dataset.shard(input_context.num_input_pipelines,
             #                         input_context.input_pipeline_id)
    
         if is_training:
             # 将文件顺序打乱
             dataset = dataset.shuffle(buffer_size=_NUM_TRAIN_FILES)
    
         # cycle_length = 10 并行读取并反序列化10个文件,CPU资源充足的场景下可适当增加该值。
         dataset = dataset.interleave(
             tf.data.TFRecordDataset,
             cycle_length=10,
             num_parallel_calls=tf.data.experimental.AUTOTUNE)
    
         return resnet_run_loop.process_record_dataset(
             dataset=dataset,
             is_training=is_training,
             batch_size=batch_size,
             shuffle_buffer=_SHUFFLE_BUFFER,
             parse_record_fn=parse_record_fn,
             num_epochs=num_epochs,
             dtype=dtype,
             datasets_num_private_threads=datasets_num_private_threads,
             drop_remainder=drop_remainder,
             tf_data_experimental_slack=tf_data_experimental_slack,
         )
    
  3. 用于训练和测试的输入函数接口中,drop_remainder必须设置为True。
    代码位置:“/official/r1/resnet/resnet_run_loop.py”中的resnet_main()函数(修改部分为input_fn_train()和input_fn_eval()子函数):
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
      def input_fn_train(num_epochs, input_context=None):
        ############## npu modify begin #############
        # 使用dtype=tf.float16提高数据传输性能。
        # 当前版本的drop_remainder只支持为True。
        # 这里的batch_size指的是单卡的batch大小而不是全局batch大小。
        return input_function(
            is_training=True,
            data_dir=flags_obj.data_dir,
            batch_size=flags_obj.batch_size,
            num_epochs=num_epochs,
            dtype=tf.float16,
            input_context=input_context,
            drop_remainder=True)
    
      def input_fn_eval():
        # 使用dtype=tf.float16提高数据传输性能
        # 当前版本的drop_remainder只支持为True
        # 这里的batch_size指的是单卡的batch大小而不是全局batch大小
         return input_function(
             is_training=False,
             data_dir=flags_obj.data_dir,
             batch_size=flags_obj.batch_size,
             num_epochs=1,
             dtype=tf.float16,
             input_context=True,
             drop_remainder=True)
      ############## npu modify end ###############
    
      # 原代码中用于训练的输入函数接口和用于验证的输入函数接口。
      # def input_fn_train(num_epochs, input_context=None):
      #     return input_function(
      #         is_training=True,
      #         data_dir=flags_obj.data_dir,
      #         batch_size=distribution_utils.per_replica_batch_size(
      #             flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
      #         num_epochs=num_epochs,
      #         dtype=flags_core.get_tf_dtype(flags_obj),
      #         datasets_num_private_threads=flags_obj.datasets_num_private_threads,
      #         input_context=input_context)
      #
      # def input_fn_eval():
      #     return input_function(
      #         is_training=False,
      #         data_dir=flags_obj.data_dir,
      #         batch_size=distribution_utils.per_replica_batch_size(
      #             flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
      #         num_epochs=1,
      #         dtype=flags_core.get_tf_dtype(flags_obj))