import sys
import io
import os
import stat
import torch
import subprocess
import random
from zipfile import ZipFile
import uuid
import time
import torchaudio
import numpy as np

print("Installing unidic (Japanese dictionary data)")
os.system("python -m unidic download")
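# Note: unidic supplies the dictionary data that XTTS's Japanese text
# frontend (MeCab/cutlet tokenization) needs at runtime.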

# Agree to Coqui's model terms (CPML) non-interactively so loading never blocks on a prompt.
os.environ["COQUI_TOS_AGREED"] = "1"

import langid
import base64
import csv
from io import StringIO
import datetime
import re

from scipy.io.wavfile import write
from pydub import AudioSegment

from TTS.api import TTS
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts
from TTS.utils.generic_utils import get_user_data_dir

from huggingface_hub import HfApi

print("Export newer ffmpeg binary for denoise filter")
ZipFile("ffmpeg.zip").extractall()
print("Make ffmpeg binary executable")
st = os.stat("ffmpeg")
os.chmod("ffmpeg", st.st_mode | stat.S_IEXEC)
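# Assumes ffmpeg.zip ships a static "ffmpeg" binary at the archive root; the
# local ./ffmpeg is what the reference-audio cleanup filter chain invokes below.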

HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("HF_TOKEN environment variable is not set")

api = HfApi(token=HF_TOKEN)
repo_id = "coqui/xtts"

print("Downloading Coqui XTTS V2 (skipped if already cached)")
from TTS.utils.manage import ModelManager

model_name = "tts_models/multilingual/multi-dataset/xtts_v2"
ModelManager().download_model(model_name)
model_path = os.path.join(get_user_data_dir("tts"), model_name.replace("/", "--"))
print("XTTS downloaded")

# Hand the cached model directory over to the runtime user.
os.system(f"chown -R appuser:appgroup {model_path}")
os.system(f"chmod -R 755 {model_path}")

if not os.access(model_path, os.W_OK):
    raise PermissionError(f"Write permission denied for model directory: {model_path}")

config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))

model = Xtts.init_from_config(config)
checkpoint_path = os.path.join(model_path, "model.pth")
vocab_path = os.path.join(model_path, "vocab.json")

if not os.path.exists(checkpoint_path):
    raise FileNotFoundError(f"Checkpoint file not found at {checkpoint_path}")
if not os.path.exists(vocab_path):
    raise FileNotFoundError(f"Vocab file not found at {vocab_path}")

if not os.environ.get("CUDA_HOME"):
    print("ENV var CUDA_HOME is not set, defaulting to '/usr/local/cuda'")
    os.environ["CUDA_HOME"] = "/usr/local/cuda"

model.load_checkpoint(
    config,
    checkpoint_dir=model_path,
    vocab_path=vocab_path,
    eval=True,
    use_deepspeed=True,
)
model.cuda()
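# use_deepspeed=True enables DeepSpeed's inference kernels for the GPT decoder
# (compiling them is why CUDA_HOME is set above); model.cuda() then moves the
# weights onto the GPU, which the streaming path below requires.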

# Sentinel state for CUDA device-side asserts: once one fires, the CUDA
# context is poisoned and the Space has to be restarted.
DEVICE_ASSERT_DETECTED = 0
DEVICE_ASSERT_PROMPT = None
DEVICE_ASSERT_LANG = None

supported_languages = config.languages

def numpy_to_mp3(audio_array, sampling_rate):
    """Encode a mono numpy waveform as MP3 bytes via pydub/ffmpeg."""
    # Normalize float audio to full-scale 16-bit PCM, guarding against
    # all-zero input to avoid a division by zero.
    if np.issubdtype(audio_array.dtype, np.floating):
        max_val = np.max(np.abs(audio_array))
        if max_val > 0:
            audio_array = (audio_array / max_val) * 32767
        audio_array = audio_array.astype(np.int16)

    audio_segment = AudioSegment(
        audio_array.tobytes(),
        frame_rate=sampling_rate,
        sample_width=audio_array.dtype.itemsize,
        channels=1,
    )

    mp3_io = io.BytesIO()
    audio_segment.export(mp3_io, format="mp3", bitrate="320k")
    mp3_bytes = mp3_io.getvalue()
    mp3_io.close()

    return mp3_bytes
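
# Example (sketch): encode one second of a 440 Hz tone at 24 kHz.
#   tone = np.sin(2 * np.pi * 440 * np.arange(24000) / 24000).astype(np.float32)
#   mp3_bytes = numpy_to_mp3(tone, 24000)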

def predict(
    prompt,
    language,
    audio_file_pth,
    mic_file_path,
    use_mic,
    voice_cleanup,
    no_lang_auto_detect,
    agree,
):
    print("####################################### Predict Called ##############################")
    print("prompt:", prompt)
    print("language:", language)
    print("audio_file_pth:", audio_file_pth)
    print("mic_file_path:", mic_file_path)
    print("use_mic:", use_mic)
    print("voice_cleanup:", voice_cleanup)
    print("no_lang_auto_detect:", no_lang_auto_detect)
    print("agree:", agree)
    if agree:
        if language not in supported_languages:
            print(
                f"The language you chose, {language}, is not in our list of supported languages; please choose one from the dropdown"
            )
            return (None,)

        language_predicted = langid.classify(prompt)[0].strip()

        # langid reports Chinese as "zh"; XTTS expects "zh-cn".
        if language_predicted == "zh":
            language_predicted = "zh-cn"

        print(f"Detected language: {language_predicted}, chosen language: {language}")

        # Only trust auto-detection on prompts long enough to classify reliably.
        if len(prompt) > 15:
            if language_predicted != language and not no_lang_auto_detect:
                print(
                    "It looks like your text is not in the language you chose; if you are sure it is, please check the 'disable language auto-detection' checkbox"
                )
                return (None,)

        if use_mic:
            if mic_file_path is not None:
                speaker_wav = mic_file_path
            else:
                print(
                    "Please record your voice with the microphone, or uncheck 'Use Microphone' to use a reference audio"
                )
                return (None,)
        else:
            speaker_wav = audio_file_pth

        # Toggles for the ffmpeg cleanup chain (denoise and loudness are
        # currently unused placeholders).
        lowpassfilter = denoise = trim = loudness = True

        if lowpassfilter:
            lowpass_highpass = "lowpass=8000,highpass=75,"
        else:
            lowpass_highpass = ""

        if trim:
            trim_silence = "areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,areverse,silenceremove=start_periods=1:start_silence=0:start_threshold=0.02,"
        else:
            trim_silence = ""

        if voice_cleanup:
            try:
                out_filename = speaker_wav + str(uuid.uuid4()) + ".wav"

                shell_command = f"./ffmpeg -y -i {speaker_wav} -af {lowpass_highpass}{trim_silence} {out_filename}".split(" ")

                subprocess.run(
                    shell_command,
                    capture_output=False,
                    text=True,
                    check=True,
                )
                speaker_wav = out_filename
                print("Filtered microphone input")
            except subprocess.CalledProcessError:
                print("Error: failed filtering, using original microphone input")

        if len(prompt) < 2:
            print("Please give a longer prompt text")
            return (None,)
        if len(prompt) > 1000:
            print(
                "Text length is limited to 1000 characters for this demo, please try a shorter text. You can clone this Space and edit the code for your own usage"
            )
            return (None,)

        global DEVICE_ASSERT_DETECTED
        if DEVICE_ASSERT_DETECTED:
            global DEVICE_ASSERT_PROMPT
            global DEVICE_ASSERT_LANG
            print(
                f"Unrecoverable exception caused by language:{DEVICE_ASSERT_LANG} prompt:{DEVICE_ASSERT_PROMPT}"
            )

            # A previous request tripped a device-side assert; restart the
            # Space unless it is already rebuilding.
            space = api.get_space_runtime(repo_id=repo_id)
            if space.stage != "BUILDING":
                api.restart_space(repo_id=repo_id)
            else:
                print("TRIED TO RESTART but space is building")

        try:
            metrics_text = ""
            t_latent = time.time()

            try:
                (
                    gpt_cond_latent,
                    speaker_embedding,
                ) = model.get_conditioning_latents(
                    audio_path=speaker_wav,
                    gpt_cond_len=30,
                    gpt_cond_chunk_len=4,
                    max_ref_length=60,
                )
            except Exception as e:
                print("Speaker encoding error", str(e))
                print(
                    "It appears something is wrong with the reference audio; did you unmute your microphone?"
                )
                return (None,)

            latent_calculation_time = time.time() - t_latent

            # Pad sentence-final punctuation (ASCII and CJK) with a leading
            # space and double it, which encourages a clearer pause at
            # sentence boundaries.
            prompt = re.sub(r"([^\x00-\x7F]|\w)(\.|。|\?)", r"\1 \2\2", prompt)

            wav_chunks = []

            # Non-streaming path, kept for reference:
            """
            print("I: Generating new audio...")
            t0 = time.time()
            out = model.inference(
                prompt,
                language,
                gpt_cond_latent,
                speaker_embedding,
                repetition_penalty=5.0,
                temperature=0.75,
            )
            inference_time = time.time() - t0
            print(f"I: Time to generate audio: {round(inference_time*1000)} milliseconds")
            metrics_text += f"Time to generate audio: {round(inference_time*1000)} milliseconds\n"
            real_time_factor = (time.time() - t0) / out['wav'].shape[-1] * 24000
            print(f"Real-time factor (RTF): {real_time_factor}")
            metrics_text += f"Real-time factor (RTF): {real_time_factor:.2f}\n"
            torchaudio.save("output.wav", torch.tensor(out["wav"]).unsqueeze(0), 24000)
            """

            print("I: Generating new audio in streaming mode...")
            t0 = time.time()
            chunks = model.inference_stream(
                prompt,
                language,
                gpt_cond_latent,
                speaker_embedding,
                repetition_penalty=7.0,
                temperature=0.85,
            )

            first_chunk = True
            for i, chunk in enumerate(chunks):
                if first_chunk:
                    first_chunk_time = time.time() - t0
                    metrics_text += f"Latency to first audio chunk: {round(first_chunk_time*1000)} milliseconds\n"
                    first_chunk = False

                chunk_np = chunk.cpu().numpy()
                print("chunk", i)
                # XTTS outputs 24 kHz audio; encode and yield each chunk as MP3 bytes.
                yield numpy_to_mp3(chunk_np, 24000)
                wav_chunks.append(chunk)

                print(f"Received chunk {i} of audio length {chunk.shape[-1]}")
            inference_time = time.time() - t0
            print(
                f"I: Time to generate audio: {round(inference_time*1000)} milliseconds"
            )
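
            # Streaming note: each yielded MP3 segment can be played as it
            # arrives, so the client hears audio while synthesis is still
            # running; wav_chunks retains the raw tensors in case the full
            # waveform is needed later.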

        except RuntimeError as e:
            if "device-side assert" in str(e):
                # A CUDA device-side assert is unrecoverable in-process.
                print(
                    f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}",
                    flush=True,
                )
                print("Unhandled exception encountered, please retry in a minute")
                print("CUDA device-side assert encountered, restart needed")
                if not DEVICE_ASSERT_DETECTED:
                    DEVICE_ASSERT_DETECTED = 1
                    DEVICE_ASSERT_PROMPT = prompt
                    DEVICE_ASSERT_LANG = language

                # Flag the failing request: serialize the inputs to CSV and
                # upload them to the flagged dataset for debugging.
                error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S")
                error_data = [
                    error_time,
                    prompt,
                    language,
                    audio_file_pth,
                    mic_file_path,
                    use_mic,
                    voice_cleanup,
                    no_lang_auto_detect,
                    agree,
                ]
                error_data = [item if isinstance(item, str) else str(item) for item in error_data]
                print(error_data)
                print(speaker_wav)
                write_io = StringIO()
                csv.writer(write_io).writerows([error_data])
                csv_upload = write_io.getvalue().encode()

                filename = error_time + "_" + str(uuid.uuid4()) + ".csv"
                print("Writing error csv")
                error_api = HfApi()
                error_api.upload_file(
                    path_or_fileobj=csv_upload,
                    path_in_repo=filename,
                    repo_id="coqui/xtts-flagged-dataset",
                    repo_type="dataset",
                )

                # Also upload the reference audio that triggered the failure.
                print("Writing error reference audio")
                speaker_filename = error_time + "_reference_" + str(uuid.uuid4()) + ".wav"
                error_api = HfApi()
                error_api.upload_file(
                    path_or_fileobj=speaker_wav,
                    path_in_repo=speaker_filename,
                    repo_id="coqui/xtts-flagged-dataset",
                    repo_type="dataset",
                )

                # Restart the Space so later requests get a clean CUDA context.
                space = api.get_space_runtime(repo_id=repo_id)
                if space.stage != "BUILDING":
                    api.restart_space(repo_id=repo_id)
                else:
                    print("TRIED TO RESTART but space is building")

            else:
                if "Failed to decode" in str(e):
                    print("Speaker encoding error", str(e))
                    print(
                        "It appears something is wrong with the reference audio; did you unmute your microphone?"
                    )
                else:
                    print("RuntimeError: non device-side assert error:", str(e))
                    print("Something unexpected happened, please retry.")
                return (None,)

    else:
        print("Please accept the Terms & Conditions!")
        return (None,)
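

# Minimal local smoke test (a sketch, not part of the app flow). The reference
# path below is hypothetical; point it at any short, clean speech clip. Each
# yielded item is a self-contained MP3 segment, so concatenating them should
# produce a playable file.
# if __name__ == "__main__":
#     with open("output.mp3", "wb") as f:
#         for mp3_chunk in predict(
#             prompt="Hello, this is a streaming synthesis test.",
#             language="en",
#             audio_file_pth="examples/reference.wav",  # hypothetical path
#             mic_file_path=None,
#             use_mic=False,
#             voice_cleanup=False,
#             no_lang_auto_detect=False,
#             agree=True,
#         ):
#             f.write(mp3_chunk)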