aclnnAlltoAllAllGatherBatchMatMul
支持的产品型号
当前版本不支持该接口。
接口原型
每个算子分为两段式接口,必须先调用“aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize”接口获取计算所需workspace大小以及包含了算子计算流程的执行器,再调用“aclnnAlltoAllAllGatherBatchMatMul”接口执行计算。
aclnnStatus aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize(const aclTensor* x, const aclTensor* weight, const aclTensor* biasOptional, const char* groupEp, const char* groupTp, int64_t epWorldSize, int64_t tpWorldSize, int64_t xShardType, int64_t actType, aclTensor* y1Out, aclTensor* y2OutOptional, aclTensor* y3OutOptional, uint64_t* workspaceSize, aclOpExecutor** executor)
aclnnStatus aclnnAlltoAllAllGatherBatchMatMul(void* workspace, uint64_t workspaceSize, aclOpExecutor* executor, aclrtStream stream)
功能描述
算子功能:完成AllToAll、AllGather集合通信与BatchMatMul计算融合、并行。
计算公式:
计算逻辑如下,其中y1 y2 y3为输出
aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize
参数说明:
- x(aclTensor*,计算输入):通信后结果作为BatchMatMul计算的左矩阵。数据类型支持float16,bfloat16。该输入进行AllToAll、AllGather集合通信,必须为3维,数据格式支持ND。
- weight(aclTensor*,计算输入):BatchMatMul计算的右矩阵。数据类型支持float16, bfloat16,类型与x保持一致,必须为3维,数据格式支持ND。
- biasOptional(aclTensor*,计算输入):BatchMatMul计算的bias,数据类型支持float16, float32。x为float16时,bias需为float16;x为bfloat16时,bias需为float32,支持两维或三维,数据格式支持ND。支持传入空指针。
- groupEp(char*,计算输入):ep通信域名称,专家并行的通信域。字符串长度需大于0,小于128。
- groupTp(char*,计算输入):tp通信域名称,Tensor并行的通信域。字符串长度需大于0,小于128。
- epWorldSize(int64_t,计算输入):ep通信域size,支持2/4/8/16。
- tpWorldSize(int64_t,计算输入):tp通信域size,支持2/4/8/16。
- xShardType(int64_t,计算输入):0表示在H维度(即x的第2维,x为3维,分别为第0维、第1维、第2维)按tp域进行allgather,1表示在C维度(即x的第1维)上按tp域进行allgather。当前仅支持xShardType等于1的场景。
- actType(int64_t,计算输入):激活函数类型,支持0/1/2/3/4的输入,0表示无激活函数,对应关系为[0:None,1:GELU,2:Silu,3:Relu,4:FastGELU]。
- y1Out(aclTensor*,计算输出):数据类型float16, bfloat16,支持3维,数据类型与输入x保持一致。数据格式支持:ND。最终计算结果,如果有激活函数则为激活函数的输出,否则为BatchMatMul的输出。
- y2OutOptional(aclTensor*,计算输出):可选输出,数据类型float16, bfloat16,支持3维,数据类型与输入x保持一致。数据格式支持:ND。AllGather的输出,反向可能需要。空指针表示不需要该输出。
- y3OutOptional(aclTensor*,计算输出):可选输出,数据类型float16, bfloat16,支持3维,数据类型与输入x保持一致。数据格式支持:ND。有激活函数时,BatchMatMul的输出,空指针表示不需要该输出。
- workspaceSize(uint64_t*,出参):返回需要在Device侧申请的workspace大小。
- executor(aclOpExecutor**,出参):返回op执行器,包含了算子计算流程。
返回值:
返回aclnnStatus状态码,具体参见aclnn返回码。
aclnnAlltoAllAllGatherBatchMatMul
参数说明:
- workspace(void*,入参):在Device侧申请的workspace内存地址。
- workspaceSize(uint64_t,入参):在Device侧申请的workspace大小,由第一段接口aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize获取。
- executor(aclOpExecutor*,入参):op执行器,包含了算子计算流程。
- stream(aclrtStream,入参):指定执行任务的AscendCL stream流。
返回值:
返回aclnnStatus状态码,具体参见aclnn返回码。
约束与限制
因为集合通信及BatchMatMul计算所需,输入输出shape需满足以下数学关系:(其中ep=epWorldSize,tp=tpWorldSize)
- x: (E,C/tp,H)
- weight:(E/ep,H,M/tp)
- biasOptional:非空指针情况下,三维时为(E/ep,1,M/tp),两维时为(E/ep,M/tp)
- y1Out:(E/ep,ep*tp*C/tp,M/tp)
- y2OutOptional:(E/ep,ep*tp*C/tp,H)
- y3OutOptional:(E/ep,ep*tp*C/tp,M/tp) 数据关系说明:
- 比如x.size(0)等于E,weight.size(0)等于E/ep,则表示,x.size(0) = ep*weight.size(0),x.size(0)是ep的整数倍;其他关系类似。
- E的取值范围为[2, 512],且E是ep的整数倍。
- H的取值范围为:[1, 65535]。
- M/tp的取值范围为:[1, 65535]。
- E/ep的取值范围为:[1, 32]。
- ep、tp均仅支持2、4、8、16。
- groupEp和groupTp名称不能相同。
- C必须大于0,上限为算子device内存上限。
- 通算融合算子不支持并发调用,不同的通算融合算子也不支持并发调用。
- 不支持跨超节点,只支持超节点内,ep域AlltoAll支持超节点内跨节点,tp域AllGather仅支持超节点内单一节点。
调用示例
示例代码如下,仅供参考:
#include <thread>
#include <iostream>
#include <string>
#include <vector>
#include "acl/acl.h"
#include "hccl/hccl.h"
#include "aclnnop/aclnn_all_to_all_all_gather_batch_matmul.h"
#define CHECK_RET(cond, return_expr) \
do { \
if (!(cond)) { \
return_expr; \
} \
} while (0)
#define LOG_PRINT(message, ...) \
do { \
printf(message, ##__VA_ARGS__); \
} while(0)
constexpr int EP_WORLD_SIZE = 4;
constexpr int TP_WORLD_SIZE = 2;
constexpr int DEV_NUM = EP_WORLD_SIZE * TP_WORLD_SIZE;
int64_t GetShapeSize(const std::vector<int64_t> &shape)
{
int64_t shape_size = 1;
for (auto i : shape) {
shape_size *= i;
}
return shape_size;
}
template<typename T>
int CreateAclTensor(const std::vector<T> &hostData, const std::vector<int64_t> &shape, void **deviceAddr,
aclDataType dataType, aclTensor **tensor)
{
auto size = GetShapeSize(shape) * sizeof(T);
auto ret = aclrtMalloc(deviceAddr, size, ACL_MEM_MALLOC_HUGE_FIRST);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMalloc failed. ret: %d\n", ret); return ret);
ret = aclrtMemcpy(*deviceAddr, size, hostData.data(), size, ACL_MEMCPY_HOST_TO_DEVICE);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMemcpy failed. ret: %d\n", ret); return ret);
std::vector<int64_t> strides(shape.size(), 1);
for (int64_t i = shape.size() - 2; i >= 0; i--) {
strides[i] = shape[i + 1] * strides[i + 1];
}
*tensor = aclCreateTensor(shape.data(), shape.size(), dataType, strides.data(), 0, aclFormat::ACL_FORMAT_ND,
shape.data(), shape.size(), *deviceAddr);
return 0;
}
struct Args {
int rankId;
HcclComm hcclEpComm;
HcclComm hcclTpComm;
aclrtStream stream;
aclrtContext context;
};
int LaunchOneThreadAlltoAllAllGatherBmm(Args &args)
{
int ret = aclrtSetCurrentContext(args.context);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSetCurrentContext failed. ret: %d\n", ret); return ret);
char hcomEpName[128] = {0};
ret = HcclGetCommName(args.hcclEpComm, hcomEpName);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclGetEpCommName failed. ret: %d\n", ret); return -1);
char hcomTpName[128] = {0};
ret = HcclGetCommName(args.hcclTpComm, hcomTpName);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclGetTpCommName failed. ret: %d\n", ret); return -1);
LOG_PRINT("[INFO] rank = %d, hcomEpName = %s, hcomTpName = %s, stream = %p, context = %p\n", args.rankId,
hcomEpName, hcomTpName, args.stream, args.context);
int64_t E = 4;
int64_t C = 2;
int64_t H = 6;
int64_t M = 4;
int64_t xShardType = 1;
int64_t actType = 1;
std::vector<int64_t> xShape;
std::vector<int64_t> weightShape;
std::vector<int64_t> biasShape;
std::vector<int64_t> y1OutShape;
std::vector<int64_t> y2OutShape;
std::vector<int64_t> y3OutShape;
if (xShardType == 1) {
xShape = {E, C / TP_WORLD_SIZE, H};
weightShape = {E / EP_WORLD_SIZE, H, M / TP_WORLD_SIZE};
biasShape = {E / EP_WORLD_SIZE, 1, M / TP_WORLD_SIZE};
y1OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * TP_WORLD_SIZE * C / TP_WORLD_SIZE, M / TP_WORLD_SIZE};
y2OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * TP_WORLD_SIZE * C / TP_WORLD_SIZE, H};
y3OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * TP_WORLD_SIZE * C / TP_WORLD_SIZE, M / TP_WORLD_SIZE};
} else if (xShardType == 0) {
xShape = {E, C, H / TP_WORLD_SIZE};
weightShape = {E / EP_WORLD_SIZE, H, M / TP_WORLD_SIZE};
biasShape = {E / EP_WORLD_SIZE, 1, M / TP_WORLD_SIZE};
y1OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * C, M / TP_WORLD_SIZE};
y2OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * C, H};
y3OutShape = {E / EP_WORLD_SIZE, EP_WORLD_SIZE * C, M / TP_WORLD_SIZE};
} else {
LOG_PRINT("[ERROR] unsupported xShardType = %ld.\n", xShardType);
return -1;
}
void *xDeviceAddr = nullptr;
void *weightDeviceAddr = nullptr;
void *biasDeviceAddr = nullptr;
void *y1OutDeviceAddr = nullptr;
void *y2OutDeviceAddr = nullptr;
void *y3OutDeviceAddr = nullptr;
aclTensor *x = nullptr;
aclTensor *weight = nullptr;
aclTensor *bias = nullptr;
aclTensor *y1Out = nullptr;
aclTensor *y2Out = nullptr;
aclTensor *y3Out = nullptr;
uint64_t workspaceSize = 0;
aclOpExecutor *executor = nullptr;
void *workspaceAddr = nullptr;
long long xShapeSize = GetShapeSize(xShape);
long long weightShapeSize = GetShapeSize(weightShape);
long long biasShapeSize = GetShapeSize(biasShape);
long long y1OutShapeSize = GetShapeSize(y1OutShape);
long long y2OutShapeSize = GetShapeSize(y2OutShape);
long long y3OutShapeSize = GetShapeSize(y3OutShape);
std::vector<int16_t> xHostData(xShapeSize, 0);
std::vector<int16_t> weightHostData(weightShapeSize, 0);
std::vector<int16_t> biasHostData(biasShapeSize, 0);
std::vector<int16_t> y1OutHostData(y1OutShapeSize, 0);
std::vector<int16_t> y2OutHostData(y2OutShapeSize, 0);
std::vector<int16_t> y3OutHostData(y3OutShapeSize, 0);
ret = CreateAclTensor(xHostData, xShape, &xDeviceAddr, aclDataType::ACL_FLOAT16, &x);
CHECK_RET(ret == ACL_SUCCESS, return ret);
ret = CreateAclTensor(weightHostData, weightShape, &weightDeviceAddr, aclDataType::ACL_FLOAT16, &weight);
CHECK_RET(ret == ACL_SUCCESS, return ret);
ret = CreateAclTensor(y1OutHostData, y1OutShape, &y1OutDeviceAddr, aclDataType::ACL_FLOAT16, &y1Out);
CHECK_RET(ret == ACL_SUCCESS, return ret);
ret = CreateAclTensor(y2OutHostData, y2OutShape, &y2OutDeviceAddr, aclDataType::ACL_FLOAT16, &y2Out);
CHECK_RET(ret == ACL_SUCCESS, return ret);
ret = CreateAclTensor(y3OutHostData, y3OutShape, &y3OutDeviceAddr, aclDataType::ACL_FLOAT16, &y3Out);
CHECK_RET(ret == ACL_SUCCESS, return ret);
// 调用第一阶段接口
ret = aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize(x, weight, bias, hcomEpName, hcomTpName, EP_WORLD_SIZE,
TP_WORLD_SIZE, xShardType, actType, y1Out, y2Out, y3Out, &workspaceSize, &executor);
CHECK_RET(ret == ACL_SUCCESS,
LOG_PRINT("[ERROR] aclnnAlltoAllAllGatherBatchMatMulGetWorkspaceSize failed. ret = %d \n", ret); return ret);
// 根据第一阶段接口计算出的workspaceSize申请device内存
if (workspaceSize > 0) {
ret = aclrtMalloc(&workspaceAddr, workspaceSize, ACL_MEM_MALLOC_HUGE_FIRST);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtMalloc workspace failed. ret = %d \n", ret); return ret);
}
// 调用第二阶段接口
ret = aclnnAlltoAllAllGatherBatchMatMul(workspaceAddr, workspaceSize, executor, args.stream);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclnnAlltoAllAllGatherBatchMatMul failed. ret = %d \n", ret);
return ret);
// (固定写法)同步等待任务执行结束
ret = aclrtSynchronizeStreamWithTimeout(args.stream, 10000);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSynchronizeStreamWithTimeout failed. ret = %d \n", ret);
return ret);
LOG_PRINT("[INFO] device_%d aclnnAlltoAllAllGatherBatchMatMul execute successfully.\n", args.rankId);
// 释放device资源,需要根据具体API的接口定义修改
if (x != nullptr) {
aclDestroyTensor(x);
}
if (weight != nullptr) {
aclDestroyTensor(weight);
}
if (bias != nullptr) {
aclDestroyTensor(bias);
}
if (y1Out != nullptr) {
aclDestroyTensor(y1Out);
}
if (y2Out != nullptr) {
aclDestroyTensor(y2Out);
}
if (y3Out != nullptr) {
aclDestroyTensor(y3Out);
}
if (xDeviceAddr != nullptr) {
aclrtFree(xDeviceAddr);
}
if (weightDeviceAddr != nullptr) {
aclrtFree(weightDeviceAddr);
}
if (biasDeviceAddr != nullptr) {
aclrtFree(biasDeviceAddr);
}
if (y1OutDeviceAddr != nullptr) {
aclrtFree(y1OutDeviceAddr);
}
if (y2OutDeviceAddr != nullptr) {
aclrtFree(y2OutDeviceAddr);
}
if (y3OutDeviceAddr != nullptr) {
aclrtFree(y3OutDeviceAddr);
}
if (workspaceSize > 0) {
aclrtFree(workspaceAddr);
}
aclrtDestroyStream(args.stream);
aclrtDestroyContext(args.context);
HcclCommDestroy(args.hcclEpComm);
HcclCommDestroy(args.hcclTpComm);
aclrtResetDevice(args.rankId);
return 0;
}
int main(int argc, char *argv[])
{
int ret = aclInit(nullptr);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclInit failed. ret = %d \n", ret); return ret);
aclrtStream stream[DEV_NUM];
aclrtContext context[DEV_NUM];
for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
ret = aclrtSetDevice(rankId);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtSetDevice failed. ret = %d \n", ret); return ret);
ret = aclrtCreateContext(&context[rankId], rankId);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtCreateContext failed. ret = %d \n", ret); return ret);
ret = aclrtCreateStream(&stream[rankId]);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] aclrtCreateStream failed. ret = %d \n", ret); return ret);
}
int32_t devices[DEV_NUM];
for (int i = 0; i < DEV_NUM; i++) {
devices[i] = i;
}
// 初始化集合通信域
HcclComm comms[DEV_NUM];
ret = HcclCommInitAll(DEV_NUM, devices, comms);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll failed. ret = %d \n", ret); return ret);
// 初始化ep域 ep = 4 {0,1,2,3} {4,5,6,7}
HcclComm commsEp[DEV_NUM];
for (int i = 0; i < DEV_NUM / EP_WORLD_SIZE; i++) {
ret = HcclCommInitAll(EP_WORLD_SIZE, &devices[i * EP_WORLD_SIZE], &commsEp[i * EP_WORLD_SIZE]);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll ep world %d failed. ret = %d \n", i, ret);
return ret);
}
// 初始化tp域 tp=2 {0,4,1,5,2,6,3,7} {0,4} {1,5} {2,6} {3,7}
int32_t devicesTp[DEV_NUM];
for (int i = 0; i < DEV_NUM - 1; i++) { // Tp init按照顺序init,但同域是device id配对为[0,4],[1,5],...,[3,7].
devicesTp[i] = devices[DEV_NUM / TP_WORLD_SIZE * i % (DEV_NUM - 1)];
std::cout << "test devices id " << i << " = " << devicesTp[i] << std::endl;
}
devicesTp[DEV_NUM - 1] = devices[DEV_NUM - 1];
std::cout << "test devices id " << DEV_NUM - 1 << " = " << devicesTp[DEV_NUM - 1] << std::endl;
HcclComm commsTp[DEV_NUM];
for (int i = 0; i < DEV_NUM / TP_WORLD_SIZE; i++) {
ret = HcclCommInitAll(TP_WORLD_SIZE, &devicesTp[i * TP_WORLD_SIZE], &commsTp[i * TP_WORLD_SIZE]);
CHECK_RET(ret == ACL_SUCCESS, LOG_PRINT("[ERROR] HcclCommInitAll tp world %d failed. ret = %d \n", i, ret);
return ret);
}
Args args[DEV_NUM];
// 启动多线程
std::vector<std::unique_ptr<std::thread>> threads(DEV_NUM);
for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
args[rankId].rankId = rankId;
args[rankId].hcclEpComm = commsEp[rankId];
// Tp init按照顺序init,但同域是device id配对为[0,4],[1,5],...,[3,7].
args[rankId].hcclTpComm = commsTp[TP_WORLD_SIZE * rankId % (DEV_NUM - 1)];
std::cout << "test devices id " << rankId << " = " << devicesTp[rankId] << std::endl;
if (rankId == (DEV_NUM - 1)) {
args[rankId].hcclTpComm = commsTp[rankId];
}
args[rankId].stream = stream[rankId];
args[rankId].context = context[rankId];
threads[rankId].reset(new std::thread(&LaunchOneThreadAlltoAllAllGatherBmm, std::ref(args[rankId])));
}
for (uint32_t rankId = 0; rankId < DEV_NUM; rankId++) {
threads[rankId]->join();
}
aclFinalize();
return 0;
}