下载
中文
注册

aclnnAlltoAllAllGatherBatchMatMul

支持的产品型号

当前版本不支持该接口。

接口原型

每个算子分为两段式接口,必须先调用“aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize”接口获取计算所需workspace大小以及包含了算子计算流程的执行器,再调用“aclnnAlltoAllAllGatherBatchMatMul”接口执行计算。

  • aclnnStatus aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize(const aclTensor* x, const aclTensor* weight, const aclTensor* biasOptional, const char* groupEp, const char* groupTp, int64_t epWorldSize, int64_t tpWorldSize, int64_t xShardType, int64_t actType, aclTensor* y1Out, aclTensor* y2OutOptional, aclTensor* y3OutOptional, uint64_t* workspaceSize, aclOpExecutor** executor)
  • aclnnStatus aclnnAlltoAllAllGatherBatchMatMul(void* workspace, uint64_t workspaceSize, aclOpExecutor* executor, aclrtStream stream)

功能描述

  • 算子功能:完成AllToAll、AllGather集合通信与BatchMatMul计算融合、并行。

  • 计算公式

  • 计算逻辑如下,其中y1 y2 y3为输出

    x1=AllToAll(x)x1 = AllToAll(x) y2=AllGather(x1)y2 = AllGather(x1) y3=BatchMatMul(y2,weight,bias)y3 = BatchMatMul(y2, weight, bias) y1=激活函数(y3)y1 = 激活函数(y3)

aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize

  • 参数说明:

    • x(aclTensor*,计算输入):通信后结果作为BatchMatMul计算的左矩阵。数据类型支持float16,bfloat16。该输入进行AllToAll、AllGather集合通信,必须为3维,数据格式支持ND。
    • weight(aclTensor*,计算输入):BatchMatMul计算的右矩阵。数据类型支持float16, bfloat16,类型与x保持一致,必须为3维,数据格式支持ND。
    • biasOptional(aclTensor*,计算输入):BatchMatMul计算的bias,数据类型支持float16, float32。x为float16时,bias需为float16;x为bfloat16时,bias需为float32,支持两维或三维,数据格式支持ND。支持传入空指针。
    • groupEp(char*,计算输入):ep通信域名称,专家并行的通信域。字符串长度需大于0,小于128。
    • groupTp(char*,计算输入):tp通信域名称,Tensor并行的通信域。字符串长度需大于0,小于128。
    • epWorldSize(int64_t,计算输入):ep通信域size,支持2/4/8/16。
    • tpWorldSize(int64_t,计算输入):tp通信域size,支持2/4/8/16。
    • xShardType(int64_t,计算输入):0表示在H维度(即x的第2维,x为3维,分别为第0维、第1维、第2维)按tp域进行allgather,1表示在C维度(即x的第1维)上按tp域进行allgather。当前仅支持xShardType等于1的场景。
    • actType(int64_t,计算输入):激活函数类型,支持0/1/2/3/4的输入,0表示无激活函数,对应关系为[0:None,1:GELU,2:Silu,3:Relu,4:FastGELU]。
    • y1Out(aclTensor*,计算输出):数据类型float16, bfloat16,支持3维,数据类型与输入x保持一致。数据格式支持:ND。最终计算结果,如果有激活函数则为激活函数的输出,否则为BatchMatMul的输出。
    • y2OutOptional(aclTensor*,计算输出):可选输出,数据类型float16, bfloat16,支持3维,数据类型与输入x保持一致。数据格式支持:ND。AllGather的输出,反向可能需要。空指针表示不需要该输出。
    • y3OutOptional(aclTensor*,计算输出):可选输出,数据类型float16, bfloat16,支持3维,数据类型与输入x保持一致。数据格式支持:ND。有激活函数时,BatchMatMul的输出,空指针表示不需要该输出。
    • workspaceSize(uint64_t*,出参):返回需要在Device侧申请的workspace大小。
    • executor(aclOpExecutor**,出参):返回op执行器,包含了算子计算流程。
  • 返回值:

    返回aclnnStatus状态码,具体参见aclnn返回码

