下载
中文
注册

aclnnBatchMatMulReduceScatterAlltoAll

支持的产品型号

该接口为试验版本,当前不支持应用于商用产品中,后续版本会作为正式功能更新发布。

接口原型

每个算子分为两段式接口,必须先调用“aclnnBatchMatMulReduceScatterAlltoAllGetWorkspaceSize”接口获取计算所需workspace大小以及包含了算子计算流程的执行器,再调用“aclnnBatchMatMulReduceScatterAlltoAll”接口执行计算。

  • aclnnStatus aclnnBatchMatMulReduceScatterAlltoAllGetWorkspaceSize(const aclTensor* x, const aclTensor* weight, const aclTensor* biasOptional, const char* groupEp, const char* groupTp, int64_t epWorldSize, int64_t tpWorldSize, int64_t yShardType, aclTensor* out, uint64_t* workspaceSize, aclOpExecutor** executor)
  • aclnnStatus aclnnBatchMatMulReduceScatterAlltoAll(void* workspace, uint64_t workspaceSize, aclOpExecutor* executor, aclrtStream stream)

功能描述

  • 算子功能:BatchMatMulReduceScatterAllToAll是通算融合算子,实现BatchMatMul计算与ReduceScatter、AllToAll集合通信并行的算子。

  • 计算公式:大体计算流程为:BatchMatMul计算-->转置(yShardType等于0时需要)-->ReduceScatter集合通信-->Add-->AllToAll集合通信。

  • 计算逻辑如下,其中y为输出

    temp1=BatchMatMul(xweight)temp1 = BatchMatMul(x,weight) temp2=ReduceScatter(temp1)temp2 = ReduceScatter(temp1) temp3=Add(temp2,bias)temp3 = Add(temp2, bias) y=AllToAll(temp3)y = AllToAll(temp3)

aclnnBatchMatMulReduceScatterAlltoAllGetWorkspaceSize

  • 参数说明:

    • x(aclTensor*,计算输入):BatchMatMul计算的左矩阵。数据类型支持FLOAT16、BFLOAT16,必须为3维。数据格式支持ND。
    • weight(aclTensor*,计算输入):BatchMatMul计算的右矩阵。数据类型支持FLOAT16、BFLOAT16,必须为3维,必须为3维,类型与x保持一致。数据格式支持ND。
    • biasOptional(aclTensor*,计算输入):BatchMatMul计算的bias。数据类型支持FLOAT16、FLOAT32。x为FLOAT16时,biasOptional需为FLOAT16;x为BFLOAT16时,biasOptional需为FLOAT32。支持两维或三维。数据格式支持ND。(由于要进行ReduceScatter通信,因此需要在通信之后再Add)。支持传入空指针。
    • groupEp(char*,计算输入):专家并行的通信域名。字符串长度需大于0,小于128。
    • groupTp(char*,计算输入):Tensor并行的通信域名。字符串长度需大于0,小于128。
    • epWorldSize(int64_t,计算输入):ep通信域size,支持2/4/8/16。
    • tpWorldSize(int64_t,计算输入):tp通信域size,支持2/4/8/16。
    • yShardType(int64_t,计算输入):默认值为0。0表示在H维度(即BatchMatMul计算结果的第2维,计算结果共3维,分别为第0维、第1维、第2维)按tp进行ReduceScatter,1表示在C维度(即BatchMatMul计算结果的第1维)按tp进行ReduceScatter。当前仅支持yShardType等于1的场景。
    • out(aclTensor*,计算输出):Device侧的aclTensor,batch_matmul计算+reduce_scatter计算+all_to_all通信的结果。数据类型支持FLOAT16、BFLOAT16,必须为3维。类型与输入x保持一致。数据格式支持ND。
    • workspaceSize(uint64_t*,出参):返回需要在Device侧申请的workspace大小。
    • executor(aclOpExecutor**,出参):返回op执行器,包含了算子计算流程。
  • 返回值:

    返回aclnnStatus状态码,具体参见aclnn返回码

