跳到内容

vllm_gaudi.extension.profiler

FileWriter

Bases: Thread

Source code in vllm_gaudi/extension/profiler.py
class FileWriter(threading.Thread):

    def __init__(self, filename, event_queue):
        super().__init__()
        self.filename = filename
        self.event_queue = event_queue
        self.daemon = True
        self.timer_event = threading.Event()

    def _drain_event_queue(self):
        content = ''
        while True:
            try:
                element = self.event_queue.get_nowait()
                content += element
            except queue.Empty:
                break
        return content

    def run(self):
        # don't check the queue too often
        while not self.timer_event.wait(1):
            # Block and wait for the next item in the queue
            content = self.event_queue.get()
            # Collect any other items in the queue
            content += self._drain_event_queue()

            with open(self.filename, 'a') as outfile:
                outfile.write(content)

daemon 实例属性

daemon = True

event_queue 实例属性

event_queue = event_queue

filename 实例属性

filename = filename

timer_event 实例属性

timer_event = Event()

__init__

__init__(filename, event_queue)
Source code in vllm_gaudi/extension/profiler.py
def __init__(self, filename, event_queue):
    super().__init__()
    self.filename = filename
    self.event_queue = event_queue
    self.daemon = True
    self.timer_event = threading.Event()

_drain_event_queue

_drain_event_queue()
Source code in vllm_gaudi/extension/profiler.py
def _drain_event_queue(self):
    content = ''
    while True:
        try:
            element = self.event_queue.get_nowait()
            content += element
        except queue.Empty:
            break
    return content

run

run()
Source code in vllm_gaudi/extension/profiler.py
def run(self):
    # don't check the queue too often
    while not self.timer_event.wait(1):
        # Block and wait for the next item in the queue
        content = self.event_queue.get()
        # Collect any other items in the queue
        content += self._drain_event_queue()

        with open(self.filename, 'a') as outfile:
            outfile.write(content)

HabanaHighLevelProfiler

