基于flow标记的图切分
功能介绍
前端表达仅支持IR表达,在同构引擎间,若希望进行Flow表达,可以通过"_flow_attr"设置Op或者SubGraphOp的flow属性,分别拆分成不同的Flow图,并分别进行图编译、部署和执行;对于子图,可以通过PartitionedCall算子来实现子图的嵌套;
"_flow_attr"支持两种设置方式:
- 使用SetAttr接口对Op设置flow属性,表达该Op的所有输入输出均为flow,与其相连接的Op均会被拆分成不同的FlowGraph。
- 使用SetInputAttr/SetOutputAttr接口对Op的输入或输出设置flow属性,表达该Op的指定输入/输出为flow,与其相连接的Op会被拆成不同的FlowGraph。
队列驱动场景会在异步Graph之间插入队列,队列的配置属性如下:
- "_flow_attr": BOOL类型属性,可以配置为true或false, true表示存在flow属性,false表示不存在flow属性。
- "_flow_attr_depth": INT类型属性,指定了队列的深度,范围:大于等于2,若不配置,默认深度为128。
- "_flow_attr_enqueue_policy": STRING类型属性,指定的入队的策略,仅支持"FIFO"和"OVERWRITE","FIFO"表示队列数据顺序入队,队列满的时候会等待dequeue,"OVERWRITE"表示入队不会等待,数据会循环覆盖,若不配置,默认策略为FIFO。
- 不支持对while/loop的V1控制算子结构进行FlowGraph拆分。
使用方法
IR构图示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | auto data1 = op::Data("Data1").set_attr_index(0); auto data2 = op::Data("Data2").set_attr_index(1); auto data3 = op::Data("Data3").set_attr_index(2); auto data4 = op::Data("Data4").set_attr_index(3); float inputData1[2] = {1,1}; Tensor input1(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData1, 2 * sizeof(float)); input.push_back(input1); float inputData2[2] = {1,1}; Tensor input2(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData2, 2 * sizeof(float)); input.push_back(input2); float inputData3[2] = {1,1}; Tensor input3(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData3, 2 * sizeof(float)); input.push_back(input3); float inputData4[2] = {1,1}; Tensor input4(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData4, 2 * sizeof(float)); input.push_back(input4); auto OP_Reciprocal = op::Reciprocal("Reciprocal").set_input_x(data1); // 设置flow属性,op OP_Reciprocal.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true); AscendString policy("FIFO"); OP_Reciprocal.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy); auto OP_Square = op::Square("Square").set_input_x(data2); // 设置flow属性,op OP_Square.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true); OP_Square.SetAttr(ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(16)); AscendString policy1(""); OP_Square.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy1); auto OP_Add = op::Add("Add1").set_input_x1(OP_Reciprocal).set_input_x2(OP_Square); // 设置flow属性,op OP_Add.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true); OP_Add.SetAttr(ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(16)); AscendString policy2("OVERWRITE"); OP_Add.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy2); auto OP_Sub = op::Sub("Sub").set_input_x1(data3).set_input_x2(data4); auto OP_Mul = op::Mul("Mul").set_input_x1(OP_Add).set_input_x2(OP_Sub); auto partition1 = op::PartitionedCall("partitio_001") .create_dynamic_input_args(1) .set_dynamic_input_args(0, OP_Mul) .create_dynamic_output_output(1) .set_subgraph_builder_f(build_graph1); // 设置flow属性,PartitionedCall partition1.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true); partition1.SetAttr(ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(16)); AscendString policy3("FIFO"); partition1.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy3); auto result_output = op::Add("Add2").set_input_x1(partition1).set_input_x2(partition1); // 设置flow属性,input+output result_output.SetInputAttr(0,ATTR_NAME_FLOW_ATTR.c_str(), true); result_output.SetInputAttr(0,ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(32)); AscendString policy4("OVERWRITE"); result_output.SetInputAttr(0,ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy4); result_output.SetOutputAttr(0,ATTR_NAME_FLOW_ATTR.c_str(), true); result_output.SetOutputAttr(0,ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(32)); AscendString policy5("OVERWRITE"); result_output.SetOutputAttr(0,ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy5); std::vector<Operator> inputs{data1,data2,data3,data4}; std::vector<Operator> outputs{result_output}; graph.SetInputs(inputs).SetOutputs(outputs); ge::Session* session = new Session(options); session->AddGraph(graphId, graph); session->RunGraph(graphId, input, output); session->RemoveGraph(graphId); GEFinalize(); |
其中build_graph定义为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | Graph build_graph1(){ Graph graph("build_graph1"); std::vector<Tensor> input; std::vector<Tensor> output; auto data1 = op::Data("Data1").set_attr_index(0); float inputData1[2] = {1,1}; Tensor input1(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_INT32), (uint8_t *)inputData1, 2 * sizeof(float)); input.push_back(input1); auto output_result=op::Cast("Cast_01").set_input_x(data1).set_attr_dst_type(DT_FLOAT); std::vector<Operator> inputs{data1}; std::vector<Operator> outputs{output_result}; graph.SetInputs(inputs).SetOutputs(outputs); return graph; } |
Dataflow构图示例:
1 2 3 4 5 6 7 8 9 | // Dataflow构图 auto node0 = dflow::FlowNode("node0", 2, 1) // 创建FlowNode的FlowOperator实例 .SetInput(0, data0) // 设置FlowNode第一个输入为data0 .SetInput(1, data1); // 设置FlowNode第二个输入为data1 // 设置Flow属性 node0.SetAttr("_flow_attr", true); node0.SetAttr("_flow_attr_depth", static_cast<int32_t>(108)); AscendString policy1("FIFO"); node0.SetAttr("_flow_attr_enqueue_policy", policy1); |
父主题: 专题