第一段接口完成入参校验,出现以下场景时报错:
161001 (ACLNN_ERR_PARAM_NULLPTR): 1. 传入的x、weight、groupEp、groupTp或out是空指针。
161002 (ACLNN_ERR_PARAM_INVALID): 1. groupEp或groupTp字符串长度不合法;
                                  2. 输入不支持的数据类型;
                                  3. 属性值不合法;
                                  4. aclTensor维度不合法;
                                  5. aclTensor shape不合法。

aclnnBatchMatMulReduceScatterAlltoAll

  • 参数说明:

    • workspace(void*,入参):在Device侧申请的workspace内存地址。
    • workspaceSize(uint64_t,入参):在Device侧申请的workspace大小,由第一段接口aclnnBatchMatMulReduceScatterAlltoAllGetWorkspaceSize获取。
    • executor(aclOpExecutor*,入参):op执行器,包含了算子计算流程。
    • stream(aclrtStream,入参):指定执行任务的AscendCL stream流。
  • 返回值:

    返回aclnnStatus状态码,具体参见aclnn返回码

约束与限制

因为集合通信及BatchMatMul计算所需,输入输出shape需满足以下数学关系:(其中ep=epWorldSize,tp=tpWorldSize) 按H轴进行ReduceScatter场景,即shard_type为0场景(暂不支持该场景):

  • x: (E/ep, ep*C, M/tp)
  • weight:(E/ep, M/tp, H)
  • biasOptional:非空指针情况下,三维时为(E/ep, 1, H/tp) 两维时为(E/ep, H/tp)
  • y:(E, C, H/tp)

按C轴进行ReduceScatter场景,即shard_type为1场景:

  • x: (E/ep, ep*tp*C/tp, M/tp)
  • weight:(E/ep, M/tp, H)
  • biasOptional:(E/ep, 1, H) 两维时为(E/ep, H)
  • y:(E, C/tp, H)

数据关系说明:

  • 比如x.size(0)等于E/tp,y.size(0)等于E,则表示,y.size(0) = ep*x.size(0),y.size(0)是ep的整数倍;其他关系类似。
  • E的取值范围为[2, 512],且E是ep的整数倍。
  • H的取值范围为:[1, 65535]。
  • M/tp的取值范围为:[1, 65535]。
  • E/ep的取值范围为:[1, 32]。
  • ep、tp均仅支持2、4、8、16。
  • groupEp和groupTp名称不能相同。
  • C大于0,上限为算子device内存上限。
  • 通算融合算子不支持并发调用,不同的通算融合算子也不支持并发调用。
  • 不支持跨超节点,只支持超节点内,ep域AlltoAll支持超节点内跨节点,tp域ReduceScatter仅支持超节点内单一节点。

调用示例

示例代码如下,仅供参考,具体编译和执行过程请参考编译与运行样例

#include <thread>
#include <iostream>
#include <string>
#include <vector>
#include "acl/acl.h"
#include "hccl/hccl.h"
#include "aclnnop/aclnn_batch_matmul_reduce_scatter_all_to_all.h"

#define CHECK_RET(cond, return_expr) \
    do {                             \
        if (!(cond)) {               \
            return_expr;             \
        }                            \
    } while (0)

