import re
import os
import nltk
import torch
import pickle
import torchaudio
import numpy as np
import gradio as gr
from google.cloud import storage
from TTS.tts.models.xtts import Xtts
from nltk.tokenize import sent_tokenize
from huggingface_hub import hf_hub_download
from TTS.tts.configs.xtts_config import XttsConfig


def _download_starting_files() -> None:
    """
    Downloads the GCP credentials file from the Hugging Face Hub, then the speaker
    embeddings from a Cloud Storage bucket
    """
    os.makedirs('assets', exist_ok=True)

    # Download credentials file
    hf_hub_download(
        repo_id=os.environ.get('DATA'), repo_type='dataset', filename="credentials.json",
        token=os.environ.get('HUB_TOKEN'), local_dir="assets"
    )
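    # GOOGLE_APPLICATION_CREDENTIALS is assumed to point at the credentials file
    # downloaded above (assets/credentials.json)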

    # Initialise a client
    credentials = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
    storage_client = storage.Client.from_service_account_json(credentials)
    bucket = storage_client.get_bucket('embeddings-bella')

    # Get both embeddings
    blob = bucket.blob("gpt_cond_latent.npy")
    blob.download_to_filename('assets/gpt_cond_latent.npy')
    blob = bucket.blob("speaker_embedding.npy")
    blob.download_to_filename('assets/speaker_embedding.npy')


def _load_array(filename):
    """
    Opens a pickled file and returns its contents; used for the embedding files
    """
    with open(filename, 'rb') as f:
        return pickle.load(f)


# Get embeddings
_download_starting_files()
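# Accept the Coqui XTTS licence non-interactively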
os.environ['COQUI_TOS_AGREED'] = '1'

# Sentence tokenizer used to split long answers before synthesis
nltk.download('punkt')
model_path = "tts_model"

config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))

model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    checkpoint_path=os.path.join(model_path, "model.pth"),
    vocab_path=os.path.join(model_path, "vocab.json"),
    eval=True,
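    # use_deepspeed assumes the deepspeed package is installed; it speeds up inference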
    use_deepspeed=True,
)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# Speaker latent
path_latents = 'assets/gpt_cond_latent.npy'
gpt_cond_latent = _load_array(path_latents)

# Speaker embedding
path_embedding = 'assets/speaker_embedding.npy'
speaker_embedding = _load_array(path_embedding)


def get_audio(text: str, language: str = 'es') -> gr.Audio:
    """
    Generates an audio file from the given text and language, saves it as output.wav
    and returns a Gradio audio component pointing to it
    :param text: used to generate the audio
    :param language: 'es', 'en' or 'pt'
    :return: gr.Audio with the generated audio
    """
    # Creates an audio with the answer and saves it as output.wav
    _save_audio(text, language)

    return gr.Audio(value='output.wav', interactive=False, visible=True)


def _save_audio(answer: str, language: str) -> None:
    """
    Splits the answer into sentences, cleans them and creates an audio segment for
    each one, then concatenates all the segments and saves them to a file (output.wav)
    """
    # Split the answer into sentences and clean it
    sentences = _get_clean_answer(answer, language)

    # Get the voice of each sentence
    audio_segments = []
    for sentence in sentences:
        audio_stream = _get_voice(sentence, language)
        audio_stream = torch.tensor(audio_stream)
        audio_segments.append(audio_stream)

    # Concatenate and save all audio segments
    concatenated_audio = torch.cat(audio_segments, dim=0)
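    # XTTS generates audio at a 24 kHz sample rate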
    torchaudio.save('output.wav', concatenated_audio.unsqueeze(0), 24000)


def _get_voice(sentence: str, language: str) -> np.ndarray:
    """
    Returns a numpy array with a wav of an audio with the given sentence and language
    """
    out = model.inference(
        sentence,
        language=language,
        gpt_cond_latent=gpt_cond_latent,
        speaker_embedding=speaker_embedding,
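        # A low temperature keeps the sampling near-deterministic and the voice stable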
        temperature=0.1
    )
    return out['wav']


def _get_clean_answer(answer: str, language: str) -> list[str]:
    """
    Returns the answer as a list of sentences, each within the per-language character
    limit. Links are replaced with a spoken placeholder
    """
    # Replace links with a spoken placeholder and set the character limit
    # (the limits mirror XTTS's per-language character limits)
    if language == 'en':
        clean_answer = re.sub(r'http[s]?://\S+', 'the following link', answer)
        max_characters = 250
    elif language == 'es':
        clean_answer = re.sub(r'http[s]?://\S+', 'el siguiente link', answer)
        max_characters = 239
    else:
        clean_answer = re.sub(r'http[s]?://\S+', 'o seguinte link', answer)
        max_characters = 203

    # Change the name from Bella to Bela
    clean_answer = clean_answer.replace('Bella', 'Bela')

    # Remove Florida and zipcode
    clean_answer = re.sub(r', FL \d+', "", clean_answer)

    # Split the answer into sentences with nltk and make sure each one stays
    # within the character limit
    split_sentences = sent_tokenize(clean_answer)
    sentences = []
    for sentence in split_sentences:
        if len(sentence) > max_characters:
            sentences.extend(split_sentence(sentence, max_characters))
        else:
            sentences.append(sentence)

    return sentences


def split_sentence(sentence: str, max_characters: int) -> list[str]:
    """
    Splits a sentence in two at the comma nearest to its middle; if there is no
    comma, the nearest space is used, or simply the middle of the string. Halves
    that are still too long are split again recursively
    """
    # Get the index of each comma
    sentences = []
    split_points = [i for i, c in enumerate(sentence) if c == ',']

    # No commas, fall back to spaces
    if len(split_points) == 0:
        split_points = [i for i, c in enumerate(sentence) if c == ' ']

    # No commas or spaces, split in the middle
    if len(split_points) == 0:
        sentences.append(sentence[:len(sentence) // 2])
        sentences.append(sentence[len(sentence) // 2:])
        return sentences

    # Nearest split point to the middle
    split_point = min(split_points, key=lambda x: abs(x - (len(sentence) // 2)))

    if sentence[split_point] == ',':
        # Drop the comma and any whitespace that follows it
        left = sentence[:split_point]
        right = sentence[split_point + 1:].lstrip()
    else:
        # Drop the space itself
        left = sentence[:split_point]
        right = sentence[split_point + 1:]

    if len(left) > max_characters:
        sentences.extend(split_sentence(left, max_characters))
    else:
        sentences.append(left)
    if len(right) > max_characters:
        sentences.extend(split_sentence(right, max_characters))
    else:
        sentences.append(right)

    return sentences
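

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original pipeline. It assumes the model
    # and embeddings loaded above; the example text and URL are illustrative only.
    sample = (
        'Hola, soy Bella. Visita https://example.com para más información. '
        'Estamos en 123 Main St, FL 33101.'
    )

    # Show how the answer is cleaned and split before synthesis
    for chunk in _get_clean_answer(sample, 'es'):
        print(repr(chunk))

    # Generate output.wav for a short sentence
    get_audio('Hola, ¿cómo estás?', language='es')
    print('Audio saved to output.wav')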