aclnnAlltoAllAllGatherBatchMatMul

  • 参数说明:

    • workspace(void*,入参):在Device侧申请的workspace内存地址。
    • workspaceSize(uint64_t,入参):在Device侧申请的workspace大小,由第一段接口aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize获取。
    • executor(aclOpExecutor*,入参):op执行器,包含了算子计算流程。
    • stream(aclrtStream,入参):指定执行任务的AscendCL stream流。
  • 返回值:

    返回aclnnStatus状态码,具体参见aclnn返回码

约束与限制

因为集合通信及BatchMatMul计算所需,输入输出shape需满足以下数学关系:(其中ep=epWorldSize,tp=tpWorldSize)

  • x: (E,C/tp,H)
  • weight:(E/ep,H,M/tp)
  • biasOptional:非空指针情况下,三维时为(E/ep,1,M/tp),两维时为(E/ep,M/tp)
  • y1Out:(E/ep,ep*tp*C/tp,M/tp)
  • y2OutOptional:(E/ep,ep*tp*C/tp,H)
  • y3OutOptional:(E/ep,ep*tp*C/tp,M/tp) 数据关系说明:
  • 比如x.size(0)等于E,weight.size(0)等于E/ep,则表示,x.size(0) = ep*weight.size(0),x.size(0)是ep的整数倍;其他关系类似。
  • E的取值范围为[2, 512],且E是ep的整数倍。
  • H的取值范围为:[1, 65535]。
  • M/tp的取值范围为:[1, 65535]。
  • E/ep的取值范围为:[1, 32]。
  • ep、tp均仅支持2、4、8、16。
  • groupEp和groupTp名称不能相同。
  • C必须大于0,上限为算子device内存上限。
  • 通算融合算子不支持并发调用,不同的通算融合算子也不支持并发调用。
  • 不支持跨超节点,只支持超节点内,ep域AlltoAll支持超节点内跨节点,tp域AllGather仅支持超节点内单一节点。

调用示例

示例代码如下,仅供参考:

#include <thread>
#include <iostream>
#include <string>
#include <vector>
#include "acl/acl.h"
#include "hccl/hccl.h"
#include "aclnnop/aclnn_all_to_all_all_gather_batch_matmul.h"

#define CHECK_RET(cond, return_expr) \
    do {                             \
        if (!(cond)) {               \
            return_expr;             \
        }                            \
    } while (0)

