| | |
| | |
| |
|
| | from collections.abc import Iterable |
| | from typing import TYPE_CHECKING, Any, Optional, TypeVar |
| |
|
| | import torch |
| | import torch.nn as nn |
| |
|
| | from .interfaces_base import VllmModelForPooling, is_pooling_model |
| |
|
| | if TYPE_CHECKING: |
| | from vllm.model_executor.layers.pooler import PoolingType |
| |
|
# Type variable for the adapter functions in this module: each one accepts a
# model class (a subclass of ``nn.Module``) and is annotated as returning a
# class of the same static type.
_T = TypeVar("_T", bound=type[nn.Module])

# Class-name suffixes that `_get_pooling_model_name` strips from a model class
# name before appending a pooling suffix. They are tried in the order listed.
_GENERATE_SUFFIXES: list[str] = [
    "ForCausalLM",
    "ForConditionalGeneration",
    "ChatModel",
    "LMHeadModel",
]
| |
|
| |
|
def _get_pooling_model_name(orig_model_name: str, pooling_suffix: str) -> str:
    """Derive the class name of a pooling adapter from the original name.

    Every entry of ``_GENERATE_SUFFIXES`` is tried once, in list order, against
    the current name and removed when the name ends with it. Because the name
    is updated after each removal, more than one suffix can be stripped
    (e.g. a name ending in ``ChatModelForCausalLM`` loses both parts).

    Args:
        orig_model_name: Name of the class being adapted.
        pooling_suffix: Suffix appended after stripping, e.g. ``"ForEmbedding"``.

    Returns:
        The stripped name followed by ``pooling_suffix``.
    """
    base_name = orig_model_name

    for suffix in _GENERATE_SUFFIXES:
        if base_name.endswith(suffix):
            # Equivalent to ``str.removesuffix``; also a no-op for "".
            base_name = base_name[:len(base_name) - len(suffix)]

    return base_name + pooling_suffix
