Spaces:
Running
on
Zero
Running
on
Zero
import time | |
from .models import * | |
from .utils import * | |
from .config import * | |
from .init import * | |
from .sample_caching import * | |
import gradio as gr | |
from pydub import AudioSegment | |
import random, os, threading, tempfile | |
from langdetect import detect | |
from .vote import log_text | |
# Models that are always scrutinized: once this list holds five entries, one
# of them is forced into every random match-up (see synthandreturn).
top_five = ['fishaudio/fish-speech-1']  # fish 1.5
hf_token = os.getenv('HF_TOKEN')

# Prioritize low vote models: fill the remaining slots with the models that
# have the fewest total votes (below 750), least-voted first.
sql = 'SELECT name FROM model WHERE (upvote + downvote) < 750 ORDER BY (upvote + downvote) ASC'
conn = get_db()
cursor = conn.cursor()
cursor.execute(sql)
data = cursor.fetchall()
for model in data:
    if len(top_five) >= 5:
        break
    # Each row is a 1-tuple holding the model name.
    # Skip names already present (e.g. the hard-coded seed) so the list ends
    # up with five *distinct* models instead of a duplicated one.
    if model[0] in AVAILABLE_MODELS and model[0] not in top_five:
        top_five.append(model[0])
print(f"low vote top_five: {top_five}")
def random_m():
    """Return a list with two distinct, randomly chosen keys of AVAILABLE_MODELS."""
    unique_names = set(AVAILABLE_MODELS.keys())
    candidates = list(unique_names)
    return random.sample(candidates, 2)
def check_toxicity(text):
    """Tell whether ``text`` is considered toxic.

    Always returns False when TOXICITY_CHECK is disabled; otherwise the text
    counts as toxic when the predicted 'toxicity' score exceeds 0.8.
    """
    if TOXICITY_CHECK:
        score = toxicity.predict(text)['toxicity']
        return score > 0.8
    return False