Source code in vllm_gaudi/extension/profiler.py
class HabanaHighLevelProfiler:
    profiling_trace_events: queue.Queue = queue.Queue()
    event_tid = {'counter': 1, 'external': 2, 'internal': 3}
    event_cache: List[Any] = []

    def __init__(self, vllm_instance_id=None):
        self.enabled = get_config().high_level_profiler_enabled and int(os.getenv('RANK', '0')) == 0
        self.pid = os.getpid()
        if self.enabled:
            self.vllm_instance_id = vllm_instance_id if vllm_instance_id is not None \
                else f"vllm-instance-{self.pid}-{str(uuid.uuid4().hex)}"
            msg = f'Profiler enabled for: {self.vllm_instance_id}'
            logger().info(msg)
            self.filename = f'server_events_{self.vllm_instance_id}.json'
            # initialize the trace file (JSON Array Format)
            with open(self.filename, 'w') as outfile:
                outfile.write('[')
            file_writer = FileWriter(self.filename, self.profiling_trace_events)
            file_writer.start()
        if os.getenv('VLLM_PROFILER_ENABLED') == 'full':
            self.enabled = True  # don't save separate high-level traces
        self.gc_track_recompiles = get_config().track_graph_compilation
        self.num_graph_compilations = 0

    def _dump_with_sep(self, entry):
        entry = json.dumps(entry) + ','
        self.profiling_trace_events.put(entry)

    def get_timestamp_us(self):
        return time.time() * 1000000.0

    def record_counter(self, ts, counter):
        if self.enabled:
            self._dump_with_sep({
                'pid': self.pid,
                'tid': self.event_tid['counter'],
                'ph': 'C',
                'name': 'utils',
                'ts': ts,
                'args': counter
            })

    def start(self, type, name, args=None):
        if self.enabled:
            ts = self.get_timestamp_us()
            if args is not None and 'counter' in args:
                self.record_counter(ts, args['counter'])
                del args['counter']
            event = {
                'pid': self.pid,
                'tid': self.event_tid[type],
                'ph': 'X',
                'name': name,
                'ts': ts,
                'dur': None,
                'args': args
            }
            self.event_cache.append(event)

    def end(self):
        if self.enabled:
            ts = self.get_timestamp_us()
            if not self.event_cache:
                logger().warning('Profiler: end() call does not have matching start() call. '
                                 'Disabling profiler.')
                self.enabled = False
                return
            event = self.event_cache.pop()
            event['dur'] = ts - event['ts']
            self._dump_with_sep(event)

    def full_trace_handler(self, dir_name, use_gzip=False):

        def handler_fn(prof) -> None:
            if not os.path.isdir(dir_name):
                try:
                    os.makedirs(dir_name, exist_ok=True)
                except Exception as e:
                    raise RuntimeError("Can't create directory: " + dir_name) from e
            file_name = f"vllm.{time.time_ns()}.pt.trace.json"
            file_path = os.path.join(dir_name, file_name)
            prof.export_chrome_trace(file_path)
            with open(file_path) as f:
                pytorch_trace = json.load(f)
            os.remove(file_path)
            base = pytorch_trace['baseTimeNanoseconds'] / 1000
            events = self.profiling_trace_events
            while True:
                try:
                    event_str = events.get_nowait()
                    event = json.loads(event_str[:-1])
                    event['ts'] = event['ts'] - base
                    pytorch_trace['traceEvents'].append(event)
                except queue.Empty:
                    break

            pytorch_trace['traceEvents'].append({
                "args": {
                    "name": "vLLM"
                },
                "name": "process_name",
                "ph": "M",
                "pid": 1,
                "tid": 0,
                "ts": 0.0
            })
            if use_gzip:
                file_path = file_path + ".gz"
                with gzip.open(file_path, 'wt', encoding="ascii") as zipfile:
                    json.dump(pytorch_trace, zipfile)
            else:
                with open(file_path, "w") as outfile:
                    outfile.write(json.dumps(pytorch_trace))
            logger().info("Saved full profiling to %s", file_path)

        return handler_fn

    @contextmanager
    def record_event(self, type, name, args=None):
        if self.enabled:
            self.start(type, name, args)
            with self.track_graph_compile(type, args) \
                if self.gc_track_recompiles \
                else contextlib.nullcontext():
                yield
            self.end()
        else:
            yield

    def record_block(self, type, name, ts, dur, args=None):
        if self.enabled:
            event = {
                'pid': self.pid,
                'tid': self.event_tid[type],
                'ph': 'X',
                'name': name,
                'ts': ts,
                'dur': dur,
                'args': args
            }
            self._dump_with_sep(event)

    @contextmanager
    def track_graph_compile(self, type, args=None):
        start = self.get_timestamp_us()
        import habana_frameworks.torch as htorch
        from habana_frameworks.torch.hpu.metrics import metric_localcontext
        with metric_localcontext("graph_compilation") as gc:
            yield
            htorch.hpu.synchronize()
        if gc.stats()[0][1] != 0:
            compile_start_time = start
            for recipe in gc.stats()[3][1]:
                recipe_name = recipe[0]
                compile_time = recipe[1]
                self.num_graph_compilations += 1
                self.record_counter(compile_start_time, {'cumulative_graph_compilations': self.num_graph_compilations})
                self.record_block(type, 'GRAPH COMPILE: ' + recipe_name, compile_start_time, compile_time, args)
                compile_start_time += compile_time

enabled 实例属性

enabled = (
    high_level_profiler_enabled
    and int(getenv("RANK", "0")) == 0
)

event_cache 类属性 实例属性

event_cache: List[Any] = []

event_tid 类属性 实例属性

event_tid = {'counter': 1, 'external': 2, 'internal': 3}

filename 实例属性

filename = f'server_events_{vllm_instance_id}.json'

gc_track_recompiles 实例属性

gc_track_recompiles = track_graph_compilation

