下载
中文
注册

FuncProcessPoint

函数功能

FuncProcessPoint的构造函数,返回一个FuncProcessPoint对象。

函数原型

class FuncProcessPoint(compile_config_path: Optional[str] = None, name: Optional[str] = None,
py_func: Optional = None, workspace_dir: Optional = None)

参数说明

参数名称

数据类型

取值说明

compile_config_path

str

UDF的编译配置文件,与py_func互斥。传入py_func参数后compile_config_path不生效。

配置文件示例如下:

{"func_list":[{"func_name":"Add", "inputs_index":[1,0], "outputs_index":[0]}],"input_num":2,"output_num":1,"target_bin":"libadd.so","workspace":"./","cmakelist_path":"CMakeLists.txt","compiler": "./cpu_compile.json","running_resources_info":[{"type":"cpu","num":2},{"type":"memory","num":100}],"heavy_load":false}

name

str

处理点名称,框架会自动保证名称唯一,不设置时会自动生成FuncProcessPoint, FuncProcessPoint_1, FuncProcessPoint_2,...的名称。

py_func

class

用户开发的自定义python UDF类(需要使用proc_wrapper(func_list="ix:ox")注册输入输出),与compile_config_path互斥。传入py_func参数后compile_config_path不生效。

workspace_dir

str

自动生成的Python UDF临时工作空间目录,和py_func配合使用,

该字段为空时使用py_func的名字拼接“_ws”使用。

表1 FunctionPp的json配置文件

配置项

可选/必选

描述

workspace

必选

值为字符串,UDF的工作空间路径。

target_bin

必选

值为字符串,UDF工程编译出来的so名字,为防止被非法篡改,该字符串需要以lib***.so来命名,合法的字符包含大小写字母、数字、下划线和中划线。

input_num

必选

值为数字,表示UDF的输入个数,即FunctionPp的输入个数。

output_num

必选

值为数字,表示UDF的输出个数。即FunctionPp的输出个数。

func_list

必选

值为list,list的元素为单个function的描述,当前只支持一个function。

func_list.func_name

必选

值为字符串,函数名称,要和UDF里定义的function名称一致。多function场景下,func_name不允许重复。

func_list.inputs_index

可选

值为list,list元素为数字,表示该function取FunctionPp的哪些输入,单function情况下当前无效;多function情况下该字段必选。且多个处理函数input index不共享,不能重复。

func_list.outputs_index

可选

值为list,list元素为数字,表示该function对应FunctionPp的哪些输出,单function情况下当前无效。多function情况下output index可共享。

cmakelist_path

可选

值为字符串,源码编译的CMakeLists文件相对于workspace的路径,如果未指定,则取workspace下面的默认CMakeLists文件。

CMakeLists文件的详细信息请参考CMakeLists文件

compiler

可选

值为字符串,异构环境下编译源码的交叉编译工具路径配置文件,如果未指定,则取资源类型默认的编译工具。

compiler的json配置文件内容示例和各字段含义请参考•compiler的json配置文件

running_resources_info

可选

值为list,运行当前so需要的资源信息,list的元素为单个资源信息的描述。

running_resources_info.type

可选

当配置了running_resources_info时,该字段必选。

值为字符串,运行当前so需要的资源信息的类型,可选类型是"cpu"和"memory"。当资源类型是"memory"时,单位是M。

running_resources_info.num

可选

当配置了running_resources_info时,该字段必选。

值为数字,运行当前so需要的资源信息的数量。

heavy_load

可选

表示节点对算力的诉求。

  • true:重载,表示对算力的诉求大。
  • false:轻载,表示对算力的诉求小。

默认值为false。

当该参数取值为"true"时,在Atlas A2 训练系列产品/Atlas 800I A2 推理产品场景下会影响UDF的部署位置。

buf_cfg

可选

