The directory structure of the adaptation repository is as follows:
├── cover
│   ├── requirements-ascend.txt
│   ├── setup.py
│   └── vllm
│       └── __init__.py
├── examples
│   ├── start_server.sh
│   ├── test_offline.py
│   └── test_offline.sh
├── install.sh
└── vllm_npu
    ├── requirements.txt
    ├── setup.py
    └── vllm_npu
        ├── __init__.py
        ├── attention
        │   ├── __init__.py
        │   ├── backends.py
        │   └── selector.py
        ├── config.py
        ├── core
        │   └── __init__.py
        ├── engine
        │   ├── __init__.py
        │   ├── ascend_engine.py
        │   └── async_ascend_engine.py
        ├── executor
        │   ├── __init__.py
        │   ├── ascend_executor.py
        │   ├── ascend_ray_executor.py
        │   └── ray_utils.py
        ├── model_executor
        │   ├── __init__.py
        │   ├── ascend_model_loader.py
        │   ├── layers
        │   │   ├── __init__.py
        │   │   └── ascend_sampler.py
        │   └── models
        │       ├── __init__.py
        │       └── ascend
        │           ├── __init__.py
        │           └── mindie_llm_wrapper.py
        ├── npu_adaptor.py
        ├── usage
        │   ├── __init__.py
        │   └── usage_lib.py
        ├── utils.py
        └── worker
            ├── __init__.py
            ├── ascend_model_runner.py
            ├── ascend_worker.py
            └── cache_engine.py
The repository consists of four main parts: the cover directory (replacement files applied over the vLLM source tree: requirements-ascend.txt, setup.py, and vllm/__init__.py), the examples directory (scripts for starting the server and running an offline test), the install.sh installation script, and the vllm_npu adaptation package itself.

The contents of each file in the repository are listed below.
cover/requirements-ascend.txt:

cmake >= 3.21
ninja  # For faster builds.
psutil
sentencepiece  # Required for LLaMA tokenizer.
numpy
requests
py-cpuinfo
transformers >= 4.40.0  # Required for StarCoder2 & Llava, Llama 3.
tokenizers >= 0.19.1  # Required for Llama 3.
fastapi
openai
uvicorn[standard]
pydantic >= 2.0  # Required for OpenAI server.
prometheus_client >= 0.18.0
prometheus-fastapi-instrumentator >= 7.0.0
tiktoken == 0.6.0  # Required for DBRX tokenizer
lm-format-enforcer == 0.10.1
typing_extensions
filelock >= 3.10.4  # filelock starts to support `mode` argument from 3.10.4
ray == 2.9.3
pynvml == 11.5.0
outlines == 0.0.34
import importlib.util import io import logging import os import re import subprocess import sys from shutil import which from typing import Dict, List import torch from packaging.version import Version, parse from setuptools import Extension, find_packages, setup from setuptools.command.build_ext import build_ext from torch.utils.cpp_extension import CUDA_HOME, BuildExtension def load_module_from_path(module_name, path): spec = importlib.util.spec_from_file_location(module_name, path) module = importlib.util.module_from_spec(spec) sys.modules[module_name] = module spec.loader.exec_module(module) return module ROOT_DIR = os.path.dirname(__file__) logger = logging.getLogger(__name__) # cannot import envs directly because it depends on vllm, # which is not installed yet envs = load_module_from_path('envs', os.path.join(ROOT_DIR, 'vllm', 'envs.py')) VLLM_TARGET_DEVICE = 'npu' # vLLM only supports Linux platform assert sys.platform.startswith( "linux"), "vLLM only supports Linux platform (including WSL)." MAIN_CUDA_VERSION = "12.1" def is_sccache_available() -> bool: return which("sccache") is not None def is_ccache_available() -> bool: return which("ccache") is not None def is_ninja_available() -> bool: return which("ninja") is not None def remove_prefix(text, prefix): if text.startswith(prefix): return text[len(prefix):] return text class CMakeExtension(Extension): def __init__(self, name: str, cmake_lists_dir: str = '.', **kwa) -> None: super().__init__(name, sources=[], **kwa) self.cmake_lists_dir = os.path.abspath(cmake_lists_dir) def _is_cuda() -> bool: return VLLM_TARGET_DEVICE == "cuda" \ and torch.version.cuda is not None \ and not _is_neuron() def _is_hip() -> bool: return (VLLM_TARGET_DEVICE == "cuda" or VLLM_TARGET_DEVICE == "rocm") and torch.version.hip is not None def _is_neuron() -> bool: torch_neuronx_installed = True try: subprocess.run(["neuron-ls"], capture_output=True, check=True) except (FileNotFoundError, PermissionError, subprocess.CalledProcessError): torch_neuronx_installed = False return torch_neuronx_installed or envs.VLLM_BUILD_WITH_NEURON def _is_cpu() -> bool: return VLLM_TARGET_DEVICE == "cpu" def _is_npu() -> bool: return VLLM_TARGET_DEVICE == "npu" def _install_punica() -> bool: return envs.VLLM_INSTALL_PUNICA_KERNELS def get_hipcc_rocm_version(): # Run the hipcc --version command result = subprocess.run(['hipcc', '--version'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) # Check if the command was executed successfully if result.returncode != 0: print("Error running 'hipcc --version'") return None # Extract the version using a regular expression match = re.search(r'HIP version: (\S+)', result.stdout) if match: # Return the version string return match.group(1) else: print("Could not find HIP version in the output") return None def get_neuronxcc_version(): import sysconfig site_dir = sysconfig.get_paths()["purelib"] version_file = os.path.join(site_dir, "neuronxcc", "version", "__init__.py") # Check if the command was executed successfully with open(version_file, "rt") as fp: content = fp.read() # Extract the version using a regular expression match = re.search(r"__version__ = '(\S+)'", content) if match: # Return the version string return match.group(1) else: raise RuntimeError("Could not find HIP version in the output") def get_nvcc_cuda_version() -> Version: """Get the CUDA version from nvcc. 
Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py """ assert CUDA_HOME is not None, "CUDA_HOME is not set" nvcc_output = subprocess.check_output([CUDA_HOME + "/bin/nvcc", "-V"], universal_newlines=True) output = nvcc_output.split() release_idx = output.index("release") + 1 nvcc_cuda_version = parse(output[release_idx].split(",")[0]) return nvcc_cuda_version def get_path(*filepath) -> str: return os.path.join(ROOT_DIR, *filepath) def find_version(filepath: str) -> str: """Extract version information from the given filepath. Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py """ with open(filepath) as fp: version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", fp.read(), re.M) if version_match: return version_match.group(1) raise RuntimeError("Unable to find version string.") def get_vllm_version() -> str: version = find_version(get_path("vllm", "__init__.py")) # return version if _is_cuda(): cuda_version = str(get_nvcc_cuda_version()) if cuda_version != MAIN_CUDA_VERSION: cuda_version_str = cuda_version.replace(".", "")[:3] version += f"+cu{cuda_version_str}" elif _is_hip(): # Get the HIP version hipcc_version = get_hipcc_rocm_version() if hipcc_version != MAIN_CUDA_VERSION: rocm_version_str = hipcc_version.replace(".", "")[:3] version += f"+rocm{rocm_version_str}" elif _is_neuron(): # Get the Neuron version neuron_version = str(get_neuronxcc_version()) if neuron_version != MAIN_CUDA_VERSION: neuron_version_str = neuron_version.replace(".", "")[:3] version += f"+neuron{neuron_version_str}" elif _is_npu(): version += "+npu" elif _is_cpu(): version += "+cpu" else: raise RuntimeError("Unknown runtime environment") return version def read_readme() -> str: """Read the README file if present.""" p = get_path("README.md") if os.path.isfile(p): return io.open(get_path("README.md"), "r", encoding="utf-8").read() else: return "" def get_requirements() -> List[str]: """Get Python package dependencies from requirements.txt.""" def _read_requirements(filename: str) -> List[str]: with open(get_path(filename)) as f: requirements = f.read().strip().split("\n") resolved_requirements = [] for line in requirements: if line.startswith("-r "): resolved_requirements += _read_requirements(line.split()[1]) else: resolved_requirements.append(line) return resolved_requirements if _is_cuda(): requirements = _read_requirements("requirements-cuda.txt") cuda_major, cuda_minor = torch.version.cuda.split(".") modified_requirements = [] for req in requirements: if "vllm-nccl-cu12" in req: req = req.replace("vllm-nccl-cu12", f"vllm-nccl-cu{cuda_major}") elif ("vllm-flash-attn" in req and not (cuda_major == "12" and cuda_minor == "1")): # vllm-flash-attn is built only for CUDA 12.1. # Skip for other versions. 
continue modified_requirements.append(req) requirements = modified_requirements elif _is_hip(): requirements = _read_requirements("requirements-rocm.txt") elif _is_neuron(): requirements = _read_requirements("requirements-neuron.txt") elif _is_npu(): requirements = _read_requirements("requirements-ascend.txt") elif _is_cpu(): requirements = _read_requirements("requirements-cpu.txt") else: raise ValueError( "Unsupported platform, please use CUDA, ROCm, Neuron, NPU or CPU.") return requirements ext_modules = [] if _is_cuda(): ext_modules.append(CMakeExtension(name="vllm._moe_C")) """ if not _is_neuron(): ext_modules.append(CMakeExtension(name="vllm._C")) if _install_punica(): ext_modules.append(CMakeExtension(name="vllm._punica_C")) """ package_data = { "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"] } if envs.VLLM_USE_PRECOMPILED: ext_modules = [] package_data["vllm"].append("*.so") setup( name="vllm", version=get_vllm_version(), author="vLLM Team", license="Apache 2.0", description=("A high-throughput and memory-efficient inference and " "serving engine for LLMs"), long_description=read_readme(), long_description_content_type="text/markdown", url="https://github.com/vllm-project/vllm", project_urls={ "Homepage": "https://github.com/vllm-project/vllm", "Documentation": "https://vllm.readthedocs.io/en/latest/", }, classifiers=[ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "License :: OSI Approved :: Apache Software License", "Topic :: Scientific/Engineering :: Artificial Intelligence", ], packages=find_packages(exclude=("benchmarks", "csrc", "docs", "examples", "tests*")), python_requires=">=3.8", install_requires=get_requirements(), ext_modules=ext_modules, extras_require={ "tensorizer": ["tensorizer==2.9.0"], }, cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {}, package_data=package_data, )
"""vLLM: a high-throughput and memory-efficient inference engine for LLMs""" import vllm_npu from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs from vllm.engine.async_llm_engine import AsyncLLMEngine from vllm.engine.llm_engine import LLMEngine from vllm.entrypoints.llm import LLM from vllm.executor.ray_utils import initialize_ray_cluster from vllm.model_executor.models import ModelRegistry from vllm.outputs import CompletionOutput, RequestOutput from vllm.sampling_params import SamplingParams __version__ = "0.4.2" __all__ = [ "LLM", "ModelRegistry", "SamplingParams", "RequestOutput", "CompletionOutput", "LLMEngine", "EngineArgs", "AsyncLLMEngine", "AsyncEngineArgs", "initialize_ray_cluster", ]
vllm_npu/requirements.txt:

numpy
decorator
attrs
psutil
absl-py
cloudpickle
scipy
tornado
transformers
accelerate
pandas
vllm_npu/setup.py:

import io
import os
import re
from typing import List

import setuptools

ROOT_DIR = os.path.dirname(__file__)


def get_path(*filepath) -> str:
    return os.path.join(ROOT_DIR, *filepath)


def find_version(filepath: str):
    """Extract version information from the given filepath.
    """
    with open(filepath) as fp:
        version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                  fp.read(), re.M)
        if version_match:
            return version_match.group(1)
        raise RuntimeError("Unable to find version string.")


def read_readme() -> str:
    """Read the README file."""
    return io.open(get_path("README.md"), "r", encoding="utf-8").read()


def get_requirements() -> List[str]:
    """Get Python package dependencies from requirements.txt."""
    with open(get_path("requirements.txt")) as f:
        requirements = f.read().strip().split("\n")
    return requirements


setuptools.setup(
    name="vllm_npu",
    version=find_version(get_path("vllm_npu", "__init__.py")),
    author="Huawei",
    license="Apache 2.0",
    description=("A high-throughput and memory-efficient inference and "
                 "serving engine for LLMs"),
    long_description=read_readme(),
    long_description_content_type="text/markdown",
    url="",
    project_urls={
        "Homepage": "",
        "Documentation": "",
    },
    classifiers=[
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
    ],
    packages=setuptools.find_packages(exclude=("benchmarks", "examples", "tests")),
    python_requires=">=3.8",
    install_requires=get_requirements(),
)
vllm_npu/vllm_npu/attention/__init__.py:

from vllm_npu.attention.selector import get_attn_backend
import vllm.attention.selector as selector
import vllm.worker.model_runner as mr
import vllm.worker.cache_engine as ce

selector.get_attn_backend = get_attn_backend
mr.get_attn_backend = get_attn_backend
ce.get_attn_backend = get_attn_backend
vllm_npu/vllm_npu/attention/backends.py:

# Part of codes in this file was copied from project [vLLM Team][vllm]
from dataclasses import dataclass
from typing import List, Optional, Tuple

from vllm.attention.backends.abstract import (AttentionBackend,
                                              AttentionMetadataPerStage)
import torch


def get_kv_cache_shape(
    num_blocks: int,
    block_size: int,
    num_kv_heads: int,
    head_size: int,
) -> Tuple[int, ...]:
    return (2, num_blocks, block_size, num_kv_heads, head_size)


class AscendAttentionBackend(AttentionBackend):

    @staticmethod
    def get_name() -> str:
        return "ascend-attn-backend"

    @staticmethod
    def get_impl_cls():
        return None

    @staticmethod
    def make_metadata(*args, **kwargs) -> "AttentionMetadata":
        return AttentionMetadata(*args, **kwargs)

    @staticmethod
    def get_kv_cache_shape(
        num_blocks: int,
        block_size: int,
        num_kv_heads: int,
        head_size: int,
    ) -> Tuple[int, ...]:
        return get_kv_cache_shape(num_blocks, block_size, num_kv_heads,
                                  head_size)

    @staticmethod
    def swap_blocks(
        src_kv_cache: torch.Tensor,
        dst_kv_cache: torch.Tensor,
        src_to_dst: torch.Tensor,
    ) -> None:
        pass

    @staticmethod
    def copy_blocks(
        kv_caches: List[torch.Tensor],
        src_to_dists: torch.Tensor,
    ) -> None:
        pass


@dataclass
class AttentionMetadata(AttentionMetadataPerStage):
    """Metadata for AscendAttentionBackend.
    """
    # Currently, input sequences can only contain all prompts
    # or all decoding. True if all sequences are prompts.
    is_prompt: bool
    # (batch_size,). The sequence length per sequence. Sequence length means
    # the computed tokens + new tokens. None if it is a decoding.
    seq_lens: Optional[List[int]]
    # seq_lens stored as a tensor.
    seq_lens_tensor: Optional[torch.Tensor]
    # Maximum query length in the batch.
    max_query_len: Optional[int]
    # Maximum sequence length in the batch.
    max_seq_len: Optional[int]
    # (batch_size + 1,). The cumulative subquery lengths of the sequences in
    # the batch, used to index into subquery. E.g., if the subquery length
    # is [4, 6], it is [0, 4, 10].
    subquery_start_loc: Optional[torch.Tensor]
    # (batch_size + 1,). The cumulative sequence lengths of the sequences in
    # the batch, used to index into sequence. E.g., if the sequence length is
    # [4, 6], it is [0, 4, 10].
    seq_start_loc: Optional[torch.Tensor]
    # (batch_size,). A tensor of context lengths (tokens that are computed
    # so far).
    context_lens_tensor: Optional[torch.Tensor]
    block_tables: Optional[torch.Tensor]
    # Whether or not if cuda graph is enabled.
    use_cuda_graph: bool
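As a quick illustration of the layout above, the backend packs the key and value caches together along a leading dimension of size 2. A small sketch with arbitrarily chosen values:

from vllm_npu.attention.backends import AscendAttentionBackend

# Example values only: 512 blocks of 128 slots for 8 KV heads of head size 128.
shape = AscendAttentionBackend.get_kv_cache_shape(
    num_blocks=512, block_size=128, num_kv_heads=8, head_size=128)
print(shape)  # (2, 512, 128, 8, 128)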
vllm_npu/vllm_npu/attention/selector.py:

# Part of codes in this file was copied from project [vLLM Team][vllm]
from functools import lru_cache
from typing import Type

import torch

from vllm.attention.backends.abstract import AttentionBackend
from vllm.logger import init_logger

logger = init_logger(__name__)


@lru_cache(maxsize=None)
def get_attn_backend(dtype: torch.dtype) -> Type[AttentionBackend]:
    logger.info("Using Ascend backend.")
    from vllm_npu.attention.backends import AscendAttentionBackend
    return AscendAttentionBackend
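A small check (illustrative only, not part of the repository) that the selector patch from attention/__init__.py shown earlier has taken effect:

import torch
import vllm.attention.selector as selector
import vllm_npu.attention  # module shown above; rebinds selector.get_attn_backend on import

backend = selector.get_attn_backend(torch.float16)
print(backend.get_name())  # expected: "ascend-attn-backend"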
vllm_npu/vllm_npu/engine/__init__.py:

from vllm.engine.llm_engine import LLMEngine
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm_npu.engine.ascend_engine import from_engine_args
from vllm_npu.engine.async_ascend_engine import from_engine_args_async

LLMEngine.from_engine_args = from_engine_args
AsyncLLMEngine.from_engine_args = from_engine_args_async
vllm_npu/vllm_npu/engine/ascend_engine.py:

from vllm.engine.arg_utils import EngineArgs
from vllm.usage.usage_lib import UsageContext

from vllm_npu.executor.ray_utils import initialize_ray_cluster


@classmethod
def from_engine_args(
    cls,
    engine_args: EngineArgs,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
) -> "LLMEngine":
    """Creates an LLM engine from the engine arguments."""
    # Create the engine configs.
    engine_config = engine_args.create_engine_config()

    # Initialize the cluster and specify the executor class.
    if engine_config.device_config.device_type == "neuron":
        from vllm.executor.neuron_executor import NeuronExecutor
        executor_class = NeuronExecutor
    elif engine_config.device_config.device_type == "cpu":
        from vllm.executor.cpu_executor import CPUExecutor
        executor_class = CPUExecutor
    elif engine_config.device_config.device_type == "npu":
        if engine_config.parallel_config.worker_use_ray:
            initialize_ray_cluster(engine_config.parallel_config)
            from vllm_npu.executor.ascend_ray_executor import RayAscendExecutor
            executor_class = RayAscendExecutor
        else:
            from vllm_npu.executor.ascend_executor import AscendExecutor
            executor_class = AscendExecutor
    elif engine_config.parallel_config.worker_use_ray:
        initialize_ray_cluster(engine_config.parallel_config)
        from vllm.executor.ray_gpu_executor import RayGPUExecutor
        executor_class = RayGPUExecutor
    else:
        if engine_config.parallel_config.world_size != 1:
            raise ValueError("Ray is required if parallel_config.world_size > 1.")
        from vllm.executor.gpu_executor import GPUExecutor
        executor_class = GPUExecutor

    # Create the LLM engine.
    engine = cls(
        **engine_config.to_dict(),
        executor_class=executor_class,
        log_stats=not engine_args.disable_log_stats,
        usage_context=usage_context,
    )
    return engine
vllm_npu/vllm_npu/engine/async_ascend_engine.py:

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.usage.usage_lib import UsageContext

from vllm_npu.executor.ray_utils import initialize_ray_cluster


@classmethod
def from_engine_args_async(
    cls,
    engine_args: AsyncEngineArgs,
    start_engine_loop: bool = True,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
) -> "AsyncLLMEngine":
    """Creates an async LLM engine from the engine arguments."""
    # Create the engine configs.
    engine_config = engine_args.create_engine_config()

    if engine_config.device_config.device_type == "neuron":
        from vllm.executor.neuron_executor import NeuronExecutorAsync
        executor_class = NeuronExecutorAsync
    elif engine_config.device_config.device_type == "cpu":
        if engine_config.parallel_config.worker_use_ray:
            raise RuntimeError("Ray is not supported with the CPU backend.")
        from vllm.executor.cpu_executor import CPUExecutorAsync
        executor_class = CPUExecutorAsync
    elif engine_config.device_config.device_type == "npu":
        if engine_config.parallel_config.worker_use_ray:
            initialize_ray_cluster(engine_config.parallel_config)
            from vllm_npu.executor.ascend_ray_executor import RayAscendExecutorAsync
            executor_class = RayAscendExecutorAsync
        else:
            from vllm_npu.executor.ascend_executor import AscendExecutorAsync
            executor_class = AscendExecutorAsync
    elif engine_config.parallel_config.worker_use_ray:
        initialize_ray_cluster(engine_config.parallel_config)
        from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
        executor_class = RayGPUExecutorAsync
    else:
        if engine_config.parallel_config.world_size != 1:
            raise RuntimeError("Ray is required if parallel_config.world_size > 1.")
        from vllm.executor.gpu_executor import GPUExecutorAsync
        executor_class = GPUExecutorAsync

    # Create the async LLM engine.
    engine = cls(
        engine_config.parallel_config.worker_use_ray,
        engine_args.engine_use_ray,
        **engine_config.to_dict(),
        executor_class=executor_class,
        log_requests=not engine_args.disable_log_requests,
        log_stats=not engine_args.disable_log_stats,
        max_log_len=engine_args.max_log_len,
        start_engine_loop=start_engine_loop,
        usage_context=usage_context,
    )
    return engine
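With both replacements bound by engine/__init__.py, building an engine from EngineArgs routes to the Ascend executor classes once the device type resolves to "npu". The following sketch is illustrative only; it assumes the patched vLLM build from the cover directory is installed and that the adapted config.py in this repository accepts device="npu" (the model path is a placeholder):

from vllm.engine.arg_utils import EngineArgs
from vllm.engine.llm_engine import LLMEngine

# device="npu" relies on the adapted DeviceConfig; the model path is a placeholder.
engine_args = EngineArgs(model="/path/to/model", device="npu")
engine = LLMEngine.from_engine_args(engine_args)  # selects AscendExecutor (or RayAscendExecutor with Ray)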
vllm_npu/vllm_npu/executor/__init__.py:

from vllm_npu.executor.ray_utils import initialize_ray_cluster
from vllm.executor import ray_utils

ray_utils.initialize_ray_cluster = initialize_ray_cluster
vllm_npu/vllm_npu/executor/ascend_executor.py:

# Part of codes in this file was copied from project [vLLM Team][vllm]
from typing import List, Set, Tuple

from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.sequence import ExecuteModelRequest, SamplerOutput
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                        make_async)

from vllm_npu.worker.ascend_worker import AscendWorker

logger = init_logger(__name__)


class AscendExecutor(ExecutorBase):

    def _init_executor(self) -> None:
        assert (not self.speculative_config
                ), "Speculative decoding is not yet supported for Ascend backend."

        # Instantiate the worker and load the model to the device.
        self._init_worker()

    def _init_worker(self):
        distributed_init_method = get_distributed_init_method(
            get_ip(), get_open_port())
        self.driver_worker = AscendWorker(
            self.model_config,
            self.parallel_config,
            self.scheduler_config,
            self.device_config,
            self.cache_config,
            self.load_config,
            local_rank=0,
            rank=0,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            is_driver_worker=True,
        )
        self.driver_worker.init_device()
        self.driver_worker.load_model()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks by invoking the
        underlying worker.
        """
        return self.driver_worker.determine_num_available_blocks()

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache by invoking the underlying worker.
        """
        self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)

    def execute_model(
            self,
            execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
        output = self.driver_worker.execute_model(execute_model_req)
        return output

    def add_lora(self, lora_request: LoRARequest) -> bool:
        return self.driver_worker.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        return self.driver_worker.remove_lora(lora_id)

    def list_loras(self) -> Set[int]:
        return self.driver_worker.list_loras()

    def check_health(self) -> None:
        # AscendExecutor will always be healthy as long as
        # it's running.
        return


class AscendExecutorAsync(AscendExecutor, ExecutorAsyncBase):

    async def execute_model_async(
        self,
        execute_model_req: ExecuteModelRequest,
    ) -> List[SamplerOutput]:
        output = await make_async(
            self.driver_worker.execute_model
        )(execute_model_req=execute_model_req, )
        return output

    # async def check_health_async(self) -> None:
    #     # AscendExecutor will always be healthy as long as
    #     # it's running.
    #     return
import asyncio import os import pickle from collections import defaultdict from itertools import islice, repeat from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import vllm.envs as envs from vllm.executor.distributed_gpu_executor import ( # yapf: disable DistributedGPUExecutor, DistributedGPUExecutorAsync) from vllm.executor.ray_utils import RayWorkerWrapper, ray from vllm.logger import init_logger from vllm.sequence import ExecuteModelRequest, SamplerOutput from vllm.utils import (get_distributed_init_method, get_ip, get_open_port, get_vllm_instance_id, make_async) if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy if TYPE_CHECKING: from ray.util.placement_group import PlacementGroup logger = init_logger(__name__) USE_RAY_COMPILED_DAG = envs.VLLM_USE_RAY_COMPILED_DAG class RayAscendExecutor(DistributedGPUExecutor): def _init_executor(self) -> None: assert (not self.speculative_config ), "Speculative decoding not yet supported for RayNPU backend." assert self.parallel_config.worker_use_ray placement_group = self.parallel_config.placement_group # Disable Ray usage stats collection. ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0") if ray_usage != "1": os.environ["RAY_USAGE_STATS_ENABLED"] = "0" # Create the parallel GPU workers. self._init_workers_ray(placement_group) self.forward_dag = None if USE_RAY_COMPILED_DAG: self.forward_dag = self._compiled_ray_dag() def _init_workers_ray(self, placement_group: "PlacementGroup", **ray_remote_kwargs): if self.parallel_config.tensor_parallel_size == 1: # For single GPU case, we use a ray worker with constrained memory. num_gpus = self.cache_config.gpu_memory_utilization else: # Otherwise, the ray workers are allocated with a full GPU. num_gpus = 1 # The driver dummy worker does not actually use any resources. # It holds the resource for the driver worker. self.driver_dummy_worker: Optional[RayWorkerWrapper] = None # The remaining workers are the actual ray actors. self.workers: List[RayWorkerWrapper] = [] if self.parallel_config.ray_workers_use_nsight: ray_remote_kwargs = self._configure_ray_workers_use_nsight( ray_remote_kwargs) # Create the workers. driver_ip = get_ip() for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): continue scheduling_strategy = PlacementGroupSchedulingStrategy( placement_group=placement_group, placement_group_capture_child_tasks=True, placement_group_bundle_index=bundle_id, ) worker = ray.remote( num_cpus=0, num_gpus=num_gpus, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, )(RayWorkerWrapper).remote( worker_module_name="vllm_npu.worker.ascend_worker", worker_class_name="AscendWorker", trust_remote_code=self.model_config.trust_remote_code, ) worker_ip = ray.get(worker.get_node_ip.remote()) if worker_ip == driver_ip and self.driver_dummy_worker is None: # If the worker is on the same node as the driver, we use it # as the resource holder for the driver process. self.driver_dummy_worker = worker self.driver_worker = RayWorkerWrapper( worker_module_name="vllm_npu.worker.ascend_worker", worker_class_name="AscendWorker", trust_remote_code=self.model_config.trust_remote_code, ) else: # Else, added to the list of workers. self.workers.append(worker) if self.driver_dummy_worker is None: raise ValueError( "Ray does not allocate any GPUs on the driver node. Consider " "adjusting the Ray placement group or running the driver on a " "GPU node.") # Get the set of GPU IDs used on each node. 
worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", use_dummy_driver=True) node_workers = defaultdict(list) node_gpus = defaultdict(list) for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids): node_workers[node_id].append(i) node_gpus[node_id].extend(gpu_ids) for node_id, gpu_ids in node_gpus.items(): node_gpus[node_id] = sorted(gpu_ids) VLLM_INSTANCE_ID = get_vllm_instance_id() # Set environment variables for the driver and workers. all_args_to_update_environment_variables = [({ "CUDA_VISIBLE_DEVICES": ",".join(map(str, node_gpus[node_id])), "VLLM_INSTANCE_ID": VLLM_INSTANCE_ID, "VLLM_TRACE_FUNCTION": str(envs.VLLM_TRACE_FUNCTION), }, ) for (node_id, _) in worker_node_and_gpu_ids] self._run_workers("update_environment_variables", all_args=all_args_to_update_environment_variables) distributed_init_method = get_distributed_init_method( driver_ip, get_open_port()) # Initialize the actual workers inside worker wrapper. init_worker_all_kwargs = [ self._get_worker_kwargs( local_rank=node_workers[node_id].index(rank), rank=rank, distributed_init_method=distributed_init_method, ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids) ] self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) self._run_workers("init_device") self._run_workers("load_model", max_concurrent_workers=self.parallel_config. max_parallel_loading_workers) def execute_model( self, execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]: all_outputs = self._run_workers( "execute_model", driver_kwargs={"execute_model_req": execute_model_req}, use_ray_compiled_dag=USE_RAY_COMPILED_DAG) # Only the driver worker returns the sampling results. return all_outputs[0] def _run_workers( self, method: str, *args, driver_args: Optional[Tuple[Any, ...]] = None, driver_kwargs: Optional[Dict[str, Any]] = None, all_args: Optional[List[Tuple[Any, ...]]] = None, all_kwargs: Optional[List[Dict[str, Any]]] = None, use_dummy_driver: bool = False, max_concurrent_workers: Optional[int] = None, use_ray_compiled_dag: bool = False, **kwargs, ) -> Any: """Runs the given method on all workers. Can be used in the following ways: - args/kwargs: All workers share the same args/kwargs - args/kwargs and driver_args/driver_kwargs: Driver worker has different args - all_args/all_kwargs: args/kwargs for each worker are specified individually """ if max_concurrent_workers: raise NotImplementedError( "max_concurrent_workers is not supported yet.") if driver_args is None: driver_args = args if all_args is None else all_args[0] if driver_kwargs is None: driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0] count = len(self.workers) all_worker_args = repeat(args, count) if all_args is None \ else islice(all_args, 1, None) all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \ else islice(all_kwargs, 1, None) if use_ray_compiled_dag: assert self.forward_dag is not None output_channels = self.forward_dag.execute(1) else: # Start the ray workers first. ray_worker_outputs = [ worker.execute_method.remote(method, *worker_args, **worker_kwargs) for (worker, worker_args, worker_kwargs ) in zip(self.workers, all_worker_args, all_worker_kwargs) ] # Start the driver worker after all the ray workers. 
if not use_dummy_driver: driver_worker_output = self.driver_worker.execute_method( method, *driver_args, **driver_kwargs) else: assert self.driver_dummy_worker is not None driver_worker_output = ray.get( self.driver_dummy_worker.execute_method.remote( method, *driver_args, **driver_kwargs)) # Get the results of the ray workers. if self.workers: if use_ray_compiled_dag: try: ray_worker_outputs = [ pickle.loads(chan.begin_read()) for chan in output_channels ] finally: # Has to call end_read in order to reuse the DAG. for chan in output_channels: chan.end_read() else: ray_worker_outputs = ray.get(ray_worker_outputs) return [driver_worker_output] + ray_worker_outputs def _compiled_ray_dag(self): import pkg_resources required_version = "2.9" current_version = pkg_resources.get_distribution("ray").version if current_version < required_version: raise ValueError(f"Ray version {required_version} or greater is " f"required, but found {current_version}") from ray.dag import InputNode, MultiOutputNode assert self.parallel_config.worker_use_ray with InputNode() as input_data: forward_dag = MultiOutputNode([ worker.execute_model_compiled_dag_remote. bind( # type: ignore[attr-defined] input_data) for worker in self.workers ]) return forward_dag.experimental_compile() def check_health(self) -> None: """Raises an error if engine is unhealthy.""" self._check_if_any_actor_is_dead() def _check_if_any_actor_is_dead(self): if not self.workers: return dead_actors = [] for actor in self.workers: actor_state = ray.state.actors(actor._ray_actor_id.hex()) # pylint: disable=protected-access if actor_state["State"] == "DEAD": dead_actors.append(actor) if dead_actors: raise RuntimeError("At least one Worker is dead. " f"Dead Workers: {dead_actors}. ") class RayAscendExecutorAsync(RayAscendExecutor, DistributedGPUExecutorAsync): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.driver_executor = make_async(self.driver_worker.execute_method) async def _run_workers_async( self, method: str, *args, driver_args: Optional[Tuple[Any, ...]] = None, driver_kwargs: Optional[Dict[str, Any]] = None, **kwargs, ) -> Any: """Runs the given method on all workers.""" coros = [] if driver_args is None: driver_args = args if driver_kwargs is None: driver_kwargs = kwargs coros.append( self.driver_executor(method, *driver_args, **driver_kwargs)) # Run the ray workers asynchronously. for worker in self.workers: coros.append(worker.execute_method.remote(method, *args, **kwargs)) all_outputs = await asyncio.gather(*coros) return all_outputs
vllm_npu/vllm_npu/executor/ray_utils.py:

from typing import Optional, Tuple, TYPE_CHECKING

from vllm.config import ParallelConfig
from vllm.utils import is_hip
from vllm.logger import init_logger

logger = init_logger(__name__)

try:
    import ray
except ImportError as e:
    logger.warning(f"Failed to import Ray with {e!r}. "
                   "For distributed inference, please install Ray with "
                   "`pip install ray`.")
    ray = None

if TYPE_CHECKING:
    from ray.util.placement_group import PlacementGroup


def initialize_ray_cluster(
    parallel_config: ParallelConfig,
    ray_address: Optional[str] = None,
):
    """Initialize the distributed cluster with Ray.

    It will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        parallel_config: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    """
    if ray is None:
        raise ImportError(
            "Ray is not installed. Please install Ray to use multi-node "
            "serving.")

    # Connect to a ray cluster.
    if is_hip():
        ray.init(address=ray_address,
                 ignore_reinit_error=True,
                 num_gpus=parallel_config.world_size)
    else:
        """start adapt"""
        # Without setting num_gpus, ray.init tries to detect the number of
        # GPUs, but in an Ascend environment this detection may fail, so
        # num_gpus needs to be set manually.
        ray.init(address=ray_address,
                 ignore_reinit_error=True,
                 num_gpus=parallel_config.world_size)
        """end adapt"""

    if parallel_config.placement_group:
        # Placement group is already set.
        return

    # Create placement group for worker processes
    current_placement_group = ray.util.get_current_placement_group()
    if current_placement_group:
        # We are in a placement group
        bundles = current_placement_group.bundle_specs
        # Verify that we can use the placement group.
        gpu_bundles = 0
        for bundle in bundles:
            bundle_gpus = bundle.get("GPU", 0)
            if bundle_gpus > 1:
                raise ValueError(
                    "Placement group bundle cannot have more than 1 GPU.")
            if bundle_gpus:
                gpu_bundles += 1
        if parallel_config.world_size > gpu_bundles:
            raise ValueError(
                "The number of required GPUs exceeds the total number of "
                "available GPUs in the placement group.")
    else:
        num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
        if parallel_config.world_size > num_gpus_in_cluster:
            raise ValueError(
                "The number of required GPUs exceeds the total number of "
                "available GPUs in the cluster.")
        # Create a new placement group
        placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
        current_placement_group = ray.util.placement_group(
            placement_group_specs)
        # Wait until PG is ready - this will block until all
        # requested resources are available, and will timeout
        # if they cannot be provisioned.
        ray.get(current_placement_group.ready(), timeout=1800)

    # Set the placement group in the parallel config
    parallel_config.placement_group = current_placement_group
vllm_npu/vllm_npu/model_executor/__init__.py:

import vllm.model_executor.model_loader as vllm_model_loader
import vllm_npu.model_executor.ascend_model_loader as ascend_model_loader

vllm_model_loader.get_architecture_class_name = ascend_model_loader.get_architecture_class_name
vllm_npu/vllm_npu/model_executor/ascend_model_loader.py:

# Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
import contextlib

import torch
import torch.nn as nn

from vllm.config import DeviceConfig, ModelConfig, LoadConfig
from vllm.model_executor.model_loader.weight_utils import initialize_dummy_weights

from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper


def get_architecture_class_name(model_config: ModelConfig) -> str:
    architectures = getattr(model_config.hf_config, "architectures", [])
    if (model_config.quantization is not None
            and model_config.quantization != "fp8"
            and "MixtralForCausalLM" in architectures):
        architectures = ["QuantMixtralForCausalLM"]
    return architectures[0]


@contextlib.contextmanager
def _set_default_torch_dtype(dtype: torch.dtype):
    """Sets the default torch dtype to the given dtype."""
    old_dtype = torch.get_default_dtype()
    torch.set_default_dtype(dtype)
    yield
    torch.set_default_dtype(old_dtype)


def get_model(model_config: ModelConfig, device_config: DeviceConfig,
              load_config: LoadConfig, mindie_model_config, **kwargs) -> nn.Module:
    lora_config = kwargs.get("lora_config", None)
    model_class = MindIELlmWrapper
    # Get the (maybe quantized) linear method.
    linear_method = None
    with _set_default_torch_dtype(model_config.dtype):
        # Create a model instance.
        # The weights will be initialized as empty tensors.
        with torch.device(device_config.device):
            if hasattr(model_class, "supported_lora_modules"):
                model = model_class(mindie_model_config, linear_method,
                                    lora_config)
            elif lora_config:
                raise ValueError(
                    f"Model {model_class.__name__} does not support LoRA, "
                    "but LoRA is enabled. Support for this model may "
                    "be added in the future. If this is important to you, "
                    "please open an issue on github.")
            else:
                model = model_class(mindie_model_config, linear_method)
        if load_config.load_format == "dummy":
            initialize_dummy_weights(model)
        else:
            # Load the weights from the cached or downloaded files.
            model.load_weights(model_config.model, load_config.download_dir,
                               load_config.load_format, model_config.revision)
        model = model.npu()
    return model.eval()
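For reference, the mindie_model_config dictionary that get_model() receives is assembled by AscendWorker (see worker/ascend_worker.py later in this section). A single-card sketch with a placeholder model path would look like this:

mindie_model_config = {
    "backend_type": "atb",
    "model_id": "/path/to/model",  # placeholder; local weight directory
    "rank": 0,
    "local_rank": 0,
    "world_size": 1,
    "npu_device_id": 0,
}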
# Part of codes in this file was copied from project [vLLM Team][vllm] import random from typing import Dict, List, Optional, Tuple import torch import torch.nn as nn import numpy as np from vllm.sampling_params import SamplingType from vllm.sequence import SamplerOutput from vllm.model_executor.sampling_metadata import SamplingMetadata, SequenceGroupToSample from vllm.model_executor.layers.sampler import _get_logprobs, _build_sampler_output from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam _SAMPLING_EPS = 1e-5 SampleResultType = List[Tuple[List[int], List[int]]] def _to_tensor(data, dtype=None): if dtype: return torch.tensor(data, dtype=dtype, device=torch.device("npu")) else: return torch.tensor(data, device=torch.device("npu")) class AscendSampler(nn.Module): def __init__(self, mindie_model): super().__init__() self.mindie_model = mindie_model self.include_gpu_probs_tensor = False def forward( self, logits: torch.Tensor, sampling_metadata: SamplingMetadata, ) -> Optional[SamplerOutput]: _, vocab_size = logits.shape mindie_sampling_data, mindie_sampling_param = self.construct_data(sampling_metadata, vocab_size) probs = torch.softmax(logits, dim=-1, dtype=torch.float) logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float) next_tokens = self.mindie_model.sample( logits, sampling_data=mindie_sampling_data, sampling_param=mindie_sampling_param, ) sample_results, maybe_sampled_tokens_tensor = recover_data( sampling_metadata=sampling_metadata, sampled_tokens=next_tokens, logprobs=logprobs, include_gpu_probs_tensor=self.include_gpu_probs_tensor, ) if self.include_gpu_probs_tensor: if maybe_sampled_tokens_tensor is None: raise RuntimeError("maybe_sampled_tokens_tensor is None") on_device_tensors = (probs, logprobs, maybe_sampled_tokens_tensor) else: on_device_tensors = None # Get the logprobs query results. prompt_logprobs, sample_logprobs = _get_logprobs( logprobs, sampling_metadata, sample_results) return _build_sampler_output(sample_results, sampling_metadata, prompt_logprobs, sample_logprobs, on_device_tensors=on_device_tensors) def construct_data( self, sampling_metadata: SamplingMetadata, vocab_size: int, ) -> Tuple[SamplingData, SamplingParam]: all_input_tokens: List[List[int]] = [] prompt_tokens: List[List[int]] = [] output_tokens: List[List[int]] = [] top_ks: List[int] = [] temperatures: List[float] = [] top_ps: List[float] = [] min_ps: List[float] = [] presence_penalties: List[float] = [] frequency_penalties: List[float] = [] repetition_penalties: List[float] = [] sampling_seeds: List[int] = [] sample_indices: List[int] = [] do_samples: List[bool] = [] # To Do do_penalties = False do_top_p_top_k = False do_min_p = False greedy_flag = False if sampling_metadata.seq_groups is None: raise RuntimeError("sampling_metadata.seq_group is None, no data received.") for seq_group in sampling_metadata.seq_groups: do_samples.append(seq_group.do_sample) seq_ids = seq_group.seq_ids sampling_params = seq_group.sampling_params temperature = sampling_params.temperature p = sampling_params.presence_penalty f = sampling_params.frequency_penalty r = sampling_params.repetition_penalty top_p = sampling_params.top_p min_p = sampling_params.min_p is_greedy = sampling_params.sampling_type == SamplingType.GREEDY seed = sampling_params.seed if seed is None: if is_greedy: seed = 0 else: lo, hi = torch.iinfo(torch.long).min, torch.iinfo(torch.long).max seed = random.randint(lo, hi) if is_greedy: greedy_flag = True # k should not be greater than the vocab size. 
top_k = min(sampling_params.top_k, vocab_size) top_k = vocab_size if top_k == -1 else top_k if temperature < _SAMPLING_EPS: temperature = 1.0 if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS or top_k != vocab_size): do_top_p_top_k = True if not do_min_p and min_p > _SAMPLING_EPS: do_min_p = True if not do_penalties: if abs(p) >= _SAMPLING_EPS: do_penalties = True elif abs(f) >= _SAMPLING_EPS: do_penalties = True elif abs(r - 1.0) >= _SAMPLING_EPS: do_penalties = True is_prompt = seq_group.is_prompt if (seq_group.is_prompt and sampling_params.prompt_logprobs is not None): # For tokens in the prompt that we only need to get # their logprobs query_len = seq_group.query_len if query_len is None: raise RuntimeError("query_len is None") prefill_len = len(seq_group.prompt_logprob_indices) temperatures += [temperature] * prefill_len sampling_seeds += [seed] * prefill_len top_ps += [top_p] * prefill_len top_ks += [top_k] * prefill_len min_ps += [min_p] * prefill_len presence_penalties += [0] * prefill_len frequency_penalties += [0] * prefill_len repetition_penalties += [1] * prefill_len prompt_tokens.extend([] for _ in range(prefill_len)) output_tokens.extend([] for _ in range(prefill_len)) all_input_tokens.extend([] for _ in range(prefill_len)) if seq_group.do_sample: sample_lens = len(seq_group.sample_indices) if sample_lens != len(seq_ids): raise ValueError("sample_lens != len(seq_ids)") for seq_id in seq_ids: seq_data = seq_group.seq_data[seq_id] prompt_tokens.append(seq_data.prompt_token_ids) output_tokens.append(seq_data.output_token_ids) all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids) temperatures += [temperature] * len(seq_ids) sampling_seeds += [seed] * len(seq_ids) top_ps += [top_p] * len(seq_ids) top_ks += [top_k] * len(seq_ids) min_ps += [min_p] * len(seq_ids) presence_penalties += [p] * len(seq_ids) frequency_penalties += [f] * len(seq_ids) repetition_penalties += [r] * len(seq_ids) repetition_penalties = np.array(repetition_penalties, dtype=np.float32) frequency_penalties = np.array(frequency_penalties, dtype=np.float32) presence_penalties = np.array(presence_penalties, dtype=np.float32) temperatures = np.array(temperatures, dtype=np.float32) top_ks = np.array(top_ks, dtype=np.int32) top_ps = np.array(top_ps, dtype=np.float32) sampling_seeds = np.array(sampling_seeds) do_samples = np.array(do_samples) max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0) padded_all_input_tokens = [ tokens + [vocab_size] * (max_tokens_len - len(tokens)) for tokens in all_input_tokens ] padded_all_input_tokens = np.array(padded_all_input_tokens, dtype=np.int32) output_max_len = max([len(tokens) for tokens in output_tokens], default=0) padded_output_tokens = [ tokens + [vocab_size] * (output_max_len - len(tokens)) for tokens in output_tokens ] padded_output_tokens = np.array(padded_output_tokens, dtype=np.int32) all_input_ids_tensor = _to_tensor( padded_all_input_tokens, torch.int32 ) if padded_all_input_tokens is not None else None output_ids_tensor = _to_tensor( padded_output_tokens, torch.int32 ) if padded_output_tokens is not None else None mindie_sampling_data = SamplingData( all_input_ids=all_input_ids_tensor, output_ids=output_ids_tensor ) if greedy_flag: mindie_sampling_param = None else: mindie_sampling_param = SamplingParam.from_numpy( repetition_penalty=repetition_penalties, frequency_penalty=frequency_penalties, presence_penalty=presence_penalties, temperature=temperatures, top_k=top_ks, top_p=top_ps, seed=sampling_seeds, 
do_sample=do_samples, to_tensor=_to_tensor, ) return (mindie_sampling_data, mindie_sampling_param) def recover_data( sampling_metadata: SamplingMetadata, sampled_tokens: np.ndarray, logprobs: torch.Tensor, include_gpu_probs_tensor: bool, ) -> Tuple[SampleResultType, Optional[torch.Tensor]]: categorized_seq_group_ids: Dict[SamplingType, List[int]] = {t: [] for t in SamplingType} categorized_sample_indices = sampling_metadata.categorized_sample_indices for i, seq_group in enumerate(sampling_metadata.seq_groups): sampling_params = seq_group.sampling_params sampling_type = sampling_params.sampling_type categorized_seq_group_ids[sampling_type].append(i) sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {} sample_metadata = {} # Create output tensor for sampled token ids. if include_gpu_probs_tensor: sampled_token_ids_tensor = torch.empty(logprobs.shape[0], 1, dtype=torch.long, device=logprobs.device) else: sampled_token_ids_tensor = None for sampling_type in SamplingType: sample_indices = categorized_sample_indices[sampling_type][:, 0] num_tokens = len(sample_indices) if num_tokens == 0: continue seq_group_id = categorized_seq_group_ids[sampling_type] seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_id] sample_metadata[sampling_type] = (seq_group_id, seq_groups) for sampling_type in SamplingType: if sampling_type not in sample_metadata: continue (seq_group_id, seq_groups) = sample_metadata[sampling_type] if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED): sample_results = normal_wrap(seq_groups, sampled_tokens) elif sampling_type == SamplingType.BEAM: sample_results = beam_wrap(seq_groups, sampled_tokens) sample_results_dict.update(zip(seq_group_id, sample_results)) sample_results = [ sample_results_dict.get(i, ([], [])) for i in range(len(sampling_metadata.seq_groups)) ] return sample_results, sampled_token_ids_tensor def normal_wrap( selected_seq_groups: List[SequenceGroupToSample], samples: np.ndarray, ): samples = samples.tolist() sample_idx = 0 results: SampleResultType = [] for seq_group in selected_seq_groups: if not seq_group.do_sample: results.append(([], [])) continue seq_ids = seq_group.seq_ids num_parent_seqs = len(seq_ids) parent_ids = list(range(num_parent_seqs)) next_token_ids = [samples[sample_idx]] results.append((next_token_ids, parent_ids)) sample_idx += num_parent_seqs return results def beam_wrap( selected_seq_groups: List[SequenceGroupToSample], samples: np.ndarray, ): raise ValueError(f"Unsupported sampling type: beam search")
vllm_npu/vllm_npu/model_executor/models/ascend/mindie_llm_wrapper.py:

from typing import List, Optional
import math

import torch
from torch import nn

from vllm.model_executor import SamplingMetadata
from vllm.sequence import SamplerOutput
from vllm.attention import AttentionMetadata

from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch

from vllm_npu.worker.cache_engine import KVCache
from vllm_npu.model_executor.layers.ascend_sampler import AscendSampler


class MindIELlmWrapper(nn.Module):

    def __init__(self, mindie_model_config, linear_method=None, lora_config=None):
        super(MindIELlmWrapper, self).__init__()

        self.mindie_model_config = mindie_model_config
        self.rank = mindie_model_config['rank']
        self.local_rank = mindie_model_config['local_rank']
        self.npu_id = self.local_rank
        self.world_size = mindie_model_config['world_size']
        self.mindie_model = None
        self.sampler = None

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        kv_caches: List[KVCache],
        attn_metadata: AttentionMetadata,
    ) -> torch.Tensor:
        is_prompt = attn_metadata.num_prefill_tokens > 0

        if kv_caches[0][0] is None:
            kv_caches, block_tables, slots = self.create_dummy_kv_cache(attn_metadata, input_ids)
        else:
            if is_prompt:
                block_tables = torch.tensor([0], dtype=torch.int32, device="npu")
            else:
                block_tables = attn_metadata.decode_metadata.block_tables
            slots = attn_metadata.slot_mapping

        if is_prompt:
            input_lengths = attn_metadata.prefill_metadata.seq_lens_tensor.to(torch.int32)
            max_seq_len = int(attn_metadata.prefill_metadata.seq_lens_tensor.max())
            lm_head_indices = (attn_metadata.prefill_metadata.seq_lens_tensor.cumsum(dim=-1) - 1).to(torch.int64)
        else:
            input_lengths = attn_metadata.decode_metadata.seq_lens_tensor
            max_seq_len = attn_metadata.decode_metadata.max_seq_len
            lm_head_indices = None

        logits = self.mindie_model.forward_tensor(input_ids, positions, is_prompt, kv_caches,
                                                  block_tables, slots, input_lengths, max_seq_len,
                                                  lm_head_indices)

        return logits

    def compute_logits(self, hidden_states: torch.Tensor,
                       sampling_metadata: SamplingMetadata) -> torch.Tensor:
        return hidden_states

    def sample(
        self,
        logits: torch.Tensor,
        sampling_metadata: SamplingMetadata,
    ) -> Optional[SamplerOutput]:
        # hidden_states is logits
        next_tokens = self.sampler(logits, sampling_metadata)
        return next_tokens

    def load_weights(self,
                     model_name_or_path: str,
                     cache_dir: Optional[str] = None,
                     load_format: str = "auto",
                     revision: Optional[str] = None):
        if load_format not in ['auto', 'safetensors', 'pt']:
            raise ValueError('load-format support [safetensors, pt]')

        self.weight_dtype = torch.get_default_dtype()
        torch.set_default_dtype(torch.float32)

        self.mindie_model = GeneratorTorch(self.mindie_model_config)
        self.sampler = AscendSampler(self.mindie_model)

        torch.set_default_dtype(self.weight_dtype)

    # when warmup, create dummy kvcache, block_tables, slot_mapping
    def create_dummy_kv_cache(self, attn_metadata, input_ids):
        dummy_block_num = 1
        dummy_block_size = 128
        model_runner = self.mindie_model.model_wrapper.model_runner
        kv_cache = [
            (
                torch.empty(
                    (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                    dtype=self.weight_dtype,
                    device="npu",
                ),
                torch.empty(
                    (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                    dtype=self.weight_dtype,
                    device="npu",
                ),
            )
            for _ in range(model_runner.num_layers)
        ]
        max_s = max(attn_metadata.prefill_metadata.seq_lens_tensor)
        max_need_block = math.ceil(max_s / dummy_block_size)
        batch_size = len(attn_metadata.prefill_metadata.seq_lens_tensor)
        block_tables = torch.zeros(batch_size, max_need_block, dtype=int,
                                   device="npu")

        slot = [i for i in range(dummy_block_size)]
        slots = []
        warm_up_len = len(input_ids)
        while warm_up_len > 0:
            if warm_up_len > dummy_block_size:
                slots.extend(slot)
                warm_up_len -= dummy_block_size
            else:
                slots.extend(slot[:warm_up_len])
                warm_up_len = 0
        slots = torch.tensor(slots, dtype=torch.long, device="npu")
        return kv_cache, block_tables, slots
vllm_npu/vllm_npu/usage/__init__.py:

import types

from vllm.engine.llm_engine import usage_message
import vllm_npu.usage.usage_lib as vllm_npu_usage_lib

usage_message._report_usage_once = types.MethodType(vllm_npu_usage_lib._report_usage_once, usage_message)
vllm_npu/vllm_npu/usage/usage_lib.py:

# Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
import platform
from typing import Any, Dict

import cpuinfo
import psutil
import torch

import vllm.envs as envs
from vllm.usage.usage_lib import UsageContext, _detect_cloud_provider, _get_current_timestamp_ns


def _report_usage_once(self, model_architecture: str,
                       usage_context: UsageContext,
                       extra_kvs: Dict[str, Any]) -> None:
    # Platform information
    if torch.npu.is_available():
        device_property = torch.npu.get_device_properties()
        self.gpu_count = torch.npu.device_count()
        self.gpu_type = device_property.name
        self.gpu_memory_per_device = device_property.total_memory
    self.provider = _detect_cloud_provider()
    self.architecture = platform.machine()
    self.platform = platform.platform()
    self.total_memory = psutil.virtual_memory().total

    info = cpuinfo.get_cpu_info()
    self.num_cpu = info.get("count", None)
    self.cpu_type = info.get("brand_raw", "")
    self.cpu_family_model_stepping = ",".join([
        str(info.get("family", "")),
        str(info.get("model", "")),
        str(info.get("stepping", ""))
    ])

    # vLLM information
    import vllm  # delayed import to prevent circular import
    self.context = usage_context.value
    self.vllm_version = vllm.__version__
    self.model_architecture = model_architecture

    # Metadata
    self.log_time = _get_current_timestamp_ns()
    self.source = envs.VLLM_USAGE_SOURCE

    data = vars(self)
    if data["_report_usage_once"] is not None:
        del data["_report_usage_once"]
    if extra_kvs:
        data.update(extra_kvs)

    self._write_to_file(data)
    self._send_to_server(data)
vllm_npu/vllm_npu/worker/__init__.py:

from vllm_npu.worker.cache_engine import _allocate_kv_cache
from vllm.worker import cache_engine

cache_engine.CacheEngine._allocate_kv_cache = _allocate_kv_cache
# Part of codes in this file was copied from project [vLLM Team][vllm] import torch from typing import List, Optional, Tuple from vllm.logger import init_logger from vllm.sequence import (SamplerOutput, SequenceGroupMetadata) from vllm.sampling_params import SamplingParams from vllm.worker.model_runner import _prepare_fake_inputs, ModelRunner from vllm.utils import is_hip from vllm.lora.request import LoRARequest from vllm.config import (DeviceConfig, LoadConfig, LoRAConfig, ModelConfig, ParallelConfig, SchedulerConfig, VisionLanguageConfig) from vllm_npu.model_executor.ascend_model_loader import get_model logger = init_logger(__name__) LORA_WARMUP_RANK = 8 class AscendModelRunner(ModelRunner): def __init__( self, model_config: ModelConfig, parallel_config: ParallelConfig, scheduler_config: SchedulerConfig, device_config: DeviceConfig, load_config: LoadConfig, lora_config: Optional[LoRAConfig], mindie_model_config, kv_cache_dtype: Optional[str] = "auto", is_driver_worker: bool = False, vision_language_config: Optional[VisionLanguageConfig] = None, ): super(AscendModelRunner, self).__init__( model_config, parallel_config, scheduler_config, device_config, load_config, lora_config, kv_cache_dtype, is_driver_worker, vision_language_config ) self.mindie_model_config = mindie_model_config def load_model(self) -> None: self.model = get_model( model_config=self.model_config, device_config=self.device_config, load_config=self.load_config, mindie_model_config=self.mindie_model_config, ) if self.kv_cache_dtype == "fp8" and is_hip(): # Currently scaled KV cache is only enabled on ROCm if self.model_config.quantization_param_path is not None: if callable(getattr(self.model, "load_kv_cache_scales", None)): self.model.load_kv_cache_scales( self.model_config.quantization_param_path) else: raise RuntimeError( "Using FP8 KV cache and scaling factors provided but " "model %s does not support loading scaling factors.", self.model.__class__) else: logger.warning( "Using FP8 KV cache but no scaling factors " "provided. Defaulting to scaling factors of 1.0. " "This may lead to less accurate results!") elif self.model_config.quantization_param_path is not None: logger.warning("KV cache scaling factors provided, " "but the KV cache data type is not FP8. " "KV cache scaling factors will not be used.") @torch.inference_mode() def execute_model( self, seq_group_metadata_list: List[SequenceGroupMetadata], kv_caches: List[Tuple[torch.Tensor, torch.Tensor]], ) -> Optional[SamplerOutput]: (input_tokens, input_positions, attn_metadata, sampling_metadata, lora_requests, lora_mapping, multi_modal_input ) = self.prepare_input_tensors(seq_group_metadata_list) if self.lora_config: self.set_active_loras(lora_requests, lora_mapping) # Currently cuda graph is only supported by the decode phase. prefill_meta = attn_metadata.prefill_metadata decode_meta = attn_metadata.decode_metadata model_executable = self.model execute_model_kwargs = { "input_ids": input_tokens, "positions": input_positions, "kv_caches": kv_caches, "attn_metadata": attn_metadata, } hidden_states = model_executable(**execute_model_kwargs) # Only perform sampling in the driver worker. if not self.is_driver_worker: return None # Sample the next token. output = self.model.sample( logits=hidden_states, sampling_metadata=sampling_metadata, ) return output @torch.inference_mode() def profile_run(self) -> None: # Enable top-k sampling to reflect the accurate memory usage. 
sampling_params = SamplingParams(top_p=0.99, top_k=self.vocab_size - 1) max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens max_num_seqs = self.scheduler_config.max_num_seqs dummy_lora_requests = [] dummy_lora_requests_per_seq = [] if self.lora_config: assert self.lora_manager is not None with self.lora_manager.dummy_lora_cache(): for idx in range(self.lora_config.max_loras): lora_id = idx + 1 dummy_lora_request = LoRARequest( lora_name=f"warmup_{lora_id}", lora_int_id=lora_id, lora_local_path="/not/a/real/path", ) self.lora_manager.add_dummy_lora(dummy_lora_request, rank=LORA_WARMUP_RANK) dummy_lora_requests.append(dummy_lora_request) dummy_lora_requests_per_seq = [ dummy_lora_requests[idx % len(dummy_lora_requests)] for idx in range(max_num_seqs) ] seqs: List[SequenceGroupMetadata] = [] if self.vision_language_config: max_num_seqs = min( max_num_seqs, int(max_num_batched_tokens / self.vision_language_config.image_feature_size)) for group_id in range(max_num_seqs): seq_len = (max_num_batched_tokens // max_num_seqs + (group_id < max_num_batched_tokens % max_num_seqs)) seq_data, fake_multi_modal_input = _prepare_fake_inputs( seq_len, self.vision_language_config) seq = SequenceGroupMetadata( request_id=str(group_id), is_prompt=True, seq_data={group_id: seq_data}, sampling_params=sampling_params, block_tables=None, lora_request=dummy_lora_requests_per_seq[group_id] if dummy_lora_requests_per_seq else None, multi_modal_data=fake_multi_modal_input, ) seqs.append(seq) # Run the model with the dummy inputs. num_layers = self.model_config.get_num_layers(self.parallel_config) kv_caches = [(None, None)] * num_layers self.execute_model(seqs, kv_caches) torch.npu.synchronize() return
# Part of codes in this file was copied from project [vLLM Team][vllm]
"""An Ascend worker class."""
import gc
from typing import Any, Dict, List, Optional, Set, Tuple

import torch
import torch.distributed

from vllm.config import (CacheConfig, DeviceConfig, LoadConfig, LoRAConfig,
                         ModelConfig, ParallelConfig, SchedulerConfig,
                         VisionLanguageConfig)
from vllm.model_executor import set_random_seed
from vllm.distributed import (broadcast_tensor_dict,
                              ensure_model_parallel_initialized,
                              init_distributed_environment)
from vllm.sequence import ExecuteModelRequest, SamplerOutput
from vllm.worker.cache_engine import CacheEngine
from vllm.lora.request import LoRARequest
from vllm.worker.worker_base import WorkerBase
from vllm.worker.worker import raise_if_cache_size_invalid
from vllm_npu.worker.ascend_model_runner import AscendModelRunner


class AscendWorker(WorkerBase):
    """A worker class that executes the model on a group of Ascend NPUs."""

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        cache_config: CacheConfig,
        load_config: LoadConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        lora_config: Optional[LoRAConfig] = None,
        vision_language_config: Optional[VisionLanguageConfig] = None,
        is_driver_worker: bool = False,
    ) -> None:
        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.cache_config = cache_config
        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.lora_config = lora_config
        self.load_config = load_config
        self.is_driver_worker = is_driver_worker
        if self.is_driver_worker:
            assert self.rank == 0, "The driver worker must have rank 0."

        if self.model_config.trust_remote_code:
            # note: lazy import to avoid importing torch before initializing
            from vllm.utils import init_cached_hf_modules
            init_cached_hf_modules()
        self.vision_language_config = vision_language_config

        mindie_model_config = {
            'backend_type': 'atb',
            'model_id': model_config.model,
            'rank': rank,
            'local_rank': local_rank,
            'world_size': parallel_config.world_size,
            'npu_device_id': local_rank,
        }

        self.model_runner = AscendModelRunner(
            model_config,
            parallel_config,
            scheduler_config,
            device_config,
            load_config=load_config,
            lora_config=self.lora_config,
            kv_cache_dtype=self.cache_config.cache_dtype,
            is_driver_worker=is_driver_worker,
            mindie_model_config=mindie_model_config)

        # Uninitialized cache engine. Will be initialized by
        # self.initialize_cache().
        self.cache_engine: CacheEngine
        self.gpu_cache: List[torch.Tensor]

    def init_device(self) -> None:
        self.device = torch.device(f"npu:{self.local_rank}")
        torch.npu.set_device(self.device)
        # Initialize the distributed environment.
        init_worker_distributed_environment(self.parallel_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank)
        # Initialize the model.
        set_random_seed(self.model_config.seed)

    def load_model(self):
        self.model_runner.load_model()

    @torch.inference_mode()
    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Profiles the peak memory usage of the model and returns the
        maximum number of NPU and CPU cache blocks that can be allocated.
        """
        # Profile the memory usage of the model and get the maximum number of
        # cache blocks that can be allocated with the remaining free memory.
        torch.npu.empty_cache()
        torch.npu.reset_peak_memory_stats()

        # Execute a forward pass with dummy inputs to profile the memory
        # usage of the model.
        self.model_runner.profile_run()

        block_size = self.cache_config.block_size
        dummy_block_size = 128
        dummy_num_blocks = dummy_block_size // block_size

        # Calculate the number of blocks that can be allocated with the
        # profiled peak memory.
        torch.npu.synchronize()
        peak_memory = torch.npu.max_memory_allocated()
        total_gpu_memory = torch.npu.get_device_properties(
            self.rank).total_memory
        cache_block_size = CacheEngine.get_cache_block_size(
            self.cache_config, self.model_config, self.parallel_config)
        num_gpu_blocks = int(
            (total_gpu_memory * self.cache_config.gpu_memory_utilization -
             peak_memory) // cache_block_size) + dummy_num_blocks
        num_cpu_blocks = int(self.cache_config.swap_space_bytes //
                             cache_block_size)
        num_gpu_blocks = max(num_gpu_blocks, 0)
        num_cpu_blocks = max(num_cpu_blocks, 0)
        if self.model_runner.lora_manager:
            self.model_runner.remove_all_loras()
        gc.collect()
        torch.npu.empty_cache()
        return num_gpu_blocks, num_cpu_blocks

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        raise_if_cache_size_invalid(num_gpu_blocks,
                                    self.cache_config.block_size,
                                    self.model_config.max_model_len)

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

        self._init_cache_engine()
        self._warm_up_model()

    def _init_cache_engine(self):
        assert self.cache_config.num_gpu_blocks is not None
        self.cache_engine = CacheEngine(self.cache_config, self.model_config,
                                        self.parallel_config)
        self.gpu_cache = self.cache_engine.gpu_cache
        self.model_runner.set_block_size(self.cache_engine.block_size)

    def _warm_up_model(self) -> None:
        pass

    def cache_swap(
        self,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        if blocks_to_swap_in:
            self.cache_engine.swap_in(blocks_to_swap_in)
        if blocks_to_swap_out:
            self.cache_engine.swap_out(blocks_to_swap_out)
        if blocks_to_copy:
            self.cache_engine.copy(blocks_to_copy)

    @torch.inference_mode()
    def execute_model(
        self,
        execute_model_req: Optional[ExecuteModelRequest] = None
    ) -> List[SamplerOutput]:
        if execute_model_req is None:
            seq_group_metadata_list = None
        else:
            seq_group_metadata_list = execute_model_req.seq_group_metadata_list

        if self.is_driver_worker:
            assert seq_group_metadata_list is not None
            assert execute_model_req is not None
            num_seq_groups = len(seq_group_metadata_list)
            blocks_to_swap_in = execute_model_req.blocks_to_swap_in
            blocks_to_swap_out = execute_model_req.blocks_to_swap_out
            blocks_to_copy = execute_model_req.blocks_to_copy
            data: Dict[str, Any] = {
                "num_seq_groups": num_seq_groups,
                "blocks_to_swap_in": blocks_to_swap_in,
                "blocks_to_swap_out": blocks_to_swap_out,
                "blocks_to_copy": blocks_to_copy,
            }
            broadcast_tensor_dict(data, src=0)
        else:
            data = broadcast_tensor_dict(src=0)
            num_seq_groups = data["num_seq_groups"]
            blocks_to_swap_in = data["blocks_to_swap_in"]
            blocks_to_swap_out = data["blocks_to_swap_out"]
            blocks_to_copy = data["blocks_to_copy"]

        self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)

        # If there is no input, we don't need to execute the model.
        if num_seq_groups == 0:
            return []

        output = self.model_runner.execute_model(seq_group_metadata_list,
                                                 self.gpu_cache)

        # Worker only supports single-step execution. Wrap the output in a
        # list to conform to the interface.
        return [output]

    def add_lora(self, lora_request: LoRARequest) -> bool:
        return self.model_runner.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        return self.model_runner.remove_lora(lora_id)

    def list_loras(self) -> Set[int]:
        return self.model_runner.list_loras()

    def get_cache_block_size_bytes(self) -> int:
        """Get the size of a KV cache block in bytes."""
        return CacheEngine.get_cache_block_size(self.cache_config,
                                                self.model_config,
                                                self.parallel_config)


def init_worker_distributed_environment(
    parallel_config: ParallelConfig,
    rank: int,
    distributed_init_method: Optional[str] = None,
    local_rank: int = -1,
) -> None:
    """Initialize the distributed environment."""
    init_distributed_environment(parallel_config.world_size, rank,
                                 distributed_init_method, local_rank)
    ensure_model_parallel_initialized(parallel_config.tensor_parallel_size,
                                      parallel_config.pipeline_parallel_size)


def _check_if_gpu_supports_dtype(torch_dtype: torch.dtype):
    # Check if the GPU supports the dtype.
    if torch_dtype == torch.bfloat16:
        compute_capability = torch.cuda.get_device_capability()
        if compute_capability[0] < 8:
            gpu_name = torch.cuda.get_device_name()
            raise ValueError(
                "Bfloat16 is only supported on GPUs with compute capability "
                f"of at least 8.0. Your {gpu_name} GPU has compute capability "
                f"{compute_capability[0]}.{compute_capability[1]}. "
                "You can use float16 instead by explicitly setting the "
                "`dtype` flag in CLI, for example: --dtype=half.")
from typing import List, Tuple

import torch

KVCache = Tuple[torch.Tensor, torch.Tensor]


def _allocate_kv_cache(
    self,
    num_blocks: int,
    device: str,
) -> List[KVCache]:
    """Allocates KV cache on the specified device."""
    kv_cache: List[KVCache] = []
    key_block_shape = (self.block_size, self.num_heads, self.head_size)
    value_block_shape = (self.block_size, self.num_heads, self.head_size)
    for _ in range(self.num_layers):
        key_blocks = torch.empty(
            size=(num_blocks, *key_block_shape),
            dtype=self.dtype,
            device=device,
        )
        value_blocks = torch.empty(
            size=(num_blocks, *value_block_shape),
            dtype=self.dtype,
            device=device,
        )
        kv_cache.append((key_blocks, value_blocks))
    return kv_cache
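Note that this file only defines the NPU-friendly allocation routine, which keeps each layer's key and value blocks in a flat (num_blocks, block_size, num_heads, head_size) layout; the listing does not show where vllm_npu attaches it to vLLM's CacheEngine. The following is purely an illustrative sketch of how such a binding could look, and the actual patch site in vllm_npu may differ.

# Illustrative only: bind the module-level function (which takes `self`)
# onto vLLM's CacheEngine so cache allocation uses the NPU-friendly layout.
from vllm.worker.cache_engine import CacheEngine
from vllm_npu.worker.cache_engine import _allocate_kv_cache

CacheEngine._allocate_kv_cache = _allocate_kv_cache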
import warnings

import torch


class DeviceConfig:

    def __init__(self, device: str = "auto") -> None:
        if device == "auto":
            # Automated device type detection
            if getattr(torch.version, "cann", None) is not None:
                self.device_type = "npu"
            else:
                warnings.warn(
                    "Failed to detect CANN in your environment. "
                    "Please check whether CANN has been installed correctly. "
                    "The device type for processing input is now set to CPU.")
                self.device_type = "cpu"
        else:
            # Device type is assigned explicitly
            self.device_type = device
        self.device = torch.device(self.device_type)
# Part of codes in this file was copied from project [vLLM Team][vllm]
import socket
import warnings
from functools import lru_cache

import vllm.envs as envs


@lru_cache(maxsize=None)
def is_ascend() -> bool:
    try:
        import torch_npu
    except ImportError:
        torch_npu = None
    return torch_npu is not None


def get_ip() -> str:
    host_ip = envs.VLLM_HOST_IP
    if host_ip:
        return host_ip

    # IP is not set, try to get it from the network interface.
    # Try IPv4 first.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("localhost", 80))  # Doesn't need to be reachable
        socket_name = s.getsockname()[0]
        s.close()
        return socket_name
    except Exception:
        warnings.warn("Encountered connection errors while resolving the "
                      "IPv4 address. Falling back to IPv6.")
        s.close()

    # Try IPv6.
    try:
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        s.connect(("localhost", 80))  # Doesn't need to be reachable
        socket_name = s.getsockname()[0]
        s.close()
        return socket_name
    except Exception:
        warnings.warn("Encountered connection errors while resolving the "
                      "IPv6 address. Using 0.0.0.0 by default.")
        s.close()

    warnings.warn(
        "Failed to get the IP address, using 0.0.0.0 by default. "
        "The value can be set by the environment variable"
        " VLLM_HOST_IP or HOST_IP.",
        stacklevel=2)
    return "0.0.0.0"
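get_ip honors the VLLM_HOST_IP environment variable before probing local interfaces, and falls back to 0.0.0.0 if detection fails. In multi-node setups it is usually simplest to pin the address explicitly before vLLM is launched, either by exporting VLLM_HOST_IP in the launch script or from Python; a minimal sketch (the address below is a placeholder):

# Pin the node's address explicitly so get_ip() does not have to probe
# network interfaces. 192.0.2.10 is a placeholder; use the reachable NIC
# address of the current node, and set it before vLLM is started.
import os

os.environ.setdefault("VLLM_HOST_IP", "192.0.2.10")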
import importlib
import sys


def replace_modules(old_module_name, new_module_name):
    if old_module_name in sys.modules:
        del sys.modules[old_module_name]
    sys.modules[old_module_name] = importlib.import_module(new_module_name)


# Modules that are CUDA/xformers-only and cannot be imported on Ascend
# hardware; they are redirected to the vllm_npu package instead.
_default_ops = (
    'xformers',
    'xformers.ops',
    'vllm._C',
    'xformers.ops.fmha.attn_bias',
    'vllm.model_executor.layers.ops.sample',
)

for _ops in _default_ops:
    replace_modules(_ops, 'vllm_npu')


# Dummy classes and functions to avoid import errors.
class ops:
    pass


class cuda_utils:
    pass


class cache_ops:
    pass


class BlockDiagonalCausalMask:
    pass


class LowerTriangularMaskWithTensorBias:
    pass


def context_attention_fwd():
    pass


def get_num_triton_sampler_splits():
    pass


def sample():
    pass
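The redirection works because Python resolves imports through sys.modules before touching the file system, so a later `import xformers` or `import vllm._C` silently receives the pre-registered vllm_npu package instead of failing on Ascend hardware. The mechanism can be tried in isolation with a made-up module name; in this sketch 'some_cuda_only_module' is hypothetical and the stdlib types module is used only as a stand-in target:

# Stand-alone illustration of the sys.modules redirection used above.
import importlib
import sys

sys.modules['some_cuda_only_module'] = importlib.import_module('types')

import some_cuda_only_module  # resolves to the pre-registered module
print(some_cuda_only_module is sys.modules['types'])  # True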
import torch
import torch_npu
from torch_npu.contrib import transfer_to_npu

from vllm_npu.npu_adaptor import (BlockDiagonalCausalMask,
                                  LowerTriangularMaskWithTensorBias,
                                  cache_ops, cuda_utils, ops,
                                  context_attention_fwd,
                                  get_num_triton_sampler_splits, sample)
import vllm_npu.core
import vllm_npu.engine
import vllm_npu.worker
import vllm_npu.model_executor
import vllm_npu.executor
import vllm_npu.attention
import vllm_npu.usage
from vllm_npu.utils import get_ip
from vllm_npu.config import DeviceConfig
import vllm.utils as utils
import vllm.executor.ray_utils as ray_utils
import vllm.config as vconfig
import vllm.engine.arg_utils as varg_utils

# Patch vLLM's IP resolution and device configuration with the NPU-aware
# versions defined in this package.
utils.get_ip = get_ip
ray_utils.get_ip = get_ip
vconfig.DeviceConfig = DeviceConfig
varg_utils.DeviceConfig = DeviceConfig

__version__ = "0.4.2"
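Because all of this rebinding happens at import time, a quick way to confirm that the patches took effect is to import vllm_npu and compare the patched attributes. A minimal check, assuming both vllm and vllm_npu are installed on an Ascend host:

# Minimal sanity check: importing vllm_npu rebinds vLLM's DeviceConfig and
# get_ip to the NPU-aware versions defined in this package.
import vllm_npu
import vllm.config as vconfig
import vllm.utils as utils
from vllm_npu.config import DeviceConfig
from vllm_npu.utils import get_ip

assert vconfig.DeviceConfig is DeviceConfig
assert utils.get_ip is get_ip
print("vllm has been patched for NPU:", vllm_npu.__version__)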
#!/bin/bash
# Environment variables for the ATB acceleration library
export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
export ATB_OPERATION_EXECUTE_ASYNC=1
export TASK_QUEUE_ENABLE=1
export ATB_CONTENT_NCHW_TO_ND=1
export ATB_CONTEXT_WORKSPACE_RING=1
export HCCL_BUFFSIZE=120
export LCCL_DETERMINISTIC=1
export HCCL_DETERMINISTIC=true
export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
export ATB_LAUNCH_KERNEL_WITH_TILING=0

export VLLM_NO_USAGE_STATS=1  # disable vLLM usage reporting to avoid errors

python -m vllm.entrypoints.openai.api_server \
    --model=facebook/opt-125m \
    --trust-remote-code \
    --enforce-eager \
    --worker-use-ray
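Once the server is up, it exposes vLLM's OpenAI-compatible HTTP API. A minimal request sketch, assuming the default listen address of localhost:8000 (adjust if --host/--port are passed to api_server):

# Query the OpenAI-compatible completions endpoint started by start_server.sh.
import json

import requests

resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={
        "model": "facebook/opt-125m",
        "prompt": "The capital of France is",
        "max_tokens": 32,
        "temperature": 0,
    },
    timeout=60,
)
print(json.dumps(resp.json(), indent=2, ensure_ascii=False))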
import argparse

from vllm import LLM, SamplingParams

parser = argparse.ArgumentParser()
parser.add_argument('--model_path', type=str, default="facebook/opt-125m")
args = parser.parse_args()
model_path = args.model_path

# Input prompts for the test.
prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]
sampling_params = SamplingParams(max_tokens=512, temperature=0)

llm = LLM(
    model=model_path,
    block_size=128,
    max_model_len=4096,      # max total length (prompt plus generated tokens)
    tensor_parallel_size=8,  # number of NPUs to be used
    max_num_seqs=256,        # max batch size
    enforce_eager=True,      # disable graph mode, run eagerly
    trust_remote_code=True,  # needed for custom models not yet in HuggingFace transformers
)

outputs = llm.generate(prompts, sampling_params)
for i, output in enumerate(outputs):
    prompt = output.prompt
    generated_text = output.outputs[0].text
    print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
#!/bin/bash
# Environment variables for the ATB acceleration library
export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
export ATB_OPERATION_EXECUTE_ASYNC=1
export TASK_QUEUE_ENABLE=1
export ATB_CONTENT_NCHW_TO_ND=1
export ATB_CONTEXT_WORKSPACE_RING=1
export HCCL_BUFFSIZE=120
export LCCL_DETERMINISTIC=1
export HCCL_DETERMINISTIC=true
export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
export ATB_LAUNCH_KERNEL_WITH_TILING=0

export VLLM_TARGET_DEVICE="npu"  # select the NPU device backend
export VLLM_NO_USAGE_STATS=1     # disable vLLM usage reporting to avoid errors

python3 test_offline.py --model_path facebook/opt-125m
#!/bin/bash
if [ -d "./vllm" ]; then
    echo "The ./vllm directory already exists!"
    exit 1
fi

# Fetch the matching vLLM release and overlay the cover files.
git clone -b v0.4.2 https://github.com/vllm-project/vllm.git vllm
cp -r cover/* vllm/

# Install the patched vLLM.
cd vllm
pip install -r requirements-ascend.txt
python3 setup.py install

# Install the vllm_npu adaptation package.
cd ../vllm_npu
pip install -r requirements.txt
python3 setup.py install
After restoring all of the code above into the project directory structure exactly as shown, change into the project root directory and run the following script to complete the installation of the Ascend-adapted vLLM.
bash install.sh
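After the script finishes, a quick import check can confirm that both packages were installed and that the NPU patches load cleanly; a minimal sketch:

# Post-install smoke test: importing vllm_npu pulls in torch_npu and patches
# vllm; any missing dependency will surface here as an ImportError.
import vllm_npu
import vllm

print("vllm:", vllm.__version__, "| vllm_npu:", vllm_npu.__version__)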