下载
中文
注册

开发过程(调用MetaMultiFunc类)

简介

在UDF开发过程中,用户可以调用MetaFlowFunc类进行自定义的单func处理函数的编写。也可以调用MetaMultiFunc类进行自定义的多func处理函数的编写。本小节介绍调用MetaMultiFunc类进行UDF的开发过程。

UDF的实现包括两部分:

  • 单P场景下,支持一次输入对应多次输出,或者一次输入对应一次输出,或者多次输入对应一次输出。
  • 2P场景下,仅支持一次输入对应一次输出。
  • 如下以实现add功能的函数为例,介绍如何进行UDF开发、编译、构图及验证。

UDF实现文件(通过UDF实现自定义功能)

用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“add_flow_func.cpp”为例进行介绍。

函数分析:

该函数实现Add功能,支持float类型和int类型的Add功能。第一个处理函数实现a+b的功能,第二个函数实现2a+b的功能。

明确输入和输出:

每个proc函数都包含两个输入,一个输出。两个proc处理函数的输入队列不共享,输出队列共享。

注意事项:

多Function场景下多个处理函数会并发调度,用户需要考虑多线程并发问题,如果加锁可能会影响确定性调度。

函数实现:

用户继承meta_multi_func.h文件的MetaMultiFunc基类,重写Init函数和自定义的多func处理函数。

如果有资源需要释放,需要在析构函数中处理。

