开发过程(调用MetaMultiFunc类)
简介
在UDF开发过程中,用户可以调用MetaFlowFunc类进行自定义的单func处理函数的编写。也可以调用MetaMultiFunc类进行自定义的多func处理函数的编写。本小节介绍调用MetaMultiFunc类进行UDF的开发过程。
UDF的实现包括两部分:
- UDF实现文件:用户自定义函数功能的代码实现,文件后缀为.cpp。按场景分为通过UDF实现用户自定义功能和通过调用NN实现自定义功能。
- UDF注册:通过FLOW_FUNC_REGISTRAR宏将实现类声明为func name,注册到UDF框架中。

- 单P场景下,支持一次输入对应多次输出,或者一次输入对应一次输出,或者多次输入对应一次输出。
- 2P场景下,仅支持一次输入对应一次输出。
- 如下以实现add功能的函数为例,介绍如何进行UDF开发、编译、构图及验证。
UDF实现文件(通过UDF实现自定义功能)
用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“add_flow_func.cpp”为例进行介绍。
函数分析:
该函数实现Add功能,支持float类型和int类型的Add功能。第一个处理函数实现a+b的功能,第二个函数实现2a+b的功能。
明确输入和输出:
每个proc函数都包含两个输入,一个输出。两个proc处理函数的输入队列不共享,输出队列共享。
注意事项:
多Function场景下多个处理函数会并发调度,用户需要考虑多线程并发问题,如果加锁可能会影响确定性调度。
函数实现:
用户继承meta_multi_func.h文件的MetaMultiFunc基类,重写Init函数和自定义的多func处理函数。
如果有资源需要释放,需要在析构函数中处理。
namespace FlowFunc { class AddFlowFunc: public MetaMultiFunc{}; }
- Init():执行初始化动作,如变量初始化,获取属性等。如果FlowFunc中不需要进行初始化操作,则不需要添加该函数。在本例中需要获取在DataFlow构图时在FunctionPp设置的out_type属性。示例如下。
int32_t Init(const std::shared_ptr<MetaParams> ¶ms) override { // 通过MetaParams类的GetAttr方法,获取在算子构图中设置的out_type属性,属性值保存在类的私有变量outDataType_中,用于把源类型转换成outDataType_类型。 auto getRet = params->GetAttr("out_type", outDataType_); if (getRet != FLOW_FUNC_SUCCESS) { FLOW_FUNC_RUN_LOG_ERROR("GetAttr dType not exist. "); return getRet; } // 初始化多func处理函数共享变量。 setOutputCount_ = 0; // 如果需要开启TPRT并发功能,需要增加如下加粗内容。 //(void)UdfTprt::Init(params); return 0; }
- template:Add实现方法的模板函数。
// Add1实现方法的模板函数 template<typename srcT, typename dstT> void Add1(srcT *src1, srcT *src2, dstT *dst, size_t count) { for (size_t i = 0; i < count; ++i) { dst[i] = src1[i] + src2[i]; } /* 如果想使能上面三行代码运算的TPRT并发,可以使用下面五行加粗代码替换上面三行加粗代码(如下示例使用的是ParallelFor,也可以使用Submit+Wait。) std::function<void(const int64_t index)> func = [src1, src2, dst] (const int64_t index) { dst[index] = src1[index] + src2[index]; }; UdfTprt::ParallelFor(0, count, func); */ } // Add2实现方法的模板函数 template<typename srcT, typename dstT> void Add2(srcT *src1, srcT *src2, dstT *dst, size_t count) { for (size_t i = 0; i < count; ++i) { dst[i] = 2 * src1[i] + src2[i]; } /* 如果想使能上面三行代码运算的TPRT并发,可以使用下面七行加粗代码替换上面三行加粗代码(如下示例使用的是Submit+Wait,也可以使用ParallelFor。) UdfTprtTaskAttr attr; for (size_t i = 0; i < count; ++i) { UdfTprt::Submit([src1, src2, dst, i]() { dst[i] = 2 * src1[i] + src2[i]; }, {}, {}, attr); } UdfTprt::Wait(); */ } template<typename T> void Add1(T *src1, T *src2, void *dst, size_t count) { if (outDataType_ == TensorDataType::DT_FLOAT) { Add1(src1, src2, static_cast<float *>(dst), count); } if (outDataType_ == TensorDataType::DT_UINT32) { Add1(src1, src2, static_cast<uint32_t *>(dst), count); } else if (outDataType_ == TensorDataType::DT_INT64) { Add1(src1, src2, static_cast<int64_t *>(dst), count); } } template<typename T> void Add2(T *src1, T *src2, void *dst, size_t count) { if (outDataType_ == TensorDataType::DT_FLOAT) { Add2(src1, src2, static_cast<float *>(dst), count); } if (outDataType_ == TensorDataType::DT_UINT32) { Add2(src1, src2, static_cast<uint32_t *>(dst), count); } else if (outDataType_ == TensorDataType::DT_INT64) { Add2(src1, src2, static_cast<int64_t *>(dst), count); } }
- 多func处理函数:用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
代码示例如下。
// 用户自定义的多func处理函数名:“Proc1”。该函数名用户自行定义,入参和返回值需要和原型保持一致。 int32_t Proc1(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs) { // 校验add操作是否是有两个输入 if (inputFlowMsgs.size() != 2) { FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but %zu", inputMsgs.size()); return -1; } auto inputFlowMsg1 = inputFlowMsgs[0]; auto inputFlowMsg2 = inputFlowMsgs[1]; // 无效输入校验:校验输入消息的错误码是否非0 if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) { FLOW_FUNC_LOG_ERROR("invalid input"); return -1; } // 获取输入数据的消息类型 MsgType MsgType1 = inputFlowMsg1->GetMsgType(); MsgType MsgType2 = inputFlowMsg2->GetMsgType(); // 校验输入数据是否是tensor类型 if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) { // 从msg中获取tensor auto inputTensor1 = inputFlowMsg1->GetTensor(); auto inputTensor2 = inputFlowMsg2->GetTensor(); // 从tensor中获取data type auto inputDataType1 = inputTensor1->GetDataType(); auto inputDataType2 = inputTensor2->GetDataType(); // 校验两个输入的data type是否一致 if (inputDataType1 != inputDataType2) { FLOW_FUNC_LOG_ERROR("input type not be same"); return -1; } // 从tensor中获取shape信息 auto &inputShape1 = inputTensor1->GetShape(); auto &inputShape2 = inputTensor2->GetShape(); // 校验两个输入tensor的shape是否一致 if (inputShape1 != inputShape2) { FLOW_FUNC_LOG_ERROR("input shape not be same"); return -1; } // 申请输出的msg信息 auto outputMsg = runContext->AllocTensorMsg(inputShape1, outDataType_); if (outputMsg == nullptr) { FLOW_FUNC_LOG_ERROR("all tensor fail"); return -1; } auto outputTensor = outputMsg->GetTensor(); // 从tensor中获取tensor的数据大小 auto dataSize1 = inputTensor1->GetDataSize(); auto dataSize2 = inputTensor2->GetDataSize(); if (dataSize1 == 0) { return runContext->SetOutput(0, outputMsg); } // 从tensor中获取tensor的数据指针 auto inputData1 = inputTensor1->GetData(); auto inputData2 = inputTensor2->GetData(); auto outputData = outputTensor->GetData(); // 根据不同的数据类型,执行不同的add操作 switch (inputDataType1) { case TensorDataType::DT_FLOAT: Add1(static_cast<float *>(inputData1), static_cast<float *>(inputData2), outputData, dataSize1 / sizeof(float)); break; case TensorDataType::DT_INT16: Add1(static_cast<int16_t *>(inputData1), static_cast<int16_t *>(inputData2), outputData, dataSize1 / sizeof(int16_t)); break; case TensorDataType::DT_UINT16: Add1(static_cast<uint16_t *>(inputData1), static_cast<uint16_t *>(inputData2), outputData, dataSize1 / sizeof(uint16_t)); break; case TensorDataType::DT_UINT32: Add1(static_cast<uint32_t *>(inputData1), static_cast<uint32_t *>(inputData2), outputData, dataSize1 / sizeof(uint32_t)); break; case TensorDataType::DT_INT8: Add1(static_cast<int8_t *>(inputData1), static_cast<int8_t *>(inputData2), outputData, dataSize1 / sizeof(int8_t)); break; case TensorDataType::DT_INT64: Add1(static_cast<int64_t *>(inputData1), static_cast<int64_t *>(inputData2), outputData, dataSize1 / sizeof(int64_t)); break; default: // 不支持的data type,设置错误码输出。 outputMsg->SetRetCode(100); break; } // 多个处理函数并发调度,用户需要自己加锁处理共享数据 std::unique_lock<std::mutex> lock(countMutex_); setOutputCount_++; return runContext->SetOutput(0, outputMsg); } FLOW_FUNC_LOG_ERROR("MsgType is not Tensor."); return -1; } // 用户自定义的多func处理函数名:“Proc2”。该函数名用户自行定义,入参和返回值需要和原型保持一致。 int32_t Proc2(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs) { // 校验add操作是否是有两个输入 if (inputFlowMsgs.size() != 2) { FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but %zu", inputMsgs.size()); return -1; } auto inputFlowMsg1 = inputFlowMsgs[0]; auto inputFlowMsg2 = inputFlowMsgs[1]; // 无效输入校验:校验输入消息的错误码是否非0 if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) { FLOW_FUNC_LOG_ERROR("invalid input"); return -1; } // 获取输入数据的消息类型 MsgType MsgType1 = inputFlowMsg1->GetMsgType(); MsgType MsgType2 = inputFlowMsg2->GetMsgType(); // 校验输入数据是否是tensor类型 if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) { // 从输入msg中获取tensor auto inputTensor1 = inputFlowMsg1->GetTensor(); auto inputTensor2 = inputFlowMsg2->GetTensor(); // 从tensor中获取data type auto inputDataType1 = inputTensor1->GetDataType(); auto inputDataType2 = inputTensor2->GetDataType(); // 校验两个输入的data type是否一致 if (inputDataType1 != inputDataType2) { FLOW_FUNC_LOG_ERROR("allow tensor msg failed"); return -1; } // 从tensor中获取shape信息 auto &inputShape1 = inputTensor1->GetShape(); auto &inputShape2 = inputTensor2->GetShape(); // 校验两个输入tensor的shape是否一致 if (inputShape1 != inputShape2) { FLOW_FUNC_LOG_ERROR("input shape not be same"); return -1; } // 申请输出的tensor信息 auto outputMsg = runContext->AllocTensorMsg(inputShape1, outDataType_); if (outputMsg == nullptr) { FLOW_FUNC_LOG_ERROR("all tensor fail"); return -1; } auto outputTensor = outputMsg->GetTensor(); // 从tensor中获取tensor的数据大小 auto dataSize1 = inputTensor1->GetDataSize(); auto dataSize2 = inputTensor2->GetDataSize(); if (dataSize1 == 0) { return runContext->SetOutput(0, outputMsg); } // 从tensor中获取tensor的数据指针 auto inputData1 = inputTensor1->GetData(); auto inputData2 = inputTensor2->GetData(); auto outputData = outputTensor->GetData(); // 根据不同的数据类型,执行具体的add操作 switch (inputDataType1) { case TensorDataType::DT_FLOAT: Add2(static_cast<float *>(inputData1), static_cast<float *>(inputData2), outputData, dataSize1 / sizeof(float)); break; case TensorDataType::DT_INT16: Add2(static_cast<int16_t *>(inputData1), static_cast<int16_t *>(inputData2), outputData, dataSize1 / sizeof(int16_t)); break; case TensorDataType::DT_UINT16: Add2(static_cast<uint16_t *>(inputData1), static_cast<uint16_t *>(inputData2), outputData, dataSize1 / sizeof(uint16_t)); break; case TensorDataType::DT_UINT32: Add2(static_cast<uint32_t *>(inputData1), static_cast<uint32_t *>(inputData2), outputData, dataSize1 / sizeof(uint32_t)); break; case TensorDataType::DT_INT8: Add2(static_cast<int8_t *>(inputData1), static_cast<int8_t *>(inputData2), outputData, dataSize1 / sizeof(int8_t)); break; case TensorDataType::DT_INT64: Add2(static_cast<int64_t *>(inputData1), static_cast<int64_t *>(inputData2), outputData, dataSize1 / sizeof(int64_t)); break; default: // not support outputMsg->SetRetCode(100); break; } // 多个处理函数并发调度,用户需要自己加锁处理共享数据 std::unique_lock<std::mutex> lock(countMutex_); setOutputCount_++; return runContext->SetOutput(0, outputMsg); } FLOW_FUNC_LOG_ERROR("MsgType is not Tensor."); return -1; } private: TensorDataType outDataType_; std::mutex countMutex_; uint32_t setOutputCount_; };
UDF实现文件(通过调用NN实现自定义功能)
用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“call_nn_flow_func.cpp”为例进行介绍。
函数分析:
该函数实现Add功能。
明确输入和输出:
包含两个输入,一个输出。
函数实现:
用户继承meta_flow_func.h文件的MetaFlowFunc基类,重写Init函数和自定义的处理函数。
class CallNnFlowFunc: public MetaMultiFunc{}
- Init():执行初始化动作,如变量初始化,获取属性等,在本例中Init不需要做处理,因此不需要该函数。
- Add():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
示例如下:
// 用户自定义的多func处理函数名:“Add”。该函数名用户自行定义,入参和返回值需要和原型保持一致。 int32_t Add(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs) { std::vector<std::shared_ptr<FlowMsg>> outputMsgs; // 使用runContext的RunFlowModel方法,执行自定义处理函数中调用的NN模型,该NN模型实现了Add功能,是用户通过IR构图实现的。该模型执行结果放在outputMsgs中。 auto ret = runContext->RunFlowModel("invoke_graph", inputMsgs, outputMsgs, 100000); if (ret != FLOW_FUNC_SUCCESS) { return ret; } // 将调用NN模型的输出结果outputMsgs通过SetOutput输出。 for (size_t i = 0; i < outputMsgs.size(); ++i) { ret = runContext->SetOutput(i, outputMsgs[i]); if (ret != FLOW_FUNC_SUCCESS) { return ret; } } return FLOW_FUNC_SUCCESS; }
UDF注册
通过FLOW_FUNC_REGISTRAR宏将实现类声明为func name,注册到UDF框架中。
FLOW_FUNC_REGISTRAR(AddFlowFunc) .RegProcFunc("Proc1", &AddFlowFunc::Proc1) .RegProcFunc("Proc2", &AddFlowFunc::Proc2); FLOW_FUNC_REGISTRAR(CallNnFlowFunc) .RegProcFunc("CallNn_Add", &CallNnFlowFunc::Add);
- Proc1:用户自定义多func处理函数名,需要和在AddFlowFunc中定义的处理函数名保持一致。
- Proc2:用户自定义多func处理函数名,需要和在AddFlowFunc中定义的处理函数名保持一致。
- Add:用户自定义函数名,需要和在CallNnFlowFunc中定义的处理函数名保持一致。
- AddFlowFunc:类名,需要和UDF实现文件(通过UDF实现自定义功能)中的类名保持一致。
- CallNnFlowFunc:类名,需要和UDF实现文件(通过调用NN实现自定义功能)中的类名保持一致。
父主题: UDF开发