llmcompressor.datasets.utils

Dataset utility functions for LLM compression workflows.

Provides helper functions for loading, processing, and formatting datasets in model compression pipelines. Handles dataset splitting, tokenization, calibration data preparation, and dataloader creation for both training and oneshot calibration workflows.

  • LengthAwareSampler

    Samples data in descending order of sequence length. Relies on input_ids or decoder_input_ids being present in the dataset.

Functions

LengthAwareSampler

LengthAwareSampler(
    data_source: Dataset, num_samples: Optional[int] = None
)

Bases: Sampler[int]

Samples data in descending order of sequence length. Relies on input_ids or decoder_input_ids being present in the dataset.

Parameters

  • data_source

    (Dataset) –

    Dataset containing an input_ids or decoder_input_ids column

  • num_samples

    (Optional[int], default: None ) –

    Maximum number of samples to draw. Shorter sequences are dropped first

Source code in llmcompressor/datasets/utils.py
def __init__(
    self,
    data_source: Dataset,
    num_samples: Optional[int] = None,
) -> None:
    self.data_source = data_source
    self._num_samples = num_samples or len(data_source)

    if "input_ids" in data_source.column_names:
        feature_name = "input_ids"
    elif "decoder_input_ids" in data_source.column_names:
        feature_name = "decoder_input_ids"
    else:
        logger.warning(f"Could not find input ids in {data_source.column_names}")
        self.order = range(len(data_source))
        return

    lengths = [len(sample) for sample in data_source[feature_name]]
    self.order = torch.argsort(torch.tensor(lengths), descending=True).tolist()
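
A minimal usage sketch (not taken from the library's documentation): it assumes a Hugging Face datasets.Dataset that already contains an input_ids column, and that the sampler yields at most num_samples indices, longest sequences first, as the parameters above describe.

from datasets import Dataset
from torch.utils.data import DataLoader

from llmcompressor.datasets.utils import LengthAwareSampler

# Toy pre-tokenized dataset; real calibration data would come from a tokenizer
tokenized = Dataset.from_dict(
    {"input_ids": [[1, 2, 3], [4, 5], [6, 7, 8, 9, 10]]}
)

# Visit the longest sequences first, keeping at most 2 samples
sampler = LengthAwareSampler(tokenized, num_samples=2)

loader = DataLoader(
    tokenized,
    batch_size=1,
    sampler=sampler,
    collate_fn=lambda batch: batch,  # keep raw dicts; real code would pad/stack
)

for batch in loader:
    print(len(batch[0]["input_ids"]))  # expected: 5, then 3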

get_calibration_dataloader

get_calibration_dataloader(
    dataset_args: DatasetArguments, processor: Processor
) -> torch.utils.data.DataLoader

Get the dataloader used for oneshot calibration.

Parameters

  • dataset_args

    (DatasetArguments) –

    DatasetArguments containing the dataset parameters.

  • processor

    (Processor) –

    Processor or tokenizer of the model.

Returns

  • DataLoader

    PyTorch dataloader object containing the calibration dataset.

Source code in llmcompressor/datasets/utils.py
def get_calibration_dataloader(
    dataset_args: DatasetArguments,
    processor: Processor,
) -> torch.utils.data.DataLoader:
    """
    Get the dataloader used for oneshot calibration.
    :param dataset_args: DatasetArguments that contains the dataset parameters.
    :param processor: Processor or the tokenizer of the model.
    :return: PyTorch dataloader object that contains the calibration dataset.
    """
    if dataset_args.dataset is None:
        # weight-only quantization or dynamic quantization
        return

    datasets = get_processed_dataset(
        dataset_args=dataset_args,
        processor=processor,
        do_oneshot=True,
        do_train=False,
    )
    calibration_dataset = datasets.get("calibration")

    return format_calibration_data(dataset_args, calibration_dataset, processor)
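
A hedged end-to-end sketch. Only the dataset and splits fields are read directly by the source above; the DatasetArguments import path, any other constructor arguments, and the dataset and model names here are illustrative assumptions.

from transformers import AutoTokenizer

from llmcompressor.args import DatasetArguments  # import path assumed
from llmcompressor.datasets.utils import get_calibration_dataloader

# "open_platypus" is used as an example registry id; swap in your own dataset
dataset_args = DatasetArguments(
    dataset="open_platypus",
    splits={"calibration": "train[:256]"},
)
tokenizer = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")

dataloader = get_calibration_dataloader(dataset_args, tokenizer)
if dataloader is not None:  # None is returned when dataset_args.dataset is None
    first_batch = next(iter(dataloader))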

get_processed_dataset

get_processed_dataset(
    dataset_args: DatasetArguments,
    processor: Processor | None = None,
    do_oneshot: bool = False,
    do_train: bool = True,
) -> dict[str, Dataset] | None

Loads datasets for each flow based on dataset_args and stores a Dataset for each enabled flow in datasets.

Parameters

  • dataset_args

    (DatasetArguments) –

    DatasetArguments containing dataset loading and processing parameters

  • processor

    (Processor | None, default: None ) –

    Processor or tokenizer to use for dataset tokenization

  • do_oneshot

    (bool, default: False ) –

    True for the oneshot pathway

  • do_train

    (bool, default: True ) –

    True for the train pathway

Returns

  • dict[str, Dataset] | None

    A dataset corresponding to either train or calibration (oneshot)

Source code in llmcompressor/datasets/utils.py
def get_processed_dataset(
    dataset_args: DatasetArguments,
    processor: Processor | None = None,
    do_oneshot: bool = False,
    do_train: bool = True,
) -> dict[str, Dataset] | None:
    """
    Loads datasets for each flow based on dataset_args, stores a Dataset for each
    enabled flow in datasets
    :param dataset_args: DatasetArguments that contain dataset loading and
        processing params
    :param processor: processor or tokenizer to use for dataset tokenization
    :param do_oneshot: True for oneshot pathway
    :param do_train: True for train pathway
    :return: A dataset corresponding to either train or calibration (oneshot)
    """
    if dataset_args.dataset is None:
        logger.warning(
            "Running oneshot without calibration data. This is expected for "
            "weight-only and dynamic quantization"
        )
        return

    splits = dataset_args.splits
    tokenized_datasets = {}

    def _get_split_name(inp_str):
        # strip out split name, for ex train[60%:] -> train
        split_name_match = re.match(r"(\w*)\[.*\]", inp_str)
        if split_name_match is not None:
            return split_name_match.group(1)
        return inp_str

    match splits:
        case None:
            splits = {"all": None}
        case str():
            splits = {_get_split_name(splits): splits}
        case list():
            splits = {_get_split_name(s): s for s in splits}
        case dict():
            pass
        case _:
            raise ValueError(f"Invalid splits type: {type(splits)}")

    # default to custom dataset if dataset provided isn't a string
    registry_id = (
        dataset_args.dataset if isinstance(dataset_args.dataset, str) else "custom"
    )
    for split_name, split_str in splits.items():
        dataset = dataset_args.dataset
        if hasattr(dataset, "column_names") and "input_ids" in dataset.column_names:
            # dataset is already tokenized
            tokenized_datasets[split_name] = dataset
        else:
            # dataset needs to be tokenized
            dataset_manager = TextGenerationDataset.load_from_registry(
                registry_id,
                dataset_args=dataset_args,
                split=split_str,
                processor=processor,
            )
            tokenized_datasets[split_name] = dataset_manager(add_labels=do_train)

    return make_dataset_splits(
        tokenized_datasets,
        do_oneshot=do_oneshot,
        do_train=do_train,
    )
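
A sketch of the pre-tokenized path shown in the source above: when dataset_args.dataset already exposes an input_ids column it is passed through without re-tokenization. The DatasetArguments import path and its keyword construction (including accepting splits=None) are assumptions.

from datasets import Dataset

from llmcompressor.args import DatasetArguments  # import path assumed
from llmcompressor.datasets.utils import get_processed_dataset

tokenized = Dataset.from_dict({"input_ids": [[1, 2, 3], [4, 5, 6]]})
dataset_args = DatasetArguments(dataset=tokenized, splits=None)

# splits=None becomes {"all": None}; the already-tokenized dataset is reused
datasets = get_processed_dataset(dataset_args, do_oneshot=True, do_train=False)
calibration = datasets["calibration"]  # same rows as `tokenized`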

make_dataset_splits

make_dataset_splits(
    tokenized_datasets: dict[str, Any],
    do_oneshot: bool = True,
    do_train: bool = False,
) -> dict[str, Dataset]

Restructures the datasets dictionary based on the tasks that will be run.

Parameters

  • tokenized_datasets

    (dict[str, Any]) –

    已处理数据集的字典

  • do_oneshot

    (bool, default: True ) –

    Whether to store the calibration dataset

Returns

  • dict[str, Dataset]

    A dataset corresponding to either train or calibration (oneshot)

Source code in llmcompressor/datasets/utils.py
def make_dataset_splits(
    tokenized_datasets: dict[str, Any],
    do_oneshot: bool = True,
    do_train: bool = False,
) -> dict[str, Dataset]:
    """
    Restructures the datasets dictionary based on what tasks will be run
    train
    :param tokenized_datasets: dictionary of processed datasets
    :param do_oneshot: Whether to store the calibration dataset
    :return: A dataset corresponding to either train or calibration (oneshot)
    """

    # handles case where all splits are contained in a single dataset
    if "all" in tokenized_datasets and len(tokenized_datasets) == 1:
        tokenized_datasets = tokenized_datasets.get("all")
        if isinstance(tokenized_datasets, Dataset):
            tokenized_datasets = {"train": tokenized_datasets}

    train_split = calib_split = None

    if do_train:
        if "train" not in tokenized_datasets:
            raise ValueError("--do_train requires a train dataset")
        train_split = tokenized_datasets["train"]
    if do_oneshot:
        calib_split = tokenized_datasets.get("calibration")
        if calib_split is None:
            if "train" not in tokenized_datasets:
                raise ValueError("--do_oneshot requires a calibration dataset")
            calib_split = tokenized_datasets["train"]

    split_datasets = {
        "train": train_split,
        "calibration": calib_split,
    }
    return split_datasets
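
A minimal sketch of the fallback behavior visible in the source above: a lone "all" split is promoted to "train", and with do_oneshot=True that split doubles as the calibration data.

from datasets import Dataset

from llmcompressor.datasets.utils import make_dataset_splits

tokenized = Dataset.from_dict({"input_ids": [[1, 2, 3], [4, 5]]})

splits = make_dataset_splits({"all": tokenized}, do_oneshot=True, do_train=False)
print(splits["train"])        # None, since do_train=False
print(splits["calibration"])  # the "all" dataset, reused for calibration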