"""vLLM: a high-throughput and memory-efficient inference engine for LLMs""" import vllm_npu from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs from vllm.engine.async_llm_engine import AsyncLLMEngine from vllm.engine.llm_engine import LLMEngine from vllm.engine.ray_utils import initialize_cluster from vllm.entrypoints.llm import LLM from vllm.outputs import CompletionOutput, RequestOutput from vllm.sampling_params import SamplingParams __version__ = "0.3.3" __all__ = [ "LLM", "SamplingParams", "RequestOutput", "CompletionOutput", "LLMEngine", "EngineArgs", "AsyncLLMEngine", "AsyncEngineArgs", "initialize_cluster", ]
ninja  # For faster builds.
psutil
ray == 2.9.3
sentencepiece  # Required for LLaMA tokenizer.
numpy
transformers >= 4.38.0  # Required for Gemma.
fastapi
uvicorn[standard]
pydantic >= 2.0  # Required for OpenAI server.
prometheus_client >= 0.18.0
pynvml == 11.5.0
outlines >= 0.0.27
import contextlib import io import os import re import subprocess import warnings from pathlib import Path from typing import List, Set from packaging.version import parse, Version import setuptools import torch import torch.utils.cpp_extension as torch_cpp_ext from torch.utils.cpp_extension import BuildExtension, CUDAExtension, CUDA_HOME, ROCM_HOME ROOT_DIR = os.path.dirname(__file__) # If you are developing the C++ backend of vLLM, consider building vLLM with # `python setup.py develop` since it will give you incremental builds. # The downside is that this method is deprecated, see # https://github.com/pypa/setuptools/issues/917 MAIN_CUDA_VERSION = "12.1" # Supported NVIDIA GPU architectures. NVIDIA_SUPPORTED_ARCHS = {"7.0", "7.5", "8.0", "8.6", "8.9", "9.0"} ROCM_SUPPORTED_ARCHS = {"gfx908", "gfx90a", "gfx942", "gfx1100"} # SUPPORTED_ARCHS = NVIDIA_SUPPORTED_ARCHS.union(ROCM_SUPPORTED_ARCHS) def _is_hip() -> bool: return torch.version.hip is not None def _is_neuron() -> bool: torch_neuronx_installed = True try: subprocess.run(["neuron-ls"], capture_output=True, check=True) except (FileNotFoundError, PermissionError): torch_neuronx_installed = False return torch_neuronx_installed def _is_cuda() -> bool: return (torch.version.cuda is not None) and not _is_neuron() # Compiler flags. CXX_FLAGS = ["-g", "-O2", "-std=c++17"] # TODO(woosuk): Should we use -O3? NVCC_FLAGS = ["-O2", "-std=c++17"] if _is_hip(): if ROCM_HOME is None: raise RuntimeError( "Cannot find ROCM_HOME. ROCm must be available to build the package." ) NVCC_FLAGS += ["-DUSE_ROCM"] NVCC_FLAGS += ["-U__HIP_NO_HALF_CONVERSIONS__"] NVCC_FLAGS += ["-U__HIP_NO_HALF_OPERATORS__"] if _is_cuda() and CUDA_HOME is None: raise RuntimeError( "Cannot find CUDA_HOME. CUDA must be available to build the package.") ABI = 1 if torch._C._GLIBCXX_USE_CXX11_ABI else 0 CXX_FLAGS += [f"-D_GLIBCXX_USE_CXX11_ABI={ABI}"] NVCC_FLAGS += [f"-D_GLIBCXX_USE_CXX11_ABI={ABI}"] def get_hipcc_rocm_version(): # Run the hipcc --version command result = subprocess.run(['hipcc', '--version'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) # Check if the command was executed successfully if result.returncode != 0: print("Error running 'hipcc --version'") return None # Extract the version using a regular expression match = re.search(r'HIP version: (\S+)', result.stdout) if match: # Return the version string return match.group(1) else: print("Could not find HIP version in the output") return None def glob(pattern: str): root = Path(__name__).parent return [str(p) for p in root.glob(pattern)] def get_neuronxcc_version(): import sysconfig site_dir = sysconfig.get_paths()["purelib"] version_file = os.path.join(site_dir, "neuronxcc", "version", "__init__.py") # Check if the command was executed successfully with open(version_file, "rt") as fp: content = fp.read() # Extract the version using a regular expression match = re.search(r"__version__ = '(\S+)'", content) if match: # Return the version string return match.group(1) else: raise RuntimeError("Could not find HIP version in the output") def get_nvcc_cuda_version(cuda_dir: str) -> Version: """Get the CUDA version from nvcc. 
Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py """ nvcc_output = subprocess.check_output([cuda_dir + "/bin/nvcc", "-V"], universal_newlines=True) output = nvcc_output.split() release_idx = output.index("release") + 1 nvcc_cuda_version = parse(output[release_idx].split(",")[0]) return nvcc_cuda_version def get_pytorch_rocm_arch() -> Set[str]: """Get the cross section of Pytorch,and vllm supported gfx arches ROCM can get the supported gfx architectures in one of two ways Either through the PYTORCH_ROCM_ARCH env var, or output from rocm_agent_enumerator. In either case we can generate a list of supported arch's and cross reference with VLLM's own ROCM_SUPPORTED_ARCHs. """ env_arch_list = os.environ.get("PYTORCH_ROCM_ARCH", None) # If we don't have PYTORCH_ROCM_ARCH specified pull the list from rocm_agent_enumerator if env_arch_list is None: command = "rocm_agent_enumerator" env_arch_list = subprocess.check_output([command]).decode('utf-8')\ .strip().replace("\n", ";") arch_source_str = "rocm_agent_enumerator" else: arch_source_str = "PYTORCH_ROCM_ARCH env variable" # List are separated by ; or space. pytorch_rocm_arch = set(env_arch_list.replace(" ", ";").split(";")) # Filter out the invalid architectures and print a warning. arch_list = pytorch_rocm_arch.intersection(ROCM_SUPPORTED_ARCHS) # If none of the specified architectures are valid, raise an error. if not arch_list: raise RuntimeError( f"None of the ROCM architectures in {arch_source_str} " f"({env_arch_list}) is supported. " f"Supported ROCM architectures are: {ROCM_SUPPORTED_ARCHS}.") invalid_arch_list = pytorch_rocm_arch - ROCM_SUPPORTED_ARCHS if invalid_arch_list: warnings.warn( f"Unsupported ROCM architectures ({invalid_arch_list}) are " f"excluded from the {arch_source_str} output " f"({env_arch_list}). Supported ROCM architectures are: " f"{ROCM_SUPPORTED_ARCHS}.", stacklevel=2) return arch_list def get_torch_arch_list() -> Set[str]: # TORCH_CUDA_ARCH_LIST can have one or more architectures, # e.g. "8.0" or "7.5,8.0,8.6+PTX". Here, the "8.6+PTX" option asks the # compiler to additionally include PTX code that can be runtime-compiled # and executed on the 8.6 or newer architectures. While the PTX code will # not give the best performance on the newer architectures, it provides # forward compatibility. env_arch_list = os.environ.get("TORCH_CUDA_ARCH_LIST", None) if env_arch_list is None: return set() # List are separated by ; or space. torch_arch_list = set(env_arch_list.replace(" ", ";").split(";")) if not torch_arch_list: return set() # Filter out the invalid architectures and print a warning. valid_archs = NVIDIA_SUPPORTED_ARCHS.union( {s + "+PTX" for s in NVIDIA_SUPPORTED_ARCHS}) arch_list = torch_arch_list.intersection(valid_archs) # If none of the specified architectures are valid, raise an error. if not arch_list: raise RuntimeError( "None of the CUDA architectures in `TORCH_CUDA_ARCH_LIST` env " f"variable ({env_arch_list}) is supported. " f"Supported CUDA architectures are: {valid_archs}.") invalid_arch_list = torch_arch_list - valid_archs if invalid_arch_list: warnings.warn( f"Unsupported CUDA architectures ({invalid_arch_list}) are " "excluded from the `TORCH_CUDA_ARCH_LIST` env variable " f"({env_arch_list}). 
Supported CUDA architectures are: " f"{valid_archs}.", stacklevel=2) return arch_list if _is_hip(): rocm_arches = get_pytorch_rocm_arch() NVCC_FLAGS += ["--offload-arch=" + arch for arch in rocm_arches] else: # First, check the TORCH_CUDA_ARCH_LIST environment variable. compute_capabilities = get_torch_arch_list() if _is_cuda() and not compute_capabilities: # If TORCH_CUDA_ARCH_LIST is not defined or empty, target all available # GPUs on the current machine. device_count = torch.cuda.device_count() for i in range(device_count): major, minor = torch.cuda.get_device_capability(i) if major < 7: raise RuntimeError( "GPUs with compute capability below 7.0 are not supported.") compute_capabilities.add(f"{major}.{minor}") ext_modules = [] if _is_cuda(): nvcc_cuda_version = get_nvcc_cuda_version(CUDA_HOME) if not compute_capabilities: # If no GPU is specified nor available, add all supported architectures # based on the NVCC CUDA version. compute_capabilities = NVIDIA_SUPPORTED_ARCHS.copy() if nvcc_cuda_version < Version("11.1"): compute_capabilities.remove("8.6") if nvcc_cuda_version < Version("11.8"): compute_capabilities.remove("8.9") compute_capabilities.remove("9.0") # Validate the NVCC CUDA version. if nvcc_cuda_version < Version("11.0"): raise RuntimeError( "CUDA 11.0 or higher is required to build the package.") if (nvcc_cuda_version < Version("11.1") and any(cc.startswith("8.6") for cc in compute_capabilities)): raise RuntimeError( "CUDA 11.1 or higher is required for compute capability 8.6.") if nvcc_cuda_version < Version("11.8"): if any(cc.startswith("8.9") for cc in compute_capabilities): # CUDA 11.8 is required to generate the code targeting compute capability 8.9. # However, GPUs with compute capability 8.9 can also run the code generated by # the previous versions of CUDA 11 and targeting compute capability 8.0. # Therefore, if CUDA 11.8 is not available, we target compute capability 8.0 # instead of 8.9. warnings.warn( "CUDA 11.8 or higher is required for compute capability 8.9. " "Targeting compute capability 8.0 instead.", stacklevel=2) compute_capabilities = set(cc for cc in compute_capabilities if not cc.startswith("8.9")) compute_capabilities.add("8.0+PTX") if any(cc.startswith("9.0") for cc in compute_capabilities): raise RuntimeError( "CUDA 11.8 or higher is required for compute capability 9.0.") NVCC_FLAGS_PUNICA = NVCC_FLAGS.copy() # Add target compute capabilities to NVCC flags. for capability in compute_capabilities: num = capability[0] + capability[2] NVCC_FLAGS += ["-gencode", f"arch=compute_{num},code=sm_{num}"] if capability.endswith("+PTX"): NVCC_FLAGS += [ "-gencode", f"arch=compute_{num},code=compute_{num}" ] if int(capability[0]) >= 8: NVCC_FLAGS_PUNICA += [ "-gencode", f"arch=compute_{num},code=sm_{num}" ] if capability.endswith("+PTX"): NVCC_FLAGS_PUNICA += [ "-gencode", f"arch=compute_{num},code=compute_{num}" ] # Use NVCC threads to parallelize the build. 
if nvcc_cuda_version >= Version("11.2"): nvcc_threads = int(os.getenv("NVCC_THREADS", 8)) num_threads = min(os.cpu_count(), nvcc_threads) NVCC_FLAGS += ["--threads", str(num_threads)] if nvcc_cuda_version >= Version("11.8"): NVCC_FLAGS += ["-DENABLE_FP8_E5M2"] # changes for punica kernels NVCC_FLAGS += torch_cpp_ext.COMMON_NVCC_FLAGS REMOVE_NVCC_FLAGS = [ '-D__CUDA_NO_HALF_OPERATORS__', '-D__CUDA_NO_HALF_CONVERSIONS__', '-D__CUDA_NO_BFLOAT16_CONVERSIONS__', '-D__CUDA_NO_HALF2_OPERATORS__', ] for flag in REMOVE_NVCC_FLAGS: with contextlib.suppress(ValueError): torch_cpp_ext.COMMON_NVCC_FLAGS.remove(flag) install_punica = bool(int(os.getenv("VLLM_INSTALL_PUNICA_KERNELS", "0"))) device_count = torch.cuda.device_count() for i in range(device_count): major, minor = torch.cuda.get_device_capability(i) if major < 8: install_punica = False break if install_punica: ext_modules.append( CUDAExtension( name="vllm._punica_C", sources=["csrc/punica/punica_ops.cc"] + glob("csrc/punica/bgmv/*.cu"), extra_compile_args={ "cxx": CXX_FLAGS, "nvcc": NVCC_FLAGS_PUNICA, }, )) elif _is_neuron(): neuronxcc_version = get_neuronxcc_version() vllm_extension_sources = [ ] if _is_cuda(): vllm_extension_sources.append("csrc/quantization/awq/gemm_kernels.cu") vllm_extension_sources.append( "csrc/quantization/marlin/marlin_cuda_kernel.cu") vllm_extension_sources.append("csrc/custom_all_reduce.cu") # Add MoE kernels. ext_modules.append( CUDAExtension( name="vllm._moe_C", sources=glob("csrc/moe/*.cu") + glob("csrc/moe/*.cpp"), extra_compile_args={ "cxx": CXX_FLAGS, "nvcc": NVCC_FLAGS, }, )) """ if not _is_neuron(): vllm_extension = CUDAExtension( name="vllm._C", sources=vllm_extension_sources, extra_compile_args={ "cxx": CXX_FLAGS, "nvcc": NVCC_FLAGS, }, libraries=["cuda"] if _is_cuda() else [], ) ext_modules.append(vllm_extension) """ def get_path(*filepath) -> str: return os.path.join(ROOT_DIR, *filepath) def find_version(filepath: str) -> str: """Extract version information from the given filepath. 
Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py """ with open(filepath) as fp: version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", fp.read(), re.M) if version_match: return version_match.group(1) raise RuntimeError("Unable to find version string.") def get_vllm_version() -> str: version = find_version(get_path("vllm", "__init__.py")) return version if _is_hip(): # Get the HIP version hipcc_version = get_hipcc_rocm_version() if hipcc_version != MAIN_CUDA_VERSION: rocm_version_str = hipcc_version.replace(".", "")[:3] version += f"+rocm{rocm_version_str}" elif _is_neuron(): # Get the Neuron version neuron_version = str(neuronxcc_version) if neuron_version != MAIN_CUDA_VERSION: neuron_version_str = neuron_version.replace(".", "")[:3] version += f"+neuron{neuron_version_str}" else: cuda_version = str(nvcc_cuda_version) if cuda_version != MAIN_CUDA_VERSION: cuda_version_str = cuda_version.replace(".", "")[:3] version += f"+cu{cuda_version_str}" return version def read_readme() -> str: """Read the README file if present.""" p = get_path("README.md") if os.path.isfile(p): return io.open(get_path("README.md"), "r", encoding="utf-8").read() else: return "" def get_requirements() -> List[str]: """Get Python package dependencies from requirements.txt.""" if _is_hip(): with open(get_path("requirements-rocm.txt")) as f: requirements = f.read().strip().split("\n") elif _is_neuron(): with open(get_path("requirements-neuron.txt")) as f: requirements = f.read().strip().split("\n") else: with open(get_path("requirements.txt")) as f: requirements = f.read().strip().split("\n") return requirements package_data = { "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"] } if os.environ.get("VLLM_USE_PRECOMPILED"): ext_modules = [] package_data["vllm"].append("*.so") setuptools.setup( name="vllm", version=get_vllm_version(), author="vLLM Team", license="Apache 2.0", description=("A high-throughput and memory-efficient inference and " "serving engine for LLMs"), long_description=read_readme(), long_description_content_type="text/markdown", url="https://github.com/vllm-project/vllm", project_urls={ "Homepage": "https://github.com/vllm-project/vllm", "Documentation": "https://vllm.readthedocs.io/en/latest/", }, classifiers=[ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "License :: OSI Approved :: Apache Software License", "Topic :: Scientific/Engineering :: Artificial Intelligence", ], packages=setuptools.find_packages(exclude=("benchmarks", "csrc", "docs", "examples", "tests")), python_requires=">=3.8", install_requires=get_requirements(), ext_modules=ext_modules, cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {}, package_data=package_data, )
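# Illustrative sketch (not part of the build): how the local version suffix in
# get_vllm_version() above is derived. The helper below is hypothetical and only
# mirrors the string handling in setup.py, assuming MAIN_CUDA_VERSION is "12.1"
# and the detected toolkit version is e.g. "11.8".
def _version_with_suffix(base: str, toolkit: str, main: str = "12.1") -> str:
    if toolkit == main:
        # Wheels built against the default CUDA keep the plain version string.
        return base
    # e.g. "11.8" -> "cu118", giving "0.3.3+cu118".
    return f"{base}+cu{toolkit.replace('.', '')[:3]}"

if __name__ == "__main__":
    print(_version_with_suffix("0.3.3", "11.8"))  # 0.3.3+cu118
    print(_version_with_suffix("0.3.3", "12.1"))  # 0.3.3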
#!/bin/bash
# Environment variables for the ATB acceleration library.
export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
export ATB_OPERATION_EXECUTE_ASYNC=1
export TASK_QUEUE_ENABLE=1
export ATB_CONTENT_NCHW_TO_ND=1
export ATB_CONTEXT_WORKSPACE_RING=1
export HCCL_BUFFSIZE=120
export LCCL_DETERMINISTIC=1
export HCCL_DETERMINISTIC=1
export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
export ATB_LAUNCH_KERNEL_WITH_TILING=0

# Disable vLLM usage stats reporting to avoid errors.
export VLLM_NO_USAGE_STATS=1

python -m vllm.entrypoints.openai.api_server --model=facebook/opt-125m --trust-remote-code --enforce-eager --worker-use-ray
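# Example client for the server started above. This is a sketch, not part of the
# repository: it assumes the `requests` package is installed and that the
# api_server is listening on vLLM's default address (http://localhost:8000)
# with the OpenAI-compatible /v1/completions endpoint. Adjust the host, port,
# and model name to match your deployment.
import requests

response = requests.post(
    "http://localhost:8000/v1/completions",
    json={
        "model": "facebook/opt-125m",
        "prompt": "The capital of France is",
        "max_tokens": 32,
        "temperature": 0,
    },
    timeout=60,
)
response.raise_for_status()
# The generated text is returned in the first choice of the response body.
print(response.json()["choices"][0]["text"])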
from vllm import LLM, SamplingParams
import json
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--model_path', type=str, default="facebook/opt-125m")

# Input prompts for the test.
prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]
sampling_params = SamplingParams(max_tokens=512, temperature=0)

args = parser.parse_args()
model_path = args.model_path
llm = LLM(
    model=model_path,
    max_model_len=4096,      # maximum context length (prompt + generated tokens)
    tensor_parallel_size=8,  # number of NPUs to be used
    max_num_seqs=256,        # maximum batch size
    enforce_eager=True,      # disable graph capture and always run in eager mode
    trust_remote_code=True,  # needed when the model is a custom model not yet in the HuggingFace transformers library
)

outputs = llm.generate(prompts, sampling_params)
for i, output in enumerate(outputs):
    prompt = output.prompt
    generated_text = output.outputs[0].text
    print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
#!/bin/bash
# Environment variables for the ATB acceleration library.
export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
export ATB_OPERATION_EXECUTE_ASYNC=1
export TASK_QUEUE_ENABLE=1
export ATB_CONTENT_NCHW_TO_ND=1
export ATB_CONTEXT_WORKSPACE_RING=1
export HCCL_BUFFSIZE=120
export LCCL_DETERMINISTIC=1
export HCCL_DETERMINISTIC=true
export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
export ATB_LAUNCH_KERNEL_WITH_TILING=0

# Disable vLLM usage stats reporting to avoid errors.
export VLLM_NO_USAGE_STATS=1

python3 test_offline.py --model_path facebook/opt-125m
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. from typing import Union import pytest import torch from mindie_llm.modeling.backend_type import BackendType from pytest_mock import MockerFixture from vllm.config import ModelConfig, ParallelConfig from vllm.model_executor.input_metadata import InputMetadata from vllm_npu import DeviceConfig from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper from vllm_npu.worker.ascend_worker import Worker MODELS = ["Ascend/Mock-Model-42B"] PARAMS = [ {"rank": 0, "local_rank": 0, "world_size": 1}, {"rank": 1, "local_rank": 1, "world_size": 2}, ] @pytest.mark.parametrize("model", MODELS) @pytest.mark.parametrize("params", PARAMS) @pytest.mark.parametrize("dtype", [torch.bfloat16]) def test_load_model( mocker: MockerFixture, model: str, params: list[dict[str, int]], dtype: Union[str, torch.dtype] ) -> None: world_size, local_rank, rank = params.get("world_size"), params.get("local_rank"), params.get("rank") mock_parallel_config = mocker.MagicMock(spec=ParallelConfig) mock_parallel_config.world_size = world_size mock_model_config = mocker.MagicMock(spec=ModelConfig) mock_model_config.model = model mock_model_config.dtype = dtype mock_model_config.load_format = "auto" mock_model_config.max_context_len_to_capture = 0 worker = Worker(mock_model_config, mock_parallel_config, None, DeviceConfig(), local_rank, rank, None) mock_get_model = mocker.patch("vllm_npu.model_executor.ascend_model_loader.MindIELlmWrapper") worker.load_model() # Check if the args are correctly passed to backend mock_get_model.assert_called_once_with( { "backend_type": BackendType.ATB, "model_id": model, "rank": rank, "local_rank": local_rank, "world_size": world_size, "npu_device_id": local_rank, } ) @pytest.mark.parametrize("load_format", ["auto", "safetensors", "pt"]) def test_load_weights(mocker: MockerFixture, load_format: str) -> None: mocker.patch("torch.get_default_dtype", return_value=torch.float32) mocker.patch("torch.set_default_dtype") mock_generator_torch = mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.GeneratorTorch") mock_sampler = mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.Sampler") mindie_llm_wrapper = MindIELlmWrapper(mocker.ANY) # Testing supported formats, raising inside mindie_llm_wrapper.load_weights(load_format) # Testing GeneratorTorch and Sampler call counts mock_generator_torch.assert_called_once() mock_sampler.assert_called_once() # Testing unsupported format try: mindie_llm_wrapper.load_weights("unsupported") except ValueError as e: if "load-format support [safetensors, pt]" not in str(e): raise AssertionError("Error message does not match expected text for unsupported formats") from e else: raise AssertionError("Expected a ValueError for unsupported format, but none was raised") @pytest.mark.parametrize("with_kv_cache", [True, False]) @pytest.mark.parametrize("is_prompt", [True, False]) def test_forward(mocker: MockerFixture, with_kv_cache: bool, is_prompt: bool) -> None: mocker.patch("torch.get_default_dtype", return_value=torch.float32) mocker.patch("torch.set_default_dtype") mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.GeneratorTorch") mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.Sampler") wrapper = MindIELlmWrapper(mocker.ANY) wrapper.load_weights() mock_forward_tensor = mocker.patch.object( wrapper.mindie_model, "forward_tensor", return_value=torch.tensor([[0.1, 0.2, 0.3]]) ) input_ids = 
torch.tensor([[1, 2, 3]]) positions = torch.tensor([[0, 1, 2]]) kv_caches = [(torch.tensor([0]), torch.tensor([0]))] if with_kv_cache else [(None, None)] lora_requests = [None] mock_input_metadata = mocker.MagicMock(spec=InputMetadata) mock_input_metadata.is_prompt = is_prompt mock_input_metadata.prompt_lens = torch.tensor([3], dtype=torch.int32) mock_input_metadata.context_lens = torch.tensor([3], dtype=torch.int32) mock_input_metadata.max_context_len = 3 mock_input_metadata.block_tables = torch.tensor([[0]], dtype=torch.int32, device="npu") mock_input_metadata.slot_mapping = torch.tensor([0, 1, 2], dtype=torch.int64) wrapper.forward(input_ids, positions, kv_caches, mock_input_metadata, lora_requests) if is_prompt: expected_lengths = mock_input_metadata.prompt_lens.to(torch.int32) expected_max_seq_len = int(mock_input_metadata.prompt_lens.max()) expected_lm_head_indices = (mock_input_metadata.prompt_lens.cumsum(dim=-1) - 1).to(torch.int64) else: expected_lengths = mock_input_metadata.context_lens expected_max_seq_len = mock_input_metadata.max_context_len expected_lm_head_indices = None if with_kv_cache: expected_kv_caches = kv_caches expected_block_tables = mock_input_metadata.block_tables expected_slots = mock_input_metadata.slot_mapping else: _, expected_block_tables, expected_slots = wrapper.create_dummy_kv_cache(mock_input_metadata, input_ids) # Hard to compare this object expected_kv_caches = mocker.ANY expected_adapter_ids = [None] mock_forward_tensor.assert_called_once_with( input_ids, positions, is_prompt, expected_kv_caches, expected_block_tables, expected_slots, expected_lengths, expected_max_seq_len, expected_lm_head_indices, adapter_ids = expected_adapter_ids, )
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. import random from typing import Union import numpy as np import pytest import torch from mindie_llm.text_generator.utils.sampling_metadata import ( DoSampleMetadata, PenaltyMetadata, SamplingData, SamplingParam, SeedMetadata, TopKMetadata, TopPMetadata, ) from pytest_mock import MockerFixture from vllm.sampling_params import _SAMPLING_EPS from vllm.sequence import SamplingParams, SequenceData, SequenceGroupMetadata from vllm.worker.model_runner import ModelRunner from vllm_npu.model_executor.layers.sampler import Sampler torch.npu.set_device("npu") def _prepare_test( batch_size: int, is_prompt: bool = True, random_seq_length: bool = False, temprature: Union[int, float] = 0 ) -> tuple[torch.Tensor, Sampler]: torch.npu.set_device("npu") sampler = Sampler(None) model_runner = ModelRunner(None, None, None, None, None) model_runner.set_block_size(128) model_runner.sliding_window = None seq_group_metadata_list = [] prompt_lens = [] for i in range(batch_size): token_ids = list(range(random.randint(1, 128))) if random_seq_length else [1, 2, 3] seq_group_metadata_list.append( SequenceGroupMetadata( request_id=f"test_{i}", is_prompt=is_prompt, seq_data={0: SequenceData(token_ids)}, sampling_params=SamplingParams(temperature=temprature), block_tables={0: [1]}, ) ) prompt_lens.append(seq_group_metadata_list[-1].seq_data[0].get_len()) sampling_metadata = model_runner._prepare_sample(seq_group_metadata_list, prompt_lens, prompt_lens) return sampling_metadata, sampler @pytest.mark.parametrize("vocab_size", [32000]) @pytest.mark.parametrize("dtype", [torch.bfloat16]) def test_forward(mocker: MockerFixture, vocab_size: int, dtype: torch.dtype): batch_size = random.randint(1, 256) sampling_metadata, sampler = _prepare_test(batch_size) fake_logits = torch.full((batch_size, vocab_size), 1e-1, dtype=dtype) fake_tokens = np.arange(batch_size) fake_sample_results = [([i], [0]) for i in range(batch_size)] deconstructor = sampler.get_sample_data_and_param reconstructor = sampler.get_sample_results mock_process = mocker.patch( "vllm_npu.model_executor.layers.sampler._apply_logits_processors", return_value=fake_logits ) mock_atb_model = mocker.patch.object(sampler, "atb_model") mock_deconstructor = mocker.patch.object( sampler, "get_sample_data_and_param", side_effect=lambda *args, **kwargs: deconstructor(*args, **kwargs) ) mock_backend_sample = mocker.patch.object(mock_atb_model, "sample", return_value=fake_tokens) mock_reconstructor = mocker.patch.object( sampler, "get_sample_results", side_effect=lambda *args, **kwargs: reconstructor(*args, **kwargs) ) mock_logprobs = mocker.patch( "vllm_npu.model_executor.layers.sampler._get_logprobs", return_value=(mocker.ANY, mocker.ANY) ) mocker.patch("vllm_npu.model_executor.layers.sampler._build_sampler_output") sampler.forward(fake_logits, sampling_metadata) mock_process.assert_called_once_with(fake_logits, sampling_metadata) mock_deconstructor.assert_called_once_with(sampling_metadata, vocab_size) mock_backend_sample.assert_called_once_with(fake_logits, mocker.ANY, mocker.ANY) mock_reconstructor.assert_called_once_with(sampling_metadata, fake_tokens) mock_logprobs.assert_called_once_with(mocker.ANY, sampling_metadata, fake_sample_results) @pytest.mark.parametrize("temperature", [0, 0.1, 1, 10]) @pytest.mark.parametrize("vocab_size", [32000]) def test_sample_data_and_param(temperature, vocab_size): batch_size = random.randint(1, 256) sampling_metadata, sampler = _prepare_test(batch_size, 
temprature=temperature) sampling_data, sampling_param = sampler.get_sample_data_and_param(sampling_metadata, vocab_size) # Sampling data if not isinstance(sampling_data, SamplingData): raise TypeError("The returned object is not an instance of SamplingData.") if not hasattr(sampling_data, "all_input_ids"): raise AttributeError("SamplingData does not have the 'all_input_ids' attribute.") if not hasattr(sampling_data, "output_ids"): raise AttributeError("SamplingData does not have the 'output_ids' attribute.") if not isinstance(sampling_data.all_input_ids, torch.Tensor): raise TypeError("all_input_ids should be a tensor.") if not isinstance(sampling_data.output_ids, torch.Tensor): raise TypeError("output_ids should be a tensor.") if sampling_data.all_input_ids.dtype != torch.int32: raise ValueError("The dtype of all_input_ids should be int32.") if sampling_data.output_ids.dtype != torch.int32: raise ValueError("The dtype of output_ids should be int32.") # Sampling param if temperature < _SAMPLING_EPS: if sampling_param is not None: raise ValueError("Sampling data should be None in greedy mode.") else: if not isinstance(sampling_param, SamplingParam): raise TypeError("The returned object is not an instance of SamplingParam.") if not hasattr(sampling_param, "penalty_meta"): raise AttributeError("SamplingParam does not have the 'penalty_meta' attribute.") if not hasattr(sampling_param, "temperature"): raise AttributeError("SamplingParam does not have the 'temperature' attribute.") if not hasattr(sampling_param, "top_k_meta"): raise AttributeError("SamplingParam does not have the 'top_k_meta' attribute.") if not hasattr(sampling_param, "top_p_meta"): raise AttributeError("SamplingParam does not have the 'top_p_meta' attribute.") if not hasattr(sampling_param, "seed_meta"): raise AttributeError("SamplingParam does not have the 'seed_meta' attribute.") if not hasattr(sampling_param, "do_sample_meta"): raise AttributeError("SamplingParam does not have the 'do_sample_meta' attribute.") if not isinstance(sampling_param.penalty_meta, PenaltyMetadata): raise TypeError("penalty_meta should be an instance of PenaltyMetadata.") if temperature != 1: if not isinstance(sampling_param.temperature, torch.Tensor): raise TypeError("Temperature should be a tensor.") else: if sampling_param.temperature is not None: raise ValueError("Temperature tensors should be None.") if not isinstance(sampling_param.top_k_meta, TopKMetadata): raise TypeError("top_k_meta should be an instance of TopKMetadata.") if not isinstance(sampling_param.top_p_meta, TopPMetadata): raise TypeError("top_p_meta should be an instance of TopPMetadata.") if not isinstance(sampling_param.seed_meta, SeedMetadata): raise TypeError("seed_meta should be an instance of SeedMetadata.") if not isinstance(sampling_param.do_sample_meta, DoSampleMetadata): raise TypeError("do_sample_meta should be an instance of DoSampleMetadata.") def test_sample_results_reconstruction(): batch_size = random.randint(1, 256) sampling_metadata, sampler = _prepare_test(batch_size) samples = np.arange(batch_size) sample_results = sampler.get_sample_results(sampling_metadata, samples) expected_sample_results = [([i], [0]) for i in range(batch_size)] if sample_results != expected_sample_results: raise ValueError(f"Sample results mismatch: Expected {expected_sample_results}, got {sample_results}.") selected_seq_groups = [([0], SamplingParams()), ([1, 2, 3], SamplingParams()), ([4, 5], SamplingParams())] samples = np.array([101, 102, 103, 104, 105, 106]) results = 
sampler.extract_next_tokens_and_parents(selected_seq_groups, samples) expected_results = [([101], [0]), ([102], [0, 1, 2]), ([105], [0, 1])] if results != expected_results: raise ValueError(f"Results mismatch: Expected {expected_results}, got {results}.") @pytest.mark.parametrize("is_prompt", [True, False]) @pytest.mark.parametrize("vocab_size", [32000]) def test_token_padding(is_prompt, vocab_size): batch_size = random.randint(1, 256) sampling_metadata, sampler = _prepare_test(batch_size, is_prompt, True) sampling_data, _ = sampler.get_sample_data_and_param(sampling_metadata, vocab_size) # Note that if padding fails, then sampling_data cannot be constructed. # So we just check some basic shape info here. if sampling_data.all_input_ids.shape[0] != batch_size: raise ValueError( f"Expected all_input_ids to have shape[0] of {batch_size}, but got {sampling_data.all_input_ids.shape[0]}." ) if sampling_data.output_ids.shape[0] != batch_size: raise ValueError( f"Expected output_ids to have shape[0] of {batch_size}, but got {sampling_data.output_ids.shape[0]}." )
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
from vllm_npu.core.scheduler import _schedule
from vllm.core.scheduler import Scheduler

Scheduler._schedule = _schedule
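# Illustration only (stand-in classes, not part of the package): the patching
# technique the module above relies on. At import time the NPU port rebinds
# Scheduler._schedule to its own function, so every existing call site inside
# vLLM transparently picks up the replacement. The toy below runs without vLLM
# installed and only demonstrates the rebinding pattern.
class Scheduler:              # stand-in for vllm.core.scheduler.Scheduler
    def _schedule(self):
        return "original schedule"

def _schedule(self):          # stand-in for vllm_npu.core.scheduler._schedule
    return "npu-aware schedule"

# Same rebinding as `Scheduler._schedule = _schedule` in the module above.
Scheduler._schedule = _schedule

if __name__ == "__main__":
    print(Scheduler()._schedule())  # -> "npu-aware schedule"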
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. # Part of codes in this file was copied from project [vLLM Team][vllm] import time from typing import Dict, List, Deque, Optional from collections import deque from vllm.core.block_manager import AllocStatus from vllm.sequence import SequenceGroup, SequenceStatus from vllm.core.scheduler import logger, SchedulerOutputs def _schedule(self) -> SchedulerOutputs: # Blocks that need to be swapped or copied before model execution. blocks_to_swap_in: Dict[int, int] = {} blocks_to_swap_out: Dict[int, int] = {} blocks_to_copy: Dict[int, List[int]] = {} # Fix the current time. now = time.monotonic() # Join waiting sequences if possible. if not self.swapped: ignored_seq_groups: List[SequenceGroup] = [] scheduled: List[SequenceGroup] = [] # The total number of sequences on the fly, including the # requests in the generation phase. num_curr_seqs = sum(seq_group.get_max_num_running_seqs() for seq_group in self.running) num_batched_tokens = 0 curr_loras = set( seq_group.lora_int_id for seq_group in self.running) if self.lora_enabled else None # Optimization: We do not sort the waiting queue since the preempted # sequence groups are added to the front and the new sequence groups # are added to the back. leftover_waiting_sequences = deque() while self.waiting: seq_group = self.waiting[0] waiting_seqs = seq_group.get_seqs( status=SequenceStatus.WAITING) assert len(waiting_seqs) == 1, ( "Waiting sequence group should have only one prompt " "sequence.") num_prompt_tokens = waiting_seqs[0].get_len() if num_prompt_tokens > self.prompt_limit: logger.warning( f"Input prompt ({num_prompt_tokens} tokens) is too long" f" and exceeds limit of {self.prompt_limit}") for seq in waiting_seqs: seq.status = SequenceStatus.FINISHED_IGNORED ignored_seq_groups.append(seq_group) self.waiting.popleft() continue # If the sequence group cannot be allocated, stop. can_allocate = self.block_manager.can_allocate(seq_group) if can_allocate == AllocStatus.LATER: break elif can_allocate == AllocStatus.NEVER: logger.warning( f"Input prompt ({num_prompt_tokens} tokens) is too long" f" and exceeds the capacity of block_manager") for seq in waiting_seqs: seq.status = SequenceStatus.FINISHED_IGNORED ignored_seq_groups.append(seq_group) self.waiting.popleft() continue lora_int_id = 0 if self.lora_enabled: lora_int_id = seq_group.lora_int_id if lora_int_id > 0 and lora_int_id not in curr_loras and len( curr_loras) >= self.lora_config.max_loras: # We don't have a space for another LoRA, so # we ignore this request for now. leftover_waiting_sequences.appendleft(seq_group) self.waiting.popleft() continue # If the number of batched tokens exceeds the limit, stop. if (num_batched_tokens + num_prompt_tokens > self.scheduler_config.max_num_batched_tokens): break # The total number of sequences in the RUNNING state should not # exceed the maximum number of sequences. 
num_new_seqs = seq_group.get_max_num_running_seqs() if (num_curr_seqs + num_new_seqs > self.scheduler_config.max_num_seqs): break if lora_int_id > 0: curr_loras.add(lora_int_id) self.waiting.popleft() self._allocate(seq_group) self.running.append(seq_group) num_batched_tokens += num_prompt_tokens num_curr_seqs += num_new_seqs scheduled.append(seq_group) self.waiting.extendleft(leftover_waiting_sequences) if scheduled or ignored_seq_groups: scheduler_outputs = SchedulerOutputs( scheduled_seq_groups=scheduled, prompt_run=True, num_batched_tokens=num_batched_tokens, blocks_to_swap_in=blocks_to_swap_in, blocks_to_swap_out=blocks_to_swap_out, blocks_to_copy=blocks_to_copy, ignored_seq_groups=ignored_seq_groups, ) return scheduler_outputs # NOTE(woosuk): Preemption happens only when there is no available slot # to keep all the sequence groups in the RUNNING state. # In this case, the policy is responsible for deciding which sequence # groups to preempt. self.running = self.policy.sort_by_priority(now, self.running) # Reserve new token slots for the running sequence groups. running: Deque[SequenceGroup] = deque() preempted: List[SequenceGroup] = [] while self.running: seq_group = self.running.popleft() while not self.block_manager.can_append_slot(seq_group): if self.running: # Preempt the lowest-priority sequence groups. victim_seq_group = self.running.pop() self._preempt(victim_seq_group, blocks_to_swap_out) preempted.append(victim_seq_group) else: # No other sequence groups can be preempted. # Preempt the current sequence group. self._preempt(seq_group, blocks_to_swap_out) preempted.append(seq_group) break else: # Append new slots to the sequence group. self._append_slot(seq_group, blocks_to_copy) running.append(seq_group) self.running = running # Swap in the sequence groups in the SWAPPED state if possible. self.swapped = self.policy.sort_by_priority(now, self.swapped) if not preempted: num_curr_seqs = sum(seq_group.get_max_num_running_seqs() for seq_group in self.running) curr_loras = set( seq_group.lora_int_id for seq_group in self.running) if self.lora_enabled else None leftover_swapped = deque() while self.swapped: seq_group = self.swapped[0] lora_int_id = 0 if self.lora_enabled: lora_int_id = seq_group.lora_int_id if lora_int_id > 0 and lora_int_id not in curr_loras and len( curr_loras) >= self.lora_config.max_loras: # We don't have a space for another LoRA, so # we ignore this request for now. leftover_swapped.appendleft(seq_group) self.swapped.popleft() continue # If the sequence group cannot be swapped in, stop. if not self.block_manager.can_swap_in(seq_group): break # The total number of sequences in the RUNNING state should not # exceed the maximum number of sequences. num_new_seqs = seq_group.get_max_num_running_seqs() if (num_curr_seqs + num_new_seqs > self.scheduler_config.max_num_seqs): break if lora_int_id > 0: curr_loras.add(lora_int_id) self.swapped.popleft() self._swap_in(seq_group, blocks_to_swap_in) self._append_slot(seq_group, blocks_to_copy) num_curr_seqs += num_new_seqs self.running.append(seq_group) self.swapped.extendleft(leftover_swapped) # Each sequence in the generation phase only takes one token slot. # Therefore, the number of batched tokens is equal to the number of # sequences in the RUNNING state. 
num_batched_tokens = sum( seq_group.num_seqs(status=SequenceStatus.RUNNING) for seq_group in self.running) scheduler_outputs = SchedulerOutputs( scheduled_seq_groups=self.running, prompt_run=False, num_batched_tokens=num_batched_tokens, blocks_to_swap_in=blocks_to_swap_in, blocks_to_swap_out=blocks_to_swap_out, blocks_to_copy=blocks_to_copy, ignored_seq_groups=[], ) return scheduler_outputs
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
from vllm_npu.config import DeviceConfig
from vllm_npu.utils import get_ip
from vllm_npu.engine.llm_engine import DEVICE_TO_WORKER_MODULE_MAP
from vllm_npu.engine.ray_utils import initialize_cluster
from vllm.engine import arg_utils, llm_engine, ray_utils

arg_utils.DeviceConfig = DeviceConfig
llm_engine.DEVICE_TO_WORKER_MODULE_MAP = DEVICE_TO_WORKER_MODULE_MAP
llm_engine.get_ip = get_ip
llm_engine.initialize_cluster = initialize_cluster
ray_utils.get_ip = get_ip
ray_utils.initialize_cluster = initialize_cluster
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
DEVICE_TO_WORKER_MODULE_MAP = {
    "cuda": "vllm.worker.worker",
    "neuron": "vllm.worker.neuron_worker",
    "npu": "vllm_npu.worker.ascend_worker",
}
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. # Part of codes in this file was copied from project [vLLM Team][vllm] from typing import Optional, Tuple, TYPE_CHECKING from vllm.config import ParallelConfig from vllm.utils import is_hip from vllm.logger import init_logger logger = init_logger(__name__) try: import ray except ImportError as e: logger.warning(f"Failed to import Ray with {e!r}. " "For distributed inference, please install Ray with " "`pip install ray`.") ray = None if TYPE_CHECKING: from ray.util.placement_group import PlacementGroup def initialize_cluster( parallel_config: ParallelConfig, engine_use_ray: bool = False, ray_address: Optional[str] = None, ) -> Optional["PlacementGroup"]: """Initialize the distributed cluster probably with Ray. Args: parallel_config: The configurations for parallel execution. engine_use_ray: Whether to use Ray for async engine. ray_address: The address of the Ray cluster. If None, uses the default Ray cluster address. Returns: An optional `PlacementGroup`. It includes the specification of the resources for each distributed worker. None if Ray is not used. """ if parallel_config.worker_use_ray or engine_use_ray: if ray is None: raise ImportError( "Ray is not installed. Please install Ray to use distributed " "serving.") # Connect to a ray cluster. if is_hip(): ray.init(address=ray_address, ignore_reinit_error=True, num_gpus=parallel_config.world_size) else: """start adapt""" ray.init(address=ray_address, ignore_reinit_error=True, num_gpus=parallel_config.world_size) """end adapt""" if not parallel_config.worker_use_ray: assert parallel_config.world_size == 1, ( "Ray is required if parallel_config.world_size > 1.") return None # Create placement group for worker processes current_placement_group = ray.util.get_current_placement_group() if current_placement_group: # We are in a placement group bundles = current_placement_group.bundle_specs # Verify that we can use the placement group. gpu_bundles = 0 for bundle in bundles: bundle_gpus = bundle.get("GPU", 0) if bundle_gpus > 1: raise ValueError( "Placement group bundle cannot have more than 1 GPU.") if bundle_gpus: gpu_bundles += 1 if parallel_config.world_size > gpu_bundles: raise ValueError( "The number of required GPUs exceeds the total number of " "available GPUs in the placement group.") else: num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0) if parallel_config.world_size > num_gpus_in_cluster: raise ValueError( "The number of required GPUs exceeds the total number of " "available GPUs in the cluster.") # Create a new placement group placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size) current_placement_group = ray.util.placement_group( placement_group_specs) # Wait until PG is ready - this will block until all # requested resources are available, and will timeout # if they cannot be provisioned. ray.get(current_placement_group.ready(), timeout=1800) return current_placement_group
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
# Part of code in this file was copied from project [vLLM Team][vllm]
"""Utilities for selecting and loading models."""
import contextlib

import torch
import torch.nn as nn
from vllm.config import DeviceConfig, ModelConfig
from vllm.logger import init_logger
from vllm.model_executor.weight_utils import initialize_dummy_weights

from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper

logger = init_logger(__name__)


@contextlib.contextmanager
def _set_default_torch_dtype(dtype: torch.dtype):
    """Sets the default torch dtype to the given dtype."""
    old_dtype = torch.get_default_dtype()
    torch.set_default_dtype(dtype)
    yield
    torch.set_default_dtype(old_dtype)


def get_model(
    model_config: ModelConfig,
    device_config: DeviceConfig,
    **kwargs,
) -> nn.Module:
    """Get the model wrapped in MindIELlmWrapper.

    Args:
        model_config (ModelConfig): Model configs. The MindIE backend configs
            are read from `model_config.mindie_config`.
        device_config (DeviceConfig): Device configs.

    Returns:
        nn.Module: Model.
    """
    if kwargs.get("lora_config"):
        logger.info(
            "Using LoRA(s) with MindIE backend:\n"
            "Please make sure your '--lora-modules' matches with your 'lora_adapter.json' in the model directory!\n"
            "Current config for LoRA(s): %s",
            kwargs.get("lora_config"),
        )
    with _set_default_torch_dtype(model_config.dtype):
        with torch.device(device_config.device):
            model = MindIELlmWrapper(model_config.mindie_config)
            if model_config.load_format == "dummy":
                initialize_dummy_weights(model)
            else:
                model.load_weights(model_config.load_format)
            model = model.npu()
            model = model.eval()
    # For compatibility, move the config to the first hierarchy.
    model.config = model.mindie_model.model_wrapper.model_runner.config
    return model
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
import importlib

import torch
from vllm.config import DeviceConfig, ModelConfig

DEVICE_TO_MODEL_LOADER_MAP = {
    "cuda": "vllm.model_executor.model_loader",
    "neuron": "vllm.model_executor.neuron_model_loader",
    "npu": "vllm_npu.model_executor.ascend_model_loader",
}


def get_model(model_config: ModelConfig, device_config: DeviceConfig, **kwargs) -> torch.nn.Module:
    model_loader_module = DEVICE_TO_MODEL_LOADER_MAP[device_config.device_type]
    imported_model_loader = importlib.import_module(f"{model_loader_module}")
    get_model_fn = imported_model_loader.get_model
    return get_model_fn(model_config, device_config, **kwargs)
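# Illustration only (hypothetical helper, not part of the package): the
# importlib-based dispatch pattern used by get_model() above, and presumably by
# the patched llm_engine for DEVICE_TO_WORKER_MODULE_MAP as well. A device type
# selects a module path, the module is imported lazily, and a well-known
# attribute is pulled out. The demo uses the standard library so it runs
# without vLLM or MindIE installed.
import importlib

def resolve_attr(module_map: dict, key: str, attr: str):
    """Import module_map[key] lazily and return its `attr` attribute."""
    module = importlib.import_module(module_map[key])
    return getattr(module, attr)

if __name__ == "__main__":
    # Standard-library stand-in for DEVICE_TO_MODEL_LOADER_MAP["npu"] -> get_model.
    json_loads = resolve_attr({"demo": "json"}, "demo", "loads")
    print(json_loads('{"device_type": "npu"}'))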
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
import vllm_npu.model_executor.ascend_model_loader
import vllm_npu.model_executor.models.ascend.mindie_llm_wrapper
from vllm_npu.model_executor.utils import DEVICE_TO_MODEL_LOADER_MAP, get_model
from vllm.model_executor import utils
from vllm import model_executor

model_executor.get_model = get_model
utils.DEVICE_TO_MODEL_LOADER_MAP = DEVICE_TO_MODEL_LOADER_MAP
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. # Part of codes in this file was copied from project [vLLM Team][vllm] """A layer that samples the next tokens from the model's outputs.""" from random import randint from typing import Dict, List, Optional, Tuple import numpy as np import torch import torch.nn as nn from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam from vllm.model_executor.layers.sampler import _apply_logits_processors, _build_sampler_output, _get_logprobs from vllm.model_executor.sampling_metadata import SamplingMetadata from vllm.sampling_params import SamplingParams, SamplingType from vllm.sequence import SamplerOutput _SAMPLING_EPS = 1e-5 def _to_tensor(data, dtype=None): if dtype: return torch.tensor(data, dtype=dtype, device=torch.device("npu")) else: return torch.tensor(data, device=torch.device("npu")) class Sampler(nn.Module): """Adapts the vLLM sample function to the backend sample function in MindIE.""" def __init__(self, atb_model: GeneratorTorch) -> None: super().__init__() self.atb_model = atb_model def forward(self, logits: torch.Tensor, sampling_metadata: SamplingMetadata) -> Optional[SamplerOutput]: _, vocab_size = logits.shape if not sampling_metadata.perform_sampling: return None # Apply logits processors (if any). logits = _apply_logits_processors(logits, sampling_metadata) sampling_data, sampling_param = self.get_sample_data_and_param(sampling_metadata, vocab_size) logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float) next_tokens = self.atb_model.sample(logits, sampling_data, sampling_param) # Sample the next tokens. sample_results = self.get_sample_results(sampling_metadata, next_tokens) # Get the logprobs query results. prompt_logprobs, sample_logprobs = _get_logprobs(logprobs, sampling_metadata, sample_results) return _build_sampler_output(sample_results, sampling_metadata, prompt_logprobs, sample_logprobs) def get_sample_data_and_param( self, sampling_metadata: "SamplingMetadata", vocab_size: int, ) -> tuple[SamplingData, SamplingParam]: """Get sample data and parameter from sampling metadata. Args: sampling_metadata (SamplingMetadata): Sampling metadata. vocab_size (int): Vocabulary size. Returns: tuple[SamplingData, SamplingParam]: Sample data and parameter. """ all_input_tokens: List[List[int]] = [] prompt_tokens: List[List[int]] = [] output_tokens: List[List[int]] = [] top_ks: List[int] = [] temperatures: List[float] = [] top_ps: List[float] = [] min_ps: List[float] = [] presence_penalties: List[float] = [] frequency_penalties: List[float] = [] repetition_penalties: List[float] = [] seeds: List[int] = [] do_penalties = False do_top_p_top_k = False do_min_p = False for i, seq_group in enumerate(sampling_metadata.seq_groups): seq_ids, sampling_params = seq_group temperature = sampling_params.temperature p = sampling_params.presence_penalty f = sampling_params.frequency_penalty r = sampling_params.repetition_penalty top_p = sampling_params.top_p min_p = sampling_params.min_p # k should not be greater than the vocab size. 
top_k = min(sampling_params.top_k, vocab_size) top_k = vocab_size if top_k == -1 else top_k is_greedy = sampling_params.sampling_type == SamplingType.GREEDY seed = sampling_params.seed or ( 0 if is_greedy else randint(torch.iinfo(torch.long).min, torch.iinfo(torch.long).max) ) if temperature < _SAMPLING_EPS: # NOTE: Zero temperature means deterministic sampling # (i.e., greedy sampling or beam search). # Set the temperature to 1 to avoid division by zero. temperature = 1.0 if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS or top_k != vocab_size): do_top_p_top_k = True if not do_min_p and min_p > _SAMPLING_EPS: do_min_p = True if not do_penalties and any( [abs(p) >= _SAMPLING_EPS, abs(f) >= _SAMPLING_EPS, abs(r - 1.0) >= _SAMPLING_EPS] ): do_penalties = True if i < sampling_metadata.num_prompts and sampling_params.prompt_logprobs is not None: # For tokens in the prompt that we only need to get their logprobs prompt_len = sampling_metadata.prompt_lens[i] temperatures += [temperature] * (prompt_len - 1) seeds += [seed] * (prompt_len - 1) top_ps += [top_p] * (prompt_len - 1) top_ks += [top_k] * (prompt_len - 1) min_ps += [min_p] * (prompt_len - 1) presence_penalties += [0] * (prompt_len - 1) frequency_penalties += [0] * (prompt_len - 1) repetition_penalties += [1] * (prompt_len - 1) prompt_tokens.extend([] for _ in range(prompt_len - 1)) output_tokens.extend([] for _ in range(prompt_len - 1)) all_input_tokens.extend([] for _ in range(prompt_len - 1)) for seq_id in seq_ids: seq_data = sampling_metadata.seq_data[seq_id] prompt_tokens.append(seq_data.prompt_token_ids) output_tokens.append(seq_data.output_token_ids) all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids) temperatures += [temperature] * len(seq_ids) top_ps += [top_p] * len(seq_ids) top_ks += [top_k] * len(seq_ids) min_ps += [min_p] * len(seq_ids) presence_penalties += [p] * len(seq_ids) frequency_penalties += [f] * len(seq_ids) repetition_penalties += [r] * len(seq_ids) max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0) padded_all_input_tokens = [ tokens + [vocab_size] * (max_tokens_len - len(tokens)) for tokens in all_input_tokens ] output_max_len = max([len(tokens) for tokens in output_tokens], default=0) padded_output_tokens = [tokens + [vocab_size] * (output_max_len - len(tokens)) for tokens in output_tokens] # To use from_numpy or not to, this is a question. 
# Convert to tensor if not None all_input_ids = padded_all_input_tokens and _to_tensor(padded_all_input_tokens, torch.int32) output_ids = padded_output_tokens and _to_tensor(padded_output_tokens, torch.int32) sampling_data = SamplingData(all_input_ids=all_input_ids, output_ids=output_ids) repetition_penalties = np.array(repetition_penalties, dtype=np.float32) frequency_penalties = np.array(frequency_penalties, dtype=np.float32) presence_penalties = np.array(presence_penalties, dtype=np.float32) temperatures = np.array(temperatures, dtype=np.float32) top_ks = np.array(top_ks, dtype=np.int32) top_ps = np.array(top_ps, dtype=np.float32) seeds = np.array(seeds, dtype=np.int32) do_sample = np.array([True]) # Temporary solution, need to find out a better way to deal with this if is_greedy: sampling_param = None else: sampling_param = SamplingParam.from_numpy( repetition_penalty=repetition_penalties, frequency_penalty=frequency_penalties, presence_penalty=presence_penalties, temperature=temperatures, top_k=top_ks, top_p=top_ps, seed=seeds, do_sample=do_sample, to_tensor=_to_tensor, ) return sampling_data, sampling_param def get_sample_results( self, sampling_metadata: SamplingMetadata, samples: np.array, ) -> List[Tuple[List[int], List[int]]]: """Reconstructs the sample results from the backend samples. Args: sampling_metadata (SamplingMetadata): Sampling metadata. samples (np.array): Backend samples. Raises: ValueError: Beam search not supported temporarily. Returns: List[Tuple[List[int], List[int]]]: Sample results. """ categorized_seq_group_ids = {t: [] for t in SamplingType} categorized_sample_indices = sampling_metadata.categorized_sample_indices for i, seq_group in enumerate(sampling_metadata.seq_groups): _, sampling_params = seq_group sampling_type = sampling_params.sampling_type categorized_seq_group_ids[sampling_type].append(i) sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {} sample_metadata = {} # Counterintiutively, having two loops here is actually faster. # The first loop can run without waiting on GPU<->CPU sync. for sampling_type in SamplingType: if len(categorized_sample_indices[sampling_type]) == 0: continue seq_group_ids = categorized_seq_group_ids[sampling_type] seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_ids] sample_metadata[sampling_type] = (seq_group_ids, seq_groups) for sampling_type in SamplingType: if sampling_type not in sample_metadata: continue seq_group_ids, seq_groups = sample_metadata[sampling_type] if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED): sample_results = self.extract_next_tokens_and_parents(seq_groups, samples) elif sampling_type == SamplingType.BEAM: raise ValueError(f"Unsupported sampling type: beam search.") sample_results_dict.update(zip(seq_group_ids, sample_results)) sample_results = [sample_results_dict.get(i) for i in range(len(sampling_metadata.seq_groups))] return sample_results def extract_next_tokens_and_parents( self, selected_seq_groups: List[Tuple[List[int], SamplingParams]], samples: np.array, ) -> List[Tuple[List[int], List[int]]]: samples = samples.tolist() sample_idx = 0 results = [] for seq_group in selected_seq_groups: seq_ids, _ = seq_group num_parent_seqs = len(seq_ids) parent_ids = list(range(num_parent_seqs)) next_token_ids = [samples[sample_idx]] results.append((next_token_ids, parent_ids)) sample_idx += num_parent_seqs return results
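# Worked illustration (hypothetical helper, not the module's code): the padding
# scheme used in get_sample_data_and_param() above. Ragged per-sequence token
# lists are right-padded with the out-of-range id `vocab_size` so they can be
# stacked into the rectangular int32 tensors that SamplingData expects.
def pad_token_lists(token_lists, vocab_size):
    max_len = max((len(tokens) for tokens in token_lists), default=0)
    return [tokens + [vocab_size] * (max_len - len(tokens)) for tokens in token_lists]

if __name__ == "__main__":
    vocab_size = 32000
    batch = [[1, 2, 3], [4, 5], [6]]
    # -> [[1, 2, 3], [4, 5, 32000], [6, 32000, 32000]]
    print(pad_token_lists(batch, vocab_size))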
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. import math from typing import List, Literal, Optional import torch from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch from torch import nn from vllm.lora.request import LoRARequest from vllm.model_executor import SamplingMetadata from vllm.model_executor.input_metadata import InputMetadata from vllm.sequence import SamplerOutput from vllm.worker.cache_engine import KVCache from vllm_npu.model_executor.layers.sampler import Sampler DEVICE_TYPE = "npu" DEFAULT_FORMAT = "auto" class MindIELlmWrapper(nn.Module): def __init__(self, mindie_config): super(MindIELlmWrapper, self).__init__() self.mindie_config = mindie_config self.mindie_model = None self.sampler = None def forward( self, input_ids: torch.Tensor, positions: torch.Tensor, kv_caches: List[KVCache], input_metadata: InputMetadata, lora_requests: List[LoRARequest], ) -> torch.Tensor: if kv_caches[0][0] is None: kv_caches, block_tables, slots = self.create_dummy_kv_cache(input_metadata, input_ids) else: if input_metadata.is_prompt: block_tables = torch.tensor([0], dtype=torch.int32, device="npu") else: block_tables = input_metadata.block_tables slots = input_metadata.slot_mapping if input_metadata.is_prompt: input_lengths = input_metadata.prompt_lens.to(torch.int32) max_seq_len = int(input_metadata.prompt_lens.max()) lm_head_indices = (input_metadata.prompt_lens.cumsum(dim=-1) - 1).to(torch.int64) else: input_lengths = input_metadata.context_lens max_seq_len = input_metadata.max_context_len lm_head_indices = None adapter_ids = [lora_request.lora_name if lora_request else None for lora_request in lora_requests] logits = self.mindie_model.forward_tensor( input_ids, positions, input_metadata.is_prompt, kv_caches, block_tables, slots, input_lengths, max_seq_len, lm_head_indices, adapter_ids=adapter_ids, # batch_size len ) return logits def sample( self, hidden_states: torch.Tensor, sampling_metadata: SamplingMetadata, ) -> Optional[SamplerOutput]: # hidden_states is logits next_tokens = self.sampler(hidden_states, sampling_metadata) return next_tokens def load_weights(self, load_format: Literal["auto", "safetensors", "pt"] = DEFAULT_FORMAT): """Load model weights. Args: load_format (Literal[auto, safetensors, pt], optional): The format of weights. Defaults to "auto". Raises: ValueError: Format not supported. 
""" if load_format not in ["auto", "safetensors", "pt"]: raise ValueError("load-format support [safetensors, pt]") self.weight_dtype = torch.get_default_dtype() torch.set_default_dtype(torch.float32) # Replacing the deprecated method with GeneratorTorch self.mindie_model = GeneratorTorch(self.mindie_config) self.sampler = Sampler(self.mindie_model) self.sampler.logits_as_hidden_states = True torch.set_default_dtype(self.weight_dtype) # when warmup, create dummy kvcache, block_tables, slot_mapping def create_dummy_kv_cache(self, input_metadata, input_ids): dummy_block_num = 1 dummy_block_size = 128 kv_cache = [ ( torch.empty( ( dummy_block_num, dummy_block_size, self.mindie_model.model_info.num_kv_heads, self.mindie_model.model_info.head_size, ), dtype=self.weight_dtype, device=DEVICE_TYPE, ), torch.empty( ( dummy_block_num, dummy_block_size, self.mindie_model.model_info.num_kv_heads, self.mindie_model.model_info.head_size, ), dtype=self.weight_dtype, device=DEVICE_TYPE, ), ) for _ in range(self.mindie_model.model_info.num_layers) ] max_s = max(input_metadata.prompt_lens) max_need_block = math.ceil(max_s / dummy_block_size) batch_size = len(input_metadata.prompt_lens) block_tables = torch.zeros(batch_size, max_need_block, dtype=int, device=DEVICE_TYPE) slot = [i for i in range(dummy_block_size)] slots = [] warm_up_len = len(input_ids) while warm_up_len > 0: if warm_up_len > dummy_block_size: slots.extend(slot) warm_up_len -= dummy_block_size else: slots.extend(slot[:warm_up_len]) warm_up_len = 0 slots = torch.tensor(slots, dtype=torch.long, device=DEVICE_TYPE) return kv_cache, block_tables, slots
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
# Part of codes in this file was copied from project [vLLM Team][vllm]
"""An Ascend worker class."""
import gc
from typing import Dict, List, Optional, Set, Tuple

import torch
import torch.distributed
from mindie_llm.modeling.backend_type import BackendType

from vllm.config import (
    CacheConfig,
    DeviceConfig,
    LoRAConfig,
    ModelConfig,
    ParallelConfig,
    SchedulerConfig,
)
from vllm.lora.request import LoRARequest
from vllm.model_executor import set_random_seed
from vllm.model_executor.parallel_utils.communication_op import broadcast_tensor_dict
from vllm.model_executor.parallel_utils.custom_all_reduce import init_custom_ar
from vllm.model_executor.parallel_utils.parallel_state import ensure_model_parallel_initialized
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
from vllm.worker.cache_engine import CacheEngine
from vllm.worker.model_runner import ModelRunner


class Worker:
    """A worker class that executes the model on a group of NPUs."""

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        lora_config: Optional[LoRAConfig] = None,
        kv_cache_dtype: Optional[str] = "auto",
        is_driver_worker: bool = False,
    ) -> None:
        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.lora_config = lora_config
        self.is_driver_worker = is_driver_worker
        if self.is_driver_worker and self.rank != 0:
            raise ValueError("The driver worker must have rank 0.")

        self.device = None
        self.model_runner = ModelRunner(
            model_config,
            parallel_config,
            scheduler_config,
            device_config,
            self.lora_config,
            kv_cache_dtype,
            is_driver_worker,
        )
        # Uninitialized cache engine. Will be initialized by
        # self.init_cache_engine().
        self.cache_config = None
        self.cache_engine = None
        self.cache_events = None
        self.gpu_cache = None

    def init_model(self, cupy_port: Optional[int] = None) -> None:
        self.device = torch.device(f"npu:{self.local_rank}")
        torch.npu.set_device(self.device)

        # Initialize the distributed environment.
        init_distributed_environment(self.parallel_config, self.rank, self.distributed_init_method)

        # Initialize the model.
        set_random_seed(self.model_config.seed)

    def load_model(self):
        # Attach the MindIE backend configuration to model_config dynamically;
        # not elegant, but it keeps the stock ModelRunner interface unchanged.
        self.model_config.mindie_config = {
            "backend_type": BackendType.ATB,
            "model_id": self.model_config.model,
            "rank": self.rank,
            "local_rank": self.local_rank,
            "world_size": self.parallel_config.world_size,
            "npu_device_id": self.local_rank,
        }
        self.model_runner.load_model()
        del self.model_config.mindie_config

    @torch.inference_mode()
    def profile_num_available_blocks(
        self, block_size: int, gpu_memory_utilization: float, cpu_swap_space: int, cache_dtype: str
    ) -> Tuple[int, int]:
        """Profiles the peak memory usage of the model and returns the maximum
        number of GPU and CPU cache blocks that can be allocated.

        Args:
            block_size: The size of the cache block.
            gpu_memory_utilization: The fraction of the total GPU memory to use.
            cpu_swap_space: The size of the CPU swap space in bytes.
            cache_dtype: The data type of the KV cache ("auto" or an explicit dtype).
        """
        # Profile the memory usage of the model and get the maximum number of
        # cache blocks that can be allocated with the remaining free memory.
torch.npu.empty_cache() torch.npu.reset_peak_memory_stats() # Execute a forward pass with dummy inputs to profile the memory usage # of the model. self.model_runner.profile_run() dummy_block_size = 128 dummy_num_blocks = dummy_block_size // block_size # Calculate the number of blocks that can be allocated with the # profiled peak memory. torch.npu.synchronize() peak_memory = torch.npu.max_memory_allocated() total_gpu_memory = torch.npu.get_device_properties(self.rank).total_memory cache_block_size = CacheEngine.get_cache_block_size( block_size, cache_dtype, self.model_config, self.parallel_config ) num_gpu_blocks = ( int((total_gpu_memory * gpu_memory_utilization - peak_memory) // cache_block_size) + dummy_num_blocks ) num_cpu_blocks = int(cpu_swap_space // cache_block_size) num_gpu_blocks = max(num_gpu_blocks, 0) num_cpu_blocks = max(num_cpu_blocks, 0) if self.model_runner.lora_manager: self.model_runner.remove_all_loras() gc.collect() torch.npu.empty_cache() return num_gpu_blocks, num_cpu_blocks def init_cache_engine(self, cache_config: CacheConfig) -> None: self.cache_config = cache_config self.cache_engine = CacheEngine(self.cache_config, self.model_config, self.parallel_config) self.cache_events = self.cache_engine.events self.gpu_cache = self.cache_engine.gpu_cache self.model_runner.set_block_size(self.cache_engine.block_size) def warm_up_model(self) -> None: pass def cache_swap( self, blocks_to_swap_in: Dict[int, int], blocks_to_swap_out: Dict[int, int], blocks_to_copy: Dict[int, List[int]], ) -> None: # Issue cache operations. issued_cache_op = False if blocks_to_swap_in: self.cache_engine.swap_in(blocks_to_swap_in) issued_cache_op = True if blocks_to_swap_out: self.cache_engine.swap_out(blocks_to_swap_out) issued_cache_op = True if blocks_to_copy: self.cache_engine.copy(blocks_to_copy) issued_cache_op = True cache_events = self.cache_events if issued_cache_op else None if cache_events is not None: for event in cache_events: event.wait() @torch.inference_mode() def execute_model( self, seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] = None, blocks_to_swap_in: Optional[Dict[int, int]] = None, blocks_to_swap_out: Optional[Dict[int, int]] = None, blocks_to_copy: Optional[Dict[int, List[int]]] = None, ) -> Optional[SamplerOutput]: if self.is_driver_worker: if seq_group_metadata_list is None: raise ValueError("seq_group_metadata_list must be provided for the driver worker.") num_seq_groups = len(seq_group_metadata_list) if any(x is None for x in [blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy]): raise ValueError( "blocks_to_swap_in, blocks_to_swap_out, and blocks_to_copy must be provided for the driver worker." ) data = { "num_seq_groups": num_seq_groups, "blocks_to_swap_in": blocks_to_swap_in, "blocks_to_swap_out": blocks_to_swap_out, "blocks_to_copy": blocks_to_copy, } broadcast_tensor_dict(data, src=0) else: data = broadcast_tensor_dict(src=0) num_seq_groups = data["num_seq_groups"] blocks_to_swap_in = data["blocks_to_swap_in"] blocks_to_swap_out = data["blocks_to_swap_out"] blocks_to_copy = data["blocks_to_copy"] self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy) # If there is no input, we don't need to execute the model. 
if num_seq_groups == 0: return {} output = self.model_runner.execute_model(seq_group_metadata_list, self.gpu_cache) return output def add_lora(self, lora_request: LoRARequest) -> bool: return self.model_runner.add_lora(lora_request) def remove_lora(self, lora_id: int) -> bool: return self.model_runner.remove_lora(lora_id) def list_loras(self) -> Set[int]: return self.model_runner.list_loras() def init_distributed_environment( parallel_config: ParallelConfig, rank: int, distributed_init_method: Optional[str] = None, ) -> None: """Initialize the distributed environment.""" if torch.distributed.is_initialized(): torch_world_size = torch.distributed.get_world_size() if torch_world_size != parallel_config.world_size: raise RuntimeError( "torch.distributed is already initialized but the torch world " "size does not match parallel_config.world_size " f"({torch_world_size} vs. {parallel_config.world_size})." ) elif not distributed_init_method: raise ValueError( "distributed_init_method must be set if torch.distributed " f"is not already initialized: {distributed_init_method}" ) else: torch.distributed.init_process_group( backend="nccl", world_size=parallel_config.world_size, rank=rank, init_method=distributed_init_method, ) ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, parallel_config.pipeline_parallel_size) # Initialize a custom fast all-reduce implementation. if not parallel_config.disable_custom_all_reduce: init_custom_ar()
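# Illustrative single-NPU driver sketch for the Worker above (not part of the
# original file). It assumes vLLM v0.3.3's EngineArgs.create_engine_configs()
# return ordering; the model path and the TCP init address are placeholders.
if __name__ == "__main__":
    from vllm.engine.arg_utils import EngineArgs

    engine_args = EngineArgs(model="/path/to/model")  # placeholder model path
    (model_config, cache_config, parallel_config, scheduler_config,
     device_config, lora_config) = engine_args.create_engine_configs()

    worker = Worker(
        model_config,
        parallel_config,
        scheduler_config,
        device_config,
        local_rank=0,
        rank=0,
        distributed_init_method="tcp://127.0.0.1:29500",  # placeholder address
        lora_config=lora_config,
        is_driver_worker=True,
    )
    worker.init_model()
    worker.load_model()
    num_npu_blocks, num_cpu_blocks = worker.profile_num_available_blocks(
        block_size=cache_config.block_size,
        gpu_memory_utilization=cache_config.gpu_memory_utilization,
        cpu_swap_space=cache_config.swap_space_bytes,
        cache_dtype=cache_config.cache_dtype,
    )
    cache_config.num_gpu_blocks = num_npu_blocks
    cache_config.num_cpu_blocks = num_cpu_blocks
    worker.init_cache_engine(cache_config)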
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. from typing import Tuple def get_key_block_shape(self) -> Tuple[int, int, int]: return ( self.block_size, self.num_heads, self.head_size, ) def get_value_block_shape(self) -> Tuple[int, int, int]: return ( self.block_size, self.num_heads, self.head_size, )
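# Illustrative sketch (not part of the original file): with the overrides
# above, every per-layer KV cache block on NPU uses the same
# (block_size, num_heads, head_size) layout for keys and values. The sizes
# below are placeholder numbers, not values from a real model.
if __name__ == "__main__":
    import torch

    num_blocks, block_size, num_heads, head_size = 4, 128, 8, 128
    key_block_shape = (block_size, num_heads, head_size)    # get_key_block_shape(...)
    value_block_shape = (block_size, num_heads, head_size)  # get_value_block_shape(...)
    key_cache = torch.empty(num_blocks, *key_block_shape, dtype=torch.float16)
    value_cache = torch.empty(num_blocks, *value_block_shape, dtype=torch.float16)
    print(key_cache.shape, value_cache.shape)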
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. import vllm_npu.worker.ascend_worker from vllm_npu.worker.cache_engine import get_key_block_shape, get_value_block_shape from vllm_npu.worker.model_runner import ( _make_tensor_with_pad, _prepare_decode, _prepare_prompt, execute_model, load_model, profile_run, ) from vllm.worker import cache_engine, model_runner cache_engine.CacheEngine.get_key_block_shape = get_key_block_shape cache_engine.CacheEngine.get_value_block_shape = get_value_block_shape model_runner._make_tensor_with_pad = _make_tensor_with_pad model_runner.ModelRunner._prepare_decode = _prepare_decode model_runner.ModelRunner._prepare_prompt = _prepare_prompt model_runner.ModelRunner.execute_model = execute_model model_runner.ModelRunner.load_model = load_model model_runner.ModelRunner.profile_run = profile_run
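# Self-check sketch (illustrative, not part of the original file): after the
# monkey patches above, the stock vLLM classes dispatch to the NPU versions.
if __name__ == "__main__":
    assert cache_engine.CacheEngine.get_key_block_shape is get_key_block_shape
    assert model_runner.ModelRunner.load_model is load_model
    assert model_runner.ModelRunner.execute_model is execute_model
    print("vllm_npu worker patches applied")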
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. # Part of codes in this file was copied from project [vLLM Team][vllm] from collections import namedtuple from typing import List, Optional, Set, Tuple, Union, NamedTuple import torch from vllm.lora.request import LoRARequest from vllm.model_executor import InputMetadata, get_model from vllm.sampling_params import SamplingParams from vllm.sequence import SamplerOutput, SequenceData, SequenceGroupMetadata from vllm.worker.model_runner import _BATCH_SIZES_TO_CAPTURE, _PAD_SLOT_ID, _pad_to_max class PreparedPromptData(NamedTuple): tokens: torch.Tensor positions: torch.Tensor metadata: InputMetadata prompt_lens: List[int] subquery_lens: List[int] lora_index_mapping: List[int] lora_prompt_mapping: List[int] requests: List[LoRARequest] class PreparedDecodeData(NamedTuple): tokens: torch.Tensor positions: torch.Tensor metadata: InputMetadata index_mapping: List[int] prompt_mapping: List[int] requests: List[LoRARequest] def load_model(self) -> None: self.model = get_model( self.model_config, self.device_config, lora_config=self.lora_config, parallel_config=self.parallel_config, scheduler_config=self.scheduler_config, ) def _prepare_prompt( self, seq_group_metadata_list: List[SequenceGroupMetadata], ) -> PreparedPromptData: assert len(seq_group_metadata_list) > 0 input_tokens: List[List[int]] = [] input_positions: List[List[int]] = [] slot_mapping: List[List[int]] = [] lora_index_mapping: List[int] = [] lora_prompt_mapping: List[int] = [] lora_requests: List[LoRARequest] = [] prompt_lens: List[int] = [] context_lens: List[int] = [] subquery_lens: List[int] = [] prefix_block_tables: List[List[int]] = [] for seq_group_metadata in seq_group_metadata_list: assert seq_group_metadata.is_prompt seq_ids = list(seq_group_metadata.seq_data.keys()) assert len(seq_ids) == 1 seq_id = seq_ids[0] seq_data = seq_group_metadata.seq_data[seq_id] prompt_tokens = seq_data.get_token_ids() prompt_len = len(prompt_tokens) prompt_lens.append(prompt_len) prefix_len = 0 prefix = seq_group_metadata.prefix if prefix is not None and prefix.computed: prefix_len = prefix.get_length() prompt_tokens = prompt_tokens[prefix_len:] prefix_block_tables.append(prefix.get_block_numbers()) else: prefix_block_tables.append([]) # actual prompt lens context_lens.append(prefix_len) subquery_lens.append(prompt_len - prefix_len) input_tokens.append(prompt_tokens) # NOTE(woosuk): Here we assume that the first token in the prompt # is always the first token in the sequence. input_positions.append(list(range(prefix_len, prefix_len + len(prompt_tokens)))) lora_id = seq_group_metadata.lora_int_id lora_requests.append(seq_group_metadata.lora_request) lora_index_mapping.append([lora_id] * (prompt_len - prefix_len)) lora_prompt_mapping.extend( [lora_id] * (prompt_len - prefix_len if seq_group_metadata.sampling_params.prompt_logprobs else 1) ) if seq_group_metadata.block_tables is None: # During memory profiling, the block tables are not initialized # yet. In this case, we just use a dummy slot mapping. slot_mapping.append([_PAD_SLOT_ID] * prompt_len) continue # Compute the slot mapping. slot_mapping.append([]) block_table = seq_group_metadata.block_tables[seq_id] # Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID, # where start_idx is max(0, prompt_len - sliding_window). # For example, if the prompt len is 10, sliding window is 8, and # block size is 4, the first two tokens are masked and the slot # mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1]. 
start_idx = 0 if self.sliding_window is not None: assert prefix_len == 0, "Prefix caching is currently not supported with " "sliding window attention" start_idx = max(0, prompt_len - self.sliding_window) for i in range(prefix_len, prompt_len): if i < start_idx: slot_mapping[-1].append(_PAD_SLOT_ID) continue block_number = block_table[i // self.block_size] block_offset = i % self.block_size slot = block_number * self.block_size + block_offset slot_mapping[-1].append(slot) max_prompt_len = max(subquery_lens) input_tokens = _make_tensor_with_pad(input_tokens, max_prompt_len, pad=0, dtype=torch.long, device=self.device) input_positions = _make_tensor_with_pad( input_positions, max_prompt_len, pad=0, dtype=torch.long, device=self.device ) slot_mapping = _make_tensor_with_pad( slot_mapping, max_prompt_len, pad=_PAD_SLOT_ID, dtype=torch.long, device=self.device ) lora_index_mapping = [_pad_to_max(mapping, max_prompt_len, pad=0) for mapping in lora_index_mapping] context_lens_tensor = torch.tensor(context_lens, dtype=torch.int, device=self.device) # Prepare prefix block tables max_prompt_block_table_len = max(len(t) for t in prefix_block_tables) block_tables = _make_tensor_with_pad( prefix_block_tables, max_len=max_prompt_block_table_len, pad=0, dtype=torch.int, device=self.device ) start_loc_tensor = torch.arange( 0, len(prompt_lens) * max_prompt_len, max_prompt_len, dtype=torch.long, device=self.device ) prompt_lens_tensor = torch.tensor(prompt_lens, dtype=torch.long, device=self.device) input_metadata = InputMetadata( is_prompt=True, slot_mapping=slot_mapping, prompt_lens=prompt_lens_tensor, max_seq_len=max_prompt_len, start_loc=start_loc_tensor, max_context_len=None, context_lens=context_lens_tensor, block_tables=block_tables, use_cuda_graph=False, kv_cache_dtype=self.kv_cache_dtype, ) return PreparedPromptData( input_tokens, input_positions, input_metadata, prompt_lens, subquery_lens, lora_index_mapping, lora_prompt_mapping, lora_requests, ) def _prepare_decode(self, seq_group_metadata_list: List[SequenceGroupMetadata]) -> PreparedDecodeData: if not seq_group_metadata_list: raise ValueError("seq_group_metadata_list is empty") input_tokens: List[List[int]] = [] input_positions: List[List[int]] = [] slot_mapping: List[List[int]] = [] context_lens: List[int] = [] block_tables: List[List[int]] = [] lora_index_mapping: List[int] = [] lora_prompt_mapping: List[int] = [] lora_requests: List[LoRARequest] = [] max_num_blocks_per_seq = 0 for seq_group_metadata in seq_group_metadata_list: if seq_group_metadata.is_prompt: raise ValueError("seq_group_metadata.is_prompt is True") seq_ids = list(seq_group_metadata.seq_data.keys()) lora_id = seq_group_metadata.lora_int_id lora_requests.append(seq_group_metadata.lora_request) for seq_id in seq_ids: seq_data = seq_group_metadata.seq_data[seq_id] generation_token = seq_data.get_last_token_id() input_tokens.append([generation_token]) seq_len = seq_data.get_len() position = seq_len - 1 input_positions.append([position]) context_len = seq_len if self.sliding_window is None else min(seq_len, self.sliding_window) context_lens.append(context_len) block_table = seq_group_metadata.block_tables[seq_id] max_num_blocks_per_seq = max(max_num_blocks_per_seq, len(block_table)) block_number = block_table[position // self.block_size] block_offset = position % self.block_size slot = block_number * self.block_size + block_offset slot_mapping.append([slot]) lora_index_mapping.append([lora_id]) lora_prompt_mapping.append(lora_id) if self.sliding_window is not None: 
sliding_window_blocks = self.sliding_window // self.block_size block_table = block_table[-sliding_window_blocks:] block_tables.append(block_table) batch_size = len(input_tokens) max_context_len = max(context_lens) use_captured_graph = ( not self.model_config.enforce_eager and batch_size <= _BATCH_SIZES_TO_CAPTURE[-1] and max_context_len <= self.max_context_len_to_capture ) input_tokens = _make_tensor_with_pad(input_tokens, max_len=1, pad=0, dtype=torch.long, device=self.device) input_positions = _make_tensor_with_pad(input_positions, max_len=1, pad=0, dtype=torch.long, device=self.device) slot_mapping = _make_tensor_with_pad( slot_mapping, max_len=1, pad=_PAD_SLOT_ID, dtype=torch.long, device=self.device ) context_lens = torch.tensor(context_lens, dtype=torch.int, device=self.device) if use_captured_graph: # The shape of graph_block_tables is # [max batch size, max context len // block size]. input_block_tables = self.graph_block_tables[:batch_size] for i, block_table in enumerate(block_tables): if block_table: input_block_tables[i, : len(block_table)] = block_table block_tables = torch.tensor(input_block_tables, device=self.device) else: padded_block_tables = [_pad_to_max(block_table, max_num_blocks_per_seq, 0) for block_table in block_tables] block_tables = torch.tensor(padded_block_tables, dtype=torch.int, device="npu") lora_index_mapping = [_pad_to_max(mapping, 1, pad=0) for mapping in lora_index_mapping] input_metadata = InputMetadata( is_prompt=False, slot_mapping=slot_mapping, prompt_lens=None, max_seq_len=None, start_loc=None, max_context_len=max_context_len, context_lens=context_lens, block_tables=block_tables, use_cuda_graph=use_captured_graph, kv_cache_dtype=self.kv_cache_dtype, ) return PreparedDecodeData( input_tokens, input_positions, input_metadata, lora_index_mapping, lora_prompt_mapping, lora_requests ) @torch.inference_mode() def execute_model( self, seq_group_meta_list: Optional[List[SequenceGroupMetadata]], kv_caches: List[Tuple[torch.Tensor, torch.Tensor]], ) -> Optional[SamplerOutput]: (tokens, positions, input_meta, sampling_meta, lora_requests, _) = self.prepare_input_tensors(seq_group_meta_list) hidden_states = self.model( input_ids=tokens, positions=positions, kv_caches=kv_caches, input_metadata=input_meta, lora_requests=lora_requests, ) # Sample the next token. output = self.model.sample(hidden_states=hidden_states, sampling_metadata=sampling_meta) return output @torch.inference_mode() def profile_run(self) -> None: # Enable top-k sampling to reflect the accurate memory usage. vocab_size = self.model_config.get_vocab_size() sampling_params = SamplingParams(top_p=0.99, top_k=vocab_size - 1) max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens max_num_seqs = self.scheduler_config.max_num_seqs # Profile memory usage with max_num_sequences sequences and the total # number of tokens equal to max_num_batched_tokens. seqs: List[SequenceGroupMetadata] = [] for group_id in range(max_num_seqs): seq_len = max_num_batched_tokens // max_num_seqs + (group_id < max_num_batched_tokens % max_num_seqs) seq_data = SequenceData([0] * seq_len) seq = SequenceGroupMetadata( request_id=str(group_id), is_prompt=True, seq_data={group_id: seq_data}, sampling_params=sampling_params, block_tables=None, lora_request=None, # Warmup for LoRA has been done in the MindIE backend ) seqs.append(seq) # Run the model with the dummy inputs. 
    num_layers = self.model_config.get_num_layers(self.parallel_config)
    kv_caches = [(None, None)] * num_layers
    self.execute_model(seqs, kv_caches)
    # Synchronize on the NPU stream (not CUDA) before peak memory is measured.
    torch.npu.synchronize()
    return


def _make_tensor_with_pad(
    x: List[List[int]], max_len: int, pad: int, dtype: torch.dtype, device: Optional[Union[str, torch.device]]
) -> torch.Tensor:
    # NPU override: keep the signature of stock vLLM's helper but flatten the
    # nested lists into a single 1-D tensor instead of padding each row to
    # `max_len`; `max_len` and `pad` are accepted only for interface parity.
    unpad_x = [i for l in x for i in l]
    return torch.tensor(unpad_x, dtype=dtype, device=device)
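# Behaviour sketch for the NPU _make_tensor_with_pad override (illustrative,
# not part of the original file): unlike stock vLLM, which pads every inner
# list to max_len, this version flattens the nested lists into one 1-D tensor.
if __name__ == "__main__":
    toy = [[1, 2, 3], [4, 5]]
    flat = _make_tensor_with_pad(toy, max_len=4, pad=0, dtype=torch.long, device="cpu")
    print(flat)  # tensor([1, 2, 3, 4, 5]) -- no padding is inserted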
#!/bin/bash
if [ -d "./vllm" ]; then
    echo "The ./vllm directory already exists!"
    exit 1
fi
git clone -b v0.3.3 https://github.com/vllm-project/vllm.git vllm
cp -r cover/* vllm/
cd vllm
pip install -r requirements.txt
python3 setup.py install
cd ../vllm_npu
pip install -r requirements.txt
python3 setup.py install
# Vllm-MindIE

#### Introduction
A patch that adapts the Ascend inference engine (MindIE) to the open-source vLLM framework, based on the stable v0.3.3 release.

#### Software Architecture
Software architecture description.

#### Installation
After the Ascend inference base environment has been set up, run `install.sh` to install vLLM together with the Ascend patch:
```sh
bash install.sh
```

#### Usage
Startup demos for vLLM offline mode and online serving are provided for reference.
* Offline mode (a minimal Python sketch follows below):
```sh
bash examples/test_offine.sh
```
* Online serving:
```sh
bash examples/start_server.sh
```
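For additional reference, here is a minimal Python sketch of offline inference once the patch is installed (the model path is a placeholder; the API follows vLLM v0.3.3):

```python
from vllm import LLM, SamplingParams

llm = LLM(model="/path/to/model", trust_remote_code=True)  # placeholder model path
sampling_params = SamplingParams(temperature=0.0, max_tokens=64)

outputs = llm.generate(["Hello, my name is"], sampling_params)
for output in outputs:
    print(output.outputs[0].text)
```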
numpy decorator attrs psutil absl-py cloudpickle scipy tornado transformers accelerate pandas ray == 2.9.3
import io import os import re from typing import List import setuptools ROOT_DIR = os.path.dirname(__file__) def get_path(*filepath) -> str: return os.path.join(ROOT_DIR, *filepath) def find_version(filepath: str): """Extract version information from the given filepath. """ with open(filepath) as fp: version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", fp.read(), re.M) if version_match: return version_match.group(1) raise RuntimeError("Unable to find version string.") def read_readme() -> str: """Read the README file.""" return io.open(get_path("README.md"), "r", encoding="utf-8").read() def get_requirements() -> List[str]: """Get Python package dependencies from requirements.txt.""" with open(get_path("requirements.txt")) as f: requirements = f.read().strip().split("\n") return requirements setuptools.setup( name="vllm_npu", version=find_version(get_path("vllm_npu", "__init__.py")), author="Huawei", license="Apache 2.0", description=("A high-throughput and memory-efficient inference and " "serving engine for LLMs"), long_description=read_readme(), long_description_content_type="text/markdown", url="", project_urls={ "Homepage": "", "Documentation": "", }, classifiers=[ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Topic :: Scientific/Engineering :: Artificial Intelligence", ], packages=setuptools.find_packages(exclude=("benchmarks", "examples", "tests")), python_requires=">=3.8", install_requires=get_requirements(), )
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. # Part of codes in this file was copied from project [vLLM Team][vllm] from typing import Optional import torch from transformers import PretrainedConfig from vllm.logger import init_logger from vllm.utils import is_neuron logger = init_logger(__name__) class DeviceConfig: def __init__(self, device: str = "auto") -> None: if device == "auto": # Automated device type detection if getattr(torch.version, "cann", None) is not None: self.device_type = "npu" elif torch.cuda.is_available(): self.device_type = "cuda" elif is_neuron(): self.device_type = "neuron" else: raise RuntimeError("No supported device detected.") else: # Device type is assigned explicitly self.device_type = device # Some device types require processing inputs on CPU if self.device_type in ["neuron"]: self.device = torch.device("cpu") else: # Set device with device type self.device = torch.device(self.device_type) @property def is_neuron(self): return self.device_type == "neuron" def _get_and_verify_max_len( hf_config: PretrainedConfig, max_model_len: Optional[int], ) -> int: """Get and verify the model's maximum length.""" derived_max_model_len = float("inf") possible_keys = [ # OPT "max_position_embeddings", # GPT-2 "n_positions", # MPT "max_seq_len", # ChatGLM2 "seq_length", # Command-R "model_max_length", # Others "max_sequence_length", "max_seq_length", "seq_len", ] for key in possible_keys: max_len_key = getattr(hf_config, key, None) if max_len_key is not None: derived_max_model_len = min(derived_max_model_len, max_len_key) if derived_max_model_len == float("inf"): if max_model_len is not None: # If max_model_len is specified, we use it. return max_model_len default_max_len = 2048 logger.warning( "The model's config.json does not contain any of the following " "keys to determine the original maximum length of the model: " f"{possible_keys}. Assuming the model's maximum length is " f"{default_max_len}." ) derived_max_model_len = default_max_len rope_scaling = getattr(hf_config, "rope_scaling", None) if rope_scaling is not None: if "type" in rope_scaling: rope_type = rope_scaling["type"] elif "rope_type" in rope_scaling: rope_type = rope_scaling["rope_type"] else: raise ValueError("rope_scaling must have a 'type' or 'rope_type' key.") # The correct one should be "longrope", kept "su" here # to be backward compatible if rope_type not in ("su", "longrope", "llama3"): assert "factor" in rope_scaling scaling_factor = rope_scaling["factor"] if rope_type == "yarn": derived_max_model_len = rope_scaling["original_max_position_embeddings"] derived_max_model_len *= scaling_factor if max_model_len is None: max_model_len = derived_max_model_len elif max_model_len > derived_max_model_len: raise ValueError( f"User-specified max_model_len ({max_model_len}) is greater than " f"the derived max_model_len ({max_len_key}={derived_max_model_len}" " in model's config.json). This may lead to incorrect model " "outputs or CUDA errors. Make sure the value is correct and " "within the model context size." ) return int(max_model_len)
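# Usage sketch for _get_and_verify_max_len (illustrative, not part of the
# original file). The toy HF config below is hand-constructed: a base context
# of 4096 with yarn rope scaling and factor 4 derives a 16384-token limit.
if __name__ == "__main__":
    toy_config = PretrainedConfig(
        max_position_embeddings=4096,
        rope_scaling={"type": "yarn", "factor": 4.0, "original_max_position_embeddings": 4096},
    )
    print(_get_and_verify_max_len(toy_config, max_model_len=None))  # -> 16384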
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
import torch
import torch_npu
# Importing transfer_to_npu applies torch_npu's CUDA-to-NPU compatibility
# patches; the imported name itself is not used directly.
from torch_npu.contrib import transfer_to_npu

# Importing npu_adaptor first registers dummy stand-ins for CUDA-only modules
# (xformers, vllm._C, ...) so that the stock vLLM imports below succeed on NPU.
from vllm_npu.npu_adaptor import (
    BlockDiagonalCausalMask,
    LowerTriangularMaskWithTensorBias,
    cache_ops,
    context_attention_fwd,
    cuda_utils,
    ops,
)
import vllm_npu.core
import vllm_npu.engine
import vllm_npu.model_executor
import vllm_npu.worker
from vllm_npu.config import DeviceConfig, _get_and_verify_max_len
from vllm_npu.utils import get_ip
from vllm import config, utils

# Patch stock vLLM with the NPU-aware implementations.
utils.get_ip = get_ip
config.DeviceConfig = DeviceConfig
config._get_and_verify_max_len = _get_and_verify_max_len

__version__ = "0.3.3"
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. import importlib import sys def replace_modules(old_module_name, new_module_name): if old_module_name in sys.modules: del sys.modules[old_module_name] sys.modules[old_module_name] = importlib.import_module(new_module_name) _default_ops = ( 'xformers', 'xformers.ops', 'vllm._C', 'xformers.ops.fmha.attn_bias', 'vllm.model_executor.layers.triton_kernel.prefix_prefill' ) for _ops in _default_ops: replace_modules(_ops, 'vllm_npu') # dummy class to avoid import error. class ops: pass class cuda_utils: pass class cache_ops: pass class BlockDiagonalCausalMask: pass class LowerTriangularMaskWithTensorBias: pass def context_attention_fwd(): pass
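# Illustrative sketch of the module-replacement trick above (not part of the
# original file): once replace_modules('xformers', 'vllm_npu') has run, a later
# `import xformers` resolves to the vllm_npu package, so CUDA-only imports in
# stock vLLM no longer fail on NPU machines.
if __name__ == "__main__":
    import xformers  # resolves to vllm_npu after the replacement above

    print(xformers.__name__)  # -> 'vllm_npu'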
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
import socket


def get_ip() -> str:
    """Return the local address that vLLM should advertise.

    Connecting a UDP socket does not send any traffic; it only asks the OS to
    pick a source address for the given destination. Using "localhost" keeps
    the lookup entirely on-host, falling back to IPv6 if IPv4 is unavailable.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ipv4_socket:
            ipv4_socket.connect(("localhost", 80))
            return ipv4_socket.getsockname()[0]
    except OSError:
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as ipv6_socket:
            ipv6_socket.connect(("localhost", 80))
            return ipv6_socket.getsockname()[0]