llmcompressor.pipelines

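Compression pipelines for orchestrating different compression strategies.

Provides the basic, sequential, independent, layer-sequential, and data-free compression pipelines. Each pipeline coordinates a different set of compression techniques and workflows, so the right choice depends on the requirements and constraints of the model being optimized.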

Modules

Functions

BasicPipeline

CalibrationPipeline

Bases: ABC, RegistryMixin

Methods

from_modifiers classmethod

from_modifiers(
    modifiers: List[Modifier], user: Optional[str] = None
) -> CalibrationPipeline

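Infer which calibration pipeline to use based on the available modifiers and any user specifications

Parameters

  • modifiers

    (List[Modifier]) –

    modifiers to apply to model

  • user

    (Optional[str], default: None) –

    pipeline name passed by user

Returns

  • CalibrationPipeline

    CalibrationPipeline instance to be called with data (if not datafree)

Source code in llmcompressor/pipelines/registry.py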
@classmethod
def from_modifiers(
    cls, modifiers: List[Modifier], user: Optional[str] = None
) -> "CalibrationPipeline":
    """
    Infer which calibration pipeline to use based on the available modifiers and
    any user specifications

    :param modifiers: modifiers to apply to model
    :param user: pipeline name passed by user
    :return: CalibrationPipeline instance to be called with data (if not datafree)
    """
    user = standardize_lookup_name(user) if user else None
    inferred = standardize_lookup_name(cls._infer_pipeline(modifiers))
    independent = standardize_lookup_name("independent")

    if user == independent:
        inferred = independent

    if user is not None and user != inferred:
        logger.warning(
            f"Calibration pipeline is set to `{user}`, but it is recommended to "
            f"use `{inferred}`"
        )

    pipeline = user or inferred
    return cls.load_from_registry(pipeline)
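
The resolution order in `from_modifiers` can be sketched without the library. This is a minimal stand-in, not llmcompressor's code: `normalize` is a hypothetical substitute for `standardize_lookup_name` (whose exact rules are not shown here), and the inferred name is passed in directly instead of being computed by `_infer_pipeline`.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def normalize(name: str) -> str:
    # Hypothetical stand-in for `standardize_lookup_name`; the real
    # normalization rules may differ.
    return name.strip().lower().replace("_", "-")


def resolve_pipeline(inferred: str, user: Optional[str] = None) -> str:
    """Return the pipeline name that would be looked up in the registry."""
    user = normalize(user) if user else None
    inferred = normalize(inferred)
    independent = normalize("independent")

    # An explicit "independent" request replaces the inferred name,
    # so it is never reported as a mismatch.
    if user == independent:
        inferred = independent

    # Any other explicit choice still wins, but a mismatch is logged.
    if user is not None and user != inferred:
        logger.warning(
            "Calibration pipeline is set to `%s`, but it is recommended to use `%s`",
            user,
            inferred,
        )

    return user or inferred
```

Under these assumptions, a user-supplied name always takes precedence over the inferred one; the warning is advisory and does not change the result.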

DataFreePipeline

IndependentPipeline

SequentialPipeline

Subgraph dataclass

Subgraph(
    graph: Graph,
    input_names: Set[str],
    consumed_names: Set[str],
    _code: Optional[PythonCode] = None,
)

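Dataclass specifying an executable subgraph of a model graph

Parameters

  • graph

    (Graph) –

    subgraph of model graph

  • input_names

    (Set[str]) –

    argument names of the compiled forward function

  • consumed_names

    (Set[str]) –

    argument names which are not used by any subsequent subgraphs and can therefore be deleted from the intermediates cache

Methods

  • forward

    Execute the operations within the subgraph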

forward

forward(*args, **kwargs) -> Dict[str, Any]

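Execute the operations within the subgraph

Parameters

  • *args

    argument inputs to subgraph forward function

  • **kwargs

    keyword inputs to subgraph forward function

Returns

  • Dict[str, Any]

    keyword outputs of subgraph forward function (non-consumed variables)

Source code in llmcompressor/pipelines/sequential/helpers.py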
def forward(self, *args, **kwargs) -> Dict[str, Any]:
    """
    Execute the operations within the subgraph

    :param \\*args: argument inputs to subgraph forward function
    :param \\**kwargs: keyword inputs to subgraph forward function
    :return: keyword outputs of subgraph forward function (non-consumed variables)
    """
    if self._code is None:
        self._code = self.graph.python_code("self")
        exec(self._code.src, self._code.globals)

    forward_fn = self._code.globals.get("forward")

    with append_autowrap_source_on_fail():
        return forward_fn(*args, **kwargs)
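
The compile-on-first-call pattern used by `forward` can be illustrated with the standard library alone. The sketch below is an assumption-laden stand-in: `LazyForward`, `SRC`, and `compile_count` are hypothetical names, and a hand-written source string takes the place of the code that `graph.python_code("self")` would generate.

```python
from typing import Any, Callable, Dict, Optional


class LazyForward:
    """Hypothetical stand-in for the compile-on-first-call pattern."""

    def __init__(self, src: str):
        self.src = src  # source text that defines a function named `forward`
        self._globals: Optional[Dict[str, Any]] = None  # filled on first call
        self.compile_count = 0  # illustration only: counts exec() invocations

    def forward(self, *args, **kwargs) -> Dict[str, Any]:
        if self._globals is None:
            # Execute the source once; the definition lands in the namespace.
            self._globals = {}
            exec(self.src, self._globals)
            self.compile_count += 1

        forward_fn: Callable[..., Dict[str, Any]] = self._globals["forward"]
        return forward_fn(*args, **kwargs)


# Hand-written source standing in for generated graph code.
SRC = """
def forward(x, scale=2):
    hidden = x * scale
    return {"hidden": hidden}
"""

stage = LazyForward(SRC)
first = stage.forward(3)
second = stage.forward(4, scale=10)
```

Caching the executed namespace means the cost of `exec` is paid once per subgraph rather than once per batch.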

dispatch_for_sequential

dispatch_for_sequential(
    model: PreTrainedModel,
) -> PreTrainedModel

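Dispatch a model for sequential calibration using a sequential pipeline. The model will be offloaded to the CPU and dispatched to a CUDA/XPU device if one is available. Removes any existing hooks.

Parameters

  • model

    (PreTrainedModel) –

    model to dispatch

Returns

  • PreTrainedModel

    dispatched model

Source code in llmcompressor/pipelines/sequential/helpers.py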
def dispatch_for_sequential(model: PreTrainedModel) -> PreTrainedModel:
    """
    Dispatch a model for sequential calibration using a sequential pipeline.
    The model will be offloaded to the CPU and dispatched to CUDA/XPU device
    if available. Removes any existing hooks.

    :param model: model to dispatch
    :return: dispatched model
    """
    remove_dispatch(model)

    if torch.cuda.is_available():
        offloaded_dispatch(model, execution_device=torch.device("cuda:0"))
    elif hasattr(torch, "xpu") and torch.xpu.is_available():
        offloaded_dispatch(model, execution_device=torch.device("xpu:0"))
    else:
        logger.warning("CUDA/XPU is not available! Compressing model on CPU instead")

    return model
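
The device precedence in `dispatch_for_sequential` (CUDA first, then XPU, otherwise stay on CPU) can be isolated as a pure function. This is only a sketch: `pick_execution_device` is a hypothetical helper, and the availability flags are passed in explicitly so the example runs without `torch`.

```python
from typing import Optional


def pick_execution_device(cuda_available: bool, xpu_available: bool) -> Optional[str]:
    """Return the device string to dispatch to, or None to stay on CPU."""
    if cuda_available:
        return "cuda:0"  # CUDA is preferred when both backends are present
    if xpu_available:
        return "xpu:0"
    return None  # no accelerator: the model is compressed on CPU
```

In the listing above, the `None` case corresponds to the branch that only logs a warning and leaves the model undispatched.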

get_sequential_targets

get_sequential_targets(
    modifiers: List[Modifier],
    model: PreTrainedModel,
    args: DatasetArguments,
) -> List[str]

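Infer sequential targets from modifiers list and dataset args

Parameters

  • model

    (PreTrainedModel) –

    model being calibrated

  • modifiers

    (List[Modifier]) –

    list of modifiers being applied during calibration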

  • args

    (DatasetArguments) –

    dataset arguments passed by user

Returns

  • List[str]

    list of sequential targets

Source code in llmcompressor/pipelines/sequential/helpers.py
def get_sequential_targets(
    modifiers: List[Modifier], model: PreTrainedModel, args: "DatasetArguments"
) -> List[str]:
    """
    Infer sequential targets from modifiers list and dataset args

    :param model: model being calibrated
    :param modifiers: list of modifiers being applied during calibration
    :param args: dataset arguments passed by user
    :return: list of sequential targets
    """
    modifier_targets = [
        (modifier, modifier.sequential_targets)
        for modifier in modifiers
        if getattr(modifier, "sequential_targets", None) is not None
    ]

    # deprecation warning
    if len(modifier_targets) >= 1:
        logger.warning(
            "Passing sequential targets through modifiers is deprecated, "
            "please use `oneshot(sequential_targets=...)`"
        )

    # cannot infer from multiple modifiers
    if len(modifier_targets) >= 2:
        types = [type(modifier) for modifier, _ in modifier_targets]
        raise ValueError(
            "Cannot infer sequential targets from multiple sequential modifiers "
            f"({types})"
        )

    # resolve single modifier
    if len(modifier_targets) == 1:
        if args.sequential_targets is not None:
            raise ValueError(
                f"Got sequential targets from both {type(modifier_targets[0][0])} "
                "and dataset arguments `sequential_targets`"
            )

        sequential_targets = modifier_targets[0][1]

    # if no modifiers, use data args
    else:
        sequential_targets = args.sequential_targets  # may be `None`

    # validate and infer
    if sequential_targets is None:
        return get_no_split_params(model)
    elif isinstance(sequential_targets, str):
        return [sequential_targets]
    else:
        return sequential_targets
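
The precedence rules in `get_sequential_targets` can be summarized in a library-free sketch. The names here are hypothetical: `resolve_sequential_targets` takes the per-modifier targets as plain values, and `fallback` stands in for the result of `get_no_split_params(model)`. The deprecation warning is omitted for brevity.

```python
from typing import List, Optional, Sequence, Union

Targets = Optional[Union[str, List[str]]]


def resolve_sequential_targets(
    modifier_targets: Sequence[Targets],
    arg_targets: Targets,
    fallback: List[str],
) -> List[str]:
    # Keep only modifiers that actually declare targets.
    declared = [t for t in modifier_targets if t is not None]

    # Two or more declaring modifiers are ambiguous.
    if len(declared) >= 2:
        raise ValueError("Cannot infer sequential targets from multiple modifiers")

    if len(declared) == 1:
        # A modifier and the arguments may not both supply targets.
        if arg_targets is not None:
            raise ValueError(
                "Got sequential targets from both a modifier and the arguments"
            )
        targets = declared[0]
    else:
        targets = arg_targets  # may be None

    if targets is None:
        return fallback  # stand-in for get_no_split_params(model)
    if isinstance(targets, str):
        return [targets]  # a single name is normalized to a list
    return list(targets)
```

For example, `resolve_sequential_targets([None], "DecoderLayer", ["Block"])` resolves to the argument value wrapped in a list, while leaving both sources empty yields the fallback.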

trace_subgraphs

trace_subgraphs(
    model: PreTrainedModel,
    sample_input: Dict[str, Any],
    sequential_targets: List[str],
    ignore: List[str],
) -> List[Subgraph]

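Trace a model to produce subgraphs, where each sequential target belongs to exactly one subgraph and where executing each subgraph in order is equivalent to executing the original model

Parameters

  • model

    (PreTrainedModel) –

    model being traced

  • sample_input

    (Dict[str, Any]) –

    inputs whose values will change during execution but whose __len__, __bool__, and __contains__ values are assumed constant across batches

  • sequential_targets

    (List[str]) –

    list of patterns matching sequential targets

  • ignore

    (List[str]) –

    function and method names to skip during tracing

Returns

  • List[Subgraph]

    a list of Subgraphs in order of execution

Source code in llmcompressor/pipelines/sequential/helpers.py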
def trace_subgraphs(
    model: PreTrainedModel,
    sample_input: Dict[str, Any],
    sequential_targets: List[str],
    ignore: List[str],
) -> List[Subgraph]:
    """
    Trace a model to produce subgraphs, where each sequential target belongs to exactly
    one subgraph and where executing each subgraph in order is equivalent to executing
    the original model

    :param model: model being traced
    :param sample_input: inputs whose values will change during execution but whose
        __len__, __bool__, and __contains__ values are assumed constant across batches
    :param sequential_targets: list of patterns matching sequential targets
    :param ignore: function and method names to skip during tracing
    :return: a list of Subgraphs in order of execution
    """
    # find modules
    targets = match_modules(model, sequential_targets)
    ancestors = get_sequential_ancestors(model, targets)
    offloaded = set(m for m in model.modules() if has_offloaded_params(m))

    # initialize arguments
    tracer = SequentialTracer(ancestors, offloaded)
    concrete_args = populate_concrete_args(model, sample_input)

    with contextlib.ExitStack() as stack:
        # calibration context
        stack.enter_context(calibration_forward_context(model))
        stack.enter_context(HooksMixin.disable_hooks())

        # flags useful for tracing
        stack.enter_context(patch_attr(model.config, "_attn_implementation", "eager"))
        stack.enter_context(patch_attr(torch.compiler, "_is_compiling_flag", True))

        # autowrap forwards
        stack.enter_context(autowrap_forwards(ancestors, ignore))

        # avoid bug where pytorch cannot handle wrapped root functions
        unwrapped = inspect.unwrap(model.forward).__get__(model)
        stack.enter_context(patch_attr(model, "forward", unwrapped))
        stack.enter_context(patch_attr(type(model), "forward", unwrapped.__func__))
        assert isinstance(model.forward, MethodType)
        assert isinstance(type(model).forward, FunctionType)

        with append_autowrap_source_on_fail():
            graph = GraphModule(
                model,
                tracer.trace(
                    model,
                    dummy_inputs=sample_input,
                    concrete_args=concrete_args,
                    complete_concrete_args_with_inputs_not_in_dummy_inputs=False,
                    # bug in trace throws an error for variadic
                    # args and kwargs in function signature
                ),
            )

    # copy metadata
    graph.config = model.config
    graph.class_for_deserialization = model.__class__
    graph.device = model.device

    # perform subgraph partition
    partitions = topological_partition(graph, targets)
    subgraphs = partition_graph(model, partitions)
    trace_consumed_names(subgraphs)

    # As currently implemented, `topological_partition` generates an extra subgraph at
    # the beginning which does not contain a target. This adds a little more runtime,
    # and could be folded into the first subgraph in the future
    if len(subgraphs) != len(targets) + 1:
        logger.warning(
            f"Expected {len(targets)} subgraphs, but only traced {len(subgraphs)}. "
            "This is likely due to having wrapped code which calls sequential targets"
        )

    return subgraphs
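
To make the roles of `input_names` and `consumed_names` concrete, the sketch below runs a list of subgraph-like stages in order against a name-keyed intermediates cache. It is an illustration under stated assumptions, not the library's execution loop: `Stage` and `run_stages` are hypothetical, and plain lambdas replace traced graph code.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Set


@dataclass
class Stage:
    """Hypothetical stand-in for a Subgraph: a callable plus name bookkeeping."""

    fn: Callable[..., Dict[str, Any]]
    input_names: Set[str]
    consumed_names: Set[str]


def run_stages(stages: List[Stage], inputs: Dict[str, Any]) -> Dict[str, Any]:
    cache = dict(inputs)  # intermediates cache keyed by variable name
    for stage in stages:
        # Feed each stage only the names it declares as inputs.
        kwargs = {name: cache[name] for name in stage.input_names}
        cache.update(stage.fn(**kwargs))
        # Drop values that no later stage reads.
        for name in stage.consumed_names:
            del cache[name]
    return cache


# Three stages that together compute (x + 1) * 2 - 3.
stages = [
    Stage(lambda x: {"h0": x + 1}, {"x"}, {"x"}),
    Stage(lambda h0: {"h1": h0 * 2}, {"h0"}, {"h0"}),
    Stage(lambda h1: {"out": h1 - 3}, {"h1"}, {"h1"}),
]
result = run_stages(stages, {"x": 5})
```

Running the stages in order gives the same value as evaluating the whole expression at once, and deleting consumed names keeps the cache from growing with the number of stages.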