num_graph_compilations 实例属性

num_graph_compilations = 0

pid 实例属性

pid = getpid()

profiling_trace_events 类属性 实例属性

profiling_trace_events: Queue = Queue()

vllm_instance_id 实例属性

vllm_instance_id = (
    vllm_instance_id
    if vllm_instance_id is not None
    else f"vllm-instance-{pid}-{str(hex)}"
)

__init__

__init__(vllm_instance_id=None)
Source code in vllm_gaudi/extension/profiler.py
def __init__(self, vllm_instance_id=None):
    self.enabled = get_config().high_level_profiler_enabled and int(os.getenv('RANK', '0')) == 0
    self.pid = os.getpid()
    if self.enabled:
        self.vllm_instance_id = vllm_instance_id if vllm_instance_id is not None \
            else f"vllm-instance-{self.pid}-{str(uuid.uuid4().hex)}"
        msg = f'Profiler enabled for: {self.vllm_instance_id}'
        logger().info(msg)
        self.filename = f'server_events_{self.vllm_instance_id}.json'
        # initialize the trace file (JSON Array Format)
        with open(self.filename, 'w') as outfile:
            outfile.write('[')
        file_writer = FileWriter(self.filename, self.profiling_trace_events)
        file_writer.start()
    if os.getenv('VLLM_PROFILER_ENABLED') == 'full':
        self.enabled = True  # don't save separate high-level traces
    self.gc_track_recompiles = get_config().track_graph_compilation
    self.num_graph_compilations = 0

_dump_with_sep

_dump_with_sep(entry)
Source code in vllm_gaudi/extension/profiler.py
def _dump_with_sep(self, entry):
    entry = json.dumps(entry) + ','
    self.profiling_trace_events.put(entry)

end

end()
Source code in vllm_gaudi/extension/profiler.py
def end(self):
    if self.enabled:
        ts = self.get_timestamp_us()
        if not self.event_cache:
            logger().warning('Profiler: end() call does not have matching start() call. '
                             'Disabling profiler.')
            self.enabled = False
            return
        event = self.event_cache.pop()
        event['dur'] = ts - event['ts']
        self._dump_with_sep(event)

full_trace_handler

full_trace_handler(dir_name, use_gzip=False)
Source code in vllm_gaudi/extension/profiler.py
def full_trace_handler(self, dir_name, use_gzip=False):

    def handler_fn(prof) -> None:
        if not os.path.isdir(dir_name):
            try:
                os.makedirs(dir_name, exist_ok=True)
            except Exception as e:
                raise RuntimeError("Can't create directory: " + dir_name) from e
        file_name = f"vllm.{time.time_ns()}.pt.trace.json"
        file_path = os.path.join(dir_name, file_name)
        prof.export_chrome_trace(file_path)
        with open(file_path) as f:
            pytorch_trace = json.load(f)
        os.remove(file_path)
        base = pytorch_trace['baseTimeNanoseconds'] / 1000
        events = self.profiling_trace_events
        while True:
            try:
                event_str = events.get_nowait()
                event = json.loads(event_str[:-1])
                event['ts'] = event['ts'] - base
                pytorch_trace['traceEvents'].append(event)
            except queue.Empty:
                break

        pytorch_trace['traceEvents'].append({
            "args": {
                "name": "vLLM"
            },
            "name": "process_name",
            "ph": "M",
            "pid": 1,
            "tid": 0,
            "ts": 0.0
        })
        if use_gzip:
            file_path = file_path + ".gz"
            with gzip.open(file_path, 'wt', encoding="ascii") as zipfile:
                json.dump(pytorch_trace, zipfile)
        else:
            with open(file_path, "w") as outfile:
                outfile.write(json.dumps(pytorch_trace))
        logger().info("Saved full profiling to %s", file_path)

    return handler_fn

get_timestamp_us

get_timestamp_us()
Source code in vllm_gaudi/extension/profiler.py
def get_timestamp_us(self):
    return time.time() * 1000000.0