#define LOG_PRINT(message, ...)         \
    do {                                \
        printf(message, ##__VA_ARGS__); \
    } while(0)

constexpr int EP_WORLD_SIZE = 4;
constexpr int TP_WORLD_SIZE = 2;
constexpr int DEV_NUM = EP_WORLD_SIZE * TP_WORLD_SIZE;

int64_t GetShapeSize(const std::vector<int64_t> &shape)
{
    int64_t shape_size = 1;
    for (auto i : shape) {
        shape_size *= i;
    }
    return shape_size;
}

template<typename T>
int CreateAclTensor(const std::vector<T> &hostData, const std::vector<int64_t> &shape, void **deviceAddr,
    aclDataType dataType, aclTensor **tensor)
{
    auto size = GetShapeSize(shape) * sizeof(T);
    auto ret = aclrtMalloc(deviceAddr, size, ACL_MEM_MALLOC_HUGE_FIRST);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMalloc failed. ret: %d\n", ret); return ret);
    ret = aclrtMemcpy(*deviceAddr, size, hostData.data(), size, ACL_MEMCPY_HOST_TO_DEVICE);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMemcpy failed. ret: %d\n", ret); return ret);
    std::vector<int64_t> strides(shape.size(), 1);
    for (int64_t i = shape.size() - 2; i >= 0; i--) {
        strides[i] = shape[i +1] * strides[i + 1];
    }
    *tensor = aclCreateTensor(shape.data(), shape.size(), dataType, strides.data(), 0, aclFormat::ACL_FORMAT_ND,
        shape.data(), shape.size(), *deviceAddr);
    return 0;
}

struct Args {
    int rankId;
    HcclComm hcclEpComm;
    HcclComm hcclTpComm;
    aclrtStream stream;
    aclrtContext context;
  };

int LaunchOneThreadBatchMMRSAlltoAll(Args &args)
{
    int ret = aclrtSetCurrentContext(args.context);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSetCurrentContext failed. ret: %d\n", ret); return ret);
    char hcomEpName[128] = {0};
    ret = HcclGetCommName(args.hcclEpComm, hcomEpName);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclGetEpCommName failed. ret: %d\n", ret); return -1);
    char hcomTpName[128] = {0};
    ret = HcclGetCommName(args.hcclTpComm, hcomTpName);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclGetTpCommName failed. ret: %d\n", ret); return -1);
    LOG_PRINT("[INFO] rank = %d, hcomEpName = %s, hcomTpName = %s, stream = %p, context = %p\n", args.rankId,
        hcomEpName, hcomTpName, args.stream, args.context);

    int64_t E = 4;
    int64_t C = 256;
    int64_t H = 128;
    int64_t M = 256;
    int64_t xShardType = 1;
    
    std::vector<int64_t> xShape;
    std::vector<int64_t> weightShape;
    std::vector<int64_t> biasShape;
    std::vector<int64_t> yOutShape;

    if (xShardType == 1) {
        xShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * TP_WORLD_SIZE * C / TP_WORLD_SIZE, M / TP_WORLD_SIZE};
        weightShape = {E / EP_WORLD_SIZE, M / TP_WORLD_SIZE, H};
        biasShape = {E / EP_WORLD_SIZE, 1, H};
        yOutShape = {E, C / TP_WORLD_SIZE, H};
    } else if (xShardType == 0) {
        xShape = {E, C, H / TP_WORLD_SIZE};
        weightShape = {E / EP_WORLD_SIZE, H, M / TP_WORLD_SIZE};
        biasShape = {E / EP_WORLD_SIZE, 1, M / TP_WORLD_SIZE};
        yOutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * C, M / TP_WORLD_SIZE};
    } else {
        LOG_PRINT("[ERROR] unsupported xShardType = %ld.\n", xShardType);
        return -1;
    }

    printf("x_shape: %d %d %d\n", xShape[0], xShape[1], xShape[2]);
    printf("weight_shape: %d %d %d\n", weightShape[0], weightShape[1], weightShape[2]);
    printf("bias_shape: %d %d %d\n", biasShape[0], biasShape[1], biasShape[2]);
    printf("y_shape: %d %d %d\n", yOutShape[0], yOutShape[1], yOutShape[2]);

    void *xDeviceAddr = nullptr;
    void *weightDeviceAddr = nullptr;
    void *biasDeviceAddr = nullptr;
    void *yOutDeviceAddr = nullptr;
    aclTensor *x = nullptr;
    aclTensor *weight = nullptr;
    aclTensor *bias = nullptr;
    aclTensor *yOut = nullptr;

    uint64_t workspaceSize = 0;
    aclOpExecutor *executor = nullptr;
    void *workspaceAddr = nullptr;

    long long xShapeSize = GetShapeSize(xShape);
    long long weightShapeSize = GetShapeSize(weightShape);
    long long biasShapeSize = GetShapeSize(biasShape);
    long long yOutShapeSize = GetShapeSize(yOutShape);

    std::vector<int16_t> xHostData(xShapeSize, 1);
    std::vector<int16_t> weightHostData(weightShapeSize, 2);
    std::vector<int16_t> biasHostData(biasShapeSize, 3);
    std::vector<int16_t> y1OutHostData(yOutShapeSize, 0);

    ret = CreateAclTensor(xHostData, xShape, &xDeviceAddr, aclDataType::ACL_FLOAT16, &x);
    CHECK_RET(ret == ACL_SUCCESS, return ret);
    ret = CreateAclTensor(weightHostData, weightShape, &weightDeviceAddr, aclDataType::ACL_FLOAT16, &weight);
    CHECK_RET(ret == ACL_SUCCESS, return ret);
    ret = CreateAclTensor(y1OutHostData, yOutShape, &yOutDeviceAddr, aclDataType::ACL_FLOAT16, &yOut);
    CHECK_RET(ret == ACL_SUCCESS, return ret);

    // 调用第一阶段接口
    ret = aclnnBatchMatMulReduceScatterAlltoAllGetWorkspaceSize(x, weight, bias, hcomEpName, hcomTpName, EP_WORLD_SIZE,
        TP_WORLD_SIZE, xShardType, yOut, &workspaceSize, &executor);
    CHECK_RET(ret == ACL_SUCCESS,
        LOG_PRINT("[ERROR] aclnnBatchMatMulReduceScatterAlltoAllGetWorkspaceSize failed. ret = %d \n", ret); return ret);
    // 根据第一阶段接口计算出的workspaceSize申请device内存
    if (workspaceSize > 0) {
        ret = aclrtMalloc(&workspaceAddr, workspaceSize, ACL_MEM_MALLOC_HUGE_FIRST);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMalloc workspace failed. ret = %d \n", ret); return ret);
    }
    // 调用第二阶段接口
    ret = aclnnBatchMatMulReduceScatterAlltoAll(workspaceAddr, workspaceSize, executor, args.stream);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclnnBatchMatMulReduceScatterAlltoAll failed. ret = %d \n", ret);
        return ret);
    // (固定写法)同步等待任务执行结束
    ret = aclrtSynchronizeStreamWithTimeout(args.stream, 10000);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSynchronizeStreamWithTimeout failed. ret = %d \n", ret);
        return ret);
    LOG_PRINT("[INFO] device_%d aclnnBatchMatMulReduceScatterAlltoAll execute successfully.\n", args.rankId);
    // 释放device资源,需要根据具体API的接口定义修改
    if (x != nullptr) {
        aclDestroyTensor(x);
    }
    if (weight != nullptr) {
        aclDestroyTensor(weight);
    }
    if (bias != nullptr) {
        aclDestroyTensor(bias);
    }
    if (yOut != nullptr) {
        aclDestroyTensor(yOut);
    }
    if (xDeviceAddr != nullptr) {
        aclrtFree(xDeviceAddr);
    }
    if (weightDeviceAddr != nullptr) {
        aclrtFree(weightDeviceAddr);
    }
    if (biasDeviceAddr != nullptr) {
        aclrtFree(biasDeviceAddr);
    }
    if (yOutDeviceAddr != nullptr) {
        aclrtFree(yOutDeviceAddr);
    }
    if (workspaceSize > 0) {
        aclrtFree(workspaceAddr);
    }
    ret = aclrtDestroyStream(args.stream);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtDestroyStream failed. ret = %d \n", ret); return ret);
    ret = aclrtResetDevice(args.rankId);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtResetDevice failed. ret = %d \n", ret); return ret);
    return 0;
}

