Usage Guide

Usage Restrictions

In inference service scenarios that use Atlas 300I Duo cards, the top_k post-processing parameter is not supported in inference requests. If inference requests are sent concurrently (through either the RESTful interfaces provided by EndPoint or the Forward interface provided by Engine) and some requests set top_k in their post-processing parameters while others do not, the inference service becomes abnormal, subsequent inference requests fail, and the inference service must be restarted.

Scenario Description

  1. MindIE Server provides the EndPoint module, which encapsulates the inference serving protocol and interfaces and is compatible with third-party framework interfaces such as Triton, OpenAI, TGI, and vLLM. After MindIE Server is installed in single-node mode, users can call the interfaces provided by EndPoint by sending http/https requests with an http/https client (the Linux curl command, the Postman tool, and so on).
  2. MindIE Server provides C++ interfaces through the Engine module; customers can perform secondary integration and development based on these C++ interfaces.

EndPoint RESTful Interface Usage

The IP address and port number used in the URL of http/https requests are configured in config.json. For details, see the ServeParam parameters in Table 1 of the configuration reference.

  • Send a generate request with the Linux curl tool. The request URL format is as follows:
    • Operation type: POST
    • URL: http[s]://{ip}:{port}/generate
  • Send an inference request when https is not enabled:
    curl -H "Accept: application/json" -H "Content-type: application/json" -X POST -d '{
      "inputs": "My name is Olivier and I",
      "parameters": {
        "details": true,
        "do_sample": true,
        "repetition_penalty": 1.1,
        "return_full_text": false,
        "seed": null,
        "temperature": 1,
        "top_p": 0.99
      },
      "stream": false
    }' http://{ip}:{port}/generate
  • Example request with https two-way (mutual) authentication:
    curl --location --request POST 'https://{ip}:{port}/generate' \
    --header 'Content-Type: application/json' \
    --cacert /home/runs/static_conf/ca/ca.pem \
    --cert /home/runs/static_conf/cert/client.pem \
    --key /home/runs/static_conf/cert/client.key.pem \
    --data-raw '{
        "inputs": "My name is Olivier and I",
        "parameters": {
            "best_of": 1,
            "decoder_input_details": false,
            "details": false,
            "do_sample": true,
            "max_new_tokens": 20,
            "repetition_penalty": 2,
            "return_full_text": false,
            "seed": 12,
            "temperature": 0.1,
            "top_k": 1,
            "top_p": 0.9,
            "truncate": 1024
        },
        "stream": true
    }'
    • --cacert: path to the verification certificate file. ca.pem is the verification/root certificate for the MindIE Server server certificate.
    • --cert: path to the client certificate file. client.pem is the client certificate.
    • --key: path to the client private key file. client.key.pem is the private key of the client certificate (unencrypted here; an encrypted key is recommended).

    Modify these parameters according to your actual environment.

The RESTful APIs provided are listed below:
Table 1 Service status query APIs (management-plane query interfaces)

| API | HTTP method | URL | Description | Supported framework |
| --- | --- | --- | --- | --- |
| Server Live | GET | /v2/health/live | Checks whether the server is live. | Triton |
| Server Ready | GET | /v2/health/ready | Checks whether the server is ready. | Triton |
| Model Ready | GET | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/ready | Checks whether the model is ready. | Triton |
| health | GET | /health | Service health check. | TGI/vLLM |
| Query TGI EndPoint info | GET | /info | Queries TGI EndPoint information. | TGI |
| Slot statistics | GET | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/getSlotCount | Custom slot statistics query interface, modeled on the Triton format. | Huawei proprietary |
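
For example, after the service has started, liveness can be verified through the Triton-compatible health interface from Table 1 (a minimal sketch; replace {ip} and {port} with your configured address, and add the certificate options shown above if https is enabled):

    curl -X GET http://{ip}:{port}/v2/health/live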

Table 2 Model/service query APIs (service-plane query interfaces)

| API | HTTP method | URL | Description | Supported framework |
| --- | --- | --- | --- | --- |
| Model list | GET | /v1/models | Lists the currently available models. | OpenAI |
| Model details | GET | /v1/models/{model} | Queries model information. | OpenAI |
| Service metadata query | GET | /v2 | Obtains service metadata. | Triton |
| Model metadata query | GET | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}] | Queries model metadata. | Triton |
| Model configuration query | GET | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/config | Queries the model configuration. | Triton |
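
For example, the OpenAI-compatible model list interface from Table 2 can be queried as follows (a minimal sketch; {ip} and {port} are placeholders for your service address):

    curl -X GET http://{ip}:{port}/v1/models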

Table 3 Inference APIs (service-plane business interfaces)

| API | HTTP method | URL | Description | Supported framework |
| --- | --- | --- | --- | --- |
| Inference task | POST | / | TGI inference interface; returns a text result when stream==false and a streaming result when stream==true. | TGI |
|  | POST | /generate | Inference interface for both TGI and vLLM; the request parameters determine which service's interface is used. | TGI/vLLM |
|  | POST | /generate_stream | TGI streaming inference interface; results are returned in Server-Sent Events format. | TGI |
|  | POST | /v1/chat/completions | OpenAI text inference interface. | OpenAI |
|  | POST | /infer | Huawei-proprietary inference interface; supports text and streaming results. | Huawei proprietary |
|  | POST | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/infer | Triton token inference interface. | Triton |
|  | POST | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/stopInfer | Interface for terminating a request early, modeled on the Triton interface definition. | Huawei proprietary |
|  | POST | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/generate | Triton text inference interface. | Triton |
|  | POST | /v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/generate_stream | Triton streaming inference interface. | Triton |

  • The ${MODEL_NAME} field specifies the name of the model to query.
  • The [/versions/${MODEL_VERSION}] field is not supported yet; do not pass it.
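
As an illustration, a request to the OpenAI-compatible /v1/chat/completions interface listed in Table 3 could look like the following sketch. The body follows the common OpenAI chat-completions format; the model name "llama_65b" and the parameter values are placeholders, and the exact set of supported fields depends on your deployment, so adjust them to your actual configuration:

    curl -H "Accept: application/json" -H "Content-type: application/json" -X POST -d '{
      "model": "llama_65b",
      "messages": [
        {"role": "user", "content": "My name is Olivier and I"}
      ],
      "max_tokens": 20,
      "stream": false
    }' http://{ip}:{port}/v1/chat/completions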

C++ Interfaces Provided by the Engine Module

To use the C++ interfaces provided by the Engine module, you need to develop integration code. The following code shows a sample of how the interfaces are used and is for reference only.

/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 */
#include <iostream>
#include <memory>
#include <map>
#include <string>
#include <vector>
#include <mutex>
#include <thread>
#include <future>
#include <chrono>
#include <algorithm>
#include <signal.h>
#include <unistd.h>
#include <sys/wait.h>
#include "sdk/infer_engine.h"
#include "sdk/infer_request.h"
using namespace SimpleLLMInference;
using namespace infrastructure;
using SC = std::chrono::steady_clock;
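// Global state shared between the request-dispatch thread and the engine's response callbacks.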
IOManager g_Manager;
Statistics g_Statistics;
std::map<std::string, Metrics> g_Metrics;
volatile int g_CompleteNum = 0;
volatile uint32_t warmupCompleted = 0;
uint64_t warmupNum = 10;
InferenceEngine g_engine;
static constexpr int FINALIZE_WAIT_TIME = 5;
std::mutex g_MutexWarmup;
std::mutex g_Mutex;
std::mutex g_MetricsMutex;
bool g_RecordOutput = false;
namespace SimpleLLMInference {
/**
 * Parse the EOS attribute (end-of-sequence flag and generated token count) from the response.
 * @param response
 */
Status ParseEosAttr(std::shared_ptr<InferenceResponse> &response, int64_t *flag, int64_t *outputLen)
{
    InferenceResponse::Output *output;
    auto status = response->ImmutableOutput("IBIS_EOS_ATTR", &output);
    if (!status.IsOk()) {
        return status;
    }
    if (output == nullptr) {
        std::cout << "Error: Output is nullptr" << std::endl;
        return Status(infrastructure::Error::Code::ERROR, "Output is nullptr");
    }
    auto *eosData = static_cast<int64_t *>(output->Buffer());
    *flag = eosData[0];
    *outputLen = eosData[1];
    return Status(infrastructure::Error::Code::OK, "Success");
}
/**
 * Parse the returned token ids.
 * @param response
 */
Status ParseOutputId(std::shared_ptr<InferenceResponse> &response, std::vector<int64_t> &outputIds)
{
    InferenceResponse::Output *output;
    auto status = response->ImmutableOutput("OUTPUT_IDS", &output);
    if (!status.IsOk()) {
        return status;
    }
    // Get the output length
    auto len = output->Shape()[0];
    auto *data = static_cast<int64_t *>(output->Buffer());
    for (int i = 0; i < len; ++i) {
        outputIds.push_back(data[i]);
    }
    return Status(infrastructure::Error::Code::OK, "Success");
}
/**
 * Response callback invoked for each inference response.
 * @param response
 */
void ResponseCallback(std::shared_ptr<InferenceResponse> &response)
{
    auto reqId = response->GetRequestId().StringValue();
    size_t decodeTime = 0;
    auto now = SC::now();
    g_Manager.SetOutputData(reqId);
    {
        std::unique_lock lock(g_MetricsMutex);
        // Number of tokens generated in this response
        int64_t flag = 0;
        int64_t outputLen = 0;
        auto ret = ParseEosAttr(response, &flag, &outputLen);
        if (!ret.IsOk()) {
            std::cout << "ReqId:" << reqId << ", Error:" << ret.StatusMsg() << std::endl;
            return;
        }
        g_Metrics[reqId].tokensOutput += outputLen;
        if (g_Metrics[reqId].firstTokenCost == 0) {
            // Prefill: record the time to first token
            decodeTime = GetDuration(now, g_Metrics[reqId].startingTime);
            g_Metrics[reqId].firstTokenCost = decodeTime;
        } else {
            // Decode: record the time of each decode step
            decodeTime = GetDuration(now, g_Metrics[reqId].lastTokenTime);
            // Speculative-decoding adaptation: a decode step returns at most gamma tokens; round to the nearest average
            auto avgDecodeTime = (decodeTime + outputLen / 2) / outputLen;
            for (int i = 0; i < outputLen; ++i) {
                g_Metrics[reqId].decodeTime.push_back(avgDecodeTime);
            }
        }
        g_Metrics[reqId].lastTokenTime = now;
        // Generated token ids
        if (g_RecordOutput) {
            ret = ParseOutputId(response, g_Metrics[reqId].outputTokenIds);
            if (!ret.IsOk()) {
                std::cout << "ReqId:" << reqId << ", Error:" << ret.StatusMsg() << std::endl;
                return;
            }
        }
        if (response->IsEOS()) {
            g_Metrics[reqId].endingTime = now;
            // Time cost of the last token
            g_Metrics[reqId].lastTokenCost = decodeTime;
        }
    }
    if (response->IsEOS()) {
        std::unique_lock lock(g_Mutex);
        g_CompleteNum++;
        std::cout << "ReqId:" << reqId << " Finished" << std::endl;
    }
}
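/**
 * Dispatch all pending inputs to the engine, batching by the available prefill slots/tokens,
 * until the input manager has been drained.
 * @param engine
 * @param maxBatchSize
 */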
void SendRequest(InferenceEngine &engine, uint64_t maxBatchSize)
{
    uint64_t processingNum = 0;
    Status status = engine.GetProcessingRequest(&processingNum);
    if (!status.IsOk()) {
        std::cout << "failed to get processing request." << std::endl;
        return;
    }
    std::cout << "the processing request num is " << processingNum << " at first." << std::endl;
    uint64_t slotNum = 0;
    uint64_t remainBlocks = 0;
    uint64_t remainPrefillSlots = 0;
    uint64_t remainPrefillTokens = 0;
    uint64_t invalidReqNum = 0;
    while (!g_Manager.Empty()) {
        // 2. Get the number of available slots
        Status res = engine.GetRequestBlockQuotas(&remainBlocks, &remainPrefillSlots, &remainPrefillTokens);
        if (!res.IsOk()) {
            std::cout << "failed to get request block quotas." << std::endl;
            break;
        }
        res = engine.GetProcessingRequest(&processingNum);
        if (!res.IsOk()) {
            std::cout << "failed to get processing request." << std::endl;
            break;
        }
        slotNum = maxBatchSize - processingNum;
        if (remainPrefillSlots > 0 && remainPrefillTokens > 0) {
            // 3. Set input
            std::vector<std::shared_ptr<Data>> data =
                g_Manager.GetInputDataByQuotas(remainPrefillSlots, remainPrefillTokens, slotNum);
            if (!data.empty()) {
                std::vector<std::shared_ptr<InferenceRequest>> requests = Data2Request(data);
                g_Statistics.requestNumber += requests.size(); // total num
                // 4. Forward (asynchronous)
                for (size_t i = 0; i < requests.size(); ++i) {
                    auto reqId = requests[i]->GetRequestId().StringValue();
                    {
                        std::unique_lock lock(g_MetricsMutex);
                        g_Metrics[reqId].startingTime = SC::now();
                        g_Metrics[reqId].tokensInput = data[i]->size;
                    }
                    auto ret = engine.Forward(requests[i]);
                    if (!ret.IsOk()) {
                        invalidReqNum++;
                        g_Statistics.requestNumber--;
                        {
                            std::unique_lock lock(g_MetricsMutex);
                            g_Metrics.erase(reqId);
                        }
                    }
                }
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(20L));
    }
    status = engine.GetProcessingRequest(&processingNum);
    if (!status.IsOk()) {
        std::cout << "failed to get processing request." << std::endl;
        return;
    }
    std::cout << "the processing request num is " << processingNum << " when all requests dispatched." << std::endl;
    std::cout << "invalid request count is " << invalidReqNum << std::endl;
}
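/**
 * Send warm-up requests and wait until all valid warm-up requests have completed.
 * @param engine
 * @param manager
 * @param warmupSize
 */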
void Warmup(InferenceEngine &engine, IOManager &manager, size_t warmupSize)
{
    std::vector<std::shared_ptr<Data>> data = manager.GetWarmupInputs(warmupSize);
    std::vector<std::shared_ptr<InferenceRequest>> WarmupRequests = Data2Request(data);
    uint64_t totalWarmupNum = WarmupRequests.size();
    std::cout<<"Total warm up count : "<<totalWarmupNum<<std::endl;
    uint64_t invalidReqNum = 0;
    for (size_t i = 0; i < totalWarmupNum; ++i) {
        SendResponseCallback responseCallback = [](std::shared_ptr<InferenceResponse> &response) {
            if (response->IsEOS()) {
                std::unique_lock<std::mutex> lock(g_MutexWarmup);
                warmupCompleted++;
                std::cout<<"Warm up completed count: "<<warmupCompleted<<std::endl;
            }
        };
        WarmupRequests[i]->SetSendResponseCallback(responseCallback);
        auto ret = engine.Forward(WarmupRequests[i]);
        if (!ret.IsOk()) {
            invalidReqNum++;
        }
    }
    std::cout<<"Invalid warmup request count : "<<invalidReqNum<<std::endl;
    while (warmupCompleted < totalWarmupNum - invalidReqNum) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10L));
    }
}
void FinalizeEngine()
{
    auto res = g_engine.Finalize();
    std::cout << "inferenceEngine finalize message is : " << res.StatusMsg() << std::endl;
}
void Exit()
{
    std::promise<bool> promise;
    std::future<bool> future = promise.get_future();
    // Run finalization in a worker thread and fulfil the promise once it completes.
    std::thread worker([&promise]() {
        FinalizeEngine();
        promise.set_value(true);
    });
    worker.detach();
    std::this_thread::sleep_for(std::chrono::seconds(FINALIZE_WAIT_TIME));
    if (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
        // If finalization has not finished after the fixed waiting time,
        // force kill the process group to ensure that the subprocesses are cleaned up.
        kill(-getpgrp(), SIGKILL);
    } else {
        exit(0);
    }
}
void RunEngine(std::string dataset)
{
    TtimeT start;
    TtimeT end;
    if (g_Manager.SetInputData(dataset) != 0) {
        std::cout << "failed to load data" << std::endl;
        return;
    }
    // Initialize the engine
    auto ret = g_engine.Init(ResponseCallback);
    if (!ret.IsOk()) {
        std::cout << "engine init failed:  " << ret.StatusMsg() << std::endl;
        return;
    }
    uint64_t maxBatchSize = 0;
    ret = g_engine.GetMaxBatchSize(&maxBatchSize);
    if (!ret.IsOk()) {
        std::cout << "GetMaxBatchSize failed:  " << ret.StatusMsg() << std::endl;
        return;
    }
    // Collect statistics/metrics information
    g_Statistics.modelFullName = "";
    if (!GetModelInfo(g_Statistics.modelFullName, g_Statistics.tp, g_Statistics.serverCount)) {
        std::cout << "GetModelInfo failed" << std::endl;
        return;
    }
    std::cout << "*** Warm up ***"<< std::endl;
    Warmup(g_engine, g_Manager, warmupNum);
    start = SC::now();
    SendRequest(g_engine, maxBatchSize);
    while (g_CompleteNum < g_Statistics.requestNumber) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10L));
    }
    end = SC::now();
    g_Statistics.latencyForAll = GetDuration(end, start);
    FormatMetrics(g_Metrics, g_Statistics);
    PrintStatistics(g_Statistics);
    if (g_RecordOutput) {
        std::map<std::string, std::vector<int64_t>> outputTokensId;
        for (auto &m : g_Metrics) {
            outputTokensId[m.first] = m.second.outputTokenIds;
        }
        WriteOutputIds(outputTokensId, "./token_output.csv");
    }
    FinalizeEngine();
}
void SignalInterruptHandler(int signal)
{
    std::cout << "Received signal[" << signal << "]" << std::endl;
    std::cout << "Test program is exiting..." << std::endl;
    int status = 0;
    pid_t pid = 0;
    while ((pid = waitpid(0, &status, WNOHANG)) > 0) {
        std::cout << "Test program wait pid with " << pid << ", status " << status << std::endl;
    }
    Exit();
}
void SignalChldHandler(int signal)
{
    int status = 0;
    pid_t pid = 0;
    bool exitFlag = false;
    while ((pid = waitpid(0, &status, WNOHANG)) > 0) {
        std::cout << "Test program wait pid with " << pid << ", status " << status << std::endl;
        if (!WIFEXITED(status)) {
            exitFlag = true;
        }
    }
    if (exitFlag) {
        Exit();
    }
}
void RegisterSignal(void)
{
    signal(SIGINT, SignalInterruptHandler);
    signal(SIGTERM, SignalInterruptHandler);
    signal(SIGCHLD, SignalChldHandler);
}
}  // namespace SimpleLLMInference
int main(int argc, char *argv[])
{
    setpgrp();
    RegisterSignal(); // register signal
    // Dataset and command-line argument handling
    std::string dataset = "token_input_gsm.csv";
    try {
        dataset = argc > 1 ? argv[1] : "token_input_gsm.csv";
        g_RecordOutput = argc > 2 && std::stoi(argv[2]);
        warmupNum = argc > 3 ? std::stoull(argv[3], nullptr, 10) : 10;
        if (argc > 4) {
            std::cout << "unexpected arguments. please refer to the documentation. " << std::endl;
            return -1;
        }
    } catch (const std::exception &e) {
        std::cout << "[llm_engine_test] read argv fail: " << typeid(e).name() << ", " << e.what() << std::endl;
        return -1;
    } catch (...) {
        std::cout << "[llm_engine_test] get unknown error" << std::endl;
        return -1;
    }
    // Start the business (worker) thread
    std::thread businessThread(RunEngine, dataset);
    businessThread.join();
    return 0;
}
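
Based on the argument parsing in main above, the compiled sample (logged as llm_engine_test; the actual binary name depends on how you build it) would be launched roughly as follows, passing the dataset file, an optional flag that enables recording of output token ids, and an optional warm-up request count:

    ./llm_engine_test token_input_gsm.csv 1 10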