开发过程(调用MetaFlowFunc类)
简介
在UDF开发过程中,用户可以调用MetaFlowFunc类进行自定义的单func处理函数的编写。也可以调用MetaMultiFunc类进行自定义的多func处理函数的编写。本小节介绍调用MetaFlowFunc类进行UDF的开发过程。
UDF的实现包括两部分:
- UDF实现文件:用户自定义函数功能的代码实现,文件后缀为.cpp。按场景分为通过UDF实现用户自定义功能和通过调用NN实现自定义功能。
- UDF注册:通过REGISTER_FLOW_FUNC宏将实现类声明为func name,注册到UDF框架中。

- 单P场景下,支持一次输入对应多次输出,或者一次输入对应一次输出,或者多次输入对应一次输出。
- 2P场景下,仅支持一次输入对应一次输出。
- 如下以实现add功能的函数为例,介绍如何进行UDF开发、编译、构图及验证。
UDF实现文件(通过UDF实现自定义功能)
用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“add_flow_func.cpp”为例进行介绍。
函数分析:
该函数实现Add功能,支持float类型和int类型的Add功能。
明确输入和输出:
包含两个输入,一个输出。
函数实现:
用户继承meta_flow_func.h文件的MetaFlowFunc基类,重写Init和Proc两个函数。
如果有资源需要释放,需要在析构函数中处理。
1 2 3 |
namespace FlowFunc { class AddFlowFunc: public MetaFlowFunc{}; } |
- Init():执行初始化动作,如变量初始化,获取属性等,在本例中需要获取在DataFlow构图时在FunctionPp设置的out_type属性。示例如下。
1 2 3 4 5 6 7 8 9 10 11
int32_t Init() override { // 通过MetaContext类的GetAttr方法,获取在算子构图中设置的out_type属性,属性值保存在类的私有变量outDataType_中,用于把源类型转换成outDataType_类型。 auto getRet = context_->GetAttr("out_type", outDataType_); if (getRet != FLOW_FUNC_SUCCESS) { FLOW_FUNC_RUN_LOG_ERROR("GetAttr dType not exist. "); return getRet; } FLOW_FUNC_RUN_LOG_INFO("Add flow func init success"); return FLOW_FUNC_SUCCESS; }
- Proc():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override { FLOW_FUNC_LOG_DEBUG("proc start"); // 在Add方法下,只有两个输入 if (inputMsgs.size() != 2) { // 返回flow_func_defines.h中定义的错误码 FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but %zu", inputMsgs.size()); return FLOW_FUNC_ERR_PARAM_INVALID; } // invalid input auto inputFlowMsg1 = inputFlowMsgs[0]; auto inputFlowMsg2 = inputFlowMsgs[1]; FLOW_FUNC_LOG_INFO("inputFlowMsg1.RetCode=%d, inputFlowMsg2.RetCode=%d", inputFlowMsg1->GetRetCode(), inputFlowMsg2->GetRetCode()); // 判断输入msg中的RetCode是否是0,RetCode是上步的处理结果,如果是0表示正常,如果非0则报错 if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) { FLOW_FUNC_LOG_ERROR("invalid input"); return -1; } MsgType MsgType1 = inputFlowMsg1->GetMsgType(); MsgType MsgType2 = inputFlowMsg2->GetMsgType(); // 判断输入msg中的消息类型是否是tensor类型 if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) { // 获取输入的tensor auto inputTensor1 = inputFlowMsg1->GetTensor(); auto inputTensor2 = inputFlowMsg2->GetTensor(); auto inputShape1 = inputTensor1->GetShape(); // 根据输入shape和输出数据的dataType信息,申请FlowFunc算子的输出flow msg auto outputMsg = context_->AllocTensorMsg(inputShape1, outDataType_); if (outputMsg == nullptr) { FLOW_FUNC_LOG_ERROR("allow tensor msg failed"); return -1; } // 调用第三方库中的AddTensor接口,实现Add功能 if (AddDepend::Instance().AddTensor(inputTensor1, inputTensor2, outputMsg, outDataType_) != 0) { FLOW_FUNC_LOG_ERROR("add tensor failed"); return -1; } // 输出计算结果 return context_->SetOutput(0, outputMsg); } // 输入消息不是Tensor类型,返回错误 FLOW_FUNC_LOG_ERROR("MsgType is not Tensor."); return -1; }
UDF实现文件(通过调用NN实现自定义功能)
用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“call_nn_flow_func.cpp”为例进行介绍。
函数分析:
该函数实现Add功能。
明确输入和输出:
包含两个输入,一个输出。
函数实现:
用户继承meta_flow_func.h文件的MetaFlowFunc基类,重写Init和Proc两个函数。
1
|
class CallNnFlowFunc: public MetaFlowFunc{} |
- Init():执行初始化动作,如变量初始化,获取属性等,在本例中Init不需要做处理,直接返回SUCCESS。
1 2 3 4 5
int32_t Init() override { FLOW_FUNC_RUN_LOG_INFO("call nn flow func init success"); return FLOW_FUNC_SUCCESS; }
- Proc():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override { std::vector<std::shared_ptr<FlowMsg>> outputMsgs; // 调用RunFlowModel执行自定义处理函数中调用的NN模型,该NN模型实现了Add功能,是用户通过IR构图实现的。该模型执行结果放在outputMsgs中。 auto ret = context_->RunFlowModel(dependKey_.c_str(), inputMsgs, outputMsgs, 100000); if (ret != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR("Run flow model failed, ret=%d, dependKey=%s", ret, dependKey_.c_str()); return ret; } // 将调用NN模型的输出结果outputMsgs通过SetOutput输出。 for (size_t i = 0; i < outputMsgs.size(); ++i) { ret = context_->SetOutput(i, outputMsgs[i]); if (ret != FLOW_FUNC_SUCCESS) { FLOW_FUNC_LOG_ERROR("Set output failed, ret=%d, index=%zu", ret, i); return ret; } } return FLOW_FUNC_SUCCESS; } // 其中dependKey_是类CallNnFlowFunc定义的私有数据成员: private: std::string dependKey_{"invoke_graph"};
UDF注册
通过REGISTER_FLOW_FUNC宏将实现类声明为func name,注册到UDF框架中。
1
|
REGISTER_FLOW_FUNC("call_nn", AddFlowFunc); |
- call_nn:用户自定义函数名,需要用户根据自己的函数功能进行定制。
- AddFlowFunc:类名,需要和UDF实现文件(通过UDF实现自定义功能)中的类名保持一致。
父主题: UDF开发