使用指导
使用限制
使用Atlas 300I Duo卡的推理服务场景,推理请求的后处理参数不支持top_k。如果并发发送推理(包括EndPoint提供的RESTful接口和Engine提供的Forward接口),并且部分推理请求的后处理参数设置了top_k,部分请求后处理参数不设置top_k,会造成推理服务异常,导致后续推理请求执行失败,需要重启推理服务。
场景说明
- MindIE Server提供EndPoint模块对推理服务化协议和接口封装,兼容Triton/OpenAI/TGI/vLLM等第三方框架接口。使用单节点安装模式安装MindIE Server之后,用户使用http/https客户端(Linux curl命令,Postman工具等)发送http/https请求,即可调用EndPoint提供的接口。
- MindIE Server通过Engine模块提供C++接口给客户,客户可以基于C++接口做二次集成开发。
EndPoint RESTful接口使用说明
http/https请求的URL的IP地址和端口号在config.json中进行配置,详情请参见表1的ServeParam参数。
- 以Linux curl工具发送generate请求,URL请求格式如下:
- 操作类型:POST
- URL:http[s]://{ip}:{port}/generate
- 未开启https,发送推理请求:
curl -H "Accept: application/json" -H "Content-type: application/json" -X POST -d '{ "inputs": "My name is Olivier and I", "parameters": { "details": true, "do_sample": true, "repetition_penalty": 1.1, "return_full_text": false, "seed": null, "temperature": 1, "top_p": 0.99 }, "stream": false }' http://{ip}:{port}/generate
- https双向认证的请求方式示例:
curl --location --request POST 'https://{ip}:{port}/generate' \ --header 'Content-Type: application/json' \ --cacert /home/runs/static_conf/ca/ca.pem \ --cert /home/runs/static_conf/cert/client.pem \ --key /home/runs/static_conf/cert/client.key.pem \ --data-raw '{ "inputs": "My name is Olivier and I", "parameters": { "best_of": 1, "decoder_input_details": false, "details": false, "do_sample": true, "max_new_tokens": 20, "repetition_penalty": 2, "return_full_text": false, "seed": 12, "temperature": 0.1, "top_k": 1, "top_p": 0.9, "truncate": 1024 }, "stream": true }'
- --cacert:验签证书文件路径。
- ca.pem为MindIE Server服务端证书的验签证书/根证书。
- --cert:客户端证书文件路径。
- client.pem为客户端证书。
- --key:客户端私钥文件路径。
- client.key.pem为客户端证书私钥(未加密,建议采用加密密钥)。
请用户根据实际情况对相应参数进行修改。
API |
接口类型 |
URL |
说明 |
支持框架 |
---|---|---|---|---|
Server Live |
GET |
/v2/health/live |
检查服务器是否在线。 |
Triton |
Server Ready |
GET |
/v2/health/ready |
检查服务器是否准备。 |
Triton |
Model Ready |
GET |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/ready |
检查模型是否准备。 |
Triton |
health |
GET |
/health |
服务健康检查。 |
TGI/vLLM |
查询TGI EndPoint信息 |
GET |
/info |
查询TGI EndPoint信息。 |
TGI |
Slot统计 |
GET |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/getSlotCount |
参考Triton格式,自定义的Slot统计信息查询接口。 |
华为自研 |
API |
接口类型 |
URL |
说明 |
支持框架 |
---|---|---|---|---|
models列表 |
GET |
/v1/models |
列举当前可用模型列表。 |
OpenAI |
model详情 |
GET |
/v1/models/{model} |
查询模型信息。 |
OpenAI |
服务元数据查询 |
GET |
/v2 |
获取服务元数据。 |
Triton |
模型元数据查询 |
GET |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}] |
查询模型元数据信息。 |
Triton |
查询模型配置 |
GET |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/config |
查询模型配置。 |
Triton |
API |
接口类型 |
URL |
说明 |
支持框架 |
---|---|---|---|---|
推理任务 |
POST |
/ |
TGI推理接口,stream==false返回文本推理结果,stream==true返回流式推理结果。 |
TGI |
POST |
/generate |
TGI和vLLM的推理接口,通过请求参数来区分是哪种服务的接口。 |
TGI/vLLM |
|
POST |
/generate_stream |
TGI流式推理接口,使用Server-Sent Events格式返回结果。 |
TGI |
|
POST |
/v1/chat/completions |
OpenAI文本推理接口。 |
OpenAI |
|
POST |
/infer |
华为自研推理接口,支持文本/流式返回结果。 |
华为自研 |
|
POST |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/infer |
Triton的token推理接口。 |
Triton |
|
POST |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/stopInfer |
参考Triton接口定义,提供提前终止请求接口。 |
华为自研 |
|
POST |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/generate |
Triton文本推理接口。 |
Triton |
|
POST |
/v2/models/${MODEL_NAME}[/versions/${MODEL_VERSION}]/generate_stream |
Triton流式推理接口。 |
Triton |
- ${MODEL_NAME}字段指定需要查询的模型名称。
- [/versions/${MODEL_VERSION}]字段暂不支持,不传递。
Engine模块提供C++接口
使用Engine模块提供C++接口,需要开发代码来集成,以下代码提供接口使用样例,仅供参考。
/* * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. */ #include <iostream> #include <memory> #include <map> #include <thread> #include <future> #include <chrono> #include <algorithm> #include <wait.h> #include "sdk/infer_engine.h" #include "sdk/infer_request.h" using namespace SimpleLLMInference; using namespace infrastructure; using SC = std::chrono::steady_clock; IOManager g_Manager; Statistics g_Statistics; std::map<std::string, Metrics> g_Metrics; volatile int g_CompleteNum = 0; volatile uint32_t warmupCompleted = 0; uint64_t warmupNum = 10; InferenceEngine g_engine; static constexpr int FINALIZE_WAIT_TIME = 5; std::mutex g_MutexWarmup; std::mutex g_Mutex; std::mutex g_MetricsMutex; bool g_RecordOutput = false; namespace SimpleLLMInference { /** * * @param response */ Status ParseEosAttr(std::shared_ptr<InferenceResponse> &response, int64_t *flag, int64_t *outputLen) { InferenceResponse::Output *output; auto status = response->ImmutableOutput("IBIS_EOS_ATTR", &output); if (!status.IsOk()) { return status; } if (output == nullptr) { std::cout << "Error: Output is nullptr" << std::endl; return Status(infrastructure::Error::Code::ERROR, "Output is nullptr"); } auto *eosData = static_cast<int64_t *>(output->Buffer()); *flag = eosData[0]; *outputLen = eosData[1]; return Status(infrastructure::Error::Code::OK, "Success"); } /** * 解析返回的token id * @param response */ Status ParseOutputId(std::shared_ptr<InferenceResponse> &response, std::vector<int64_t> &outputIds) { InferenceResponse::Output *output; auto status = response->ImmutableOutput("OUTPUT_IDS", &output); if (!status.IsOk()) { return status; } // 获取输出长度 auto len = output->Shape()[0]; auto *data = static_cast<int64_t *>(output->Buffer()); for (int i = 0; i < len; ++i) { outputIds.push_back(data[i]); } return Status(infrastructure::Error::Code::OK, "Success"); } /** * 请求回调 * @param response */ void ResponseCallback(std::shared_ptr<InferenceResponse> &response) { auto reqId = response->GetRequestId().StringValue(); size_t decodeTime = 0; auto now = SC::now(); g_Manager.SetOutputData(reqId); { std::unique_lock lock(g_MetricsMutex); // 生成token数 int64_t flag = 0; int64_t outputLen = 0; auto ret = ParseEosAttr(response, &flag, &outputLen); if (!ret.IsOk()) { std::cout << "ReqId:" << reqId << ", Error:" << ret.StatusMsg() << std::endl; return; } g_Metrics[reqId].tokensOutput += outputLen; if (g_Metrics[reqId].firstTokenCost == 0) { // prefill 记录首token时间 decodeTime = GetDuration(now, g_Metrics[reqId].startingTime); g_Metrics[reqId].firstTokenCost = decodeTime; } else { // decode 记录每次decode的时间 decodeTime = GetDuration(now, g_Metrics[reqId].lastTokenTime); // 针对投机场景适配,decode返回小于等于gamma个token,四舍五入 auto avgDecodeTime = (decodeTime + outputLen / 2) / outputLen; for (int i = 0; i < outputLen; ++i) { g_Metrics[reqId].decodeTime.push_back(avgDecodeTime); } } g_Metrics[reqId].lastTokenTime = now; // 生成token id if (g_RecordOutput) { ret = ParseOutputId(response, g_Metrics[reqId].outputTokenIds); if (!ret.IsOk()) { std::cout << "ReqId:" << reqId << ", Error:" << ret.StatusMsg() << std::endl; return; } } if (response->IsEOS()) { g_Metrics[reqId].endingTime = now; // 最后一个Token耗时 g_Metrics[reqId].lastTokenCost = decodeTime; } } if (response->IsEOS()) { std::unique_lock lock(g_Mutex); g_CompleteNum++; std::cout << "ReqId:" << reqId << " Finished" << std::endl; } } void SendRequest(InferenceEngine &engine, uint64_t maxBatchSize) { uint64_t processingNum = 0; Status status = engine.GetProcessingRequest(&processingNum); if (!status.IsOk()) { std::cout << "failed to get processing request." << std::endl; return; } std::cout << "the processing request num is " << processingNum << " at first." << std::endl; uint64_t slotNum = 0; uint64_t remainBlocks = 0; uint64_t remainPrefillSlots = 0; uint64_t remainPrefillTokens = 0; uint64_t invalidReqNum = 0; while (!g_Manager.Empty()) { // 2. 获取可用的slot数目 Status res = engine.GetRequestBlockQuotas(&remainBlocks, &remainPrefillSlots, &remainPrefillTokens); if (!res.IsOk()) { std::cout << "failed to get request block quotas." << std::endl; break; } res = engine.GetProcessingRequest(&processingNum); if (!res.IsOk()) { std::cout << "failed to get processing request." << std::endl; break; } slotNum = maxBatchSize - processingNum; if (remainPrefillSlots > 0 && remainPrefillTokens > 0) { // 3. Set input std::vector<std::shared_ptr<Data>> data = g_Manager.GetInputDataByQuotas(remainPrefillSlots, remainPrefillTokens, slotNum); if (!data.empty()) { std::vector<std::shared_ptr<InferenceRequest>> requests = Data2Request(data); g_Statistics.requestNumber += requests.size(); // total num // 4. forward(异步) for (size_t i = 0; i < requests.size(); ++i) { auto reqId = requests[i]->GetRequestId().StringValue(); { std::unique_lock lock(g_MetricsMutex); g_Metrics[reqId].startingTime = SC::now(); g_Metrics[reqId].tokensInput = data[i]->size; } auto ret = engine.Forward(requests[i]); if (!ret.IsOk()) { invalidReqNum++; g_Statistics.requestNumber--; { std::unique_lock lock(g_MetricsMutex); g_Metrics.erase(reqId); } } } } } std::this_thread::sleep_for(std::chrono::milliseconds(20L)); } status = engine.GetProcessingRequest(&processingNum); if (!status.IsOk()) { std::cout << "failed to get processing request." << std::endl; return; } std::cout << "the processing request num is " << processingNum << " when all requests dispatched." << std::endl; std::cout << "invalid request count is " << invalidReqNum << std::endl; } void Warmup(InferenceEngine &engine, IOManager &manager, size_t warmupSize) { std::vector<std::shared_ptr<Data>> data = manager.GetWarmupInputs(warmupSize); std::vector<std::shared_ptr<InferenceRequest>> WarmupRequests = Data2Request(data); uint64_t totalWarmupNum = WarmupRequests.size(); std::cout<<"Total warm up count : "<<totalWarmupNum<<std::endl; uint64_t invalidReqNum = 0; for (size_t i = 0; i < totalWarmupNum; ++i) { SendResponseCallback responseCallback = [](std::shared_ptr<InferenceResponse> &response) { if (response->IsEOS()) { std::unique_lock<std::mutex> lock(g_MutexWarmup); warmupCompleted++; std::cout<<"Warm up completed count: "<<warmupCompleted<<std::endl; } }; WarmupRequests[i]->SetSendResponseCallback(responseCallback); auto ret = engine.Forward(WarmupRequests[i]); if (!ret.IsOk()) { invalidReqNum++; } } std::cout<<"Invalid warmup request count : "<<invalidReqNum<<std::endl; while (warmupCompleted < totalWarmupNum - invalidReqNum) { std::this_thread::sleep_for(std::chrono::milliseconds(10L)); } } void FinalizeEngine() { auto res = g_engine.Finalize(); std::cout << "inferenceEngine finalize message is : " << res.StatusMsg() << std::endl; } void Exit() { std::promise<bool> promise; std::future<bool> future = promise.get_future(); std::thread worker(FinalizeEngine); std::this_thread::sleep_for(std::chrono::seconds(FINALIZE_WAIT_TIME)); if (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { // if DestructEngine is not finished after the fixed waiting time, // force kill the process group to ensure that the subprocesses are guaranteed to be cleaned kill(-getpgrp(), SIGKILL); } else { exit(0); } } void RunEngine(std::string dataset) { TtimeT start; TtimeT end; if (g_Manager.SetInputData(dataset) != 0) { std::cout << "failed to load data" << std::endl; return; } // 初始化engine auto ret = g_engine.Init(ResponseCallback); if (!ret.IsOk()) { std::cout << "engine init failed: " << ret.StatusMsg() << std::endl; return; } uint64_t maxBatchSize = 0; ret = g_engine.GetMaxBatchSize(&maxBatchSize); if (!ret.IsOk()) { std::cout << "GetMaxBatchSize failed: " << ret.StatusMsg() << std::endl; return; } // 统计打点信息 g_Statistics.modelFullName = ""; if (!GetModelInfo(g_Statistics.modelFullName, g_Statistics.tp, g_Statistics.serverCount)) { std::cout << "GetModelInfo failed" << std::endl; return; } std::cout << "*** Warm up ***"<< std::endl; Warmup(g_engine, g_Manager, warmupNum); start = SC::now(); SendRequest(g_engine, maxBatchSize); while (g_CompleteNum < g_Statistics.requestNumber) { std::this_thread::sleep_for(std::chrono::milliseconds(10L)); } end = SC::now(); g_Statistics.latencyForAll = GetDuration(end, start); FormatMetrics(g_Metrics, g_Statistics); PrintStatistics(g_Statistics); if (g_RecordOutput) { std::map<std::string, std::vector<int64_t>> outputTokensId; for (auto &m : g_Metrics) { outputTokensId[m.first] = m.second.outputTokenIds; } WriteOutputIds(outputTokensId, "./token_output.csv"); } FinalizeEngine(); } void SignalInterruptHandler(int signal) { std::cout << "Received signal[" << signal << "]" << std::endl; std::cout << "Test program is exiting..." << std::endl; int status = 0; pid_t pid = 0; while ((pid = waitpid(0, &status, WNOHANG)) > 0) { std::cout << "Test program wait pid with " << pid << ", status " << status << std::endl; } Exit(); } void SignalChldHandler(int signal) { int status = 0; pid_t pid = 0; bool exitFlag = false; while ((pid = waitpid(0, &status, WNOHANG)) > 0) { std::cout << "Test program wait pid with " << pid << ", status " << status << std::endl; if (!WIFEXITED(status)) { exitFlag = true; } } if (exitFlag) { Exit(); } } void RegisterSignal(void) { signal(SIGINT, SignalInterruptHandler); signal(SIGTERM, SignalInterruptHandler); signal(SIGCHLD, SignalChldHandler); } } int main(int argc, char *argv[]) { setpgrp(); RegisterSignal(); // register signal // 数据集管理 std::string dataset = "token_input_gsm.csv"; try { dataset = argc > 1 ? argv[1] : "token_input_gsm.csv"; g_RecordOutput = argc > 2 && std::stoi(argv[2]); warmupNum = argc > 3 ? std::stoull(argv[3], nullptr, 10) : 10; if (argc > 4) { std::cout << "unexpected arguments. please refer to the documentation. " << std::endl; return -1; } } catch (const std::exception &e) { std::cout << "[llm_engine_test] read argv fail: " << typeid(e).name() << ", " << e.what() << std::endl; return -1; } catch (...) { std::cout << "[llm_engine_test] get unknown error" << std::endl; return -1; } // 启动业务线程 std::thread businessThread(RunEngine, dataset); businessThread.join(); return 0; }