下载
中文
注册

HcclCommInitClusterInfo初始化方式

准备ranktable文件

该样例通过获取ranktable的方式进行初始化,所以需准备一份ranktable文件配置集群信息,供后续调用接口时使用。

配置“RANK_TABLE_FILE”环境变量,指定ranktable文件所在路径,如下所示,文件名称为“ranktable.json”。

export RANK_TABLE_FILE=/home/test/ranktable.json
以Atlas A2 训练系列产品、单机8卡组网为例,ranktable.json配置示例如下。不同产品形态ranktable文件的配置示例及详细参数说明可参见“ranktable文件配置资源信息”。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
{
        "status":"completed",   // ranktable可用标识,completed为可用
        "version": "1.0",
        "server_count": "1",   // 参与训练的AI Server数目
        "server_list": [{
                "server_id": "SERVER_ID_SV1",   // AI Server标识,String类型,请确保全局唯一
                "device": [{          // AI Server中的Device列表
                        "device_id": "0",
                        "device_ip": "192.168.1.8",
                        "rank_id": "0"
                },
                {
                        "device_id": "1",
                        "device_ip": "192.168.1.9",
                        "rank_id": "1"
                },
                {
                        "device_id": "2",
                        "device_ip": "192.168.1.10",
                        "rank_id": "2"
                },
                {
                        "device_id": "3",
                        "device_ip": "192.168.1.11",
                        "rank_id": "3"
                },
                {
                        "device_id": "4",
                        "device_ip": "192.168.1.12",
                        "rank_id": "4"
                },
                {
                        "device_id": "5",
                        "device_ip": "192.168.1.13",
                        "rank_id": "5"
                },
                {
                        "device_id": "6",
                        "device_ip": "192.168.1.14",
                        "rank_id": "6"
                },
                {
                        "device_id": "7",
                        "device_ip": "192.168.1.15",
                        "rank_id": "7"
                }]
        }]
}

HcclSend/HcclRecv操作代码样例

该样例仅支持单机8卡的组网。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
#include <iostream>
#include <vector>
#include <memory>
#include <thread>
#include <chrono>
#include "hccl/hccl.h"
#include "hccl/hccl_types.h"
#include "mpi.h"

/*
 * ACLCHECK(expr): evaluates `expr` exactly once; if the result differs from
 * ACL_SUCCESS, logs the file, line and return code and returns that code from
 * the enclosing function (which must therefore return a compatible type).
 *
 * The result is cached in a local because `expr` is normally a function call:
 * expanding it in the condition, the log call and the return statement would
 * re-run the call up to three times on failure.
 */
#define ACLCHECK(ret) do { \
    const auto aclCheckRet_ = (ret); \
    if (aclCheckRet_ != ACL_SUCCESS) \
    { \
        printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(aclCheckRet_)); \
        return aclCheckRet_; \
    } \
} while(0)

/*
 * HCCLCHECK(expr): evaluates `expr` exactly once; if the result differs from
 * HCCL_SUCCESS, logs the file, line and return code and returns that code from
 * the enclosing function (which must therefore return a compatible type).
 *
 * The result is cached in a local because `expr` is normally a function call:
 * expanding it in the condition, the log call and the return statement would
 * re-run the call up to three times on failure.
 */