record_block

record_block(type, name, ts, dur, args=None)
Source code in vllm_gaudi/extension/profiler.py
def record_block(self, type, name, ts, dur, args=None):
    if self.enabled:
        event = {
            'pid': self.pid,
            'tid': self.event_tid[type],
            'ph': 'X',
            'name': name,
            'ts': ts,
            'dur': dur,
            'args': args
        }
        self._dump_with_sep(event)

record_counter

record_counter(ts, counter)
Source code in vllm_gaudi/extension/profiler.py
def record_counter(self, ts, counter):
    if self.enabled:
        self._dump_with_sep({
            'pid': self.pid,
            'tid': self.event_tid['counter'],
            'ph': 'C',
            'name': 'utils',
            'ts': ts,
            'args': counter
        })

record_event

record_event(type, name, args=None)
Source code in vllm_gaudi/extension/profiler.py
@contextmanager
def record_event(self, type, name, args=None):
    if self.enabled:
        self.start(type, name, args)
        with self.track_graph_compile(type, args) \
            if self.gc_track_recompiles \
            else contextlib.nullcontext():
            yield
        self.end()
    else:
        yield

start

start(type, name, args=None)
Source code in vllm_gaudi/extension/profiler.py
def start(self, type, name, args=None):
    if self.enabled:
        ts = self.get_timestamp_us()
        if args is not None and 'counter' in args:
            self.record_counter(ts, args['counter'])
            del args['counter']
        event = {
            'pid': self.pid,
            'tid': self.event_tid[type],
            'ph': 'X',
            'name': name,
            'ts': ts,
            'dur': None,
            'args': args
        }
        self.event_cache.append(event)

track_graph_compile

track_graph_compile(type, args=None)
Source code in vllm_gaudi/extension/profiler.py
@contextmanager
def track_graph_compile(self, type, args=None):
    start = self.get_timestamp_us()
    import habana_frameworks.torch as htorch
    from habana_frameworks.torch.hpu.metrics import metric_localcontext
    with metric_localcontext("graph_compilation") as gc:
        yield
        htorch.hpu.synchronize()
    if gc.stats()[0][1] != 0:
        compile_start_time = start
        for recipe in gc.stats()[3][1]:
            recipe_name = recipe[0]
            compile_time = recipe[1]
            self.num_graph_compilations += 1
            self.record_counter(compile_start_time, {'cumulative_graph_compilations': self.num_graph_compilations})
            self.record_block(type, 'GRAPH COMPILE: ' + recipe_name, compile_start_time, compile_time, args)
            compile_start_time += compile_time

HabanaMemoryProfiler

