下载
中文
注册

开发过程

简介

UDF python开发过程中进行自定义处理函数的编写。本小节介绍进行UDF的开发过程。该部分代码可以通过模板自动生成。

UDF的实现包括两部分:

  • 单P场景下,支持一次输入对应多次输出,或者一次输入对应一次输出,或者多次输入对应一次输出。
  • 2P场景下,仅支持一次输入对应一次输出。
  • 如下以实现add功能的函数为例,介绍如何进行UDF开发、编译、构图及验证。

UDF实现文件(通过UDF实现自定义功能)

用户需要在工程的“workspace/src_python/xxx.py”文件中进行用户自定义函数的开发。以“func_add.py”为例进行介绍。

函数分析:

该函数实现Add功能,把两个输入转换为numpy数组并进行加法运算,将运算结果进行输出。

明确输入和输出:

Add函数有两个输入,一个输出。

注意事项:

用户在python文件中定义的函数名称要和cpp文件中获取python模块的名称保持一致。

如用户在python文件中定义初始化函数为init_flow_func,在cpp的Init函数中查找python模块属性时应查找init_flow_func。详细实例可参照如下代码。

函数实现:

包括初始化函数和实现函数(可以有多个)。

用户在编写UDF代码时,通常需要导入dataflow.data_type、dataflow.flow_func模块

flow_func中开放了UDF编写的常用接口,data_type封装了常用的数据类型。具体模块内容用户可以导入后自行查看。

1
2
3
4
5
import dataflow.flow_func as ff
import dataflow.data_type as dt
# 定义类
class Add():
# 类名称和实际开发UDF功能相关
  • 初始化函数,执行初始化动作。示例如下。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
        @ff.init_wrapper() # 该描述是必须的wrapper中将C++的对象和python开放的对象进行了转换
        def init_flow_func(self, meta_params):  # 入参有且只能有meta_params类型为MetaParams
            name = meta_params.get_name()
            ff.logger.info("func name is %s", name)
            input_num= meta_params.get_input_num()
            ff.logger.info("input num %d", input_num)
            device_id = meta_params.get_running_device_id()
            ff.logger.info("device id %d", device_id)
            out_type = meta_params.get_attr_tensor_dtype("out_type")
            if out_type[0] != ff.FLOW_FUNC_SUCCESS:
                 ff.logger.error("get dtype failed")
                 return ff.FLOW_FUNC_FAILED
            self.count = 0
            return ff.FLOW_FUNC_SUCCESS
    
  • 实现函数。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @ff.proc_wrapper() # 该描述是必须的wrapper中将C++的对象和python开放的对象进行了转换
    def Add1(self, run_context, input_flow_msgs): # 入参有且只能有MetaRunContext类型对象及FlowMsg类型对象
        for msg in input_flow_msgs:
            ff.logger.error("msg code %d", msg.get_ret_code())
            if msg.get_ret_code() != ff.FLOW_FUNC_SUCCESS:
                ff.logger.error("invalid input")
                return ff.FLOW_FUNC_FAILED
        # add方法下应该只有两个输入
        if input_flow_msgs.__len__() != 2:
            ff.logger.error("invalid input")
            # 返回flow_func模块中定义的错误码
            return ff.FLOW_FUNC_ERR_PARAM_INVALID
        tensor1 = input_flow_msgs[0].get_tensor()
        tensor2 = input_flow_msgs[1].get_tensor()
    
        dtype1 = tensor1.get_data_type()
        dtype2 = tensor2.get_data_type()
        # 两个输入的datatype预期应该相同
        if dtype1 != dtype2:
            ff.logger.error("input data type is not same")
            return ff.FLOW_FUNC_FAILED
        ff.logger.info("element size %d", tensor1.get_data_size())
        ff.logger.event("data type is same")
        shape1 = tensor1.get_shape()
        shape2 = tensor2.get_shape()
        if shape1 != shape2:
            ff.logger.error("input data shape is not same")
            return ff.FLOW_FUNC_FAILED
        ff.logger.info("shape is same")
        # 申请输出的msg对象
        out = run_context.alloc_tensor_msg(shape1, dt.DT_INT32)
        data_size = out.get_tensor().get_data_size()
        ff.logger.error("data_size %d", data_size)
        ele_cnt = out.get_tensor().get_element_cnt()
        ff.logger.error("ele_cnt %d", ele_cnt)
        np1 = tensor1.numpy()
        np2 = tensor2.numpy()
        a = out.get_tensor().numpy()
        # 通过获取tensor,进一步获取tensor中的数组数据,进行加法运算
        a[...] = np1 + np2
        ff.logger.event("prepare to set output in add1")
        if run_context.set_output(0, out) != ff.FLOW_FUNC_SUCCESS:
            ff.logger.error("set output failed")
            return ff.FLOW_FUNC_FAILED
        self.count += 1
        return ff.FLOW_FUNC_SUCCESS
    
  • 如果用户需要定义多个功能,可以在同一个类中定义多个实现函数。
    如下示例为一个类中包括两个实现函数:Add1和Add2。
    1
    2
    3
    4
    5
    6
    7
    @ff.proc_wrapper()
    def Add1(self, run_context, input_flow_msgs): 
        xxxx
    
    @ff.proc_wrapper()
    def Add2(self, run_context, input_flow_msgs)
        xxxx
    