namespace FlowFunc {
class AddFlowFunc: public MetaMultiFunc{};
}
  • Init():执行初始化动作,如变量初始化,获取属性等。如果FlowFunc中不需要进行初始化操作,则不需要添加该函数。在本例中需要获取在DataFlow构图时在FunctionPp设置的out_type属性。示例如下。
    int32_t Init(const std::shared_ptr<MetaParams> &params) override
        {
            // 通过MetaParams类的GetAttr方法,获取在算子构图中设置的out_type属性,属性值保存在类的私有变量outDataType_中,用于把源类型转换成outDataType_类型。
            auto getRet = params->GetAttr("out_type", outDataType_);
            if (getRet != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_RUN_LOG_ERROR("GetAttr dType not exist. ");
                return getRet;
            }
            // 初始化多func处理函数共享变量。
            setOutputCount_ = 0;
            // 如果需要开启TPRT并发功能,需要增加如下加粗内容。
            //(void)UdfTprt::Init(params);
            return 0;
        }
  • template:Add实现方法的模板函数。
        // Add1实现方法的模板函数
        template<typename srcT, typename dstT>
        void Add1(srcT *src1, srcT *src2, dstT *dst, size_t count)
        {
            for (size_t i = 0; i < count; ++i) {
                dst[i] = src1[i] + src2[i];
            }
            /* 如果想使能上面三行代码运算的TPRT并发,可以使用下面五行加粗代码替换上面三行加粗代码(如下示例使用的是ParallelFor,也可以使用Submit+Wait。)
            std::function<void(const int64_t index)> func = [src1, src2, dst] 
               (const int64_t index) {
                dst[index] = src1[index] + src2[index];
            };
            UdfTprt::ParallelFor(0, count, func);
            */
        }
        // Add2实现方法的模板函数
        template<typename srcT, typename dstT>
        void Add2(srcT *src1, srcT *src2, dstT *dst, size_t count)
        {
            for (size_t i = 0; i < count; ++i) {
                dst[i] = 2 * src1[i] + src2[i];
            }
            /* 如果想使能上面三行代码运算的TPRT并发,可以使用下面七行加粗代码替换上面三行加粗代码(如下示例使用的是Submit+Wait,也可以使用ParallelFor。)
            UdfTprtTaskAttr attr;
            for (size_t i = 0; i < count; ++i) {
                UdfTprt::Submit([src1, src2, dst, i]() {
                    dst[i] = 2 * src1[i] + src2[i];
                }, {}, {}, attr);
            }
            UdfTprt::Wait();
            */
        }
    
        template<typename T>
        void Add1(T *src1, T *src2, void *dst, size_t count)
        {
            if (outDataType_ == TensorDataType::DT_FLOAT) {
                Add1(src1, src2, static_cast<float *>(dst), count);
            }
            if (outDataType_ == TensorDataType::DT_UINT32) {
                Add1(src1, src2, static_cast<uint32_t *>(dst), count);
            }
            else if (outDataType_ == TensorDataType::DT_INT64) {
                Add1(src1, src2, static_cast<int64_t *>(dst), count);
            }
        }
        template<typename T>
        void Add2(T *src1, T *src2, void *dst, size_t count)
        {
            if (outDataType_ == TensorDataType::DT_FLOAT) {
                Add2(src1, src2, static_cast<float *>(dst), count);
            }
            if (outDataType_ == TensorDataType::DT_UINT32) {
                Add2(src1, src2, static_cast<uint32_t *>(dst), count);
            }
            else if (outDataType_ == TensorDataType::DT_INT64) {
                Add2(src1, src2, static_cast<int64_t *>(dst), count);
            }
        }
  • 多func处理函数:用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
    代码示例如下。
    // 用户自定义的多func处理函数名:“Proc1”。该函数名用户自行定义,入参和返回值需要和原型保持一致。
    int32_t Proc1(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)
        {
            // 校验add操作是否是有两个输入
            if (inputFlowMsgs.size() != 2) {
                FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but %zu", inputMsgs.size());
                return -1;
            }
            auto inputFlowMsg1 = inputFlowMsgs[0];
            auto inputFlowMsg2 = inputFlowMsgs[1];
            // 无效输入校验:校验输入消息的错误码是否非0
            if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) {
                FLOW_FUNC_LOG_ERROR("invalid input");
                return -1;
            }
            // 获取输入数据的消息类型
            MsgType MsgType1 = inputFlowMsg1->GetMsgType();
            MsgType MsgType2 = inputFlowMsg2->GetMsgType();
            // 校验输入数据是否是tensor类型
            if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) {
                // 从msg中获取tensor
                auto inputTensor1 = inputFlowMsg1->GetTensor();
                auto inputTensor2 = inputFlowMsg2->GetTensor();
                // 从tensor中获取data type
                auto inputDataType1 = inputTensor1->GetDataType();
                auto inputDataType2 = inputTensor2->GetDataType();
                // 校验两个输入的data type是否一致
                if (inputDataType1 != inputDataType2) {
                    FLOW_FUNC_LOG_ERROR("input type not be same");
                    return -1;
                }
                // 从tensor中获取shape信息
                auto &inputShape1 = inputTensor1->GetShape();
                auto &inputShape2 = inputTensor2->GetShape();
    
                // 校验两个输入tensor的shape是否一致
                if (inputShape1 != inputShape2) {
                    FLOW_FUNC_LOG_ERROR("input shape not be same");
                    return -1;
                }
                // 申请输出的msg信息
                auto outputMsg = runContext->AllocTensorMsg(inputShape1, outDataType_);
                if (outputMsg == nullptr) {
                    FLOW_FUNC_LOG_ERROR("all tensor fail");
                    return -1;
                }
                auto outputTensor = outputMsg->GetTensor();
                // 从tensor中获取tensor的数据区大小
                auto dataSize1 = inputTensor1->GetDataSize();
                auto dataSize2 = inputTensor2->GetDataSize();
    
                if (dataSize1 == 0) {
                    return runContext->SetOutput(0, outputMsg);
                }
                // 从tensor中获取tensor的数据指针
                auto inputData1 = inputTensor1->GetData();
                auto inputData2 = inputTensor2->GetData();
                auto outputData = outputTensor->GetData();
                // 根据不同的数据类型,执行不同的add操作
                switch (inputDataType1) {
                    case TensorDataType::DT_FLOAT:
                        Add1(static_cast<float *>(inputData1), static_cast<float *>(inputData2), outputData, dataSize1 / sizeof(float));
                        break;
                    case TensorDataType::DT_INT16:
                        Add1(static_cast<int16_t *>(inputData1), static_cast<int16_t *>(inputData2), outputData, dataSize1 / sizeof(int16_t));
                        break;
                    case TensorDataType::DT_UINT16:
                        Add1(static_cast<uint16_t *>(inputData1), static_cast<uint16_t *>(inputData2), outputData, dataSize1 / sizeof(uint16_t));
                        break;
                    case TensorDataType::DT_UINT32:
                        Add1(static_cast<uint32_t *>(inputData1), static_cast<uint32_t *>(inputData2), outputData, dataSize1 / sizeof(uint32_t));
                        break;
                    case TensorDataType::DT_INT8:
                        Add1(static_cast<int8_t *>(inputData1), static_cast<int8_t *>(inputData2), outputData, dataSize1 / sizeof(int8_t));
                        break;
                    case TensorDataType::DT_INT64:
                        Add1(static_cast<int64_t *>(inputData1), static_cast<int64_t *>(inputData2), outputData, dataSize1 / sizeof(int64_t));
                        break;
                    default:
                        // 不支持的data type,设置错误码输出。
                        outputMsg->SetRetCode(100);
                        break;
                }
                // 多个处理函数并发调度,用户需要自己加锁处理共享数据
                std::unique_lock<std::mutex> lock(countMutex_);
                setOutputCount_++;
                return runContext->SetOutput(0, outputMsg);
            }
            FLOW_FUNC_LOG_ERROR("MsgType is not Tensor.");
            return -1;
        }
        // 用户自定义的多func处理函数名:“Proc2”。该函数名用户自行定义,入参和返回值需要和原型保持一致。
        int32_t Proc2(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)
        {
            // 校验add操作是否是有两个输入
            if (inputFlowMsgs.size() != 2) {
                FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but %zu", inputMsgs.size());
                return -1;
            }
            auto inputFlowMsg1 = inputFlowMsgs[0];
            auto inputFlowMsg2 = inputFlowMsgs[1];
            // 无效输入校验:校验输入消息的错误码是否非0
            if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) {
                FLOW_FUNC_LOG_ERROR("invalid input");
                return -1;
            }
            // 获取输入数据的消息类型
            MsgType MsgType1 = inputFlowMsg1->GetMsgType();
            MsgType MsgType2 = inputFlowMsg2->GetMsgType();
            // 校验输入数据是否是tensor类型
            if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) {
                // 从输入msg中获取tensor
                auto inputTensor1 = inputFlowMsg1->GetTensor();
                auto inputTensor2 = inputFlowMsg2->GetTensor();
                // 从tensor中获取data type
                auto inputDataType1 = inputTensor1->GetDataType();
                auto inputDataType2 = inputTensor2->GetDataType();
    
                // 校验两个输入的data type是否一致
                if (inputDataType1 != inputDataType2) {
                    FLOW_FUNC_LOG_ERROR("allow tensor msg failed");
                    return -1;
                }
                // 从tensor中获取shape信息
                auto &inputShape1 = inputTensor1->GetShape();
                auto &inputShape2 = inputTensor2->GetShape();
    
                // 校验两个输入tensor的shape是否一致
                if (inputShape1 != inputShape2) {
                    FLOW_FUNC_LOG_ERROR("input shape not be same");
                    return -1;
                }
                // 申请输出的tensor信息
                auto outputMsg = runContext->AllocTensorMsg(inputShape1, outDataType_);
                if (outputMsg == nullptr) {
                    FLOW_FUNC_LOG_ERROR("all tensor fail");
                    return -1;
                }
                auto outputTensor = outputMsg->GetTensor();
                // 从tensor中获取tensor的数据区大小
                auto dataSize1 = inputTensor1->GetDataSize();
                auto dataSize2 = inputTensor2->GetDataSize();
                if (dataSize1 == 0) {
                    return runContext->SetOutput(0, outputMsg);
                }
                // 从tensor中获取tensor的数据指针
                auto inputData1 = inputTensor1->GetData();
                auto inputData2 = inputTensor2->GetData();
                auto outputData = outputTensor->GetData();
                // 根据不同的数据类型,执行具体的add操作
                switch (inputDataType1) {
                    case TensorDataType::DT_FLOAT:
                        Add2(static_cast<float *>(inputData1), static_cast<float *>(inputData2), outputData, dataSize1 / sizeof(float));
                        break;
                    case TensorDataType::DT_INT16:
                        Add2(static_cast<int16_t *>(inputData1), static_cast<int16_t *>(inputData2), outputData, dataSize1 / sizeof(int16_t));
                        break;
                    case TensorDataType::DT_UINT16:
                        Add2(static_cast<uint16_t *>(inputData1), static_cast<uint16_t *>(inputData2), outputData, dataSize1 / sizeof(uint16_t));
                        break;
                    case TensorDataType::DT_UINT32:
                        Add2(static_cast<uint32_t *>(inputData1), static_cast<uint32_t *>(inputData2), outputData, dataSize1 / sizeof(uint32_t));
                        break;
                    case TensorDataType::DT_INT8:
                        Add2(static_cast<int8_t *>(inputData1), static_cast<int8_t *>(inputData2), outputData, dataSize1 / sizeof(int8_t));
                        break;
                    case TensorDataType::DT_INT64:
                        Add2(static_cast<int64_t *>(inputData1), static_cast<int64_t *>(inputData2), outputData, dataSize1 / sizeof(int64_t));
                        break;
                    default:
                        // not support
                        outputMsg->SetRetCode(100);
                        break;
                }
                // 多个处理函数并发调度,用户需要自己加锁处理共享数据
                std::unique_lock<std::mutex> lock(countMutex_);
                setOutputCount_++;
                return runContext->SetOutput(0, outputMsg);
            }
            FLOW_FUNC_LOG_ERROR("MsgType is not Tensor.");
            return -1;
        }
    
    private:
        TensorDataType outDataType_;
        std::mutex countMutex_;
        uint32_t setOutputCount_;
    };

