
aclnnGroupedMatMulAllReduce

Supported Products

  • Atlas A2 training series products / Atlas 800I A2 inference products.

Interface Prototype

Each operator is exposed as a two-phase interface: first call "aclnnGroupedMatMulAllReduceGetWorkspaceSize" with the input parameters to compute the workspace size required by the computation flow, then call "aclnnGroupedMatMulAllReduce" to execute the computation. A minimal sketch of this pattern follows the prototypes below.

  • aclnnStatus aclnnGroupedMatMulAllReduceGetWorkspaceSize(const aclTensorList* x, const aclTensorList* weight, const aclTensorList* bias, const aclIntArray* groupListOptional, int64_t splitItem, const char* group, const char* reduceOp, int64_t commTurn, int64_t streamMode, const aclTensorList* y, uint64_t* workspaceSize, aclOpExecutor** executor)
  • aclnnStatus aclnnGroupedMatMulAllReduce(void* workspace, uint64_t workspaceSize, aclOpExecutor* executor, aclrtStream stream)
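
A minimal sketch of the two-phase pattern, assuming the tensor lists x/weight/bias/y, the group name string, and an aclrtStream stream have already been created (as in the full sample at the end of this page):

aclOpExecutor* executor = nullptr;
uint64_t workspaceSize = 0;
// Phase 1: validate inputs and query the required workspace size
aclnnStatus st = aclnnGroupedMatMulAllReduceGetWorkspaceSize(
    x, weight, bias, /*groupListOptional=*/nullptr, /*splitItem=*/0,
    group, "sum", /*commTurn=*/0, /*streamMode=*/1, y, &workspaceSize, &executor);
void* workspace = nullptr;
if (st == ACL_SUCCESS && workspaceSize > 0) {
    aclrtMalloc(&workspace, workspaceSize, ACL_MEM_MALLOC_HUGE_FIRST);
}
// Phase 2: launch the grouped matmul + AllReduce on the stream
st = aclnnGroupedMatMulAllReduce(workspace, workspaceSize, executor, stream);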

Function Description

  • Operator function: extends grouped_matmul with multi-card parallel AllReduce. It performs grouped matrix multiplication, where each group's matmul dimensions may differ. Based on the tensor counts of x, weight, and y, the following four scenarios are supported:
    • The tensor counts of x, weight, and y all equal the number of groups, i.e. each group's data has its own independent tensors.
    • x has a single tensor while the tensor counts of weight/y equal the number of groups. The optional parameter group_list then describes how the rows of x are partitioned into groups (see the group-list sketch after the formula below); e.g. group_list[0]=10 means the first 10 rows of x participate in the first group's matmul.
    • The tensor counts of x and weight equal the number of groups and y has a single tensor; the results of all groups are stored contiguously in that one tensor.
    • x and y each have a single tensor and the tensor count of weight equals the number of groups; this combines the previous two scenarios.
  • Computation formula:
    • Non-quantized scenario:

      $$y_i = x_i \times weight_i + bias_i$$
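
To make the row-partition semantics of group_list concrete, here is a hedged host-side sketch (hypothetical helper, assuming the cumulative-offset reading implied by the group_list[0]=10 example above):

#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical helper: expand a cumulative group list into [start, end) row
// ranges of x. For groupList = {10, 25, 40}: group 0 -> rows [0, 10),
// group 1 -> rows [10, 25), group 2 -> rows [25, 40).
std::vector<std::pair<int64_t, int64_t>> RowRanges(const std::vector<int64_t>& groupList) {
    std::vector<std::pair<int64_t, int64_t>> ranges;
    int64_t start = 0;
    for (int64_t end : groupList) {
        ranges.emplace_back(start, end);
        start = end;
    }
    return ranges;
}

And as a concrete reading of the formula, this hypothetical scalar reference (host-side, not the device implementation) computes one group's partial result on one rank; the AllReduce with reduceOp "sum" then adds the partial y_i across ranks, matching the K-dimension sharding used in the sample at the end of this page:

// Hypothetical scalar reference for one group on one rank:
// x is M x K (this rank's K-slice), w is K x N, b is N (may be null), y is M x N.
void GroupRefMatmul(const float* x, const float* w, const float* b,
                    float* y, int M, int K, int N) {
    for (int m = 0; m < M; ++m) {
        for (int n = 0; n < N; ++n) {
            // In a K-sharded multi-rank run, add bias on one rank only,
            // since AllReduce("sum") accumulates the per-rank partials.
            float acc = (b != nullptr) ? b[n] : 0.0f;
            for (int k = 0; k < K; ++k) {
                acc += x[m * K + k] * w[k * N + n];
            }
            y[m * N + n] = acc;
        }
    }
}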

aclnnGroupedMatMulAllReduceGetWorkspaceSize

  • Parameters:

    • x (aclTensorList*, input): required. Device-side aclTensorList, the x in the formula. Supported data types: FLOAT16, BFLOAT16; data format: ND; maximum list length: 64 tensors.
    • weight (aclTensorList*, input): required. Device-side aclTensorList, the weight in the formula. Supported data types: FLOAT16, BFLOAT16; data format: ND; maximum list length: 64 tensors.
    • bias (aclTensorList*, input): optional. Device-side aclTensorList, the bias in the formula. Supported data types: FLOAT16, FLOAT32; data format: ND; maximum list length: 64 tensors.
    • groupListOptional (aclIntArray*, input): optional. Host-side aclIntArray describing the distribution of matmul sizes along the M dimension of the input and output. Supported data type: INT64; data format: ND; maximum length: 64.
    • splitItem (int64_t, input): optional attribute indicating whether the input and output tensors need splitting: 0 means neither input nor output needs splitting; 1 means the input needs splitting but the output does not; 2 means the input does not need splitting but the output does; 3 means both need splitting. Default: 0.
    • group (char*, input): host-side string identifying the communication group, i.e. the communication domain name. Data type: string. Obtain it through the HCCL interface extern HcclResult HcclGetCommName(HcclComm comm, char* commName); commName is the group (see the sketch at the end of this section).
    • reduceOp (char*, input): the reduce operation type. Data type: string. The current version only supports "sum".
    • commTurn (int64_t, input): host-side integer, the number of communication data splits, i.e. the total data volume divided by the volume per communication. Data type: int64_t. The current version only supports 0.
    • streamMode (int64_t, input): host-side integer, the ACL stream mode enum. Data type: int64_t. The current version only supports the value 1.
    • y (aclTensorList*, output): Device-side aclTensorList, the y in the formula. Supported data types: FLOAT16, BFLOAT16; data format: ND; maximum list length: 64 tensors.
    • workspaceSize (uint64_t*, output): returns the workspace size that needs to be allocated on the device.
    • executor (aclOpExecutor**, output): returns the op executor, which encapsulates the operator's computation flow.
  • Return value:

    Returns an aclnnStatus status code; see the aclnn return codes for details.

    Note: the first-phase interface performs input validation. If one of the following error codes is returned, the cause is:

    • 161001 (ACLNN_ERR_PARAM_NULLPTR): a required input, output, or required attribute was passed as a null pointer.
    • 161002 (ACLNN_ERR_PARAM_INVALID): the data type or data format of x, weight, bias, scale, offset, antiquant_scale, antiquant_offset, groupListOptional, splitItem, or y is outside the supported range; the length of x is neither 1 nor equal to the length of weight; bias is non-empty and its length does not equal the length of weight; splitItem is 1 but the length of x is not 1; splitItem is 2 but the length of y is not 1; splitItem is 3 but the lengths of x and y are not both 1; reduceOp is not "sum"; commTurn is not 0; streamMode is not 1.
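
As noted for the group parameter above, the communication domain name comes from an existing HCCL communicator. A minimal sketch (assuming comm is an HcclComm initialized earlier, e.g. via HcclCommInitAll, as in the full sample below):

char commName[128] = {0};
HcclResult hr = HcclGetCommName(comm, commName);  // fills commName for an initialized HcclComm
if (hr != HCCL_SUCCESS) {
    // handle the error
}
// Pass commName as the group argument of aclnnGroupedMatMulAllReduceGetWorkspaceSize.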

aclnnGroupedMatMulAllReduce

  • Parameters:

    • workspace (void*, input): address of the workspace memory allocated on the device.
    • workspaceSize (uint64_t, input): size of the device-side workspace, obtained from the first-phase interface aclnnGroupedMatMulAllReduceGetWorkspaceSize.
    • executor (aclOpExecutor*, input): the op executor, which encapsulates the operator's computation flow.
    • stream (aclrtStream, input): the AscendCL stream on which the task executes.
  • Return value:

    Returns an aclnnStatus status code; see the aclnn return codes for details.

Constraints and Limitations

  1. The inputs x, weight, and bias support multiple data types, but this operator only supports the combinations "x-FLOAT16, weight-FLOAT16, bias-FLOAT16" and "x-BFLOAT16, weight-BFLOAT16, bias-FLOAT32".
  2. When splitItem is 0, x and y each support 2 to 6 dimensions; when splitItem is 1/2/3, x and y must be 2-dimensional. For every splitItem value (0/1/2/3), weight must be 2-dimensional.
  3. 2, 4, and 8 cards are supported.
  4. The last dimension of every tensor in x and weight must be smaller than 65536. The last dimension of $x_i$ is its K axis when the attribute transpose_x is false, or its M axis when transpose_x is true; the last dimension of $weight_i$ is its N axis when transpose_weight is false, or its K axis when transpose_weight is true.
  5. Every dimension of every tensor in x and weight must, after 32-byte alignment, be smaller than the INT32 maximum 2147483647; a hedged validation sketch follows this list.
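
A hedged host-side sketch of constraints 4 and 5 for a single tensor shape (hypothetical helper; constraint 5 is read here as each dimension's byte size, rounded up to a 32-byte boundary, staying below 2147483647):

#include <cstdint>
#include <vector>

// Hypothetical check of constraints 4 and 5 for one tensor of x or weight.
// elemSize is the element size in bytes (2 for FLOAT16/BFLOAT16).
bool CheckTensorShape(const std::vector<int64_t>& shape, int64_t elemSize) {
    if (shape.empty()) {
        return false;
    }
    if (shape.back() >= 65536) {            // constraint 4: last dim < 65536
        return false;
    }
    for (int64_t dim : shape) {             // constraint 5, byte-size reading
        int64_t alignedBytes = (dim * elemSize + 31) / 32 * 32;
        if (alignedBytes >= 2147483647LL) {
            return false;
        }
    }
    return true;
}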

Invocation Example

The following sample code is for reference only; for the detailed build and run procedure, see Compiling and Running a Sample.

#include <iostream>
#include <unistd.h>
#include <fstream>
#include <vector>
#include <map>
#include <memory>
#include <thread>
#include <iomanip>
#include <cassert>
#include <string>        // std::string, std::stoi
#include <sstream>       // std::stringstream, used in InitParams
#include <cerrno>        // errno, used in launchOneThread
#include <fcntl.h>
#include <sched.h>       // cpu_set_t / sched_setaffinity, used in launchOneThread
#include <sys/prctl.h>
#include <sys/syscall.h>
#include "hccl/hccl.h"
#include <hccl/hccl_types.h>

#include "acl/acl.h"
#include "acl/acl_base.h"
#include "aclnnop/aclnn_grouped_mat_mul_all_reduce.h"
using namespace std;

#ifdef __aarch64__
    #define gettid() syscall(SYS_gettid)
#endif
#define N 128

struct TensorInfo {
    std::string name;
    std::string dtype;
    std::string data_file;
    std::vector<int64_t> shape;
};

struct TensorsInfo {
    std::vector<TensorInfo> x_array;
    std::vector<TensorInfo> w_array;
    std::vector<TensorInfo> b_array;
    std::vector<TensorInfo> grouplist_array;
    std::vector<TensorInfo> y_array;
};

std::map<string, aclDataType> DtypeMap = {
    {"float16", ACL_FLOAT16},
    {"bfloat16", ACL_BF16},
    {"int64", ACL_INT64},
    {"uint64", ACL_UINT64}
};
thread_local int g_dev_id;
int32_t g_ndev = 0;
string input_dir = "./golden";
string output_dir = "./output";
string json_path;
vector<int> device_list;

int32_t loop = 1;

void set_device_id(int id)
{
    g_dev_id = id;
}
int get_device_id(void)
{
    return g_dev_id;
}

typedef signed char s8;
typedef signed short s16;
typedef signed int s32;
typedef signed long long s64;
typedef unsigned char u8;
typedef unsigned short u16;
typedef unsigned int u32;
typedef unsigned long long u64;

#define ACL_CHECK(ret)                                                                                     \
    do {                                                                                                   \
        auto retcode = ret;                                                                                \
        if (retcode != ACL_SUCCESS) {                                                                      \
            printf("[ERROR] acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, retcode); \
            return retcode;                                                                                \
        }                                                                                                  \
    } while (0)

#define CHECK_RET(cond, return_expr) \
    do {                             \
        if (!(cond)) {               \
            return_expr;             \
        }                            \
    } while (0)

#define LOG_PRINT(message, ...)     \
  do {                              \
    printf(message, ##__VA_ARGS__); \
  } while (0)


struct DataSize {
    u64 min_bytes;
    u64 max_bytes;
    u64 step_bytes = 0;
    double step_factor;
    u64 count;
    u64 data_size;
    u64 type_size;
    int op_flag;
    void Print()
    {
        printf("acl min_bytes : %lld, max_bytes: %lld, step_bytes: %lld, step_factor: %f, count: %lld, data_size: "
               "%lld,  type_size: %lld\n",
            min_bytes,
            max_bytes,
            step_bytes,
            step_factor,
            count,
            data_size,
            type_size);
    }
};

struct Resource {
    aclrtStream rtStream;
    aclrtStream rtStream_aicpu;
    aclrtStream rtStream_record;
    aclrtEvent startEvent;
    aclrtEvent endEvent;
    aclrtContext context;
};

struct Args {
    int ndev;
    int rankId;
    uint32_t logicDeviceId;
    uint32_t rootRank;
    HcclComm hcclComm;
    Resource resources;
    DataSize dataPara;
    uint32_t m;
    uint32_t k;
    uint32_t n;
    std::string dtype;
    std::string bin_path;
    std::string run_type;
    uint32_t loop_cnt;
    int bias_flag;
    std::map<std::string, int64_t> addrMap;
};

int64_t GetShapeSize(const std::vector<int64_t>& shape) {
  int64_t shapeSize = 1;
  for (auto i : shape) {
    shapeSize *= i;
  }
  return shapeSize;
}

int Init(int32_t deviceId, aclrtContext* context, aclrtStream* stream) {
  // Boilerplate: AscendCL initialization
  auto ret = aclInit(nullptr);
  CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("aclInit failed. ERROR: %d\n", ret); return ret);
  ret = aclrtSetDevice(deviceId);
  CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("aclrtSetDevice failed. ERROR: %d\n", ret); return ret);
  ret = aclrtCreateContext(context, deviceId);
  CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("aclrtCreateContext failed. ERROR: %d\n", ret); return ret);
  ret = aclrtSetCurrentContext(*context);
  CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("aclrtSetCurrentContext failed. ERROR: %d\n", ret); return ret);
  ret = aclrtCreateStream(stream);
  CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("aclrtCreateStream failed. ERROR: %d\n", ret); return ret);
  return 0;
}

int ReleaseAddr(vector<TensorInfo>& tensorList, map<string, int64_t> &addrMap) {
    int len = tensorList.size();
    if (len == 0 || (len == 1 && tensorList[0].shape.size() == 0)) {
        return 0;
    }
    for (int j = 0; j < len; ++j) {
        string name = tensorList[j].name;
        if (addrMap.find(name) != addrMap.end()) {
            aclrtFree((void*)addrMap.at(name));
        }
    }
    return 0;
}

int DestroyTensor(void* items[], int item_type[], int num) {
  for (int i = 0; i < num; ++i) {
    if (items[i] == nullptr) {  // skip empty slots
      continue;
    }
    if (item_type[i] == 0) {
      aclDestroyTensor((aclTensor*)items[i]);
    } else if (item_type[i] == 1) {
      aclDestroyTensorList((aclTensorList*)items[i]);
    } else {
      aclDestroyIntArray((aclIntArray*)items[i]);
    }
  }
  return 0;
}

int CreateAclTensor(std::vector<int64_t>& shape,  std::string& name, std::string& dataTypeStr,
                    void*& deviceAddr, void*& tensor, bool int_array, int rankId=0) {
  uint32_t axis = 0;
  uint32_t byteBlock = 0;
  auto dataType = DtypeMap[dataTypeStr];
  uint32_t dataTypeSize = aclDataTypeSize(dataType);
  if (name.compare(0, 1, "x") == 0) {
    if (shape[1] % g_ndev != 0) {
      printf("[ERROR] X_k cannot be divided by ndev!\n");
      return -1;
    } 
    shape[1] /= g_ndev;
    axis = 1;
    byteBlock = shape[1] * dataTypeSize;
  } else if (name.compare(0, 6, "weight") == 0) {
    if (shape[0] % g_ndev != 0) {
      printf("[ERROR] Weight_k cannot be divided by ndev!\n");
      return -1;
    } 
    shape[0] /= g_ndev;
    axis = 0;
    byteBlock = shape[0] * dataTypeSize;
  } 
  auto size = GetShapeSize(shape) * dataTypeSize;
  // Allocate device memory via aclrtMalloc
  auto ret = aclrtMalloc(&deviceAddr, size, ACL_MEM_MALLOC_HUGE_FIRST);
  CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("aclrtMalloc failed. ERROR: %d\n", ret); return ret);

  // Copy host-side data to device memory via aclrtMemcpy
  // (the host buffer is zero-initialized here; a real test would load it from file)
  uint8_t* hostData = new (std::nothrow) uint8_t[size]();
  CHECK_RET(hostData != nullptr, LOG_PRINT("alloc host data failed.\n"); return -1);
  if (int_array) {
    tensor = (void*)aclCreateIntArray((int64_t*)hostData, size / sizeof(int64_t));
    delete[] hostData;
    return 0;
  }
  ret = aclrtMemcpy(deviceAddr, size, hostData, size, ACL_MEMCPY_HOST_TO_DEVICE);
  delete[] hostData;  // staging buffer is no longer needed after the copy
  CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("aclrtMemcpy failed. ERROR: %d\n", ret); return ret);

  // Compute the strides of a contiguous tensor
  std::vector<int64_t> strides(shape.size(), 1);
  for (int64_t i = shape.size() - 2; i >= 0; i--) {
    strides[i] = shape[i + 1] * strides[i + 1];
  }

  // Create the aclTensor via aclCreateTensor
  tensor = (void*)aclCreateTensor(shape.data(), shape.size(), dataType, strides.data(), 0, aclFormat::ACL_FORMAT_ND,
                                  shape.data(), shape.size(), deviceAddr);
  return 0;
}

int CreateTensor_(int i, vector<TensorInfo>& tensorList, map<string, int64_t> &addrMap, void* items[], int item_type[], bool copy, int rankId) {
    int len = tensorList.size();
    if (len == 0 || (len == 1 && tensorList[0].shape.size() == 0)) {
        items[i] = nullptr;
        item_type[i] = 1;
        return 0;
    }
    void* tensors[len];
    for (int j = 0; j < len; ++j) {
        string name = tensorList[j].name;
        vector<int64_t> shape = tensorList[j].shape;
        string dtype = tensorList[j].dtype;
        void* deviceAddr = nullptr;
        auto ret = CreateAclTensor(shape, name, dtype, deviceAddr, tensors[j], false, rankId);
        addrMap[name] = (int64_t)deviceAddr;
        if (ret != 0) {return -1;}
    }
    items[i] = (void*)aclCreateTensorList((aclTensor**)tensors, len);
    item_type[i] = 1;
    return 0;
}


int CreateTensor(TensorsInfo& config, map<string, int64_t> &addrMap, void* items[], int item_type[], bool copy, int rankId) {
    int ret = 0;
    if (!copy) {
        ret = CreateTensor_(0, config.y_array, addrMap, items, item_type, copy, rankId);
        if (ret != 0) {return -1;}
    } else {
        ret = CreateTensor_(0, config.x_array, addrMap, items, item_type, copy, rankId);
        if (ret != 0) {return -1;}
        ret = CreateTensor_(1, config.w_array, addrMap, items, item_type, copy, rankId);
        if (ret != 0) {return -1;}
        ret = CreateTensor_(2, config.b_array, addrMap, items, item_type, copy, rankId);
        if (ret != 0) {return -1;}

        bool int_array = true;
        void* deviceAddr = nullptr;
        TensorInfo& grouplist = config.grouplist_array[0];
        ret = CreateAclTensor(grouplist.shape, grouplist.name, grouplist.dtype, deviceAddr, items[3], int_array, rankId);
        addrMap[grouplist.name] = (int64_t)deviceAddr;
        item_type[3] = 2;
        if (ret != 0) {return -1;}
    }
    return 0;
}

int PrepareDeviceMemAndData(TensorsInfo& config, std::map<std::string, int64_t>& addrMap, void** inputs, void** outputs, int* input_type, int* output_type, int rankId) {
  int ret = 0;
  ret = CreateTensor(config, addrMap, inputs, input_type, true, rankId);
  CHECK_RET(ret == ACL_SUCCESS, printf("CreateTensor Failed\n"); return -1);
  ret = CreateTensor(config, addrMap, outputs, output_type, false, rankId);
  CHECK_RET(ret == ACL_SUCCESS, printf("CreateTensor Failed\n"); return -1);
  return 0;
}

typedef int32_t rtError_t;
typedef void *rtNotify_t;
typedef void *rtStream_t;

// init hccl args end
extern "C" rtError_t rtNotifyWait(rtNotify_t notify, rtStream_t stm);
extern "C" rtError_t rtNotifyRecord(rtNotify_t notify, rtStream_t stm);
extern "C" rtError_t rtNotifyCreate(int32_t deviceId, rtNotify_t *notify);
extern "C" HcclResult HcclCreateComResource(char *commName, uint32_t streamMode, void **commContext);
extern "C" HcclResult HcclGetAicpuOpStreamNotify(char *commName, rtStream_t *Opstream, void **aicpuNotify);

int callMatmulAndAicpu(HcclComm hcclComm, void **inputs, void **outputs, int* input_type, int* output_type, DataSize &dataPara, aclrtStream rtStream, aclrtStream rtStream_aicpu, aclrtStream rtStream_record,
    aclrtEvent startEvent, aclrtEvent endEvent, int checkErr, int ndev, int rankId, int deviceId, int rootRank, std::string dtype, std::map<std::string, int64_t>& addrMap)  // per-rank worker: one grouped-matmul-allreduce invocation
{
    struct timespec tp1, tp2;
    long cost;
    clock_gettime(CLOCK_MONOTONIC, &tp1);
    set_device_id(deviceId);

    // **********Add Hcommname start********
    char hcom_name[N];
    auto hccl_name_ret = HcclGetCommName(hcclComm, hcom_name);
    CHECK_RET(
        hccl_name_ret == ACL_SUCCESS, printf("[ERROR] HcclGetCommName failed. hccl_name_ret = %d \n", hccl_name_ret);
        return -1);
    printf("rank %d hcom name is %s\n", rankId, hcom_name);
    // **********Add Hcommname end*********

    void *workspaceAddr = nullptr;
    uint64_t workspaceSize = 0;
    aclOpExecutor* executor = nullptr;
    int64_t split_item = 0;
    inputs[3] = nullptr;
    int64_t comm_turn = 0;
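    // Attribute choices above: split_item = 0 keeps the per-group x/y tensors
    // unfused, inputs[3] = nullptr omits groupListOptional (each group supplies
    // its own x tensor), and comm_turn = 0 / streamMode = 1 are the only values
    // the current version accepts.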
    
    auto aclnnRet = aclnnGroupedMatMulAllReduceGetWorkspaceSize(
        (aclTensorList*)inputs[0], (aclTensorList*)inputs[1], (aclTensorList*)inputs[2], (aclIntArray*)inputs[3], split_item,  hcom_name, "sum", comm_turn, 1, (aclTensorList*)outputs[0], &workspaceSize, &executor);
    CHECK_RET(aclnnRet == ACL_SUCCESS,
                printf("[ERROR] aclnnGroupedMatMulAllReduceGetWorkspaceSize failed. aclnnRet = %d \n", aclnnRet);
                return -1);
    printf("[INFO] gmm_allreduce workspaceSize = %lu\n", workspaceSize);
    if (workspaceSize > 0) {
        ACL_CHECK(aclrtMalloc(&workspaceAddr, workspaceSize, ACL_MEM_MALLOC_HUGE_FIRST));
    }
    aclnnRet = aclnnGroupedMatMulAllReduce(workspaceAddr, workspaceSize, executor, rtStream);
    CHECK_RET(aclnnRet == ACL_SUCCESS, printf("[ERROR] aclnnGroupedMatMulAllReduce failed. aclnnRet = %d \n", aclnnRet);
                return -1);
    ACL_CHECK(aclrtSynchronizeStreamWithTimeout(rtStream, 10000));
    if (workspaceAddr != nullptr) {
      auto ret = aclrtFree(workspaceAddr);
      CHECK_RET(ret == ACL_SUCCESS, printf("[ERROR] aclnnFree workspaceAddr failed. ret = %d \n", ret);
                return -1);
    }

    clock_gettime(CLOCK_MONOTONIC, &tp2);
    // Elapsed time in nanoseconds (handles multi-second spans correctly)
    cost = (tp2.tv_sec - tp1.tv_sec) * 1000000000L + (tp2.tv_nsec - tp1.tv_nsec);
    printf("[INFO] mc2 costtime = %ld, deviceid = %d\n", cost, rankId);

    return 0;
}

int launchOneThread(Args &args, TensorsInfo& config)
{
    std::string thread_name = "test" + std::to_string(args.logicDeviceId);
    prctl(PR_SET_NAME, thread_name.c_str());
    cpu_set_t cpu_set;
    CPU_ZERO(&cpu_set);

    int numa_node = args.logicDeviceId;
    int cpu_num = 8;
    int cpu_start = cpu_num * args.rankId;
    for (int i = cpu_start; i < cpu_start + cpu_num; i++) {
        CPU_SET(i, &cpu_set);
    }
    int ret = sched_setaffinity(gettid(), sizeof(cpu_set), &cpu_set);
    if (ret) {
        printf("failed to set cpu affinity, errno = %d\n", errno);
        return -1;
    }
    printf("0 rankId: %d, stream: %p, context : %p\n",
        args.logicDeviceId,
        args.resources.rtStream,
        args.resources.context);

    DataSize dataPara = args.dataPara;
    int checkErr = 0;
    ACL_CHECK(aclrtSetDevice(args.logicDeviceId));
    ACL_CHECK(aclrtSetCurrentContext(args.resources.context));
    
    constexpr int input_num = 4;
    void* inputs[input_num];
    int input_type[input_num];
    constexpr int output_num = 1;
    void* outputs[output_num];
    int output_type[output_num];
    args.addrMap.clear();
    ret = PrepareDeviceMemAndData(config, args.addrMap, inputs, outputs, input_type, output_type, args.rankId);
    CHECK_RET(ret == 0, return ret);

    for (uint32_t i = 0; i < args.loop_cnt; i++) {
        printf("======Startloop: %u / %u ============\n", i, args.loop_cnt);

        ret = callMatmulAndAicpu(args.hcclComm,
            inputs,
            outputs,
            input_type,
            output_type,
            dataPara,
            args.resources.rtStream,
            args.resources.rtStream_aicpu,
            args.resources.rtStream_record,
            args.resources.startEvent,
            args.resources.endEvent,
            checkErr,
            args.ndev,
            args.logicDeviceId,
            args.rankId,
            args.rootRank,
            args.run_type,
            args.addrMap);
        if (ret != 0) {
            printf("TestCall execute AICPU failed, Detailed logs are stored in path: /root/ascend/log/");
            return ret;
        }
    }
    // save output
    // if (args.logicDeviceId == args.rootRank) {
    //   SaveTensor(config["outputs"], args.addrMap, output_dir);
    // }
    ACL_CHECK(aclrtSynchronizeStreamWithTimeout(args.resources.rtStream, 10000));
    DestroyTensor(inputs, input_type, input_num);
    DestroyTensor(outputs, output_type, output_num);
    ReleaseAddr(config.x_array, args.addrMap);
    ReleaseAddr(config.w_array, args.addrMap);
    ReleaseAddr(config.b_array, args.addrMap);
    ReleaseAddr(config.grouplist_array, args.addrMap);
    ReleaseAddr(config.y_array, args.addrMap);

    ACL_CHECK(aclrtSynchronizeStreamWithTimeout(args.resources.rtStream, 10000));
    ACL_CHECK(aclrtDestroyStreamForce(args.resources.rtStream));
    ACL_CHECK(aclrtSynchronizeStreamWithTimeout(args.resources.rtStream_aicpu, 10000));
    ACL_CHECK(aclrtDestroyStreamForce(args.resources.rtStream_aicpu));
    ACL_CHECK(aclrtSynchronizeStreamWithTimeout(args.resources.rtStream_record, 10));
    ACL_CHECK(aclrtDestroyStreamForce(args.resources.rtStream_record));
    // Destroy the collective communication domain
    //  HCCLCHECK(HcclCommDestroy(args.hcclComm));
    HcclCommDestroy(args.hcclComm);
    // Reset the device
    ACL_CHECK(aclrtResetDevice(args.logicDeviceId));
    return 0;
}

extern "C" int rtStreamCreateWithFlags(void **stm, int32_t priority, uint32_t flags);
int launchMultiThread(Args &input_args, int32_t *devices, HcclComm *comms, Resource *resources)
{
    DataSize dataPara;
    dataPara.data_size = 10485760;
    dataPara.type_size = sizeof(uint16_t);
    dataPara.count = (dataPara.data_size + sizeof(uint16_t) - 1) / sizeof(uint16_t);  // data->count;
    dataPara.step_factor = 1;                                                         // data->step_factor;
    dataPara.step_bytes = 1;                                                          // data->step_bytes;
    dataPara.max_bytes = 10485760;                                                    // data->max_bytes;
    dataPara.min_bytes = 10485760;                                                    // data->min_bytes;
    dataPara.op_flag = 0;                                                             // op_flag;
    input_args.bin_path = "";
    input_args.dtype = "float16";

    TensorsInfo config;
    TensorInfo x0_json;
    x0_json.name = "x0";
    x0_json.dtype = "float16";
    x0_json.data_file = "x_0.bin";
    x0_json.shape = vector<int64_t>{256,256};
    TensorInfo x1_json;
    x1_json.name = "x1";
    x1_json.dtype = "float16";
    x1_json.data_file = "x_1.bin";
    x1_json.shape = vector<int64_t>{1024,256};
    config.x_array.push_back(x0_json);
    config.x_array.push_back(x1_json);
    
    TensorInfo w0_json;
    w0_json.name = "weight0";
    w0_json.dtype = "float16";
    w0_json.data_file = "w_0.bin";
    w0_json.shape = vector<int64_t>{256,256};
    TensorInfo w1_json;
    w1_json.name = "weight1";
    w1_json.dtype = "float16";
    w1_json.data_file = "w_1.bin";
    w1_json.shape = vector<int64_t>{256, 1024};
    config.w_array.push_back(w0_json);
    config.w_array.push_back(w1_json);

    TensorInfo b0_json;
    b0_json.name = "b0";
    b0_json.dtype = "float16";
    b0_json.data_file = "b_0.bin";
    b0_json.shape = vector<int64_t>{256};
    TensorInfo b1_json;
    b1_json.name = "b1";
    b1_json.dtype = "float16";
    b1_json.data_file = "b_1.bin";
    b1_json.shape = vector<int64_t>{1024};
    config.b_array.push_back(b0_json);
    config.b_array.push_back(b1_json);
    
    
    TensorInfo group_list_json;
    group_list_json.name = "group_list";
    group_list_json.dtype = "int64";
    group_list_json.data_file = "";
    group_list_json.shape = vector<int64_t>{};
    config.grouplist_array.push_back(group_list_json);
    
    TensorInfo y0_json;
    y0_json.name = "y0";
    y0_json.dtype = "float16";
    y0_json.data_file = "y_0.bin";
    y0_json.shape = vector<int64_t>{256, 256};
    TensorInfo y1_json;
    y1_json.name = "y1";
    y1_json.dtype = "float16";
    y1_json.data_file = "y_1.bin";
    y1_json.shape = vector<int64_t>{1024, 1024};
    config.y_array.push_back(y0_json);
    config.y_array.push_back(y1_json);

    uint32_t ndev = input_args.ndev;
    Args args[ndev];
    aclrtStream rtStream[ndev * 3];
    aclrtContext context[ndev];
    for (uint32_t rankId = 0; rankId < ndev; rankId++) {
        ACL_CHECK(aclrtSetDevice(devices[rankId]));
        // ACL_CHECK(aclrtCreateStream(&rtStream[rankId]));
        // ACL_CHECK(aclrtCreateStream(&rtStream[ndev + rankId]));
        ACL_CHECK(rtStreamCreateWithFlags(&rtStream[rankId], 0, 0x600));
        ACL_CHECK(rtStreamCreateWithFlags(&rtStream[ndev + rankId], 0, 0x600));
        ACL_CHECK(rtStreamCreateWithFlags(&rtStream[ndev * 2 + rankId], 0, 0x600));
        ACL_CHECK(aclrtGetCurrentContext(&context[rankId]));
        // printf("1 rankId: %d, stream: %p\n", rankId,  rtStream[rankId]);
    }

    std::vector<std::unique_ptr<std::thread>> threads(ndev);
    for (uint32_t rankId = 0; rankId < ndev; rankId++) {
        args[rankId].rankId = rankId;
        args[rankId].rootRank = devices[0];
        args[rankId].ndev = ndev;
        args[rankId].logicDeviceId = devices[rankId];
        args[rankId].hcclComm = comms[rankId];
        args[rankId].resources = resources[rankId];
        args[rankId].dataPara = dataPara;
        args[rankId].resources.rtStream = rtStream[rankId];
        args[rankId].resources.rtStream_aicpu = rtStream[ndev + rankId];
        args[rankId].resources.rtStream_record = rtStream[ndev * 2 + rankId];
        args[rankId].resources.context = context[rankId];
        args[rankId].m = input_args.m;
        args[rankId].k = input_args.k;
        args[rankId].n = input_args.n;
        args[rankId].dtype = input_args.dtype;
        args[rankId].bin_path = input_args.bin_path;
        args[rankId].run_type = input_args.run_type;
        args[rankId].loop_cnt = input_args.loop_cnt;
        args[rankId].bias_flag = input_args.bias_flag;
        threads[rankId].reset(new (std::nothrow) std::thread(&launchOneThread, std::ref(args[rankId]), std::ref(config)));
    }
    for (uint32_t rankId = 0; rankId < ndev; rankId++) {
        threads[rankId]->join();
    }
    return 0;
}

int InitParams(int argc, char** argv) {
  if (argc < 4) {
    LOG_PRINT("missing parameters\n");
    return -1;
  }
  g_ndev = std::stoi(argv[1]);
  loop = std::stoi(argv[2]);
  // argv[3] is a comma-separated device list, e.g. "0,1"
  std::stringstream str(argv[3]);
  std::string seg;
  char c = ',';
  while (std::getline(str, seg, c)) {
    device_list.push_back(std::stoi(seg));
  }
  assert(g_ndev == (int32_t)device_list.size());
  return 0;
}


int main(int argc, char *argv[])
{
    // usage: ./main 2 1 0,1
    int ret = 0;
    CHECK_RET(InitParams(argc, argv) == 0, LOG_PRINT("init params failed.\n"); return -1);
    Args input_args;
    input_args.ndev = g_ndev;
    input_args.run_type = string("reduce");
    input_args.loop_cnt = loop;

    if (input_args.ndev != 1 && input_args.ndev != 2 && input_args.ndev != 4 && input_args.ndev != 8) {
        printf("device_num input error, only 1/2/4/8 are supported.\n");
        return -1;
    }
    int32_t devices[input_args.ndev];
    for (int i = 0; i < input_args.ndev; i++) {
        devices[i] = device_list[i];
    }

    HcclComm comms[N];
    Resource resources[N];

    ACL_CHECK(aclInit(NULL));
    for (int i = 0; i < 1; i++) {
        // Initialize the collective communication domain
        for (int d = 0; d < input_args.ndev; d++) {
            ACL_CHECK(aclrtSetDevice(devices[d]));
        }
        ret = HcclCommInitAll(input_args.ndev, devices, comms);
        if (ret != 0) {
            printf("This is an error in init_hcclComm.\n");
            return -1;
        }

        // Launch the test
        ret = launchMultiThread(input_args, devices, comms, resources);
        if (ret != 0) {
            printf("This is an error in opbase_test_by_data_size.\n");
            // ACL_CHECK(aclFinalize());
            return -1;
        }
    }
    ACL_CHECK(aclFinalize());
    return ret;
}