HcclCommInitClusterInfoConfig初始化方式
准备ranktable文件
该样例通过获取ranktable的方式进行初始化,所以需准备一份ranktable文件配置集群信息,供后续调用接口时使用。
配置“RANK_TABLE_FILE”环境变量,指定ranktable文件所在路径,如下所示,文件名称为“ranktable.json”。
export RANK_TABLE_FILE=/home/test/ranktable.json
以Atlas A2 训练系列产品,组网为单机8卡为例,ranktable.json配置示例如下,不同产品形态ranktable文件的配置示例及详细参数说明可参见ranktable文件配置资源信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | { "status":"completed", // ranktable可用标识,completed为可用 "version": "1.0", "server_count": "1", // 参与训练的AI Server数目 "server_list": [{ "server_id": "SERVER_ID_SV1", // AI Server标识,String类型,请确保全局唯一 "device": [{ // AI Server中的Device列表 "device_id": "0", "device_ip": "192.168.1.8", "rank_id": "0" }, { "device_id": "1", "device_ip": "192.168.1.9", "rank_id": "1" }, { "device_id": "2", "device_ip": "192.168.1.10", "rank_id": "2" }, { "device_id": "3", "device_ip": "192.168.1.11", "rank_id": "3" }, { "device_id": "4", "device_ip": "192.168.1.12", "rank_id": "4" }, { "device_id": "5", "device_ip": "192.168.1.13", "rank_id": "5" }, { "device_id": "6", "device_ip": "192.168.1.14", "rank_id": "6" }, { "device_id": "7", "device_ip": "192.168.1.15", "rank_id": "7" }] }] } |
HcclSend/HcclRecv操作代码样例
该样例仅支持单机8卡的组网。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | #include <iostream> #include <vector> #include <memory> #include <thread> #include <chrono> #include <cstring> #include "hccl/hccl.h" #include "hccl/hccl_types.h" #include "mpi.h" #define ACLCHECK(ret) do { \ if(ret != ACL_SUCCESS)\ {\ printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret);\ return ret;\ }\ } while(0) #define HCCLCHECK(ret) do { \ if(ret != HCCL_SUCCESS) \ { \ printf("hccl interface return errreturn err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret); \ return ret;\ } \ } while(0) struct ThreadContext { HcclComm comm; int32_t device; }; int Sample(void *arg) { ThreadContext* ctx = (ThreadContext *)arg; // 申请通信用device、sendBuf,recvBuf内存、stream等资源 ACLCHECK(aclrtSetDevice(ctx->device)); aclrtStream stream; ACLCHECK(aclrtCreateStream(&stream)); void* sendBuff; void* recvBuff; void* hostBuff; uint64_t count = 8; int mallocSize = count * sizeof(float); //初始化输入内存 ACLCHECK(aclrtMallocHost((void**)&hostBuff, mallocSize)); float* tmpHostBuff = static_cast<float*>(hostBuff); for (uint32_t i = 0; i < count; ++i) { tmpHostBuff[i] = 2; } ACLCHECK(aclrtMalloc((void**)&sendBuff, mallocSize, ACL_MEM_MALLOC_HUGE_FIRST)); ACLCHECK(aclrtMemcpy((void*)sendBuff, mallocSize, (void*)hostBuff, mallocSize, ACL_MEMCPY_HOST_TO_DEVICE)); ACLCHECK(aclrtMalloc((void**)&recvBuff, mallocSize, ACL_MEM_MALLOC_HUGE_FIRST)); // 执行SendRecv操作 if (ctx->device / 4 == 0) { HCCLCHECK(HcclSend(sendBuff, count, HCCL_DATA_TYPE_FP32, ctx->device + 4, ctx->comm, stream)); } else { HCCLCHECK(HcclRecv(recvBuff, count, HCCL_DATA_TYPE_FP32, ctx->device - 4, ctx->comm, stream)); } ACLCHECK(aclrtSynchronizeStream(stream)); if (ctx->device / 4 == 1) { void* resultBuff; ACLCHECK(aclrtMallocHost((void**)&resultBuff, mallocSize)); ACLCHECK(aclrtMemcpy((void*)resultBuff, mallocSize, (void*)recvBuff, mallocSize, ACL_MEMCPY_DEVICE_TO_HOST)); float* tmpResBuff = static_cast<float*>(resultBuff); for (uint32_t i = 0; i < count; ++i) { std::cout << "rankId:" << ctx->device << ",i" << i << " " << tmpResBuff[i] << std::endl; } ACLCHECK(aclrtFreeHost(resultBuff)); } // 释放通信用sendBuf、recvBuf内存,stream等资源 ACLCHECK(aclrtFreeHost(hostBuff)); ACLCHECK(aclrtFree(recvBuff)); ACLCHECK(aclrtFree(sendBuff)); ACLCHECK(aclrtDestroyStream(stream)); ACLCHECK(aclrtResetDevice(ctx->device)); return 0; } int main() { MPI_Init(NULL, NULL); int procSize = 0; int procRank = 0; // 获取当前进程在所属进程组的编号 MPI_Comm_size(MPI_COMM_WORLD, &procSize); MPI_Comm_rank(MPI_COMM_WORLD, &procRank); int devId = procRank; int devCount = procSize; // 设备资源初始化 ACLCHECK(aclInit(NULL)); // 获取ranktable路径 char* rankTableFile = getenv("RANK_TABLE_FILE"); // 指定集合通信操作使用的设备 ACLCHECK(aclrtSetDevice(devId)); // 创建并初始化通信域配置项 HcclCommConfig config; HcclCommConfigInit(&config); // 根据需要修改通信域配置 config.hcclBufferSize = 50; strcpy(config.hcclCommName, "comm_1"); HcclComm hcclComm; HcclCommInitClusterInfoConfig(rankTableFile, devId, &config, &hcclComm); struct ThreadContext args; args.comm = hcclComm; args.device = devId; Sample((void *)&args); HCCLCHECK(HcclCommDestroy(hcclComm)); // 设备资源去初始化 ACLCHECK(aclFinalize()); MPI_Finalize(); return 0; } |
HcclAllReduce操作代码样例
该样例支持单机N卡的组网,N需要小于等于8。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | #include <iostream> #include <vector> #include <memory> #include <thread> #include <chrono> #include <cstring> #include "hccl/hccl.h" #include "hccl/hccl_types.h" #include "mpi.h" #define ACLCHECK(ret) do { \ if(ret != ACL_SUCCESS)\ {\ printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret);\ return ret;\ }\ } while(0) #define HCCLCHECK(ret) do { \ if(ret != HCCL_SUCCESS) \ { \ printf("hccl interface return errreturn err %s:%d, retcode: %d \n", __FILE__, __LINE__, ret); \ return ret;\ } \ } while(0) struct ThreadContext { HcclComm comm; int32_t device; }; int Sample(void *arg) { ThreadContext* ctx = (ThreadContext *)arg; void* host_buf = nullptr; void* send_buff = nullptr; void* recv_buff = nullptr; uint64_t count = 1; int malloc_kSize = count * sizeof(float); aclrtEvent start_event, end_event; aclrtStream stream; ACLCHECK(aclrtCreateStream(&stream)); ACLCHECK(aclrtCreateEvent(&start_event)); ACLCHECK(aclrtCreateEvent(&end_event)); //申请集合通信操作的内存 ACLCHECK(aclrtMalloc((void**)&send_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST)); ACLCHECK(aclrtMalloc((void**)&recv_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST)); //初始化输入内存 ACLCHECK(aclrtMallocHost((void**)&host_buf, malloc_kSize)); ACLCHECK(aclrtMemcpy((void*)send_buff, malloc_kSize, (void*)host_buf, malloc_kSize, ACL_MEMCPY_HOST_TO_DEVICE)); //执行集合通信操作 HCCLCHECK(HcclAllReduce((void *)send_buff, (void*)recv_buff, count, HCCL_DATA_TYPE_FP32, HCCL_REDUCE_SUM, ctx->comm, stream)); //等待stream中集合通信任务执行完成 ACLCHECK(aclrtSynchronizeStream(stream)); if (ctx->device < 8) { void* resultBuff; ACLCHECK(aclrtMallocHost((void**)&resultBuff, malloc_kSize)); ACLCHECK(aclrtMemcpy((void*)resultBuff, malloc_kSize, (void*)recv_buff, malloc_kSize, ACL_MEMCPY_DEVICE_TO_HOST)); float* tmpResBuff = static_cast<float*>(resultBuff); for (uint32_t i = 0; i < count; ++i) { std::cout << "rankId:" << ctx->device << ",i" << i << " " << tmpResBuff[i] << std::endl; } ACLCHECK(aclrtFreeHost(resultBuff)); } ACLCHECK(aclrtFree(send_buff)); ACLCHECK(aclrtFree(recv_buff)); ACLCHECK(aclrtFreeHost(host_buf)); //销毁任务流 ACLCHECK(aclrtDestroyStream(stream)); ACLCHECK(aclrtDestroyEvent(start_event)); ACLCHECK(aclrtDestroyEvent(end_event)); return 0; } int main() { MPI_Init(NULL, NULL); int procSize = 0; int procRank = 0; // 获取当前进程在所属进程组的编号 MPI_Comm_size(MPI_COMM_WORLD, &procSize); MPI_Comm_rank(MPI_COMM_WORLD, &procRank); int devId = procRank; int devCount = procSize; // 设备资源初始化 ACLCHECK(aclInit(NULL)); // 获取ranktable路径 char* rankTableFile = getenv("RANK_TABLE_FILE"); // 指定集合通信操作使用的设备 ACLCHECK(aclrtSetDevice(devId)); // 创建并初始化通信域配置项 HcclCommConfig config; HcclCommConfigInit(&config); // 根据需要修改通信域配置 config.hcclBufferSize = 50; strcpy(config.hcclCommName, "comm_1"); HcclComm hcclComm; HcclCommInitClusterInfoConfig(rankTableFile, devId, &config, &hcclComm); struct ThreadContext args; args.comm = hcclComm; args.device = devId; Sample((void *)&args); HCCLCHECK(HcclCommDestroy(hcclComm)); // 设备资源去初始化 ACLCHECK(aclFinalize()); MPI_Finalize(); return 0; } |
父主题: 样例代码