import re
import os
import nltk
import torch
import pickle
import torchaudio
import numpy as np
import gradio as gr
from google.cloud import storage
from TTS.tts.models.xtts import Xtts
from nltk.tokenize import sent_tokenize
from huggingface_hub import hf_hub_download
from TTS.tts.configs.xtts_config import XttsConfig


def _download_starting_files() -> None:
    """
    Downloads the speaker embeddings from a GCP bucket
    """
    os.makedirs('assets', exist_ok=True)

    # Download credentials file
    hf_hub_download(
        repo_id=os.environ.get('DATA'),
        repo_type='dataset',
        filename="credentials.json",
        token=os.environ.get('HUB_TOKEN'),
        local_dir="assets"
    )

    # Initialise a client
    credentials = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
    storage_client = storage.Client.from_service_account_json(credentials)
    bucket = storage_client.get_bucket('embeddings-bella')

    # Get both embeddings
    blob = bucket.blob("gpt_cond_latent.npy")
    blob.download_to_filename('assets/gpt_cond_latent.npy')
    blob = bucket.blob("speaker_embedding.npy")
    blob.download_to_filename('assets/speaker_embedding.npy')


def _load_array(filename):
    """
    Opens a pickled file and returns its contents, used for the embedding files
    """
    with open(filename, 'rb') as f:
        return pickle.load(f)


# Get embeddings
_download_starting_files()

os.environ['COQUI_TOS_AGREED'] = '1'  # Used to generate audio based on a sample

nltk.download('punkt')

model_path = os.path.join("tts_model")

config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))

model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    checkpoint_path=os.path.join(model_path, "model.pth"),
    vocab_path=os.path.join(model_path, "vocab.json"),
    eval=True,
    use_deepspeed=True,
)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# Speaker latent
path_latents = 'assets/gpt_cond_latent.npy'
gpt_cond_latent = _load_array(path_latents)

# Speaker embedding
path_embedding = 'assets/speaker_embedding.npy'
speaker_embedding = _load_array(path_embedding)


def get_audio(text: str, language: str = 'es') -> gr.Audio:
    """
    Generates an audio file (output.wav) from the given text and language and
    returns a Gradio Audio component that points to it

    :param text: used to generate the audio
    :param language: 'es', 'en' or 'pt'
    :return: gr.Audio component with the generated audio
    """
    # Creates an audio with the answer and saves it as output.wav
    _save_audio(text, language)

    return gr.Audio(value='output.wav', interactive=False, visible=True)


def _save_audio(answer: str, language: str) -> None:
    """
    Splits the answer into sentences, cleans them and creates an audio for each one,
    then concatenates all the audios and saves them into a file (output.wav)
    """
    # Split the answer into sentences and clean it
    sentences = _get_clean_answer(answer, language)

    # Get the voice of each sentence
    audio_segments = []
    for sentence in sentences:
        audio_stream = _get_voice(sentence, language)
        audio_stream = torch.tensor(audio_stream)
        audio_segments.append(audio_stream)

    # Concatenate and save all audio segments
    concatenated_audio = torch.cat(audio_segments, dim=0)
    torchaudio.save('output.wav', concatenated_audio.unsqueeze(0), 24000)


def _get_voice(sentence: str, language: str) -> np.ndarray:
    """
    Returns a numpy array with a wav of an audio with the given sentence and language
    """
    out = model.inference(
        sentence,
        language=language,
        gpt_cond_latent=gpt_cond_latent,
        speaker_embedding=speaker_embedding,
        temperature=0.1
    )
    return out['wav']


def _get_clean_answer(answer: str, language: str) -> list[str]:
    """
    Returns a list of sentences of the answer. It also removes links
    """
    # Replace the links with a short phrase in the selected language
    if language == 'en':
        clean_answer = re.sub(r'http[s]?://\S+', 'the following link', answer)
        max_characters = 250
    elif language == 'es':
        clean_answer = re.sub(r'http[s]?://\S+', 'el siguiente link', answer)
        max_characters = 239
    else:
        clean_answer = re.sub(r'http[s]?://\S+', 'o seguinte link', answer)
        max_characters = 203

    # Change the name from Bella to Bela
    clean_answer = clean_answer.replace('Bella', 'Bela')

    # Remove Florida and zipcode
    clean_answer = re.sub(r', FL \d+', "", clean_answer)

    # Split the answer into sentences with nltk and make sure they are shorter than
    # the maximum number of characters allowed for the language
    split_sentences = sent_tokenize(clean_answer)
    sentences = []
    for sentence in split_sentences:
        if len(sentence) > max_characters:
            sentences.extend(split_sentence(sentence, max_characters))
        else:
            sentences.append(sentence)

    return sentences


def split_sentence(sentence: str, max_characters: int) -> list[str]:
    """
    Returns a list with the split sentence. The split point is the comma nearest to
    the middle of the sentence; if there is no comma, a space is used, and if there
    is neither, the sentence is cut in the middle. If the resulting parts are still
    too long, they are split again recursively
    """
    # Get index of each comma
    sentences = []
    commas = [i for i, c in enumerate(sentence) if c == ',']

    # No commas, search for spaces
    if len(commas) == 0:
        commas = [i for i, c in enumerate(sentence) if c == ' ']

    # No commas or spaces, split it in the middle
    if len(commas) == 0:
        sentences.append(sentence[:len(sentence) // 2])
        sentences.append(sentence[len(sentence) // 2:])
        return sentences

    # Nearest index to the middle
    split_point = min(commas, key=lambda x: abs(x - (len(sentence) // 2)))
    if sentence[split_point] == ',':
        # Drop the comma and the space that follows it
        left = sentence[:split_point]
        right = sentence[split_point + 2:]
    else:
        # Drop only the space
        left = sentence[:split_point]
        right = sentence[split_point + 1:]

    if len(left) > max_characters:
        sentences.extend(split_sentence(left, max_characters))
    else:
        sentences.append(left)

    if len(right) > max_characters:
        sentences.extend(split_sentence(right, max_characters))
    else:
        sentences.append(right)

    return sentences
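

# A minimal usage sketch, assuming this script is meant to be launched directly and
# wired into a small Gradio demo around get_audio (the original file does not define
# an entry point). The component names and layout below are illustrative assumptions,
# not part of the original code.
if __name__ == '__main__':
    with gr.Blocks() as demo:
        # Text to synthesise and the language of the voice
        text_input = gr.Textbox(label="Text")
        language_input = gr.Dropdown(choices=['es', 'en', 'pt'], value='es', label="Language")
        # Output component updated with the generated output.wav
        audio_output = gr.Audio(interactive=False, visible=True)
        generate_button = gr.Button("Generate audio")
        generate_button.click(
            fn=get_audio,
            inputs=[text_input, language_input],
            outputs=audio_output,
        )
    demo.launch()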