def synthandreturn(text, autoplay, request: gr.Request):
    """Synthesize ``text`` with two arena models and return the UI updates.

    The text is validated (non-empty, length limits, toxicity for text that
    is not one of the prepared sentences in ``sents``), two models are picked
    (one forced from ``top_five`` once that list holds five entries), both
    are queried and their audio is resampled/normalized.

    Args:
        text: Text to synthesize; surrounding whitespace is stripped.
        autoplay: Whether the first audio component should autoplay.
        request: Gradio request; its ``x-ip-token`` header is forwarded to
            ZeroGPU Spaces.

    Returns:
        A 14-tuple of values/updates in this order: text, button label, r2,
        model1, model2, aud1, aud2, abetter, bbetter, prevmodel1, prevmodel2,
        next-round button, aplayed, bplayed.

    Raises:
        gr.Error: On invalid/toxic text or when a model could not be called.
    """
    text = text.strip()
    # Check for empty input first: otherwise the minimum-length check below
    # reports the generic "too short" message and this one is never reached.
    if not text:
        raise gr.Error('You did not enter any text')
    if len(text) > MAX_SAMPLE_TXT_LENGTH:
        raise gr.Error(f'You exceeded the limit of {MAX_SAMPLE_TXT_LENGTH} characters')
    if len(text) < MIN_SAMPLE_TXT_LENGTH:
        raise gr.Error(f'Please input a text longer than {MIN_SAMPLE_TXT_LENGTH} characters')

    # Prepared sentences skip the toxicity and language checks.
    is_prepared_text = text in sents
    if not is_prepared_text and check_toxicity(text):
        print(f'Detected toxic content! "{text}"')
        raise gr.Error('Your text failed the toxicity test')

    # Check language (best-effort: a detection failure must not block synthesis)
    if not is_prepared_text:
        try:
            if detect(text) != "en":
                gr.Warning('Warning: The input text may not be in English')
        except Exception as e:
            print(f"[debug] Language detection skipped: {e!r}")

    # Get two random models
    # forced model: your TTS model versus The World!!!
    # mdl1 = 'Pendrokar/xVASynth'
    if len(top_five) >= 5:
        # scrutinize the top five by always picking one of them
        forced = random.choice(top_five)
        # Filter instead of `del` on a copy: this also copes with a forced
        # model that is missing from AVAILABLE_MODELS.
        opponents = [name for name in AVAILABLE_MODELS if name != forced]
        opponent = random.choice(opponents)
        # randomize position of the forced model
        mdl1, mdl2 = random.sample([forced, opponent], 2)
    else:
        # actual random
        mdl1, mdl2 = random.sample(list(AVAILABLE_MODELS), 2)
    print("[debug] Using", mdl1, mdl2)

    def _get_param_examples(parameters):
        """Build a positional input list from an endpoint's example inputs."""
        casts = {'str': str, 'int': int, 'float': float, 'bool': bool}
        example_inputs = []
        for param_info in parameters:
            if param_info['component'] in ('Radio', 'Dropdown', 'Audio'):
                # these components are always passed as strings
                cast = str
            else:
                cast = casts.get(param_info['python_type']['type'])
            if cast is None:
                # NOTE(review): parameters of any other type are skipped,
                # which shifts the position of the parameters after them.
                continue
            example_inputs.append(cast(param_info['example_input']))
        return example_inputs

    def _override_params(inputs, modelname):
        """Apply the OVERRIDE_INPUTS entries of ``modelname`` to ``inputs``."""
        try:
            for key, value in OVERRIDE_INPUTS[modelname].items():
                inputs[key] = value
            print(f"{modelname}: Default inputs overridden by Arena")
        except Exception:
            # No (usable) overrides for this model: keep the defaults.
            pass
        return inputs

    def predict_and_update_result(text, model, result_storage, request: gr.Request):
        """Query ``model`` and store the path of its audio in ``result_storage``."""
        hf_headers = {}
        try:
            if HF_SPACES[model]['is_zero_gpu_space']:
                hf_headers = {"X-IP-Token": request.headers['x-ip-token']}
        except Exception:
            # No ZeroGPU flag for this model or no token on the request:
            # call the Space without the extra header.
            pass

        # Single attempt only: re-attempting may cause 429 Too Many Requests.
        try:
            if model not in AVAILABLE_MODELS:
                result = router.predict(text, model.lower(), api_name="/synthesize")
            elif '/' not in model:
                # Use the private HF Space
                result = router.predict(text, AVAILABLE_MODELS[model].lower(), api_name="/synthesize")
            else:
                # Use public HF Space
                space_cfg = HF_SPACES[model]
                mdl_space = Client(model, hf_token=hf_token, headers=hf_headers)
                # assume the index is one of the first 9 return params
                return_audio_index = int(space_cfg['return_audio_index'])
                endpoints = mdl_space.view_api(all_endpoints=True, print_info=False, return_format='dict')
                api_name = None
                fn_index = None
                if space_cfg['function'][0] == '/':
                    # has named endpoint
                    api_name = space_cfg['function']
                    end_parameters = _get_param_examples(
                        endpoints['named_endpoints'][api_name]['parameters']
                    )
                else:
                    # has unnamed endpoint, addressed by its index
                    fn_index = int(space_cfg['function'])
                    end_parameters = _get_param_examples(
                        endpoints['unnamed_endpoints'][str(fn_index)]['parameters']
                    )
                # override some or all default parameters
                space_inputs = _override_params(end_parameters, model)
                # force text
                space_inputs[space_cfg['text_param_index']] = text
                print(f"{model}: Sending request to HF Space")
                outputs = mdl_space.predict(*space_inputs, api_name=api_name, fn_index=fn_index)
                # return path to audio
                result = outputs
                if not isinstance(outputs, str):
                    # the audio sits at return_audio_index of the outputs
                    result = outputs[return_audio_index]
                if isinstance(result, dict):
                    # the audio output is a dictionary holding the path
                    result = result['value']
        except Exception as e:
            raise gr.Error(f"{model}:"+ repr(e)) from e
        print('Done with', model)

        try:
            # Decode before creating the temp file so that a decoding failure
            # does not leave an orphaned temp file behind.
            audio = AudioSegment.from_file(result)
            if audio.frame_rate > 24000:
                print(f"{model}: Resampling")
                audio = audio.set_frame_rate(24000)
            try:
                print(f"{model}: Trying to normalize audio")
                audio = match_target_amplitude(audio, -20)
            except Exception:
                print(f"{model}: [WARN] Unable to normalize audio")
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
                audio.export(f.name, format="wav")
                os.unlink(result)
                result = f.name
            gr.Info('Audio from a TTS model received')
        except Exception:
            # Best-effort: fall back to the audio file as received.
            print(f"{model}: [WARN] Unable to resample audio")

        if model in AVAILABLE_MODELS:
            model = AVAILABLE_MODELS[model]
        result_storage[model] = result

    def _cache_sample(text, model):
        """Cache the result of ``model`` when ``text`` is a prepared sentence.

        Returns True when the sample is cached (now or already), else False.
        """
        # skip caching if not hardcoded sentence
        if text not in sents:
            return False
        # TODO: replace cached with newer version
        if any(
            sample.transcript == text and sample.modelName == model
            for sample in cached_samples
        ):
            return True
        try:
            cached_samples.append(Sample(results[model], text, model))
        except Exception:
            print('Error when trying to cache sample')
            return False
        return True

    def _is_zero_gpu(name):
        """True when ``name`` has a truthy 'is_zero_gpu_space' flag in HF_SPACES."""
        try:
            return bool(HF_SPACES[name]['is_zero_gpu_space'])
        except (KeyError, TypeError):
            # model without a Space entry, or entry without the flag
            return False

    mdl1k = mdl1
    mdl2k = mdl2
    print(mdl1k, mdl2k)
    if mdl1 in AVAILABLE_MODELS:
        mdl1k = AVAILABLE_MODELS[mdl1]
    if mdl2 in AVAILABLE_MODELS:
        mdl2k = AVAILABLE_MODELS[mdl2]
    results = {}
    print(f"Sending models {mdl1k} and {mdl2k} to API")

    # do not use multithreading when both spaces are ZeroGPU type
    if _is_zero_gpu(mdl1) and _is_zero_gpu(mdl2):
        # run Zero-GPU spaces one at a time
        predict_and_update_result(text, mdl1k, results, request)
        _cache_sample(text, mdl1k)
        predict_and_update_result(text, mdl2k, results, request)
        _cache_sample(text, mdl2k)
    else:
        # use multithreading
        thread1 = threading.Thread(target=predict_and_update_result, args=(text, mdl1k, results, request))
        thread2 = threading.Thread(target=predict_and_update_result, args=(text, mdl2k, results, request))
        thread1.start()
        # wait 3 seconds to calm hf.space domain
        time.sleep(3)
        thread2.start()
        # timeout in 2 minutes
        thread1.join(120)
        thread2.join(120)
        # Errors raised inside the worker threads (and join timeouts) do not
        # propagate here; report them instead of failing on a bare KeyError.
        missing = [str(name) for name in (mdl1k, mdl2k) if name not in results]
        if missing:
            raise gr.Error(f"{', '.join(missing)}: Failed to call model")
        # cache the result
        for model in (mdl1k, mdl2k):
            _cache_sample(text, model)

    print(f"Retrieving models {mdl1k} and {mdl2k} from API")
    return (
        text,
        "Synthesize",
        gr.update(visible=True),  # r2
        mdl1,  # model1
        mdl2,  # model2
        gr.update(visible=True, value=results[mdl1k], autoplay=autoplay),  # aud1
        gr.update(visible=True, value=results[mdl2k], autoplay=False),  # aud2
        gr.update(visible=True, interactive=False),  # abetter
        gr.update(visible=True, interactive=False),  # bbetter
        gr.update(visible=False),  # prevmodel1
        gr.update(visible=False),  # prevmodel2
        gr.update(visible=False),  # nxt round btn
        # reset gr.State aplayed & bplayed
        False,  # aplayed
        False,  # bplayed
    )