UDF实现文件(通过调用NN实现自定义功能)

用户需要在工程的“workspace/src_python/xxx.py”文件中进行用户自定义函数的开发。

函数分析:

该函数实现Add功能。

明确输入和输出:

包含两个输入,一个输出。

函数实现:

包括初始化函数和实现函数(可以有多个)。

用户在编写UDF代码时,通常需要导入dataflow.data_type、dataflow.flow_func模块

flow_func中开放了udf编写的常用接口,data_type封装了常用的数据类型。具体模块内容用户可以导入后自行查看。

1
2
3
4
5
import dataflow.flow_func as ff
import dataflow.data_type as dt
# 定义类
class Add():
# 类名称和实际开发UDF功能相关
  • 初始化函数,执行初始化动作。示例如下。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
        @ff.init_wrapper() # 该描述是必须的wrapper中将C++的对象和python开放的对象进行了转换
        def init_flow_func(self, meta_params):  # 入参有且只能有meta_params类型为MetaParams
            name = meta_params.get_name()
            ff.logger.info("func name is %s", name)
            input_num= meta_params.get_input_num()
            ff.logger.info("input num %d", input_num)
            device_id = meta_params.get_running_device_id()
            ff.logger.info("device id %d", device_id)
            out_type = meta_params.get_attr_tensor_dtype("out_type")
            if out_type[0] != ff.FLOW_FUNC_SUCCESS:
                 ff.logger.error("get dtype failed")
                 return ff.FLOW_FUNC_FAILED
            self.count = 0
            return ff.FLOW_FUNC_SUCCESS
    
  • Add():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
    示例如下:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    @ff.proc_wrapper() # 该描述是必须的wrapper中将C++的对象和python开放的对象进行了转换
    def AddNN(self, run_context, input_flow_msgs): # 入参有且只能有MetaRunContext类型对象及FlowMsg类型对象
        ret = run_context.run_flow_model("invoke_graph", input_flow_msgs, 1000)
        if ret[0] != ff.FLOW_FUNC_SUCCESS:
            ff.logger.error("run nn failed")
            return ff.FLOW_FUNC_FAILED
        ff.logger.event("run nn success")
        i = 0
        for out in ret[1]:
            if run_context.set_output(i, out) != ff.FLOW_FUNC_SUCCESS:
                ff.logger.error("set output failed")
                return ff.FLOW_FUNC_FAILED
            i = i + 1
        return ff.FLOW_FUNC_SUCCESS
    
  • 如果用户需要定义多个功能,可以在同一个类中定义多个实现函数。
    如下示例为一个类中包括两个实现函数:Add1和Add2。
    1
    2
    3
    4
    5
    6
    7
    @ff.proc_wrapper()
    def Add1(self, run_context, input_flow_msgs): 
        xxxx
    
    @ff.proc_wrapper()
    def Add2(self, run_context, input_flow_msgs)
        xxxx
    

