跳到内容

vllm_gaudi.ops.hpu_fused_moe

HPUUnquantizedFusedMoEMethod

Bases: UnquantizedFusedMoEMethod

不带量化的 MoE 方法。

源代码在 vllm_gaudi/ops/hpu_fused_moe.py
@UnquantizedFusedMoEMethod.register_oot
class HPUUnquantizedFusedMoEMethod(UnquantizedFusedMoEMethod):
    """MoE method without quantization."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        torch.hpu.synchronize()

    def process_weights_after_loading(self, layer: torch.nn.Module) -> None:
        super().process_weights_after_loading(layer)
        # custom handling for HPU
        num_experts = layer.local_num_experts
        ep_shift = layer.ep_rank * num_experts

        experts_min, experts_max = ep_shift, num_experts + ep_shift - 1
        layer.moe_op = VllmMixtureOfExpertsOp(
            num_experts,
            experts_min,
            experts_max,
        )

        for expert_id in range(layer.local_num_experts):
            layer.moe_op.w13_list[expert_id].set_weight(layer.w13_weight.data[expert_id])
            layer.moe_op.w2_list[expert_id].set_weight(layer.w2_weight.data[expert_id])

    def forward_oot(
        self,
        layer: torch.nn.Module,
        x: torch.Tensor,
        use_grouped_topk: bool,
        top_k: int,
        router_logits: torch.Tensor,
        renormalize: bool,
        topk_group: Optional[int] = None,
        num_expert_group: Optional[int] = None,
        global_num_experts: int = -1,
        expert_map: Optional[torch.Tensor] = None,
        custom_routing_function: Optional[Callable] = None,
        scoring_func: str = "softmax",
        e_score_correction_bias: Optional[torch.Tensor] = None,
        apply_router_weight_on_input: bool = False,
        activation: str = "silu",
        **kwargs,
    ):
        input_shape = x.shape
        x = x.view(-1, x.shape[-1])
        if use_grouped_topk or custom_routing_function is not None:
            topk_weights, topk_ids, zero_expert_result = FusedMoE.select_experts(
                hidden_states=x,
                router_logits=router_logits,
                use_grouped_topk=use_grouped_topk,
                top_k=top_k,
                renormalize=renormalize,
                topk_group=topk_group,
                num_expert_group=num_expert_group,
                custom_routing_function=custom_routing_function,
                scoring_func=scoring_func,
                e_score_correction_bias=e_score_correction_bias)
        else:
            import torch.nn.functional as F
            topk_weights = F.softmax(router_logits, dim=1, dtype=torch.float32)
            topk_weights, topk_ids = torch.topk(topk_weights, top_k, dim=-1)
            topk_weights /= topk_weights.sum(dim=-1, keepdim=True)
            topk_weights = topk_weights.to(x.dtype)
        topk_ids = topk_ids.view(*x.shape[:-1], -1)
        topk_weights = topk_weights.view(*x.shape[:-1], -1)

        return layer.moe_op(
            x,
            topk_ids.to(torch.int64),
            topk_weights.to(x.dtype),
            permuted_weights=True,
            activation=activation,
        ).view(*input_shape)

__init__

__init__(*args, **kwargs)
源代码在 vllm_gaudi/ops/hpu_fused_moe.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    torch.hpu.synchronize()

forward_oot

forward_oot(
    layer: Module,
    x: Tensor,
    use_grouped_topk: bool,
    top_k: int,
    router_logits: Tensor,
    renormalize: bool,
    topk_group: Optional[int] = None,
    num_expert_group: Optional[int] = None,
    global_num_experts: int = -1,
    expert_map: Optional[Tensor] = None,
    custom_routing_function: Optional[Callable] = None,
    scoring_func: str = "softmax",
    e_score_correction_bias: Optional[Tensor] = None,
    apply_router_weight_on_input: bool = False,
    activation: str = "silu",
    **kwargs,
)
源代码在 vllm_gaudi/ops/hpu_fused_moe.py
def forward_oot(
    self,
    layer: torch.nn.Module,
    x: torch.Tensor,
    use_grouped_topk: bool,
    top_k: int,
    router_logits: torch.Tensor,
    renormalize: bool,
    topk_group: Optional[int] = None,
    num_expert_group: Optional[int] = None,
    global_num_experts: int = -1,
    expert_map: Optional[torch.Tensor] = None,
    custom_routing_function: Optional[Callable] = None,
    scoring_func: str = "softmax",
    e_score_correction_bias: Optional[torch.Tensor] = None,
    apply_router_weight_on_input: bool = False,
    activation: str = "silu",
    **kwargs,
):
    input_shape = x.shape
    x = x.view(-1, x.shape[-1])
    if use_grouped_topk or custom_routing_function is not None:
        topk_weights, topk_ids, zero_expert_result = FusedMoE.select_experts(
            hidden_states=x,
            router_logits=router_logits,
            use_grouped_topk=use_grouped_topk,
            top_k=top_k,
            renormalize=renormalize,
            topk_group=topk_group,
            num_expert_group=num_expert_group,
            custom_routing_function=custom_routing_function,
            scoring_func=scoring_func,
            e_score_correction_bias=e_score_correction_bias)
    else:
        import torch.nn.functional as F
        topk_weights = F.softmax(router_logits, dim=1, dtype=torch.float32)
        topk_weights, topk_ids = torch.topk(topk_weights, top_k, dim=-1)
        topk_weights /= topk_weights.sum(dim=-1, keepdim=True)
        topk_weights = topk_weights.to(x.dtype)
    topk_ids = topk_ids.view(*x.shape[:-1], -1)
    topk_weights = topk_weights.view(*x.shape[:-1], -1)

    return layer.moe_op(
        x,
        topk_ids.to(torch.int64),
        topk_weights.to(x.dtype),
        permuted_weights=True,
        activation=activation,
    ).view(*input_shape)

process_weights_after_loading

process_weights_after_loading(layer: Module) -> None
源代码在 vllm_gaudi/ops/hpu_fused_moe.py
def process_weights_after_loading(self, layer: torch.nn.Module) -> None:
    super().process_weights_after_loading(layer)
    # custom handling for HPU
    num_experts = layer.local_num_experts
    ep_shift = layer.ep_rank * num_experts

    experts_min, experts_max = ep_shift, num_experts + ep_shift - 1
    layer.moe_op = VllmMixtureOfExpertsOp(
        num_experts,
        experts_min,
        experts_max,
    )

    for expert_id in range(layer.local_num_experts):
        layer.moe_op.w13_list[expert_id].set_weight(layer.w13_weight.data[expert_id])
        layer.moe_op.w2_list[expert_id].set_weight(layer.w2_weight.data[expert_id])

get_compressed_expert_map

get_compressed_expert_map(expert_map: Tensor) -> str

通过移除任何 -1 条目来压缩专家图。

此实现使用标准的 Python 循环,它与不支持动态形状(例如由 torch.where 等操作产生的)的图编译模式兼容。

参数

名称 类型 描述 默认值
expert_map Tensor

形状为 (global_num_experts,) 的张量,将全局专家索引映射到其局部索引。对于未分配给当前 rank 的专家,包含 -1。

required

返回

名称 类型 描述
str str

从局部到全局索引的字符串映射,

str

按全局索引排序。(例如,“0->5, 1->12, 2->23”)

源代码在 vllm_gaudi/ops/hpu_fused_moe.py
def get_compressed_expert_map(expert_map: torch.Tensor) -> str:
    """
    Compresses the expert map by removing any -1 entries.

    This implementation uses a standard Python loop, which is compatible with
    graph compilation modes that do not support dynamic shapes resulting from
    operations like `torch.where`.

    Args:
        expert_map (torch.Tensor): A tensor of shape (global_num_experts,)
            mapping a global expert index to its local index. Contains -1 for
            experts that are not assigned to the current rank.

    Returns:
        str: A string mapping from local to global index, 
        ordered by global index.
            (e.g., "0->5, 1->12, 2->23")
    """
    mappings = []
    # A standard loop over a tensor with a known shape is statically analyzable.
    # `enumerate` provides the global_index (the position in the tensor) and
    # `local_index_tensor` (the value at that position).
    for global_index, local_index_tensor in enumerate(expert_map):
        local_index = local_index_tensor.item()
        # We only build strings for valid experts (those not marked as -1).
        if local_index != -1:
            mappings.append(f"{local_index}->{global_index}")

    return ", ".join(mappings)

patched_fused_moe_forward

patched_fused_moe_forward(
    self, hidden_states: Tensor, router_logits: Tensor
) -> Union[Tensor, tuple[Tensor, Tensor]]

已打补丁的转发方法,绕过自定义 op 以避免重新编译问题。

源代码在 vllm_gaudi/ops/hpu_fused_moe.py
def patched_fused_moe_forward(
    self,
    hidden_states: torch.Tensor,
    router_logits: torch.Tensor,
) -> Union[torch.Tensor, tuple[torch.Tensor, torch.Tensor]]:
    """
    Patched forward method that bypasses the custom op to avoid recompilation issues.
    """
    og_hidden_states = hidden_states.shape[-1]
    if self.hidden_size != og_hidden_states:
        hidden_states = torch.nn.functional.pad(hidden_states, (0, self.hidden_size - og_hidden_states),
                                                mode='constant',
                                                value=0.0)

    use_direct_implementation = self.dp_size == 1
    if self.shared_experts is None:
        if use_direct_implementation:
            fused_output = self.forward_impl(hidden_states, router_logits)
            assert not isinstance(fused_output, tuple)

            if self.zero_expert_num is not None and self.zero_expert_num > 0:
                assert isinstance(fused_output, tuple)
                fused_output, zero_expert_result = fused_output
                return (reduce_output(self, fused_output) + zero_expert_result)[..., :og_hidden_states]
            else:
                return reduce_output(self, fused_output)[..., :og_hidden_states]

        else:
            fused_output = torch.ops.vllm.moe_forward(hidden_states, router_logits, self.layer_name)

        return fused_output[..., :og_hidden_states]
    else:
        if use_direct_implementation:
            shared_output, fused_output = self.forward_impl(hidden_states, router_logits)
            reduce_output(self, shared_output)[..., :og_hidden_states],
            reduce_output(self, fused_output)[..., :og_hidden_states],
        else:
            shared_output, fused_output = torch.ops.vllm.moe_forward_shared(hidden_states, router_logits,
                                                                            self.layer_name)
        return (shared_output[..., :og_hidden_states], fused_output[..., :og_hidden_states])

reduce_output

reduce_output(self, states: Tensor) -> Tensor
源代码在 vllm_gaudi/ops/hpu_fused_moe.py
def reduce_output(self, states: torch.Tensor) -> torch.Tensor:
    if (not self.is_sequence_parallel and not self.use_dp_chunking and self.reduce_results
            and (self.tp_size > 1 or self.ep_size > 1)):
        states = self.maybe_all_reduce_tensor_model_parallel(states)
    return states