int main(int argc, char *argv[])
{
    int ret = aclInit(nullptr);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclInit failed. ret = %d \n", ret); return ret);
    aclrtStream stream[DEV_NUM];
    aclrtContext context[DEV_NUM];
    for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
        ret = aclrtSetDevice(rankId);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSetDevice failed. ret = %d \n", ret); return ret);
        ret = aclrtCreateContext(&context[rankId], rankId);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtCreateContext failed. ret = %d \n", ret); return ret);
        ret = aclrtCreateStream(&stream[rankId]);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtCreateStream failed. ret = %d \n", ret); return ret);
    }

    int32_t devices[DEV_NUM];
    for (int i = 0; i < DEV_NUM; i++) {
        devices[i] = i;
    }
    // 初始化集合通信域
    HcclComm comms[DEV_NUM];
    ret = HcclCommInitAll(DEV_NUM, devices, comms);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll failed. ret = %d \n", ret); return ret);

    // 初始化EP域,EP域内的TP域内是顺序,TP域之间跳 ep_size, ep_size=4, tp_size=2, {0,1,4,5,2,3,6,7} {0,1,4,5} {2,3,6,7}
    int32_t devicesEp[DEV_NUM];
    for (int i = 0; i < DEV_NUM; i++) {
        int epIdx = i / EP_WORLD_SIZE;
        int elementIdx = i - epIdx * EP_WORLD_SIZE;
        int deviceIdx = epIdx * TP_WORLD_SIZE + elementIdx % TP_WORLD_SIZE + elementIdx / TP_WORLD_SIZE * EP_WORLD_SIZE;
        devicesEp[i] = devices[deviceIdx];
        std::cout << "EP test devices id " << i << " = " << devicesEp[i] << std::endl;
    }
    HcclComm commsEp[DEV_NUM];
    for (int i = 0; i < DEV_NUM / EP_WORLD_SIZE; i++) {
        ret = HcclCommInitAll(EP_WORLD_SIZE, &devicesEp[i * EP_WORLD_SIZE], &commsEp[i * EP_WORLD_SIZE]);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll ep world %d failed. ret = %d \n", i, ret);
            return ret);
    }

    // 初始化TP域,TP域按照顺序,tp_size=2 {0,1,2,3,4,5,6,7} {0,1} {2,3} {4,5} {6,7}
    int32_t devicesTp[DEV_NUM];
    for (int i = 0; i < DEV_NUM; i++) {
        devicesTp[i] = devices[i];
        std::cout << "TP test devices id " << i << " = " << devicesTp[i] << std::endl;
    }
    HcclComm commsTp[DEV_NUM];
    for (int i = 0; i < DEV_NUM / TP_WORLD_SIZE; i++) {
        ret = HcclCommInitAll(TP_WORLD_SIZE, &devicesTp[i * TP_WORLD_SIZE], &commsTp[i * TP_WORLD_SIZE]);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll tp world %d failed. ret = %d \n", i, ret);
            return ret);
    }

    Args args[DEV_NUM];
    // 启动多线程
    std::vector<std::unique_ptr<std::thread>> threads(DEV_NUM);
    for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
        args[rankId].rankId = rankId;

        uint32_t commsEpIdx;
        for (uint32_t i = 0; i < DEV_NUM; i++) {
            if (devicesEp[i] == rankId) {
                commsEpIdx = i;
                break;
            }
        }
        args[rankId].hcclEpComm = commsEp[commsEpIdx];

        uint32_t commsTpIdx;
        for (uint32_t i = 0; i < DEV_NUM; i++) {
            if (devicesTp[i] == rankId) {
                commsTpIdx = i;
                break;
            }
        }
        args[rankId].hcclTpComm = commsTp[commsTpIdx];

        args[rankId].stream = stream[rankId];
        args[rankId].context = context[rankId];
        threads[rankId].reset(new std::thread(&LaunchOneThreadBatchMMRSAlltoAll, std::ref(args[rankId])));
    }
    for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
        threads[rankId]->join();
    }
    for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
        auto hcclRet = HcclCommDestroy(commsEp[rankId]);
        CHECK_RET(hcclRet == HCCL_SUCCESS, LOG_PRINT("[ERROR] HcclCommDestory failed. ret = %d \n", ret); return -1);
        hcclRet = HcclCommDestroy(commsTp[rankId]);
        CHECK_RET(hcclRet == HCCL_SUCCESS, LOG_PRINT("[ERROR] HcclCommDestory failed. ret = %d \n", ret); return -1);
    }
    aclFinalize();
    return 0;
}