
llmcompressor.utils.helpers

General utility helper functions. Common functions for interacting with Python native types and directories/files.

  • NumpyArrayBatcher

    Batcher instance to handle dictionaries of numpy arrays,

Functions

  • DisableQuantization

    Disable quantization during forward passes after applying a quantization config

  • bucket_iterable

    Bucket iterable into subarrays consisting of the first top percentage

  • calibration_forward_context

    Context in which all calibration forward passes should occur.

  • clean_path

    :param path: the directory or file path to clean

  • convert_to_bool

    :param val: the value to be converted to a bool,

  • create_dirs

    :param path: the directory path to try and create

  • create_parent_dirs

    :param path: the file path to try to create the parent directories for

  • create_unique_dir

    :param path: the file path to create a unique version of

  • disable_cache

    Temporarily disable the key-value cache for transformer models. Used to prevent excess memory use in one-shot cases where the model only performs the prefill phase and not the generation phase.

  • disable_hf_kernels

    In transformers>=4.50.0, some module forward methods may be

  • disable_lm_head

    Disable the lm_head of a model by moving it to the meta device. This function

  • eval_context

    Disable pytorch training mode for the given module

  • flatten_iterable

    :param li: a possibly nested iterable of items to be flattened

  • getattr_chain

    Chain multiple getattr calls, separated by .

  • import_from_path

    Import the module and the name of the function/class separated by :

  • interpolate

    Note, caps values at their min of x0 and max of x1,

  • interpolate_list_linear

    Interpolate linearly for input values within a list of measurements

  • interpolated_integral

    Calculate the interpolated integral for a group of measurements of the form

  • is_package_available

    A helper function to check if a package is available

  • is_url

    :param val: value to check if it is a url or not

  • json_to_jsonl

    Converts a json list file to jsonl file format (used for efficient sharding)

  • load_labeled_data

    Load labels and data from disk or from memory and group them together. Assumes sorted ordering for on-disk files. Will match between data and/or labels when a file glob is passed for either.

  • load_numpy

    Load a numpy file into either an ndarray or an OrderedDict representing what was in the npz file

  • patch_attr

    Patch the value of an object attribute. Original value is restored upon exit

  • path_file_count

    Return the number of files that match the given pattern under the given path

  • path_file_size

    Return the total size, in bytes, for a path on the file system

  • save_numpy

    Save a numpy array or collection of numpy arrays to disk

  • tensor_export

    :param tensor: tensor to export to a saved numpy array file

  • tensors_export

    :param tensors: the tensors to export to saved numpy array files

  • validate_str_iterable

    :param val: the value to validate, check that it is a list (and flattens it),

NumpyArrayBatcher

NumpyArrayBatcher()

Bases: object

Batcher instance to handle dictionaries of numpy arrays, appending multiple items to them to increase their batch size, and then stacking them into a single batched numpy array for all keys in the dict.

Methods

  • append

    Append a new item into the current batch.

  • stack

    Stack the current items into a batch along a new, zeroed dimension

Source code in llmcompressor/utils/helpers.py
def __init__(self):
    self._items = OrderedDict()  # type: Dict[str, List[numpy.ndarray]]

append

append(item: Union[ndarray, Dict[str, ndarray]])

Append a new item into the current batch. All keys and shapes must match the current state.

Parameters

  • item

    (Union[ndarray, Dict[str, ndarray]]) –

    the item to add for batching

Source code in llmcompressor/utils/helpers.py
@deprecated()
def append(self, item: Union[numpy.ndarray, Dict[str, numpy.ndarray]]):
    """
    Append a new item into the current batch.
    All keys and shapes must match the current state.

    :param item: the item to add for batching
    """
    if len(self) < 1 and isinstance(item, numpy.ndarray):
        self._items[NDARRAY_KEY] = [item]
    elif len(self) < 1:
        for key, val in item.items():
            self._items[key] = [val]
    elif isinstance(item, numpy.ndarray):
        if NDARRAY_KEY not in self._items:
            raise ValueError(
                "numpy ndarray passed for item, but prev_batch does not contain one"
            )

        if item.shape != self._items[NDARRAY_KEY][0].shape:
            raise ValueError(
                (
                    "item of numpy ndarray of shape {} does not "
                    "match the current batch shape of {}".format(
                        item.shape, self._items[NDARRAY_KEY][0].shape
                    )
                )
            )

        self._items[NDARRAY_KEY].append(item)
    else:
        diff_keys = list(set(item.keys()) - set(self._items.keys()))

        if len(diff_keys) > 0:
            raise ValueError(
                (
                    "numpy dict passed for item, not all keys match "
                    "with the prev_batch. difference: {}"
                ).format(diff_keys)
            )

        for key, val in item.items():
            if val.shape != self._items[key][0].shape:
                raise ValueError(
                    (
                        "item with key {} of shape {} does not "
                        "match the current batch shape of {}".format(
                            key, val.shape, self._items[key][0].shape
                        )
                    )
                )

            self._items[key].append(val)

stack

stack() -> Dict[str, numpy.ndarray]

Stack the current items into a batch along a new, zeroed dimension

Returns

  • Dict[str, ndarray]

    the stacked items

Source code in llmcompressor/utils/helpers.py
@deprecated()
def stack(self) -> Dict[str, numpy.ndarray]:
    """
    Stack the current items into a batch along a new, zeroed dimension

    :return: the stacked items
    """
    batch_dict = OrderedDict()

    for key, val in self._items.items():
        batch_dict[key] = numpy.stack(self._items[key])

    return batch_dict
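
A minimal usage sketch (the array keys and shapes below are arbitrary placeholders, not part of the API): each append call adds one item, and stack joins the items along a new leading batch dimension.

import numpy

batcher = NumpyArrayBatcher()
batcher.append({"input": numpy.zeros((3, 4)), "mask": numpy.ones((3,))})
batcher.append({"input": numpy.ones((3, 4)), "mask": numpy.zeros((3,))})
batch = batcher.stack()
# batch["input"].shape == (2, 3, 4); batch["mask"].shape == (2, 3)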

DisableQuantization

DisableQuantization(module: Module)

Disable quantization during forward passes after applying a quantization config

Source code in llmcompressor/utils/helpers.py
@contextlib.contextmanager
def DisableQuantization(module: torch.nn.Module):
    """
    Disable quantization during forward passes after applying a quantization config
    """
    try:
        module.apply(disable_quantization)
        yield
    finally:
        module.apply(enable_quantization)
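
A hedged usage sketch (model and sample_inputs are assumptions standing in for a quantization-configured torch module and its inputs): quantization is turned off for the wrapped forward pass and re-enabled when the context exits.

with DisableQuantization(model):
    _ = model(sample_inputs)
# enable_quantization has been re-applied here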

bucket_iterable

bucket_iterable(
    val: Iterable[Any],
    num_buckets: int = 3,
    edge_percent: float = 0.05,
    sort_highest: bool = True,
    sort_key: Callable[[Any], Any] = None,
) -> List[Tuple[int, Any]]

Bucket iterable into subarrays consisting of the first top percentage, followed by the rest of the iterable sliced into equally sized groups.

Parameters

  • val

    (Iterable[Any]) –

    The iterable to bucket

  • num_buckets

    (int, default: 3 ) –

    The number of buckets to group the iterable into, does not include the top bucket

  • edge_percent

    (float, default: 0.05 ) –

    Group the first percent into its own bucket. If sort_highest, then this is the top percent, else the bottom percent. If <= 0, no edge bucket is created.

  • sort_highest

    (bool, default: True ) –

    True to sort so the highest values come first and create buckets in descending order. False to sort so the lowest come first and create buckets in ascending order.

  • sort_key

    (Callable[[Any], Any], default: None ) –

    The sort_key, if any, to use for sorting the iterable after converting it to a list

Returns

  • List[Tuple[int, Any]]

    a list of each value mapped to the bucket it was sorted into

Source code in llmcompressor/utils/helpers.py
@deprecated()
def bucket_iterable(
    val: Iterable[Any],
    num_buckets: int = 3,
    edge_percent: float = 0.05,
    sort_highest: bool = True,
    sort_key: Callable[[Any], Any] = None,
) -> List[Tuple[int, Any]]:
    """
    Bucket iterable into subarray consisting of the first top percentage
    followed by the rest of the iterable sliced into equal sliced groups.

    :param val: The iterable to bucket
    :param num_buckets: The number of buckets to group the iterable into,
        does not include the top bucket
    :param edge_percent: Group the first percent into its own bucket.
        If sort_highest, then this is the top percent, else bottom percent.
        If <= 0, then will not create an edge bucket
    :param sort_highest: True to sort such that the highest percent is first
        and will create buckets in descending order.
        False to sort so lowest is first and create buckets in ascending order.
    :param sort_key: The sort_key, if any, to use for sorting the iterable
        after converting it to a list
    :return: a list of each value mapped to the bucket it was sorted into
    """

    val_list = [v for v in val]
    val_list.sort(key=sort_key, reverse=sort_highest)
    bucketed_values = []
    edge_count = round(edge_percent * len(val_list))

    if edge_count > 0:
        bucketed_values.extend([(-1, val) for val in val_list[:edge_count]])
        val_list = val_list[edge_count:]

    buckets_count = round(len(val_list) / float(num_buckets))

    for bucket in range(num_buckets):
        add_vals = val_list[:buckets_count] if bucket < num_buckets - 1 else val_list
        val_list = val_list[buckets_count:] if bucket < num_buckets - 1 else []
        bucketed_values.extend([(bucket, val) for val in add_vals])

    return bucketed_values
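
A small illustration of the bucketing behavior (values chosen arbitrarily): with 20 values and edge_percent=0.05, the single largest value lands in bucket -1 and the remaining values are split across buckets 0 through 2.

values = list(range(20))
buckets = bucket_iterable(values, num_buckets=3, edge_percent=0.05)
# buckets is a list of (bucket_index, value) tuples, e.g. (-1, 19) for the top value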

calibration_forward_context

calibration_forward_context(model: Module)

Context in which all calibration forward passes should occur.

  • Remove gradient calculations
  • Disable the KV cache
  • Disable train mode and enable eval mode
  • Disable hf kernels which could bypass hooks
  • Disable the lm head (inputs and weights can still be calibrated, outputs will be on meta)

Source code in llmcompressor/utils/helpers.py
@contextlib.contextmanager
def calibration_forward_context(model: torch.nn.Module):
    """
    Context in which all calibration forward passes should occur.

    - Remove gradient calculations
    - Disable the KV cache
    - Disable train mode and enable eval mode
    - Disable hf kernels which could bypass hooks
    - Disable lm head (input and weights can still be calibrated, output will be meta)
    """
    with contextlib.ExitStack() as stack:
        stack.enter_context(torch.no_grad())
        stack.enter_context(disable_cache(model))
        stack.enter_context(eval_context(model))
        stack.enter_context(disable_hf_kernels(model))
        stack.enter_context(disable_lm_head(model))
        yield
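
A hedged usage sketch (model and calibration_loader are assumptions standing in for a PreTrainedModel and an iterable of tokenized batches): run all calibration forward passes inside the context so gradients, the KV cache, train mode, hf kernels, and the lm_head stay disabled.

with calibration_forward_context(model):
    for batch in calibration_loader:
        model(**batch)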

clean_path

clean_path(path: str) -> str

Parameters

  • path

    (str) –

    the directory or file path to clean

Returns

  • str

    a cleaned version that expands the user path and creates an absolute path

Source code in llmcompressor/utils/helpers.py
@deprecated()
def clean_path(path: str) -> str:
    """
    :param path: the directory or file path to clean
    :return: a cleaned version that expands the user path and creates an absolute path
    """
    return os.path.abspath(os.path.expanduser(path))

convert_to_bool

convert_to_bool(val: Any)

Parameters

  • val

    (Any) –

    the value to be converted to a bool, supports logical values as strings, e.g. True, t, false, 0

Returns

  • the boolean representation of the value; if it can't be determined, falls back on returning True

Source code in llmcompressor/utils/helpers.py
@deprecated()
def convert_to_bool(val: Any):
    """
    :param val: the value to be converted to a bool,
        supports logical values as strings ie True, t, false, 0
    :return: the boolean representation of the value, if it can't be determined,
        falls back on returning True
    """
    return (
        bool(val)
        if not isinstance(val, str)
        else bool(val) and "f" not in val.lower() and "0" not in val.lower()
    )
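
A few spot checks implied by the implementation above:

assert convert_to_bool("t") is True
assert convert_to_bool(1) is True
assert convert_to_bool("false") is False
assert convert_to_bool("0") is False
assert convert_to_bool("") is False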

create_dirs

create_dirs(path: str)

Parameters

  • path

    (str) –

    the directory path to try and create

Source code in llmcompressor/utils/helpers.py
@deprecated()
def create_dirs(path: str):
    """
    :param path: the directory path to try and create
    """
    path = clean_path(path)

    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno == errno.EEXIST:
            pass
        else:
            # Unexpected OSError, re-raise.
            raise

create_parent_dirs

create_parent_dirs(path: str)

Parameters

  • path

    (str) –

    the file path to try to create the parent directories for

Source code in llmcompressor/utils/helpers.py
@deprecated()
def create_parent_dirs(path: str):
    """
    :param path: the file path to try to create the parent directories for
    """
    parent = os.path.dirname(path)
    create_dirs(parent)

create_unique_dir

create_unique_dir(path: str, check_number: int = 0) -> str

Parameters

  • path

    (str) –

    the file path to create a unique version of (appends numbers until one doesn't exist)

  • check_number

    (int, default: 0 ) –

    the number to begin checking for unique versions at

Returns

  • str

    the unique directory path

Source code in llmcompressor/utils/helpers.py
@deprecated()
def create_unique_dir(path: str, check_number: int = 0) -> str:
    """
    :param path: the file path to create a unique version of
        (append numbers until one doesn't exist)
    :param check_number: the number to begin checking for unique versions at
    :return: the unique directory path
    """
    check_path = clean_path("{}-{:04d}".format(path, check_number))

    if not os.path.exists(check_path):
        return check_path

    return create_unique_dir(path, check_number + 1)
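
A hedged usage sketch (the "./runs/exp" base path is an arbitrary example): the returned path appends a four-digit suffix, counting up until an unused path is found; note that the directory itself is not created.

unique_path = create_unique_dir("./runs/exp")
# e.g. an absolute path ending in "runs/exp-0000" if that path does not exist yet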

disable_cache

disable_cache(module: Module)

Temporarily disable the key-value cache for transformer models. Used to prevent excess memory use in one-shot cases where the model only performs the prefill phase and not the generation phase.

Example

>>> model = AutoModel.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
>>> input = torch.randint(0, 32, size=(1, 32))
>>> with disable_cache(model):
...     output = model(input)

Source code in llmcompressor/utils/helpers.py
@contextlib.contextmanager
def disable_cache(module: torch.nn.Module):
    """
    Temporarily disable the key-value cache for transformer models. Used to prevent
    excess memory use in one-shot cases where the model only performs the prefill
    phase and not the generation phase.

    Example:
    >>> model = AutoModel.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
    >>> input = torch.randint(0, 32, size=(1, 32))
    >>> with disable_cache(model):
    ...     output = model(input)
    """

    if isinstance(module, PreTrainedModel):
        config = module.config
        config = getattr(config, "text_config", config)
        with patch_attr(config, "use_cache", False):
            yield

    else:
        yield

disable_hf_kernels

disable_hf_kernels(module: Module)

In transformers>=4.50.0, some module forward methods may be replaced by calls to hf hub kernels. This has the potential to bypass hooks added by LLM Compressor.

Source code in llmcompressor/utils/helpers.py
@contextlib.contextmanager
def disable_hf_kernels(module: torch.nn.Module):
    """
    In transformers>=4.50.0, some module forward methods may be
    replaced by calls to hf hub kernels. This has the potential
    to bypass hooks added by LLM Compressor
    """
    if isinstance(module, PreTrainedModel):
        with patch_attr(module.config, "disable_custom_kernels", True):
            yield

    else:
        yield

disable_lm_head

disable_lm_head(model: Module)

Disable the lm_head of a model by moving it to the meta device. This function does not untie parameters, and proper model loading is restored upon exit.

Source code in llmcompressor/utils/helpers.py
@contextlib.contextmanager
def disable_lm_head(model: torch.nn.Module):
    """
    Disable the lm_head of a model by moving it to the meta device. This function
    does not untie parameters and restores the model proper loading upon exit
    """
    _, lm_head = get_embeddings(model)
    if lm_head is None:
        logger.warning(
            f"Attempted to disable lm_head of instance {model.__class__.__name__}, "
            "but was unable to to find lm_head. This may lead to unexpected OOM."
        )
        yield
        return

    elif not isinstance(lm_head, torch.nn.Linear):
        logger.warning(f"Cannot disable LM head of type {lm_head.__class__.__name__}")
        yield
        return

    else:
        dummy_weight = lm_head.weight.to("meta")

        def dummy_forward(self, input: torch.Tensor) -> torch.Tensor:
            return input.to("meta") @ dummy_weight.T

        with contextlib.ExitStack() as stack:
            lm_head_forward = dummy_forward.__get__(lm_head)
            stack.enter_context(patch_attr(lm_head, "forward", lm_head_forward))

            if hasattr(model, "_hf_hook"):
                stack.enter_context(patch_attr(model._hf_hook, "io_same_device", False))

            yield

eval_context

eval_context(module: Module)

Disable pytorch training mode for the given module

Source code in llmcompressor/utils/helpers.py
@contextlib.contextmanager
def eval_context(module: torch.nn.Module):
    """
    Disable pytorch training mode for the given module
    """
    restore_value = module.training
    try:
        module.train(False)  # equivalent to eval()
        yield

    finally:
        module.train(restore_value)

flatten_iterable

flatten_iterable(li: Iterable)

Parameters

  • li

    (Iterable) –

    a possibly nested iterable of items to be flattened

Returns

  • a flattened version of the list where all elements are in a single list, flattened in a depth-first pattern

Source code in llmcompressor/utils/helpers.py
@deprecated()
def flatten_iterable(li: Iterable):
    """
    :param li: a possibly nested iterable of items to be flattened
    :return: a flattened version of the list where all elements are in a single list
             flattened in a depth first pattern
    """

    def _flatten_gen(_li):
        for el in _li:
            if isinstance(el, Iterable) and not isinstance(el, (str, bytes)):
                yield from _flatten_gen(el)
            else:
                yield el

    return list(_flatten_gen(li))
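
Two spot checks implied by the implementation above (strings and bytes are treated as atoms, not flattened into characters):

assert flatten_iterable([1, [2, [3, 4]], 5]) == [1, 2, 3, 4, 5]
assert flatten_iterable([["a", "b"], ["c"]]) == ["a", "b", "c"]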

getattr_chain

getattr_chain(
    obj: Any, chain_str: str, *args, **kwargs
) -> Any

Chain multiple getattr calls, separated by .

Parameters

  • obj

    (Any) –

    base object whose attributes are being retrieved

  • chain_str

    (str) –

    attribute names separated by .

  • default

    default value to return; if not provided, a missing attribute raises an error

Source code in llmcompressor/utils/helpers.py
@deprecated()
def getattr_chain(obj: Any, chain_str: str, *args, **kwargs) -> Any:
    """
    Chain multiple getattr calls, separated by `.`

    :param obj: base object whose attributes are being retrieved
    :param chain_str: attribute names separated by `.`
    :param default: default value, throw error otherwise

    """
    if len(args) >= 1:
        has_default = True
        default = args[0]
    elif "default" in kwargs:
        has_default = True
        default = kwargs["default"]
    else:
        has_default = False

    attr_names = chain_str.split(".")

    res = obj
    for attr_name in attr_names:
        if not hasattr(res, attr_name):
            if has_default:
                return default
            else:
                raise AttributeError(f"{res} object has no attribute {attr_name}")
        res = getattr(res, attr_name)

    return res
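
A small usage sketch using a throwaway object (SimpleNamespace is only for illustration):

from types import SimpleNamespace

obj = SimpleNamespace(config=SimpleNamespace(use_cache=True))
assert getattr_chain(obj, "config.use_cache") is True
assert getattr_chain(obj, "config.missing", False) is False  # default is returned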

import_from_path

import_from_path(path: str) -> str

Import the module and the name of the function/class separated by :

Examples:
    path = "/path/to/file.py:func_or_class_name"
    path = "/path/to/file:focn"
    path = "path.to.file:focn"

Parameters

  • path

    (str) –

    path including the file path and object name

Source code in llmcompressor/utils/helpers.py
def import_from_path(path: str) -> str:
    """
    Import the module and the name of the function/class separated by :
    Examples:
      path = "/path/to/file.py:func_or_class_name"
      path = "/path/to/file:focn"
      path = "path.to.file:focn"
    :param path: path including the file path and object name
    :return Function or class object
    """
    original_path, class_name = path.split(":")
    _path = original_path

    path = original_path.split(".py")[0]
    path = re.sub(r"/+", ".", path)
    try:
        module = importlib.import_module(path)
    except ImportError:
        raise ImportError(f"Cannot find module with path {_path}")

    try:
        return getattr(module, class_name)
    except AttributeError:
        raise AttributeError(f"Cannot find {class_name} in {_path}")

interpolate

interpolate(
    x_cur: float,
    x0: float,
    x1: float,
    y0: Any,
    y1: Any,
    inter_func: str = "linear",
) -> Any

Note, caps values at their min of x0 and max of x1; designed not to work outside of that range for implementation reasons

Parameters

  • x_cur

    (float) –

    the current value for x, should be between x0 and x1

  • x0

    (float) –

    the minimum for x to interpolate between

  • x1

    (float) –

    the maximum for x to interpolate between

  • y0

    (Any) –

    the minimum for y to interpolate between

  • y1

    (Any) –

    the maximum for y to interpolate between

  • inter_func

    (str, default: 'linear' ) –

    the type of function to interpolate with: linear, cubic, inverse_cubic

Returns

  • Any

    the interpolated value projecting x into y for the given interpolation function

Source code in llmcompressor/utils/helpers.py
@deprecated(future_name="torch.lerp")
def interpolate(
    x_cur: float, x0: float, x1: float, y0: Any, y1: Any, inter_func: str = "linear"
) -> Any:
    """
    note, caps values at their min of x0 and max x1,
    designed to not work outside of that range for implementation reasons

    :param x_cur: the current value for x, should be between x0 and x1
    :param x0: the minimum for x to interpolate between
    :param x1: the maximum for x to interpolate between
    :param y0: the minimum for y to interpolate between
    :param y1: the maximum for y to interpolate between
    :param inter_func: the type of function to interpolate with:
        linear, cubic, inverse_cubic
    :return: the interpolated value projecting x into y for the given
        interpolation function
    """
    if inter_func not in INTERPOLATION_FUNCS:
        raise ValueError(
            "unsupported inter_func given of {} must be one of {}".format(
                inter_func, INTERPOLATION_FUNCS
            )
        )

    # convert our x to 0-1 range since equations are designed to fit in
    # (0,0)-(1,1) space
    x_per = (x_cur - x0) / (x1 - x0)

    # map x to y using the desired function in (0,0)-(1,1) space
    if inter_func == "linear":
        y_per = x_per
    elif inter_func == "cubic":
        # https://www.wolframalpha.com/input/?i=1-(1-x)%5E3+from+0+to+1
        y_per = 1 - (1 - x_per) ** 3
    elif inter_func == "inverse_cubic":
        # https://www.wolframalpha.com/input/?i=1-(1-x)%5E(1%2F3)+from+0+to+1
        y_per = 1 - (1 - x_per) ** (1 / 3)
    else:
        raise ValueError(
            "unsupported inter_func given of {} in interpolate".format(inter_func)
        )

    if y_per <= 0.0 + sys.float_info.epsilon:
        return y0

    if y_per >= 1.0 - sys.float_info.epsilon:
        return y1

    # scale the threshold based on what we want the current to be
    return y_per * (y1 - y0) + y0
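
A couple of worked values following the formulas above:

interpolate(0.5, 0.0, 1.0, 0.0, 10.0, "linear")  # 5.0
interpolate(0.5, 0.0, 1.0, 0.0, 10.0, "cubic")   # y_per = 1 - (1 - 0.5) ** 3 = 0.875, so 8.75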

interpolate_list_linear

interpolate_list_linear(
    measurements: List[Tuple[float, float]],
    x_val: Union[float, List[float]],
) -> List[Tuple[float, float]]

Interpolate linearly for input values within a list of measurements

Parameters

  • measurements

    (List[Tuple[float, float]]) –

    the measurements to interpolate the output value between

  • x_val

    (Union[float, List[float]]) –

    the target values to interpolate to in the second dimension

Returns

  • List[Tuple[float, float]]

    a list of tuples containing the target values and the interpolated values

Source code in llmcompressor/utils/helpers.py
@deprecated(future_name="torch.lerp")
def interpolate_list_linear(
    measurements: List[Tuple[float, float]], x_val: Union[float, List[float]]
) -> List[Tuple[float, float]]:
    """
    interpolate for input values within a list of measurements linearly

    :param measurements: the measurements to interpolate the output value between
    :param x_val: the target values to interpolate to the second dimension
    :return: a list of tuples containing the target values, interpolated values
    """
    assert len(measurements) > 1
    measurements.sort(key=lambda v: v[0])

    x_vals = [x_val] if isinstance(x_val, float) else x_val
    x_vals.sort()

    interpolated = []
    lower_index = 0
    higher_index = 1

    for x_val in x_vals:
        while (
            x_val > measurements[higher_index][0]
            and higher_index < len(measurements) - 1
        ):
            lower_index += 1
            higher_index += 1

        x0, y0 = measurements[lower_index]
        x1, y1 = measurements[higher_index]
        y_val = y0 + (x_val - x0) * ((y1 - y0) / (x1 - x0))
        interpolated.append((x_val, y_val))

    return interpolated
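
A worked example with two measurement points (values chosen arbitrarily):

measurements = [(0.0, 0.0), (10.0, 100.0)]
interpolate_list_linear(measurements, 5.0)         # [(5.0, 50.0)]
interpolate_list_linear(measurements, [2.5, 7.5])  # [(2.5, 25.0), (7.5, 75.0)]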

interpolated_integral

interpolated_integral(
    measurements: List[Tuple[float, float]],
)

Calculate the interpolated integral for a group of measurements of the form [(x0, y0), (x1, y1), ...]

Parameters

  • measurements

    (List[Tuple[float, float]]) –

    the measurements to calculate the integral for

Returns

  • the integral, or area under the curve, for the given measurements

Source code in llmcompressor/utils/helpers.py
@deprecated(future_name="torch.lerp")
def interpolated_integral(measurements: List[Tuple[float, float]]):
    """
    Calculate the interpolated integal for a group of measurements of the form
    [(x0, y0), (x1, y1), ...]

    :param measurements: the measurements to calculate the integral for
    :return: the integral or area under the curve for the measurements given
    """
    if len(measurements) < 1:
        return 0.0

    if len(measurements) == 1:
        return measurements[0][1]

    measurements.sort(key=lambda v: v[0])
    integral = 0.0

    for index, (x_val, y_val) in enumerate(measurements):
        if index >= len(measurements) - 1:
            continue

        x_next, y_next = measurements[index + 1]
        x_dist = x_next - x_val
        area = y_val * x_dist + (y_next - y_val) * x_dist / 2.0
        integral += area

    return integral
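
A worked example (arbitrary points): the piecewise-linear curve through (0, 0), (1, 1), (2, 0) encloses two triangles of area 0.5 each.

interpolated_integral([(0.0, 0.0), (1.0, 1.0), (2.0, 0.0)])  # 1.0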

is_package_available

is_package_available(
    package_name: str, return_version: bool = False
) -> Union[Tuple[bool, str], bool]

A helper function to check if a package is available and optionally return its version. This function enforces a check that the package is actually installed and is not just a directory/file with the same name as the package.

Inspired by: https://github.com/huggingface/transformers/blob/965cf677695dd363285831afca8cf479cf0c600c/src/transformers/utils/import_utils.py#L41

Parameters

  • package_name

    (str) –

    The package name to check for

  • return_version

    (bool, default: False ) –

    True to also return the version of the package if it is available

Returns

  • Union[Tuple[bool, str], bool]

    True if the package is available, False otherwise; or a tuple of (bool, version) if return_version is True

Source code in llmcompressor/utils/helpers.py
def is_package_available(
    package_name: str,
    return_version: bool = False,
) -> Union[Tuple[bool, str], bool]:
    """
    A helper function to check if a package is available
    and optionally return its version. This function enforces
    a check that the package is available and is not
    just a directory/file with the same name as the package.

    inspired from:
    https://github.com/huggingface/transformers/blob/965cf677695dd363285831afca8cf479cf0c600c/src/transformers/utils/import_utils.py#L41

    :param package_name: The package name to check for
    :param return_version: True to return the version of
        the package if available
    :return: True if the package is available, False otherwise or a tuple of
        (bool, version) if return_version is True
    """

    package_exists = importlib.util.find_spec(package_name) is not None
    package_version = "N/A"
    if package_exists:
        try:
            package_version = importlib.metadata.version(package_name)
            package_exists = True
        except importlib.metadata.PackageNotFoundError:
            package_exists = False
        logger.debug(f"Detected {package_name} version {package_version}")
    if return_version:
        return package_exists, package_version
    else:
        return package_exists
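
Usage sketch (the package names are just examples):

has_torch = is_package_available("torch")

# also retrieve the installed version; the version string is "N/A" if unavailable
available, version = is_package_available("transformers", return_version=True)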

is_url

is_url(val: str)

Parameters

  • val

    (str) –

    value to check if it is a url or not

Returns

  • True if the value is a URL, False otherwise

Source code in llmcompressor/utils/helpers.py
@deprecated()
def is_url(val: str):
    """
    :param val: value to check if it is a url or not
    :return: True if value is a URL, False otherwise
    """

    try:
        result = urlparse(val)

        return all([result.scheme, result.netloc])
    except ValueError:
        return False
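
Spot checks implied by the implementation above (a value must have both a scheme and a network location to count as a URL):

assert is_url("https://huggingface.co/models") is True
assert is_url("not-a-url") is False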

json_to_jsonl

json_to_jsonl(json_file_path: str, overwrite: bool = True)

Converts a json list file to jsonl file format (used for efficient sharding), e.g.

    [{"a": 1}, {"a": 1}]

would convert to:

    {"a": 1}
    {"a": 1}

Parameters

  • json_file_path

    (str) –

    file path to a json file containing a json list of objects

  • overwrite

    (bool, default: True ) –

    If True, the existing json file will be overwritten; if False, the file will have the same name but with a .jsonl extension

Source code in llmcompressor/utils/helpers.py
@deprecated()
def json_to_jsonl(json_file_path: str, overwrite: bool = True):
    """
    Converts a json list file to jsonl file format (used for sharding efficienty)
        e.x.
            [{"a": 1}, {"a": 1}]
        would convert to:
            {"a": 1}
            {"a": 1}
    :param json_file_path: file path to a json file path containing a json list
        of objects
    :param overwrite: If True, the existing json file will be overwritten, if False,
        the file will have the same name but with a .jsonl extension
    """
    if not json_file_path.endswith(".json"):
        raise ValueError("json file must have .json extension")
    with open(json_file_path) as json_file:
        json_data = json.load(json_file)

    if not isinstance(json_data, List):
        raise ValueError(
            "Json data must be a list to conver to jsonl format. "
            f"found {type(json_data)}"
        )

    jsonl_file_path = json_file_path + ("" if overwrite else "l")
    with open(jsonl_file_path, "w") as jsonl_file:
        for json_line in json_data:
            json.dump(json_line, jsonl_file)  # append json line
            jsonl_file.write("\n")  # newline
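
A hedged usage sketch (records.json is a hypothetical file written here only for illustration):

import json

with open("records.json", "w") as fp:
    json.dump([{"a": 1}, {"a": 2}], fp)

# with overwrite=False the converted lines are written to records.jsonl instead
json_to_jsonl("records.json", overwrite=False)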

load_labeled_data

load_labeled_data(
    data: Union[
        str,
        Iterable[Union[str, ndarray, Dict[str, ndarray]]],
    ],
    labels: Union[
        None,
        str,
        Iterable[Union[str, ndarray, Dict[str, ndarray]]],
    ],
    raise_on_error: bool = True,
) -> List[
    Tuple[
        Union[numpy.ndarray, Dict[str, numpy.ndarray]],
        Union[
            None, numpy.ndarray, Dict[str, numpy.ndarray]
        ],
    ]
]

Load labels and data from disk or from memory and group them together. Assumes sorted ordering for on-disk files. Will match between data and/or labels when a file glob is passed for either.

Parameters

  • data

    (Union[str, Iterable[Union[str, ndarray, Dict[str, ndarray]]]]) –

    the file glob, file path to a numpy data tar ball, or list of arrays to use for data

  • labels

    (Union[None, str, Iterable[Union[str, ndarray, Dict[str, ndarray]]]]) –

    the file glob, file path to a numpy data tar ball, or list of arrays to use for labels, if any

  • raise_on_error

    (bool, default: True ) –

    True to raise on any error that occurs; False to log a warning, ignore, and continue

Returns

  • List[Tuple[Union[ndarray, Dict[str, ndarray]], Union[None, ndarray, Dict[str, ndarray]]]]

    a list containing tuples of (data, labels). If labels was passed in as None, the second index of each tuple will be None

Source code in llmcompressor/utils/helpers.py
@deprecated()
def load_labeled_data(
    data: Union[str, Iterable[Union[str, numpy.ndarray, Dict[str, numpy.ndarray]]]],
    labels: Union[
        None, str, Iterable[Union[str, numpy.ndarray, Dict[str, numpy.ndarray]]]
    ],
    raise_on_error: bool = True,
) -> List[
    Tuple[
        Union[numpy.ndarray, Dict[str, numpy.ndarray]],
        Union[None, numpy.ndarray, Dict[str, numpy.ndarray]],
    ]
]:
    """
    Load labels and data from disk or from memory and group them together.
    Assumes sorted ordering for on disk. Will match between when a file glob is passed
    for either data and/or labels.

    :param data: the file glob, file path to numpy data tar ball, or list of arrays to
        use for data
    :param labels: the file glob, file path to numpy data tar ball, or list of arrays
        to use for labels, if any
    :param raise_on_error: True to raise on any error that occurs;
        False to log a warning, ignore, and continue
    :return: a list containing tuples of the data, labels. If labels was passed in
        as None, will now contain a None for the second index in each tuple
    """
    if isinstance(data, str):
        data = load_numpy_list(data)

    if labels is None:
        labels = [None for _ in range(len(data))]
    elif isinstance(labels, str):
        labels = load_numpy_list(labels)

    if len(data) != len(labels) and labels:
        # always raise this error, lengths must match
        raise ValueError(
            "len(data) given of {} does not match len(labels) given of {}".format(
                len(data), len(labels)
            )
        )

    labeled_data = []

    for dat, lab in zip(data, labels):
        try:
            if isinstance(dat, str):
                dat = load_numpy(dat)

            if lab is not None and isinstance(lab, str):
                lab = load_numpy(lab)

            labeled_data.append((dat, lab))
        except Exception as err:
            if raise_on_error:
                raise err
            else:
                logger.error("Error creating labeled data: {}".format(err))

    return labeled_data

load_numpy

load_numpy(
    file_path: str,
) -> Union[numpy.ndarray, Dict[str, numpy.ndarray]]

Load a numpy file into either an ndarray or an OrderedDict representing what was in the npz file

Parameters

  • file_path

    (str) –

    the file path to load

Returns

  • Union[ndarray, Dict[str, ndarray]]

    the values loaded from the file

Source code in llmcompressor/utils/helpers.py
@deprecated()
def load_numpy(file_path: str) -> Union[numpy.ndarray, Dict[str, numpy.ndarray]]:
    """
    Load a numpy file into either an ndarray or an OrderedDict representing what
    was in the npz file

    :param file_path: the file_path to load
    :return: the loaded values from the file
    """
    file_path = clean_path(file_path)
    array = numpy.load(file_path)

    if not isinstance(array, numpy.ndarray):
        tmp_arrray = array
        array = OrderedDict()
        for key, val in tmp_arrray.items():
            array[key] = val

    return array

load_numpy_from_tar

load_numpy_from_tar(
    path: str,
) -> List[Union[numpy.ndarray, Dict[str, numpy.ndarray]]]

Load numpy data from a tar file into a list. All files contained in the tar are expected to be numpy files.

Parameters

  • path

    (str) –

    path to the tarfile to load the numpy data from

Returns

  • List[Union[ndarray, Dict[str, ndarray]]]

    the list of loaded numpy data, either arrays or ordered dicts of arrays

Source code in llmcompressor/utils/helpers.py
@deprecated()
def load_numpy_from_tar(
    path: str,
) -> List[Union[numpy.ndarray, Dict[str, numpy.ndarray]]]:
    """
    Load numpy data into a list from a tar file.
    All files contained in the tar are expected to be the numpy files.
    :param path: path to the tarfile to load the numpy data from
    :return: the list of loaded numpy data, either arrays or ordereddicts of arrays
    """
    tar = tarfile.open(path, "r")
    files = tar.getmembers()
    files = sorted([file.name for file in files])
    data = []

    for file in files:
        extracted = BytesIO()
        extracted.write(tar.extractfile(file).read())
        extracted.seek(0)
        array = numpy.load(extracted)
        data.append(_fix_loaded_numpy(array))

    return data

load_numpy_list

load_numpy_list(
    data: Union[
        str,
        Iterable[Union[str, ndarray, Dict[str, ndarray]]],
    ],
) -> List[Union[numpy.ndarray, Dict[str, numpy.ndarray]]]

Load numpy data into a list

Parameters

  • data

    (Union[str, Iterable[Union[str, ndarray, Dict[str, ndarray]]]]) –

    the data to load, one of: [folder path, iterable of file paths, iterable of numpy arrays]

Returns

  • List[Union[ndarray, Dict[str, ndarray]]]

    the list of loaded data items

Source code in llmcompressor/utils/helpers.py
@deprecated()
def load_numpy_list(
    data: Union[str, Iterable[Union[str, numpy.ndarray, Dict[str, numpy.ndarray]]]],
) -> List[Union[numpy.ndarray, Dict[str, numpy.ndarray]]]:
    """
    Load numpy data into a list
    :param data: the data to load, one of:
        [folder path, iterable of file paths, iterable of numpy arrays]
    :return: the list of loaded data items
    """
    loaded = []

    if isinstance(data, str):
        if os.path.isfile(data) and tarfile.is_tarfile(data):
            data = load_numpy_from_tar(data)
        elif os.path.isfile(data) and ".np" in data:
            # treat as a numpy file to load from
            data = [load_numpy(data)]
        else:
            # load from directory or glob
            glob_path = os.path.join(data, "*") if os.path.isdir(data) else data
            data = sorted(glob.glob(glob_path))

    for dat in data:
        if isinstance(dat, str):
            dat = load_numpy(dat)

        loaded.append(dat)

    return loaded

patch_attr

patch_attr(base: object, attr: str, value: Any)

Patch the value of an object attribute. The original value is restored upon exit.

Parameters

  • base

    (object) –

    object which has the attribute to patch

  • attr

    (str) –

    name of the attribute to patch

  • value

    (Any) –

    used to replace the original value

Usage:
>>> from types import SimpleNamespace
>>> obj = SimpleNamespace()
>>> with patch_attr(obj, "attribute", "value"):
...     assert obj.attribute == "value"
>>> assert not hasattr(obj, "attribute")

Source code in llmcompressor/utils/helpers.py
@contextlib.contextmanager
def patch_attr(base: object, attr: str, value: Any):
    """
    Patch the value of an object attribute. Original value is restored upon exit

    :param base: object which has the attribute to patch
    :param attr: name of the the attribute to patch
    :param value: used to replace original value

    Usage:
    >>> from types import SimpleNamespace
    >>> obj = SimpleNamespace()
    >>> with patch_attr(obj, "attribute", "value"):
    ...     assert obj.attribute == "value"
    >>> assert not hasattr(obj, "attribute")
    """
    _sentinel = object()
    original_value = getattr(base, attr, _sentinel)

    setattr(base, attr, value)
    try:
        yield
    finally:
        if original_value is not _sentinel:
            setattr(base, attr, original_value)
        else:
            delattr(base, attr)

path_file_count

path_file_count(path: str, pattern: str = '*') -> int

Return the number of files that match the given pattern under the given path

Parameters

  • path

    (str) –

    the path to the directory to look for files under

  • pattern

    (str, default: '*' ) –

    the pattern the files must match to be counted

Returns

  • int

    the number of files matching the pattern under the directory

Source code in llmcompressor/utils/helpers.py
@deprecated()
def path_file_count(path: str, pattern: str = "*") -> int:
    """
    Return the number of files that match the given pattern under the given path

    :param path: the path to the directory to look for files under
    :param pattern: the pattern the files must match to be counted
    :return: the number of files matching the pattern under the directory
    """
    path = clean_path(path)

    return len(fnmatch.filter(os.listdir(path), pattern))

path_file_size

path_file_size(path: str) -> int

Return the total size, in bytes, for a path on the file system

Parameters

  • path

    (str) –

    the path (directory or file) to get the size for

Returns

  • int

    the size of the path, in bytes, as stored on disk

Source code in llmcompressor/utils/helpers.py
@deprecated()
def path_file_size(path: str) -> int:
    """
    Return the total size, in bytes, for a path on the file system

    :param path: the path (directory or file) to get the size for
    :return: the size of the path, in bytes, as stored on disk
    """

    if not os.path.isdir(path):
        stat = os.stat(path)

        return stat.st_size

    total_size = 0
    seen = {}

    for dir_path, dir_names, filenames in os.walk(path):
        for file in filenames:
            file_path = os.path.join(dir_path, file)

            try:
                stat = os.stat(file_path)
            except OSError:
                continue

            try:
                seen[stat.st_ino]
            except KeyError:
                seen[stat.st_ino] = True
            else:
                continue

            total_size += stat.st_size

    return total_size

save_numpy

save_numpy(
    array: Union[
        ndarray, Dict[str, ndarray], Iterable[ndarray]
    ],
    export_dir: str,
    name: str,
    npz: bool = True,
)

Save a numpy array or collection of numpy arrays to disk

Parameters

  • array

    (Union[ndarray, Dict[str, ndarray], Iterable[ndarray]]) –

    the array or collection of arrays to save

  • export_dir

    (str) –

    the directory to export the numpy file into

  • name

    (str) –

    the name of the file to export to (without extension)

  • npz

    (bool, default: True ) –

    True to save as an npz compressed file, False for a standard npy. Note, npy can only be used for single numpy arrays

Returns

  • the saved path

Source code in llmcompressor/utils/helpers.py
@deprecated()
def save_numpy(
    array: Union[numpy.ndarray, Dict[str, numpy.ndarray], Iterable[numpy.ndarray]],
    export_dir: str,
    name: str,
    npz: bool = True,
):
    """
    Save a numpy array or collection of numpy arrays to disk

    :param array: the array or collection of arrays to save
    :param export_dir: the directory to export the numpy file into
    :param name: the name of the file to export to (without extension)
    :param npz: True to save as an npz compressed file, False for standard npy.
        Note, npy can only be used for single numpy arrays
    :return: the saved path
    """
    create_dirs(export_dir)
    export_path = os.path.join(
        export_dir, "{}.{}".format(name, "npz" if npz else "npy")
    )

    if isinstance(array, numpy.ndarray) and npz:
        numpy.savez_compressed(export_path, array)
    elif isinstance(array, numpy.ndarray):
        numpy.save(export_path, array)
    elif isinstance(array, Dict) and npz:
        numpy.savez_compressed(export_path, **array)
    elif isinstance(array, Dict):
        raise ValueError("Dict can only be exported to an npz file")
    elif isinstance(array, Iterable) and npz:
        numpy.savez_compressed(export_path, *[val for val in array])
    elif isinstance(array, Iterable):
        raise ValueError("Iterable can only be exported to an npz file")
    else:
        raise ValueError("Unrecognized type given for array {}".format(array))

    return export_path
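
A hedged usage sketch (directory and file names are arbitrary): a dict of arrays is written as a compressed npz file and the resulting path is returned.

import numpy

arrays = {"weight": numpy.zeros((2, 2)), "bias": numpy.ones(2)}
path = save_numpy(arrays, export_dir="./calib", name="sample_0", npz=True)
# path == "./calib/sample_0.npz"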

tensor_export

tensor_export(
    tensor: Union[
        ndarray, Dict[str, ndarray], Iterable[ndarray]
    ],
    export_dir: str,
    name: str,
    npz: bool = True,
) -> str

Parameters

  • tensor

    (Union[ndarray, Dict[str, ndarray], Iterable[ndarray]]) –

    tensor to export to a saved numpy array file

  • export_dir

    (str) –

    the directory to export the file in

  • name

    (str) –

    the name of the file; .npy will be appended to it

  • npz

    (bool, default: True ) –

    True to export as an npz file, False otherwise

Returns

  • str

    the path of the numpy file the tensor was exported to

Source code in llmcompressor/utils/helpers.py
@deprecated()
def tensor_export(
    tensor: Union[numpy.ndarray, Dict[str, numpy.ndarray], Iterable[numpy.ndarray]],
    export_dir: str,
    name: str,
    npz: bool = True,
) -> str:
    """
    :param tensor: tensor to export to a saved numpy array file
    :param export_dir: the directory to export the file in
    :param name: the name of the file, .npy will be appended to it
    :param npz: True to export as an npz file, False otherwise
    :return: the path of the numpy file the tensor was exported to
    """
    create_dirs(export_dir)
    export_path = os.path.join(
        export_dir, "{}.{}".format(name, "npz" if npz else "npy")
    )

    if isinstance(tensor, numpy.ndarray) and npz:
        numpy.savez_compressed(export_path, tensor)
    elif isinstance(tensor, numpy.ndarray):
        numpy.save(export_path, tensor)
    elif isinstance(tensor, Dict) and npz:
        numpy.savez_compressed(export_path, **tensor)
    elif isinstance(tensor, Dict):
        raise ValueError("tensor dictionaries can only be saved as npz")
    elif isinstance(tensor, Iterable) and npz:
        numpy.savez_compressed(export_path, *tensor)
    elif isinstance(tensor, Iterable):
        raise ValueError("tensor iterables can only be saved as npz")
    else:
        raise ValueError("unknown type give for tensor {}".format(tensor))

    return export_path

tensors_export

tensors_export(
    tensors: Union[
        ndarray, Dict[str, ndarray], Iterable[ndarray]
    ],
    export_dir: str,
    name_prefix: str,
    counter: int = 0,
    break_batch: bool = False,
) -> List[str]

Parameters

  • tensors

    (Union[ndarray, Dict[str, ndarray], Iterable[ndarray]]) –

    the tensors to export to saved numpy array files

  • export_dir

    (str) –

    the directory to export the files in

  • name_prefix

    (str) –

    the prefix name to save the tensors as; info about the position of the tensor in a list or dict is appended, in addition to the .npy file format

  • counter

    (int, default: 0 ) –

    the current counter to save the tensor at

  • break_batch

    (bool, default: False ) –

    treat the tensors as a batch and break them apart into multiple tensors

Returns

  • List[str]

    the exported paths

Source code in llmcompressor/utils/helpers.py
@deprecated()
def tensors_export(
    tensors: Union[numpy.ndarray, Dict[str, numpy.ndarray], Iterable[numpy.ndarray]],
    export_dir: str,
    name_prefix: str,
    counter: int = 0,
    break_batch: bool = False,
) -> List[str]:
    """
    :param tensors: the tensors to export to a saved numpy array file
    :param export_dir: the directory to export the files in
    :param name_prefix: the prefix name for the tensors to save as, will append
        info about the position of the tensor in a list or dict in addition
        to the .npy file format
    :param counter: the current counter to save the tensor at
    :param break_batch: treat the tensor as a batch and break apart into
        multiple tensors
    :return: the exported paths
    """
    create_dirs(export_dir)
    exported_paths = []

    if break_batch:
        _tensors_export_batch(tensors, export_dir, name_prefix, counter, exported_paths)
    else:
        _tensors_export_recursive(
            tensors, export_dir, name_prefix, counter, exported_paths
        )

    return exported_paths

validate_str_iterable

validate_str_iterable(
    val: Union[str, Iterable[str]], error_desc: str = ""
) -> Union[str, Iterable[str]]

Parameters

  • val

    (Union[str, Iterable[str]]) –

    the value to validate; checks that it is a list (and flattens it), otherwise checks that it is an __ALL__ or __ALL_PRUNABLE__ string, otherwise raises a ValueError

  • error_desc

    (str, default: '' ) –

    the description to raise an error with in the event that the val wasn't valid

Returns

  • Union[str, Iterable[str]]

    the validated version of the param

Source code in llmcompressor/utils/helpers.py
@deprecated()
def validate_str_iterable(
    val: Union[str, Iterable[str]], error_desc: str = ""
) -> Union[str, Iterable[str]]:
    """
    :param val: the value to validate, check that it is a list (and flattens it),
        otherwise checks that it's an __ALL__ or __ALL_PRUNABLE__ string,
        otherwise raises a ValueError
    :param error_desc: the description to raise an error with in the event that
        the val wasn't valid
    :return: the validated version of the param
    """
    if isinstance(val, str):
        if val.upper() != ALL_TOKEN and val.upper() != ALL_PRUNABLE_TOKEN:
            raise ValueError(
                "unsupported string ({}) given in {}".format(val, error_desc)
            )

        return val.upper()

    if isinstance(val, Iterable):
        return flatten_iterable(val)

    raise ValueError("unsupported type ({}) given in {}".format(val, error_desc))