UDF封装及注册

python开发UDF需要准备python文件外,还需要准备C++的接口调用文件。建议通过工具直接生成,也可以自定义,示例如下。

C++的类要继承MetaMultiFunc,C++文件中核心内容包括三部分

Init函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    int32_t Init(const std::shared_ptr<MetaParams> &params) override
    {
        FLOW_FUNC_LOG_DEBUG("Init enter");
        PyAcquire();
        ScopeGuard gilGuard([this]() { PyRelease(); });
        int32_t result = FLOW_FUNC_SUCCESS;
        try {
            PyRun_SimpleString("import sys");
            std::string append = std::string("sys.path.append('") + params->GetWorkPath() + "')";
            PyRun_SimpleString(append.c_str());

            constexpr const char *pyModuleName = "func_add"; // 与python udf的文件名保持一致,如上述例子中的func_add
            constexpr const char *pyClzName = "Add";
            FLOW_FUNC_LOG_INFO("Load py module name: %s", pyModuleName);
            auto module = py::module_::import(pyModuleName);
            FLOW_FUNC_LOG_INFO("%s.%s import success", pyModuleName, pyClzName);
            pyModule_ = module.attr(pyClzName)();
            if (CheckProcExists() != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_LOG_ERROR("%s.%s check proc exists failed", pyModuleName, pyClzName);
                return FLOW_FUNC_FAILED;
            }
            if (py::hasattr(pyModule_, "init_flow_func")) { // 与python udf中定义的初始化函数名称保持一致,如上述例子中的init_flow_func
                result = pyModule_.attr("init_flow_func")(params).cast<int32_t>();
                if (result != FLOW_FUNC_SUCCESS) {
                    FLOW_FUNC_LOG_ERROR("%s.%s init_flow_func result=%d", pyModuleName, pyClzName, result);
                    return result;
                }
                FLOW_FUNC_LOG_INFO("%s.%s init_flow_func success", pyModuleName, pyClzName);
            } else {
                FLOW_FUNC_LOG_INFO("%s.%s has no init_flow_func method, no need init", pyModuleName, pyClzName);
            }
        } catch (std::exception &e) {
            FLOW_FUNC_LOG_ERROR("init failed: %s", e.what());
            result = FLOW_FUNC_FAILED;
        }
        FLOW_FUNC_LOG_DEBUG("FlowFunc Init end.");
        return result;
    }

proc()函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    int32_t AddProc1(
        const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)
    {
        FLOW_FUNC_LOG_INFO("enter");
        PyAcquire();
        ScopeGuard gilGuard([this]() { PyRelease(); });
        int32_t result = FLOW_FUNC_SUCCESS;
        try {
            result = pyModule_.attr("Add1")(runContext, inputFlowMsgs).cast<int32_t>(); // 此处attr的name与python udf定义的处理函数一致,如上述例子中的Add1 Add2或Sub1 AddNN
            if (result != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_LOG_ERROR(".Add Add return %d", result);
                return result;
            }
            FLOW_FUNC_LOG_INFO("call Add result: %d", result);
        } catch (std::exception &e) {
            FLOW_FUNC_LOG_ERROR("proc failed: %s", e.what());
            result = FLOW_FUNC_FAILED;
        }
        return result;
    }
// 如果用户在同一个UDF python类中编写了多个处理函数,这里可以通过多个C++函数调用python不同attr的方式,实现多FUNC
    int32_t AddProc2(
        const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)

通过FLOW_FUNC_REGISTRAR宏将实现类声明为func name,注册到UDF框架中。

1
2
3
4
FLOW_FUNC_REGISTRAR(Add)
    .RegProcFunc("Add1", &Add::AddProc1);
    .RegProcFunc("Add2", &Add::AddProc2);
}