用户可以自定义配置内存池档位,通过自定义档位可以提升内存申请效率及减少内存碎片,如未设置该参数,将使用默认的档位配置初始化内存模块,默认档位配置及该参数样例见下文[buf_cfg的json配置格式],该配置最多支持64个档位,超过64编译报错。

  • compiler的json配置
    内容示例如下,各字段解释如所示。
    {"compiler":[{"resource_type":"X86","toolchain":"/usr/bin/g++"},{"resource_type":"Aarch","toolchain":"/usr/bin/g++"},{"resource_type":"Ascend","toolchain":"/usr/local/Ascend/hcc"}]}
    表2 compiler的json配置文件

    配置项

    可选/必选

    描述

    compiler

    必选

    值为list,list的元素为单个资源类型的编译工具的描述。

    compiler.resource_type

    必选

    值为字符串,设备支持的资源类型。

    compiler.toolchain

    必选

    值为字符串,该资源类型对应的编译工具路径。

  • buf_cfg的json配置
    内容示例如下,各字段解释如所示。
    "buf_cfg":[{"total_size":2097152,"blk_size":256,"max_buf_size":8192,"page_type":"normal"},        // 1.total:2M  max:8K
               {"total_size":10485760,"blk_size":4096,"max_buf_size":8388608,"page_type":"normal"},   // 2.total:10M  max:8M
               {"total_size":2097152,"blk_size":256,"max_buf_size":8192,"page_type":"huge"},          // 3.total:2M  max:8K     
               {"total_size":10485760,"blk_size":8192,"max_buf_size":8388608,"page_type":"huge"},     // 4.total:10M  max:8M
               {"total_size":69206016,"blk_size":8192,"max_buf_size":67108864,"page_type":"huge"}]    // 5.total:66M  max:64M
    说明:
      如上样例共配置了5个内存档位,前两条针对普通内存,后三条针对大页内存。
      使用该配置初始化内存管理模块后,如果进程申请8M大页内存,驱动会根据第4条配置项,生成并管理一个10M内存池,从其中申请8M内存。
      如本进程需要再次申请1M大页内存,由于第三条配置项中一次最大只能申请8K,因此仍然会落到第4条配置项对应的内存池中,此时上一次申请10M只使用了8M,剩余的内存仍大于1M,因此会在上一次生成的10M内存池中申请1M内存供本次使用。

    默认档位如下:

    ID

    total_size

    blk_size

    max_buf_size

    page_type

    0

    2M

    256B

    8K

    normal

    1

    32M

    8K

    8M

    normal

    3

    2M

    256B

    8K

    huge

    4

    66M

    8K

    64M

    huge

    表3 buf_cfg的json配置说明

    配置项

    可选/必选

    描述

    total_size

    必选

    当前档位内存池的大小,单位Byte

    约束:

    普通内存total_size是4K的倍数,大页内存total_size是2M的倍数,且total_size是blk_size的倍数

    blk_size

    必选

    当前档位一次可以申请的最小内存值,单位Byte

    约束:

    要求满足2^n,且在(0,2M]之间,小于max_buf_size

    max_buf_size

    必选

    当前档位一次可以申请的最大内存值,单位Byte

    约束:小于total_size

    page_type

    必选

    当前档位对应的内存类型,

    约束:

    有效值huge或normal,分别表示大页内存和普通内存

  • CMakeLists文件

    DataFlow UDF编译模块解析异构环境的resource.json资源配置和cpu_compiler配置,根据resource.json资源配置类型匹配选择cpu_compiler中指定的交叉编译工具,如果用户未指定cpu_compiler.json配置文件或者cpu_compiler.json未配置该类型的编译工具,则取环境上默认的编译工具进行编译。不同资源类型的编译工具名称和路径如下。${INSTALL_DIR}请替换为CANN软件安装后文件存储路径。若安装的Ascend-cann-toolkit软件包,以root安装举例,则安装后文件存储路径为:/usr/local/Ascend/ascend-toolkit/latest。

    • X86和Aarch场景下:g++
    • Ascend场景下:${INSTALL_DIR}/toolkit/toolchain/hcc/bin/aarch64-target-linux-gnu-g++
    用户的源代码工程要遵从如下规则:
    • 用户提供的源码工程目录下要包括所有执行代码和依赖库源码。
    • 用户要配置好FuntionPp执行代码和依赖库的编译脚本。
    • 编译脚本要使用RELEASE_DIR变量作为最终输出目录,如果有依赖的so文件,用户需要把依赖的so文件拷贝到该路径下。
    • 编译脚本要使用RESOURCE_TYPE变量判断资源类型,如果当前UDF不支持某一个资源类型,需要将对应的注释放开。
    • CMakeLists sample如下:
      cmake_minimum_required(VERSION 3.5)
      PROJECT(UDF)
      if ("x${RESOURCE_TYPE}" STREQUAL "xAscend")
        message(STATUS "ascend compiler enter")
        # if unsupport current resource type, please uncomment the next line.
        # message(FATAL_ERROR "Unsupport compile Ascend target!")
      elseif("x${RESOURCE_TYPE}" STREQUAL "xAarch")
        message(STATUS "aarch64 compiler enter")
        # if unsupport current resource type, please uncomment the next line.
        # message(FATAL_ERROR "Unsupport compile Aarch64 target!")
      else()
        message(STATUS "x86 compiler enter")
        # if unsupport current resource type, please uncomment the next line.
        # message(FATAL_ERROR "Unsupport compile X86 target!")
      endif()
      
      if(DEFINED ENV{ASCEND_INSTALL_PATH})
        set(ASCEND_INSTALL_PATH $ENV{ASCEND_INSTALL_PATH})
        message(STATUS "Read ASCEND_INSTALL_PATH from ENV: ${ASCEND_INSTALL_PATH}")
      else()
        set(ASCEND_INSTALL_PATH /usr/local/Ascend)
        message(STATUS "Default ASCEND_INSTALL_PATH=${ASCEND_INSTALL_PATH}, you can export ASCEND_INSTALL_PATH to set this environment")
      endif()
      
      # set dynamic library output path
      set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/${RELEASE_DIR})
      # set static library output path
      set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/${RELEASE_DIR})
      
      message(STATUS "CMAKE_LIBRARY_OUTPUT_DIRECTORY= ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}")
      
      set(INC_DIR "${ASCEND_INSTALL_PATH}/latest/x86_64-linux/include/flow_func")
      file(GLOB SRC_LIST "*.cpp")
      
      # Specify cross compiler
      add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0)
      
      # set c++ compiler
      set(CMAKE_CXX_COMPILER ${TOOLCHAIN})
      
      # =========================UDF so compile============================
      # check if SRC_LIST is exist
      if("x${SRC_LIST}" STREQUAL "x")
          message(UDF "=========no source file=============")
          add_custom_target(${UDF_TARGET_LIB}
              COMMAND echo "no source to make lib${UDF_TARGET_LIB}.so")
          return(0)
      endif()
      
      add_library(${UDF_TARGET_LIB} SHARED
        ${SRC_LIST}
      )
      
      target_include_directories(${UDF_TARGET_LIB} PRIVATE
        ${INC_DIR}
      )
      
      target_compile_options(${UDF_TARGET_LIB} PRIVATE
        -O2
        -std=c++11
        -ftrapv  
        -fstack-protector-all
        -fPIC
      )
      
      if ("x${RESOURCE_TYPE}" STREQUAL "xAscend")
        target_link_libraries(${UDF_TARGET_LIB} PRIVATE 
          -Wl,--whole-archive
          ${ASCEND_INSTALL_PATH}/latest/x86_64-linux/lib64/stub/aarch64/libflow_func.so
          -Wl,--no-whole-archive
        )
        # If there have any dependent so, please release the following comments and copy dependent so to ${PROJECT_BINARY_DIR}/${RELEASE_DIR}
        # [[execute_process(
          COMMAND cp ${ASCEND_INSTALL_PATH}/latest/x86_64-linux/lib64/stub/aarch64/libflow_func.so ${PROJECT_BINARY_DIR}/${RELEASE_DIR}
        )]]
      elseif("x${RESOURCE_TYPE}" STREQUAL "xAarch")
        target_link_libraries(${UDF_TARGET_LIB} PRIVATE 
          -Wl,--whole-archive
          ${ASCEND_INSTALL_PATH}/latest/x86_64-linux/lib64/stub/aarch64/libflow_func.so
          -Wl,--no-whole-archive
        )
        # If there have any dependent so, please release the following comments and copy dependent so to ${PROJECT_BINARY_DIR}/${RELEASE_DIR}
        # [[execute_process(
          COMMAND cp ${ASCEND_INSTALL_PATH}/latest/x86_64-linux/lib64/stub/aarch64/libflow_func.so ${PROJECT_BINARY_DIR}/${RELEASE_DIR}
        )]]
      else()
        target_link_libraries(${UDF_TARGET_LIB} PRIVATE 
          -Wl,--whole-archive
          ${ASCEND_INSTALL_PATH}/latest/x86_64-linux/lib64/stub/x86_64/libflow_func.so
          -Wl,--no-whole-archive
        )
        # If there have any dependent so, please release the following comments and copy dependent so to ${PROJECT_BINARY_DIR}/${RELEASE_DIR}
        # [[execute_process(
          COMMAND cp ${ASCEND_INSTALL_PATH}/latest/x86_64-linux/lib64/stub/x86_64/libflow_func.so ${PROJECT_BINARY_DIR}/${RELEASE_DIR}
        )]]
      endif()
      

返回值

正常场景下返回None。

返回“TypeError”表示参数类型不正确。

调用示例

import dataflow as df
pp = df.FuncProcessPoint(...)

约束说明