#define LOG_PRINT(message, ...)         \
    do {                                \
        printf(message, ##__VA_ARGS__); \
    } while(0)

constexpr int EP_WORLD_SIZE = 4;
constexpr int TP_WORLD_SIZE = 2;
constexpr int DEV_NUM = EP_WORLD_SIZE * TP_WORLD_SIZE;

int64_t GetShapeSize(const std::vector<int64_t> &shape)
{
    int64_t shape_size = 1;
    for (auto i : shape) {
        shape_size *= i;
    }
    return shape_size;
}

template<typename T>
int CreateAclTensor(const std::vector<T> &hostData, const std::vector<int64_t> &shape, void **deviceAddr,
    aclDataType dataType, aclTensor **tensor)
{
    auto size = GetShapeSize(shape) * sizeof(T);
    auto ret = aclrtMalloc(deviceAddr, size, ACL_MEM_MALLOC_HUGE_FIRST);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMalloc failed. ret: %d\n", ret); return ret);
    ret = aclrtMemcpy(*deviceAddr, size, hostData.data(), size, ACL_MEMCPY_HOST_TO_DEVICE);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMemcpy failed. ret: %d\n", ret); return ret);
    std::vector<int64_t> strides(shape.size(), 1);
    for (int64_t i = shape.size() - 2; i >= 0; i--) {
        strides[i] = shape[i + 1] * strides[i + 1];
    }
    *tensor = aclCreateTensor(shape.data(), shape.size(), dataType, strides.data(), 0, aclFormat::ACL_FORMAT_ND,
        shape.data(), shape.size(), *deviceAddr);
    return 0;
}

struct Args {
    int rankId;
    HcclComm hcclEpComm;
    HcclComm hcclTpComm;
    aclrtStream stream;
    aclrtContext context;
};

int LaunchOneThreadAlltoAllAllGatherBmm(Args &args)
{
    int ret = aclrtSetCurrentContext(args.context);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSetCurrentContext failed. ret: %d\n", ret); return ret);
    char hcomEpName[128] = {0};
    ret = HcclGetCommName(args.hcclEpComm, hcomEpName);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclGetEpCommName failed. ret: %d\n", ret); return -1);
    char hcomTpName[128] = {0};
    ret = HcclGetCommName(args.hcclTpComm, hcomTpName);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclGetTpCommName failed. ret: %d\n", ret); return -1);
    LOG_PRINT("[INFO] rank = %d, hcomEpName = %s, hcomTpName = %s, stream = %p, context = %p\n", args.rankId,
    hcomEpName, hcomTpName, args.stream, args.context);
    
    int64_t E = 4;
    int64_t C = 2;
    int64_t H = 6;
    int64_t M = 4;
    int64_t xShardType = 1;
    int64_t actType = 1;
    
    std::vector<int64_t> xShape;
    std::vector<int64_t> weightShape;
    std::vector<int64_t> biasShape;
    std::vector<int64_t> y1OutShape;
    std::vector<int64_t> y2OutShape;
    std::vector<int64_t> y3OutShape;
    
    if (xShardType == 1) {
        xShape = {E, C / TP_WORLD_SIZE, H};
        weightShape = {E / EP_WORLD_SIZE, H, M / TP_WORLD_SIZE};
        biasShape = {E / EP_WORLD_SIZE, 1, M / TP_WORLD_SIZE};
        y1OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * TP_WORLD_SIZE * C / TP_WORLD_SIZE, M / TP_WORLD_SIZE};
        y2OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * TP_WORLD_SIZE * C / TP_WORLD_SIZE, H};
        y3OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * TP_WORLD_SIZE * C / TP_WORLD_SIZE, M / TP_WORLD_SIZE};
    } else if (xShardType == 0) {
        xShape = {E, C, H / TP_WORLD_SIZE};
        weightShape = {E / EP_WORLD_SIZE, H, M / TP_WORLD_SIZE};
        biasShape = {E / EP_WORLD_SIZE, 1, M / TP_WORLD_SIZE};
        y1OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * C, M / TP_WORLD_SIZE};
        y2OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * C, H};
        y3OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * C, M / TP_WORLD_SIZE};
    } else {
        LOG_PRINT("[ERROR] unsupported xShardType = %ld.\n", xShardType);
        return -1;
    }

    void *xDeviceAddr = nullptr;
    void *weightDeviceAddr = nullptr;
    void *biasDeviceAddr = nullptr;
    void *y1OutDeviceAddr = nullptr;
    void *y2OutDeviceAddr = nullptr;
    void *y3OutDeviceAddr = nullptr;
    aclTensor *x = nullptr;
    aclTensor *weight = nullptr;
    aclTensor *bias = nullptr;
    aclTensor *y1Out = nullptr;
    aclTensor *y2Out = nullptr;
    aclTensor *y3Out = nullptr;

    uint64_t workspaceSize = 0;
    aclOpExecutor *executor = nullptr;
    void *workspaceAddr = nullptr;

    long long xShapeSize = GetShapeSize(xShape);
    long long weightShapeSize = GetShapeSize(weightShape);
    long long biasShapeSize = GetShapeSize(biasShape);
    long long y1OutShapeSize = GetShapeSize(y1OutShape);
    long long y2OutShapeSize = GetShapeSize(y2OutShape);
    long long y3OutShapeSize = GetShapeSize(y3OutShape);
    
    std::vector<int16_t> xHostData(xShapeSize, 0);
    std::vector<int16_t> weightHostData(weightShapeSize, 0);
    std::vector<int16_t> biasHostData(biasShapeSize, 0);
    std::vector<int16_t> y1OutHostData(y1OutShapeSize, 0);
    std::vector<int16_t> y2OutHostData(y2OutShapeSize, 0);
    std::vector<int16_t> y3OutHostData(y3OutShapeSize, 0);

    ret = CreateAclTensor(xHostData, xShape, &xDeviceAddr, aclDataType::ACL_FLOAT16, &x);
    CHECK_RET(ret == ACL_SUCCESS, return ret);
    ret = CreateAclTensor(weightHostData, weightShape, &weightDeviceAddr, aclDataType::ACL_FLOAT16, &weight);
    CHECK_RET(ret == ACL_SUCCESS, return ret);
    ret = CreateAclTensor(y1OutHostData, y1OutShape, &y1OutDeviceAddr, aclDataType::ACL_FLOAT16, &y1Out);
    CHECK_RET(ret == ACL_SUCCESS, return ret);
    ret = CreateAclTensor(y2OutHostData, y2OutShape, &y2OutDeviceAddr, aclDataType::ACL_FLOAT16, &y2Out);
    CHECK_RET(ret == ACL_SUCCESS, return ret);
    ret = CreateAclTensor(y3OutHostData, y3OutShape, &y3OutDeviceAddr, aclDataType::ACL_FLOAT16, &y3Out);
    CHECK_RET(ret == ACL_SUCCESS, return ret);

    // 调用第一阶段接口
    ret = aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize(x, weight, bias, hcomEpName, hcomTpName, EP_WORLD_SIZE,
        TP_WORLD_SIZE, xShardType, actType, y1Out, y2Out, y3Out, &workspaceSize, &executor);
    CHECK_RET(ret == ACL_SUCCESS,
        LOG_PRINT("[ERROR] aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize failed. ret = %d \n", ret); return ret);
    // 根据第一阶段接口计算出的workspaceSize申请device内存
    if (workspaceSize > 0) {
        ret = aclrtMalloc(&workspaceAddr, workspaceSize, ACL_MEM_MALLOC_HUGE_FIRST);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMalloc workspace failed. ret = %d \n", ret); return ret);
    }
    // 调用第二阶段接口
    ret = aclnnAlltoAllAllGatherBatchMatMul(workspaceAddr, workspaceSize, executor, args.stream);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclnnAlltoAllAllGatherBatchMatMul failed. ret = %d \n", ret);
        return ret);
    // (固定写法)同步等待任务执行结束
    ret = aclrtSynchronizeStreamWithTimeout(args.stream, 10000);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSynchronizeStreamWithTimeout failed. ret = %d \n", ret);
        return ret);
    LOG_PRINT("[INFO] device_%d aclnnAlltoAllAllGatherBatchMatMul execute successfully.\n", args.rankId);
    // 释放device资源,需要根据具体API的接口定义修改
    if (x != nullptr) {
        aclDestroyTensor(x);
    }
    if (weight != nullptr) {
        aclDestroyTensor(weight);
    }
    if (bias != nullptr) {
        aclDestroyTensor(bias);
    }
    if (y1Out != nullptr) {
        aclDestroyTensor(y1Out);
    }
    if (y2Out != nullptr) {
        aclDestroyTensor(y2Out);
    }
    if (y3Out != nullptr) {
        aclDestroyTensor(y3Out);
    }
    if (xDeviceAddr != nullptr) {
        aclrtFree(xDeviceAddr);
    }
    if (weightDeviceAddr != nullptr) {
        aclrtFree(weightDeviceAddr);
    }
    if (biasDeviceAddr != nullptr) {
        aclrtFree(biasDeviceAddr);
    }
    if (y1OutDeviceAddr != nullptr) {
        aclrtFree(y1OutDeviceAddr);
    }
    if (y2OutDeviceAddr != nullptr) {
        aclrtFree(y2OutDeviceAddr);
    }
    if (y3OutDeviceAddr != nullptr) {
        aclrtFree(y3OutDeviceAddr);
    }
    if (workspaceSize > 0) {
        aclrtFree(workspaceAddr);
    }
    aclrtDestroyStream(args.stream);
    aclrtDestroyContext(args.context);
    HcclCommDestroy(args.hcclEpComm);
    HcclCommDestroy(args.hcclTpComm);
    aclrtResetDevice(args.rankId);
    return 0;
}