Source code in vllm_gaudi/extension/profiler.py
class HabanaMemoryProfiler:

    def __init__(self, device=None):
        self.device = device

    @staticmethod
    def current_device_memory_usage() -> float:
        if is_fake_hpu():
            return 0
        # Return the device memory usage in bytes.
        free_hpu_memory, total_hpu_memory = torch.hpu.mem_get_info()
        return total_hpu_memory - free_hpu_memory

    @staticmethod
    def current_free_device_memory() -> float:
        if is_fake_hpu():
            return 0
        # Return the device memory usage in bytes.
        free_hpu_memory, _ = torch.hpu.mem_get_info()
        return free_hpu_memory

    @staticmethod
    def total_device_memory() -> float:
        if is_fake_hpu():
            return 0
        # Return the device memory usage in bytes.
        _, total_hpu_memory = torch.hpu.mem_get_info()
        return total_hpu_memory

    @staticmethod
    def current_host_memory_usage() -> float:
        # Return the host memory usage in bytes.
        return HabanaMemoryProfiler.total_host_memory() - HabanaMemoryProfiler.current_free_host_memory()

    @staticmethod
    def current_free_host_memory() -> float:
        # Return the host memory usage in bytes.
        return psutil.virtual_memory().available

    @staticmethod
    def total_host_memory() -> float:
        # Return the host memory usage in bytes.
        return psutil.virtual_memory().total

    def get_summary_string(self):
        if getattr(self, 'final_device_memory', None) is None or getattr(self, 'final_host_memory', None) is None:
            raise RuntimeError("HabanaMemoryProfiler.get_summary_string() can only be called "
                               "after closing context manager")
        return (f"{format_bytes(self.consumed_device_memory)} of device memory "
                f"({format_bytes(self.final_device_memory)}/"
                f"{format_bytes(HabanaMemoryProfiler.total_device_memory())} used)"
                f" and {format_bytes(self.consumed_host_memory)} of host memory "
                f"({format_bytes(self.final_host_memory)}/"
                f"{format_bytes(HabanaMemoryProfiler.total_host_memory())} used)")

    def __enter__(self):
        # Force garbage collection
        gc.collect()
        self.initial_device_memory = \
            HabanaMemoryProfiler.current_device_memory_usage()
        self.initial_host_memory = \
            HabanaMemoryProfiler.current_host_memory_usage()
        # This allows us to call methods of the context manager if needed
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Force garbage collection
        gc.collect()
        self.final_device_memory = \
            HabanaMemoryProfiler.current_device_memory_usage(
        )
        self.final_host_memory = HabanaMemoryProfiler.current_host_memory_usage()
        self.consumed_device_memory = \
            self.final_device_memory - self.initial_device_memory
        self.consumed_host_memory = \
            self.final_host_memory - self.initial_host_memory

device 实例属性

device = device

__enter__

__enter__()
Source code in vllm_gaudi/extension/profiler.py
def __enter__(self):
    # Force garbage collection
    gc.collect()
    self.initial_device_memory = \
        HabanaMemoryProfiler.current_device_memory_usage()
    self.initial_host_memory = \
        HabanaMemoryProfiler.current_host_memory_usage()
    # This allows us to call methods of the context manager if needed
    return self

__exit__

__exit__(exc_type, exc_val, exc_tb)
Source code in vllm_gaudi/extension/profiler.py
def __exit__(self, exc_type, exc_val, exc_tb):
    # Force garbage collection
    gc.collect()
    self.final_device_memory = \
        HabanaMemoryProfiler.current_device_memory_usage(
    )
    self.final_host_memory = HabanaMemoryProfiler.current_host_memory_usage()
    self.consumed_device_memory = \
        self.final_device_memory - self.initial_device_memory
    self.consumed_host_memory = \
        self.final_host_memory - self.initial_host_memory

__init__

__init__(device=None)
Source code in vllm_gaudi/extension/profiler.py
def __init__(self, device=None):
    self.device = device

current_device_memory_usage 静态方法

current_device_memory_usage() -> float
Source code in vllm_gaudi/extension/profiler.py
@staticmethod
def current_device_memory_usage() -> float:
    if is_fake_hpu():
        return 0
    # Return the device memory usage in bytes.
    free_hpu_memory, total_hpu_memory = torch.hpu.mem_get_info()
    return total_hpu_memory - free_hpu_memory

current_free_device_memory 静态方法

current_free_device_memory() -> float
Source code in vllm_gaudi/extension/profiler.py
@staticmethod
def current_free_device_memory() -> float:
    if is_fake_hpu():
        return 0
    # Return the device memory usage in bytes.
    free_hpu_memory, _ = torch.hpu.mem_get_info()
    return free_hpu_memory

current_free_host_memory 静态方法

current_free_host_memory() -> float
Source code in vllm_gaudi/extension/profiler.py
@staticmethod
def current_free_host_memory() -> float:
    # Return the host memory usage in bytes.
    return psutil.virtual_memory().available

current_host_memory_usage 静态方法

current_host_memory_usage() -> float
Source code in vllm_gaudi/extension/profiler.py
@staticmethod
def current_host_memory_usage() -> float:
    # Return the host memory usage in bytes.
    return HabanaMemoryProfiler.total_host_memory() - HabanaMemoryProfiler.current_free_host_memory()