| |
|
| |
|
def _create_pooling_model_cls(
    orig_cls: _T,
    *,
    default_pooling_type: "PoolingType",
    default_normalize: bool,
    default_softmax: bool,
) -> _T:
    """Create a subclass of ``orig_cls`` that implements ``VllmModelForPooling``.

    The generated class (always defined under the name ``ModelForPooling``;
    callers in this module overwrite ``__name__`` afterwards):

    * forwards construction to ``orig_cls`` and then deletes the ``lm_head``
      and ``logits_processor`` attributes when they exist;
    * creates ``self._pooler`` via ``Pooler.from_config_with_defaults`` unless
      the instance already has a truthy ``_pooler`` attribute;
    * exposes ``pooler()``, which delegates to ``self._pooler``;
    * overrides ``load_weights`` so that ``lm_head.*`` entries are skipped.

    Args:
        orig_cls: The model class to extend.
        default_pooling_type: Forwarded as ``pooling_type`` to
            ``Pooler.from_config_with_defaults``.
        default_normalize: Forwarded as ``normalize`` to the same call.
        default_softmax: Forwarded as ``softmax`` to the same call.

    Returns:
        The newly created class.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid import cycles -- TODO confirm).
    from vllm.config import VllmConfig
    from vllm.model_executor.layers.pooler import Pooler, PoolerOutput
    from vllm.model_executor.pooling_metadata import PoolingMetadata

    from .utils import AutoWeightsLoader, WeightsMapper

    class ModelForPooling(orig_cls, VllmModelForPooling):

        def __init__(
            self,
            *,
            vllm_config: "VllmConfig",
            prefix: str = "",
            **kwargs: Any,
        ) -> None:
            """Build the wrapped model, drop generation heads, add a pooler."""
            super().__init__(vllm_config=vllm_config, prefix=prefix, **kwargs)

            # Remove the generation-only attributes if the base class
            # created them.
            for unused_attr in ("lm_head", "logits_processor"):
                if hasattr(self, unused_attr):
                    delattr(self, unused_attr)

            pooler_config = vllm_config.model_config.pooler_config
            assert pooler_config is not None

            # Keep a pooler that the base class has already set up; only
            # build one when ``_pooler`` is missing or falsy.
            existing_pooler = getattr(self, "_pooler", None)
            if not existing_pooler:
                self._pooler = Pooler.from_config_with_defaults(
                    pooler_config,
                    pooling_type=default_pooling_type,
                    normalize=default_normalize,
                    softmax=default_softmax,
                )

        def pooler(
            self,
            hidden_states: torch.Tensor,
            pooling_metadata: PoolingMetadata,
        ) -> PoolerOutput:
            """Apply ``self._pooler`` to the given hidden states."""
            pooled = self._pooler(hidden_states, pooling_metadata)
            return pooled

        def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]):
            """Load ``weights``, ignoring every ``lm_head.*`` entry.

            Returns whatever the loader that ends up being used returns; on
            the inner-model path the returned names are re-prefixed with
            ``model.``.
            """
            # ``lm_head`` is deleted in ``__init__``, so skip its weights.
            # A generator keeps the stream lazy.
            kept_weights = ((key, tensor) for key, tensor in weights
                            if not key.startswith("lm_head."))

            # Path 1: the wrapped model has an inner ``model`` with its own
            # ``load_weights`` and no other direct child holds parameters.
            inner_model = getattr(self, "model", None)
            if hasattr(inner_model, "load_weights"):
                other_child_has_params = any(
                    child_name != "model"
                    and next(child.parameters(), None) is not None
                    for child_name, child in self.named_children())

                if not other_child_has_params:
                    # Rewrite the "model." prefix to "" before delegating
                    # (presumably so names match the inner model's own
                    # parameter names -- see ``WeightsMapper``).
                    prefix_mapper = WeightsMapper(
                        orig_to_new_prefix={"model.": ""})
                    inner_loaded = inner_model.load_weights(
                        prefix_mapper.apply(kept_weights))
                    # Report names relative to this (outer) module.
                    return {f"model.{param}" for param in inner_loaded}

            # Path 2: use the original class's own loader when it has one.
            if hasattr(orig_cls, "load_weights"):
                return orig_cls.load_weights(self, kept_weights)

            # Path 3: generic fallback.
            return AutoWeightsLoader(self).load_weights(kept_weights)

    return ModelForPooling
| |
|
| |
|
def as_embedding_model(cls: _T) -> _T:
    """
    Return a variant of a vLLM model class that produces embeddings.

    Classes for which ``is_pooling_model`` is already true are returned
    unchanged. Otherwise a subclass is created whose default pooler takes the
    hidden state of the last token (``PoolingType.LAST``) and normalizes it,
    without softmax. The new class is named after ``cls`` with any
    generation suffix replaced by ``ForEmbedding``.

    Note:
        We assume that no extra layers are added to the original model;
        please implement your own model if this is not the case.
    """
    if is_pooling_model(cls):
        # Nothing to adapt.
        return cls

    # Imported here, not at module level (only a TYPE_CHECKING import of
    # this name exists at the top of the file).
    from vllm.model_executor.layers.pooler import PoolingType

    embedding_cls = _create_pooling_model_cls(
        cls,
        default_pooling_type=PoolingType.LAST,
        default_normalize=True,
        default_softmax=False,
    )
    embedding_cls.__name__ = _get_pooling_model_name(
        cls.__name__, "ForEmbedding")

    return embedding_cls
| |
|
| |
|
def as_classification_model(cls: _T) -> _T:
    """
    Return a variant of a vLLM model class that performs classification.

    Classes for which ``is_pooling_model`` is already true are returned
    unchanged. Otherwise a subclass is created that adds a bias-free linear
    layer ``score`` (``hidden_size`` -> ``num_labels``) on top of the
    original ``forward`` output. Its default pooler takes the last token
    (``PoolingType.LAST``) and applies softmax without normalization. The new
    class is named after ``cls`` with any generation suffix replaced by
    ``ForClassification``.

    Note:
        We assume that the classification head is a single linear layer
        stored as the attribute `score` of the top-level model;
        please implement your own model if this is not the case.
    """
    if is_pooling_model(cls):
        # Nothing to adapt.
        return cls

    # Imported here, not at module level.
    from vllm.config import VllmConfig
    from vllm.model_executor.layers.linear import RowParallelLinear
    from vllm.model_executor.layers.pooler import PoolingType
    from vllm.sequence import IntermediateTensors

    from .utils import maybe_prefix

    pooling_base_cls = _create_pooling_model_cls(
        cls,
        default_pooling_type=PoolingType.LAST,
        default_normalize=False,
        default_softmax=True,
    )

    class ModelForClassification(pooling_base_cls):

        def __init__(
            self,
            *,
            vllm_config: "VllmConfig",
            prefix: str = "",
            **kwargs: Any,
        ) -> None:
            """Build the pooling model, then attach the ``score`` head."""
            super().__init__(vllm_config=vllm_config, prefix=prefix, **kwargs)

            hf_config = vllm_config.model_config.hf_config
            quantization = vllm_config.quant_config

            # Classification head: hidden_size -> num_labels, no bias.
            self.score = RowParallelLinear(
                hf_config.hidden_size,
                hf_config.num_labels,
                quant_config=quantization,
                input_is_parallel=False,
                bias=False,
                prefix=maybe_prefix(prefix, "score"),
            )

        def forward(
            self,
            input_ids: torch.Tensor,
            positions: torch.Tensor,
            intermediate_tensors: Optional[IntermediateTensors] = None,
            inputs_embeds: Optional[torch.Tensor] = None,
        ) -> torch.Tensor:
            """Run the base model and project its output through ``score``."""
            base_output = super().forward(
                input_ids,
                positions,
                intermediate_tensors,
                inputs_embeds,
            )
            # ``score`` returns a pair; only the first element is used.
            logits, _ignored = self.score(base_output)
            return logits

    ModelForClassification.__name__ = _get_pooling_model_name(
        cls.__name__, "ForClassification")

    return ModelForClassification
| |
|
| |
|
def as_reward_model(cls: _T) -> _T:
    """
    Return a variant of a vLLM model class that supports reward modeling.

    Classes for which ``is_pooling_model`` is already true are returned
    unchanged. Otherwise a subclass is created whose default pooler uses
    ``PoolingType.ALL`` with neither normalization nor softmax, i.e. the
    hidden states of each token are returned directly by default. The new
    class is named after ``cls`` with any generation suffix replaced by
    ``ForReward``.

    Note:
        We assume that no extra layers are added to the original model;
        please implement your own model if this is not the case.
    """
    if is_pooling_model(cls):
        # Nothing to adapt.
        return cls

    # Imported here, not at module level (only a TYPE_CHECKING import of
    # this name exists at the top of the file).
    from vllm.model_executor.layers.pooler import PoolingType

    reward_cls = _create_pooling_model_cls(
        cls,
        default_pooling_type=PoolingType.ALL,
        default_normalize=False,
        default_softmax=False,
    )
    reward_cls.__name__ = _get_pooling_model_name(cls.__name__, "ForReward")

    return reward_cls
| |
|