开发过程
简介
UDF python开发过程中进行自定义处理函数的编写。本小节介绍进行UDF的开发过程。该部分代码可以通过模板自动生成。
UDF的实现包括两部分:
- UDF实现文件:用户自定义函数功能的代码实现,文件后缀为.py。按场景分为UDF实现文件(通过UDF实现自定义功能)和通过调用NN实现自定义功能。
- UDF封装及注册文件:用户基于工具生成或按照规范自定义cpp文件。用于获取自定义的python UDF,将其封装成C++函数,并调用FLOW_FUNC_REGISTRAR完成函数的注册。

- 单P场景下,支持一次输入对应多次输出,或者一次输入对应一次输出,或者多次输入对应一次输出。
- 2P场景下,仅支持一次输入对应一次输出。
- 如下以实现add功能的函数为例,介绍如何进行UDF开发、编译、构图及验证。
UDF实现文件(通过UDF实现自定义功能)
用户需要在工程的“workspace/src_python/xxx.py”文件中进行用户自定义函数的开发。以“func_add.py”为例进行介绍。
函数分析:
该函数实现Add功能,把两个输入转换为numpy数组并进行加法运算,将运算结果进行输出。
明确输入和输出:
Add函数有两个输入,一个输出。
注意事项:
用户在python文件中定义的函数名称要和cpp文件中获取python模块的名称保持一致。
如用户在python文件中定义初始化函数为init_flow_func,在cpp的Init函数中查找python模块属性时应查找init_flow_func。详细实例可参照如下代码。
函数实现:
包括初始化函数和实现函数(可以有多个)。
用户在编写UDF代码时,通常需要导入dataflow.data_type、dataflow.flow_func模块
flow_func中开放了UDF编写的常用接口,data_type封装了常用的数据类型。具体模块内容用户可以导入后自行查看。
1 2 3 4 5 |
import dataflow.flow_func as ff import dataflow.data_type as dt # 定义类 class Add(): # 类名称和实际开发UDF功能相关 |
- 初始化函数,执行初始化动作。示例如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@ff.init_wrapper() # 该描述是必须的,wrapper中将C++的对象和python开放的对象进行了转换 def init_flow_func(self, meta_params): # 入参有且只能有meta_params,类型为MetaParams name = meta_params.get_name() ff.logger.info("func name is %s", name) input_num= meta_params.get_input_num() ff.logger.info("input num %d", input_num) device_id = meta_params.get_running_device_id() ff.logger.info("device id %d", device_id) out_type = meta_params.get_attr_tensor_dtype("out_type") if out_type[0] != ff.FLOW_FUNC_SUCCESS: ff.logger.error("get dtype failed") return ff.FLOW_FUNC_FAILED self.count = 0 return ff.FLOW_FUNC_SUCCESS
- 实现函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
@ff.proc_wrapper() # 该描述是必须的,wrapper中将C++的对象和python开放的对象进行了转换 def Add1(self, run_context, input_flow_msgs): # 入参有且只能有MetaRunContext类型对象及FlowMsg类型对象 for msg in input_flow_msgs: ff.logger.error("msg code %d", msg.get_ret_code()) if msg.get_ret_code() != ff.FLOW_FUNC_SUCCESS: ff.logger.error("invalid input") return ff.FLOW_FUNC_FAILED # add方法下应该只有两个输入 if input_flow_msgs.__len__() != 2: ff.logger.error("invalid input") # 返回flow_func模块中定义的错误码 return ff.FLOW_FUNC_ERR_PARAM_INVALID tensor1 = input_flow_msgs[0].get_tensor() tensor2 = input_flow_msgs[1].get_tensor() dtype1 = tensor1.get_data_type() dtype2 = tensor2.get_data_type() # 两个输入的datatype预期应该相同 if dtype1 != dtype2: ff.logger.error("input data type is not same") return ff.FLOW_FUNC_FAILED ff.logger.info("element size %d", tensor1.get_data_size()) ff.logger.event("data type is same") shape1 = tensor1.get_shape() shape2 = tensor2.get_shape() if shape1 != shape2: ff.logger.error("input data shape is not same") return ff.FLOW_FUNC_FAILED ff.logger.info("shape is same") # 申请输出的msg对象 out = run_context.alloc_tensor_msg(shape1, dt.DT_INT32) data_size = out.get_tensor().get_data_size() ff.logger.error("data_size %d", data_size) ele_cnt = out.get_tensor().get_element_cnt() ff.logger.error("ele_cnt %d", ele_cnt) np1 = tensor1.numpy() np2 = tensor2.numpy() a = out.get_tensor().numpy() # 通过获取tensor,进一步获取tensor中的数组数据,进行加法运算 a[...] = np1 + np2 ff.logger.event("prepare to set output in add1") if run_context.set_output(0, out) != ff.FLOW_FUNC_SUCCESS: ff.logger.error("set output failed") return ff.FLOW_FUNC_FAILED self.count += 1 return ff.FLOW_FUNC_SUCCESS
- 如果用户需要定义多个功能,可以在同一个类中定义多个实现函数。
UDF实现文件(通过调用NN实现自定义功能)
用户需要在工程的“workspace/src_python/xxx.py”文件中进行用户自定义函数的开发。
函数分析:
该函数实现Add功能。
明确输入和输出:
包含两个输入,一个输出。
函数实现:
包括初始化函数和实现函数(可以有多个)。
用户在编写UDF代码时,通常需要导入dataflow.data_type、dataflow.flow_func模块
flow_func中开放了udf编写的常用接口,data_type封装了常用的数据类型。具体模块内容用户可以导入后自行查看。
1 2 3 4 5 |
import dataflow.flow_func as ff import dataflow.data_type as dt # 定义类 class Add(): # 类名称和实际开发UDF功能相关 |
- 初始化函数,执行初始化动作。示例如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@ff.init_wrapper() # 该描述是必须的,wrapper中将C++的对象和python开放的对象进行了转换 def init_flow_func(self, meta_params): # 入参有且只能有meta_params,类型为MetaParams name = meta_params.get_name() ff.logger.info("func name is %s", name) input_num= meta_params.get_input_num() ff.logger.info("input num %d", input_num) device_id = meta_params.get_running_device_id() ff.logger.info("device id %d", device_id) out_type = meta_params.get_attr_tensor_dtype("out_type") if out_type[0] != ff.FLOW_FUNC_SUCCESS: ff.logger.error("get dtype failed") return ff.FLOW_FUNC_FAILED self.count = 0 return ff.FLOW_FUNC_SUCCESS
- Add():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@ff.proc_wrapper() # 该描述是必须的,wrapper中将C++的对象和python开放的对象进行了转换 def AddNN(self, run_context, input_flow_msgs): # 入参有且只能有MetaRunContext类型对象及FlowMsg类型对象 ret = run_context.run_flow_model("invoke_graph", input_flow_msgs, 1000) if ret[0] != ff.FLOW_FUNC_SUCCESS: ff.logger.error("run nn failed") return ff.FLOW_FUNC_FAILED ff.logger.event("run nn success") i = 0 for out in ret[1]: if run_context.set_output(i, out) != ff.FLOW_FUNC_SUCCESS: ff.logger.error("set output failed") return ff.FLOW_FUNC_FAILED i = i + 1 return ff.FLOW_FUNC_SUCCESS
- 如果用户需要定义多个功能,可以在同一个类中定义多个实现函数。
UDF封装及注册
python开发UDF需要准备python文件外,还需要准备C++的接口调用文件。建议通过工具直接生成,也可以自定义,示例如下。
C++的类要继承MetaMultiFunc,C++文件中核心内容包括三部分
Init函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
int32_t Init(const std::shared_ptr<MetaParams> ¶ms) override { FLOW_FUNC_LOG_DEBUG("Init enter"); PyAcquire(); ScopeGuard gilGuard([this]() { PyRelease(); }); int32_t result = FLOW_FUNC_SUCCESS; try { PyRun_SimpleString("import sys"); std::string append = std::string("sys.path.append('") + params->GetWorkPath() + "')"; PyRun_SimpleString(append.c_str()); constexpr const char *pyModuleName = "func_add"; // 与python udf的文件名保持一致,如上述例子中的func_add constexpr const char *pyClzName = "Add"; FLOW_FUNC_LOG_INFO("Load py module name: %s", pyModuleName); auto module = py::module_::import(pyModuleName); FLOW_FUNC_LOG_INFO("%s.%s import success", pyModuleName, pyClzName); pyModule_ = module.attr(pyClzName)(); if (CheckProcExists() != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR("%s.%s check proc exists failed", pyModuleName, pyClzName); return FLOW_FUNC_FAILED; } if (py::hasattr(pyModule_, "init_flow_func")) { // 与python udf中定义的初始化函数名称保持一致,如上述例子中的init_flow_func result = pyModule_.attr("init_flow_func")(params).cast<int32_t>(); if (result != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR("%s.%s init_flow_func result=%d", pyModuleName, pyClzName, result); return result; } FLOW_FUNC_LOG_INFO("%s.%s init_flow_func success", pyModuleName, pyClzName); } else { FLOW_FUNC_LOG_INFO("%s.%s has no init_flow_func method, no need init", pyModuleName, pyClzName); } } catch (std::exception &e) { FLOW_FUNC_LOG_ERROR("init failed: %s", e.what()); result = FLOW_FUNC_FAILED; } FLOW_FUNC_LOG_DEBUG("FlowFunc Init end."); return result; } |
proc()函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
int32_t AddProc1( const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs) { FLOW_FUNC_LOG_INFO("enter"); PyAcquire(); ScopeGuard gilGuard([this]() { PyRelease(); }); int32_t result = FLOW_FUNC_SUCCESS; try { result = pyModule_.attr("Add1")(runContext, inputFlowMsgs).cast<int32_t>(); // 此处attr的name与python udf定义的处理函数一致,如上述例子中的Add1 Add2或Sub1 AddNN if (result != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR(".Add Add return %d", result); return result; } FLOW_FUNC_LOG_INFO("call Add result: %d", result); } catch (std::exception &e) { FLOW_FUNC_LOG_ERROR("proc failed: %s", e.what()); result = FLOW_FUNC_FAILED; } return result; } // 如果用户在同一个UDF python类中编写了多个处理函数,这里可以通过多个C++函数调用python不同attr的方式,实现多FUNC int32_t AddProc2( const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs) |
通过FLOW_FUNC_REGISTRAR宏将实现类声明为func name,注册到UDF框架中。
1 2 3 4 |
FLOW_FUNC_REGISTRAR(Add) .RegProcFunc("Add1", &Add::AddProc1); .RegProcFunc("Add2", &Add::AddProc2); } |