get_summary_string

get_summary_string()
Source code in vllm_gaudi/extension/profiler.py
def get_summary_string(self):
    if getattr(self, 'final_device_memory', None) is None or getattr(self, 'final_host_memory', None) is None:
        raise RuntimeError("HabanaMemoryProfiler.get_summary_string() can only be called "
                           "after closing context manager")
    return (f"{format_bytes(self.consumed_device_memory)} of device memory "
            f"({format_bytes(self.final_device_memory)}/"
            f"{format_bytes(HabanaMemoryProfiler.total_device_memory())} used)"
            f" and {format_bytes(self.consumed_host_memory)} of host memory "
            f"({format_bytes(self.final_host_memory)}/"
            f"{format_bytes(HabanaMemoryProfiler.total_host_memory())} used)")

total_device_memory 静态方法

total_device_memory() -> float
Source code in vllm_gaudi/extension/profiler.py
@staticmethod
def total_device_memory() -> float:
    if is_fake_hpu():
        return 0
    # Return the device memory usage in bytes.
    _, total_hpu_memory = torch.hpu.mem_get_info()
    return total_hpu_memory

total_host_memory 静态方法

total_host_memory() -> float
Source code in vllm_gaudi/extension/profiler.py
@staticmethod
def total_host_memory() -> float:
    # Return the host memory usage in bytes.
    return psutil.virtual_memory().total

HabanaProfilerCounterHelper

Source code in vllm_gaudi/extension/profiler.py
class HabanaProfilerCounterHelper:

    def __init__(self):
        self.niter = 0
        self.average_real_throughput = None
        self.logged_once = False
        self.prompt_real_seq_lens = []
        self.decode_real_seq_lens = []

    def capture_decode_seq_stats(self, real_seq_lens):
        self.decode_real_seq_lens = real_seq_lens

    def capture_prompt_seq_stats(self, real_seq_lens):
        self.prompt_real_seq_lens.append(real_seq_lens)

    def reset_prompt_seq_stats(self):
        self.prompt_real_seq_lens = []

    def get_counter_dict(self, cache_config, duration, seq_len, batch_size_padded, real_batch_size, prompt_batch_idx,
                         is_prompt):
        throughput = batch_size_padded / (duration / 1e6)
        throughput_effective = real_batch_size / (duration / 1e6)
        if is_prompt:
            real_max_seq_len = max(self.prompt_real_seq_lens[prompt_batch_idx])
            real_num_tokens = sum(self.prompt_real_seq_lens[prompt_batch_idx])
        else:
            real_max_seq_len = max(self.decode_real_seq_lens)
            real_num_tokens = sum(self.decode_real_seq_lens)
        padded_num_tokens = batch_size_padded * seq_len
        batch_token_utilization = real_num_tokens / padded_num_tokens
        if self.average_real_throughput is None:
            self.average_real_throughput = throughput_effective
        else:  # https://www.heikohoffmann.de/htmlthesis/node134.html
            self.average_real_throughput = self.average_real_throughput + 1 / (self.niter + 1) * (
                throughput_effective - self.average_real_throughput)
        phase = "prompt" if is_prompt else "decode"
        counters = {
            f'{phase}_bucket_batch_size': batch_size_padded,
            f'{phase}_batch_size': real_batch_size,
            f'{phase}_bucket_seq_len': seq_len,
            f'{phase}_seq_len': real_max_seq_len,
            f'{phase}_bucket_gen_throughput': throughput,
            f'{phase}_real_gen_throughput': throughput_effective,
            f'{phase}_batch_token_utilization': batch_token_utilization,
            'average_real_throughput': self.average_real_throughput,
            'engine_iteration': self.niter,
        }
        self.niter += 1
        if is_prompt:
            prompt_bucket_in_throughput = (seq_len * batch_size_padded) / (duration / 1e6)
            prompt_real_in_throughput = sum(self.prompt_real_seq_lens[prompt_batch_idx]) / (duration / 1e6)
            counters[f'{phase}_bucket_in_throughput'] = prompt_bucket_in_throughput
            counters[f'{phase}_real_in_throughput'] = prompt_real_in_throughput

        # KV cache might not be created yet (e.g. for profiling run)
        if cache_config.num_gpu_blocks is not None and \
            cache_config.num_gpu_blocks != 0:
            seq_lens = self.prompt_real_seq_lens[prompt_batch_idx] \
                if is_prompt \
                else self.decode_real_seq_lens
            cache_num_blocks_used = [math.ceil(sl / cache_config.block_size) for sl in seq_lens]
            cache_total_num_blocks_used = sum(cache_num_blocks_used)
            num_cache_blocks = cache_config.num_gpu_blocks
            cache_total_num_free_blocks = \
                num_cache_blocks - cache_total_num_blocks_used
            cache_computed_utilization = \
                cache_total_num_blocks_used / num_cache_blocks
            max_blocks_per_seq = math.ceil(seq_len / cache_config.block_size)
            batch_block_utilization = cache_total_num_blocks_used / (batch_size_padded * max_blocks_per_seq)
            counters['cache_num_blocks_used'] = cache_total_num_blocks_used
            counters['cache_num_free_blocks'] = cache_total_num_free_blocks
            counters['cache_computed_utilization'] = cache_computed_utilization
            counters[f'{phase}_batch_block_utilization'] = batch_block_utilization
        if not self.logged_once:
            counters['const_cache_num_blocks'] = cache_config.num_gpu_blocks
            counters[
                'const_gpu_memory_utilization'] = \
                    cache_config.gpu_memory_utilization
            counters['const_block_size'] = cache_config.block_size
            self.logged_once = True

        return counters

