| | import re |
| | import torch |
| | import numpy as np |
| | from transformers import MimiModel, GenerationConfig |
| | from transformers import Pipeline, LogitsProcessor |
| |
|
| | class AlternatingCodebooksLogitsProcessor(LogitsProcessor): |
| | def __init__(self, input_start_len: int, codebook_size: int, num_codebooks: int, offset: int, stop_token: int): |
| | self.input_start_len = input_start_len |
| | self.codebook_size = codebook_size |
| | self.num_codebooks = num_codebooks |
| | self.offset = offset |
| | self.stop_token = stop_token |
| | |
| | def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor: |
| | curr_len = input_ids.shape[-1] |
| | codebook_idx = ((curr_len - self.input_start_len) % self.num_codebooks) |
| | |
| | scores_processed = scores.clone() |
| | scores_processed[:, : self.offset + codebook_idx * self.codebook_size] = -float("inf") |
| | scores_processed[:, self.offset + (codebook_idx+1) * self.codebook_size :] = -float("inf") |
| | scores_processed[:, self.stop_token] = scores[:, self.stop_token] |
| |
|
| | return scores_processed |
| |
|
| | class IndriTTSPipeline(Pipeline): |
| | def __init__(self, *args, **kwargs): |
| | super().__init__(*args, **kwargs) |
| |
|
| | self.audio_tokenizer = MimiModel.from_pretrained('kyutai/mimi').to(device=self.device) |
| |
|
| | |
| | self.convert_token = self.tokenizer.encode('[convert]') |
| | self.stop_token = self.tokenizer.encode('[stop]') |
| | self.text_modality_token = self.tokenizer.encode('[text]') |
| | self.acoustic_modality_token = self.tokenizer.encode('[mimi]') |
| | self.num_codebooks = 8 |
| | self.audio_offset = 50257 |
| |
|
| | self.model.stop_token = self.stop_token |
| |
|
| | self.model.generation_config = GenerationConfig( |
| | eos_token_id=self.stop_token, |
| | max_length=kwargs.get('max_length', 1024), |
| | temperature=kwargs.get('temperature', 0.5), |
| | top_k=kwargs.get('top_k', 15), |
| | do_sample=kwargs.get('do_sample', True) |
| | ) |
| |
|
| | def _sanitize_parameters(self, **kwargs): |
| | speaker = kwargs.get('speaker', '[spkr_unk]') |
| |
|
| | preprocess_kwargs = { |
| | 'speaker': speaker |
| | } |
| |
|
| | return preprocess_kwargs, {}, {} |
| |
|
| | def _prepare_tts_tokens(self, text_tokens, speaker): |
| | input_tokens = np.hstack([ |
| | self.text_modality_token, |
| | text_tokens, |
| | self.convert_token, |
| | self.acoustic_modality_token, |
| | self.tokenizer.encode(speaker) |
| | ]) |
| |
|
| | return input_tokens.tolist() |
| |
|
| | def _sanitize_text(self, text): |
| | text = text.lower() |
| | text = re.sub(r'\n+', ' ', text) |
| | text = re.sub(r'[ \t]+', ' ', text) |
| |
|
| | text = re.sub(r'([,\.?])+', r'\1', text) |
| |
|
| | return text.strip() |
| |
|
| | def _deserialize_tokens(self, tokens, num_codebooks): |
| | cb = [tokens[i::num_codebooks] for i in range(num_codebooks)] |
| | min_shape = min([c.shape for c in cb])[0] |
| | acoustic_tokens = torch.vstack([c[:min_shape] - 2048*i for i, c in enumerate(cb)]) |
| |
|
| | return acoustic_tokens |
| |
|
| | |
| | def _prepare_mimi_batch(self, tokens, attention_mask): |
| | max_len = max(token.size(1) for token in tokens) |
| |
|
| | padded_tokens = [] |
| | padded_masks = [] |
| |
|
| | for token, mask in zip(tokens, attention_masks): |
| | pad_len = max_len - token.size(1) |
| |
|
| | padded_token = F.pad(token, (0, pad_len, 0, 0), value=0) |
| | padded_mask = F.pad(mask, (0, pad_len, 0, 0), value=0) |
| |
|
| | padded_tokens.append(padded_token) |
| | padded_masks.append(padded_mask) |
| |
|
| | stacked_tokens = torch.stack(padded_tokens, dim=0) |
| | stacked_masks = torch.stack(padded_masks, dim=0) |
| |
|
| | return stacked_tokens, stacked_masks |
| |
|
| | def preprocess(self, inputs, speaker): |
| | input_text = self._sanitize_text(inputs) |
| | input_tokens = self.tokenizer.encode(input_text) |
| | task_tokens = self._prepare_tts_tokens(input_tokens, speaker) |
| | task_tokens = torch.tensor(task_tokens).unsqueeze(0) |
| |
|
| | return {'input_ids': task_tokens, 'attention_mask': torch.ones_like(task_tokens)} |
| |
|
| | def _forward(self, model_inputs, **forward_args): |
| |
|
| | logits_processor=[ |
| | AlternatingCodebooksLogitsProcessor( |
| | input_start_len=model_inputs['input_ids'].shape[-1], |
| | codebook_size=2048, |
| | num_codebooks=self.num_codebooks, |
| | offset=self.audio_offset, |
| | stop_token=self.stop_token |
| | ) |
| | ] |
| |
|
| | outputs = self.model.generate( |
| | model_inputs['input_ids'], |
| | logits_processor=logits_processor |
| | ) |
| |
|
| | audio_tokens, attention_mask = [], [] |
| |
|
| | for idx, inputs in enumerate(model_inputs['input_ids']): |
| | truncated = outputs[idx, inputs.shape[-1]:] |
| | end = torch.where(truncated == self.stop_token[0])[-1] |
| | |
| | if end.shape[-1] > 0: |
| | end = end[0] |
| | else: |
| | end = truncated.shape[-1] |
| | |
| | truncated = truncated[:end] |
| | truncated -= self.audio_offset |
| | truncated = self._deserialize_tokens(torch.tensor(truncated), self.num_codebooks) |
| | audio_tokens.append(truncated) |
| | attention_mask.append(torch.ones_like(truncated)) |
| |
|
| | audio_tokens = torch.vstack(audio_tokens).unsqueeze(0) |
| | attention_mask = torch.vstack(attention_mask).unsqueeze(0) |
| |
|
| | audio = self.audio_tokenizer.decode(audio_tokens).audio_values |
| |
|
| | return { |
| | 'audio_tokens': audio_tokens, |
| | 'audio': audio |
| | } |
| |
|
| | def postprocess(self, model_outputs): |
| | return model_outputs |
| |
|