int main(int argc, char *argv[])
{
    int ret = aclInit(nullptr);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclInit failed. ret = %d \n", ret); return ret);
    aclrtStream stream[DEV_NUM];
    aclrtContext context[DEV_NUM];
    for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
    ret = aclrtSetDevice(rankId);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSetDevice failed. ret = %d \n", ret); return ret);
    ret = aclrtCreateContext(&context[rankId], rankId);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtCreateContext failed. ret = %d \n", ret); return ret);
    ret = aclrtCreateStream(&stream[rankId]);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtCreateStream failed. ret = %d \n", ret); return ret);
    }

    int32_t devices[DEV_NUM];
    for (int i = 0; i < DEV_NUM; i++) {
        devices[i] = i;
    }
    // 初始化集合通信域
    HcclComm comms[DEV_NUM];
    ret = HcclCommInitAll(DEV_NUM, devices, comms);
    CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll failed. ret = %d \n", ret); return ret);

    // 初始化ep域  ep = 4  {0,1,2,3} {4,5,6,7}
    HcclComm commsEp[DEV_NUM];
    for (int i = 0; i < DEV_NUM / EP_WORLD_SIZE; i++) {
        ret = HcclCommInitAll(EP_WORLD_SIZE, &devices[i * EP_WORLD_SIZE], &commsEp[i * EP_WORLD_SIZE]);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll ep world %d failed. ret = %d \n", i, ret);
            return ret);
    }
    // 初始化tp域 tp=2 {0,4,1,5,2,6,3,7}  {0,4} {1,5} {2,6} {3,7}
    int32_t devicesTp[DEV_NUM];
    for (int i = 0; i < DEV_NUM - 1; i++) { // Tp init按照顺序init,但同域是device id配对为[0,4],[1,5],...,[3,7].
        devicesTp[i] = devices[DEV_NUM / TP_WORLD_SIZE * i % (DEV_NUM - 1)];
        std::cout << "test devices id " << i << " = " << devicesTp[i] << std::endl;
    }
    devicesTp[DEV_NUM - 1] = devices[DEV_NUM - 1];
    std::cout << "test devices id " << DEV_NUM - 1 << " = " << devicesTp[DEV_NUM - 1] << std::endl;
    HcclComm commsTp[DEV_NUM];
    for (int i = 0; i < DEV_NUM / TP_WORLD_SIZE; i++) {
        ret = HcclCommInitAll(TP_WORLD_SIZE, &devicesTp[i * TP_WORLD_SIZE], &commsTp[i * TP_WORLD_SIZE]);
        CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll tp world %d failed. ret = %d \n", i, ret);
            return ret);
    }

    Args args[DEV_NUM];
    // 启动多线程
    std::vector<std::unique_ptr<std::thread>> threads(DEV_NUM);
    for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
        args[rankId].rankId = rankId;
        args[rankId].hcclEpComm = commsEp[rankId];
        // Tp init按照顺序init,但同域是device id配对为[0,4],[1,5],...,[3,7].
        args[rankId].hcclTpComm = commsTp[TP_WORLD_SIZE * rankId % (DEV_NUM - 1)];
        std::cout << "test devices id " << rankId << " = " << devicesTp[rankId] << std::endl;
        if (rankId == (DEV_NUM - 1)) {
            args[rankId].hcclTpComm = commsTp[rankId];
        }
        args[rankId].stream = stream[rankId];
        args[rankId].context = context[rankId];
        threads[rankId].reset(new std::thread(&LaunchOneThreadAlltoAllAllGatherBmm, std::ref(args[rankId])));
    }
    for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
        threads[rankId]->join();
    }
    aclFinalize();
    return 0;
}