average_real_throughput 实例属性

average_real_throughput = None

decode_real_seq_lens 实例属性

decode_real_seq_lens = []

logged_once 实例属性

logged_once = False

niter 实例属性

niter = 0

prompt_real_seq_lens 实例属性

prompt_real_seq_lens = []

__init__

__init__()
Source code in vllm_gaudi/extension/profiler.py
def __init__(self):
    self.niter = 0
    self.average_real_throughput = None
    self.logged_once = False
    self.prompt_real_seq_lens = []
    self.decode_real_seq_lens = []

capture_decode_seq_stats

capture_decode_seq_stats(real_seq_lens)
Source code in vllm_gaudi/extension/profiler.py
def capture_decode_seq_stats(self, real_seq_lens):
    self.decode_real_seq_lens = real_seq_lens

capture_prompt_seq_stats

capture_prompt_seq_stats(real_seq_lens)
Source code in vllm_gaudi/extension/profiler.py
def capture_prompt_seq_stats(self, real_seq_lens):
    self.prompt_real_seq_lens.append(real_seq_lens)

get_counter_dict

get_counter_dict(
    cache_config,
    duration,
    seq_len,
    batch_size_padded,
    real_batch_size,
    prompt_batch_idx,
    is_prompt,
)
Source code in vllm_gaudi/extension/profiler.py
def get_counter_dict(self, cache_config, duration, seq_len, batch_size_padded, real_batch_size, prompt_batch_idx,
                     is_prompt):
    throughput = batch_size_padded / (duration / 1e6)
    throughput_effective = real_batch_size / (duration / 1e6)
    if is_prompt:
        real_max_seq_len = max(self.prompt_real_seq_lens[prompt_batch_idx])
        real_num_tokens = sum(self.prompt_real_seq_lens[prompt_batch_idx])
    else:
        real_max_seq_len = max(self.decode_real_seq_lens)
        real_num_tokens = sum(self.decode_real_seq_lens)
    padded_num_tokens = batch_size_padded * seq_len
    batch_token_utilization = real_num_tokens / padded_num_tokens
    if self.average_real_throughput is None:
        self.average_real_throughput = throughput_effective
    else:  # https://www.heikohoffmann.de/htmlthesis/node134.html
        self.average_real_throughput = self.average_real_throughput + 1 / (self.niter + 1) * (
            throughput_effective - self.average_real_throughput)
    phase = "prompt" if is_prompt else "decode"
    counters = {
        f'{phase}_bucket_batch_size': batch_size_padded,
        f'{phase}_batch_size': real_batch_size,
        f'{phase}_bucket_seq_len': seq_len,
        f'{phase}_seq_len': real_max_seq_len,
        f'{phase}_bucket_gen_throughput': throughput,
        f'{phase}_real_gen_throughput': throughput_effective,
        f'{phase}_batch_token_utilization': batch_token_utilization,
        'average_real_throughput': self.average_real_throughput,
        'engine_iteration': self.niter,
    }
    self.niter += 1
    if is_prompt:
        prompt_bucket_in_throughput = (seq_len * batch_size_padded) / (duration / 1e6)
        prompt_real_in_throughput = sum(self.prompt_real_seq_lens[prompt_batch_idx]) / (duration / 1e6)
        counters[f'{phase}_bucket_in_throughput'] = prompt_bucket_in_throughput
        counters[f'{phase}_real_in_throughput'] = prompt_real_in_throughput

    # KV cache might not be created yet (e.g. for profiling run)
    if cache_config.num_gpu_blocks is not None and \
        cache_config.num_gpu_blocks != 0:
        seq_lens = self.prompt_real_seq_lens[prompt_batch_idx] \
            if is_prompt \
            else self.decode_real_seq_lens
        cache_num_blocks_used = [math.ceil(sl / cache_config.block_size) for sl in seq_lens]
        cache_total_num_blocks_used = sum(cache_num_blocks_used)
        num_cache_blocks = cache_config.num_gpu_blocks
        cache_total_num_free_blocks = \
            num_cache_blocks - cache_total_num_blocks_used
        cache_computed_utilization = \
            cache_total_num_blocks_used / num_cache_blocks
        max_blocks_per_seq = math.ceil(seq_len / cache_config.block_size)
        batch_block_utilization = cache_total_num_blocks_used / (batch_size_padded * max_blocks_per_seq)
        counters['cache_num_blocks_used'] = cache_total_num_blocks_used
        counters['cache_num_free_blocks'] = cache_total_num_free_blocks
        counters['cache_computed_utilization'] = cache_computed_utilization
        counters[f'{phase}_batch_block_utilization'] = batch_block_utilization
    if not self.logged_once:
        counters['const_cache_num_blocks'] = cache_config.num_gpu_blocks
        counters[
            'const_gpu_memory_utilization'] = \
                cache_config.gpu_memory_utilization
        counters['const_block_size'] = cache_config.block_size
        self.logged_once = True

    return counters

reset_prompt_seq_stats

reset_prompt_seq_stats()
Source code in vllm_gaudi/extension/profiler.py
def reset_prompt_seq_stats(self):
    self.prompt_real_seq_lens = []

format_bytes

format_bytes(size)
Source code in vllm_gaudi/extension/profiler.py
def format_bytes(size):
    # 2**10 = 1024
    power = 2**10
    n = 0
    power_labels = {0: '', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
    while abs(size) > power:
        size /= power
        n += 1
    return f'{size:.4g} {power_labels[n]+"B"}'

setup_profiler

setup_profiler(warmup, active)
Source code in vllm_gaudi/extension/profiler.py
def setup_profiler(warmup, active):
    schedule = torch.profiler.schedule(wait=0, warmup=warmup, active=active, repeat=1)
    activities = [torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.HPU]
    profiler = torch.profiler.profile(schedule=schedule,
                                      activities=activities,
                                      on_trace_ready=torch.profiler.tensorboard_trace_handler('.', use_gzip=True),
                                      record_shapes=False,
                                      with_stack=True)
    return profiler