# Battle Mode | |
def synthandreturn_battle(text, mdl1, mdl2, autoplay):
    """Synthesize ``text`` with two user-chosen models (battle mode).

    Args:
        text: Text to synthesize; surrounding whitespace is stripped.
        mdl1: First model; must differ from ``mdl2``.
        mdl2: Second model.
        autoplay: Whether the first audio component should autoplay.

    Returns:
        A 12-tuple of values/updates in this order: text, button label, r2,
        model1, model2, aud1, aud2, abetter, bbetter, prevmodel1, prevmodel2,
        next-round button.

    Raises:
        gr.Error: On identical models, invalid/toxic text, or when a model
            could not be called.
    """
    if mdl1 == mdl2:
        raise gr.Error('You can\'t pick two of the same models.')
    text = text.strip()
    # Check for empty input first: otherwise the minimum-length check below
    # reports the generic "too short" message and this one is never reached.
    if not text:
        raise gr.Error('You did not enter any text')
    if len(text) > MAX_SAMPLE_TXT_LENGTH:
        raise gr.Error(f'You exceeded the limit of {MAX_SAMPLE_TXT_LENGTH} characters')
    if len(text) < MIN_SAMPLE_TXT_LENGTH:
        raise gr.Error(f'Please input a text longer than {MIN_SAMPLE_TXT_LENGTH} characters')
    # test toxicity if not prepared text
    if text not in sents and check_toxicity(text):
        print(f'Detected toxic content! "{text}"')
        raise gr.Error('Your text failed the toxicity test')

    # Check language (best-effort: a detection failure must not block synthesis)
    try:
        if detect(text) != "en":
            gr.Warning('Warning: The input text may not be in English')
    except Exception as e:
        print(f"[debug] Language detection skipped: {e!r}")

    log_text(text)
    print("[debug] Using", mdl1, mdl2)

    def predict_and_update_result(text, model, result_storage):
        """Query ``model`` via the router and store its audio path in ``result_storage``."""
        try:
            if model in AVAILABLE_MODELS:
                result = router.predict(text, AVAILABLE_MODELS[model].lower(), api_name="/synthesize")
            else:
                result = router.predict(text, model.lower(), api_name="/synthesize")
        except Exception as e:
            raise gr.Error('Unable to call API, please try again :)') from e
        print('Done with', model)

        try:
            # Decode before creating the temp file so that a decoding failure
            # does not leave an orphaned temp file behind.
            audio = AudioSegment.from_file(result)
            if audio.frame_rate > 24000:
                audio = audio.set_frame_rate(24000)
            try:
                print('Trying to normalize audio')
                audio = match_target_amplitude(audio, -20)
            except Exception:
                print('[WARN] Unable to normalize audio')
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
                audio.export(f.name, format="wav")
                os.unlink(result)
                result = f.name
        except Exception:
            # Best-effort: fall back to the audio file as received.
            pass

        if model in AVAILABLE_MODELS:
            model = AVAILABLE_MODELS[model]
        print(model)
        print(f"Running model {model}")
        result_storage[model] = result

    mdl1k = mdl1
    mdl2k = mdl2
    print(mdl1k, mdl2k)
    if mdl1 in AVAILABLE_MODELS:
        mdl1k = AVAILABLE_MODELS[mdl1]
    if mdl2 in AVAILABLE_MODELS:
        mdl2k = AVAILABLE_MODELS[mdl2]
    results = {}
    print(f"Sending models {mdl1k} and {mdl2k} to API")
    thread1 = threading.Thread(target=predict_and_update_result, args=(text, mdl1k, results))
    thread2 = threading.Thread(target=predict_and_update_result, args=(text, mdl2k, results))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    # An error raised inside a worker thread does not propagate here; surface
    # it to the user instead of failing below on a bare KeyError.
    if mdl1k not in results or mdl2k not in results:
        raise gr.Error('Unable to call API, please try again :)')
    print(f"Retrieving models {mdl1k} and {mdl2k} from API")
    return (
        text,
        "Synthesize",
        gr.update(visible=True),  # r2
        mdl1,  # model1
        mdl2,  # model2
        gr.update(visible=True, value=results[mdl1k], autoplay=autoplay),  # aud1
        gr.update(visible=True, value=results[mdl2k], autoplay=False),  # aud2
        gr.update(visible=True, interactive=False),  # abetter
        gr.update(visible=True, interactive=False),  # bbetter
        gr.update(visible=False),  # prevmodel1
        gr.update(visible=False),  # prevmodel2
        gr.update(visible=False),  # nxt round btn
    )
def randomsent():
    """Return ('⚡', <random sentence from ``sents``>, '🎲')."""
    sentence = random.choice(sents)
    return ('⚡', sentence, '🎲')
def randomsent_battle():
    """Return the outputs of randomsent() followed by two random models."""
    sentence_outputs = tuple(randomsent())
    model_pair = tuple(random_m())
    return sentence_outputs + model_pair
def clear_stuff():
    """Return the list of values/updates that resets the arena outputs."""

    def hidden(**extra):
        # A fresh update per output that hides the component.
        return gr.update(visible=False, **extra)

    outputs = [gr.update(visible=True, value="", elem_classes=[]), "Synthesize"]
    outputs.append(hidden())  # r2
    outputs.extend(['', ''])  # model1, model2
    outputs.append(hidden(interactive=False, autoplay=False))  # aud1
    outputs.append(hidden(interactive=False, autoplay=False))  # aud2
    outputs.append(hidden(interactive=False))  # abetter
    outputs.append(hidden(interactive=False))  # bbetter
    outputs.append(hidden())  # prevmodel1
    outputs.append(hidden())  # prevmodel2
    outputs.append(hidden())  # nxt round btn
    outputs.extend([False, False])  # aplayed, bplayed
    return outputs