vLLM 0.3.3 Reference Adaptation Code

  1. ./cover
    1. ./cover/vllm/__init__.py
      """vLLM: a high-throughput and memory-efficient inference engine for LLMs"""
      import vllm_npu
      from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
      from vllm.engine.async_llm_engine import AsyncLLMEngine
      from vllm.engine.llm_engine import LLMEngine
      from vllm.engine.ray_utils import initialize_cluster
      from vllm.entrypoints.llm import LLM
      from vllm.outputs import CompletionOutput, RequestOutput
      from vllm.sampling_params import SamplingParams
      __version__ = "0.3.3"
      __all__ = [
          "LLM",
          "SamplingParams",
          "RequestOutput",
          "CompletionOutput",
          "LLMEngine",
          "EngineArgs",
          "AsyncLLMEngine",
          "AsyncEngineArgs",
          "initialize_cluster",
      ]
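      Note: because this patched __init__.py imports vllm_npu before anything else, the NPU monkey patches are applied as a side effect of simply importing vllm. A minimal sanity check (illustrative only, not part of the adaptation):
      import sys
      import vllm  # importing vllm pulls in vllm_npu first and applies the patches
      print(vllm.__version__)            # "0.3.3"
      print("vllm_npu" in sys.modules)   # True once vllm has been imported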
    2. ./cover/requirements.txt
      ninja  # For faster builds.
      psutil
      ray == 2.9.3
      sentencepiece  # Required for LLaMA tokenizer.
      numpy
      transformers >= 4.38.0  # Required for Gemma.
      fastapi
      uvicorn[standard]
      pydantic >= 2.0  # Required for OpenAI server.
      prometheus_client >= 0.18.0
      pynvml == 11.5.0
      outlines >= 0.0.27
    3. ./cover/setup.py
      import contextlib
      import io
      import os
      import re
      import subprocess
      import warnings
      from pathlib import Path
      from typing import List, Set
      from packaging.version import parse, Version
      import setuptools
      import torch
      import torch.utils.cpp_extension as torch_cpp_ext
      from torch.utils.cpp_extension import BuildExtension, CUDAExtension, CUDA_HOME, ROCM_HOME
      ROOT_DIR = os.path.dirname(__file__)
      # If you are developing the C++ backend of vLLM, consider building vLLM with
      # `python setup.py develop` since it will give you incremental builds.
      # The downside is that this method is deprecated, see
      # https://github.com/pypa/setuptools/issues/917
      MAIN_CUDA_VERSION = "12.1"
      # Supported NVIDIA GPU architectures.
      NVIDIA_SUPPORTED_ARCHS = {"7.0", "7.5", "8.0", "8.6", "8.9", "9.0"}
      ROCM_SUPPORTED_ARCHS = {"gfx908", "gfx90a", "gfx942", "gfx1100"}
      # SUPPORTED_ARCHS = NVIDIA_SUPPORTED_ARCHS.union(ROCM_SUPPORTED_ARCHS)
      def _is_hip() -> bool:
          return torch.version.hip is not None
      def _is_neuron() -> bool:
          torch_neuronx_installed = True
          try:
              subprocess.run(["neuron-ls"], capture_output=True, check=True)
          except (FileNotFoundError, PermissionError):
              torch_neuronx_installed = False
          return torch_neuronx_installed
      def _is_cuda() -> bool:
          return (torch.version.cuda is not None) and not _is_neuron()
      # Compiler flags.
      CXX_FLAGS = ["-g", "-O2", "-std=c++17"]
      # TODO(woosuk): Should we use -O3?
      NVCC_FLAGS = ["-O2", "-std=c++17"]
      if _is_hip():
          if ROCM_HOME is None:
              raise RuntimeError(
                  "Cannot find ROCM_HOME. ROCm must be available to build the package."
              )
          NVCC_FLAGS += ["-DUSE_ROCM"]
          NVCC_FLAGS += ["-U__HIP_NO_HALF_CONVERSIONS__"]
          NVCC_FLAGS += ["-U__HIP_NO_HALF_OPERATORS__"]
      if _is_cuda() and CUDA_HOME is None:
          raise RuntimeError(
              "Cannot find CUDA_HOME. CUDA must be available to build the package.")
      ABI = 1 if torch._C._GLIBCXX_USE_CXX11_ABI else 0
      CXX_FLAGS += [f"-D_GLIBCXX_USE_CXX11_ABI={ABI}"]
      NVCC_FLAGS += [f"-D_GLIBCXX_USE_CXX11_ABI={ABI}"]
      def get_hipcc_rocm_version():
          # Run the hipcc --version command
          result = subprocess.run(['hipcc', '--version'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT,
                                  text=True)
          # Check if the command was executed successfully
          if result.returncode != 0:
              print("Error running 'hipcc --version'")
              return None
          # Extract the version using a regular expression
          match = re.search(r'HIP version: (\S+)', result.stdout)
          if match:
              # Return the version string
              return match.group(1)
          else:
              print("Could not find HIP version in the output")
              return None
      def glob(pattern: str):
          root = Path(__name__).parent
          return [str(p) for p in root.glob(pattern)]
      def get_neuronxcc_version():
          import sysconfig
          site_dir = sysconfig.get_paths()["purelib"]
          version_file = os.path.join(site_dir, "neuronxcc", "version",
                                      "__init__.py")
          # Read the version file shipped with the installed neuronxcc package
          with open(version_file, "rt") as fp:
              content = fp.read()
          # Extract the version using a regular expression
          match = re.search(r"__version__ = '(\S+)'", content)
          if match:
              # Return the version string
              return match.group(1)
          else:
              raise RuntimeError("Could not find HIP version in the output")
      def get_nvcc_cuda_version(cuda_dir: str) -> Version:
          """Get the CUDA version from nvcc.
          Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py
          """
          nvcc_output = subprocess.check_output([cuda_dir + "/bin/nvcc", "-V"],
                                                universal_newlines=True)
          output = nvcc_output.split()
          release_idx = output.index("release") + 1
          nvcc_cuda_version = parse(output[release_idx].split(",")[0])
          return nvcc_cuda_version
      def get_pytorch_rocm_arch() -> Set[str]:
          """Get the cross section of Pytorch,and vllm supported gfx arches
          ROCM can get the supported gfx architectures in one of two ways
          Either through the PYTORCH_ROCM_ARCH env var, or output from
          rocm_agent_enumerator.
          In either case we can generate a list of supported arch's and
          cross reference with VLLM's own ROCM_SUPPORTED_ARCHs.
          """
          env_arch_list = os.environ.get("PYTORCH_ROCM_ARCH", None)
          # If we don't have PYTORCH_ROCM_ARCH specified pull the list from rocm_agent_enumerator
          if env_arch_list is None:
              command = "rocm_agent_enumerator"
              env_arch_list = subprocess.check_output([command]).decode('utf-8')\
                              .strip().replace("\n", ";")
              arch_source_str = "rocm_agent_enumerator"
          else:
              arch_source_str = "PYTORCH_ROCM_ARCH env variable"
          # Lists are separated by ';' or spaces.
          pytorch_rocm_arch = set(env_arch_list.replace(" ", ";").split(";"))
          # Filter out the invalid architectures and print a warning.
          arch_list = pytorch_rocm_arch.intersection(ROCM_SUPPORTED_ARCHS)
          # If none of the specified architectures are valid, raise an error.
          if not arch_list:
              raise RuntimeError(
                  f"None of the ROCM architectures in {arch_source_str} "
                  f"({env_arch_list}) is supported. "
                  f"Supported ROCM architectures are: {ROCM_SUPPORTED_ARCHS}.")
          invalid_arch_list = pytorch_rocm_arch - ROCM_SUPPORTED_ARCHS
          if invalid_arch_list:
              warnings.warn(
                  f"Unsupported ROCM architectures ({invalid_arch_list}) are "
                  f"excluded from the {arch_source_str} output "
                  f"({env_arch_list}). Supported ROCM architectures are: "
                  f"{ROCM_SUPPORTED_ARCHS}.",
                  stacklevel=2)
          return arch_list
      def get_torch_arch_list() -> Set[str]:
          # TORCH_CUDA_ARCH_LIST can have one or more architectures,
          # e.g. "8.0" or "7.5,8.0,8.6+PTX". Here, the "8.6+PTX" option asks the
          # compiler to additionally include PTX code that can be runtime-compiled
          # and executed on the 8.6 or newer architectures. While the PTX code will
          # not give the best performance on the newer architectures, it provides
          # forward compatibility.
          env_arch_list = os.environ.get("TORCH_CUDA_ARCH_LIST", None)
          if env_arch_list is None:
              return set()
          # Lists are separated by ';' or spaces.
          torch_arch_list = set(env_arch_list.replace(" ", ";").split(";"))
          if not torch_arch_list:
              return set()
          # Filter out the invalid architectures and print a warning.
          valid_archs = NVIDIA_SUPPORTED_ARCHS.union(
              {s + "+PTX"
               for s in NVIDIA_SUPPORTED_ARCHS})
          arch_list = torch_arch_list.intersection(valid_archs)
          # If none of the specified architectures are valid, raise an error.
          if not arch_list:
              raise RuntimeError(
                  "None of the CUDA architectures in `TORCH_CUDA_ARCH_LIST` env "
                  f"variable ({env_arch_list}) is supported. "
                  f"Supported CUDA architectures are: {valid_archs}.")
          invalid_arch_list = torch_arch_list - valid_archs
          if invalid_arch_list:
              warnings.warn(
                  f"Unsupported CUDA architectures ({invalid_arch_list}) are "
                  "excluded from the `TORCH_CUDA_ARCH_LIST` env variable "
                  f"({env_arch_list}). Supported CUDA architectures are: "
                  f"{valid_archs}.",
                  stacklevel=2)
          return arch_list
      if _is_hip():
          rocm_arches = get_pytorch_rocm_arch()
          NVCC_FLAGS += ["--offload-arch=" + arch for arch in rocm_arches]
      else:
          # First, check the TORCH_CUDA_ARCH_LIST environment variable.
          compute_capabilities = get_torch_arch_list()
      if _is_cuda() and not compute_capabilities:
          # If TORCH_CUDA_ARCH_LIST is not defined or empty, target all available
          # GPUs on the current machine.
          device_count = torch.cuda.device_count()
          for i in range(device_count):
              major, minor = torch.cuda.get_device_capability(i)
              if major < 7:
                  raise RuntimeError(
                      "GPUs with compute capability below 7.0 are not supported.")
              compute_capabilities.add(f"{major}.{minor}")
      ext_modules = []
      if _is_cuda():
          nvcc_cuda_version = get_nvcc_cuda_version(CUDA_HOME)
          if not compute_capabilities:
              # If no GPU is specified nor available, add all supported architectures
              # based on the NVCC CUDA version.
              compute_capabilities = NVIDIA_SUPPORTED_ARCHS.copy()
              if nvcc_cuda_version < Version("11.1"):
                  compute_capabilities.remove("8.6")
              if nvcc_cuda_version < Version("11.8"):
                  compute_capabilities.remove("8.9")
                  compute_capabilities.remove("9.0")
          # Validate the NVCC CUDA version.
          if nvcc_cuda_version < Version("11.0"):
              raise RuntimeError(
                  "CUDA 11.0 or higher is required to build the package.")
          if (nvcc_cuda_version < Version("11.1")
                  and any(cc.startswith("8.6") for cc in compute_capabilities)):
              raise RuntimeError(
                  "CUDA 11.1 or higher is required for compute capability 8.6.")
          if nvcc_cuda_version < Version("11.8"):
              if any(cc.startswith("8.9") for cc in compute_capabilities):
                  # CUDA 11.8 is required to generate the code targeting compute capability 8.9.
                  # However, GPUs with compute capability 8.9 can also run the code generated by
                  # the previous versions of CUDA 11 and targeting compute capability 8.0.
                  # Therefore, if CUDA 11.8 is not available, we target compute capability 8.0
                  # instead of 8.9.
                  warnings.warn(
                      "CUDA 11.8 or higher is required for compute capability 8.9. "
                      "Targeting compute capability 8.0 instead.",
                      stacklevel=2)
                  compute_capabilities = set(cc for cc in compute_capabilities
                                             if not cc.startswith("8.9"))
                  compute_capabilities.add("8.0+PTX")
              if any(cc.startswith("9.0") for cc in compute_capabilities):
                  raise RuntimeError(
                      "CUDA 11.8 or higher is required for compute capability 9.0.")
          NVCC_FLAGS_PUNICA = NVCC_FLAGS.copy()
          # Add target compute capabilities to NVCC flags.
          for capability in compute_capabilities:
              num = capability[0] + capability[2]
              NVCC_FLAGS += ["-gencode", f"arch=compute_{num},code=sm_{num}"]
              if capability.endswith("+PTX"):
                  NVCC_FLAGS += [
                      "-gencode", f"arch=compute_{num},code=compute_{num}"
                  ]
              if int(capability[0]) >= 8:
                  NVCC_FLAGS_PUNICA += [
                      "-gencode", f"arch=compute_{num},code=sm_{num}"
                  ]
                  if capability.endswith("+PTX"):
                      NVCC_FLAGS_PUNICA += [
                          "-gencode", f"arch=compute_{num},code=compute_{num}"
                      ]
          # Use NVCC threads to parallelize the build.
          if nvcc_cuda_version >= Version("11.2"):
              nvcc_threads = int(os.getenv("NVCC_THREADS", 8))
              num_threads = min(os.cpu_count(), nvcc_threads)
              NVCC_FLAGS += ["--threads", str(num_threads)]
          if nvcc_cuda_version >= Version("11.8"):
              NVCC_FLAGS += ["-DENABLE_FP8_E5M2"]
          # changes for punica kernels
          NVCC_FLAGS += torch_cpp_ext.COMMON_NVCC_FLAGS
          REMOVE_NVCC_FLAGS = [
              '-D__CUDA_NO_HALF_OPERATORS__',
              '-D__CUDA_NO_HALF_CONVERSIONS__',
              '-D__CUDA_NO_BFLOAT16_CONVERSIONS__',
              '-D__CUDA_NO_HALF2_OPERATORS__',
          ]
          for flag in REMOVE_NVCC_FLAGS:
              with contextlib.suppress(ValueError):
                  torch_cpp_ext.COMMON_NVCC_FLAGS.remove(flag)
          install_punica = bool(int(os.getenv("VLLM_INSTALL_PUNICA_KERNELS", "0")))
          device_count = torch.cuda.device_count()
          for i in range(device_count):
              major, minor = torch.cuda.get_device_capability(i)
              if major < 8:
                  install_punica = False
                  break
          if install_punica:
              ext_modules.append(
                  CUDAExtension(
                      name="vllm._punica_C",
                      sources=["csrc/punica/punica_ops.cc"] +
                      glob("csrc/punica/bgmv/*.cu"),
                      extra_compile_args={
                          "cxx": CXX_FLAGS,
                          "nvcc": NVCC_FLAGS_PUNICA,
                      },
                  ))
      elif _is_neuron():
          neuronxcc_version = get_neuronxcc_version()
      vllm_extension_sources = []
      if _is_cuda():
          vllm_extension_sources.append("csrc/quantization/awq/gemm_kernels.cu")
          vllm_extension_sources.append(
              "csrc/quantization/marlin/marlin_cuda_kernel.cu")
          vllm_extension_sources.append("csrc/custom_all_reduce.cu")
          # Add MoE kernels.
          ext_modules.append(
              CUDAExtension(
                  name="vllm._moe_C",
                  sources=glob("csrc/moe/*.cu") + glob("csrc/moe/*.cpp"),
                  extra_compile_args={
                      "cxx": CXX_FLAGS,
                      "nvcc": NVCC_FLAGS,
                  },
              ))
      """
      if not _is_neuron():
          vllm_extension = CUDAExtension(
              name="vllm._C",
              sources=vllm_extension_sources,
              extra_compile_args={
                  "cxx": CXX_FLAGS,
                  "nvcc": NVCC_FLAGS,
              },
              libraries=["cuda"] if _is_cuda() else [],
          )
          ext_modules.append(vllm_extension)
      """
      def get_path(*filepath) -> str:
          return os.path.join(ROOT_DIR, *filepath)
      def find_version(filepath: str) -> str:
          """Extract version information from the given filepath.
          Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py
          """
          with open(filepath) as fp:
              version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                        fp.read(), re.M)
              if version_match:
                  return version_match.group(1)
              raise RuntimeError("Unable to find version string.")
      def get_vllm_version() -> str:
          version = find_version(get_path("vllm", "__init__.py"))
          # NPU adaptation: return the plain version string here, so no +cuXX/+rocm/
          # +neuron suffix is appended; the platform-specific logic below is therefore
          # unreachable in this adapted setup.py.
          return version
          if _is_hip():
              # Get the HIP version
              hipcc_version = get_hipcc_rocm_version()
              if hipcc_version != MAIN_CUDA_VERSION:
                  rocm_version_str = hipcc_version.replace(".", "")[:3]
                  version += f"+rocm{rocm_version_str}"
          elif _is_neuron():
              # Get the Neuron version
              neuron_version = str(neuronxcc_version)
              if neuron_version != MAIN_CUDA_VERSION:
                  neuron_version_str = neuron_version.replace(".", "")[:3]
                  version += f"+neuron{neuron_version_str}"
          else:
              cuda_version = str(nvcc_cuda_version)
              if cuda_version != MAIN_CUDA_VERSION:
                  cuda_version_str = cuda_version.replace(".", "")[:3]
                  version += f"+cu{cuda_version_str}"
          return version
      def read_readme() -> str:
          """Read the README file if present."""
          p = get_path("README.md")
          if os.path.isfile(p):
              return io.open(get_path("README.md"), "r", encoding="utf-8").read()
          else:
              return ""
      def get_requirements() -> List[str]:
          """Get Python package dependencies from requirements.txt."""
          if _is_hip():
              with open(get_path("requirements-rocm.txt")) as f:
                  requirements = f.read().strip().split("\n")
          elif _is_neuron():
              with open(get_path("requirements-neuron.txt")) as f:
                  requirements = f.read().strip().split("\n")
          else:
              with open(get_path("requirements.txt")) as f:
                  requirements = f.read().strip().split("\n")
          return requirements
      package_data = {
          "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"]
      }
      if os.environ.get("VLLM_USE_PRECOMPILED"):
          ext_modules = []
          package_data["vllm"].append("*.so")
      setuptools.setup(
          name="vllm",
          version=get_vllm_version(),
          author="vLLM Team",
          license="Apache 2.0",
          description=("A high-throughput and memory-efficient inference and "
                       "serving engine for LLMs"),
          long_description=read_readme(),
          long_description_content_type="text/markdown",
          url="https://github.com/vllm-project/vllm",
          project_urls={
              "Homepage": "https://github.com/vllm-project/vllm",
              "Documentation": "https://vllm.readthedocs.io/en/latest/",
          },
          classifiers=[
              "Programming Language :: Python :: 3.8",
              "Programming Language :: Python :: 3.9",
              "Programming Language :: Python :: 3.10",
              "Programming Language :: Python :: 3.11",
              "License :: OSI Approved :: Apache Software License",
              "Topic :: Scientific/Engineering :: Artificial Intelligence",
          ],
          packages=setuptools.find_packages(exclude=("benchmarks", "csrc", "docs",
                                                     "examples", "tests")),
          python_requires=">=3.8",
          install_requires=get_requirements(),
          ext_modules=ext_modules,
          cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {},
          package_data=package_data,
      )
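      To make the TORCH_CUDA_ARCH_LIST handling in get_torch_arch_list() above concrete, the standalone sketch below (not part of setup.py; the env value is hypothetical) reproduces the same filtering behaviour for a mixed architecture list:
      # Standalone illustration of the arch-list filtering in get_torch_arch_list().
      SUPPORTED = {"7.0", "7.5", "8.0", "8.6", "8.9", "9.0"}
      valid = SUPPORTED | {s + "+PTX" for s in SUPPORTED}
      env_value = "7.5 8.6+PTX 6.1"                # hypothetical TORCH_CUDA_ARCH_LIST
      requested = set(env_value.replace(" ", ";").split(";"))
      kept = requested & valid                     # {"7.5", "8.6+PTX"} -> compiled for
      dropped = requested - valid                  # {"6.1"} -> triggers the warning
      print(sorted(kept), sorted(dropped))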
  2. ./examples
    1. ./examples/start_server.sh
      #!/bin/bash
      # ATB acceleration library environment variables
      export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
      export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
      export ATB_OPERATION_EXECUTE_ASYNC=1
      export TASK_QUEUE_ENABLE=1
      export ATB_CONTENT_NCHW_TO_ND=1
      export ATB_CONTEXT_WORKSPACE_RING=1
      export HCCL_BUFFSIZE=120
      export LCCL_DETERMINISTIC=1
      export HCCL_DETERMINISTIC=1
      export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
      export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
      export ATB_LAUNCH_KERNEL_WITH_TILING=0
      export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
      python -m vllm.entrypoints.openai.api_server --model=facebook/opt-125m --trust-remote-code --enforce-eager --worker-use-ray
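      Once the server is up, it exposes the OpenAI-compatible HTTP API. A minimal client sketch using only the Python standard library, assuming the default host/port (localhost:8000) and the facebook/opt-125m model from the script above:
      import json
      import urllib.request
      payload = {
          "model": "facebook/opt-125m",
          "prompt": "The capital of France is",
          "max_tokens": 32,
          "temperature": 0,
      }
      req = urllib.request.Request(
          "http://localhost:8000/v1/completions",
          data=json.dumps(payload).encode("utf-8"),
          headers={"Content-Type": "application/json"},
      )
      with urllib.request.urlopen(req) as resp:
          print(json.loads(resp.read())["choices"][0]["text"])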
    2. ./examples/test_offline.py
      from vllm import LLM, SamplingParams
      import json
      import argparse
      parser = argparse.ArgumentParser()
      parser.add_argument('--model_path', type=str, default="facebook/opt-125m")
      # input prompts for test
      prompts = [
          "Hello, my name is",
          "The president of the United States is",
          "The capital of France is",
          "The future of AI is",
      ]
      sampling_params = SamplingParams(max_tokens=512, temperature=0)
      args = parser.parse_args()
      model_path = args.model_path
      llm = LLM(model=model_path,
                max_model_len=4096, # max model context length (prompt + generated tokens)
                tensor_parallel_size=8, # number of NPUs to use
                max_num_seqs=256, # max number of sequences per batch
                enforce_eager=True, # always use eager mode (no graph capture)
                trust_remote_code=True, # required if the model ships custom code not yet in the HuggingFace transformers library
      )
      outputs = llm.generate(prompts, sampling_params)
      for i, output in enumerate(outputs):
          prompt = output.prompt
          generated_text = output.outputs[0].text
          print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
    3. ./examples/test_offline.sh
      #!/bin/bash
      # ATB acceleration library environment variables
      export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
      export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
      export ATB_OPERATION_EXECUTE_ASYNC=1
      export TASK_QUEUE_ENABLE=1
      export ATB_CONTENT_NCHW_TO_ND=1
      export ATB_CONTEXT_WORKSPACE_RING=1
      export HCCL_BUFFSIZE=120
      export LCCL_DETERMINISTIC=1
      export HCCL_DETERMINISTIC=true
      export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
      export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
      export ATB_LAUNCH_KERNEL_WITH_TILING=0
      export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
      python3 test_offline.py --model_path facebook/opt-125m
  3. ./vllm_npu/tests
    1. ./vllm_npu/tests/models/test_models.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      from typing import Union
      import pytest
      import torch
      from mindie_llm.modeling.backend_type import BackendType
      from pytest_mock import MockerFixture
      from vllm.config import ModelConfig, ParallelConfig
      from vllm.model_executor.input_metadata import InputMetadata
      from vllm_npu import DeviceConfig
      from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper
      from vllm_npu.worker.ascend_worker import Worker
      MODELS = ["Ascend/Mock-Model-42B"]
      PARAMS = [
          {"rank": 0, "local_rank": 0, "world_size": 1},
          {"rank": 1, "local_rank": 1, "world_size": 2},
      ]
      @pytest.mark.parametrize("model", MODELS)
      @pytest.mark.parametrize("params", PARAMS)
      @pytest.mark.parametrize("dtype", [torch.bfloat16])
      def test_load_model(
          mocker: MockerFixture, model: str, params: dict[str, int], dtype: Union[str, torch.dtype]
      ) -> None:
          world_size, local_rank, rank = params.get("world_size"), params.get("local_rank"), params.get("rank")
          mock_parallel_config = mocker.MagicMock(spec=ParallelConfig)
          mock_parallel_config.world_size = world_size
          mock_model_config = mocker.MagicMock(spec=ModelConfig)
          mock_model_config.model = model
          mock_model_config.dtype = dtype
          mock_model_config.load_format = "auto"
          mock_model_config.max_context_len_to_capture = 0
          worker = Worker(mock_model_config, mock_parallel_config, None, DeviceConfig(), local_rank, rank, None)
          mock_get_model = mocker.patch("vllm_npu.model_executor.ascend_model_loader.MindIELlmWrapper")
          worker.load_model()
          # Check that the args are correctly passed to the backend
          mock_get_model.assert_called_once_with(
              {
                  "backend_type": BackendType.ATB,
                  "model_id": model,
                  "rank": rank,
                  "local_rank": local_rank,
                  "world_size": world_size,
                  "npu_device_id": local_rank,
              }
          )
      @pytest.mark.parametrize("load_format", ["auto", "safetensors", "pt"])
      def test_load_weights(mocker: MockerFixture, load_format: str) -> None:
          mocker.patch("torch.get_default_dtype", return_value=torch.float32)
          mocker.patch("torch.set_default_dtype")
          mock_generator_torch = mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.GeneratorTorch")
          mock_sampler = mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.Sampler")
          mindie_llm_wrapper = MindIELlmWrapper(mocker.ANY)
          # Supported formats should load without raising
          mindie_llm_wrapper.load_weights(load_format)
          # Testing GeneratorTorch and Sampler call counts
          mock_generator_torch.assert_called_once()
          mock_sampler.assert_called_once()
          # Testing unsupported format
          try:
              mindie_llm_wrapper.load_weights("unsupported")
          except ValueError as e:
              if "load-format support [safetensors, pt]" not in str(e):
                  raise AssertionError("Error message does not match expected text for unsupported formats") from e
          else:
              raise AssertionError("Expected a ValueError for unsupported format, but none was raised")
      @pytest.mark.parametrize("with_kv_cache", [True, False])
      @pytest.mark.parametrize("is_prompt", [True, False])
      def test_forward(mocker: MockerFixture, with_kv_cache: bool, is_prompt: bool) -> None:
          mocker.patch("torch.get_default_dtype", return_value=torch.float32)
          mocker.patch("torch.set_default_dtype")
          mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.GeneratorTorch")
          mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.Sampler")
          wrapper = MindIELlmWrapper(mocker.ANY)
          wrapper.load_weights()
          mock_forward_tensor = mocker.patch.object(
              wrapper.mindie_model, "forward_tensor", return_value=torch.tensor([[0.1, 0.2, 0.3]])
          )
          input_ids = torch.tensor([[1, 2, 3]])
          positions = torch.tensor([[0, 1, 2]])
          kv_caches = [(torch.tensor([0]), torch.tensor([0]))] if with_kv_cache else [(None, None)]
          lora_requests = [None]
          mock_input_metadata = mocker.MagicMock(spec=InputMetadata)
          mock_input_metadata.is_prompt = is_prompt
          mock_input_metadata.prompt_lens = torch.tensor([3], dtype=torch.int32)
          mock_input_metadata.context_lens = torch.tensor([3], dtype=torch.int32)
          mock_input_metadata.max_context_len = 3
          mock_input_metadata.block_tables = torch.tensor([[0]], dtype=torch.int32, device="npu")
          mock_input_metadata.slot_mapping = torch.tensor([0, 1, 2], dtype=torch.int64)
          wrapper.forward(input_ids, positions, kv_caches, mock_input_metadata, lora_requests)
          if is_prompt:
              expected_lengths = mock_input_metadata.prompt_lens.to(torch.int32)
              expected_max_seq_len = int(mock_input_metadata.prompt_lens.max())
              expected_lm_head_indices = (mock_input_metadata.prompt_lens.cumsum(dim=-1) - 1).to(torch.int64)
          else:
              expected_lengths = mock_input_metadata.context_lens
              expected_max_seq_len = mock_input_metadata.max_context_len
              expected_lm_head_indices = None
          if with_kv_cache:
              expected_kv_caches = kv_caches
              expected_block_tables = mock_input_metadata.block_tables
              expected_slots = mock_input_metadata.slot_mapping
          else:
              _, expected_block_tables, expected_slots = wrapper.create_dummy_kv_cache(mock_input_metadata, input_ids)
              # Hard to compare this object
              expected_kv_caches = mocker.ANY
          expected_adapter_ids = [None]
          mock_forward_tensor.assert_called_once_with(
              input_ids,
              positions,
              is_prompt,
              expected_kv_caches,
              expected_block_tables,
              expected_slots,
              expected_lengths,
              expected_max_seq_len,
              expected_lm_head_indices,
              adapter_ids = expected_adapter_ids,
          )
    2. ./vllm_npu/tests/sampler/test_sampler.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import random
      from typing import Union
      import numpy as np
      import pytest
      import torch
      from mindie_llm.text_generator.utils.sampling_metadata import (
          DoSampleMetadata,
          PenaltyMetadata,
          SamplingData,
          SamplingParam,
          SeedMetadata,
          TopKMetadata,
          TopPMetadata,
      )
      from pytest_mock import MockerFixture
      from vllm.model_executor.sampling_metadata import SamplingMetadata
      from vllm.sampling_params import _SAMPLING_EPS
      from vllm.sequence import SamplingParams, SequenceData, SequenceGroupMetadata
      from vllm.worker.model_runner import ModelRunner
      from vllm_npu.model_executor.layers.sampler import Sampler
      torch.npu.set_device("npu")
      def _prepare_test(
          batch_size: int, is_prompt: bool = True, random_seq_length: bool = False, temperature: Union[int, float] = 0
      ) -> tuple[SamplingMetadata, Sampler]:
          torch.npu.set_device("npu")
          sampler = Sampler(None)
          model_runner = ModelRunner(None, None, None, None, None)
          model_runner.set_block_size(128)
          model_runner.sliding_window = None
          seq_group_metadata_list = []
          prompt_lens = []
          for i in range(batch_size):
              token_ids = list(range(random.randint(1, 128))) if random_seq_length else [1, 2, 3]
              seq_group_metadata_list.append(
                  SequenceGroupMetadata(
                      request_id=f"test_{i}",
                      is_prompt=is_prompt,
                      seq_data={0: SequenceData(token_ids)},
                      sampling_params=SamplingParams(temperature=temperature),
                      block_tables={0: [1]},
                  )
              )
              prompt_lens.append(seq_group_metadata_list[-1].seq_data[0].get_len())
          sampling_metadata = model_runner._prepare_sample(seq_group_metadata_list, prompt_lens, prompt_lens)
          return sampling_metadata, sampler
      @pytest.mark.parametrize("vocab_size", [32000])
      @pytest.mark.parametrize("dtype", [torch.bfloat16])
      def test_forward(mocker: MockerFixture, vocab_size: int, dtype: torch.dtype):
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size)
          fake_logits = torch.full((batch_size, vocab_size), 1e-1, dtype=dtype)
          fake_tokens = np.arange(batch_size)
          fake_sample_results = [([i], [0]) for i in range(batch_size)]
          deconstructor = sampler.get_sample_data_and_param
          reconstructor = sampler.get_sample_results
          mock_process = mocker.patch(
              "vllm_npu.model_executor.layers.sampler._apply_logits_processors", return_value=fake_logits
          )
          mock_atb_model = mocker.patch.object(sampler, "atb_model")
          mock_deconstructor = mocker.patch.object(
              sampler, "get_sample_data_and_param", side_effect=lambda *args, **kwargs: deconstructor(*args, **kwargs)
          )
          mock_backend_sample = mocker.patch.object(mock_atb_model, "sample", return_value=fake_tokens)
          mock_reconstructor = mocker.patch.object(
              sampler, "get_sample_results", side_effect=lambda *args, **kwargs: reconstructor(*args, **kwargs)
          )
          mock_logprobs = mocker.patch(
              "vllm_npu.model_executor.layers.sampler._get_logprobs", return_value=(mocker.ANY, mocker.ANY)
          )
          mocker.patch("vllm_npu.model_executor.layers.sampler._build_sampler_output")
          sampler.forward(fake_logits, sampling_metadata)
          mock_process.assert_called_once_with(fake_logits, sampling_metadata)
          mock_deconstructor.assert_called_once_with(sampling_metadata, vocab_size)
          mock_backend_sample.assert_called_once_with(fake_logits, mocker.ANY, mocker.ANY)
          mock_reconstructor.assert_called_once_with(sampling_metadata, fake_tokens)
          mock_logprobs.assert_called_once_with(mocker.ANY, sampling_metadata, fake_sample_results)
      @pytest.mark.parametrize("temperature", [0, 0.1, 1, 10])
      @pytest.mark.parametrize("vocab_size", [32000])
      def test_sample_data_and_param(temperature, vocab_size):
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size, temperature=temperature)
          sampling_data, sampling_param = sampler.get_sample_data_and_param(sampling_metadata, vocab_size)
          # Sampling data
          if not isinstance(sampling_data, SamplingData):
              raise TypeError("The returned object is not an instance of SamplingData.")
          if not hasattr(sampling_data, "all_input_ids"):
              raise AttributeError("SamplingData does not have the 'all_input_ids' attribute.")
          if not hasattr(sampling_data, "output_ids"):
              raise AttributeError("SamplingData does not have the 'output_ids' attribute.")
          if not isinstance(sampling_data.all_input_ids, torch.Tensor):
              raise TypeError("all_input_ids should be a tensor.")
          if not isinstance(sampling_data.output_ids, torch.Tensor):
              raise TypeError("output_ids should be a tensor.")
          if sampling_data.all_input_ids.dtype != torch.int32:
              raise ValueError("The dtype of all_input_ids should be int32.")
          if sampling_data.output_ids.dtype != torch.int32:
              raise ValueError("The dtype of output_ids should be int32.")
          # Sampling param
          if temperature < _SAMPLING_EPS:
              if sampling_param is not None:
                  raise ValueError("Sampling data should be None in greedy mode.")
          else:
              if not isinstance(sampling_param, SamplingParam):
                  raise TypeError("The returned object is not an instance of SamplingParam.")
              if not hasattr(sampling_param, "penalty_meta"):
                  raise AttributeError("SamplingParam does not have the 'penalty_meta' attribute.")
              if not hasattr(sampling_param, "temperature"):
                  raise AttributeError("SamplingParam does not have the 'temperature' attribute.")
              if not hasattr(sampling_param, "top_k_meta"):
                  raise AttributeError("SamplingParam does not have the 'top_k_meta' attribute.")
              if not hasattr(sampling_param, "top_p_meta"):
                  raise AttributeError("SamplingParam does not have the 'top_p_meta' attribute.")
              if not hasattr(sampling_param, "seed_meta"):
                  raise AttributeError("SamplingParam does not have the 'seed_meta' attribute.")
              if not hasattr(sampling_param, "do_sample_meta"):
                  raise AttributeError("SamplingParam does not have the 'do_sample_meta' attribute.")
              if not isinstance(sampling_param.penalty_meta, PenaltyMetadata):
                  raise TypeError("penalty_meta should be an instance of PenaltyMetadata.")
              if temperature != 1:
                  if not isinstance(sampling_param.temperature, torch.Tensor):
                      raise TypeError("Temperature should be a tensor.")
              else:
                  if sampling_param.temperature is not None:
                      raise ValueError("Temperature tensors should be None.")
              if not isinstance(sampling_param.top_k_meta, TopKMetadata):
                  raise TypeError("top_k_meta should be an instance of TopKMetadata.")
              if not isinstance(sampling_param.top_p_meta, TopPMetadata):
                  raise TypeError("top_p_meta should be an instance of TopPMetadata.")
              if not isinstance(sampling_param.seed_meta, SeedMetadata):
                  raise TypeError("seed_meta should be an instance of SeedMetadata.")
              if not isinstance(sampling_param.do_sample_meta, DoSampleMetadata):
                  raise TypeError("do_sample_meta should be an instance of DoSampleMetadata.")
      def test_sample_results_reconstruction():
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size)
          samples = np.arange(batch_size)
          sample_results = sampler.get_sample_results(sampling_metadata, samples)
          expected_sample_results = [([i], [0]) for i in range(batch_size)]
          if sample_results != expected_sample_results:
              raise ValueError(f"Sample results mismatch: Expected {expected_sample_results}, got {sample_results}.")
          selected_seq_groups = [([0], SamplingParams()), ([1, 2, 3], SamplingParams()), ([4, 5], SamplingParams())]
          samples = np.array([101, 102, 103, 104, 105, 106])
          results = sampler.extract_next_tokens_and_parents(selected_seq_groups, samples)
          expected_results = [([101], [0]), ([102], [0, 1, 2]), ([105], [0, 1])]
          if results != expected_results:
              raise ValueError(f"Results mismatch: Expected {expected_results}, got {results}.")
      @pytest.mark.parametrize("is_prompt", [True, False])
      @pytest.mark.parametrize("vocab_size", [32000])
      def test_token_padding(is_prompt, vocab_size):
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size, is_prompt, True)
          sampling_data, _ = sampler.get_sample_data_and_param(sampling_metadata, vocab_size)
          # Note that if padding fails, then sampling_data cannot be constructed.
          # So we just check some basic shape info here.
          if sampling_data.all_input_ids.shape[0] != batch_size:
              raise ValueError(
                  f"Expected all_input_ids to have shape[0] of {batch_size}, but got {sampling_data.all_input_ids.shape[0]}."
              )
          if sampling_data.output_ids.shape[0] != batch_size:
              raise ValueError(
                  f"Expected output_ids to have shape[0] of {batch_size}, but got {sampling_data.output_ids.shape[0]}."
              )
  4. ./vllm_npu/vllm_npu/core
    1. ./vllm_npu/vllm_npu/core/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      from vllm_npu.core.scheduler import _schedule
      from vllm.core.scheduler import Scheduler
      Scheduler._schedule = _schedule
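      The patch above only takes effect if vllm_npu.core is imported before any Scheduler is constructed. A quick illustrative check (not part of the adaptation):
      import vllm_npu.core  # applies the patch on import
      from vllm.core.scheduler import Scheduler
      from vllm_npu.core.scheduler import _schedule
      assert Scheduler._schedule is _schedule  # the NPU _schedule is now bound to the class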
    2. ./vllm_npu/vllm_npu/core/scheduler.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of codes in this file was copied from project [vLLM Team][vllm]
      import time
      from typing import Dict, List, Deque, Optional
      from collections import deque
      from vllm.core.block_manager import AllocStatus
      from vllm.sequence import SequenceGroup, SequenceStatus
      from vllm.core.scheduler import logger, SchedulerOutputs
      def _schedule(self) -> SchedulerOutputs:
          # Blocks that need to be swapped or copied before model execution.
          blocks_to_swap_in: Dict[int, int] = {}
          blocks_to_swap_out: Dict[int, int] = {}
          blocks_to_copy: Dict[int, List[int]] = {}
          # Fix the current time.
          now = time.monotonic()
          # Join waiting sequences if possible.
          if not self.swapped:
              ignored_seq_groups: List[SequenceGroup] = []
              scheduled: List[SequenceGroup] = []
              # The total number of sequences on the fly, including the
              # requests in the generation phase.
              num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                  for seq_group in self.running)
              num_batched_tokens = 0
              curr_loras = set(
                  seq_group.lora_int_id
                  for seq_group in self.running) if self.lora_enabled else None
              # Optimization: We do not sort the waiting queue since the preempted
              # sequence groups are added to the front and the new sequence groups
              # are added to the back.
              leftover_waiting_sequences = deque()
              while self.waiting:
                  seq_group = self.waiting[0]
                  waiting_seqs = seq_group.get_seqs(
                      status=SequenceStatus.WAITING)
                  assert len(waiting_seqs) == 1, (
                      "Waiting sequence group should have only one prompt "
                      "sequence.")
                  num_prompt_tokens = waiting_seqs[0].get_len()
                  if num_prompt_tokens > self.prompt_limit:
                      logger.warning(
                          f"Input prompt ({num_prompt_tokens} tokens) is too long"
                          f" and exceeds limit of {self.prompt_limit}")
                      for seq in waiting_seqs:
                          seq.status = SequenceStatus.FINISHED_IGNORED
                      ignored_seq_groups.append(seq_group)
                      self.waiting.popleft()
                      continue
                  # If the sequence group cannot be allocated, stop.
                  can_allocate = self.block_manager.can_allocate(seq_group)
                  if can_allocate == AllocStatus.LATER:
                      break
                  elif can_allocate == AllocStatus.NEVER:
                      logger.warning(
                          f"Input prompt ({num_prompt_tokens} tokens) is too long"
                          f" and exceeds the capacity of block_manager")
                      for seq in waiting_seqs:
                          seq.status = SequenceStatus.FINISHED_IGNORED
                      ignored_seq_groups.append(seq_group)
                      self.waiting.popleft()
                      continue
                  lora_int_id = 0
                  if self.lora_enabled:
                      lora_int_id = seq_group.lora_int_id
                      if lora_int_id > 0 and lora_int_id not in curr_loras and len(
                              curr_loras) >= self.lora_config.max_loras:
                          # We don't have a space for another LoRA, so
                          # we ignore this request for now.
                          leftover_waiting_sequences.appendleft(seq_group)
                          self.waiting.popleft()
                          continue
                  # If the number of batched tokens exceeds the limit, stop.
                  if (num_batched_tokens + num_prompt_tokens >
                          self.scheduler_config.max_num_batched_tokens):
                      break
                  # The total number of sequences in the RUNNING state should not
                  # exceed the maximum number of sequences.
                  num_new_seqs = seq_group.get_max_num_running_seqs()
                  if (num_curr_seqs + num_new_seqs >
                          self.scheduler_config.max_num_seqs):
                      break
                  if lora_int_id > 0:
                      curr_loras.add(lora_int_id)
                  self.waiting.popleft()
                  self._allocate(seq_group)
                  self.running.append(seq_group)
                  num_batched_tokens += num_prompt_tokens
                  num_curr_seqs += num_new_seqs
                  scheduled.append(seq_group)
              self.waiting.extendleft(leftover_waiting_sequences)
              if scheduled or ignored_seq_groups:
                  scheduler_outputs = SchedulerOutputs(
                      scheduled_seq_groups=scheduled,
                      prompt_run=True,
                      num_batched_tokens=num_batched_tokens,
                      blocks_to_swap_in=blocks_to_swap_in,
                      blocks_to_swap_out=blocks_to_swap_out,
                      blocks_to_copy=blocks_to_copy,
                      ignored_seq_groups=ignored_seq_groups,
                  )
                  return scheduler_outputs
          # NOTE(woosuk): Preemption happens only when there is no available slot
          # to keep all the sequence groups in the RUNNING state.
          # In this case, the policy is responsible for deciding which sequence
          # groups to preempt.
          self.running = self.policy.sort_by_priority(now, self.running)
          # Reserve new token slots for the running sequence groups.
          running: Deque[SequenceGroup] = deque()
          preempted: List[SequenceGroup] = []
          while self.running:
              seq_group = self.running.popleft()
              while not self.block_manager.can_append_slot(seq_group):
                  if self.running:
                      # Preempt the lowest-priority sequence groups.
                      victim_seq_group = self.running.pop()
                      self._preempt(victim_seq_group, blocks_to_swap_out)
                      preempted.append(victim_seq_group)
                  else:
                      # No other sequence groups can be preempted.
                      # Preempt the current sequence group.
                      self._preempt(seq_group, blocks_to_swap_out)
                      preempted.append(seq_group)
                      break
              else:
                  # Append new slots to the sequence group.
                  self._append_slot(seq_group, blocks_to_copy)
                  running.append(seq_group)
          self.running = running
          # Swap in the sequence groups in the SWAPPED state if possible.
          self.swapped = self.policy.sort_by_priority(now, self.swapped)
          if not preempted:
              num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                  for seq_group in self.running)
              curr_loras = set(
                  seq_group.lora_int_id
                  for seq_group in self.running) if self.lora_enabled else None
              leftover_swapped = deque()
              while self.swapped:
                  seq_group = self.swapped[0]
                  lora_int_id = 0
                  if self.lora_enabled:
                      lora_int_id = seq_group.lora_int_id
                      if lora_int_id > 0 and lora_int_id not in curr_loras and len(
                              curr_loras) >= self.lora_config.max_loras:
                          # We don't have a space for another LoRA, so
                          # we ignore this request for now.
                          leftover_swapped.appendleft(seq_group)
                          self.swapped.popleft()
                          continue
                  # If the sequence group cannot be swapped in, stop.
                  if not self.block_manager.can_swap_in(seq_group):
                      break
                  # The total number of sequences in the RUNNING state should not
                  # exceed the maximum number of sequences.
                  num_new_seqs = seq_group.get_max_num_running_seqs()
                  if (num_curr_seqs + num_new_seqs >
                          self.scheduler_config.max_num_seqs):
                      break
                  if lora_int_id > 0:
                      curr_loras.add(lora_int_id)
                  self.swapped.popleft()
                  self._swap_in(seq_group, blocks_to_swap_in)
                  self._append_slot(seq_group, blocks_to_copy)
                  num_curr_seqs += num_new_seqs
                  self.running.append(seq_group)
              self.swapped.extendleft(leftover_swapped)
          # Each sequence in the generation phase only takes one token slot.
          # Therefore, the number of batched tokens is equal to the number of
          # sequences in the RUNNING state.
          num_batched_tokens = sum(
              seq_group.num_seqs(status=SequenceStatus.RUNNING)
              for seq_group in self.running)
          scheduler_outputs = SchedulerOutputs(
              scheduled_seq_groups=self.running,
              prompt_run=False,
              num_batched_tokens=num_batched_tokens,
              blocks_to_swap_in=blocks_to_swap_in,
              blocks_to_swap_out=blocks_to_swap_out,
              blocks_to_copy=blocks_to_copy,
              ignored_seq_groups=[],
          )
          return scheduler_outputs
  5. ./vllm_npu/vllm_npu/engine
    1. ./vllm_npu/vllm_npu/engine/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      from vllm_npu.config import DeviceConfig
      from vllm_npu.utils import get_ip
      from vllm_npu.engine.llm_engine import DEVICE_TO_WORKER_MODULE_MAP
      from vllm_npu.engine.ray_utils import initialize_cluster
      from vllm.engine import arg_utils, llm_engine, ray_utils
      arg_utils.DeviceConfig = DeviceConfig
      llm_engine.DEVICE_TO_WORKER_MODULE_MAP = DEVICE_TO_WORKER_MODULE_MAP
      llm_engine.get_ip = get_ip
      llm_engine.initialize_cluster = initialize_cluster
      ray_utils.get_ip = get_ip
      ray_utils.initialize_cluster = initialize_cluster
    2. ./vllm_npu/vllm_npu/engine/llm_engine.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      DEVICE_TO_WORKER_MODULE_MAP = {
          "cuda": "vllm.worker.worker",
          "neuron": "vllm.worker.neuron_worker",
          "npu": "vllm_npu.worker.ascend_worker",
      }
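      The patched llm_engine resolves the worker module for the configured device type from this map. A simplified, hypothetical sketch of how such a lookup is typically done (the actual import happens inside the engine):
      import importlib

      def resolve_worker_class(device_type: str):
          # Hypothetical helper: import the worker module registered for the device
          # type and return its Worker class, so "npu" maps to the Ascend worker.
          module = importlib.import_module(DEVICE_TO_WORKER_MODULE_MAP[device_type])
          return module.Worker
      # resolve_worker_class("npu") -> vllm_npu.worker.ascend_worker.Worker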
    3. ./vllm_npu/vllm_npu/engine/ray_utils.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of codes in this file was copied from project [vLLM Team][vllm]
      from typing import Optional, Tuple, TYPE_CHECKING
      from vllm.config import ParallelConfig
      from vllm.utils import is_hip
      from vllm.logger import init_logger
      logger = init_logger(__name__)
      try:
          import ray
      except ImportError as e:
          logger.warning(f"Failed to import Ray with {e!r}. "
                         "For distributed inference, please install Ray with "
                         "`pip install ray`.")
          ray = None
      if TYPE_CHECKING:
          from ray.util.placement_group import PlacementGroup
      def initialize_cluster(
          parallel_config: ParallelConfig,
          engine_use_ray: bool = False,
          ray_address: Optional[str] = None,
      ) -> Optional["PlacementGroup"]:
          """Initialize the distributed cluster probably with Ray.
          Args:
              parallel_config: The configurations for parallel execution.
              engine_use_ray: Whether to use Ray for async engine.
              ray_address: The address of the Ray cluster. If None, uses
                  the default Ray cluster address.
          Returns:
              An optional `PlacementGroup`. It includes the specification
              of the resources for each distributed worker. None if Ray is
              not used.
          """
          if parallel_config.worker_use_ray or engine_use_ray:
              if ray is None:
                  raise ImportError(
                      "Ray is not installed. Please install Ray to use distributed "
                      "serving.")
              # Connect to a ray cluster.
              if is_hip():
                  ray.init(address=ray_address,
                           ignore_reinit_error=True,
                           num_gpus=parallel_config.world_size)
              else:
                  """start adapt"""
                  ray.init(address=ray_address, ignore_reinit_error=True, 
                              num_gpus=parallel_config.world_size)
                  """end adapt"""
          if not parallel_config.worker_use_ray:
              assert parallel_config.world_size == 1, (
                  "Ray is required if parallel_config.world_size > 1.")
              return None
          # Create placement group for worker processes
          current_placement_group = ray.util.get_current_placement_group()
          if current_placement_group:
              # We are in a placement group
              bundles = current_placement_group.bundle_specs
              # Verify that we can use the placement group.
              gpu_bundles = 0
              for bundle in bundles:
                  bundle_gpus = bundle.get("GPU", 0)
                  if bundle_gpus > 1:
                      raise ValueError(
                          "Placement group bundle cannot have more than 1 GPU.")
                  if bundle_gpus:
                      gpu_bundles += 1
              if parallel_config.world_size > gpu_bundles:
                  raise ValueError(
                      "The number of required GPUs exceeds the total number of "
                      "available GPUs in the placement group.")
          else:
              num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
              if parallel_config.world_size > num_gpus_in_cluster:
                  raise ValueError(
                      "The number of required GPUs exceeds the total number of "
                      "available GPUs in the cluster.")
              # Create a new placement group
              placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
              current_placement_group = ray.util.placement_group(
                  placement_group_specs)
              # Wait until PG is ready - this will block until all
              # requested resources are available, and will timeout
              # if they cannot be provisioned.
              ray.get(current_placement_group.ready(), timeout=1800)
          return current_placement_group
  6. ./vllm_npu/vllm_npu/model_executor
    1. ./vllm_npu/vllm_npu/model_executor/ascend_model_loader.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of code in this file was copied from project [vLLM Team][vllm]
      """Utilities for selecting and loading models."""
      import contextlib
      import torch
      import torch.nn as nn
      from vllm.config import DeviceConfig, ModelConfig
      from vllm.logger import init_logger
      from vllm.model_executor.weight_utils import initialize_dummy_weights
      from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper
      logger = init_logger(__name__)
      @contextlib.contextmanager
      def _set_default_torch_dtype(dtype: torch.dtype):
          """Sets the default torch dtype to the given dtype."""
          old_dtype = torch.get_default_dtype()
          torch.set_default_dtype(dtype)
          yield
          torch.set_default_dtype(old_dtype)
      def get_model(
          model_config: ModelConfig,
          device_config: DeviceConfig,
          **kwargs,
      ) -> nn.Module:
          """Get model with MindIELlmWrapper
          Args:
              model_config (ModelConfig): Model configs.
              device_config (DeviceConfig): Device configs.
              mindie_model_config (dict[str, Any]): MindIE configs.
          Raises:
              ValueError: LoRA not supported.
          Returns:
              nn.Module: Model.
          """
          if kwargs.get("lora_config"):
              logger.info(
                  "Using LoRA(s) with MindIE backend:\n"
                  "Please make sure your '--lora-modules' matches with your 'lora_adapter.json' in the model directory!\n"
                  "Current config for LoRA(s): %s",
                  kwargs.get("lora_config"),
              )
          with _set_default_torch_dtype(model_config.dtype):
              with torch.device(device_config.device):
                  model = MindIELlmWrapper(model_config.mindie_config)
              if model_config.load_format == "dummy":
                  initialize_dummy_weights(model)
              else:
                  model.load_weights(model_config.load_format)
              model = model.npu()
          model = model.eval()
          # For compatibility, move the config to the first hierarchy
          model.config = model.mindie_model.model_wrapper.model_runner.config
          return model
    2. ./vllm_npu/vllm_npu/model_executor/utils.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import importlib
      import torch
      from vllm.config import DeviceConfig, ModelConfig
      DEVICE_TO_MODEL_LOADER_MAP = {
          "cuda": "vllm.model_executor.model_loader",
          "neuron": "vllm.model_executor.neuron_model_loader",
          "npu": "vllm_npu.model_executor.ascend_model_loader",
      }
      def get_model(model_config: ModelConfig, device_config: DeviceConfig, **kwargs) -> torch.nn.Module:
          model_loader_module = DEVICE_TO_MODEL_LOADER_MAP[device_config.device_type]
          imported_model_loader = importlib.import_module(f"{model_loader_module}")
          get_model_fn = imported_model_loader.get_model
          return get_model_fn(model_config, device_config, **kwargs)
    3. ./vllm_npu/vllm_npu/model_executor/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import vllm_npu.model_executor.ascend_model_loader
      import vllm_npu.model_executor.models.ascend.mindie_llm_wrapper
      from vllm_npu.model_executor.utils import DEVICE_TO_MODEL_LOADER_MAP, get_model
      from vllm.model_executor import utils
      from vllm import model_executor
      model_executor.get_model = get_model
      utils.DEVICE_TO_MODEL_LOADER_MAP = DEVICE_TO_MODEL_LOADER_MAP
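      Because this __init__ module rebinds model_executor.get_model and utils.DEVICE_TO_MODEL_LOADER_MAP at import time, importing vllm_npu is enough to reroute vLLM's model loading to the Ascend loader. A minimal check, assuming vllm and vllm_npu are installed in a configured Ascend environment (illustrative only, not part of the patch files):
        import vllm_npu  # applying the monkey patches happens on import
        from vllm import model_executor
        from vllm.model_executor import utils
        # After the import, vLLM's loader entry points are the NPU-aware ones.
        print(model_executor.get_model.__module__)      # vllm_npu.model_executor.utils
        print(utils.DEVICE_TO_MODEL_LOADER_MAP["npu"])  # vllm_npu.model_executor.ascend_model_loader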
    4. ./vllm_npu/vllm_npu/model_executor/layers/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
    5. ./vllm_npu/vllm_npu/model_executor/layers/sampler.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of the code in this file was copied from the [vLLM Team][vllm] project
      """A layer that samples the next tokens from the model's outputs."""
      from random import randint
      from typing import Dict, List, Optional, Tuple
      import numpy as np
      import torch
      import torch.nn as nn
      from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
      from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam
      from vllm.model_executor.layers.sampler import _apply_logits_processors, _build_sampler_output, _get_logprobs
      from vllm.model_executor.sampling_metadata import SamplingMetadata
      from vllm.sampling_params import SamplingParams, SamplingType
      from vllm.sequence import SamplerOutput
      _SAMPLING_EPS = 1e-5
      def _to_tensor(data, dtype=None):
          if dtype:
              return torch.tensor(data, dtype=dtype, device=torch.device("npu"))
          else:
              return torch.tensor(data, device=torch.device("npu"))
      class Sampler(nn.Module):
          """Adapts the vLLM sample function to the backend sample function in MindIE."""
          def __init__(self, atb_model: GeneratorTorch) -> None:
              super().__init__()
              self.atb_model = atb_model
          def forward(self, logits: torch.Tensor, sampling_metadata: SamplingMetadata) -> Optional[SamplerOutput]:
              _, vocab_size = logits.shape
              if not sampling_metadata.perform_sampling:
                  return None
              # Apply logits processors (if any).
              logits = _apply_logits_processors(logits, sampling_metadata)
              sampling_data, sampling_param = self.get_sample_data_and_param(sampling_metadata, vocab_size)
              logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float)
              next_tokens = self.atb_model.sample(logits, sampling_data, sampling_param)
              # Sample the next tokens.
              sample_results = self.get_sample_results(sampling_metadata, next_tokens)
              # Get the logprobs query results.
              prompt_logprobs, sample_logprobs = _get_logprobs(logprobs, sampling_metadata, sample_results)
              return _build_sampler_output(sample_results, sampling_metadata, prompt_logprobs, sample_logprobs)
          def get_sample_data_and_param(
              self,
              sampling_metadata: "SamplingMetadata",
              vocab_size: int,
          ) -> Tuple[SamplingData, SamplingParam]:
              """Get sample data and parameter from sampling metadata.
              Args:
                  sampling_metadata (SamplingMetadata): Sampling metadata.
                  vocab_size (int): Vocabulary size.
              Returns:
                  tuple[SamplingData, SamplingParam]: Sample data and parameter.
              """
              all_input_tokens: List[List[int]] = []
              prompt_tokens: List[List[int]] = []
              output_tokens: List[List[int]] = []
              top_ks: List[int] = []
              temperatures: List[float] = []
              top_ps: List[float] = []
              min_ps: List[float] = []
              presence_penalties: List[float] = []
              frequency_penalties: List[float] = []
              repetition_penalties: List[float] = []
              seeds: List[int] = []
              do_penalties = False
              do_top_p_top_k = False
              do_min_p = False
              for i, seq_group in enumerate(sampling_metadata.seq_groups):
                  seq_ids, sampling_params = seq_group
                  temperature = sampling_params.temperature
                  p = sampling_params.presence_penalty
                  f = sampling_params.frequency_penalty
                  r = sampling_params.repetition_penalty
                  top_p = sampling_params.top_p
                  min_p = sampling_params.min_p
                  # k should not be greater than the vocab size.
                  top_k = min(sampling_params.top_k, vocab_size)
                  top_k = vocab_size if top_k == -1 else top_k
                  is_greedy = sampling_params.sampling_type == SamplingType.GREEDY
                  seed = sampling_params.seed or (
                      0 if is_greedy else randint(torch.iinfo(torch.long).min, torch.iinfo(torch.long).max)
                  )
                  if temperature < _SAMPLING_EPS:
                      # NOTE: Zero temperature means deterministic sampling
                      # (i.e., greedy sampling or beam search).
                      # Set the temperature to 1 to avoid division by zero.
                      temperature = 1.0
                  if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS or top_k != vocab_size):
                      do_top_p_top_k = True
                  if not do_min_p and min_p > _SAMPLING_EPS:
                      do_min_p = True
                  if not do_penalties and any(
                      [abs(p) >= _SAMPLING_EPS, abs(f) >= _SAMPLING_EPS, abs(r - 1.0) >= _SAMPLING_EPS]
                  ):
                      do_penalties = True
                  if i < sampling_metadata.num_prompts and sampling_params.prompt_logprobs is not None:
                      # For tokens in the prompt that we only need to get their logprobs
                      prompt_len = sampling_metadata.prompt_lens[i]
                      temperatures += [temperature] * (prompt_len - 1)
                      seeds += [seed] * (prompt_len - 1)
                      top_ps += [top_p] * (prompt_len - 1)
                      top_ks += [top_k] * (prompt_len - 1)
                      min_ps += [min_p] * (prompt_len - 1)
                      presence_penalties += [0] * (prompt_len - 1)
                      frequency_penalties += [0] * (prompt_len - 1)
                      repetition_penalties += [1] * (prompt_len - 1)
                      prompt_tokens.extend([] for _ in range(prompt_len - 1))
                      output_tokens.extend([] for _ in range(prompt_len - 1))
                      all_input_tokens.extend([] for _ in range(prompt_len - 1))
                  for seq_id in seq_ids:
                      seq_data = sampling_metadata.seq_data[seq_id]
                      prompt_tokens.append(seq_data.prompt_token_ids)
                      output_tokens.append(seq_data.output_token_ids)
                      all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids)
                  temperatures += [temperature] * len(seq_ids)
                  top_ps += [top_p] * len(seq_ids)
                  top_ks += [top_k] * len(seq_ids)
                  min_ps += [min_p] * len(seq_ids)
                  presence_penalties += [p] * len(seq_ids)
                  frequency_penalties += [f] * len(seq_ids)
                  repetition_penalties += [r] * len(seq_ids)
              max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0)
              padded_all_input_tokens = [
                  tokens + [vocab_size] * (max_tokens_len - len(tokens))
                  for tokens in all_input_tokens
              ]
              output_max_len = max([len(tokens) for tokens in output_tokens], default=0)
              padded_output_tokens = [tokens + [vocab_size] * (output_max_len - len(tokens)) for tokens in output_tokens]
              # Note: whether torch.from_numpy would be preferable here is still open.
              # Convert to tensors only when the padded token lists are non-empty.
              all_input_ids = padded_all_input_tokens and _to_tensor(padded_all_input_tokens, torch.int32)
              output_ids = padded_output_tokens and _to_tensor(padded_output_tokens, torch.int32)
              sampling_data = SamplingData(all_input_ids=all_input_ids, output_ids=output_ids)
              repetition_penalties = np.array(repetition_penalties, dtype=np.float32)
              frequency_penalties = np.array(frequency_penalties, dtype=np.float32)
              presence_penalties = np.array(presence_penalties, dtype=np.float32)
              temperatures = np.array(temperatures, dtype=np.float32)
              top_ks = np.array(top_ks, dtype=np.int32)
              top_ps = np.array(top_ps, dtype=np.float32)
              seeds = np.array(seeds, dtype=np.int32)
              do_sample = np.array([True])
              # Temporary solution; a better way to handle this is needed.
              if is_greedy:
                  sampling_param = None
              else:
                  sampling_param = SamplingParam.from_numpy(
                      repetition_penalty=repetition_penalties,
                      frequency_penalty=frequency_penalties,
                      presence_penalty=presence_penalties,
                      temperature=temperatures,
                      top_k=top_ks,
                      top_p=top_ps,
                      seed=seeds,
                      do_sample=do_sample,
                      to_tensor=_to_tensor,
                  )
              return sampling_data, sampling_param
          def get_sample_results(
              self,
              sampling_metadata: SamplingMetadata,
              samples: np.ndarray,
          ) -> List[Tuple[List[int], List[int]]]:
              """Reconstructs the sample results from the backend samples.
              Args:
                  sampling_metadata (SamplingMetadata): Sampling metadata.
                  samples (np.array): Backend samples.
              Raises:
                  ValueError: Beam search is not supported yet.
              Returns:
                  List[Tuple[List[int], List[int]]]: Sample results.
              """
              categorized_seq_group_ids = {t: [] for t in SamplingType}
              categorized_sample_indices = sampling_metadata.categorized_sample_indices
              for i, seq_group in enumerate(sampling_metadata.seq_groups):
                  _, sampling_params = seq_group
                  sampling_type = sampling_params.sampling_type
                  categorized_seq_group_ids[sampling_type].append(i)
              sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {}
              sample_metadata = {}
              # Counterintuitively, having two loops here is actually faster.
              # The first loop can run without waiting on GPU<->CPU sync.
              for sampling_type in SamplingType:
                  if len(categorized_sample_indices[sampling_type]) == 0:
                      continue
                  seq_group_ids = categorized_seq_group_ids[sampling_type]
                  seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_ids]
                  sample_metadata[sampling_type] = (seq_group_ids, seq_groups)
              for sampling_type in SamplingType:
                  if sampling_type not in sample_metadata:
                      continue
                  seq_group_ids, seq_groups = sample_metadata[sampling_type]
                  if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED):
                      sample_results = self.extract_next_tokens_and_parents(seq_groups, samples)
                  elif sampling_type == SamplingType.BEAM:
                      raise ValueError(f"Unsupported sampling type: beam search.")
                  sample_results_dict.update(zip(seq_group_ids, sample_results))
              sample_results = [sample_results_dict.get(i) for i in range(len(sampling_metadata.seq_groups))]
              return sample_results
          def extract_next_tokens_and_parents(
              self,
              selected_seq_groups: List[Tuple[List[int], SamplingParams]],
              samples: np.ndarray,
          ) -> List[Tuple[List[int], List[int]]]:
              samples = samples.tolist()
              sample_idx = 0
              results = []
              for seq_group in selected_seq_groups:
                  seq_ids, _ = seq_group
                  num_parent_seqs = len(seq_ids)
                  parent_ids = list(range(num_parent_seqs))
                  next_token_ids = [samples[sample_idx]]
                  results.append((next_token_ids, parent_ids))
                  sample_idx += num_parent_seqs
              return results
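      For clarity, here is a standalone illustration of how extract_next_tokens_and_parents reshapes the flat token array returned by the MindIE backend into per-sequence-group (next_token_ids, parent_ids) results; the token values are made up and no MindIE runtime is needed:
        import numpy as np
        def extract_next_tokens_and_parents(selected_seq_groups, samples):
            # Same logic as the Sampler method above, reproduced standalone.
            samples = samples.tolist()
            sample_idx = 0
            results = []
            for seq_ids, _params in selected_seq_groups:
                parent_ids = list(range(len(seq_ids)))
                next_token_ids = [samples[sample_idx]]
                results.append((next_token_ids, parent_ids))
                sample_idx += len(seq_ids)
            return results
        # Two sequence groups with one sequence each; the backend sampled tokens 11 and 42.
        print(extract_next_tokens_and_parents([([0], None), ([1], None)], np.array([11, 42])))
        # -> [([11], [0]), ([42], [0])]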
    6. ./vllm_npu/vllm_npu/model_executor/models/ascend/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
    7. ./vllm_npu/vllm_npu/model_executor/models/ascend/mindie_llm_wrapper.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import math
      from typing import List, Literal, Optional
      import torch
      from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
      from torch import nn
      from vllm.lora.request import LoRARequest
      from vllm.model_executor import SamplingMetadata
      from vllm.model_executor.input_metadata import InputMetadata
      from vllm.sequence import SamplerOutput
      from vllm.worker.cache_engine import KVCache
      from vllm_npu.model_executor.layers.sampler import Sampler
      DEVICE_TYPE = "npu"
      DEFAULT_FORMAT = "auto"
      class MindIELlmWrapper(nn.Module):
          def __init__(self, mindie_config):
              super(MindIELlmWrapper, self).__init__()
              self.mindie_config = mindie_config
              self.mindie_model = None
              self.sampler = None
          def forward(
              self,
              input_ids: torch.Tensor,
              positions: torch.Tensor,
              kv_caches: List[KVCache],
              input_metadata: InputMetadata,
              lora_requests: List[LoRARequest],
          ) -> torch.Tensor:
              if kv_caches[0][0] is None:
                  kv_caches, block_tables, slots = self.create_dummy_kv_cache(input_metadata, input_ids)
              else:
                  if input_metadata.is_prompt:
                      block_tables = torch.tensor([0], dtype=torch.int32, device="npu")
                  else:
                      block_tables = input_metadata.block_tables
                  slots = input_metadata.slot_mapping
              if input_metadata.is_prompt:
                  input_lengths = input_metadata.prompt_lens.to(torch.int32)
                  max_seq_len = int(input_metadata.prompt_lens.max())
                  lm_head_indices = (input_metadata.prompt_lens.cumsum(dim=-1) - 1).to(torch.int64)
              else:
                  input_lengths = input_metadata.context_lens
                  max_seq_len = input_metadata.max_context_len
                  lm_head_indices = None
              adapter_ids = [lora_request.lora_name if lora_request else None for lora_request in lora_requests]
              logits = self.mindie_model.forward_tensor(
                  input_ids,
                  positions,
                  input_metadata.is_prompt,
                  kv_caches,
                  block_tables,
                  slots,
                  input_lengths,
                  max_seq_len,
                  lm_head_indices,
                  adapter_ids=adapter_ids,  # one adapter id per sequence in the batch
              )
              return logits
          def sample(
              self,
              hidden_states: torch.Tensor,
              sampling_metadata: SamplingMetadata,
          ) -> Optional[SamplerOutput]:
              # hidden_states is logits
              next_tokens = self.sampler(hidden_states, sampling_metadata)
              return next_tokens
          def load_weights(self, load_format: Literal["auto", "safetensors", "pt"] = DEFAULT_FORMAT):
              """Load model weights.
              Args:
                  load_format (Literal[auto, safetensors, pt], optional): The format of weights. Defaults to "auto".
              Raises:
                  ValueError: Format not supported.
              """
              if load_format not in ["auto", "safetensors", "pt"]:
                  raise ValueError("load-format support [safetensors, pt]")
              self.weight_dtype = torch.get_default_dtype()
              torch.set_default_dtype(torch.float32)
              # Replacing the deprecated method with GeneratorTorch
              self.mindie_model = GeneratorTorch(self.mindie_config)
              self.sampler = Sampler(self.mindie_model)
              self.sampler.logits_as_hidden_states = True
              torch.set_default_dtype(self.weight_dtype)
          # During warm-up, create a dummy KV cache, block tables, and slot mapping.
          def create_dummy_kv_cache(self, input_metadata, input_ids):
              dummy_block_num = 1
              dummy_block_size = 128
              kv_cache = [
                  (
                      torch.empty(
                          (
                              dummy_block_num,
                              dummy_block_size,
                              self.mindie_model.model_info.num_kv_heads,
                              self.mindie_model.model_info.head_size,
                          ),
                          dtype=self.weight_dtype,
                          device=DEVICE_TYPE,
                      ),
                      torch.empty(
                          (
                              dummy_block_num,
                              dummy_block_size,
                              self.mindie_model.model_info.num_kv_heads,
                              self.mindie_model.model_info.head_size,
                          ),
                          dtype=self.weight_dtype,
                          device=DEVICE_TYPE,
                      ),
                  )
                  for _ in range(self.mindie_model.model_info.num_layers)
              ]
              max_s = max(input_metadata.prompt_lens)
              max_need_block = math.ceil(max_s / dummy_block_size)
              batch_size = len(input_metadata.prompt_lens)
              block_tables = torch.zeros(batch_size, max_need_block, dtype=int, device=DEVICE_TYPE)
              slot = [i for i in range(dummy_block_size)]
              slots = []
              warm_up_len = len(input_ids)
              while warm_up_len > 0:
                  if warm_up_len > dummy_block_size:
                      slots.extend(slot)
                      warm_up_len -= dummy_block_size
                  else:
                      slots.extend(slot[:warm_up_len])
                      warm_up_len = 0
              slots = torch.tensor(slots, dtype=torch.long, device=DEVICE_TYPE)
              return kv_cache, block_tables, slots
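      As a sketch of the warm-up slot mapping built by create_dummy_kv_cache: slot indices 0..dummy_block_size-1 are repeated until they cover the warm-up input length. The numbers below are illustrative and the loop is reproduced standalone (no torch or MindIE model needed):
        # Illustrative only: 300 warm-up tokens with a dummy block size of 128.
        dummy_block_size = 128
        warm_up_len = 300
        slot = list(range(dummy_block_size))
        slots = []
        while warm_up_len > 0:
            if warm_up_len > dummy_block_size:
                slots.extend(slot)
                warm_up_len -= dummy_block_size
            else:
                slots.extend(slot[:warm_up_len])
                warm_up_len = 0
        print(len(slots))                  # 300
        print(slots[:3], slots[128:131])   # [0, 1, 2] [0, 1, 2]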
    8. ./vllm_npu/vllm_npu/model_executor/models/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
  7. ./vllm_npu/vllm_npu/worker
    1. ./vllm_npu/vllm_npu/worker/ascend_worker.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of the code in this file was copied from the [vLLM Team][vllm] project
      """A Ascend worker class."""
      import gc
      from typing import Dict, List, Optional, Set, Tuple
      import torch
      import torch.distributed
      from mindie_llm.modeling.backend_type import BackendType
      from vllm.config import (
          CacheConfig,
          DeviceConfig,
          LoRAConfig,
          ModelConfig,
          ParallelConfig,
          SchedulerConfig,
      )
      from vllm.lora.request import LoRARequest
      from vllm.model_executor import set_random_seed
      from vllm.model_executor.parallel_utils.communication_op import broadcast_tensor_dict
      from vllm.model_executor.parallel_utils.custom_all_reduce import init_custom_ar
      from vllm.model_executor.parallel_utils.parallel_state import ensure_model_parallel_initialized
      from vllm.sequence import SamplerOutput, SequenceGroupMetadata
      from vllm.worker.cache_engine import CacheEngine
      from vllm.worker.model_runner import ModelRunner
      class Worker:
          """A worker class that executes the model on a group of NPUs."""
          def __init__(
              self,
              model_config: ModelConfig,
              parallel_config: ParallelConfig,
              scheduler_config: SchedulerConfig,
              device_config: DeviceConfig,
              local_rank: int,
              rank: int,
              distributed_init_method: str,
              lora_config: Optional[LoRAConfig] = None,
              kv_cache_dtype: Optional[str] = "auto",
              is_driver_worker: bool = False,
          ) -> None:
              self.model_config = model_config
              self.parallel_config = parallel_config
              self.scheduler_config = scheduler_config
              self.device_config = device_config
              self.local_rank = local_rank
              self.rank = rank
              self.distributed_init_method = distributed_init_method
              self.lora_config = lora_config
              self.is_driver_worker = is_driver_worker
              if self.is_driver_worker and self.rank != 0:
                  raise ValueError("The driver worker must have rank 0.")
              self.device = None
              self.model_runner = ModelRunner(
                  model_config,
                  parallel_config,
                  scheduler_config,
                  device_config,
                  self.lora_config,
                  kv_cache_dtype,
                  is_driver_worker,
              )
              # Uninitialized cache engine. Will be initialized by
              # self.init_cache_engine().
              self.cache_config = None
              self.cache_engine = None
              self.cache_events = None
              self.gpu_cache = None
          def init_model(self, cupy_port: Optional[int] = None) -> None:
              self.device = torch.device(f"npu:{self.local_rank}")
              torch.npu.set_device(self.device)
              # Initialize the distributed environment.
              init_distributed_environment(self.parallel_config, self.rank, self.distributed_init_method)
              # Initialize the model.
              set_random_seed(self.model_config.seed)
          def load_model(self):
              # Dynamically attach the MindIE config to model_config; not elegant, but convenient.
              self.model_config.mindie_config = {
                  "backend_type": BackendType.ATB,
                  "model_id": self.model_config.model,
                  "rank": self.rank,
                  "local_rank": self.local_rank,
                  "world_size": self.parallel_config.world_size,
                  "npu_device_id": self.local_rank,
              }
              self.model_runner.load_model()
              del self.model_config.mindie_config
          @torch.inference_mode()
          def profile_num_available_blocks(
              self, block_size: int, gpu_memory_utilization: float, cpu_swap_space: int, cache_dtype: str
          ) -> Tuple[int, int]:
              """Profiles the peak memory usage of the model and returns the maximum
              number of GPU and CPU cache blocks that can be allocated.
              Args:
                  block_size: The size of the cache block.
                  gpu_memory_utilization: The fraction of the total GPU memory to use.
                  cpu_swap_space: The size of the CPU swap space in bytes.
              """
              # Profile the memory usage of the model and get the maximum number of
              # cache blocks that can be allocated with the remaining free memory.
              torch.npu.empty_cache()
              torch.npu.reset_peak_memory_stats()
              # Execute a forward pass with dummy inputs to profile the memory usage
              # of the model.
              self.model_runner.profile_run()
              dummy_block_size = 128
              dummy_num_blocks = dummy_block_size // block_size
              # Calculate the number of blocks that can be allocated with the
              # profiled peak memory.
              torch.npu.synchronize()
              peak_memory = torch.npu.max_memory_allocated()
              total_gpu_memory = torch.npu.get_device_properties(self.rank).total_memory
              cache_block_size = CacheEngine.get_cache_block_size(
                  block_size, cache_dtype, self.model_config, self.parallel_config
              )
              num_gpu_blocks = (
                  int((total_gpu_memory * gpu_memory_utilization - peak_memory) // cache_block_size) + dummy_num_blocks
              )
              num_cpu_blocks = int(cpu_swap_space // cache_block_size)
              num_gpu_blocks = max(num_gpu_blocks, 0)
              num_cpu_blocks = max(num_cpu_blocks, 0)
              if self.model_runner.lora_manager:
                  self.model_runner.remove_all_loras()
              gc.collect()
              torch.npu.empty_cache()
              return num_gpu_blocks, num_cpu_blocks
          def init_cache_engine(self, cache_config: CacheConfig) -> None:
              self.cache_config = cache_config
              self.cache_engine = CacheEngine(self.cache_config, self.model_config, self.parallel_config)
              self.cache_events = self.cache_engine.events
              self.gpu_cache = self.cache_engine.gpu_cache
              self.model_runner.set_block_size(self.cache_engine.block_size)
          def warm_up_model(self) -> None:
              pass
          def cache_swap(
              self,
              blocks_to_swap_in: Dict[int, int],
              blocks_to_swap_out: Dict[int, int],
              blocks_to_copy: Dict[int, List[int]],
          ) -> None:
              # Issue cache operations.
              issued_cache_op = False
              if blocks_to_swap_in:
                  self.cache_engine.swap_in(blocks_to_swap_in)
                  issued_cache_op = True
              if blocks_to_swap_out:
                  self.cache_engine.swap_out(blocks_to_swap_out)
                  issued_cache_op = True
              if blocks_to_copy:
                  self.cache_engine.copy(blocks_to_copy)
                  issued_cache_op = True
              cache_events = self.cache_events if issued_cache_op else None
              if cache_events is not None:
                  for event in cache_events:
                      event.wait()
          @torch.inference_mode()
          def execute_model(
              self,
              seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] = None,
              blocks_to_swap_in: Optional[Dict[int, int]] = None,
              blocks_to_swap_out: Optional[Dict[int, int]] = None,
              blocks_to_copy: Optional[Dict[int, List[int]]] = None,
          ) -> Optional[SamplerOutput]:
              if self.is_driver_worker:
                  if seq_group_metadata_list is None:
                      raise ValueError("seq_group_metadata_list must be provided for the driver worker.")
                  num_seq_groups = len(seq_group_metadata_list)
                  if any(x is None for x in [blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy]):
                      raise ValueError(
                          "blocks_to_swap_in, blocks_to_swap_out, and blocks_to_copy must be provided for the driver worker."
                      )
                  data = {
                      "num_seq_groups": num_seq_groups,
                      "blocks_to_swap_in": blocks_to_swap_in,
                      "blocks_to_swap_out": blocks_to_swap_out,
                      "blocks_to_copy": blocks_to_copy,
                  }
                  broadcast_tensor_dict(data, src=0)
              else:
                  data = broadcast_tensor_dict(src=0)
                  num_seq_groups = data["num_seq_groups"]
                  blocks_to_swap_in = data["blocks_to_swap_in"]
                  blocks_to_swap_out = data["blocks_to_swap_out"]
                  blocks_to_copy = data["blocks_to_copy"]
              self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)
              # If there is no input, we don't need to execute the model.
              if num_seq_groups == 0:
                  return {}
              output = self.model_runner.execute_model(seq_group_metadata_list, self.gpu_cache)
              return output
          def add_lora(self, lora_request: LoRARequest) -> bool:
              return self.model_runner.add_lora(lora_request)
          def remove_lora(self, lora_id: int) -> bool:
              return self.model_runner.remove_lora(lora_id)
          def list_loras(self) -> Set[int]:
              return self.model_runner.list_loras()
      def init_distributed_environment(
          parallel_config: ParallelConfig,
          rank: int,
          distributed_init_method: Optional[str] = None,
      ) -> None:
          """Initialize the distributed environment."""
          if torch.distributed.is_initialized():
              torch_world_size = torch.distributed.get_world_size()
              if torch_world_size != parallel_config.world_size:
                  raise RuntimeError(
                      "torch.distributed is already initialized but the torch world "
                      "size does not match parallel_config.world_size "
                      f"({torch_world_size} vs. {parallel_config.world_size})."
                  )
          elif not distributed_init_method:
              raise ValueError(
                  "distributed_init_method must be set if torch.distributed "
                  f"is not already initialized: {distributed_init_method}"
              )
          else:
              torch.distributed.init_process_group(
                  backend="nccl",
                  world_size=parallel_config.world_size,
                  rank=rank,
                  init_method=distributed_init_method,
              )
          ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, parallel_config.pipeline_parallel_size)
          # Initialize a custom fast all-reduce implementation.
          if not parallel_config.disable_custom_all_reduce:
              init_custom_ar()
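      A worked example of the block-count arithmetic in profile_num_available_blocks; every number below is hypothetical and chosen only to show the formula:
        # Hypothetical values: 64 GiB of NPU memory, 0.9 utilization target,
        # 40 GiB peak usage during the profiling run, 2 MiB per KV-cache block,
        # block_size 128 together with the fixed dummy_block_size of 128.
        GiB, MiB = 1 << 30, 1 << 20
        total_gpu_memory = 64 * GiB
        gpu_memory_utilization = 0.9
        peak_memory = 40 * GiB
        cache_block_size = 2 * MiB
        dummy_num_blocks = 128 // 128  # dummy_block_size // block_size
        num_gpu_blocks = int(
            (total_gpu_memory * gpu_memory_utilization - peak_memory) // cache_block_size
        ) + dummy_num_blocks
        print(num_gpu_blocks)  # 9012 KV-cache blocks fit in the remaining memory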
    2. ./vllm_npu/vllm_npu/worker/cache_engine.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      from typing import Tuple
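      # Note: these module-level functions take `self` because worker/__init__.py
      # binds them onto vllm's CacheEngine as replacement methods.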
      def get_key_block_shape(self) -> Tuple[int, int, int]:
          return (
              self.block_size,
              self.num_heads,
              self.head_size,
          )
      def get_value_block_shape(self) -> Tuple[int, int, int]:
          return (
              self.block_size,
              self.num_heads,
              self.head_size,
          )
    3. ./vllm_npu/vllm_npu/worker/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import vllm_npu.worker.ascend_worker
      from vllm_npu.worker.cache_engine import get_key_block_shape, get_value_block_shape
      from vllm_npu.worker.model_runner import (
          _make_tensor_with_pad,
          _prepare_decode,
          _prepare_prompt,
          execute_model,
          load_model,
          profile_run,
      )
      from vllm.worker import cache_engine, model_runner
      cache_engine.CacheEngine.get_key_block_shape = get_key_block_shape
      cache_engine.CacheEngine.get_value_block_shape = get_value_block_shape
      model_runner._make_tensor_with_pad = _make_tensor_with_pad
      model_runner.ModelRunner._prepare_decode = _prepare_decode
      model_runner.ModelRunner._prepare_prompt = _prepare_prompt
      model_runner.ModelRunner.execute_model = execute_model
      model_runner.ModelRunner.load_model = load_model
      model_runner.ModelRunner.profile_run = profile_run
    4. ./vllm_npu/vllm_npu/worker/model_runner.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of the code in this file was copied from the [vLLM Team][vllm] project
      from collections import namedtuple
      from typing import List, Optional, Set, Tuple, Union, NamedTuple
      import torch
      from vllm.lora.request import LoRARequest
      from vllm.model_executor import InputMetadata, get_model
      from vllm.sampling_params import SamplingParams
      from vllm.sequence import SamplerOutput, SequenceData, SequenceGroupMetadata
      from vllm.worker.model_runner import _BATCH_SIZES_TO_CAPTURE, _PAD_SLOT_ID, _pad_to_max
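      # Note: most of the module-level functions below take `self` because worker/__init__.py
      # binds them onto vllm's ModelRunner as replacement methods; _make_tensor_with_pad
      # replaces the module-level helper of the same name.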
      class PreparedPromptData(NamedTuple):
          tokens: torch.Tensor
          positions: torch.Tensor
          metadata: InputMetadata
          prompt_lens: List[int]
          subquery_lens: List[int]
          lora_index_mapping: List[int]
          lora_prompt_mapping: List[int]
          requests: List[LoRARequest]
      class PreparedDecodeData(NamedTuple):
          tokens: torch.Tensor
          positions: torch.Tensor
          metadata: InputMetadata
          index_mapping: List[int]
          prompt_mapping: List[int]
          requests: List[LoRARequest]
      def load_model(self) -> None:
          self.model = get_model(
              self.model_config,
              self.device_config,
              lora_config=self.lora_config,
              parallel_config=self.parallel_config,
              scheduler_config=self.scheduler_config,
          )
      def _prepare_prompt(
          self,
          seq_group_metadata_list: List[SequenceGroupMetadata],
      ) -> PreparedPromptData:
          assert len(seq_group_metadata_list) > 0
          input_tokens: List[List[int]] = []
          input_positions: List[List[int]] = []
          slot_mapping: List[List[int]] = []
          lora_index_mapping: List[int] = []
          lora_prompt_mapping: List[int] = []
          lora_requests: List[LoRARequest] = []
          prompt_lens: List[int] = []
          context_lens: List[int] = []
          subquery_lens: List[int] = []
          prefix_block_tables: List[List[int]] = []
          for seq_group_metadata in seq_group_metadata_list:
              assert seq_group_metadata.is_prompt
              seq_ids = list(seq_group_metadata.seq_data.keys())
              assert len(seq_ids) == 1
              seq_id = seq_ids[0]
              seq_data = seq_group_metadata.seq_data[seq_id]
              prompt_tokens = seq_data.get_token_ids()
              prompt_len = len(prompt_tokens)
              prompt_lens.append(prompt_len)
              prefix_len = 0
              prefix = seq_group_metadata.prefix
              if prefix is not None and prefix.computed:
                  prefix_len = prefix.get_length()
                  prompt_tokens = prompt_tokens[prefix_len:]
                  prefix_block_tables.append(prefix.get_block_numbers())
              else:
                  prefix_block_tables.append([])
              # actual prompt lens
              context_lens.append(prefix_len)
              subquery_lens.append(prompt_len - prefix_len)
              input_tokens.append(prompt_tokens)
              # NOTE(woosuk): Here we assume that the first token in the prompt
              # is always the first token in the sequence.
              input_positions.append(list(range(prefix_len, prefix_len + len(prompt_tokens))))
              lora_id = seq_group_metadata.lora_int_id
              lora_requests.append(seq_group_metadata.lora_request)
              lora_index_mapping.append([lora_id] * (prompt_len - prefix_len))
              lora_prompt_mapping.extend(
                  [lora_id] * (prompt_len - prefix_len if seq_group_metadata.sampling_params.prompt_logprobs else 1)
              )
              if seq_group_metadata.block_tables is None:
                  # During memory profiling, the block tables are not initialized
                  # yet. In this case, we just use a dummy slot mapping.
                  slot_mapping.append([_PAD_SLOT_ID] * prompt_len)
                  continue
              # Compute the slot mapping.
              slot_mapping.append([])
              block_table = seq_group_metadata.block_tables[seq_id]
              # Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID,
              # where start_idx is max(0, prompt_len - sliding_window).
              # For example, if the prompt len is 10, sliding window is 8, and
              # block size is 4, the first two tokens are masked and the slot
              # mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1].
              start_idx = 0
              if self.sliding_window is not None:
                  assert prefix_len == 0, "Prefix caching is currently not supported with " "sliding window attention"
                  start_idx = max(0, prompt_len - self.sliding_window)
              for i in range(prefix_len, prompt_len):
                  if i < start_idx:
                      slot_mapping[-1].append(_PAD_SLOT_ID)
                      continue
                  block_number = block_table[i // self.block_size]
                  block_offset = i % self.block_size
                  slot = block_number * self.block_size + block_offset
                  slot_mapping[-1].append(slot)
          max_prompt_len = max(subquery_lens)
          input_tokens = _make_tensor_with_pad(input_tokens, max_prompt_len, pad=0, dtype=torch.long, device=self.device)
          input_positions = _make_tensor_with_pad(
              input_positions, max_prompt_len, pad=0, dtype=torch.long, device=self.device
          )
          slot_mapping = _make_tensor_with_pad(
              slot_mapping, max_prompt_len, pad=_PAD_SLOT_ID, dtype=torch.long, device=self.device
          )
          lora_index_mapping = [_pad_to_max(mapping, max_prompt_len, pad=0) for mapping in lora_index_mapping]
          context_lens_tensor = torch.tensor(context_lens, dtype=torch.int, device=self.device)
          # Prepare prefix block tables
          max_prompt_block_table_len = max(len(t) for t in prefix_block_tables)
          block_tables = _make_tensor_with_pad(
              prefix_block_tables, max_len=max_prompt_block_table_len, pad=0, dtype=torch.int, device=self.device
          )
          start_loc_tensor = torch.arange(
              0, len(prompt_lens) * max_prompt_len, max_prompt_len, dtype=torch.long, device=self.device
          )
          prompt_lens_tensor = torch.tensor(prompt_lens, dtype=torch.long, device=self.device)
          input_metadata = InputMetadata(
              is_prompt=True,
              slot_mapping=slot_mapping,
              prompt_lens=prompt_lens_tensor,
              max_seq_len=max_prompt_len,
              start_loc=start_loc_tensor,
              max_context_len=None,
              context_lens=context_lens_tensor,
              block_tables=block_tables,
              use_cuda_graph=False,
              kv_cache_dtype=self.kv_cache_dtype,
          )
          return PreparedPromptData(
              input_tokens,
              input_positions,
              input_metadata,
              prompt_lens,
              subquery_lens,
              lora_index_mapping,
              lora_prompt_mapping,
              lora_requests,
          )
      def _prepare_decode(self, seq_group_metadata_list: List[SequenceGroupMetadata]) -> PreparedDecodeData:
          if not seq_group_metadata_list:
              raise ValueError("seq_group_metadata_list is empty")
          input_tokens: List[List[int]] = []
          input_positions: List[List[int]] = []
          slot_mapping: List[List[int]] = []
          context_lens: List[int] = []
          block_tables: List[List[int]] = []
          lora_index_mapping: List[int] = []
          lora_prompt_mapping: List[int] = []
          lora_requests: List[LoRARequest] = []
          max_num_blocks_per_seq = 0
          for seq_group_metadata in seq_group_metadata_list:
              if seq_group_metadata.is_prompt:
                  raise ValueError("seq_group_metadata.is_prompt is True")
              seq_ids = list(seq_group_metadata.seq_data.keys())
              lora_id = seq_group_metadata.lora_int_id
              lora_requests.append(seq_group_metadata.lora_request)
              for seq_id in seq_ids:
                  seq_data = seq_group_metadata.seq_data[seq_id]
                  generation_token = seq_data.get_last_token_id()
                  input_tokens.append([generation_token])
                  seq_len = seq_data.get_len()
                  position = seq_len - 1
                  input_positions.append([position])
                  context_len = seq_len if self.sliding_window is None else min(seq_len, self.sliding_window)
                  context_lens.append(context_len)
                  block_table = seq_group_metadata.block_tables[seq_id]
                  max_num_blocks_per_seq = max(max_num_blocks_per_seq, len(block_table))
                  block_number = block_table[position // self.block_size]
                  block_offset = position % self.block_size
                  slot = block_number * self.block_size + block_offset
                  slot_mapping.append([slot])
                  lora_index_mapping.append([lora_id])
                  lora_prompt_mapping.append(lora_id)
                  if self.sliding_window is not None:
                      sliding_window_blocks = self.sliding_window // self.block_size
                      block_table = block_table[-sliding_window_blocks:]
                  block_tables.append(block_table)
          batch_size = len(input_tokens)
          max_context_len = max(context_lens)
          use_captured_graph = (
              not self.model_config.enforce_eager
              and batch_size <= _BATCH_SIZES_TO_CAPTURE[-1]
              and max_context_len <= self.max_context_len_to_capture
          )
          input_tokens = _make_tensor_with_pad(input_tokens, max_len=1, pad=0, dtype=torch.long, device=self.device)
          input_positions = _make_tensor_with_pad(input_positions, max_len=1, pad=0, dtype=torch.long, device=self.device)
          slot_mapping = _make_tensor_with_pad(
              slot_mapping, max_len=1, pad=_PAD_SLOT_ID, dtype=torch.long, device=self.device
          )
          context_lens = torch.tensor(context_lens, dtype=torch.int, device=self.device)
          if use_captured_graph:
              # The shape of graph_block_tables is
              # [max batch size, max context len // block size].
              input_block_tables = self.graph_block_tables[:batch_size]
              for i, block_table in enumerate(block_tables):
                  if block_table:
                      input_block_tables[i, : len(block_table)] = block_table
              block_tables = torch.tensor(input_block_tables, device=self.device)
          else:
              padded_block_tables = [_pad_to_max(block_table, max_num_blocks_per_seq, 0) for block_table in block_tables]
              block_tables = torch.tensor(padded_block_tables, dtype=torch.int, device="npu")
          lora_index_mapping = [_pad_to_max(mapping, 1, pad=0) for mapping in lora_index_mapping]
          input_metadata = InputMetadata(
              is_prompt=False,
              slot_mapping=slot_mapping,
              prompt_lens=None,
              max_seq_len=None,
              start_loc=None,
              max_context_len=max_context_len,
              context_lens=context_lens,
              block_tables=block_tables,
              use_cuda_graph=use_captured_graph,
              kv_cache_dtype=self.kv_cache_dtype,
          )
          return PreparedDecodeData(
              input_tokens, input_positions, input_metadata, lora_index_mapping, lora_prompt_mapping, lora_requests
          )
      @torch.inference_mode()
      def execute_model(
          self,
          seq_group_meta_list: Optional[List[SequenceGroupMetadata]],
          kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
      ) -> Optional[SamplerOutput]:
          (tokens, positions, input_meta, sampling_meta, lora_requests, _) = self.prepare_input_tensors(seq_group_meta_list)
          hidden_states = self.model(
              input_ids=tokens,
              positions=positions,
              kv_caches=kv_caches,
              input_metadata=input_meta,
              lora_requests=lora_requests,
          )
          # Sample the next token.
          output = self.model.sample(hidden_states=hidden_states, sampling_metadata=sampling_meta)
          return output
      @torch.inference_mode()
      def profile_run(self) -> None:
          # Enable top-k sampling to reflect the accurate memory usage.
          vocab_size = self.model_config.get_vocab_size()
          sampling_params = SamplingParams(top_p=0.99, top_k=vocab_size - 1)
          max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
          max_num_seqs = self.scheduler_config.max_num_seqs
          # Profile memory usage with max_num_sequences sequences and the total
          # number of tokens equal to max_num_batched_tokens.
          seqs: List[SequenceGroupMetadata] = []
          for group_id in range(max_num_seqs):
              seq_len = max_num_batched_tokens // max_num_seqs + (group_id < max_num_batched_tokens % max_num_seqs)
              seq_data = SequenceData([0] * seq_len)
              seq = SequenceGroupMetadata(
                  request_id=str(group_id),
                  is_prompt=True,
                  seq_data={group_id: seq_data},
                  sampling_params=sampling_params,
                  block_tables=None,
                  lora_request=None,  # Warmup for LoRA has been done in the MindIE backend
              )
              seqs.append(seq)
          # Run the model with the dummy inputs.
          num_layers = self.model_config.get_num_layers(self.parallel_config)
          kv_caches = [(None, None)] * num_layers
          self.execute_model(seqs, kv_caches)
          # transfer_to_npu redirects torch.cuda APIs to torch.npu, so this synchronizes the NPU.
          torch.cuda.synchronize()
          return
      def _make_tensor_with_pad(
          x: List[List[int]], max_len: int, pad: int, dtype: torch.dtype, device: Optional[Union[str, torch.device]]
      ) -> torch.Tensor:
          unpad_x = [i for l in x for i in l]
          return torch.tensor(unpad_x, dtype=dtype, device=device)
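      Note that this NPU version of _make_tensor_with_pad deliberately ignores max_len and pad and flattens the nested lists into a single 1-D tensor, presumably to match the flattened token layout consumed by forward_tensor on the MindIE side. A standalone illustration with made-up token ids (runs on CPU):
        from typing import List, Optional, Union
        import torch
        def _make_tensor_with_pad(
            x: List[List[int]], max_len: int, pad: int, dtype: torch.dtype,
            device: Optional[Union[str, torch.device]],
        ) -> torch.Tensor:
            # Same behaviour as above: concatenate instead of padding to max_len.
            unpad_x = [i for l in x for i in l]
            return torch.tensor(unpad_x, dtype=dtype, device=device)
        tokens = [[101, 7592, 2088], [101, 2129]]
        print(_make_tensor_with_pad(tokens, max_len=3, pad=0, dtype=torch.long, device="cpu"))
        # -> tensor([101, 7592, 2088, 101, 2129])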
  8. Other files
    1. ./install.sh
      #!/bin/bash
      if [ -d "./vllm" ]; then
          echo "./vllm directory has already exist!"
          exit 1
      fi
      git clone -b v0.3.3 https://github.com/vllm-project/vllm.git vllm
      cp -r cover/* vllm/
      cd vllm
      pip install -r requirements.txt
      python3 setup.py install
      cd ../vllm_npu
      pip install -r requirements.txt
      python3 setup.py install
    2. ./README.md
      # Vllm-MindIE
      #### Introduction
      A patch that adapts the Ascend (MindIE) inference engine to the open-source vLLM framework, stable release v0.3.3.
      #### Software Architecture
      Software architecture description.
      #### Installation
      Once the basic Ascend inference environment is in place, run `install.sh` to install vLLM together with the Ascend patch:
      ```sh
      bash install.sh
      ```
      #### Usage
      Startup demos for vLLM offline mode and the online service are provided for reference.
      *   Offline mode:
          ```sh
          bash examples/test_offine.sh
          ```
      *   Online service:
          ```sh
          bash examples/start_server.sh
          ```
    3. ./requirements.txt
      numpy
      decorator
      attrs
      psutil
      absl-py
      cloudpickle
      scipy
      tornado
      transformers
      accelerate
      pandas
      ray == 2.9.3
    4. ./setup.py
      import io
      import os
      import re
      from typing import List
      import setuptools
      ROOT_DIR = os.path.dirname(__file__)
      def get_path(*filepath) -> str:
          return os.path.join(ROOT_DIR, *filepath)
      def find_version(filepath: str):
          """Extract version information from the given filepath.
          """
          with open(filepath) as fp:
              version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                        fp.read(), re.M)
              if version_match:
                  return version_match.group(1)
              raise RuntimeError("Unable to find version string.")
      def read_readme() -> str:
          """Read the README file."""
          return io.open(get_path("README.md"), "r", encoding="utf-8").read()
      def get_requirements() -> List[str]:
          """Get Python package dependencies from requirements.txt."""
          with open(get_path("requirements.txt")) as f:
              requirements = f.read().strip().split("\n")
          return requirements
      setuptools.setup(
          name="vllm_npu",
          version=find_version(get_path("vllm_npu", "__init__.py")),
          author="Huawei",
          license="Apache 2.0",
          description=("A high-throughput and memory-efficient inference and "
                       "serving engine for LLMs"),
          long_description=read_readme(),
          long_description_content_type="text/markdown",
          url="",
          project_urls={
              "Homepage": "",
              "Documentation": "",
          },
          classifiers=[
              "Programming Language :: Python :: 3.8",
              "Programming Language :: Python :: 3.9",
              "Programming Language :: Python :: 3.10",
              "Programming Language :: Python :: 3.11",
              "Topic :: Scientific/Engineering :: Artificial Intelligence",
          ],
          packages=setuptools.find_packages(exclude=("benchmarks", "examples", "tests")),
          python_requires=">=3.8",
          install_requires=get_requirements(),
      )
    5. ./vllm_npu/config.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of the code in this file was copied from the [vLLM Team][vllm] project
      from typing import Optional
      import torch
      from transformers import PretrainedConfig
      from vllm.logger import init_logger
      from vllm.utils import is_neuron
      logger = init_logger(__name__)
      class DeviceConfig:
          def __init__(self, device: str = "auto") -> None:
              if device == "auto":
                  # Automated device type detection
                  if getattr(torch.version, "cann", None) is not None:
                      self.device_type = "npu"
                  elif torch.cuda.is_available():
                      self.device_type = "cuda"
                  elif is_neuron():
                      self.device_type = "neuron"
                  else:
                      raise RuntimeError("No supported device detected.")
              else:
                  # Device type is assigned explicitly
                  self.device_type = device
              # Some device types require processing inputs on CPU
              if self.device_type in ["neuron"]:
                  self.device = torch.device("cpu")
              else:
                  # Set device with device type
                  self.device = torch.device(self.device_type)
          @property
          def is_neuron(self):
              return self.device_type == "neuron"
      def _get_and_verify_max_len(
          hf_config: PretrainedConfig,
          max_model_len: Optional[int],
      ) -> int:
          """Get and verify the model's maximum length."""
          derived_max_model_len = float("inf")
          possible_keys = [
              # OPT
              "max_position_embeddings",
              # GPT-2
              "n_positions",
              # MPT
              "max_seq_len",
              # ChatGLM2
              "seq_length",
              # Command-R
              "model_max_length",
              # Others
              "max_sequence_length",
              "max_seq_length",
              "seq_len",
          ]
          for key in possible_keys:
              max_len_key = getattr(hf_config, key, None)
              if max_len_key is not None:
                  derived_max_model_len = min(derived_max_model_len, max_len_key)
          if derived_max_model_len == float("inf"):
              if max_model_len is not None:
                  # If max_model_len is specified, we use it.
                  return max_model_len
              default_max_len = 2048
              logger.warning(
                  "The model's config.json does not contain any of the following "
                  "keys to determine the original maximum length of the model: "
                  f"{possible_keys}. Assuming the model's maximum length is "
                  f"{default_max_len}."
              )
              derived_max_model_len = default_max_len
          rope_scaling = getattr(hf_config, "rope_scaling", None)
          if rope_scaling is not None:
              if "type" in rope_scaling:
                  rope_type = rope_scaling["type"]
              elif "rope_type" in rope_scaling:
                  rope_type = rope_scaling["rope_type"]
              else:
                  raise ValueError("rope_scaling must have a 'type' or 'rope_type' key.")
              # The canonical name is "longrope"; "su" is kept here for
              # backward compatibility.
              if rope_type not in ("su", "longrope", "llama3"):
                  assert "factor" in rope_scaling
                  scaling_factor = rope_scaling["factor"]
                  if rope_type == "yarn":
                      derived_max_model_len = rope_scaling["original_max_position_embeddings"]
                  derived_max_model_len *= scaling_factor
          if max_model_len is None:
              max_model_len = derived_max_model_len
          elif max_model_len > derived_max_model_len:
              raise ValueError(
                  f"User-specified max_model_len ({max_model_len}) is greater than "
                  f"the derived max_model_len ({max_len_key}={derived_max_model_len}"
                  " in model's config.json). This may lead to incorrect model "
                  "outputs or CUDA errors. Make sure the value is correct and "
                  "within the model context size."
              )
          return int(max_model_len)
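      As a quick numeric check of the rope_scaling branch above: for YaRN scaling, the derived length is reset to original_max_position_embeddings and then multiplied by factor. The config values below are hypothetical:
        # Hypothetical rope_scaling entry from a model's config.json.
        rope_scaling = {"type": "yarn", "factor": 4.0,
                        "original_max_position_embeddings": 4096}
        derived_max_model_len = 8192.0  # e.g. read from max_position_embeddings
        if rope_scaling["type"] == "yarn":
            derived_max_model_len = rope_scaling["original_max_position_embeddings"]
        derived_max_model_len *= rope_scaling["factor"]
        print(int(derived_max_model_len))  # 16384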
    6. ./vllm_npu/__init__.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import torch
      import torch_npu
      from torch_npu.contrib import transfer_to_npu
      from vllm_npu.npu_adaptor import (
          BlockDiagonalCausalMask,
          LowerTriangularMaskWithTensorBias,
          cache_ops,
          context_attention_fwd,
          cuda_utils,
          ops,
      )
      import vllm_npu.core
      import vllm_npu.engine
      import vllm_npu.model_executor
      import vllm_npu.worker
      from vllm_npu.config import DeviceConfig, _get_and_verify_max_len
      from vllm_npu.utils import get_ip
      from vllm import config, utils
      utils.get_ip = get_ip
      config.DeviceConfig = DeviceConfig
      config._get_and_verify_max_len = _get_and_verify_max_len
      __version__ = "0.3.3"
    7. ./vllm_npu/npu_adaptor.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import importlib
      import sys
      def replace_modules(old_module_name, new_module_name):
          if old_module_name in sys.modules:
              del sys.modules[old_module_name]
          sys.modules[old_module_name] = importlib.import_module(new_module_name)
      _default_ops = (
          'xformers',
          'xformers.ops',
          'vllm._C',
          'xformers.ops.fmha.attn_bias',
          'vllm.model_executor.layers.triton_kernel.prefix_prefill'
      )
      for _ops in _default_ops:
          replace_modules(_ops, 'vllm_npu')
      # dummy class to avoid import error.
      class ops:
          pass
      class cuda_utils:
          pass
      class cache_ops:
          pass
      class BlockDiagonalCausalMask:
          pass
      class LowerTriangularMaskWithTensorBias:
          pass
      def context_attention_fwd():
          pass
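      replace_modules simply rebinds an entry in sys.modules, so any later `import xformers` (or `vllm._C`, etc.) resolves to vllm_npu instead of the CUDA-only package. A self-contained toy demonstration of the same technique using only standard-library modules; the module names are arbitrary:
        import importlib
        import sys
        def replace_modules(old_module_name, new_module_name):
            # Identical technique: alias one module name to another in sys.modules.
            if old_module_name in sys.modules:
                del sys.modules[old_module_name]
            sys.modules[old_module_name] = importlib.import_module(new_module_name)
        # Toy example: make "some_missing_gpu_lib" resolve to the stdlib "math" module.
        replace_modules("some_missing_gpu_lib", "math")
        import some_missing_gpu_lib  # resolved via the sys.modules alias
        print(some_missing_gpu_lib.sqrt(16.0))  # 4.0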
    8. ./vllm_npu/utils.py
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import socket
      def get_ip() -> str:
          try:
              with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ipv4_socket:
                  ipv4_socket.connect(("localhost", 80))
                  return ipv4_socket.getsockname()[0]
          except OSError:
              with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as ipv6_socket:
                  ipv6_socket.connect(("localhost", 80))
                  return ipv6_socket.getsockname()[0]