UDF实现文件(通过调用NN实现自定义功能)

用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“call_nn_flow_func.cpp”为例进行介绍。

函数分析:

该函数实现Add功能。

明确输入和输出:

包含两个输入,一个输出。

函数实现:

用户继承meta_flow_func.h文件的MetaFlowFunc基类,重写Init函数和自定义的处理函数。

class CallNnFlowFunc: public MetaMultiFunc{}
  • Init():执行初始化动作,如变量初始化,获取属性等,在本例中Init不需要做处理,因此不需要该函数。
  • Add():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
    示例如下:
    // 用户自定义的多func处理函数名:“Add”。该函数名用户自行定义,入参和返回值需要和原型保持一致。
    int32_t Add(const std::shared_ptr<MetaRunContext> &runContext, const std::vector<std::shared_ptr<FlowMsg>> &inputFlowMsgs)
    {
        std::vector<std::shared_ptr<FlowMsg>> outputMsgs;
        // 使用runContext的RunFlowModel方法,执行自定义处理函数中调用的NN模型,该NN模型实现了Add功能,是用户通过IR构图实现的。该模型执行结果放在outputMsgs中。
        auto ret = runContext->RunFlowModel("invoke_graph", inputMsgs, outputMsgs, 100000);
        if (ret != FLOW_FUNC_SUCCESS) {
            return ret;
        }
        // 将调用NN模型的输出结果outputMsgs通过SetOutput输出。
        for (size_t i = 0; i < outputMsgs.size(); ++i) {
            ret = runContext->SetOutput(i, outputMsgs[i]);
            if (ret != FLOW_FUNC_SUCCESS) {
                return ret;
            }
        }
        return FLOW_FUNC_SUCCESS;
    }

UDF注册

通过FLOW_FUNC_REGISTRAR宏将实现类声明为func name,注册到UDF框架中。

FLOW_FUNC_REGISTRAR(AddFlowFunc)
    .RegProcFunc("Proc1", &AddFlowFunc::Proc1)
    .RegProcFunc("Proc2", &AddFlowFunc::Proc2);
FLOW_FUNC_REGISTRAR(CallNnFlowFunc)
    .RegProcFunc("CallNn_Add", &CallNnFlowFunc::Add);
  • Proc1:用户自定义多func处理函数名,需要和在AddFlowFunc中定义的处理函数名保持一致。
  • Proc2:用户自定义多func处理函数名,需要和在AddFlowFunc中定义的处理函数名保持一致。
  • Add:用户自定义函数名,需要和在CallNnFlowFunc中定义的处理函数名保持一致。
  • AddFlowFunc:类名,需要和UDF实现文件(通过UDF实现自定义功能)中的类名保持一致。
  • CallNnFlowFunc:类名,需要和UDF实现文件(通过调用NN实现自定义功能)中的类名保持一致。