
llmcompressor.core

Provides the core compression framework for LLM Compressor.

The core API manages compression sessions, tracks state changes, handles events during compression, and exposes lifecycle hooks for the compression process.

Modules

  • events

    Core events package for LLM Compressor

  • helpers

    Helper functions for core compression operations.

  • lifecycle

    Module for managing the compression lifecycle in LLM Compressor.

  • model_layer

    Model-layer utility classes for LLM compression workflows.

  • session

    Compression session management for LLM compression workflows.

  • session_functions

    Session management functions for LLM compression workflows.

  • state

    Module for managing the state of LLM Compressor.

  • CompressionLifecycle

    A class for managing the lifecycle of compression events in LLM Compressor.

  • CompressionSession

    A session for compression that holds the lifecycle

  • Data

    A dataclass to hold different datasets for training, validation,

  • Event

    A class for defining events that can be triggered during sparsification.

  • EventType

    An enum for defining the different types of events that can be triggered

  • Hardware

    A dataclass to hold information about the hardware being used.

  • LifecycleCallbacks

    A class for invoking lifecycle events for the active session

  • ModelParameterizedLayer

    A dataclass for holding a parameter and its layer

  • ModifiedState

    A dataclass to represent a modified model, optimizer, and loss.

  • State

    State class holds information about the current compression state.

Functions

  • active_session

    :return: the active session for sparsification

  • create_session

    Context manager to create and yield a new session for sparsification.

  • reset_session

    Reset the currently active session to its initial state
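To illustrate how these session functions are meant to compose, here is a minimal, self-contained sketch of the context-manager pattern they describe. This is a simplified stand-in, not the actual llmcompressor implementation; the real session additionally carries the compression lifecycle and state.

```python
from contextlib import contextmanager

# Simplified stand-ins for CompressionSession / active_session / create_session.
_session_stack = []

class _Session:
    def __init__(self):
        self.initialized = False

    def reset(self):
        self.initialized = False

def active_session():
    """Return the most recently created session, creating a default if needed."""
    if not _session_stack:
        _session_stack.append(_Session())
    return _session_stack[-1]

@contextmanager
def create_session():
    """Push a fresh session, yield it, and pop it when the block exits."""
    session = _Session()
    _session_stack.append(session)
    try:
        yield session
    finally:
        _session_stack.pop()

def reset_session():
    """Reset the active session to its initial state."""
    active_session().reset()
```

Inside a `with create_session() as session:` block, `active_session()` returns the session created by the context manager; after the block exits, it falls back to the previous (or a default) session.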

CompressionLifecycle dataclass

CompressionLifecycle(
    state: State = State(),
    recipe: Recipe = Recipe(),
    initialized_: bool = False,
    finalized: bool = False,
    _last_event_type: EventType
    | None = EventType.BATCH_END,
    _event_order: list[EventType] = (
        lambda: [
            EventType.BATCH_START,
            EventType.LOSS_CALCULATED,
            EventType.OPTIM_PRE_STEP,
            EventType.OPTIM_POST_STEP,
            EventType.BATCH_END,
        ]
    )(),
    global_step: int = 0,
)

A class for managing the lifecycle of compression events in LLM Compressor.

Parameters

  • state

    (State, default: State() ) –

    The current state of the compression process

  • recipe

    (Recipe, default: Recipe() ) –

    The compression recipe

  • modifiers

    (list[StageModifiers]) –

    The list of stage modifiers

Methods

  • event

    Handle a compression event.

  • finalize

    Finalize the compression lifecycle.

  • initialize

    Initialize the compression lifecycle.

  • reset

    Reset the compression lifecycle, finalizing any active modifiers

event

event(
    event_type: EventType,
    global_step: int | None = 0,
    **kwargs,
) -> list[Any]

Handle a compression event.

Parameters

  • event_type

    (EventType) –

    The type of event to handle

  • kwargs

    Additional arguments to pass to the event handlers

Returns

  • List[Any]

    List of data returned from handling the event by modifiers

Raises

  • ValueError

    If called before initialization, after finalization, or for an invalid event type

Source code in llmcompressor/core/lifecycle.py
def event(
    self, event_type: EventType, global_step: int | None = 0, **kwargs
) -> list[Any]:
    """
    Handle a compression event.

    :param event_type: The type of event to handle
    :type event_type: EventType
    :param kwargs: Additional arguments to pass to the event handlers
    :return: List of data returned from handling the event by modifiers
    :rtype: List[Any]
    :raises ValueError: If called before initialization, after finalization,
        or for an invalid event type
    """
    if not self.initialized_:
        logger.error("Cannot invoke event before initializing")
        raise ValueError("Cannot invoke event before initializing")

    if self.finalized:
        logger.error("Cannot invoke event after finalizing")
        raise ValueError("Cannot invoke event after finalizing")

    if event_type in [EventType.INITIALIZE, EventType.FINALIZE]:
        logger.error(
            "Cannot invoke {} event. Use the corresponding method instead.",
            event_type,
        )
        raise ValueError(
            f"Cannot invoke {event_type} event. "
            f"Use the corresponding method instead."
        )

    if not self._validate_event_order(event_type):
        raise ValueError(
            f"Lifecycle events must appear following order: {self._event_order}. "
            f"Instead, {self._last_event_type} was called before {event_type}"
        )

    if event_type == EventType.LOSS_CALCULATED and (
        "loss" not in kwargs or kwargs["loss"] is None
    ):
        logger.error("Loss must be provided for loss calculated event")
        raise ValueError("Loss must be provided for loss calculated event")

    logger.debug("Handling event: {}", event_type)

    # update global step
    if global_step is not None:
        self.global_step = global_step

    event = Event(type_=event_type)
    mod_data = []
    for mod in self.recipe.modifiers:
        data = mod.update_event(state=self.state, event=event, **kwargs)
        logger.debug("Updated event with modifier: {}", mod)
        if data is not None:
            mod_data.append(data)

    assert (
        event is not None
    ), f"Event lifecycle did not return an event for {event_type}"

    return mod_data
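The `_validate_event_order` check referenced above enforces that batch-level events arrive in the declared cyclic `_event_order`. The following is a simplified sketch of such a validator; it is stricter than the real one, which may allow optional events to be skipped, and the `EventType` enum here is a local stand-in:

```python
from enum import Enum

class EventType(Enum):
    BATCH_START = "batch_start"
    LOSS_CALCULATED = "loss_calculated"
    OPTIM_PRE_STEP = "optim_pre_step"
    OPTIM_POST_STEP = "optim_post_step"
    BATCH_END = "batch_end"

# The cyclic order that lifecycle events must follow within a batch.
EVENT_ORDER = list(EventType)

def validate_event_order(last_event: EventType, new_event: EventType) -> bool:
    """Return True if new_event directly follows last_event in cyclic order."""
    next_index = (EVENT_ORDER.index(last_event) + 1) % len(EVENT_ORDER)
    return EVENT_ORDER[next_index] is new_event
```

The modulo wrap-around is what lets `BATCH_START` legally follow `BATCH_END` when the next batch begins.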

finalize

finalize(**kwargs) -> list[Any]

Finalize the compression lifecycle.

Parameters

  • kwargs

    Additional arguments to update the state with

Returns

  • List[Any]

    List of data returned from finalizing modifiers

Raises

  • ValueError

    If called before initialization or more than once

Source code in llmcompressor/core/lifecycle.py
def finalize(self, **kwargs) -> list[Any]:
    """
    Finalize the compression lifecycle.

    :param kwargs: Additional arguments to update the state with
    :return: List of data returned from finalizing modifiers
    :rtype: List[Any]
    :raises ValueError: If called before initialization or more than once
    """
    if not self.initialized_:
        logger.error("Cannot finalize before initializing")
        raise ValueError("Cannot finalize before initializing")

    if self.finalized:
        logger.error("Cannot finalize more than once")
        raise ValueError("Cannot finalize more than once")

    logger.debug("Finalizing compression lifecycle")
    mod_data = []
    for mod in self.recipe.modifiers:
        data = mod.finalize(state=self.state, **kwargs)
        logger.debug("Finalized modifier: {}", mod)
        if data is not None:
            mod_data.append(data)

    self.finalized = True

    logger.info(
        "Compression lifecycle finalized for {} modifiers",
        len(self.recipe.modifiers),
    )

    return mod_data

initialize

initialize(
    recipe: RecipeInput | None = None,
    recipe_stage: RecipeStageInput | None = None,
    recipe_args: RecipeArgsInput | None = None,
    **kwargs,
) -> list[Any]

Initialize the compression lifecycle.

Parameters

  • kwargs

    Additional arguments to update the state with

Returns

  • List[Any]

    List of data returned from initialization of modifiers

Source code in llmcompressor/core/lifecycle.py
def initialize(
    self,
    recipe: RecipeInput | None = None,
    recipe_stage: RecipeStageInput | None = None,
    recipe_args: RecipeArgsInput | None = None,
    **kwargs,
) -> list[Any]:
    """
    Initialize the compression lifecycle.

    :param kwargs: Additional arguments to update the state with
    :return: List of data returned from initialization of modifiers
    :rtype: List[Any]
    """

    self.state.update(**kwargs)
    if self.initialized_:  # TODO: do not initialize twice
        return

    logger.debug("Initializing compression lifecycle")
    if not recipe:
        self.recipe = Recipe()
    else:
        self.recipe = Recipe.create_instance(
            path_or_modifiers=recipe, target_stage=recipe_stage
        )
        if recipe_args:
            self.recipe.args = {**recipe_args}

    mod_data = []
    for mod in self.recipe.modifiers:
        data = mod.initialize(state=self.state, **kwargs)
        logger.debug("Initialized modifier: {}", mod)
        if data is not None:
            mod_data.append(data)

    self.initialized_ = True
    logger.info(
        "Compression lifecycle initialized for {} modifiers",
        len(self.recipe.modifiers),
    )

    return mod_data

reset

reset()

Reset the compression lifecycle, finalizing any active modifiers and resetting all attributes.

Source code in llmcompressor/core/lifecycle.py
def reset(self):
    """
    Reset the compression lifecycle, finalizing any active modifiers
    and resetting all attributes.
    """
    logger.debug("Resetting compression lifecycle")

    for mod in self.recipe.modifiers:
        if not mod.initialized or mod.finalized:
            continue
        try:
            mod.finalize(self.state)
            logger.debug("Finalized modifier: {}", mod)
        except Exception as e:
            logger.warning(f"Exception during finalizing modifier: {e}")

    self.__init__()
    logger.info("Compression lifecycle reset")

CompressionSession

CompressionSession()

A session for compression that holds the lifecycle and state for the current compression session

Methods

  • event

    Invoke an event for the current CompressionSession.

  • finalize

    Finalize the session for compression. This will run the finalize method

  • get_serialized_recipe

    :return: serialized string of the current compiled recipe

  • initialize

    Initialize the session for compression. This will run the initialize method

  • log

    Log model and loss information for the current event type

  • reset

    Reset the session to its initial state

  • reset_stage

    Reset the session for starting a new stage, with the recipe and model intact

Attributes

Source code in llmcompressor/core/session.py
def __init__(self):
    self._lifecycle = CompressionLifecycle()

lifecycle property

lifecycle: CompressionLifecycle

The lifecycle is used to track progress through the compression process and which modifiers are active. It also provides the ability to invoke events within the lifecycle.

Returns

state property

state: State

The state of the current compression session. The State instance is used to store all information needed for compression, such as the recipe, model, optimizer, data, etc.

Returns

  • State

    The current state of the session

event

event(
    event_type: EventType,
    batch_data: Optional[Any] = None,
    loss: Optional[Any] = None,
    **kwargs,
) -> ModifiedState

Invoke an event for the current CompressionSession.

Parameters

  • event_type

    (EventType) –

    the event type to invoke

  • batch_data

    (Optional[Any], default: None ) –

    the batch data to use for the event

  • loss

    (Optional[Any], default: None ) –

    the loss to use for the event, if any

  • kwargs

    additional kwargs to pass to the lifecycle's event method

Returns

Source code in llmcompressor/core/session.py
def event(
    self,
    event_type: EventType,
    batch_data: Optional[Any] = None,
    loss: Optional[Any] = None,
    **kwargs,
) -> ModifiedState:
    """
    Invoke an event for current CompressionSession.

    :param event_type: the event type to invoke
    :param batch_data: the batch data to use for the event
    :param loss: the loss to use for the event if any
    :param kwargs: additional kwargs to pass to the lifecycle's event method
    :return: the modified state of the session after invoking the event
    """
    mod_data = self._lifecycle.event(
        event_type=event_type, batch_data=batch_data, loss=loss, **kwargs
    )
    return ModifiedState(
        model=self.state.model,
        optimizer=self.state.optimizer,
        loss=self.state.loss,  # TODO: is this supposed to be a different type?
        modifier_data=mod_data,
    )

finalize

finalize(**kwargs) -> ModifiedState

Finalize the session for compression. This will run the finalize method for each modifier in the session's lifecycle. This will also set the session's state to the finalized state.

Parameters

  • kwargs

    additional kwargs to pass to the lifecycle's finalize method

Returns

Source code in llmcompressor/core/session.py
def finalize(self, **kwargs) -> ModifiedState:
    """
    Finalize the session for compression. This will run the finalize method
    for each modifier in the session's lifecycle. This will also set the session's
    state to the finalized state.

    :param kwargs: additional kwargs to pass to the lifecycle's finalize method
    :return: the modified state of the session after finalizing
    """
    mod_data = self._lifecycle.finalize(**kwargs)

    return ModifiedState(
        model=self.state.model,
        optimizer=self.state.optimizer,
        loss=self.state.loss,
        modifier_data=mod_data,
    )

get_serialized_recipe

get_serialized_recipe() -> Optional[str]

Returns

  • Optional[str]

    serialized string of the current compiled recipe

Source code in llmcompressor/core/session.py
def get_serialized_recipe(self) -> Optional[str]:
    """
    :return: serialized string of the current compiled recipe
    """
    recipe = self.lifecycle.recipe

    if recipe is not None and hasattr(recipe, "yaml"):
        return recipe.yaml()

    logger.warning("Recipe not found in session - it may have been reset")

initialize

initialize(
    recipe: Union[
        str, List[str], Recipe, List[Recipe], None
    ] = None,
    recipe_stage: Union[str, List[str], None] = None,
    recipe_args: Union[Dict[str, Any], None] = None,
    model: Optional[Any] = None,
    teacher_model: Optional[Any] = None,
    optimizer: Optional[Any] = None,
    attach_optim_callbacks: bool = True,
    train_data: Optional[Any] = None,
    val_data: Optional[Any] = None,
    test_data: Optional[Any] = None,
    calib_data: Optional[Any] = None,
    copy_data: bool = True,
    start: Optional[float] = None,
    steps_per_epoch: Optional[int] = None,
    batches_per_step: Optional[int] = None,
    loggers: Union[
        None, LoggerManager, List[BaseLogger]
    ] = None,
    **kwargs,
) -> ModifiedState

Initialize the session for compression. This will run the initialize method for each modifier in the session's lifecycle. This will also set the session's state to the initialized state.

Parameters

  • recipe

    (Union[str, List[str], Recipe, List[Recipe], None], default: None ) –

    the recipe to use for the compression; can be a path to a recipe file, a raw recipe string, a recipe object, or a list of recipe objects.

  • recipe_stage

    (Union[str, List[str], None], default: None ) –

    the stage to target for the compression

  • recipe_args

    (Union[Dict[str, Any], None], default: None ) –

    the args to use for overriding the recipe defaults

  • model

    (Optional[Any], default: None ) –

    the model to compress

  • teacher_model

    (Optional[Any], default: None ) –

    the teacher model to use for knowledge distillation

  • optimizer

    (Optional[Any], default: None ) –

    the optimizer to use for the compression

  • attach_optim_callbacks

    (bool, default: True ) –

    True to attach the optimizer callbacks to the compression lifecycle, False otherwise

  • train_data

    (Optional[Any], default: None ) –

    the training data to use for the compression

  • val_data

    (Optional[Any], default: None ) –

    the validation data to use for the compression

  • test_data

    (Optional[Any], default: None ) –

    the testing data to use for the compression

  • calib_data

    (Optional[Any], default: None ) –

    the calibration data to use for the compression

  • copy_data

    (bool, default: True ) –

    True to copy the data, False otherwise

  • start

    (Optional[float], default: None ) –

    the start epoch to use for the compression

  • steps_per_epoch

    (Optional[int], default: None ) –

    the number of steps per epoch to use for the compression

  • batches_per_step

    (Optional[int], default: None ) –

    the number of batches per step to use for the compression

  • loggers

    (Union[None, LoggerManager, List[BaseLogger]], default: None ) –

    the metrics manager for logging important info and milestones; also accepts a list of BaseLogger(s).

  • kwargs

    additional kwargs to pass to the lifecycle's initialize method

Returns

Source code in llmcompressor/core/session.py
def initialize(
    self,
    recipe: Union[str, List[str], "Recipe", List["Recipe"], None] = None,
    recipe_stage: Union[str, List[str], None] = None,
    recipe_args: Union[Dict[str, Any], None] = None,
    model: Optional[Any] = None,
    teacher_model: Optional[Any] = None,
    optimizer: Optional[Any] = None,
    attach_optim_callbacks: bool = True,
    train_data: Optional[Any] = None,
    val_data: Optional[Any] = None,
    test_data: Optional[Any] = None,
    calib_data: Optional[Any] = None,
    copy_data: bool = True,
    start: Optional[float] = None,
    steps_per_epoch: Optional[int] = None,
    batches_per_step: Optional[int] = None,
    loggers: Union[None, LoggerManager, List[BaseLogger]] = None,
    **kwargs,
) -> ModifiedState:
    """
    Initialize the session for compression. This will run the initialize method
    for each modifier in the session's lifecycle. This will also set the session's
    state to the initialized state.

    :param recipe: the recipe to use for the compression, can be a path to a
        recipe file, a raw recipe string, a recipe object, or a list
        of recipe objects.
    :param recipe_stage: the stage to target for the compression
    :param recipe_args: the args to use for overriding the recipe defaults
    :param model: the model to compress
    :param teacher_model: the teacher model to use for knowledge distillation
    :param optimizer: the optimizer to use for the compression
    :param attach_optim_callbacks: True to attach the optimizer callbacks to the
        compression lifecycle, False otherwise
    :param train_data: the training data to use for the compression
    :param val_data: the validation data to use for the compression
    :param test_data: the testing data to use for the compression
    :param calib_data: the calibration data to use for the compression
    :param copy_data: True to copy the data, False otherwise
    :param start: the start epoch to use for the compression
    :param steps_per_epoch: the number of steps per epoch to use for the
        compression
    :param batches_per_step: the number of batches per step to use for
        compression
    :param loggers: the metrics manager to setup logging important info
        and milestones to, also accepts a list of BaseLogger(s)
    :param kwargs: additional kwargs to pass to the lifecycle's initialize method
    :return: the modified state of the session after initializing
    """
    mod_data = self._lifecycle.initialize(
        recipe=recipe,
        recipe_stage=recipe_stage,
        recipe_args=recipe_args,
        model=model,
        teacher_model=teacher_model,
        optimizer=optimizer,
        attach_optim_callbacks=attach_optim_callbacks,
        train_data=train_data,
        val_data=val_data,
        test_data=test_data,
        calib_data=calib_data,
        copy_data=copy_data,
        start=start,
        steps_per_epoch=steps_per_epoch,
        batches_per_step=batches_per_step,
        loggers=loggers,
        **kwargs,
    )

    return ModifiedState(
        model=self.state.model,
        optimizer=self.state.optimizer,
        loss=self.state.loss,
        modifier_data=mod_data,
    )

log

log(event_type: EventType, loss: Optional[Any] = None)

Log model and loss information for the current event type

Parameters

  • event_type

    (EventType) –

    the event type to log for

  • loss

    (Optional[Any], default: None ) –

    the loss to log, if any

Source code in llmcompressor/core/session.py
def log(self, event_type: EventType, loss: Optional[Any] = None):
    """
    Log model and loss information for the current event type

    :param event_type: the event type to log for
    :param loss: the loss to log if any
    """
    self._log_model_info()
    self._log_loss(event_type=event_type, loss=loss)

reset

reset()

Reset the session to its initial state

Source code in llmcompressor/core/session.py
def reset(self):
    """
    Reset the session to its initial state
    """
    self._lifecycle.reset()

reset_stage

reset_stage()

Reset the session for starting a new stage; the recipe and model stay intact

Source code in llmcompressor/core/session.py
def reset_stage(self):
    """
    Reset the session for starting a new stage, recipe and model stays intact
    """
    self.lifecycle.initialized_ = False
    self.lifecycle.finalized = False

Data dataclass

Data(
    train: Optional[Any] = None,
    val: Optional[Any] = None,
    test: Optional[Any] = None,
    calib: Optional[Any] = None,
)

A dataclass to hold the different datasets for training, validation, testing, and/or calibration. Each dataset is a ModifiableData instance.

Parameters

  • train

    (Optional[Any], default: None ) –

    The training dataset

  • val

    (Optional[Any], default: None ) –

    The validation dataset

  • test

    (Optional[Any], default: None ) –

    The testing dataset

  • calib

    (Optional[Any], default: None ) –

    The calibration dataset

Event dataclass

Event(
    type_: Optional[EventType] = None,
    steps_per_epoch: Optional[int] = None,
    batches_per_step: Optional[int] = None,
    invocations_per_step: int = 1,
    global_step: int = 0,
    global_batch: int = 0,
)

A class for defining events that can be triggered during sparsification.

Parameters

  • type_

    (Optional[EventType], default: None ) –

    The type of the event.

  • steps_per_epoch

    (Optional[int], default: None ) –

    The number of steps per epoch.

  • batches_per_step

    (Optional[int], default: None ) –

    The number of batches per step, where a step is an invocation of the optimizer's step. For most paths these are the same. When they differ, see the invocations_per_step parameter for more details.

  • invocations_per_step

    (int, default: 1 ) –

    The number of times optimizer.step is called before the wrapped step is invoked. This can generally be left at 1 (the default). For older amp paths, this is the number of times the scaler wrapper is called before the wrapped optimizer step function is invoked to handle accumulation in fp16.

  • global_step

    (int, default: 0 ) –

    The current global step.

  • global_batch

    (int, default: 0 ) –

    The current global batch.

Methods

  • new_instance

    Creates a new instance of the event with the provided keyword arguments.

  • should_update

    Determines if the event should trigger an update.

Attributes

  • current_index (float) –

    Calculates the current index of the event.

  • epoch (int) –

    Calculates the current epoch.

  • epoch_based (bool) –

    Determines if the event is epoch-based.

  • epoch_batch (int) –

    Calculates the current batch within the current epoch.

  • epoch_full (float) –

    Calculates the current epoch with the current step as a fraction.

  • epoch_step (int) –

    Calculates the current step within the current epoch.

current_index property writable

current_index: float

Calculates the current index of the event.

Returns

  • float

    The current index of the event, either the global step or the fraction of the current step within the epoch.

Raises

  • ValueError

    If the event is not epoch-based or if there are too many steps per epoch.

epoch property

epoch: int

Calculates the current epoch.

Returns

  • int

    The current epoch.

Raises

  • ValueError

    If the event is not epoch-based.

epoch_based property

epoch_based: bool

Determines if the event is epoch-based.

Returns

  • bool

    True if the event is epoch-based, False otherwise.

epoch_batch property

epoch_batch: int

Calculates the current batch within the current epoch.

Returns

  • int

    The current batch within the current epoch.

Raises

  • ValueError

    If the event is not epoch-based.

epoch_full property

epoch_full: float

Calculates the current epoch with the current step as a fraction.

Returns

  • float

    The current epoch with the current step as a fraction.

Raises

  • ValueError

    If the event is not epoch-based.

epoch_step property

epoch_step: int

Calculates the current step within the current epoch.

Returns

  • int

    The current step within the current epoch.

Raises

  • ValueError

    If the event is not epoch-based.
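The epoch-derived properties above reduce to simple arithmetic over `global_step` and `steps_per_epoch`. The following is a hedged sketch of those relationships only; the real properties additionally validate `epoch_based` and account for `batches_per_step`:

```python
def epoch(global_step: int, steps_per_epoch: int) -> int:
    """Number of whole epochs completed so far."""
    return global_step // steps_per_epoch

def epoch_step(global_step: int, steps_per_epoch: int) -> int:
    """Step index within the current epoch."""
    return global_step % steps_per_epoch

def epoch_full(global_step: int, steps_per_epoch: int) -> float:
    """Current epoch plus the completed fraction of the current epoch."""
    return global_step / steps_per_epoch
```

For example, with `steps_per_epoch=4` and `global_step=10`, the event is 2 steps into epoch 2, i.e. `epoch_full` is 2.5.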

new_instance

new_instance(**kwargs) -> Event

Creates a new instance of the event with the provided keyword arguments.

Parameters

  • kwargs

    Keyword arguments to set in the new instance.

Returns

  • Event

    A new instance of the event with the provided kwargs.

Source code in llmcompressor/core/events/event.py
def new_instance(self, **kwargs) -> "Event":
    """
    Creates a new instance of the event with the provided keyword arguments.

    :param kwargs: Keyword arguments to set in the new instance.
    :return: A new instance of the event with the provided kwargs.
    :rtype: Event
    """
    logger.debug("Creating new instance of event with kwargs: {}", kwargs)
    instance = deepcopy(self)
    for key, value in kwargs.items():
        setattr(instance, key, value)
    return instance
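Because `new_instance` deep-copies before applying kwargs, the original event is left untouched even for mutable fields. A minimal demonstration of the same copy-then-override pattern on a hypothetical stand-in dataclass (not the real Event):

```python
from copy import deepcopy
from dataclasses import dataclass, field

@dataclass
class Event:  # stand-in with the same new_instance behavior
    global_step: int = 0
    metadata: dict = field(default_factory=dict)

    def new_instance(self, **kwargs) -> "Event":
        instance = deepcopy(self)
        for key, value in kwargs.items():
            setattr(instance, key, value)
        return instance

base = Event(global_step=1, metadata={"stage": "calibration"})
copy_ = base.new_instance(global_step=2)
# deepcopy means mutating the copy's metadata does not affect the original
copy_.metadata["stage"] = "training"
```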

should_update

should_update(
    start: Optional[float],
    end: Optional[float],
    update: Optional[float],
) -> bool

Determines if the event should trigger an update.

Parameters

  • start

    (Optional[float]) –

    The start index to check against; set to None to ignore start.

  • end

    (Optional[float]) –

    The end index to check against; set to None to ignore end.

  • update

    (Optional[float]) –

    The update interval; set to None or 0.0 to always update, otherwise must be greater than 0.0. Defaults to None.

Returns

  • bool

    True if the event should trigger an update, False otherwise.

Source code in llmcompressor/core/events/event.py
def should_update(
    self, start: Optional[float], end: Optional[float], update: Optional[float]
) -> bool:
    """
    Determines if the event should trigger an update.

    :param start: The start index to check against, set to None to ignore start.
    :type start: Optional[float]
    :param end: The end index to check against, set to None to ignore end.
    :type end: Optional[float]
    :param update: The update interval, set to None or 0.0 to always update,
        otherwise must be greater than 0.0, defaults to None.
    :type update: Optional[float]
    :return: True if the event should trigger an update, False otherwise.
    :rtype: bool
    """
    current = self.current_index
    logger.debug(
        "Checking if event should update: "
        "current_index={}, start={}, end={}, update={}",
        current,
        start,
        end,
        update,
    )
    if start is not None and current < start:
        return False
    if end is not None and current > end:
        return False
    return update is None or update <= 0.0 or current % update < 1e-10
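A few concrete evaluations help clarify the predicate, in particular the `1e-10` tolerance that guards the float modulo. The predicate is reimplemented standalone here for illustration:

```python
from typing import Optional

def should_update(
    current: float,
    start: Optional[float],
    end: Optional[float],
    update: Optional[float],
) -> bool:
    """Standalone reimplementation of the predicate shown above."""
    if start is not None and current < start:
        return False
    if end is not None and current > end:
        return False
    # the tolerance guards against floating-point error in `current % update`
    return update is None or update <= 0.0 or current % update < 1e-10

# current=5.0 lands exactly on a multiple of update=2.5, so it triggers;
# current=6.0 does not (6.0 % 2.5 == 1.0);
# current=11.0 is past end=10.0, so it never triggers.
```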

EventType

Bases: Enum

An enum for defining the different types of events that can be triggered during the model compression lifecycle. The purpose of each EventType is to trigger the corresponding modifier callbacks during training or post-training pipelines.

Parameters

  • INITIALIZE

    Initialize event type.

  • FINALIZE

    Finalize event type.

  • BATCH_START

    Batch start event type.

  • LOSS_CALCULATED

    Loss calculated event type.

  • BATCH_END

    Batch end event type.

  • CALIBRATION_EPOCH_START

    Calibration epoch start event type.

  • SEQUENTIAL_EPOCH_END

    Layer-wise calibration epoch end event type, specific to src/llmcompressor/pipelines/sequential/pipeline.py

  • CALIBRATION_EPOCH_END

    Calibration epoch end event type.

  • OPTIM_PRE_STEP

    Optimizer pre-step event type.

  • OPTIM_POST_STEP

    Optimizer post-step event type.

Hardware dataclass

Hardware(
    device: Optional[str] = None,
    devices: Optional[List[str]] = None,
    rank: Optional[int] = None,
    world_size: Optional[int] = None,
    local_rank: Optional[int] = None,
    local_world_size: Optional[int] = None,
    distributed: Optional[bool] = None,
    distributed_strategy: Optional[str] = None,
)

A dataclass to hold information about the hardware being used.

Parameters

  • device

    (Optional[str], default: None ) –

    The current device being used for training

  • devices

    (Optional[List[str]], default: None ) –

    The list of all devices to be used for training

  • rank

    (Optional[int], default: None ) –

    The rank of the current device

  • world_size

    (Optional[int], default: None ) –

    The total number of devices being used

  • local_rank

    (Optional[int], default: None ) –

    The local rank of the current device

  • local_world_size

    (Optional[int], default: None ) –

    The total number of devices being used on the local machine

  • distributed

    (Optional[bool], default: None ) –

    Whether or not distributed training is being used

  • distributed_strategy

    (Optional[str], default: None ) –

    The distributed strategy being used

LifecycleCallbacks

A class for invoking lifecycle events for the active session

Methods

batch_end classmethod

batch_end(**kwargs) -> ModifiedState

Invoke a batch end event for the active session

Parameters

  • kwargs

    additional kwargs to pass to the current session's event method

Returns

  • ModifiedState

    the modified state of the active session after invoking the event

Source code in llmcompressor/core/session_functions.py
@classmethod
def batch_end(cls, **kwargs) -> ModifiedState:
    """
    Invoke a batch end event for the active session

    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    active_session()._log_model_info()
    return cls.event(EventType.BATCH_END, **kwargs)

batch_start classmethod

batch_start(
    batch_data: Optional[Any] = None, **kwargs
) -> ModifiedState

Invoke a batch start event for the active session

Parameters

  • batch_data

    (Optional[Any], default: None ) –

    the batch data to use for the event

  • kwargs

    additional kwargs to pass to the current session's event method

Returns

  • ModifiedState

    the modified state of the active session after invoking the event

Source code in llmcompressor/core/session_functions.py
@classmethod
def batch_start(cls, batch_data: Optional[Any] = None, **kwargs) -> ModifiedState:
    """
    Invoke a batch start event for the active session

    :param batch_data: the batch data to use for the event
    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    return cls.event(EventType.BATCH_START, batch_data=batch_data, **kwargs)

calibration_epoch_end classmethod

calibration_epoch_end(**kwargs) -> ModifiedState

Invoke an epoch end event for the active session during calibration. This event should be called after the model has been calibrated for one epoch

See src/llmcompressor/pipelines/basic/pipeline.py for a usage example

Source code in llmcompressor/core/session_functions.py
@classmethod
def calibration_epoch_end(cls, **kwargs) -> ModifiedState:
    """
    Invoke a epoch end event for the active session during calibration. This event
    should be called after the model has been calibrated for one epoch

    see `src/llmcompressor/pipelines/basic/pipeline.py` for usage example
    """
    return cls.event(EventType.CALIBRATION_EPOCH_END, **kwargs)

calibration_epoch_start classmethod

calibration_epoch_start(**kwargs) -> ModifiedState

Invoke an epoch start event for the active session during calibration. This event should be called before calibration starts for one epoch

See src/llmcompressor/pipelines/basic/pipeline.py for a usage example

Source code in llmcompressor/core/session_functions.py
@classmethod
def calibration_epoch_start(cls, **kwargs) -> ModifiedState:
    """
    Invoke a epoch start event for the active session during calibration. This event
    should be called before calibration starts for one epoch

    see `src/llmcompressor/pipelines/basic/pipeline.py` for usage example
    """
    return cls.event(EventType.CALIBRATION_EPOCH_START, **kwargs)

event classmethod

event(event_type: EventType, **kwargs) -> ModifiedState

Invoke an event for the active session

Parameters

  • event_type

    (EventType) –

    the event type to invoke

  • kwargs

    additional kwargs to pass to the current session's event method

Returns

  • ModifiedState

    the modified state of the active session after invoking the event

Source code in llmcompressor/core/session_functions.py
@classmethod
def event(cls, event_type: EventType, **kwargs) -> ModifiedState:
    """
    Invoke an event for the active session

    :param event_type: the event type to invoke
    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    if event_type in [EventType.INITIALIZE, EventType.FINALIZE]:
        raise ValueError(
            f"Cannot invoke {event_type} event. "
            f"Use the corresponding method instead."
        )

    return active_session().event(event_type, **kwargs)

loss_calculated classmethod

loss_calculated(
    loss: Optional[Any] = None, **kwargs
) -> ModifiedState

Invoke a loss calculated event for the active session

Parameters

  • loss

    (Optional[Any], default: None ) –

    the loss to use for the event

  • kwargs

    additional kwargs to pass to the current session's event method

Returns

  • ModifiedState

    the modified state of the active session after invoking the event

Source code in llmcompressor/core/session_functions.py
@classmethod
def loss_calculated(cls, loss: Optional[Any] = None, **kwargs) -> ModifiedState:
    """
    Invoke a loss calculated event for the active session

    :param loss: the loss to use for the event
    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    # log loss if loss calculated
    active_session()._log_loss(event_type=EventType.LOSS_CALCULATED, loss=loss)
    return cls.event(EventType.LOSS_CALCULATED, loss=loss, **kwargs)

optim_post_step classmethod

optim_post_step(**kwargs) -> ModifiedState

Invoke an optimizer post-step event for the active session

Parameters

  • kwargs

    additional kwargs to pass to the current session's event method

Returns

  • ModifiedState

    the modified state of the active session after invoking the event

Source code in llmcompressor/core/session_functions.py
@classmethod
def optim_post_step(cls, **kwargs) -> ModifiedState:
    """
    Invoke an optimizer post-step event for the active session

    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    return cls.event(EventType.OPTIM_POST_STEP, **kwargs)

optim_pre_step classmethod

optim_pre_step(**kwargs) -> ModifiedState

Invoke an optimizer pre-step event for the active session

Parameters

  • kwargs

    additional kwargs to pass to the current session's event method

Returns

  • ModifiedState

    the modified state of the active session after invoking the event

Source code in llmcompressor/core/session_functions.py
@classmethod
def optim_pre_step(cls, **kwargs) -> ModifiedState:
    """
    Invoke an optimizer pre-step event for the active session

    :param kwargs: additional kwargs to pass to the current session's event method
    :return: the modified state of the active session after invoking the event
    """
    return cls.event(EventType.OPTIM_PRE_STEP, **kwargs)

sequential_epoch_end classmethod

sequential_epoch_end(
    subgraph: Subgraph, **kwargs
) -> ModifiedState

Invoke a sequential epoch end event for the active session. This event should be called after one sequential layer has been calibrated/trained for one epoch

This is called after a sequential layer has been calibrated with one batch; see src/llmcompressor/pipelines/sequential/pipeline.py for a usage example

Source code in llmcompressor/core/session_functions.py
@classmethod
def sequential_epoch_end(cls, subgraph: "Subgraph", **kwargs) -> ModifiedState:
    """
    Invoke a sequential epoch end event for the active session. This event should be
    called after one sequential layer has been calibrated/trained for one epoch

    This is called after a sequential layer has been calibrated with one batch, see
    `src/llmcompressor/pipelines/sequential/pipeline.py` for usage example
    """
    return cls.event(EventType.SEQUENTIAL_EPOCH_END, subgraph=subgraph, **kwargs)

ModelParameterizedLayer dataclass

ModelParameterizedLayer(
    layer_name: str, layer: Any, param_name: str, param: Any
)

A dataclass for holding a parameter and its layer

Parameters

  • layer_name

    (str) –

    The name of the layer

  • layer

    (Any) –

    The layer object

  • param_name

    (str) –

    The name of the parameter

  • param

    (Any) –

    The parameter object

ModifiedState dataclass

ModifiedState(model, optimizer, loss, modifier_data)

A dataclass to represent a modified model, optimizer, and loss.

Parameters

  • model

    (Optional[Any]) –

    The modified model

  • optimizer

    (Optional[Any]) –

    The modified optimizer

  • loss

    (Optional[Any]) –

    The modified loss

  • modifier_data

    (Optional[List[Dict[str, Any]]]) –

    The modifier data used to modify the model, optimizer, and loss

Initialize the ModifiedState with the given parameters.

Parameters

  • model

    (Any) –

    The modified model

  • optimizer

    (Any) –

    The modified optimizer

  • loss

    (Any) –

    The modified loss

  • modifier_data

    (List[Dict[str, Any]]) –

    The modifier data used to modify the model, optimizer, and loss

Source code in llmcompressor/core/state.py
def __init__(self, model, optimizer, loss, modifier_data):
    """
    Initialize the ModifiedState with the given parameters.

    :param model: The modified model
    :type model: Any
    :param optimizer: The modified optimizer
    :type optimizer: Any
    :param loss: The modified loss
    :type loss: Any
    :param modifier_data: The modifier data used to modify the model, optimizer,
        and loss
    :type modifier_data: List[Dict[str, Any]]
    """
    self.model = model
    self.optimizer = optimizer
    self.loss = loss
    self.modifier_data = modifier_data

State dataclass

State(
    model: Any = None,
    teacher_model: Any = None,
    optimizer: Any = None,
    optim_wrapped: bool = None,
    loss: Any = None,
    batch_data: Any = None,
    data: Data = Data(),
    hardware: Hardware = Hardware(),
    loggers: Optional[LoggerManager] = None,
    model_log_cadence: Optional[float] = None,
    _last_log_step: Union[float, int, None] = None,
)

The State class holds information about the current compression state.

Parameters

  • model

    (Any, default: None ) –

    the model being used for compression

  • teacher_model

    (Any, default: None ) –

    the teacher model being used for compression

  • optimizer

    (Any, default: None ) –

    the optimizer being used for training

  • optim_wrapped

    (bool, default: None ) –

    whether or not the optimizer has been wrapped

  • loss

    (Any, default: None ) –

    the loss function being used for training

  • batch_data

    (Any, default: None ) –

    the current batch of data being used for compression

  • data

    (Data, default: Data() ) –

    the data sets to be used for training, validation, testing, and/or calibration, wrapped in a Data instance

  • hardware

    (Hardware, default: Hardware() ) –

    Hardware instance holding info about the target hardware being used

  • loggers

    (Optional[LoggerManager], default: None ) –

    LoggerManager instance holding all the loggers to log to

  • model_log_cadence

    (Optional[float], default: None ) –

    The cadence to log model information w.r.t epochs. If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.

Methods

  • update

    Update the state with the given parameters.

Attributes

  • compression_ready (bool) –

    Check if the model and optimizer are set for compression.

compression_ready property

compression_ready: bool

Check if the model and optimizer are set for compression.

Returns

  • bool

    True if both the model and optimizer are set, False otherwise

update

update(
    model: Any = None,
    teacher_model: Any = None,
    optimizer: Any = None,
    attach_optim_callbacks: bool = True,
    train_data: Any = None,
    val_data: Any = None,
    test_data: Any = None,
    calib_data: Any = None,
    copy_data: bool = True,
    start: float = None,
    steps_per_epoch: int = None,
    batches_per_step: int = None,
    loggers: Union[
        None, LoggerManager, List[BaseLogger]
    ] = None,
    model_log_cadence: Optional[float] = None,
    **kwargs,
) -> Dict

Update the state with the given parameters.

Parameters

  • model

    (Any, default: None ) –

    The model to update the state with

  • teacher_model

    (Any, default: None ) –

    The teacher model to update the state with

  • optimizer

    (Any, default: None ) –

    The optimizer to update the state with

  • attach_optim_callbacks

    (bool, default: True ) –

    Whether or not to attach optimizer callbacks

  • train_data

    (Any, default: None ) –

    The training data to update the state with

  • val_data

    (Any, default: None ) –

    The validation data to update the state with

  • test_data

    (Any, default: None ) –

    The testing data to update the state with

  • calib_data

    (Any, default: None ) –

    The calibration data to update the state with

  • copy_data

    (bool, default: True ) –

    Whether or not to copy the data

  • start

    (float, default: None ) –

    The start index to update the state with

  • steps_per_epoch

    (int, default: None ) –

    The steps per epoch to update the state with

  • batches_per_step

    (int, default: None ) –

    The batches per step to update the state with

  • loggers

    (Union[None, LoggerManager, List[BaseLogger]], default: None ) –

    The metrics manager to setup logging important info and milestones to, also accepts a list of BaseLogger(s)

  • model_log_cadence

    (Optional[float], default: None ) –

    The cadence to log model information w.r.t epochs. If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.

  • kwargs

    Additional keyword arguments to update the state with

Returns

  • Dict

    The updated state as a dictionary

Source code in llmcompressor/core/state.py
def update(
    self,
    model: Any = None,
    teacher_model: Any = None,
    optimizer: Any = None,
    attach_optim_callbacks: bool = True,
    train_data: Any = None,
    val_data: Any = None,
    test_data: Any = None,
    calib_data: Any = None,
    copy_data: bool = True,
    start: float = None,
    steps_per_epoch: int = None,
    batches_per_step: int = None,
    loggers: Union[None, LoggerManager, List[BaseLogger]] = None,
    model_log_cadence: Optional[float] = None,
    **kwargs,
) -> Dict:
    """
    Update the state with the given parameters.

    :param model: The model to update the state with
    :type model: Any
    :param teacher_model: The teacher model to update the state with
    :type teacher_model: Any
    :param optimizer: The optimizer to update the state with
    :type optimizer: Any
    :param attach_optim_callbacks: Whether or not to attach optimizer callbacks
    :type attach_optim_callbacks: bool
    :param train_data: The training data to update the state with
    :type train_data: Any
    :param val_data: The validation data to update the state with
    :type val_data: Any
    :param test_data: The testing data to update the state with
    :type test_data: Any
    :param calib_data: The calibration data to update the state with
    :type calib_data: Any
    :param copy_data: Whether or not to copy the data
    :type copy_data: bool
    :param start: The start index to update the state with
    :type start: float
    :param steps_per_epoch: The steps per epoch to update the state with
    :type steps_per_epoch: int
    :param batches_per_step: The batches per step to update the state with
    :type batches_per_step: int
    :param loggers: The metrics manager to setup logging important info and
        milestones to, also accepts a list of BaseLogger(s)
    :type loggers: Union[None, LoggerManager, List[BaseLogger]]
    :param model_log_cadence: The cadence to log model information w.r.t epochs.
        If 1, logs every epoch. If 2, logs every other epoch, etc. Default is 1.
    :type model_log_cadence: Optional[float]
    :param kwargs: Additional keyword arguments to update the state with
    :return: The updated state as a dictionary
    :rtype: Dict
    """
    logger.debug(
        "Updating state with provided parameters: {}",
        {
            "model": model,
            "teacher_model": teacher_model,
            "optimizer": optimizer,
            "attach_optim_callbacks": attach_optim_callbacks,
            "train_data": train_data,
            "val_data": val_data,
            "test_data": test_data,
            "calib_data": calib_data,
            "copy_data": copy_data,
            "start": start,
            "steps_per_epoch": steps_per_epoch,
            "batches_per_step": batches_per_step,
            "loggers": loggers,
            "model_log_cadence": model_log_cadence,
            "kwargs": kwargs,
        },
    )

    if model is not None:
        self.model = model
    if teacher_model is not None:
        self.teacher_model = teacher_model
    if optimizer is not None:
        self.optim_wrapped = attach_optim_callbacks
        self.optimizer = optimizer
    if train_data is not None:
        self.data.train = train_data if not copy_data else deepcopy(train_data)
    if val_data is not None:
        self.data.val = val_data if not copy_data else deepcopy(val_data)
    if test_data is not None:
        self.data.test = test_data if not copy_data else deepcopy(test_data)
    if calib_data is not None:
        self.data.calib = calib_data if not copy_data else deepcopy(calib_data)

    if "device" in kwargs:
        self.hardware.device = kwargs["device"]

    loggers = loggers or []
    if isinstance(loggers, list):
        loggers = LoggerManager(loggers)
    self.loggers = loggers

    if model_log_cadence is not None:
        self.model_log_cadence = model_log_cadence

    return kwargs
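
Two behaviors of `update()` are easy to miss: with `copy_data=True` (the default) each dataset is deep-copied into the state, and the keyword arguments are handed back to the caller after hardware info such as `device` has been read out of them. The sketch below replicates just that merge logic on plain dicts so it runs standalone; the real method lives on llmcompressor's `State`:

```python
from copy import deepcopy

# Simplified stand-in for State.update(): only the train-data and
# device/kwargs handling shown in the source above.
def update_sketch(state: dict, train_data=None, copy_data=True, **kwargs) -> dict:
    if train_data is not None:
        # copy_data=True deep-copies, so later mutation of the caller's
        # dataset does not leak into the compression state
        state["train"] = deepcopy(train_data) if copy_data else train_data
    if "device" in kwargs:
        state["device"] = kwargs["device"]  # hardware info rides in via kwargs
    return kwargs  # kwargs are returned to the caller, as in State.update

state = {}
samples = [{"input_ids": [1, 2, 3]}]
leftover = update_sketch(state, train_data=samples, device="cuda:0", foo=1)

samples[0]["input_ids"].append(4)      # caller mutates the original list
print(state["train"][0]["input_ids"])  # state kept its own copy: [1, 2, 3]
```

Passing `copy_data=False` would instead store a reference, which avoids the deep-copy cost for large calibration sets at the price of shared mutable state.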

active_session

active_session() -> CompressionSession

Returns

  • CompressionSession –

    the active session for sparsification

Source code in llmcompressor/core/session_functions.py
def active_session() -> CompressionSession:
    """
    :return: the active session for sparsification
    """
    global _local_storage
    return getattr(_local_storage, "session", _global_session)

create_session

create_session() -> (
    Generator[CompressionSession, None, None]
)

Context manager to create and yield a new session for sparsification. This will set the active session to the new session for the duration of the context.

Returns

  • CompressionSession –

    the new session

Source code in llmcompressor/core/session_functions.py
@contextmanager
def create_session() -> Generator[CompressionSession, None, None]:
    """
    Context manager to create and yield a new session for sparsification.
    This will set the active session to the new session for the duration
    of the context.

    :return: the new session
    """
    global _local_storage
    orig_session = getattr(_local_storage, "session", None)
    new_session = CompressionSession()
    _local_storage.session = new_session
    try:
        yield new_session
    finally:
        _local_storage.session = orig_session

reset_session

reset_session()

Reset the currently active session to its initial state

Source code in llmcompressor/core/session_functions.py
def reset_session():
    """
    Reset the currently active session to its initial state
    """
    session = active_session()
    session._lifecycle.reset()