import soundfile as sf
import librosa
import torch
from transformers import (
AutoTokenizer,
CLIPImageProcessor,
WhisperProcessor,
WhisperForConditionalGeneration,
)
from .model import LlavaPhiForCausalLM
from .conversation import conv_templates, SeparatorStyle

IGNORE_INDEX = -100
IMAGE_TOKEN_INDEX = -200
DEFAULT_IMAGE_TOKEN = "<image>"
DEFAULT_IMAGE_PATCH_TOKEN = "<im_patch>"
DEFAULT_IM_START_TOKEN = "<im_start>"
DEFAULT_IM_END_TOKEN = "<im_end>"


class AudioLanguageConnector:
    """Wrap a transcript in audio boundary markers and tokenize it with the phi-2 tokenizer."""

    def __init__(self, projection_dim):
        model_name = "microsoft/phi-2"
        self.phi2_tokenizer = AutoTokenizer.from_pretrained(
            model_name, trust_remote_code=True
        )
        self.phi2_tokenizer.pad_token = self.phi2_tokenizer.eos_token
        self.phi2_tokenizer.max_length = projection_dim

    def __call__(self, text):
        # Surround the transcript with audio boundary markers before tokenizing.
        text = f"<audio_start> {text} <audio_end>"
        tokens = self.phi2_tokenizer(
            text, return_tensors="pt", return_attention_mask=False
        )
        return tokens


class WhisperWithProjection:
    """Transcribe an audio file with Whisper-tiny and return the decoded text."""

    def __init__(self, projection_dim, device):
        self.device = device
        # The processor runs on CPU tensors; only the model needs device placement.
        self.processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
        self.model = WhisperForConditionalGeneration.from_pretrained(
            "openai/whisper-tiny", device_map=device
        )
        self.model.config.forced_decoder_ids = None
        # self.audio_language_connector = AudioLanguageConnector(projection_dim)

    def __call__(self, audio):
        # Load the audio file, down-mix multi-channel audio to mono if needed,
        # and resample to the 16 kHz rate expected by Whisper.
        array, sampling_rate = sf.read(audio)
        if array.ndim > 1:
            array = array.mean(axis=1)
        resampled_array = librosa.resample(
            array,
            orig_sr=sampling_rate,
            target_sr=16000,
        )
        input_features = self.processor(
            resampled_array, sampling_rate=16000, return_tensors="pt"
        ).input_features
# generate token ids
predicted_ids = self.model.generate(input_features.to(self.device))
# decode token ids to text
transcription = self.processor.batch_decode(
predicted_ids, skip_special_tokens=True
)
# audio_embeddings = self.audio_language_connector(transcription)
return transcription


class MultiModalPhi2:
    """Text + audio + image inference wrapper around the Llava-Phi2 checkpoint."""

    def __init__(
        self,
        modelname_or_path="RaviNaik/Llava-Phi2",
        temperature=0.2,
        max_new_tokens=1024,
        device="cuda:0",
    ):
        self.model_name = modelname_or_path
        self.temperature = temperature
        self.max_new_tokens = max_new_tokens
        self.device = device
        self.disable_torch_init()
        self.whisper_w_proj = WhisperWithProjection(projection_dim=512, device=device)
        self.load_pretrained_model()

    def disable_torch_init(self):
"""
Disable the redundant torch default initialization to accelerate model creation.
"""
setattr(torch.nn.Linear, "reset_parameters", lambda self: None)
setattr(torch.nn.LayerNorm, "reset_parameters", lambda self: None)

    def load_pretrained_model(self):
        self.model = LlavaPhiForCausalLM.from_pretrained(
            self.model_name, device_map=self.device
        )
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.image_processor = CLIPImageProcessor.from_pretrained(self.model_name)
        # Register the extra image tokens expected by the vision-language projector.
        mm_use_im_start_end = getattr(self.model.config, "mm_use_im_start_end", False)
        mm_use_im_patch_token = getattr(
            self.model.config, "mm_use_im_patch_token", True
        )
        if mm_use_im_patch_token:
            self.tokenizer.add_tokens([DEFAULT_IMAGE_PATCH_TOKEN], special_tokens=True)
        if mm_use_im_start_end:
            self.tokenizer.add_tokens(
                [DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN], special_tokens=True
            )

    def tokenizer_image_token(
        self,
        prompt,
        tokenizer,
        image_token_index=IMAGE_TOKEN_INDEX,
        return_tensors=None,
    ):
        # Tokenize the prompt around each "<image>" placeholder and splice the
        # image token index between the resulting chunks.
        prompt_chunks = [
            tokenizer(chunk).input_ids for chunk in prompt.split("<image>")
        ]
        def insert_separator(X, sep):
            return [ele for sublist in zip(X, [sep] * len(X)) for ele in sublist][:-1]

        input_ids = []
offset = 0
if (
len(prompt_chunks) > 0
and len(prompt_chunks[0]) > 0
and prompt_chunks[0][0] == tokenizer.bos_token_id
):
offset = 1
input_ids.append(prompt_chunks[0][0])
for x in insert_separator(prompt_chunks, [image_token_index] * (offset + 1)):
input_ids.extend(x[offset:])
if return_tensors is not None:
if return_tensors == "pt":
return torch.tensor(input_ids, dtype=torch.long)
raise ValueError(f"Unsupported tensor type: {return_tensors}")
return input_ids

    def __call__(self, text, audio, image):
if text is None:
text = ""
if image is not None:
qs = (
DEFAULT_IM_START_TOKEN
+ DEFAULT_IMAGE_TOKEN
+ DEFAULT_IM_END_TOKEN
+ "\n"
+ text
)
conv = conv_templates["phi-2_v0"].copy()
conv.append_message(conv.roles[0], qs)
conv.append_message(conv.roles[1], None)
prompt = conv.get_prompt()
input_ids = self.tokenizer_image_token(
prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors="pt"
).unsqueeze(0)
image_tensor = self.image_processor.preprocess(image, return_tensors="pt")[
"pixel_values"
].to(self.device)
else:
qs = text
conv = conv_templates["phi-2_v0"].copy()
conv.append_message(conv.roles[0], qs)
conv.append_message(conv.roles[1], None)
prompt = conv.get_prompt()
input_ids = self.tokenizer(prompt, return_tensors="pt")["input_ids"]
image_tensor = None
        if audio is not None:
            # Transcribe the audio with Whisper and append the transcript's token
            # ids to the prompt token ids.
            audio_transcript = self.whisper_w_proj(audio)
            audio_transcript_ids = self.tokenizer(audio_transcript, return_tensors="pt")[
                "input_ids"
            ]
            input_ids = torch.concat([input_ids, audio_transcript_ids], dim=1)
        input_ids = input_ids.to(self.device)
stop_str = conv.sep if conv.sep_style != SeparatorStyle.TWO else conv.sep2
with torch.inference_mode():
if image is not None:
output_ids = self.model.generate(
input_ids,
images=image_tensor,
do_sample=True,
temperature=self.temperature,
max_new_tokens=self.max_new_tokens,
eos_token_id=self.tokenizer.eos_token_id, # End of sequence token
pad_token_id=self.tokenizer.eos_token_id, # Pad token
use_cache=True,
)
else:
output_ids = self.model.generate(
input_ids,
do_sample=True,
temperature=self.temperature,
max_new_tokens=self.max_new_tokens,
eos_token_id=self.tokenizer.eos_token_id, # End of sequence token
pad_token_id=self.tokenizer.eos_token_id, # Pad token
use_cache=True,
)
input_token_len = input_ids.shape[1]
n_diff_input_output = (
(input_ids != output_ids[:, :input_token_len]).sum().item()
)
if n_diff_input_output > 0:
print(
f"[Warning] {n_diff_input_output} output_ids are not the same as the input_ids"
)
outputs = self.tokenizer.batch_decode(
output_ids[:, input_token_len:], skip_special_tokens=True
)[0]
outputs = outputs.strip()
if outputs.endswith(stop_str):
outputs = outputs[: -len(stop_str)]
outputs = outputs.strip()
return outputs
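

if __name__ == "__main__":
    # Usage sketch: run the multimodal pipeline on a local image and audio clip.
    # "sample.jpg" and "sample.wav" are placeholder paths, and a CUDA device is
    # assumed to be available for the default device="cuda:0".
    from PIL import Image

    multimodal_phi2 = MultiModalPhi2(
        modelname_or_path="RaviNaik/Llava-Phi2",
        temperature=0.2,
        max_new_tokens=1024,
        device="cuda:0",
    )
    image = Image.open("sample.jpg")
    answer = multimodal_phi2(
        text="Describe the scene in this image.",
        audio="sample.wav",
        image=image,
    )
    print(answer)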