vllm_gaudi.v1.worker.hpu_worker

A GPU worker class.

logger module-attribute

logger = logger()

HPUWorker

Bases: WorkerBase

Source code in vllm_gaudi/v1/worker/hpu_worker.py
class HPUWorker(WorkerBase):

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ):

        # TODO: use WorkerBase.__init__(self, vllm_config=vllm_config)
        self.vllm_config = vllm_config
        self.model_config = vllm_config.model_config
        self.cache_config = vllm_config.cache_config
        self.lora_config = vllm_config.lora_config
        self.load_config = vllm_config.load_config
        self.parallel_config = vllm_config.parallel_config
        self.scheduler_config = vllm_config.scheduler_config
        self.device_config = vllm_config.device_config
        self.speculative_config = vllm_config.speculative_config
        self.observability_config = vllm_config.observability_config

        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.is_driver_worker = is_driver_worker

        if self.cache_config.cache_dtype == "auto":
            self.cache_dtype = self.model_config.dtype
        else:
            self.cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[self.cache_config.cache_dtype]

        if self.model_config.trust_remote_code:
            # note: lazy import to avoid importing torch before initializing
            from vllm.utils.import_utils import init_cached_hf_modules
            init_cached_hf_modules()

        self.gc_track_recompiles = get_config().track_graph_compilation and not get_config().high_level_profiler_enabled
        self.step = 0
        self.profile_steps = get_config().VLLM_PROFILE_STEPS
        self.step_profiler = setup_step_profiler(self.profile_steps)
        self.step_debug = init_debug_logger('steps')

    def init_profiler(self):
        """Initialize the profiler."""
        if envs.VLLM_TORCH_PROFILER_DIR:
            torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR
            logger.info("Profiling enabled. Traces will be saved to: %s", torch_profiler_trace_dir)
            if os.getenv('VLLM_PROFILER_ENABLED') == 'full':
                fn = self.model_runner.profiler.full_trace_handler
                with_stack = False
            else:
                fn = torch.profiler.tensorboard_trace_handler
                with_stack = True
            self.profiler = torch.profiler.profile(activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.HPU,
            ],
                                                   with_stack=with_stack,
                                                   on_trace_ready=fn(torch_profiler_trace_dir, use_gzip=True))

        else:
            self.profiler = None

    def start_profile(self):
        if self.profiler is None:
            raise RuntimeError("Profiler is not enabled.")
        high_level_profiler = self.model_runner.profiler
        with high_level_profiler.record_event('internal', 'start_profiler'):
            # Clean up the queue
            while True:
                try:
                    high_level_profiler.profiling_trace_events.get_nowait()
                except queue.Empty:
                    break
            self.profiler.start()

    def stop_profile(self):
        if self.profiler is None:
            raise RuntimeError("Profiler is not enabled.")
        self.profiler.stop()

    def init_device(self):
        self.device = torch.device("hpu")
        # Initialize the distributed environment.
        init_worker_distributed_environment(self.vllm_config, self.rank, self.distributed_init_method, self.local_rank)
        # Set random seed.
        set_random_seed(self.model_config.seed)
        self.model_runner = HPUModelRunner(vllm_config=self.vllm_config, is_driver_worker=self.is_driver_worker)
        self.init_profiler()

    def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
        return self.model_runner.get_kv_cache_spec()

    def get_model(self) -> nn.Module:
        return self.model_runner.get_model()

    def load_model(self) -> None:
        self.model_runner.load_model()

    @torch.inference_mode()
    def determine_available_memory(self) -> int:
        """Profiles the peak memory usage of the model to determine how many
        KV blocks may be allocated without OOMs.

        The engine will first conduct a profiling of the existing memory usage.
        Then, it calculates the maximum possible number of GPU and CPU blocks
        that can be allocated with the remaining free memory.

        .. tip::
            You may limit the usage of GPU memory
            by adjusting the `gpu_memory_utilization` parameter.
        """
        # Profile the memory usage of the model and get the maximum number of
        # cache blocks that can be allocated with the remaining free memory.

        # Execute a forward pass with dummy inputs to profile the memory usage
        # of the model.
        kv_caches: dict[str, torch.Tensor] = {}
        kv_cache_spec = self.model_runner.get_kv_cache_spec()
        single_kv_block_size_bytes = 0
        for layer_name, layer_spec in kv_cache_spec.items():
            if isinstance(layer_spec, FullAttentionSpec):
                dtype = layer_spec.dtype

                # Use an empty tensor instead of ``None`` to force Dynamo to pass
                # it by reference, rather than by specializing on the value ``None``.
                hpu_k_cache = torch.tensor([], dtype=dtype, device='hpu')
                hpu_v_cache = torch.tensor([], dtype=dtype, device='hpu')

                kv_caches[layer_name] = (hpu_k_cache, hpu_v_cache)

                single_kv_block_size_bytes += layer_spec.page_size_bytes

            else:
                raise NotImplementedError

        runner_kv_caches: list[torch.Tensor] = []
        bind_kv_cache(kv_caches, self.vllm_config.compilation_config.static_forward_context, runner_kv_caches)
        if is_fake_hpu():
            fake_hpu_cache_alloc = 4 * 2**30  # take 4 GiB flat on fake hpu
            return fake_hpu_cache_alloc
        with HabanaMemoryProfiler() as m:
            self.model_runner.profile_run()
            torch.hpu.synchronize()
        msg = ("Model profiling run "
               f"took {m.get_summary_string()}")
        logger.info(msg)
        # At this point we should've allocated the maximum workspace for all
        # recipes; we will use the extra memory for graphs/blocks.
        free_hpu_memory = torch.hpu.mem_get_info()[0]

        graph_reserved_mem = (float(os.environ.get('VLLM_GRAPH_RESERVED_MEM', '0.1'))
                              if not self.model_config.enforce_eager else 0)
        graph_headroom = 1 - graph_reserved_mem
        available_hpu_memory = free_hpu_memory * \
            self.cache_config.gpu_memory_utilization
        hpu_memory_margin = free_hpu_memory * (1 - self.cache_config.gpu_memory_utilization)
        self.model_runner.mem_margin = hpu_memory_margin
        cache_size_bytes = available_hpu_memory * graph_headroom
        graph_headroom_bytes = available_hpu_memory * (1 - graph_headroom)
        dummy_block_headroom = single_kv_block_size_bytes
        msg = (f"Free device memory: {format_bytes(free_hpu_memory)}, "
               f"{format_bytes(available_hpu_memory)} usable "
               f"(gpu_memory_utilization={self.cache_config.gpu_memory_utilization}),"
               f" {format_bytes(graph_headroom_bytes)} reserved for HPUGraphs "
               f"(VLLM_GRAPH_RESERVED_MEM={graph_reserved_mem}), "
               f"{format_bytes(dummy_block_headroom)} reserved for KV cache dummy "
               f"block {format_bytes(cache_size_bytes - dummy_block_headroom)} "
               "reserved for usable KV cache")

        logger.info(msg)
        gc.collect()
        return cache_size_bytes - dummy_block_headroom

    def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

    def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None:
        """Allocate GPU KV cache with the specified kv_cache_config."""

        with HabanaMemoryProfiler() as m:
            self.model_runner.initialize_kv_cache(kv_cache_config)
            torch.hpu.synchronize()
        msg = (f"Usable num_blocks: {kv_cache_config.num_blocks}, "
               f"actual allocated num_blocks: "
               f"{self.model_runner.kv_caches[0][0].shape[0]} "
               f"(_PAD_BLOCK_ID={self.model_runner._PAD_BLOCK_ID}, "
               f"_PAD_SLOT_ID={self.model_runner._PAD_SLOT_ID})")
        logger.info(msg)
        msg = ("Initializing cache engine "
               f"took {m.get_summary_string()}")
        logger.info(msg)
        self.compile_or_warm_up_model()

    def compile_or_warm_up_model(self) -> None:
        # Don't run the warmup if in eager or if the model is already warmed up
        if not self.model_config.enforce_eager \
            and not getattr(self.model_runner, 'graphed_buckets', None):
            self.model_runner.warmup_model()
        # Reset the seed to ensure that the random state is not affected by
        # the model initialization and profiling.
        set_random_seed(self.model_config.seed)

    def sample_tokens(self, grammar_output: "GrammarOutput|None") -> ModelRunnerOutput | AsyncModelRunnerOutput:
        return self.model_runner.sample_tokens(grammar_output)

    @torch.inference_mode()
    def execute_model(
        self,
        scheduler_output: "SchedulerOutput",
    ) -> ModelRunnerOutput | None:
        if self.step_debug:
            self.step_debug(f'step={self.step}')
        if self.step_profiler and self.step == self.profile_steps[0]:
            self.step_profiler.start()
        with track_graph_compile('HPUWorker.execute_model') \
                if self.gc_track_recompiles \
                else contextlib.nullcontext():
            output = self.model_runner.execute_model(scheduler_output)
        # TODO(woosuk): Send the output to the engine process.
        if self.step_profiler:
            if self.step >= self.profile_steps[0]:
                self.step_profiler.step()
            if self.step == self.profile_steps[1]:
                self.step_profiler.stop()
                self.step_profiler = None
                raise RuntimeError('Step profiling finished!')
        self.step += 1
        # NOTE(Harish): removed "if self.rank == 0 else None" for KV_connector enabling with TP>1
        # referred to Gpu Model Runner, KV connector aggregation expects valid output from all ranks
        return output

    def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
        return self.model_runner.get_supported_tasks()

    def take_draft_token_ids(self) -> Optional[DraftTokenIds]:
        return self.model_runner.take_draft_token_ids()

    def profile(self, is_start: bool = True):
        if self.profiler is None:
            raise RuntimeError("Profiler is not enabled.")
        if is_start:
            self.profiler.start()
        else:
            self.profiler.stop()

    def execute_dummy_batch(self) -> None:
        self.model_runner._dummy_run(1)

    def get_kv_connector_handshake_metadata(self) -> dict | None:
        """Get KV connector metadata from this worker if available."""

        if not has_kv_transfer_group():
            return None

        connector = get_kv_transfer_group()
        # Return None for connectors that don't need to exchange handshake
        # metadata across workers.
        if (metadata := connector.get_handshake_metadata()) is None:
            return None

        tp_rank = get_tp_group().rank_in_group
        return {tp_rank: metadata}
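
Taken together, the methods above are driven by the vLLM engine in a fixed order. The sketch below illustrates that order; it assumes vllm_config, kv_cache_config, and scheduler_output are supplied by the engine (they are not constructed here), so it is not a standalone script.

# Minimal lifecycle sketch (assumption: vllm_config, kv_cache_config and
# scheduler_output come from the vLLM engine; they are not built here).
worker = HPUWorker(vllm_config=vllm_config,
                   local_rank=0,
                   rank=0,
                   distributed_init_method="tcp://127.0.0.1:12345",
                   is_driver_worker=True)
worker.init_device()                          # HCCL init, RNG seed, HPUModelRunner + profiler
worker.load_model()                           # delegates weight loading to the model runner
budget = worker.determine_available_memory()  # bytes usable for the KV cache
worker.initialize_from_config(kv_cache_config)   # allocates the KV cache, then warms up
output = worker.execute_model(scheduler_output)  # one scheduling step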

cache_config instance-attribute

cache_config = cache_config

cache_dtype instance-attribute

cache_dtype = dtype

device_config instance-attribute

device_config = device_config

distributed_init_method instance-attribute

distributed_init_method = distributed_init_method

gc_track_recompiles instance-attribute

gc_track_recompiles = (
    track_graph_compilation
    and not high_level_profiler_enabled
)

is_driver_worker instance-attribute

is_driver_worker = is_driver_worker

load_config instance-attribute

load_config = load_config

local_rank instance-attribute

local_rank = local_rank

lora_config instance-attribute

lora_config = lora_config

model_config instance-attribute

model_config = model_config

observability_config instance-attribute

observability_config = observability_config

parallel_config instance-attribute

parallel_config = parallel_config

profile_steps instance-attribute

profile_steps = VLLM_PROFILE_STEPS

rank instance-attribute

rank = rank

scheduler_config instance-attribute

scheduler_config = scheduler_config

speculative_config instance-attribute

speculative_config = speculative_config

step instance-attribute

step = 0

step_debug instance-attribute

step_debug = init_debug_logger('steps')

step_profiler instance-attribute

step_profiler = setup_step_profiler(profile_steps)

vllm_config instance-attribute

vllm_config = vllm_config

__init__

__init__(
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
)
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def __init__(
    self,
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
):

    # TODO: use WorkerBase.__init__(self, vllm_config=vllm_config)
    self.vllm_config = vllm_config
    self.model_config = vllm_config.model_config
    self.cache_config = vllm_config.cache_config
    self.lora_config = vllm_config.lora_config
    self.load_config = vllm_config.load_config
    self.parallel_config = vllm_config.parallel_config
    self.scheduler_config = vllm_config.scheduler_config
    self.device_config = vllm_config.device_config
    self.speculative_config = vllm_config.speculative_config
    self.observability_config = vllm_config.observability_config

    self.local_rank = local_rank
    self.rank = rank
    self.distributed_init_method = distributed_init_method
    self.is_driver_worker = is_driver_worker

    if self.cache_config.cache_dtype == "auto":
        self.cache_dtype = self.model_config.dtype
    else:
        self.cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[self.cache_config.cache_dtype]

    if self.model_config.trust_remote_code:
        # note: lazy import to avoid importing torch before initializing
        from vllm.utils.import_utils import init_cached_hf_modules
        init_cached_hf_modules()

    self.gc_track_recompiles = get_config().track_graph_compilation and not get_config().high_level_profiler_enabled
    self.step = 0
    self.profile_steps = get_config().VLLM_PROFILE_STEPS
    self.step_profiler = setup_step_profiler(self.profile_steps)
    self.step_debug = init_debug_logger('steps')

compile_or_warm_up_model

compile_or_warm_up_model() -> None
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def compile_or_warm_up_model(self) -> None:
    # Don't run the warmup if in eager or if the model is already warmed up
    if not self.model_config.enforce_eager \
        and not getattr(self.model_runner, 'graphed_buckets', None):
        self.model_runner.warmup_model()
    # Reset the seed to ensure that the random state is not affected by
    # the model initialization and profiling.
    set_random_seed(self.model_config.seed)

determine_available_memory

determine_available_memory() -> int

Profiles the peak memory usage of the model to determine how many KV blocks may be allocated without OOMs.

The engine first profiles the existing memory usage, then calculates the maximum possible number of GPU and CPU blocks that can be allocated with the remaining free memory.

Tip: You may limit GPU memory usage by adjusting the gpu_memory_utilization parameter.

Source code in vllm_gaudi/v1/worker/hpu_worker.py
@torch.inference_mode()
def determine_available_memory(self) -> int:
    """Profiles the peak memory usage of the model to determine how many
    KV blocks may be allocated without OOMs.

    The engine will first conduct a profiling of the existing memory usage.
    Then, it calculates the maximum possible number of GPU and CPU blocks
    that can be allocated with the remaining free memory.

    .. tip::
        You may limit the usage of GPU memory
        by adjusting the `gpu_memory_utilization` parameter.
    """
    # Profile the memory usage of the model and get the maximum number of
    # cache blocks that can be allocated with the remaining free memory.

    # Execute a forward pass with dummy inputs to profile the memory usage
    # of the model.
    kv_caches: dict[str, torch.Tensor] = {}
    kv_cache_spec = self.model_runner.get_kv_cache_spec()
    single_kv_block_size_bytes = 0
    for layer_name, layer_spec in kv_cache_spec.items():
        if isinstance(layer_spec, FullAttentionSpec):
            dtype = layer_spec.dtype

            # Use an empty tensor instead of ``None`` to force Dynamo to pass
            # it by reference, rather than by specializing on the value ``None``.
            hpu_k_cache = torch.tensor([], dtype=dtype, device='hpu')
            hpu_v_cache = torch.tensor([], dtype=dtype, device='hpu')

            kv_caches[layer_name] = (hpu_k_cache, hpu_v_cache)

            single_kv_block_size_bytes += layer_spec.page_size_bytes

        else:
            raise NotImplementedError

    runner_kv_caches: list[torch.Tensor] = []
    bind_kv_cache(kv_caches, self.vllm_config.compilation_config.static_forward_context, runner_kv_caches)
    if is_fake_hpu():
        fake_hpu_cache_alloc = 4 * 2**30  # take 4 GiB flat on fake hpu
        return fake_hpu_cache_alloc
    with HabanaMemoryProfiler() as m:
        self.model_runner.profile_run()
        torch.hpu.synchronize()
    msg = ("Model profiling run "
           f"took {m.get_summary_string()}")
    logger.info(msg)
    # At this point we should've allocated the maximum workspace for all
    # recipes; we will use the extra memory for graphs/blocks.
    free_hpu_memory = torch.hpu.mem_get_info()[0]

    graph_reserved_mem = (float(os.environ.get('VLLM_GRAPH_RESERVED_MEM', '0.1'))
                          if not self.model_config.enforce_eager else 0)
    graph_headroom = 1 - graph_reserved_mem
    available_hpu_memory = free_hpu_memory * \
        self.cache_config.gpu_memory_utilization
    hpu_memory_margin = free_hpu_memory * (1 - self.cache_config.gpu_memory_utilization)
    self.model_runner.mem_margin = hpu_memory_margin
    cache_size_bytes = available_hpu_memory * graph_headroom
    graph_headroom_bytes = available_hpu_memory * (1 - graph_headroom)
    dummy_block_headroom = single_kv_block_size_bytes
    msg = (f"Free device memory: {format_bytes(free_hpu_memory)}, "
           f"{format_bytes(available_hpu_memory)} usable "
           f"(gpu_memory_utilization={self.cache_config.gpu_memory_utilization}),"
           f" {format_bytes(graph_headroom_bytes)} reserved for HPUGraphs "
           f"(VLLM_GRAPH_RESERVED_MEM={graph_reserved_mem}), "
           f"{format_bytes(dummy_block_headroom)} reserved for KV cache dummy "
           f"block {format_bytes(cache_size_bytes - dummy_block_headroom)} "
           "reserved for usable KV cache")

    logger.info(msg)
    gc.collect()
    return cache_size_bytes - dummy_block_headroom
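
The returned budget follows directly from the arithmetic above. A small worked example with purely illustrative numbers (assumptions, not measurements):

# Illustrative values only.
free_hpu_memory = 64 * 2**30                 # 64 GiB free after the profiling run
gpu_memory_utilization = 0.9                 # cache_config.gpu_memory_utilization
graph_reserved_mem = 0.1                     # VLLM_GRAPH_RESERVED_MEM default
single_kv_block_size_bytes = 2 * 2**20       # hypothetical size of one KV block

available_hpu_memory = free_hpu_memory * gpu_memory_utilization       # ~57.6 GiB usable
cache_size_bytes = available_hpu_memory * (1 - graph_reserved_mem)    # ~51.8 GiB after HPUGraph headroom
usable_kv_cache = cache_size_bytes - single_kv_block_size_bytes       # value returned to the engine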

execute_dummy_batch

execute_dummy_batch() -> None
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def execute_dummy_batch(self) -> None:
    self.model_runner._dummy_run(1)

execute_model

execute_model(
    scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput | None
Source code in vllm_gaudi/v1/worker/hpu_worker.py
@torch.inference_mode()
def execute_model(
    self,
    scheduler_output: "SchedulerOutput",
) -> ModelRunnerOutput | None:
    if self.step_debug:
        self.step_debug(f'step={self.step}')
    if self.step_profiler and self.step == self.profile_steps[0]:
        self.step_profiler.start()
    with track_graph_compile('HPUWorker.execute_model') \
            if self.gc_track_recompiles \
            else contextlib.nullcontext():
        output = self.model_runner.execute_model(scheduler_output)
    # TODO(woosuk): Send the output to the engine process.
    if self.step_profiler:
        if self.step >= self.profile_steps[0]:
            self.step_profiler.step()
        if self.step == self.profile_steps[1]:
            self.step_profiler.stop()
            self.step_profiler = None
            raise RuntimeError('Step profiling finished!')
    self.step += 1
    # NOTE(Harish): removed "if self.rank == 0 else None" for KV_connector enabling with TP>1
    # referred to Gpu Model Runner, KV connector aggregation expects valid output from all ranks
    return output

get_kv_cache_spec

get_kv_cache_spec() -> dict[str, KVCacheSpec]
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
    return self.model_runner.get_kv_cache_spec()

get_kv_connector_handshake_metadata

get_kv_connector_handshake_metadata() -> dict | None

Get KV connector metadata from this worker, if available.

Source code in vllm_gaudi/v1/worker/hpu_worker.py
def get_kv_connector_handshake_metadata(self) -> dict | None:
    """Get KV connector metadata from this worker if available."""

    if not has_kv_transfer_group():
        return None

    connector = get_kv_transfer_group()
    # Return None for connectors that don't need to exchange handshake
    # metadata across workers.
    if (metadata := connector.get_handshake_metadata()) is None:
        return None

    tp_rank = get_tp_group().rank_in_group
    return {tp_rank: metadata}
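
The result is keyed by tensor-parallel rank so that per-worker dictionaries can be merged without collisions. A hypothetical aggregation on the driver side (worker_results is an assumption, standing in for whatever RPC mechanism collects the per-worker return values):

# Hypothetical: merge the {tp_rank: metadata} dicts returned by each worker.
merged: dict[int, object] = {}
for entry in worker_results:      # worker_results: assumed list of per-worker return values
    if entry is not None:         # workers without a KV transfer group return None
        merged.update(entry)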

get_model

get_model() -> Module
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def get_model(self) -> nn.Module:
    return self.model_runner.get_model()

get_supported_tasks

get_supported_tasks() -> tuple[SupportedTask, ...]
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
    return self.model_runner.get_supported_tasks()

init_device

init_device()
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def init_device(self):
    self.device = torch.device("hpu")
    # Initialize the distributed environment.
    init_worker_distributed_environment(self.vllm_config, self.rank, self.distributed_init_method, self.local_rank)
    # Set random seed.
    set_random_seed(self.model_config.seed)
    self.model_runner = HPUModelRunner(vllm_config=self.vllm_config, is_driver_worker=self.is_driver_worker)
    self.init_profiler()

init_profiler

init_profiler()

Initialize the profiler.

Source code in vllm_gaudi/v1/worker/hpu_worker.py
def init_profiler(self):
    """Initialize the profiler."""
    if envs.VLLM_TORCH_PROFILER_DIR:
        torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR
        logger.info("Profiling enabled. Traces will be saved to: %s", torch_profiler_trace_dir)
        if os.getenv('VLLM_PROFILER_ENABLED') == 'full':
            fn = self.model_runner.profiler.full_trace_handler
            with_stack = False
        else:
            fn = torch.profiler.tensorboard_trace_handler
            with_stack = True
        self.profiler = torch.profiler.profile(activities=[
            torch.profiler.ProfilerActivity.CPU,
            torch.profiler.ProfilerActivity.HPU,
        ],
                                               with_stack=with_stack,
                                               on_trace_ready=fn(torch_profiler_trace_dir, use_gzip=True))

    else:
        self.profiler = None
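
The profiler is controlled entirely by environment variables read at startup. A sketch of enabling it (the trace directory is a placeholder, and in practice the variables are typically exported before launching rather than set in Python):

import os

# Placeholder path; traces are written here when profiling is enabled.
os.environ["VLLM_TORCH_PROFILER_DIR"] = "/tmp/vllm_hpu_traces"
# "full" selects the high-level profiler's full_trace_handler (with_stack=False);
# any other value falls back to torch.profiler.tensorboard_trace_handler.
os.environ["VLLM_PROFILER_ENABLED"] = "full"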

initialize_cache

initialize_cache(
    num_gpu_blocks: int, num_cpu_blocks: int
) -> None
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
    self.cache_config.num_gpu_blocks = num_gpu_blocks
    self.cache_config.num_cpu_blocks = num_cpu_blocks

initialize_from_config

initialize_from_config(
    kv_cache_config: KVCacheConfig,
) -> None

Allocate the GPU KV cache with the specified kv_cache_config.

Source code in vllm_gaudi/v1/worker/hpu_worker.py
def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None:
    """Allocate GPU KV cache with the specified kv_cache_config."""

    with HabanaMemoryProfiler() as m:
        self.model_runner.initialize_kv_cache(kv_cache_config)
        torch.hpu.synchronize()
    msg = (f"Usable num_blocks: {kv_cache_config.num_blocks}, "
           f"actual allocated num_blocks: "
           f"{self.model_runner.kv_caches[0][0].shape[0]} "
           f"(_PAD_BLOCK_ID={self.model_runner._PAD_BLOCK_ID}, "
           f"_PAD_SLOT_ID={self.model_runner._PAD_SLOT_ID})")
    logger.info(msg)
    msg = ("Initializing cache engine "
           f"took {m.get_summary_string()}")
    logger.info(msg)
    self.compile_or_warm_up_model()

load_model

load_model() -> None
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def load_model(self) -> None:
    self.model_runner.load_model()

profile

profile(is_start: bool = True)
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def profile(self, is_start: bool = True):
    if self.profiler is None:
        raise RuntimeError("Profiler is not enabled.")
    if is_start:
        self.profiler.start()
    else:
        self.profiler.stop()

sample_tokens

sample_tokens(
    grammar_output: GrammarOutput | None,
) -> ModelRunnerOutput | AsyncModelRunnerOutput
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def sample_tokens(self, grammar_output: "GrammarOutput|None") -> ModelRunnerOutput | AsyncModelRunnerOutput:
    return self.model_runner.sample_tokens(grammar_output)

start_profile

start_profile()
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def start_profile(self):
    if self.profiler is None:
        raise RuntimeError("Profiler is not enabled.")
    high_level_profiler = self.model_runner.profiler
    with high_level_profiler.record_event('internal', 'start_profiler'):
        # Clean up the queue
        while True:
            try:
                high_level_profiler.profiling_trace_events.get_nowait()
            except queue.Empty:
                break
        self.profiler.start()

stop_profile

stop_profile()
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def stop_profile(self):
    if self.profiler is None:
        raise RuntimeError("Profiler is not enabled.")
    self.profiler.stop()

take_draft_token_ids

take_draft_token_ids() -> Optional[DraftTokenIds]
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def take_draft_token_ids(self) -> Optional[DraftTokenIds]:
    return self.model_runner.take_draft_token_ids()

init_worker_distributed_environment

init_worker_distributed_environment(
    vllm_config: VllmConfig,
    rank: int,
    distributed_init_method: Optional[str] = None,
    local_rank: int = -1,
) -> None
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def init_worker_distributed_environment(
    vllm_config: VllmConfig,
    rank: int,
    distributed_init_method: Optional[str] = None,
    local_rank: int = -1,
) -> None:
    """Initialize the distributed environment."""
    parallel_config = vllm_config.parallel_config
    init_distributed_environment(parallel_config.world_size, rank, distributed_init_method, local_rank, backend='hccl')
    ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, parallel_config.pipeline_parallel_size)

    dummy_tensor_hpu = torch.ones(1).to('hpu')
    torch.distributed.all_reduce(dummy_tensor_hpu)
    assert dummy_tensor_hpu.item() == parallel_config.world_size * parallel_config.data_parallel_size
    ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, parallel_config.pipeline_parallel_size)

    ensure_kv_transfer_initialized(vllm_config)

setup_step_profiler

setup_step_profiler(steps)
Source code in vllm_gaudi/v1/worker/hpu_worker.py
def setup_step_profiler(steps):
    if steps is None:
        return None
    step_start, step_end = steps
    active = step_end - step_start + 1
    return setup_profiler(warmup=0, active=active)
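
The two-element steps value defines an inclusive profiling window. A worked example with illustrative numbers, tying this back to the step handling in execute_model:

steps = (100, 110)                    # illustrative VLLM_PROFILE_STEPS value
step_start, step_end = steps
active = step_end - step_start + 1    # 11 steps are captured
# In execute_model: the profiler starts at step 100, records steps 100..110,
# stops at step 110, and then raises RuntimeError('Step profiling finished!').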

track_graph_compile

track_graph_compile(name: str)
Source code in vllm_gaudi/v1/worker/hpu_worker.py
@contextmanager
def track_graph_compile(name: str):
    import habana_frameworks.torch as htorch
    from habana_frameworks.torch.hpu.metrics import metric_localcontext
    with metric_localcontext("graph_compilation") as gc:
        yield
        htorch.hpu.synchronize()
    if gc.stats()[0][1] != 0:
        msg = f"[{name}] graph compilation detected: {gc.stats()}"
        logger.warning(msg)
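
A usage sketch of the context manager; the wrapped call is a placeholder for any HPU workload:

# Logs a warning if the wrapped region triggered an HPU graph compilation.
with track_graph_compile('my_forward_pass'):   # the name only appears in the log message
    run_forward_pass()                         # placeholder for the actual work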