下载
中文
注册

开发过程(调用MetaFlowFunc类)

简介

在UDF开发过程中,用户可以调用MetaFlowFunc类进行自定义的单func处理函数的编写。也可以调用MetaMultiFunc类进行自定义的多func处理函数的编写。本小节介绍调用MetaFlowFunc类进行UDF的开发过程。

UDF的实现包括两部分:

  • 单P场景下,支持一次输入对应多次输出,或者一次输入对应一次输出,或者多次输入对应一次输出。
  • 2P场景下,仅支持一次输入对应一次输出。
  • 如下以实现add功能的函数为例,介绍如何进行UDF开发、编译、构图及验证。

UDF实现文件(通过UDF实现自定义功能)

用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“add_flow_func.cpp”为例进行介绍。

函数分析:

该函数实现Add功能,支持float类型和int类型的Add功能。

明确输入和输出:

包含两个输入,一个输出。

函数实现:

用户继承meta_flow_func.h文件的MetaFlowFunc基类,重写Init和Proc两个函数。

如果有资源需要释放,需要在析构函数中处理。

1
2
3
namespace FlowFunc {
class AddFlowFunc: public MetaFlowFunc{};
}

如果用户需要使用TPRT(Task Parallel Runtime)并发功能,如下的Init()和Proc()函数中的实现请参考开发过程(调用MetaMultiFunc类)

  • Init():执行初始化动作,如变量初始化,获取属性等,在本例中需要获取在DataFlow构图时在FunctionPp设置的out_type属性。示例如下。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    int32_t Init() override
        {
            // 通过MetaContext类的GetAttr方法,获取在算子构图中设置的out_type属性,属性值保存在类的私有变量outDataType_中,用于把源类型转换成outDataType_类型。
            auto getRet = context_->GetAttr("out_type", outDataType_);
            if (getRet != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_RUN_LOG_ERROR("GetAttr dType not exist. ");
                return getRet;
            }
            FLOW_FUNC_RUN_LOG_INFO("Add flow func init success");
            return FLOW_FUNC_SUCCESS;
        }
    
  • Proc():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override
    {
        FLOW_FUNC_LOG_DEBUG("proc start");
        // 在Add方法下,只有两个输入
        if (inputMsgs.size() != 2) {
            // 返回flow_func_defines.h中定义的错误码
            FLOW_FUNC_LOG_ERROR("add must have 2 inputs, but %zu", inputMsgs.size());
            return FLOW_FUNC_ERR_PARAM_INVALID;
        }
    
        // invalid input
        auto inputFlowMsg1 = inputFlowMsgs[0];
        auto inputFlowMsg2 = inputFlowMsgs[1];
        FLOW_FUNC_LOG_INFO("inputFlowMsg1.RetCode=%d, inputFlowMsg2.RetCode=%d", inputFlowMsg1->GetRetCode(), inputFlowMsg2->GetRetCode());
        // 判断输入msg中的RetCode是否是0,RetCode是上步的处理结果,如果是0表示正常,如果非0则报错
        if (inputFlowMsg1->GetRetCode() != 0 || inputFlowMsg2->GetRetCode() != 0) {
            FLOW_FUNC_LOG_ERROR("invalid input");
            return -1;
        }
        MsgType MsgType1 = inputFlowMsg1->GetMsgType();
        MsgType MsgType2 = inputFlowMsg2->GetMsgType();
        // 判断输入msg中的消息类型是否是tensor类型
        if (MsgType1 == MsgType::MSG_TYPE_TENSOR_DATA && MsgType2 == MsgType::MSG_TYPE_TENSOR_DATA) {
            // 获取输入的tensor
            auto inputTensor1 = inputFlowMsg1->GetTensor();
            auto inputTensor2 = inputFlowMsg2->GetTensor();
            auto inputShape1 = inputTensor1->GetShape();
            // 根据输入shape和输出数据的dataType信息,申请FlowFunc算子的输出flow msg
            auto outputMsg = context_->AllocTensorMsg(inputShape1, outDataType_);
            if (outputMsg == nullptr) {
                FLOW_FUNC_LOG_ERROR("allow tensor msg failed");
                return -1;
            }
            // 调用第三方库中的AddTensor接口,实现Add功能
            if (AddDepend::Instance().AddTensor(inputTensor1, inputTensor2, outputMsg, outDataType_) != 0) {
                FLOW_FUNC_LOG_ERROR("add tensor failed");
                return -1;
            }
            // 输出计算结果
            return context_->SetOutput(0, outputMsg);
        }
        // 输入消息不是Tensor类型,返回错误
        FLOW_FUNC_LOG_ERROR("MsgType is not Tensor.");
        return -1;
    }
    

UDF实现文件(通过调用NN实现自定义功能)

用户需要在工程的“workspace/xx.cpp”文件中进行用户自定义函数的开发。如下以“call_nn_flow_func.cpp”为例进行介绍。

函数分析:

该函数实现Add功能。

明确输入和输出:

包含两个输入,一个输出。

函数实现:

用户继承meta_flow_func.h文件的MetaFlowFunc基类,重写Init和Proc两个函数。

1
class CallNnFlowFunc: public MetaFlowFunc{}
  • Init():执行初始化动作,如变量初始化,获取属性等,在本例中Init不需要做处理,直接返回SUCCESS。
    1
    2
    3
    4
    5
    int32_t Init() override
    {
        FLOW_FUNC_RUN_LOG_INFO("call nn flow func init success");
        return FLOW_FUNC_SUCCESS;
    }
    
  • Proc():用户自定义的计算处理函数。UDF框架在接收到输入tensor的数据后,会调用此方法。
    示例如下:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    int32_t Proc(const std::vector<std::shared_ptr<FlowMsg>> &inputMsgs) override
    {
        std::vector<std::shared_ptr<FlowMsg>> outputMsgs;
        // 调用RunFlowModel执行自定义处理函数中调用的NN模型,该NN模型实现了Add功能,是用户通过IR构图实现的。该模型执行结果放在outputMsgs中。
        auto ret = context_->RunFlowModel(dependKey_.c_str(), inputMsgs, outputMsgs, 100000);
        if (ret != FLOW_FUNC_SUCCESS) {
            FLOW_FUNC_LOG_ERROR("Run flow model failed, ret=%d, dependKey=%s", ret, dependKey_.c_str());
            return ret;
        }
        // 将调用NN模型的输出结果outputMsgs通过SetOutput输出。
        for (size_t i = 0; i < outputMsgs.size(); ++i) {
            ret = context_->SetOutput(i, outputMsgs[i]);
            if (ret != FLOW_FUNC_SUCCESS) {
                FLOW_FUNC_LOG_ERROR("Set output failed, ret=%d, index=%zu", ret, i);
                return ret;
            }
        }
        return FLOW_FUNC_SUCCESS;
    }
    // 其中dependKey_是类CallNnFlowFunc定义的私有数据成员:
    private:
        std::string dependKey_{"invoke_graph"};
    

UDF注册

通过REGISTER_FLOW_FUNC宏将实现类声明为func name,注册到UDF框架中。

1
REGISTER_FLOW_FUNC("call_nn", AddFlowFunc);