跳到内容

IO Processor 插件

IO Processor 插件是一项功能,允许对池化模型的模型输入和输出进行预处理和后处理。其理念是允许用户向 vLLM 传递自定义输入,该输入会被转换为一个或多个模型提示,并馈送给模型的 encode 方法。这类插件的一个潜在用例是使用 vLLM 生成多模态数据。例如,用户向 vLLM 输入一张图片,并获得一张图片作为输出。

在执行带有 IO Processor 插件的推理时,提示类型由插件定义,最终请求的输出也同样如此。vLLM 不会执行任何输入/输出数据验证,由插件负责确保正确的数据被馈送给模型并返回给用户。目前,这些插件仅支持池化模型,可以通过 LLMAsyncLLM 中的 encode 方法,或在在线服务模式下通过 /pooling 端点触发。

编写 IO Processor 插件

IO Processor 插件实现了 IOProcessor 接口。

IOProcessorInput = TypeVar("IOProcessorInput")
IOProcessorOutput = TypeVar("IOProcessorOutput")

class IOProcessor(ABC, Generic[IOProcessorInput, IOProcessorOutput]):
    def __init__(self, vllm_config: VllmConfig):
        self.vllm_config = vllm_config

    @abstractmethod
    def pre_process(
        self,
        prompt: IOProcessorInput,
        request_id: str | None = None,
        **kwargs,
    ) -> PromptType | Sequence[PromptType]:
        raise NotImplementedError

    async def pre_process_async(
        self,
        prompt: IOProcessorInput,
        request_id: str | None = None,
        **kwargs,
    ) -> PromptType | Sequence[PromptType]:
        return self.pre_process(prompt, request_id, **kwargs)

    @abstractmethod
    def post_process(
        self,
        model_output: Sequence[PoolingRequestOutput],
        request_id: str | None = None,
        **kwargs,
    ) -> IOProcessorOutput:
        raise NotImplementedError

    async def post_process_async(
        self,
        model_output: AsyncGenerator[tuple[int, PoolingRequestOutput]],
        request_id: str | None = None,
        **kwargs,
    ) -> IOProcessorOutput:
        # We cannot guarantee outputs are returned in the same order they were
        # fed to vLLM.
        # Let's sort them by id before post_processing
        sorted_output = sorted(
            [(i, item) async for i, item in model_output], key=lambda output: output[0]
        )
        collected_output = [output[1] for output in sorted_output]
        return self.post_process(collected_output, request_id, **kwargs)

    @abstractmethod
    def parse_request(self, request: Any) -> IOProcessorInput:
        raise NotImplementedError

    def validate_or_generate_params(
        self, params: SamplingParams | PoolingParams | None = None
    ) -> SamplingParams | PoolingParams:
        return params or PoolingParams()

    @abstractmethod
    def output_to_response(
        self, plugin_output: IOProcessorOutput
    ) -> IOProcessorResponse:
        raise NotImplementedError

parse_request 方法用于验证用户提示并将其转换为 pre_process/pre_process_async 方法所期望的输入。pre_process* 方法接收经过验证的插件输入,为常规推理生成 vLLM 的模型提示。post_process* 方法接收 PoolingRequestOutput 对象作为输入,并生成自定义插件输出。validate_or_generate_params 方法用于通过插件验证用户请求中收到的任何 SamplingParameters/PoolingParameters,或在未指定时生成新的参数。该函数始终返回已验证/生成的参数。output_to_response 方法仅用于在线服务,并将插件输出转换为 IOProcessorResponse 类型,然后由 API 服务器返回。/pooling 服务端点的实现可以在这里找到: vllm/entrypoints/openai/serving_pooling.py.

一个启用 PrithviGeospatialMAE 模型生成 geotiff 图像的插件示例实现可以在 这里找到。请同时参考我们的在线( examples/pooling/plugin/prithvi_geospatial_mae_client.py)和离线( examples/pooling/plugin/prithvi_geospatial_mae_io_processor.py)推理示例。

使用 IO Processor 插件

IO Processor 插件在引擎启动时加载,有两种方法可以指定要加载的插件名称:

  1. 通过 vLLM 的 EngineArgs:在用于初始化 AsyncLLMEngineArgs 中设置 io_processor_plugin 参数。在离线模式下,通过将 io_processor_plugin 参数传递给 LLM 也可以达到相同的效果,或者在服务模式下通过传递 --io-processor-plugin 参数。
  2. 通过模型的 HF 配置:将 io_processor_plugin 字段添加到模型配置 (config.json) 中。

顺序也决定了方法的优先级。即,通过 EngineArgs 设置插件名称将覆盖在模型 HF 配置 (config.json) 中指定的任何插件名称。