import math
from typing import Optional, Tuple, Union

import torch
from torch import nn

from transformers import MllamaPreTrainedModel, Wav2Vec2BertConfig, Wav2Vec2BertModel
from transformers.modeling_outputs import BaseModelOutput
from transformers.models.wav2vec2_bert.modeling_wav2vec2_bert import (
    Wav2Vec2BertAdapterLayer,
    Wav2Vec2BertFeatureProjection,
    Wav2Vec2BertSelfAttention,
)

from .configuration_llama3 import Llama3Config


class AudioAdapter(nn.Module):
    """Stack of Wav2Vec2-BERT adapter layers that projects and sub-samples the audio encoder output in time."""

    def __init__(self, config: Wav2Vec2BertConfig):
        super().__init__()

        # Project down if the adapter width differs from the encoder width.
        if config.output_hidden_size != config.hidden_size:
            self.proj = nn.Linear(config.hidden_size, config.output_hidden_size)
        else:
            self.proj = None
        self.layers = nn.ModuleList(Wav2Vec2BertAdapterLayer(config) for _ in range(config.num_adapter_layers))

        self.kernel_size = config.adapter_kernel_size
        self.stride = config.adapter_stride

    def _compute_sub_sample_lengths_from_attention_mask(self, seq_lens):
        if seq_lens is None:
            return seq_lens
        # Standard convolution output-length formula, using the stride // 2 padding of the adapter layers.
        pad = self.stride // 2
        seq_lens = ((seq_lens + 2 * pad - self.kernel_size) / self.stride) + 1
        return seq_lens.floor()
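
    # Shape sketch (illustrative; the exact sub-sampling depends on the configured adapter kernel_size and
    # stride): with stride s, each of the num_adapter_layers layers shrinks the time dimension roughly by a
    # factor of s, so (batch, frames, hidden_size) -> (batch, ~frames / s**num_adapter_layers, output_hidden_size).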
    def forward(self, hidden_states, attention_mask=None):
        if self.proj is not None:
            hidden_states = self.proj(hidden_states)

        # Per-sample valid lengths, derived once from the (not yet sub-sampled) attention mask.
        sub_sampled_lengths = None
        if attention_mask is not None:
            sub_sampled_lengths = (attention_mask.size(1) - (1 - attention_mask.int()).sum(1)).to(hidden_states.device)

        for layer in self.layers:
            # Each layer shrinks the time dimension, so recompute the valid lengths before applying it.
            sub_sampled_lengths = self._compute_sub_sample_lengths_from_attention_mask(sub_sampled_lengths)
            hidden_states = layer(
                hidden_states, attention_mask=attention_mask, sub_sampled_lengths=sub_sampled_lengths
            )

        return hidden_states


class Llama3Embedding(MllamaPreTrainedModel):
    """Splices Wav2Vec2-BERT audio embeddings into the text token embeddings at audio placeholder positions."""

    config_class = Llama3Config
    base_model_prefix = "audio_model"

    def __init__(self, config: Llama3Config):
        super().__init__(config)
        if config.audio_config.output_hidden_size != config.text_config.hidden_size:
            raise ValueError("`audio_config.output_hidden_size` must match `text_config.hidden_size`.")
        # Disable the encoder's built-in adapter; the AudioAdapter defined above is applied separately.
        config.audio_config.add_adapter = False
        self.audio_encoder = Wav2Vec2BertModel(config.audio_config)
        self.audio_adapter = AudioAdapter(config.audio_config)
        # Learned boundary embeddings wrapped around each audio segment; initialized in `_init_weights`.
        self.start_of_audio = nn.Parameter(torch.empty((1, config.audio_config.output_hidden_size)))
        self.end_of_audio = nn.Parameter(torch.empty((1, config.audio_config.output_hidden_size)))

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        input_embeddings: Optional[torch.Tensor] = None,
        audio_features: Optional[torch.Tensor] = None,
    ) -> Union[BaseModelOutput, Tuple[torch.Tensor, ...]]:
        if audio_features is None:
            return input_embeddings

        # Encode every clip in one pass: (bs, max_num_audio, frames, dim) -> (bs * max_num_audio, frames, dim).
        bs, max_num_audio, frames, dim = audio_features.shape
        audio_embeddings = self.audio_encoder(
            input_features=audio_features.view((bs * max_num_audio, frames, dim))
        )["last_hidden_state"]
        audio_embeddings = self.audio_adapter(audio_embeddings)
        audio_embeddings = audio_embeddings.view((bs, max_num_audio, -1, self.start_of_audio.shape[-1]))

        # The j-th audio clip of sample i is marked in `input_ids` with the placeholder id -1 - j; its
        # placeholder positions are overwritten with [start_of_audio, clip embeddings, end_of_audio].
        for i in range(bs):
            for j in range(max_num_audio):
                audio_id = -1 - j
                if torch.any(input_ids[i] == audio_id):
                    positions = torch.nonzero(input_ids[i] == audio_id, as_tuple=True)
                    input_embeddings[i] = input_embeddings[i].index_put(
                        positions,
                        torch.concat([self.start_of_audio, audio_embeddings[i, j, :, :], self.end_of_audio]),
                        accumulate=False,
                    )
        return input_embeddings

    def _init_weights(self, module):
        """Initialize the weights."""
        if isinstance(module, Llama3Embedding):
            # `start_of_audio` / `end_of_audio` are created with `torch.empty` and need an explicit init.
            module.start_of_audio.data.normal_(mean=0.0, std=self.config.audio_config.initializer_range)
            module.end_of_audio.data.normal_(mean=0.0, std=self.config.audio_config.initializer_range)
        elif isinstance(module, Wav2Vec2BertSelfAttention):
            if hasattr(module, "pos_bias_u"):
                nn.init.xavier_uniform_(module.pos_bias_u)
            if hasattr(module, "pos_bias_v"):
                nn.init.xavier_uniform_(module.pos_bias_v)
        elif isinstance(module, Wav2Vec2BertFeatureProjection):
            k = math.sqrt(1 / module.projection.in_features)
            nn.init.uniform_(module.projection.weight, a=-k, b=k)
            nn.init.uniform_(module.projection.bias, a=-k, b=k)
        elif isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.config.audio_config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, (nn.LayerNorm, nn.GroupNorm)):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)
        elif isinstance(module, nn.Conv1d):
            nn.init.kaiming_normal_(module.weight)
            if module.bias is not None:
                k = math.sqrt(module.groups / (module.in_channels * module.kernel_size[0]))
                nn.init.uniform_(module.bias, a=-k, b=k)
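

# Usage sketch (comments only; "path/to/checkpoint" is hypothetical and the tensor shapes merely follow
# the forward pass above):
#
#     config = Llama3Config.from_pretrained("path/to/checkpoint")
#     embedder = Llama3Embedding(config)
#
#     # `input_ids` marks the j-th audio clip of a sample with the placeholder id -1 - j, repeated once per
#     # position the clip should occupy; the number of placeholder positions must equal the adapter output
#     # length plus 2, leaving room for `start_of_audio` and `end_of_audio`.
#     merged_embeddings = embedder(
#         input_ids=input_ids,                # (batch, seq_len), negative ids at audio placeholder positions
#         input_embeddings=input_embeddings,  # (batch, seq_len, hidden_size) token embeddings from the text model
#         audio_features=audio_features,      # (batch, max_num_audio, frames, feature_dim) from the audio feature extractor
#     )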