Qwen3-Omni

Source: https://github.com/vllm-project/vllm-omni/tree/main/examples/online_serving/qwen3_omni

🛠️ Installation

Please refer to README.md

Run examples (Qwen3-Omni)

Launch the server

vllm serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --port 8091

If you have a custom stage configs file, launch the server with the command below

vllm serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --port 8091 --stage-configs-path /path/to/stage_configs_file

Send a multimodal request

Go to the example folder

cd examples/online_serving/qwen3_omni

Send a request via Python

python openai_chat_completion_client_for_multimodal_generation.py --query-type use_image

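The Python client supports the following command-line arguments:

  • --query-type (or -q): Query type (default: use_video)
      • Options: text, use_audio, use_image, use_video
  • --model (or -m): Model name/path (default: Qwen/Qwen3-Omni-30B-A3B-Instruct)
  • --video-path (or -v): Path to a local video file or a URL
      • If not provided and query-type is use_video, the default video URL is used
      • Supports local file paths (automatically encoded to base64) or HTTP/HTTPS URLs
      • Examples: --video-path /path/to/video.mp4 or --video-path https://example.com/video.mp4
  • --image-path (or -i): Path to a local image file or a URL
      • If not provided and query-type is use_image, the default image URL is used
      • Supports local file paths (automatically encoded to base64) or HTTP/HTTPS URLs
      • Supports common image formats: JPEG, PNG, GIF, WebP
      • Examples: --image-path /path/to/image.jpg or --image-path https://example.com/image.png
  • --audio-path (or -a): Path to a local audio file or a URL
      • If not provided and query-type is use_audio, the default audio URL is used
      • Supports local file paths (automatically encoded to base64) or HTTP/HTTPS URLs
      • Supports common audio formats: MP3, WAV, OGG, FLAC, M4A
      • Examples: --audio-path /path/to/audio.wav or --audio-path https://example.com/audio.mp3
  • --prompt (or -p): Custom text prompt/question
      • If not provided, the default prompt for the selected query type is used
      • Example: --prompt "What are the main activities shown in this video?"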
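
The path-or-URL handling described for --video-path, --image-path, and --audio-path can be sketched roughly as follows. This is a simplified illustration rather than the client's exact code: the helper name to_media_url is hypothetical, and it guesses the MIME type with the standard-library mimetypes module instead of the explicit extension checks the client script uses.

```python
import base64
import mimetypes
from pathlib import Path


def to_media_url(path_or_url: str, fallback_mime: str = "application/octet-stream") -> str:
    """Return an HTTP(S) URL unchanged, or encode a local file as a base64 data URL.

    Hypothetical helper for illustration only.
    """
    # Remote URLs are passed to the server as-is.
    if path_or_url.startswith(("http://", "https://")):
        return path_or_url

    # Anything else is treated as a local file path.
    path = Path(path_or_url)
    if not path.is_file():
        raise FileNotFoundError(f"File not found: {path_or_url}")

    # Guess the MIME type from the file extension; fall back if it is unknown.
    mime_type, _ = mimetypes.guess_type(path.name)
    encoded = base64.b64encode(path.read_bytes()).decode("utf-8")
    return f"data:{mime_type or fallback_mime};base64,{encoded}"
```

The returned string is what would go into the "url" field of an image_url, video_url, or audio_url content item.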

For example, to use a local video file with a custom prompt

python openai_chat_completion_client_for_multimodal_generation.py \
    --query-type use_video \
    --video-path /path/to/your/video.mp4 \
    --prompt "What are the main activities shown in this video?"

Send a request via curl

bash run_curl_multimodal_generation.sh use_image
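
The contents of run_curl_multimodal_generation.sh are not reproduced here. As a rough sketch, a JSON request body for an OpenAI-compatible chat completions endpoint could be assembled as below, mirroring the message layout the Python client uses. The helper name build_image_request is hypothetical and the image URL is a placeholder.

```python
import json


def build_image_request(model: str, image_url: str, question: str) -> dict:
    """Build a chat-completions request body with one image and one text part.

    Hypothetical helper for illustration only.
    """
    return {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": image_url}},
                    {"type": "text", "text": question},
                ],
            }
        ],
    }


payload = build_image_request(
    "Qwen/Qwen3-Omni-30B-A3B-Instruct",
    "https://example.com/image.png",  # placeholder URL
    "What is the content of this image?",
)
# Serialized body, suitable for curl's -d option or any HTTP client.
body = json.dumps(payload)
```

With the server started on port 8091 as above, such a body would typically be sent with a POST request to the server's /v1/chat/completions path.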

FAQ

If you encounter an error about the librosa backend, try installing ffmpeg with the commands below.

sudo apt update
sudo apt install ffmpeg
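
After installing, one quick way to confirm that an ffmpeg executable is visible from Python is sketched below. It only checks PATH with the standard-library shutil.which and does not verify that librosa can actually use it; the function name ffmpeg_available is illustrative.

```python
import shutil


def ffmpeg_available() -> bool:
    """Return True if an `ffmpeg` executable is found on PATH."""
    return shutil.which("ffmpeg") is not None


if __name__ == "__main__":
    print("ffmpeg found" if ffmpeg_available() else "ffmpeg not found on PATH")
```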

Modality control

If you want to control the output modalities, for example to output text only, run the command below

python openai_chat_completion_client_for_multimodal_generation.py \
    --query-type use_image \
    --modalities text
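
In the client script, the --modalities value is split on commas and passed as the modalities argument of the chat completion request. A minimal sketch of that parsing step is shown below. The helper name parse_modalities is hypothetical, and unlike the client script it also strips whitespace and drops empty entries.

```python
from typing import List, Optional


def parse_modalities(value: Optional[str]) -> Optional[List[str]]:
    """Turn a comma-separated string such as "text,audio" into a list.

    Returns None when nothing was requested, so the caller can pass None
    through unchanged. Hypothetical helper for illustration only.
    """
    if value is None:
        return None
    items = [m.strip() for m in value.split(",") if m.strip()]
    return items or None
```

For example, parse_modalities("text") yields a single-element list that requests text-only output.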

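Run the local Web UI demo

This Web UI demo lets users interact with the model through a web browser.

Run the Gradio demo

The Gradio demo connects to the vLLM API server. You have two options

Option 1: Use the convenience script

The convenience script launches the vLLM server and the Gradio demo together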

./run_gradio_demo.sh --model Qwen/Qwen3-Omni-30B-A3B-Instruct --server-port 8091 --gradio-port 7861

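This script will:

  1. Start the vLLM server in the background
  2. Wait for the server to be ready
  3. Launch the Gradio demo
  4. Handle cleanup when you press Ctrl+C

The script supports the following arguments:

  • --model: Model name/path (default: Qwen/Qwen3-Omni-30B-A3B-Instruct)
  • --server-port: Port for the vLLM server (default: 8091)
  • --gradio-port: Port for the Gradio demo (default: 7861)
  • --stage-configs-path: Path to a custom stage configs YAML file (optional)
  • --server-host: Host for the vLLM server (default: 0.0.0.0)
  • --gradio-ip: IP for the Gradio demo (default: 127.0.0.1)
  • --share: Share the Gradio demo publicly (creates a public link)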
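
The script's internals are not reproduced here. As an illustration of the "wait for the server to be ready" step, a polling loop could look like the sketch below. The names wait_until_ready and http_probe are hypothetical, and probing the /v1/models path on localhost is an assumption about how readiness might be checked, not a description of what the script does.

```python
import time
import urllib.request
from typing import Callable


def wait_until_ready(probe: Callable[[], bool], timeout_s: float = 600.0, interval_s: float = 2.0) -> bool:
    """Call `probe` repeatedly until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if probe():
                return True
        except OSError:
            # Connection refused or similar: the server is not accepting requests yet.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


def http_probe(url: str) -> Callable[[], bool]:
    """Build a probe that reports True once `url` answers with HTTP 200."""

    def _probe() -> bool:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200

    return _probe


# Example usage (assumed URL, not executed here):
# ready = wait_until_ready(http_probe("http://localhost:8091/v1/models"))
```

Passing the probe in as a callable keeps the loop independent of how readiness is detected, so the same loop works with a different endpoint or a plain socket check.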

Option 2: Manual launch (two-step process)

Step 1: Launch the vLLM API server

vllm serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --port 8091

If you have a custom stage configs file

vllm serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --port 8091 --stage-configs-path /path/to/stage_configs_file

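Step 2: Run the Gradio demo

In a separate terminal

python gradio_demo.py --model Qwen/Qwen3-Omni-30B-A3B-Instruct --api-base http://localhost:8091/v1 --port 7861

Then open http://localhost:7861/ in your local browser to interact with the Web UI.

The gradio script supports the following arguments:

  • --model: Model name/path (should match the server model)
  • --api-base: Base URL for the vLLM API server (default: http://localhost:8091/v1)
  • --ip: Host/IP for the Gradio server (default: 127.0.0.1)
  • --port: Port for the Gradio server (default: 7861)
  • --share: Share the Gradio demo publicly (creates a public link)

Example materials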

gradio_demo.py
import argparse
import base64
import io
import os
import random
from pathlib import Path
from typing import Any

import gradio as gr
import numpy as np
import soundfile as sf
import torch
from openai import OpenAI
from PIL import Image

SEED = 42

SUPPORTED_MODELS: dict[str, dict[str, Any]] = {
    "Qwen/Qwen3-Omni-30B-A3B-Instruct": {
        "sampling_params": {
            "thinker": {
                "temperature": 0.4,
                "top_p": 0.9,
                "top_k": 1,
                "max_tokens": 16384,
                "detokenize": True,
                "repetition_penalty": 1.05,
                "stop_token_ids": [151645],
                "seed": SEED,
            },
            "talker": {
                "temperature": 0.9,
                "top_k": 50,
                "max_tokens": 4096,
                "seed": SEED,
                "detokenize": False,
                "repetition_penalty": 1.05,
                "stop_token_ids": [2150],
            },
            "code2wav": {
                "temperature": 0.0,
                "top_p": 1.0,
                "top_k": -1,
                "max_tokens": 4096 * 16,
                "seed": SEED,
                "detokenize": True,
                "repetition_penalty": 1.1,
            },
        },
    },
}
# Ensure deterministic behavior across runs.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
os.environ["PYTHONHASHSEED"] = str(SEED)
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"


def parse_args():
    parser = argparse.ArgumentParser(description="Gradio demo for Qwen3-Omni online inference.")
    parser.add_argument(
        "--model",
        default="Qwen/Qwen3-Omni-30B-A3B-Instruct",
        help="Model name/path (should match the server model).",
    )
    parser.add_argument(
        "--api-base",
        default="http://localhost:8091/v1",
        help="Base URL for the vLLM API server.",
    )
    parser.add_argument(
        "--ip",
        default="127.0.0.1",
        help="Host/IP for gradio `launch`.",
    )
    parser.add_argument("--port", type=int, default=7861, help="Port for gradio `launch`.")
    parser.add_argument("--share", action="store_true", help="Share the Gradio demo publicly.")
    return parser.parse_args()


def build_sampling_params_dict(seed: int, model_key: str) -> list[dict]:
    """Build sampling params as dict for HTTP API mode."""
    model_conf = SUPPORTED_MODELS.get(model_key)
    if model_conf is None:
        raise ValueError(f"Unsupported model '{model_key}'")

    sampling_templates: dict[str, dict[str, Any]] = model_conf["sampling_params"]
    sampling_params: list[dict] = []
    for stage_name, template in sampling_templates.items():
        params = dict(template)
        params["seed"] = seed
        sampling_params.append(params)
    return sampling_params


def image_to_base64_data_url(image: Image.Image) -> str:
    """Convert PIL Image to base64 data URL."""
    buffered = io.BytesIO()
    # Convert to RGB if needed
    if image.mode != "RGB":
        image = image.convert("RGB")
    image.save(buffered, format="JPEG")
    img_bytes = buffered.getvalue()
    img_b64 = base64.b64encode(img_bytes).decode("utf-8")
    return f"data:image/jpeg;base64,{img_b64}"


def audio_to_base64_data_url(audio_data: tuple[np.ndarray, int]) -> str:
    """Convert audio (numpy array, sample_rate) to base64 data URL."""
    audio_np, sample_rate = audio_data
    # Convert to int16 format for WAV
    if audio_np.dtype != np.int16:
        # Normalize to [-1, 1] range if needed
        if audio_np.dtype == np.float32 or audio_np.dtype == np.float64:
            audio_np = np.clip(audio_np, -1.0, 1.0)
            audio_np = (audio_np * 32767).astype(np.int16)
        else:
            audio_np = audio_np.astype(np.int16)

    # Write to WAV bytes
    buffered = io.BytesIO()
    sf.write(buffered, audio_np, sample_rate, format="WAV")
    wav_bytes = buffered.getvalue()
    wav_b64 = base64.b64encode(wav_bytes).decode("utf-8")
    return f"data:audio/wav;base64,{wav_b64}"


def video_to_base64_data_url(video_file: str) -> str:
    """Convert video file to base64 data URL."""
    video_path = Path(video_file)
    if not video_path.exists():
        raise FileNotFoundError(f"Video file not found: {video_file}")

    # Detect MIME type from extension
    video_path_lower = str(video_path).lower()
    if video_path_lower.endswith(".mp4"):
        mime_type = "video/mp4"
    elif video_path_lower.endswith(".webm"):
        mime_type = "video/webm"
    elif video_path_lower.endswith(".mov"):
        mime_type = "video/quicktime"
    elif video_path_lower.endswith(".avi"):
        mime_type = "video/x-msvideo"
    elif video_path_lower.endswith(".mkv"):
        mime_type = "video/x-matroska"
    else:
        mime_type = "video/mp4"

    with open(video_path, "rb") as f:
        video_bytes = f.read()
    video_b64 = base64.b64encode(video_bytes).decode("utf-8")
    return f"data:{mime_type};base64,{video_b64}"


def process_audio_file(
    audio_file: Any | None,
) -> tuple[np.ndarray, int] | None:
    """Normalize Gradio audio input to (np.ndarray, sample_rate)."""
    if audio_file is None:
        return None

    sample_rate: int | None = None
    audio_np: np.ndarray | None = None

    def _load_from_path(path_str: str) -> tuple[np.ndarray, int] | None:
        if not path_str:
            return None
        path = Path(path_str)
        if not path.exists():
            return None
        data, sr = sf.read(path)
        if data.ndim > 1:
            data = data[:, 0]
        return data.astype(np.float32), int(sr)

    if isinstance(audio_file, tuple):
        if len(audio_file) == 2:
            first, second = audio_file
            # Case 1: (sample_rate, np.ndarray)
            if isinstance(first, (int, float)) and isinstance(second, np.ndarray):
                sample_rate = int(first)
                audio_np = second
            # Case 2: (filepath, (sample_rate, np.ndarray or list))
            elif isinstance(first, str):
                if isinstance(second, tuple) and len(second) == 2:
                    sr_candidate, data_candidate = second
                    if isinstance(sr_candidate, (int, float)) and isinstance(data_candidate, np.ndarray):
                        sample_rate = int(sr_candidate)
                        audio_np = data_candidate
                if audio_np is None:
                    loaded = _load_from_path(first)
                    if loaded is not None:
                        audio_np, sample_rate = loaded
            # Case 3: (None, (sample_rate, np.ndarray))
            elif first is None and isinstance(second, tuple) and len(second) == 2:
                sr_candidate, data_candidate = second
                if isinstance(sr_candidate, (int, float)) and isinstance(data_candidate, np.ndarray):
                    sample_rate = int(sr_candidate)
                    audio_np = data_candidate
        elif len(audio_file) == 1 and isinstance(audio_file[0], str):
            loaded = _load_from_path(audio_file[0])
            if loaded is not None:
                audio_np, sample_rate = loaded
    elif isinstance(audio_file, str):
        loaded = _load_from_path(audio_file)
        if loaded is not None:
            audio_np, sample_rate = loaded

    if audio_np is None or sample_rate is None:
        return None

    if audio_np.ndim > 1:
        audio_np = audio_np[:, 0]

    return audio_np.astype(np.float32), sample_rate


def process_image_file(image_file: Image.Image | None) -> Image.Image | None:
    """Process image file from Gradio input.

    Returns:
        PIL Image in RGB mode or None if no image provided.
    """
    if image_file is None:
        return None
    # Convert to RGB if needed
    if image_file.mode != "RGB":
        image_file = image_file.convert("RGB")
    return image_file


def run_inference_api(
    client: OpenAI,
    model: str,
    sampling_params_dict: list[dict],
    user_prompt: str,
    audio_file: tuple[str, tuple[int, np.ndarray]] | None = None,
    image_file: Image.Image | None = None,
    video_file: str | None = None,
    use_audio_in_video: bool = False,
    output_modalities: str | None = None,
):
    """Run inference using OpenAI API client with multimodal support."""
    if not user_prompt.strip() and not audio_file and not image_file and not video_file:
        return "Please provide at least a text prompt or multimodal input.", None

    try:
        # Build message content list
        content_list = []

        # Process audio
        audio_data = process_audio_file(audio_file)
        if audio_data is not None:
            audio_url = audio_to_base64_data_url(audio_data)
            content_list.append(
                {
                    "type": "audio_url",
                    "audio_url": {"url": audio_url},
                }
            )

        # Process image
        if image_file is not None:
            image_data = process_image_file(image_file)
            if image_data is not None:
                image_url = image_to_base64_data_url(image_data)
                content_list.append(
                    {
                        "type": "image_url",
                        "image_url": {"url": image_url},
                    }
                )

        # Process video
        mm_processor_kwargs = {}
        if video_file is not None:
            video_url = video_to_base64_data_url(video_file)
            video_content = {
                "type": "video_url",
                "video_url": {"url": video_url},
            }
            if use_audio_in_video:
                video_content["video_url"]["num_frames"] = 32  # Default max frames
                mm_processor_kwargs["use_audio_in_video"] = True
            content_list.append(video_content)

        # Add text prompt
        if user_prompt.strip():
            content_list.append(
                {
                    "type": "text",
                    "text": user_prompt,
                }
            )

        # Build messages
        messages = [
            {
                "role": "system",
                "content": [
                    {
                        "type": "text",
                        "text": (
                            "You are Qwen, a virtual human developed by the Qwen Team, "
                            "Alibaba Group, capable of perceiving auditory and visual inputs, "
                            "as well as generating text and speech."
                        ),
                    }
                ],
            },
            {
                "role": "user",
                "content": content_list,
            },
        ]

        # Build extra_body
        extra_body = {
            "sampling_params_list": sampling_params_dict,
        }
        if mm_processor_kwargs:
            extra_body["mm_processor_kwargs"] = mm_processor_kwargs

        # Parse output modalities
        if output_modalities is not None:
            output_modalities_list = [m.strip() for m in output_modalities.split(",")]
        else:
            output_modalities_list = None

        # Call API
        chat_completion = client.chat.completions.create(
            messages=messages,
            model=model,
            modalities=output_modalities_list,
            extra_body=extra_body,
        )

        # Extract outputs
        text_outputs: list[str] = []
        audio_output = None

        for choice in chat_completion.choices:
            if choice.message.content:
                text_outputs.append(choice.message.content)
            if choice.message.audio:
                # Decode base64 audio
                audio_data = base64.b64decode(choice.message.audio.data)
                # Load audio from bytes
                audio_np, sample_rate = sf.read(io.BytesIO(audio_data))
                # Convert to mono if needed
                if audio_np.ndim > 1:
                    audio_np = audio_np[:, 0]
                audio_output = (int(sample_rate), audio_np.astype(np.float32))

        text_response = "\n\n".join(text_outputs) if text_outputs else "No text output."
        return text_response, audio_output
    except Exception as exc:  # pylint: disable=broad-except
        return f"Inference failed: {exc}", None


def build_interface(
    client: OpenAI,
    model: str,
    sampling_params_dict: list[dict],
):
    """Build Gradio interface for API server mode."""

    def run_inference(
        user_prompt: str,
        audio_file: tuple[str, tuple[int, np.ndarray]] | None,
        image_file: Image.Image | None,
        video_file: str | None,
        use_audio_in_video: bool,
        output_modalities: str | None = None,
    ):
        return run_inference_api(
            client,
            model,
            sampling_params_dict,
            user_prompt,
            audio_file,
            image_file,
            video_file,
            use_audio_in_video,
            output_modalities,
        )

    css = """
    .media-input-container {
        display: flex;
        gap: 10px;
    }
    .media-input-container > div {
        flex: 1;
    }
    .media-input-container .image-input,
    .media-input-container .audio-input {
        height: 300px;
    }
    .media-input-container .video-column {
        height: 300px;
        display: flex;
        flex-direction: column;
    }
    .media-input-container .video-input {
        flex: 1;
        min-height: 0;
    }
    #generate-btn button {
        width: 100%;
    }
    """

    with gr.Blocks(css=css) as demo:
        gr.Markdown("# vLLM-Omni Online Serving Demo")
        gr.Markdown(f"**Model:** {model} \n\n")

        with gr.Column():
            with gr.Row():
                input_box = gr.Textbox(
                    label="Text Prompt",
                    placeholder="For example: Describe what happens in the media inputs.",
                    lines=4,
                    scale=1,
                )
            with gr.Row(elem_classes="media-input-container"):
                image_input = gr.Image(
                    label="Image Input (optional)",
                    type="pil",
                    sources=["upload"],
                    scale=1,
                    elem_classes="image-input",
                )
                with gr.Column(scale=1, elem_classes="video-column"):
                    video_input = gr.Video(
                        label="Video Input (optional)",
                        sources=["upload"],
                        elem_classes="video-input",
                    )
                    use_audio_in_video_checkbox = gr.Checkbox(
                        label="Use audio from video",
                        value=False,
                        info="Extract the video's audio track when provided.",
                    )
                audio_input = gr.Audio(
                    label="Audio Input (optional)",
                    type="numpy",
                    sources=["upload", "microphone"],
                    scale=1,
                    elem_classes="audio-input",
                )

        with gr.Row():
            output_modalities = gr.Textbox(
                label="Output Modalities",
                placeholder="For example: text, audio. Use commas to separate multiple modalities.",
                lines=1,
                scale=1,
            )

        with gr.Row():
            generate_btn = gr.Button(
                "Generate",
                variant="primary",
                size="lg",
                elem_id="generate-btn",
            )

        with gr.Row():
            text_output = gr.Textbox(label="Text Output", lines=10, scale=2)
            audio_output = gr.Audio(label="Audio Output", interactive=False, scale=1)

        generate_btn.click(
            fn=run_inference,
            inputs=[input_box, audio_input, image_input, video_input, use_audio_in_video_checkbox, output_modalities],
            outputs=[text_output, audio_output],
        )
        demo.queue()
    return demo


def main():
    args = parse_args()

    model_name = "/".join(args.model.split("/")[-2:])
    assert model_name in SUPPORTED_MODELS, (
        f"Unsupported model '{model_name}'. Supported models: {SUPPORTED_MODELS.keys()}"
    )

    # Initialize OpenAI client
    print(f"Connecting to API server at: {args.api_base}")
    client = OpenAI(
        api_key="EMPTY",
        base_url=args.api_base,
    )
    print("✓ Connected to API server")

    # Build sampling params
    sampling_params_dict = build_sampling_params_dict(SEED, model_name)

    demo = build_interface(
        client,
        args.model,
        sampling_params_dict,
    )
    try:
        demo.launch(
            server_name=args.ip,
            server_port=args.port,
            share=args.share,
        )
    except KeyboardInterrupt:
        print("\nShutting down...")


if __name__ == "__main__":
    main()
openai_chat_completion_client_for_multimodal_generation.py
import base64
import os
from typing import NamedTuple

import requests
from openai import OpenAI
from vllm.assets.audio import AudioAsset
from vllm.utils.argparse_utils import FlexibleArgumentParser

# Modify OpenAI's API key and API base to use vLLM's API server.
openai_api_key = "EMPTY"
openai_api_base = "http://localhost:8091/v1"

client = OpenAI(
    # defaults to os.environ.get("OPENAI_API_KEY")
    api_key=openai_api_key,
    base_url=openai_api_base,
)

SEED = 42


class QueryResult(NamedTuple):
    inputs: dict
    limit_mm_per_prompt: dict[str, int]


def encode_base64_content_from_url(content_url: str) -> str:
    """Encode a content retrieved from a remote url to base64 format."""

    with requests.get(content_url) as response:
        response.raise_for_status()
        result = base64.b64encode(response.content).decode("utf-8")

    return result


def encode_base64_content_from_file(file_path: str) -> str:
    """Encode a local file to base64 format."""
    with open(file_path, "rb") as f:
        content = f.read()
        result = base64.b64encode(content).decode("utf-8")
    return result


def get_video_url_from_path(video_path: str | None) -> str:
    """Convert a video path (local file or URL) to a video URL format for the API.

    If video_path is None or empty, returns the default URL.
    If video_path is a local file path, encodes it to base64 data URL.
    If video_path is a URL, returns it as-is.
    """
    if not video_path:
        # Default video URL
        return "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4"

    # Check if it's a URL (starts with http:// or https://)
    if video_path.startswith(("http://", "https://")):
        return video_path

    # Otherwise, treat it as a local file path
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Detect video MIME type from file extension
    video_path_lower = video_path.lower()
    if video_path_lower.endswith(".mp4"):
        mime_type = "video/mp4"
    elif video_path_lower.endswith(".webm"):
        mime_type = "video/webm"
    elif video_path_lower.endswith(".mov"):
        mime_type = "video/quicktime"
    elif video_path_lower.endswith(".avi"):
        mime_type = "video/x-msvideo"
    elif video_path_lower.endswith(".mkv"):
        mime_type = "video/x-matroska"
    else:
        # Default to mp4 if extension is unknown
        mime_type = "video/mp4"

    video_base64 = encode_base64_content_from_file(video_path)
    return f"data:{mime_type};base64,{video_base64}"


def get_image_url_from_path(image_path: str | None) -> str:
    """Convert an image path (local file or URL) to an image URL format for the API.

    If image_path is None or empty, returns the default URL.
    If image_path is a local file path, encodes it to base64 data URL.
    If image_path is a URL, returns it as-is.
    """
    if not image_path:
        # Default image URL
        return "https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/cherry_blossom.jpg"

    # Check if it's a URL (starts with http:// or https://)
    if image_path.startswith(("http://", "https://")):
        return image_path

    # Otherwise, treat it as a local file path
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found: {image_path}")

    # Detect image MIME type from file extension
    image_path_lower = image_path.lower()
    if image_path_lower.endswith((".jpg", ".jpeg")):
        mime_type = "image/jpeg"
    elif image_path_lower.endswith(".png"):
        mime_type = "image/png"
    elif image_path_lower.endswith(".gif"):
        mime_type = "image/gif"
    elif image_path_lower.endswith(".webp"):
        mime_type = "image/webp"
    else:
        # Default to jpeg if extension is unknown
        mime_type = "image/jpeg"

    image_base64 = encode_base64_content_from_file(image_path)
    return f"data:{mime_type};base64,{image_base64}"


def get_audio_url_from_path(audio_path: str | None) -> str:
    """Convert an audio path (local file or URL) to an audio URL format for the API.

    If audio_path is None or empty, returns the default URL.
    If audio_path is a local file path, encodes it to base64 data URL.
    If audio_path is a URL, returns it as-is.
    """
    if not audio_path:
        # Default audio URL
        return AudioAsset("mary_had_lamb").url

    # Check if it's a URL (starts with http:// or https://)
    if audio_path.startswith(("http://", "https://")):
        return audio_path

    # Otherwise, treat it as a local file path
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    # Detect audio MIME type from file extension
    audio_path_lower = audio_path.lower()
    if audio_path_lower.endswith((".mp3", ".mpeg")):
        mime_type = "audio/mpeg"
    elif audio_path_lower.endswith(".wav"):
        mime_type = "audio/wav"
    elif audio_path_lower.endswith(".ogg"):
        mime_type = "audio/ogg"
    elif audio_path_lower.endswith(".flac"):
        mime_type = "audio/flac"
    elif audio_path_lower.endswith(".m4a"):
        mime_type = "audio/mp4"
    else:
        # Default to wav if extension is unknown
        mime_type = "audio/wav"

    audio_base64 = encode_base64_content_from_file(audio_path)
    return f"data:{mime_type};base64,{audio_base64}"


def get_system_prompt():
    return {
        "role": "system",
        "content": [
            {
                "type": "text",
                "text": (
                    "You are Qwen, a virtual human developed by the Qwen Team, "
                    "Alibaba Group, capable of perceiving auditory and visual inputs, "
                    "as well as generating text and speech."
                ),
            }
        ],
    }


def get_text_query(custom_prompt: str | None = None):
    question = (
        custom_prompt or "Explain the system architecture for a scalable audio generation pipeline. Answer in 15 words."
    )
    prompt = {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": f"{question}",
            }
        ],
    }
    return prompt


default_system = (
    "You are Qwen, a virtual human developed by the Qwen Team, Alibaba "
    "Group, capable of perceiving auditory and visual inputs, as well as "
    "generating text and speech."
)


def get_video_query(video_path: str | None = None, custom_prompt: str | None = None):
    question = custom_prompt or "Why is this video funny?"
    video_url = get_video_url_from_path(video_path)
    prompt = {
        "role": "user",
        "content": [
            {
                "type": "video_url",
                "video_url": {"url": video_url},
            },
            {
                "type": "text",
                "text": f"{question}",
            },
        ],
    }
    return prompt


def get_image_query(image_path: str | None = None, custom_prompt: str | None = None):
    question = custom_prompt or "What is the content of this image?"
    image_url = get_image_url_from_path(image_path)
    prompt = {
        "role": "user",
        "content": [
            {
                "type": "image_url",
                "image_url": {"url": image_url},
            },
            {
                "type": "text",
                "text": f"{question}",
            },
        ],
    }
    return prompt


def get_audio_query(audio_path: str | None = None, custom_prompt: str | None = None):
    question = custom_prompt or "What is the content of this audio?"
    audio_url = get_audio_url_from_path(audio_path)
    prompt = {
        "role": "user",
        "content": [
            {
                "type": "audio_url",
                "audio_url": {"url": audio_url},
            },
            {
                "type": "text",
                "text": f"{question}",
            },
        ],
    }
    return prompt


query_map = {
    "text": get_text_query,
    "use_audio": get_audio_query,
    "use_image": get_image_query,
    "use_video": get_video_query,
}


def run_multimodal_generation(args) -> None:
    model_name = args.model
    thinker_sampling_params = {
        "temperature": 0.4,  # top_k=1 below makes decoding effectively greedy
        "top_p": 0.9,
        "top_k": 1,
        "max_tokens": 16384,
        "repetition_penalty": 1.05,
        "stop_token_ids": [151645],  # Qwen EOS token <|im_end|>
        "seed": SEED,
    }

    # Sampling parameters for Talker stage (codec generation)
    # Stop at codec EOS token
    talker_sampling_params = {
        "temperature": 0.9,
        "top_k": 50,
        "max_tokens": 4096,
        "seed": SEED,
        "detokenize": False,
        "repetition_penalty": 1.05,
        "stop_token_ids": [2150],  # TALKER_CODEC_EOS_TOKEN_ID
    }

    # Sampling parameters for Code2Wav stage (audio generation)
    code2wav_sampling_params = {
        "temperature": 0.0,
        "top_p": 1.0,
        "top_k": -1,
        "max_tokens": 4096 * 16,
        "seed": SEED,
        "detokenize": True,
        "repetition_penalty": 1.1,
    }

    sampling_params_list = [
        thinker_sampling_params,
        talker_sampling_params,
        code2wav_sampling_params,
    ]

    # Get paths and custom prompt from args
    video_path = getattr(args, "video_path", None)
    image_path = getattr(args, "image_path", None)
    audio_path = getattr(args, "audio_path", None)
    custom_prompt = getattr(args, "prompt", None)

    # Get the query function and call it with appropriate parameters
    query_func = query_map[args.query_type]
    if args.query_type == "use_video":
        prompt = query_func(video_path=video_path, custom_prompt=custom_prompt)
    elif args.query_type == "use_image":
        prompt = query_func(image_path=image_path, custom_prompt=custom_prompt)
    elif args.query_type == "use_audio":
        prompt = query_func(audio_path=audio_path, custom_prompt=custom_prompt)
    elif args.query_type == "text":
        prompt = query_func(custom_prompt=custom_prompt)
    else:
        prompt = query_func()

    extra_body = {
        "sampling_params_list": sampling_params_list  # Optional, it has a default setting in stage_configs of the corresponding model.
    }

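    # NOTE: "use_audio_in_video" is not a key of query_map, so --query-type can never
    # select it and this branch is unreachable as written.
    if args.query_type == "use_audio_in_video":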
        extra_body["mm_processor_kwargs"] = {"use_audio_in_video": True}

    if args.modalities is not None:
        output_modalities = args.modalities.split(",")
    else:
        output_modalities = None

    chat_completion = client.chat.completions.create(
        messages=[
            get_system_prompt(),
            prompt,
        ],
        model=model_name,
        modalities=output_modalities,
        extra_body=extra_body,
    )

    count = 0
    for choice in chat_completion.choices:
        if choice.message.audio:
            audio_data = base64.b64decode(choice.message.audio.data)
            audio_file_path = f"audio_{count}.wav"
            with open(audio_file_path, "wb") as f:
                f.write(audio_data)
            print(f"Audio saved to {audio_file_path}")
            count += 1
        elif choice.message.content:
            print("Chat completion output from text:", choice.message.content)


def parse_args():
    parser = FlexibleArgumentParser(description="Demo client for sending multimodal requests to a vLLM-Omni server")
    parser.add_argument(
        "--query-type",
        "-q",
        type=str,
        default="use_video",
        choices=query_map.keys(),
        help="Query type.",
    )
    parser.add_argument(
        "--model",
        "-m",
        type=str,
        default="Qwen/Qwen3-Omni-30B-A3B-Instruct",
        help="Model Name / Path",
    )
    parser.add_argument(
        "--video-path",
        "-v",
        type=str,
        default=None,
        help="Path to local video file or URL. If not provided and query-type is 'use_video', uses default video URL.",
    )
    parser.add_argument(
        "--image-path",
        "-i",
        type=str,
        default=None,
        help="Path to local image file or URL. If not provided and query-type is 'use_image', uses default image URL.",
    )
    parser.add_argument(
        "--audio-path",
        "-a",
        type=str,
        default=None,
        help="Path to local audio file or URL. If not provided and query-type is 'use_audio', uses default audio URL.",
    )
    parser.add_argument(
        "--prompt",
        "-p",
        type=str,
        default=None,
        help="Custom text prompt/question to use instead of the default prompt for the selected query type.",
    )
    parser.add_argument(
        "--modalities",
        type=str,
        default=None,
        help="Comma-separated list of output modalities, e.g. 'text'. If omitted, the server default is used.",
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    run_multimodal_generation(args)
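
The client's handling of `--modalities` and of returned audio can be factored into two small helpers. The sketch below is not part of the example script: `parse_modalities` and `save_audio_payloads` are hypothetical names, and it assumes audio arrives as base64-encoded bytes, as the decoding loop in the client implies. Unlike the client, it also strips whitespace around each modality name.

```python
import base64
import tempfile
from pathlib import Path


def parse_modalities(value):
    """Turn a comma-separated --modalities string into a list, or None if unset."""
    if value is None:
        return None
    # Strip whitespace so "text, audio" and "text,audio" behave the same.
    return [item.strip() for item in value.split(",") if item.strip()]


def save_audio_payloads(payloads, out_dir):
    """Decode base64 audio strings and write them as audio_<n>.wav files."""
    out_dir = Path(out_dir)
    saved = []
    for index, payload in enumerate(payloads):
        target = out_dir / f"audio_{index}.wav"
        target.write_bytes(base64.b64decode(payload))
        saved.append(target)
    return saved


if __name__ == "__main__":
    print(parse_modalities("text, audio"))
    with tempfile.TemporaryDirectory() as tmp:
        # Placeholder bytes stand in for real audio returned by the server.
        fake_audio = base64.b64encode(b"RIFF-fake-wav-bytes").decode("ascii")
        print([path.name for path in save_audio_payloads([fake_audio], tmp)])
```

Passing `None` through unchanged keeps the request's `modalities` field unset, which matches the client's behavior when the flag is not given.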
qwen3_omni_moe_thinking.yaml
# Stage config for running Qwen3-Omni-MoE-Thinking (text-only output)
# This config is for models like Qwen3-Omni-30B-A3B-Thinking that only have the
# thinker component and do not support audio output.
#
# Single stage: Thinker (multimodal understanding + text generation)

# The following config has been verified on 2x H100-80G GPUs.
stage_args:
  - stage_id: 0
    runtime:
      devices: "0,1"
      max_batch_size: 1
    engine_args:
      model_stage: thinker
      model_arch: Qwen3OmniMoeForConditionalGeneration
      worker_cls: vllm_omni.worker.gpu_ar_worker.GPUARWorker
      scheduler_cls: vllm_omni.core.sched.omni_ar_scheduler.OmniARScheduler
      gpu_memory_utilization: 0.9
      enforce_eager: true
      trust_remote_code: true
      engine_output_type: text
      distributed_executor_backend: "mp"
      enable_prefix_caching: false
      hf_config_name: thinker_config
      tensor_parallel_size: 2
    final_output: true
    final_output_type: text
    is_comprehension: true
    default_sampling_params:
      temperature: 0.4
      top_p: 0.9
      top_k: 1
      max_tokens: 2048
      seed: 42
      detokenize: True
      repetition_penalty: 1.05
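
A quick sanity check on a stage entry can catch mismatches before the server starts. The sketch below works on a plain dictionary that mirrors the YAML above (load it with the YAML parser of your choice). `check_stage` is a hypothetical helper, and the rule it applies, that the number of listed devices should equal `tensor_parallel_size`, is an assumption drawn from this config rather than a documented vLLM-Omni requirement.

```python
def check_stage(stage):
    """Return a list of human-readable problems found in one stage entry."""
    problems = []
    # "devices" is a comma-separated string such as "0,1".
    devices = [d.strip() for d in str(stage["runtime"]["devices"]).split(",") if d.strip()]
    tp_size = stage["engine_args"].get("tensor_parallel_size", 1)
    if len(devices) != tp_size:
        problems.append(
            f"stage {stage['stage_id']}: {len(devices)} device(s) listed "
            f"but tensor_parallel_size is {tp_size}"
        )
    return problems


if __name__ == "__main__":
    # Only the fields the check reads are reproduced here.
    thinker_stage = {
        "stage_id": 0,
        "runtime": {"devices": "0,1", "max_batch_size": 1},
        "engine_args": {"model_stage": "thinker", "tensor_parallel_size": 2},
    }
    print(check_stage(thinker_stage))
```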
run_curl_multimodal_generation.sh
#!/usr/bin/env bash
set -euo pipefail

# Default query type
QUERY_TYPE="${1:-use_video}"

# Default modalities argument
MODALITIES="${2:-null}"

# Validate query type
if [[ ! "$QUERY_TYPE" =~ ^(text|use_audio|use_image|use_video)$ ]]; then
    echo "Error: Invalid query type '$QUERY_TYPE'"
    echo "Usage: $0 [text|use_audio|use_image|use_video] [modalities]"
    echo "  text: Text query"
    echo "  use_audio: Audio + Text query"
    echo "  use_image: Image + Text query"
    echo "  use_video: Video + Text query"
    echo "  modalities: JSON value for the modalities field, e.g. '[\"text\"]' (default: null)"
    exit 1
fi

SEED=42

thinker_sampling_params='{
  "temperature": 0.4,
  "top_p": 0.9,
  "top_k": 1,
  "max_tokens": 16384,
  "seed": 42,
  "repetition_penalty": 1.05,
  "stop_token_ids": [151645]
}'

talker_sampling_params='{
  "temperature": 0.9,
  "top_k": 50,
  "max_tokens": 4096,
  "seed": 42,
  "detokenize": false,
  "repetition_penalty": 1.05,
  "stop_token_ids": [2150]
}'

code2wav_sampling_params='{
  "temperature": 0.0,
  "top_p": 1.0,
  "top_k": -1,
  "max_tokens": 65536,
  "seed": 42,
  "detokenize": true,
  "repetition_penalty": 1.1
}'
# The sampling parameters above are optional; if omitted, the defaults from the model's stage config are used.

# Define URLs for assets
MARY_HAD_LAMB_AUDIO_URL="https://vllm-public-assets.s3.us-west-2.amazonaws.com/multimodal_asset/mary_had_lamb.ogg"
CHERRY_BLOSSOM_IMAGE_URL="https://vllm-public-assets.s3.us-west-2.amazonaws.com/vision_model_images/cherry_blossom.jpg"
SAMPLE_VIDEO_URL="https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4"

# Build user content and extra fields based on query type
case "$QUERY_TYPE" in
  text)
    user_content='[
      {
        "type": "text",
        "text": "Explain the system architecture for a scalable audio generation pipeline. Answer in 15 words."
      }
    ]'
    sampling_params_list='[
      '"$thinker_sampling_params"',
      '"$talker_sampling_params"',
      '"$code2wav_sampling_params"'
    ]'
    mm_processor_kwargs="{}"
    ;;
  use_audio)
    user_content='[
        {
          "type": "audio_url",
          "audio_url": {
            "url": "'"$MARY_HAD_LAMB_AUDIO_URL"'"
          }
        },
        {
          "type": "text",
          "text": "What is the content of this audio?"
        }
      ]'
    sampling_params_list='[
      '"$thinker_sampling_params"',
      '"$talker_sampling_params"',
      '"$code2wav_sampling_params"'
    ]'
    mm_processor_kwargs="{}"
    ;;
  use_image)
    user_content='[
        {
          "type": "image_url",
          "image_url": {
            "url": "'"$CHERRY_BLOSSOM_IMAGE_URL"'"
          }
        },
        {
          "type": "text",
          "text": "What is the content of this image?"
        }
      ]'
    sampling_params_list='[
      '"$thinker_sampling_params"',
      '"$talker_sampling_params"',
      '"$code2wav_sampling_params"'
    ]'
    mm_processor_kwargs="{}"
    ;;
  use_video)
    user_content='[
        {
          "type": "video_url",
          "video_url": {
            "url": "'"$SAMPLE_VIDEO_URL"'"
          }
        },
        {
          "type": "text",
          "text": "Why is this video funny?"
        }
      ]'
    sampling_params_list='[
      '"$thinker_sampling_params"',
      '"$talker_sampling_params"',
      '"$code2wav_sampling_params"'
    ]'
    mm_processor_kwargs="{}"
    ;;
esac

echo "Running query type: $QUERY_TYPE"
echo ""


output=$(curl -sS -X POST http://localhost:8091/v1/chat/completions \
    -H "Content-Type: application/json" \
    -d @- <<EOF
{
  "model": "Qwen/Qwen3-Omni-30B-A3B-Instruct",
  "sampling_params_list": $sampling_params_list,
  "mm_processor_kwargs": $mm_processor_kwargs,
  "modalities": $MODALITIES,
  "messages": [
    {
      "role": "system",
      "content": [
        {
          "type": "text",
          "text": "You are Qwen, a virtual human developed by the Qwen Team, Alibaba Group, capable of perceiving auditory and visual inputs, as well as generating text and speech."
        }
      ]
    },
    {
      "role": "user",
      "content": $user_content
    }
  ]
}
EOF
  )

# Only the text content of the first choice is printed. Audio is returned as base64-encoded data, so it is not displayed here.
echo "Output of request: $(echo "$output" | jq '.choices[0].message.content')"
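
The same request body can be assembled in Python, which avoids the shell quoting needed to splice JSON fragments into a heredoc. This is a minimal sketch rather than part of the example directory: `build_payload` and `send` are hypothetical helpers, the sampling parameters are copied from the script above, and the URL assumes a server listening on localhost port 8091.

```python
import json
import urllib.request

SYSTEM_PROMPT = (
    "You are Qwen, a virtual human developed by the Qwen Team, Alibaba Group, "
    "capable of perceiving auditory and visual inputs, as well as generating "
    "text and speech."
)

# Per-stage sampling parameters, in the order used by the script:
# thinker, talker, code2wav.
THINKER = {"temperature": 0.4, "top_p": 0.9, "top_k": 1, "max_tokens": 16384,
           "seed": 42, "repetition_penalty": 1.05, "stop_token_ids": [151645]}
TALKER = {"temperature": 0.9, "top_k": 50, "max_tokens": 4096, "seed": 42,
          "detokenize": False, "repetition_penalty": 1.05, "stop_token_ids": [2150]}
CODE2WAV = {"temperature": 0.0, "top_p": 1.0, "top_k": -1, "max_tokens": 65536,
            "seed": 42, "detokenize": True, "repetition_penalty": 1.1}


def build_payload(user_content, modalities=None,
                  model="Qwen/Qwen3-Omni-30B-A3B-Instruct"):
    """Build the chat completion body that the curl script sends."""
    return {
        "model": model,
        "sampling_params_list": [THINKER, TALKER, CODE2WAV],
        "mm_processor_kwargs": {},
        "modalities": modalities,  # None serializes to JSON null
        "messages": [
            {"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT}]},
            {"role": "user", "content": user_content},
        ],
    }


def send(payload, url="http://localhost:8091/v1/chat/completions"):
    """POST the payload and return the decoded JSON response (URL is assumed)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


if __name__ == "__main__":
    content = [{"type": "text", "text": "What is the content of this image?"}]
    payload = build_payload(content, modalities=["text"])
    # Print the body instead of sending it, so this runs without a server.
    print(json.dumps(payload, indent=2)[:200])
```

With a server running, `send(payload)["choices"][0]["message"]["content"]` would correspond to the value the script extracts with `jq`.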
run_gradio_demo.sh
#!/bin/bash
# Convenience script to launch both vLLM server and Gradio demo for Qwen3-Omni
#
# Usage:
#   ./run_gradio_demo.sh [OPTIONS]
#
# Example:
#   ./run_gradio_demo.sh --model Qwen/Qwen3-Omni-30B-A3B-Instruct --server-port 8091 --gradio-port 7861

set -e

# Default values
MODEL="Qwen/Qwen3-Omni-30B-A3B-Instruct"
SERVER_PORT=8091
GRADIO_PORT=7861
STAGE_CONFIGS_PATH=""
SERVER_HOST="0.0.0.0"
GRADIO_IP="127.0.0.1"
GRADIO_SHARE=false

# Parse command line arguments
while [[ $# -gt 0 ]]; do
    case $1 in
        --model)
            MODEL="$2"
            shift 2
            ;;
        --server-port)
            SERVER_PORT="$2"
            shift 2
            ;;
        --gradio-port)
            GRADIO_PORT="$2"
            shift 2
            ;;
        --stage-configs-path)
            STAGE_CONFIGS_PATH="$2"
            shift 2
            ;;
        --server-host)
            SERVER_HOST="$2"
            shift 2
            ;;
        --gradio-ip)
            GRADIO_IP="$2"
            shift 2
            ;;
        --share)
            GRADIO_SHARE=true
            shift
            ;;
        --help)
            echo "Usage: $0 [OPTIONS]"
            echo ""
            echo "Options:"
            echo "  --model MODEL                 Model name/path (default: Qwen/Qwen3-Omni-30B-A3B-Instruct)"
            echo "  --server-port PORT            Port for vLLM server (default: 8091)"
            echo "  --gradio-port PORT            Port for Gradio demo (default: 7861)"
            echo "  --stage-configs-path PATH     Path to custom stage configs YAML file (optional)"
            echo "  --server-host HOST            Host for vLLM server (default: 0.0.0.0)"
            echo "  --gradio-ip IP                IP for Gradio demo (default: 127.0.0.1)"
            echo "  --share                       Share Gradio demo publicly"
            echo "  --help                        Show this help message"
            echo ""
            exit 0
            ;;
        *)
            echo "Unknown option: $1"
            echo "Use --help for usage information"
            exit 1
            ;;
    esac
done

# Get the directory where this script is located
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
API_BASE="http://localhost:${SERVER_PORT}/v1"
HEALTH_URL="http://localhost:${SERVER_PORT}/health"

echo "=========================================="
echo "Starting vLLM-Omni Gradio Demo"
echo "=========================================="
echo "Model: $MODEL"
echo "Server: http://${SERVER_HOST}:${SERVER_PORT}"
echo "Gradio: http://${GRADIO_IP}:${GRADIO_PORT}"
echo "=========================================="

# Build vLLM server command
SERVER_CMD=("vllm" "serve" "$MODEL" "--omni" "--port" "$SERVER_PORT" "--host" "$SERVER_HOST")
if [ -n "$STAGE_CONFIGS_PATH" ]; then
    SERVER_CMD+=("--stage-configs-path" "$STAGE_CONFIGS_PATH")
fi

# Function to cleanup on exit
cleanup() {
    echo ""
    echo "Shutting down..."
    if [ -n "$SERVER_PID" ]; then
        echo "Stopping vLLM server (PID: $SERVER_PID)..."
        kill "$SERVER_PID" 2>/dev/null || true
        wait "$SERVER_PID" 2>/dev/null || true
    fi
    if [ -n "$GRADIO_PID" ]; then
        echo "Stopping Gradio demo (PID: $GRADIO_PID)..."
        kill "$GRADIO_PID" 2>/dev/null || true
        wait "$GRADIO_PID" 2>/dev/null || true
    fi
    echo "Cleanup complete"
    exit 0
}

# Set up signal handlers
trap cleanup SIGINT SIGTERM

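# Start vLLM server with output shown in real-time and saved to log.
# Process substitution (rather than a pipe into tee) makes $! the PID of the
# server itself, so the liveness check and cleanup target the right process.
echo ""
echo "Starting vLLM server..."
LOG_FILE="/tmp/vllm_server_${SERVER_PORT}.log"
"${SERVER_CMD[@]}" > >(tee "$LOG_FILE") 2>&1 &
SERVER_PID=$!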

# Start a background process to monitor the log for startup completion
STARTUP_COMPLETE=false
TAIL_PID=""

# Function to cleanup tail process
cleanup_tail() {
    if [ -n "$TAIL_PID" ]; then
        kill "$TAIL_PID" 2>/dev/null || true
        wait "$TAIL_PID" 2>/dev/null || true
    fi
}

# Wait for server to be ready by checking log output
echo ""
echo "Waiting for vLLM server to be ready (checking for 'Application startup complete' message)..."
echo ""

# Monitor log file for startup completion message
MAX_WAIT=300  # 5 minutes timeout as fallback
ELAPSED=0

# Use a temporary file to track startup completion
STARTUP_FLAG="/tmp/vllm_startup_flag_${SERVER_PORT}.tmp"
rm -f "$STARTUP_FLAG"

# Start monitoring in background
(
    tail -f "$LOG_FILE" 2>/dev/null | grep -m 1 "Application startup complete" > /dev/null && touch "$STARTUP_FLAG"
) &
TAIL_PID=$!

while [ $ELAPSED -lt $MAX_WAIT ]; do
    # Check if startup flag file exists (startup complete)
    if [ -f "$STARTUP_FLAG" ]; then
        cleanup_tail
        echo ""
        echo "✓ vLLM server is ready!"
        STARTUP_COMPLETE=true
        break
    fi

    # Check if server process is still running
    if ! kill -0 "$SERVER_PID" 2>/dev/null; then
        cleanup_tail
        echo ""
        echo "Error: vLLM server failed to start (process terminated)"
        wait "$SERVER_PID" 2>/dev/null || true
        exit 1
    fi

    sleep 1
    ELAPSED=$((ELAPSED + 1))
done

cleanup_tail
rm -f "$STARTUP_FLAG"

if [ "$STARTUP_COMPLETE" != "true" ]; then
    echo ""
    echo "Error: vLLM server did not complete startup within ${MAX_WAIT} seconds"
    kill "$SERVER_PID" 2>/dev/null || true
    exit 1
fi

# Start Gradio demo
echo ""
echo "Starting Gradio demo..."
cd "$SCRIPT_DIR"
GRADIO_CMD=("python" "gradio_demo.py" "--model" "$MODEL" "--api-base" "$API_BASE" "--ip" "$GRADIO_IP" "--port" "$GRADIO_PORT")
if [ "$GRADIO_SHARE" = true ]; then
    GRADIO_CMD+=("--share")
fi

"${GRADIO_CMD[@]}" > /tmp/gradio_demo.log 2>&1 &
GRADIO_PID=$!

echo ""
echo "=========================================="
echo "Both services are running!"
echo "=========================================="
echo "vLLM Server: http://${SERVER_HOST}:${SERVER_PORT}"
echo "Gradio Demo: http://${GRADIO_IP}:${GRADIO_PORT}"
echo ""
echo "Press Ctrl+C to stop both services"
echo "=========================================="
echo ""

# Wait for either process to exit
wait $SERVER_PID $GRADIO_PID || true

cleanup
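
The readiness wait in the script above amounts to polling a log file for a marker string until a timeout expires. A Python version of that idea might look like the sketch below. `wait_for_marker` is a hypothetical helper, and unlike the shell loop it does not check whether the server process is still alive.

```python
import tempfile
import time
from pathlib import Path


def wait_for_marker(log_path, marker="Application startup complete",
                    timeout=300.0, poll_interval=1.0):
    """Return True once marker appears in the log, or False after timeout seconds."""
    log_path = Path(log_path)
    deadline = time.monotonic() + timeout
    while True:
        # The log may not exist yet if the server has not written anything.
        if log_path.exists() and marker in log_path.read_text(errors="replace"):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        log_file = Path(tmp) / "vllm_server.log"
        log_file.write_text("INFO: Application startup complete.\n")
        print(wait_for_marker(log_file, timeout=1.0, poll_interval=0.01))
```

Re-reading the whole file on each poll is simple but grows more expensive with large logs; tracking a file offset would be a reasonable refinement.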