#define HCCLCHECK(ret) do { \
    const auto hcclCheckRet_ = (ret); \
    if (hcclCheckRet_ != HCCL_SUCCESS) \
    { \
        printf("hccl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(hcclCheckRet_)); \
        return hcclCheckRet_; \
    } \
} while(0)

// Arguments handed to Sample(): the communicator to use and the device this
// rank runs on. Members carry default initializers so that a
// default-constructed context never holds indeterminate values.
struct ThreadContext {
    HcclComm comm = nullptr;  // communicator handle; filled in by main() before Sample() runs
    int32_t device = 0;       // device id; Sample() also uses it to pick the peer rank
};
int Sample(void *arg)
{
    ThreadContext* ctx = (ThreadContext *)arg;
    // 申请通信用device、sendBuf,recvBuf内存、stream等资源
    ACLCHECK(aclrtSetDevice(ctx->device));
    aclrtStream stream;
    ACLCHECK(aclrtCreateStream(&stream));
    void* sendBuff;
    void* recvBuff;
    void* hostBuff;
    uint64_t count = 8;
    int mallocSize = count * sizeof(float);
     //初始化输入内存
    ACLCHECK(aclrtMallocHost((void**)&hostBuff, mallocSize));
    float* tmpHostBuff = static_cast<float*>(hostBuff);
    for (uint32_t i = 0; i < count; ++i) {
        tmpHostBuff[i] = 2;
    }
    ACLCHECK(aclrtMalloc((void**)&sendBuff, mallocSize, ACL_MEM_MALLOC_HUGE_FIRST));
    ACLCHECK(aclrtMemcpy((void*)sendBuff, mallocSize, (void*)hostBuff, mallocSize, ACL_MEMCPY_HOST_TO_DEVICE));
    ACLCHECK(aclrtMalloc((void**)&recvBuff, mallocSize, ACL_MEM_MALLOC_HUGE_FIRST));
    // 执行SendRecv操作
    if (ctx->device / 4 == 0) {
        HCCLCHECK(HcclSend(sendBuff, count, HCCL_DATA_TYPE_FP32, ctx->device + 4, ctx->comm, stream));
    } else {
        HCCLCHECK(HcclRecv(recvBuff, count, HCCL_DATA_TYPE_FP32, ctx->device - 4, ctx->comm, stream));
    }
ACLCHECK(aclrtSynchronizeStream(stream));

    if (ctx->device / 4 == 1) {
        void* resultBuff;
        ACLCHECK(aclrtMallocHost((void**)&resultBuff, mallocSize));
        ACLCHECK(aclrtMemcpy((void*)resultBuff, mallocSize, (void*)recvBuff, mallocSize, ACL_MEMCPY_DEVICE_TO_HOST));
        float* tmpResBuff = static_cast<float*>(resultBuff);
        for (uint32_t i = 0; i < count; ++i) {
            std::cout <<  "rankId:" << ctx->device << ",i" << i << " " << tmpResBuff[i] << std::endl;
        }
        ACLCHECK(aclrtFreeHost(resultBuff));
    }
    // 释放通信用sendBuf、recvBuf内存,stream等资源
    ACLCHECK(aclrtFreeHost(hostBuff));
    ACLCHECK(aclrtFree(recvBuff));
    ACLCHECK(aclrtFree(sendBuff));
    ACLCHECK(aclrtDestroyStream(stream));
    ACLCHECK(aclrtResetDevice(ctx->device));
    return 0;
}
int main()
{
    MPI_Init(NULL, NULL);
    int procSize = 0;
    int procRank = 0;
    // 获取当前进程在所属进程组的编号
    MPI_Comm_size(MPI_COMM_WORLD, &procSize);
    MPI_Comm_rank(MPI_COMM_WORLD, &procRank);
    int devId = procRank;
    int devCount = procSize;
    // 设备资源初始化
    ACLCHECK(aclInit(NULL));
    // 获取ranktable路径
    char* rankTableFile = getenv("RANK_TABLE_FILE");
    // 指定集合通信操作使用的设备
    ACLCHECK(aclrtSetDevice(devId));
    HcclComm hcclComm;
    HcclCommInitClusterInfo(rankTableFile, devId, &hcclComm);
    struct ThreadContext args;
    args.comm = hcclComm;
    args.device = devId;
    Sample((void *)&args);
    HCCLCHECK(HcclCommDestroy(hcclComm));
    // 设备资源去初始化
    ACLCHECK(aclFinalize());
    MPI_Finalize();
    return 0;
}

HcclAllReduce操作代码样例

该样例支持单机N卡的组网,N需要小于等于8。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
#include <iostream>
#include <vector>
#include <memory>
#include <thread>
#include <chrono>
#include "hccl/hccl.h"
#include "hccl/hccl_types.h"
#include "mpi.h"

/*
 * ACLCHECK(expr): evaluates `expr` exactly once; if the result differs from
 * ACL_SUCCESS, logs the file, line and return code and returns that code from
 * the enclosing function (which must therefore return a compatible type).
 *
 * The result is cached in a local because `expr` is normally a function call:
 * expanding it in the condition, the log call and the return statement would
 * re-run the call up to three times on failure.
 */
#define ACLCHECK(ret) do { \
    const auto aclCheckRet_ = (ret); \
    if (aclCheckRet_ != ACL_SUCCESS) \
    { \
        printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(aclCheckRet_)); \
        return aclCheckRet_; \
    } \
} while(0)

