下载
中文
注册

基于flow标记的图切分

功能介绍

前端表达仅支持IR表达,在同构引擎间,若希望进行Flow表达,可以通过"_flow_attr"设置Op或者SubGraphOp的flow属性,分别拆分成不同的Flow图,并分别进行图编译、部署和执行;对于子图,可以通过PartitionedCall算子来实现子图的嵌套;

"_flow_attr"支持两种设置方式:

  • 使用SetAttr接口对Op设置flow属性,表达该Op的所有输入输出均为flow,与其相连接的Op均会被拆分成不同的FlowGraph。
  • 使用SetInputAttr/SetOutputAttr接口对Op的输入或输出设置flow属性,表达该Op的指定输入/输出为flow,与其相连接的Op会被拆成不同的FlowGraph。

队列驱动场景会在异步Graph之间插入队列,队列的配置属性如下:

  • "_flow_attr": BOOL类型属性,可以配置为true或false, true表示存在flow属性,false表示不存在flow属性。
  • "_flow_attr_depth": INT类型属性,指定了队列的深度,范围:大于等于2,若不配置,默认深度为128。
  • "_flow_attr_enqueue_policy": STRING类型属性,指定的入队的策略,仅支持"FIFO"和"OVERWRITE","FIFO"表示队列数据顺序入队,队列满的时候会等待dequeue,"OVERWRITE"表示入队不会等待,数据会循环覆盖,若不配置,默认策略为FIFO。
  • 不支持对while/loop的V1控制算子结构进行FlowGraph拆分。

使用方法

IR构图示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
    auto data1 = op::Data("Data1").set_attr_index(0);
    auto data2 = op::Data("Data2").set_attr_index(1);
    auto data3 = op::Data("Data3").set_attr_index(2);
    auto data4 = op::Data("Data4").set_attr_index(3);

    float inputData1[2] = {1,1};
    Tensor input1(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData1, 2 * sizeof(float));  
    input.push_back(input1);

    float inputData2[2] = {1,1};
    Tensor input2(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData2, 2 * sizeof(float));
    input.push_back(input2);

    float inputData3[2] = {1,1};
    Tensor input3(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData3, 2 * sizeof(float));  
    input.push_back(input3);

    float inputData4[2] = {1,1};
    Tensor input4(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_FLOAT), (uint8_t *)inputData4, 2 * sizeof(float));
    input.push_back(input4);

    auto OP_Reciprocal = op::Reciprocal("Reciprocal").set_input_x(data1);
    // 设置flow属性,op
    OP_Reciprocal.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true);
    AscendString policy("FIFO");
    OP_Reciprocal.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy);

    auto OP_Square = op::Square("Square").set_input_x(data2);
    // 设置flow属性,op
    OP_Square.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true);
    OP_Square.SetAttr(ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(16));
    AscendString policy1("");
    OP_Square.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy1);

    auto OP_Add = op::Add("Add1").set_input_x1(OP_Reciprocal).set_input_x2(OP_Square);
    // 设置flow属性,op
    OP_Add.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true);
    OP_Add.SetAttr(ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(16));
    AscendString policy2("OVERWRITE");
    OP_Add.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy2);
    
    auto OP_Sub = op::Sub("Sub").set_input_x1(data3).set_input_x2(data4);

    auto OP_Mul = op::Mul("Mul").set_input_x1(OP_Add).set_input_x2(OP_Sub);
    auto partition1 = op::PartitionedCall("partitio_001")
	.create_dynamic_input_args(1)
	.set_dynamic_input_args(0, OP_Mul)  
	.create_dynamic_output_output(1)
	.set_subgraph_builder_f(build_graph1);
    // 设置flow属性,PartitionedCall
    partition1.SetAttr(ATTR_NAME_FLOW_ATTR.c_str(), true);
    partition1.SetAttr(ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(16));
    AscendString policy3("FIFO");
    partition1.SetAttr(ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy3);
    
    auto result_output = op::Add("Add2").set_input_x1(partition1).set_input_x2(partition1);
    // 设置flow属性,input+output
    result_output.SetInputAttr(0,ATTR_NAME_FLOW_ATTR.c_str(), true);
    result_output.SetInputAttr(0,ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(32));
    AscendString policy4("OVERWRITE");
    result_output.SetInputAttr(0,ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy4);
    result_output.SetOutputAttr(0,ATTR_NAME_FLOW_ATTR.c_str(), true);
    result_output.SetOutputAttr(0,ATTR_NAME_FLOW_ATTR_DEPTH.c_str(), static_cast<int32_t>(32));
    AscendString policy5("OVERWRITE");
    result_output.SetOutputAttr(0,ATTR_NAME_FLOW_ATTR_ENQUEUE_POLICY.c_str(), policy5);
    
    std::vector<Operator> inputs{data1,data2,data3,data4};

    std::vector<Operator> outputs{result_output};
    graph.SetInputs(inputs).SetOutputs(outputs);
    ge::Session* session = new Session(options);
    session->AddGraph(graphId, graph);
    session->RunGraph(graphId, input, output);
    session->RemoveGraph(graphId);
    GEFinalize();
其中build_graph定义为:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
Graph build_graph1(){
Graph graph("build_graph1");
std::vector<Tensor> input;
std::vector<Tensor> output;
auto data1 = op::Data("Data1").set_attr_index(0);
float inputData1[2] = {1,1};
Tensor input1(TensorDesc(ge::Shape({2}), FORMAT_ND, DT_INT32), (uint8_t *)inputData1, 2 * sizeof(float));
input.push_back(input1);
auto output_result=op::Cast("Cast_01").set_input_x(data1).set_attr_dst_type(DT_FLOAT);
std::vector<Operator> inputs{data1};
std::vector<Operator> outputs{output_result};
graph.SetInputs(inputs).SetOutputs(outputs);
return graph;
}

Dataflow构图示例:

1
2
3
4
5
6
7
8
9
// Dataflow构图
auto node0 = dflow::FlowNode("node0", 2, 1)         // 创建FlowNode的FlowOperator实例
  .SetInput(0, data0)                     // 设置FlowNode第一个输入为data0
  .SetInput(1, data1);                    // 设置FlowNode第二个输入为data1
// 设置Flow属性
node0.SetAttr("_flow_attr", true);
node0.SetAttr("_flow_attr_depth", static_cast<int32_t>(108));
AscendString policy1("FIFO");
node0.SetAttr("_flow_attr_enqueue_policy", policy1);