/*
 * HCCLCHECK(expr): evaluates `expr` exactly once; if the result differs from
 * HCCL_SUCCESS, logs the file, line and return code and returns that code from
 * the enclosing function (which must therefore return a compatible type).
 *
 * The result is cached in a local because `expr` is normally a function call:
 * expanding it in the condition, the log call and the return statement would
 * re-run the call up to three times on failure.
 */
#define HCCLCHECK(ret) do { \
    const auto hcclCheckRet_ = (ret); \
    if (hcclCheckRet_ != HCCL_SUCCESS) \
    { \
        printf("hccl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(hcclCheckRet_)); \
        return hcclCheckRet_; \
    } \
} while(0)

// Arguments handed to Sample(): the communicator to use and the device this
// rank runs on. Members carry default initializers so that a
// default-constructed context never holds indeterminate values.
struct ThreadContext {
    HcclComm comm = nullptr;  // communicator handle; filled in by main() before Sample() runs
    int32_t device = 0;       // device id; Sample() prints it as the rank id
};
int Sample(void *arg)
{
    ThreadContext* ctx = (ThreadContext *)arg;
    void* host_buf = nullptr;
    void* send_buff = nullptr;
    void* recv_buff = nullptr;
    uint64_t count = 1;
    int malloc_kSize = count * sizeof(float);
    aclrtEvent start_event, end_event;
    aclrtStream stream;
    ACLCHECK(aclrtCreateStream(&stream));
    ACLCHECK(aclrtCreateEvent(&start_event));
    ACLCHECK(aclrtCreateEvent(&end_event));
	
    //申请集合通信操作的内存
    ACLCHECK(aclrtMalloc((void**)&send_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST));
    ACLCHECK(aclrtMalloc((void**)&recv_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST));
	
    //初始化输入内存
    ACLCHECK(aclrtMallocHost((void**)&host_buf, malloc_kSize));
    ACLCHECK(aclrtMemcpy((void*)send_buff, malloc_kSize, (void*)host_buf, malloc_kSize, ACL_MEMCPY_HOST_TO_DEVICE));
	
    //执行集合通信操作
    HCCLCHECK(HcclAllReduce((void *)send_buff, (void*)recv_buff, count, HCCL_DATA_TYPE_FP32, HCCL_REDUCE_SUM, ctx->comm, stream));

    //等待stream中集合通信任务执行完成
    ACLCHECK(aclrtSynchronizeStream(stream));

    if (ctx->device < 8) {
        void* resultBuff;
        ACLCHECK(aclrtMallocHost((void**)&resultBuff, malloc_kSize));
        ACLCHECK(aclrtMemcpy((void*)resultBuff, malloc_kSize, (void*)recv_buff, malloc_kSize, ACL_MEMCPY_DEVICE_TO_HOST));
        float* tmpResBuff = static_cast<float*>(resultBuff);
        for (uint32_t i = 0; i < count; ++i) {
            std::cout <<  "rankId:" << ctx->device << ",i" << i << " " << tmpResBuff[i] << std::endl;
        }
        ACLCHECK(aclrtFreeHost(resultBuff));
    }

    ACLCHECK(aclrtFree(send_buff));
    ACLCHECK(aclrtFree(recv_buff));
    ACLCHECK(aclrtFreeHost(host_buf));
    //销毁任务流
    ACLCHECK(aclrtDestroyStream(stream));
    ACLCHECK(aclrtDestroyEvent(start_event));
    ACLCHECK(aclrtDestroyEvent(end_event));
}

int main()
{
    MPI_Init(NULL, NULL);
    int procSize = 0;
    int procRank = 0;
    // 获取当前进程在所属进程组的编号
    MPI_Comm_size(MPI_COMM_WORLD, &procSize);
    MPI_Comm_rank(MPI_COMM_WORLD, &procRank);
    int devId = procRank;
    int devCount = procSize;
    // 设备资源初始化
    ACLCHECK(aclInit(NULL));
    // 获取ranktable路径
    char* rankTableFile = getenv("RANK_TABLE_FILE");
    // 指定集合通信操作使用的设备
    ACLCHECK(aclrtSetDevice(devId));
    HcclComm hcclComm;
    HcclCommInitClusterInfo(rankTableFile, devId, &hcclComm);
    struct ThreadContext args;
    args.comm = hcclComm;
    args.device = devId;
    Sample((void *)&args);
    HCCLCHECK(HcclCommDestroy(hcclComm));
    // 设备资源去初始化
    ACLCHECK(aclFinalize());
    MPI_Finalize();
    return 0;
}