# RVC_HF / easy_infer.py
# r3gm's picture
# Upload 288 files
# 7bc29af
import subprocess
import os
import sys
import errno
import shutil
import yt_dlp
from mega import Mega
import datetime
import unicodedata
import torch
import glob
import gradio as gr
import gdown
import zipfile
import traceback
import json
import mdx
from mdx_processing_script import get_model_list,id_to_ptm,prepare_mdx,run_mdx
import requests
import wget
import ffmpeg
import hashlib
now_dir = os.getcwd()
sys.path.append(now_dir)
from unidecode import unidecode
import re
import time
from lib.infer_pack.models_onnx import SynthesizerTrnMsNSFsidM
from infer.modules.vc.pipeline import Pipeline
VC = Pipeline
from lib.infer_pack.models import (
SynthesizerTrnMs256NSFsid,
SynthesizerTrnMs256NSFsid_nono,
SynthesizerTrnMs768NSFsid,
SynthesizerTrnMs768NSFsid_nono,
)
from MDXNet import MDXNetDereverb
from configs.config import Config
from infer_uvr5 import _audio_pre_, _audio_pre_new
from huggingface_hub import HfApi, list_models
from huggingface_hub import login
from i18n import I18nAuto
i18n = I18nAuto()
from bs4 import BeautifulSoup
from sklearn.cluster import MiniBatchKMeans
from dotenv import load_dotenv
# Load variables from a .env file before the os.getenv() lookups below.
load_dotenv()
config = Config()

# Remove any leftover TEMP directory under the working directory and point the
# TEMP environment variable at that location.
tmp = os.path.join(now_dir, "TEMP")
shutil.rmtree(tmp, ignore_errors=True)
os.environ["TEMP"] = tmp

# Folder locations come from the environment; audio files live in "audios".
weight_root = os.getenv("weight_root")
weight_uvr5_root = os.getenv("weight_uvr5_root")
index_root = os.getenv("index_root")
audio_root = "audios"

# Voice model checkpoints available in weight_root.
names = [entry for entry in os.listdir(weight_root) if entry.endswith(".pth")]

index_paths = []
global indexes_list  # no effect at module level; kept as in the original
indexes_list = []
audio_paths = []

# Collect .index files whose name does not contain "trained".
for folder, _subdirs, filenames in os.walk(index_root, topdown=False):
    index_paths.extend(
        "%s\\%s" % (folder, filename)
        for filename in filenames
        if filename.endswith(".index") and "trained" not in filename
    )

# Collect every file found below the audio folder.
for folder, _subdirs, filenames in os.walk(audio_root, topdown=False):
    audio_paths.extend("%s/%s" % (folder, filename) for filename in filenames)

# UVR5 model names: .pth files (extension removed) and anything containing "onnx".
uvr5_names = [
    entry.replace(".pth", "")
    for entry in os.listdir(weight_uvr5_root)
    if entry.endswith(".pth") or "onnx" in entry
]
def calculate_md5(file_path):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is read in 4096-byte chunks so it never has to fit in memory.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
def format_title(title):
    """Return *title* reduced to word characters, whitespace and hyphens,
    with every space replaced by an underscore."""
    return re.sub(r'[^\w\s-]', '', title).replace(" ", "_")
def silentremove(filename):
    """Delete *filename*, ignoring the case where it does not exist.

    Any other OSError is re-raised.
    """
    try:
        os.remove(filename)
    except OSError as exc:
        if exc.errno == errno.ENOENT:
            return
        raise
def get_md5(temp_folder):
    """Return the MD5 of the first model checkpoint found under *temp_folder*.

    A file qualifies when it ends with ".pth", does not start with "G_" or
    "D_", and does not contain "_G_" or "_D_". Returns None when no file
    qualifies.
    """
    for folder, _subfolders, filenames in os.walk(temp_folder):
        for filename in filenames:
            if filename.startswith("G_") or filename.startswith("D_"):
                continue
            if not filename.endswith(".pth"):
                continue
            if "_G_" in filename or "_D_" in filename:
                continue
            return calculate_md5(os.path.join(folder, filename))
    return None
def find_parent(search_dir, file_name):
    """Return the absolute path of the first directory under *search_dir*
    that directly contains a file named *file_name*, or None if none does."""
    hits = (
        os.path.abspath(folder)
        for folder, _subdirs, contained_files in os.walk(search_dir)
        if file_name in contained_files
    )
    return next(hits, None)
def find_folder_parent(search_dir, folder_name):
    """Return the absolute path of the first directory under *search_dir*
    that directly contains a sub-folder named *folder_name*, or None."""
    hits = (
        os.path.abspath(folder)
        for folder, subdirs, _contained_files in os.walk(search_dir)
        if folder_name in subdirs
    )
    return next(hits, None)
def download_from_url(url):
    """Download the file behind *url* into the project's ``zips`` folder.

    The project root is the directory containing a ``pretrained_v2`` folder
    (searched from the current directory). The download method is chosen from
    substrings of the URL: Google Drive (via the ``gdown`` command), a
    ``/blob/`` link, mega.nz, a ``/tree/main`` page (first ``.zip`` link on
    the page), cdn.discordapp.com, pixeldrain.com, or anything else via wget.

    Args:
        url: Address of the file to fetch. An empty string is rejected.

    Returns:
        ``"downloaded"`` on success; ``"too much use"`` or ``"private link"``
        when gdown's stderr contains the corresponding message; ``None`` when
        the URL is empty, no file id / zip link can be extracted from it, or
        the HTTP request did not succeed.

    Side effects:
        For a non-empty URL the working directory is set to the project root
        before returning, whatever the outcome.
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    zips_path = os.path.join(parent_path, 'zips')
    if url == '':
        return None

    print(i18n("Downloading the file: ") + f"{url}")
    try:
        if "drive.google.com" in url:
            if "file/d/" in url:
                file_id = url.split("file/d/")[1].split("/")[0]
            elif "id=" in url:
                file_id = url.split("id=")[1].split("&")[0]
            else:
                return None
            if not file_id:
                # Nothing to download; do not report a success.
                return None
            # gdown saves into the current working directory.
            os.chdir(zips_path)
            result = subprocess.run(["gdown", f"https://drive.google.com/uc?id={file_id}", "--fuzzy"], capture_output=True, text=True, encoding='utf-8')
            gdown_stderr = str(result.stderr)
            if "Too many users have viewed or downloaded this file recently" in gdown_stderr:
                return "too much use"
            if "Cannot retrieve the public link of the file." in gdown_stderr:
                return "private link"
            print(result.stderr)
        elif "/blob/" in url:
            url = url.replace("blob", "resolve")
            response = requests.get(url)
            if response.status_code != 200:
                # A failed request must not be reported as "downloaded".
                return None
            file_name = url.split('/')[-1]
            with open(os.path.join(zips_path, file_name), "wb") as newfile:
                newfile.write(response.content)
        elif "mega.nz" in url:
            if "#!" in url:
                file_id = url.split("#!")[1].split("!")[0]
            elif "file/" in url:
                file_id = url.split("file/")[1].split("/")[0]
            else:
                return None
            if not file_id:
                return None
            m = Mega()
            m.download_url(url, zips_path)
        elif "/tree/main" in url:
            response = requests.get(url)
            soup = BeautifulSoup(response.content, 'html.parser')
            temp_url = ''
            for link in soup.find_all('a', href=True):
                if link['href'].endswith('.zip'):
                    temp_url = link['href']
                    break
            if not temp_url:
                print("No .zip file found on the page.")
                return None
            url = temp_url.replace("blob", "resolve")
            if "huggingface.co" not in url:
                url = "https://huggingface.co" + url
            # wget saves into the current working directory, so move to zips
            # first (as the generic branch below does).
            os.chdir(zips_path)
            wget.download(url)
        elif "cdn.discordapp.com" in url:
            file = requests.get(url)
            if file.status_code != 200:
                return None
            name = url.split('/')
            with open(os.path.join(zips_path, name[-1]), "wb") as newfile:
                newfile.write(file.content)
        elif "pixeldrain.com" in url:
            try:
                file_id = url.split("pixeldrain.com/u/")[1]
                print(file_id)
                response = requests.get(f"https://pixeldrain.com/api/file/{file_id}")
                if response.status_code != 200:
                    return None
                file_name = response.headers.get("Content-Disposition").split('filename=')[-1].strip('";')
                os.makedirs(zips_path, exist_ok=True)
                with open(os.path.join(zips_path, file_name), "wb") as newfile:
                    newfile.write(response.content)
                return "downloaded"
            except Exception as e:
                # Best effort: any failure on this branch is reported as None.
                print(e)
                return None
        else:
            os.chdir(zips_path)
            wget.download(url)

        print(i18n("Full download"))
        return "downloaded"
    finally:
        # Always leave the process in the project root, including on the
        # early returns above that happen after a chdir into zips.
        os.chdir(parent_path)
class error_message(Exception):
    """Exception that also exposes its message text as ``mensaje``."""

    def __init__(self, mensaje):
        super().__init__(mensaje)
        self.mensaje = mensaje
def get_vc(sid, to_return_protect0, to_return_protect1):
    """Load the voice model named *sid*, or unload the current one.

    NOTE: a second definition of ``get_vc`` appears later in this file; once
    the module has been imported, that later definition is the one in effect.

    Args:
        sid: File name of the checkpoint inside ``weight_root``. An empty
            string or empty list requests unloading.
        to_return_protect0: Value placed in the first "protect" update when
            the checkpoint has f0 enabled.
        to_return_protect1: Value placed in the second "protect" update when
            the checkpoint has f0 enabled.

    Returns:
        A 3-tuple of dicts carrying ``"__type__": "update"`` (presumably
        Gradio component updates - confirm against the caller).

    Side effects:
        Rebinds the module globals n_spk, tgt_sr, net_g, vc, cpt, version
        (and hubert_model when unloading).
    """
    global n_spk, tgt_sr, net_g, vc, cpt, version
    if sid == "" or sid == []:
        # NOTE(review): hubert_model is not assigned anywhere in the visible
        # part of this file; it must exist as a module global before this runs.
        global hubert_model
        if hubert_model is not None:
            print("clean_empty_cache")
            del net_g, n_spk, vc, hubert_model, tgt_sr
            hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # The network is rebuilt from the still-loaded checkpoint and then
            # deleted again before the second cache flush.
            # NOTE(review): looks like a workaround to get memory released
            # fully - confirm.
            if_f0 = cpt.get("f0", 1)
            version = cpt.get("version", "v1")
            if version == "v1":
                if if_f0 == 1:
                    net_g = SynthesizerTrnMs256NSFsid(
                        *cpt["config"], is_half=config.is_half
                    )
                else:
                    net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
            elif version == "v2":
                if if_f0 == 1:
                    net_g = SynthesizerTrnMs768NSFsid(
                        *cpt["config"], is_half=config.is_half
                    )
                else:
                    net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
            del net_g, cpt
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            cpt = None
        # Nothing selected: all three updates set visible=False.
        return (
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
        )
    person = "%s/%s" % (weight_root, sid)
    print("loading %s" % person)
    cpt = torch.load(person, map_location="cpu")
    tgt_sr = cpt["config"][-1]
    # config[-3] is overwritten with the first dimension of the speaker
    # embedding weight; it is read back below as n_spk.
    cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
    if_f0 = cpt.get("f0", 1)
    if if_f0 == 0:
        # Without f0 both protect updates are hidden and set to 0.5.
        to_return_protect0 = to_return_protect1 = {
            "visible": False,
            "value": 0.5,
            "__type__": "update",
        }
    else:
        to_return_protect0 = {
            "visible": True,
            "value": to_return_protect0,
            "__type__": "update",
        }
        to_return_protect1 = {
            "visible": True,
            "value": to_return_protect1,
            "__type__": "update",
        }
    # The synthesizer class is selected by checkpoint version and f0 flag.
    version = cpt.get("version", "v1")
    if version == "v1":
        if if_f0 == 1:
            net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
        else:
            net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
    elif version == "v2":
        if if_f0 == 1:
            net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
        else:
            net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
    # enc_q is removed before the weights are loaded (non-strict load).
    # NOTE(review): presumably not needed for inference - confirm.
    del net_g.enc_q
    print(net_g.load_state_dict(cpt["weight"], strict=False))
    net_g.eval().to(config.device)
    if config.is_half:
        net_g = net_g.half()
    else:
        net_g = net_g.float()
    vc = VC(tgt_sr, config)
    n_spk = cpt["config"][-3]
    return (
        {"visible": True, "maximum": n_spk, "__type__": "update"},
        to_return_protect0,
        to_return_protect1,
    )
def load_downloaded_model(url):
    """Download a model archive from *url* and install its contents.

    Generator: after each step it yields the status text to display. The
    archive is fetched with ``download_from_url`` into ``<root>/zips`` and
    unpacked into ``<root>/unzips``; the inference ``.pth`` (no "G_"/"D_" in
    its name) is moved to ``<root>/weights`` and ``added_*.index``,
    ``total_fea.npy*`` and ``events.*`` files are moved to
    ``<root>/logs/<model name>``. Both scratch folders are removed afterwards.

    Args:
        url: Location of the model archive.

    Yields:
        str: Progress or error text.
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    try:
        infos = []
        zips_path = os.path.join(parent_path, 'zips')
        unzips_path = os.path.join(parent_path, 'unzips')
        weights_path = os.path.join(parent_path, 'weights')
        logs_dir = ""

        # Start from empty scratch folders.
        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        if os.path.exists(unzips_path):
            shutil.rmtree(unzips_path)
        os.mkdir(zips_path)
        os.mkdir(unzips_path)

        download_file = download_from_url(url)
        if not download_file:
            print(i18n("The file could not be downloaded."))
            infos.append(i18n("The file could not be downloaded."))
            yield "\n".join(infos)
        elif download_file == "downloaded":
            print(i18n("It has been downloaded successfully."))
            infos.append(i18n("It has been downloaded successfully."))
            yield "\n".join(infos)
        elif download_file == "too much use":
            # Raise the token itself: the handler below matches on it. Raising
            # the translated sentence made the specific messages unreachable.
            raise Exception("too much use")
        elif download_file == "private link":
            raise Exception("private link")

        for filename in os.listdir(zips_path):
            if filename.endswith(".zip"):
                zipfile_path = os.path.join(zips_path, filename)
                print(i18n("Proceeding with the extraction..."))
                infos.append(i18n("Proceeding with the extraction..."))
                shutil.unpack_archive(zipfile_path, unzips_path, 'zip')
                model_name = os.path.basename(zipfile_path)
                # Default log folder is named after the archive; it is
                # replaced below if a .pth model is found inside.
                logs_dir = os.path.join(parent_path, 'logs', os.path.normpath(str(model_name).replace(".zip", "")))
                yield "\n".join(infos)
            else:
                print(i18n("Unzip error."))
                infos.append(i18n("Unzip error."))
                yield "\n".join(infos)

        index_file = False
        model_file = False

        # First pass: install the inference model.
        for path, subdirs, files in os.walk(unzips_path):
            for item in files:
                item_path = os.path.join(path, item)
                if 'G_' not in item and 'D_' not in item and item.endswith('.pth'):
                    model_file = True
                    model_name = item.replace(".pth", "")
                    logs_dir = os.path.join(parent_path, 'logs', model_name)
                    if os.path.exists(logs_dir):
                        shutil.rmtree(logs_dir)
                    os.mkdir(logs_dir)
                    if not os.path.exists(weights_path):
                        os.mkdir(weights_path)
                    if os.path.exists(os.path.join(weights_path, item)):
                        os.remove(os.path.join(weights_path, item))
                    if os.path.exists(item_path):
                        shutil.move(item_path, weights_path)

        if not model_file and not os.path.exists(logs_dir):
            os.mkdir(logs_dir)

        # Second pass: move index / feature / event files next to the logs.
        for path, subdirs, files in os.walk(unzips_path):
            for item in files:
                item_path = os.path.join(path, item)
                if item.startswith('added_') and item.endswith('.index'):
                    index_file = True
                    if os.path.exists(item_path):
                        if os.path.exists(os.path.join(logs_dir, item)):
                            os.remove(os.path.join(logs_dir, item))
                        shutil.move(item_path, logs_dir)
                if item.startswith('total_fea.npy') or item.startswith('events.'):
                    if os.path.exists(item_path):
                        if os.path.exists(os.path.join(logs_dir, item)):
                            os.remove(os.path.join(logs_dir, item))
                        shutil.move(item_path, logs_dir)

        result = ""
        if model_file:
            if index_file:
                print(i18n("The model works for inference, and has the .index file."))
                infos.append("\n" + i18n("The model works for inference, and has the .index file."))
                yield "\n".join(infos)
            else:
                print(i18n("The model works for inference, but it doesn't have the .index file."))
                infos.append("\n" + i18n("The model works for inference, but it doesn't have the .index file."))
                yield "\n".join(infos)

        if not index_file and not model_file:
            print(i18n("No relevant file was found to upload."))
            infos.append(i18n("No relevant file was found to upload."))
            yield "\n".join(infos)

        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        if os.path.exists(unzips_path):
            shutil.rmtree(unzips_path)
        os.chdir(parent_path)
        return result
    except Exception as e:
        # Restore the working directory before yielding: the generator may
        # stay suspended at the yield, delaying the finally clause.
        os.chdir(parent_path)
        if "too much use" in str(e):
            print(i18n("Too many users have recently viewed or downloaded this file"))
            yield i18n("Too many users have recently viewed or downloaded this file")
        elif "private link" in str(e):
            print(i18n("Cannot get file from this private link"))
            yield i18n("Cannot get file from this private link")
        else:
            print(e)
            yield i18n("An error occurred downloading")
    finally:
        os.chdir(parent_path)
def load_dowloaded_dataset(url):
    """Download a zipped dataset from *url* into ``<root>/datasets``.

    Generator: yields the status text to display after each step. Every
    ``.zip`` found in ``<root>/zips`` is unpacked into ``<root>/unzips``; files
    whose names end with a known audio extension are renamed with
    ``format_title`` and moved into ``<root>/datasets/<archive name>`` (spaces
    removed, "-" replaced by "_"). An existing dataset folder of the same name
    is replaced. Both scratch folders are removed afterwards.

    Args:
        url: Location of the dataset archive.

    Yields:
        str: Progress or error text.
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    infos = []
    try:
        zips_path = os.path.join(parent_path, 'zips')
        unzips_path = os.path.join(parent_path, 'unzips')
        datasets_path = os.path.join(parent_path, 'datasets')
        audio_extenions = ('wav', 'mp3', 'flac', 'ogg', 'opus',
                           'm4a', 'mp4', 'aac', 'alac', 'wma',
                           'aiff', 'webm', 'ac3')

        # Start from empty scratch folders.
        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        if os.path.exists(unzips_path):
            shutil.rmtree(unzips_path)
        if not os.path.exists(datasets_path):
            os.mkdir(datasets_path)
        os.mkdir(zips_path)
        os.mkdir(unzips_path)

        download_file = download_from_url(url)
        if not download_file:
            print(i18n("An error occurred downloading"))
            infos.append(i18n("An error occurred downloading"))
            yield "\n".join(infos)
            raise Exception(i18n("An error occurred downloading"))
        elif download_file == "downloaded":
            print(i18n("It has been downloaded successfully."))
            infos.append(i18n("It has been downloaded successfully."))
            yield "\n".join(infos)
        elif download_file == "too much use":
            # Raise the token itself: the handler below matches on it. Raising
            # the translated sentence made the specific messages unreachable.
            raise Exception("too much use")
        elif download_file == "private link":
            raise Exception("private link")

        zip_path = os.listdir(zips_path)
        foldername = ""
        for file in zip_path:
            if file.endswith('.zip'):
                file_path = os.path.join(zips_path, file)
                print("....")
                foldername = file.replace(".zip", "").replace(" ", "").replace("-", "_")
                dataset_path = os.path.join(datasets_path, foldername)
                print(i18n("Proceeding with the extraction..."))
                infos.append(i18n("Proceeding with the extraction..."))
                yield "\n".join(infos)
                shutil.unpack_archive(file_path, unzips_path, 'zip')
                # Replace any previous dataset with the same name.
                if os.path.exists(dataset_path):
                    shutil.rmtree(dataset_path)
                os.mkdir(dataset_path)
                for root, subfolders, songs in os.walk(unzips_path):
                    for song in songs:
                        song_path = os.path.join(root, song)
                        if song.endswith(audio_extenions):
                            stem, extension = os.path.splitext(song)
                            formatted_song_name = format_title(stem)
                            new_song_path = os.path.join(dataset_path, f"{formatted_song_name}{extension}")
                            shutil.move(song_path, new_song_path)
            else:
                print(i18n("Unzip error."))
                infos.append(i18n("Unzip error."))
                yield "\n".join(infos)

        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        if os.path.exists(unzips_path):
            shutil.rmtree(unzips_path)
        print(i18n("The Dataset has been loaded successfully."))
        infos.append(i18n("The Dataset has been loaded successfully."))
        yield "\n".join(infos)
    except Exception as e:
        # Restore the working directory before yielding: the generator may
        # stay suspended at the yield, delaying the finally clause.
        os.chdir(parent_path)
        if "too much use" in str(e):
            print(i18n("Too many users have recently viewed or downloaded this file"))
            yield i18n("Too many users have recently viewed or downloaded this file")
        elif "private link" in str(e):
            print(i18n("Cannot get file from this private link"))
            yield i18n("Cannot get file from this private link")
        else:
            print(e)
            yield i18n("An error occurred downloading")
    finally:
        os.chdir(parent_path)
def save_model(modelname, save_action):
    """Zip the files of the model *modelname* into a backup folder.

    Generator: yields the status text to display after each step. The files
    are staged in ``<root>/zips/<modelname>``, archived, and the archive is
    moved to the backup folder: ``<root>/RVC_Backup`` unless the project path
    contains "content", in which case ``/content/drive/MyDrive/RVC_Backup`` is
    used. "Save all" and "Save D and G" go to its ``ManualTrainingBackup``
    sub-folder, "Save voice" to ``Finished``.

    Args:
        modelname: Name of the model; ``<root>/logs/<modelname>`` must exist.
        save_action: Translated label of the chosen method ("Save all",
            "Save D and G" or "Save voice").

    Yields:
        str: Progress or error text.
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    zips_path = os.path.join(parent_path, 'zips')
    dst = os.path.join(zips_path, modelname)
    logs_path = os.path.join(parent_path, 'logs', modelname)
    weights_path = os.path.join(parent_path, 'weights', f"{modelname}.pth")
    save_folder = parent_path
    infos = []
    try:
        if not os.path.exists(logs_path):
            raise Exception("No model found.")

        if 'content' not in parent_path:
            save_folder = os.path.join(parent_path, 'RVC_Backup')
        else:
            save_folder = '/content/drive/MyDrive/RVC_Backup'

        infos.append(i18n("Save model"))
        yield "\n".join(infos)

        # os.mkdir (not makedirs) on purpose: a missing parent of the backup
        # folder should fail rather than be created silently.
        if not os.path.exists(save_folder):
            os.mkdir(save_folder)
        if not os.path.exists(os.path.join(save_folder, 'ManualTrainingBackup')):
            os.mkdir(os.path.join(save_folder, 'ManualTrainingBackup'))
        if not os.path.exists(os.path.join(save_folder, 'Finished')):
            os.mkdir(os.path.join(save_folder, 'Finished'))

        # Fresh staging folder.
        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        os.mkdir(zips_path)

        added_file = glob.glob(os.path.join(logs_path, "added_*.index"))
        d_file = glob.glob(os.path.join(logs_path, "D_*.pth"))
        g_file = glob.glob(os.path.join(logs_path, "G_*.pth"))

        if save_action == i18n("Choose the method"):
            raise Exception("No method choosen.")

        if save_action == i18n("Save all"):
            print(i18n("Save all"))
            save_folder = os.path.join(save_folder, 'ManualTrainingBackup')
            shutil.copytree(logs_path, dst)
        else:
            if not os.path.exists(dst):
                os.mkdir(dst)

        if save_action == i18n("Save D and G"):
            print(i18n("Save D and G"))
            save_folder = os.path.join(save_folder, 'ManualTrainingBackup')
            if d_file:
                shutil.copy(d_file[0], dst)
            if g_file:
                shutil.copy(g_file[0], dst)
            if added_file:
                shutil.copy(added_file[0], dst)
            else:
                infos.append(i18n("Saved without index..."))

        if save_action == i18n("Save voice"):
            print(i18n("Save voice"))
            save_folder = os.path.join(save_folder, 'Finished')
            if added_file:
                shutil.copy(added_file[0], dst)
            else:
                infos.append(i18n("Saved without index..."))

        yield "\n".join(infos)
        if not os.path.exists(weights_path):
            infos.append(i18n("Saved without inference model..."))
        else:
            shutil.copy(weights_path, dst)

        yield "\n".join(infos)
        infos.append("\n" + i18n("This may take a few minutes, please wait..."))
        yield "\n".join(infos)

        # Archive only the staged model folder. The archive file is created
        # inside zips_path, so archiving the whole of zips_path could pick up
        # the partially written archive itself. Entry names keep the
        # "<modelname>/..." prefix.
        shutil.make_archive(os.path.join(zips_path, f"{modelname}"), 'zip', root_dir=zips_path, base_dir=modelname)
        shutil.move(os.path.join(zips_path, f"{modelname}.zip"), os.path.join(save_folder, f'{modelname}.zip'))

        shutil.rmtree(zips_path)
        infos.append("\n" + i18n("Model saved successfully"))
        yield "\n".join(infos)
    except Exception as e:
        print(e)
        if "No model found." in str(e):
            infos.append(i18n("The model you want to save does not exist, be sure to enter the correct name."))
        else:
            infos.append(i18n("An error occurred saving the model"))
        yield "\n".join(infos)
def load_downloaded_backup(url):
    """Download a training backup archive from *url* into ``<root>/logs``.

    Generator: yields the status text to display after each step. Every
    ``.zip`` in ``<root>/zips`` is unpacked into ``<root>/unzips``. If the
    archive contains a top-level folder named like the archive, that folder is
    moved into ``logs``; otherwise a folder with that name is created in
    ``logs`` and everything unpacked is moved into it. Both scratch folders
    are removed afterwards.

    Args:
        url: Location of the backup archive.

    Yields:
        str: Progress or error text.
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    try:
        infos = []
        zips_path = os.path.join(parent_path, 'zips')
        unzips_path = os.path.join(parent_path, 'unzips')
        logs_dir = os.path.join(parent_path, 'logs')

        # Start from empty scratch folders.
        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        if os.path.exists(unzips_path):
            shutil.rmtree(unzips_path)
        os.mkdir(zips_path)
        os.mkdir(unzips_path)

        download_file = download_from_url(url)
        if not download_file:
            print(i18n("The file could not be downloaded."))
            infos.append(i18n("The file could not be downloaded."))
            yield "\n".join(infos)
        elif download_file == "downloaded":
            print(i18n("It has been downloaded successfully."))
            infos.append(i18n("It has been downloaded successfully."))
            yield "\n".join(infos)
        elif download_file == "too much use":
            # Raise the token itself: the handler below matches on it. Raising
            # the translated sentence made the specific messages unreachable.
            raise Exception("too much use")
        elif download_file == "private link":
            raise Exception("private link")

        for filename in os.listdir(zips_path):
            if filename.endswith(".zip"):
                zipfile_path = os.path.join(zips_path, filename)
                zip_dir_name = os.path.splitext(filename)[0]
                unzip_dir = unzips_path
                print(i18n("Proceeding with the extraction..."))
                infos.append(i18n("Proceeding with the extraction..."))
                shutil.unpack_archive(zipfile_path, unzip_dir, 'zip')

                if os.path.exists(os.path.join(unzip_dir, zip_dir_name)):
                    shutil.move(os.path.join(unzip_dir, zip_dir_name), logs_dir)
                else:
                    new_folder_path = os.path.join(logs_dir, zip_dir_name)
                    os.mkdir(new_folder_path)
                    for item_name in os.listdir(unzip_dir):
                        item_path = os.path.join(unzip_dir, item_name)
                        if os.path.isfile(item_path) or os.path.isdir(item_path):
                            shutil.move(item_path, new_folder_path)
                yield "\n".join(infos)
            else:
                print(i18n("Unzip error."))
                infos.append(i18n("Unzip error."))
                yield "\n".join(infos)

        result = ""
        for filename in os.listdir(unzips_path):
            if filename.endswith(".zip"):
                # Build the full path: the bare name would be resolved against
                # the working directory and never match the unpacked file.
                silentremove(os.path.join(unzips_path, filename))

        if os.path.exists(zips_path):
            shutil.rmtree(zips_path)
        if os.path.exists(unzips_path):
            shutil.rmtree(unzips_path)
        print(i18n("The Backup has been uploaded successfully."))
        infos.append("\n" + i18n("The Backup has been uploaded successfully."))
        yield "\n".join(infos)
        os.chdir(parent_path)
        return result
    except Exception as e:
        # Restore the working directory before yielding: the generator may
        # stay suspended at the yield, delaying the finally clause.
        os.chdir(parent_path)
        if "too much use" in str(e):
            print(i18n("Too many users have recently viewed or downloaded this file"))
            yield i18n("Too many users have recently viewed or downloaded this file")
        elif "private link" in str(e):
            print(i18n("Cannot get file from this private link"))
            yield i18n("Cannot get file from this private link")
        else:
            print(e)
            yield i18n("An error occurred downloading")
    finally:
        os.chdir(parent_path)
def save_to_wav(record_button):
    """Move a recorded file into ``./audios`` under a timestamped name.

    Args:
        record_button: Path of the recorded file, or None when there is none.

    Returns:
        The new file name (``YYYY-MM-DD_HH-MM-SS.wav``), or None when
        *record_button* is None.
    """
    if record_button is None:
        return None
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    new_name = timestamp + '.wav'
    shutil.move(record_button, './audios/' + new_name)
    return new_name
def change_choices2():
    """List the audio files in ``./audios`` for a choice component.

    Returns:
        A pair of update dicts: the first carries the sorted, forward-slash
        paths of the files whose names end with a known audio extension, the
        second carries no changes.
    """
    audio_suffixes = ('wav', 'mp3', 'flac', 'ogg', 'opus',
                      'm4a', 'mp4', 'aac', 'alac', 'wma',
                      'aiff', 'webm', 'ac3')
    found = [
        os.path.join('./audios', entry).replace('\\', '/')
        for entry in os.listdir("./audios")
        if entry.endswith(audio_suffixes)
    ]
    found.sort()
    return {"choices": found, "__type__": "update"}, {"__type__": "update"}
def uvr(input_url, output_path, model_name, inp_root, save_root_vocal, paths, save_root_ins, agg, format0, architecture):
    """Download audio from *input_url* and run vocal/instrumental separation.

    The "yt_downloads" folder is emptied, the audio is downloaded with yt_dlp
    to ``<output_path>/<formatted title>.wav``, and then the files given by
    *paths* (or, when *paths* is None, the audio files found in *inp_root*) are
    processed with the architecture selected by *architecture* ("VR" or "MDX").

    Args:
        input_url: URL handed to yt_dlp.
        output_path: Folder used in the yt_dlp output template.
        model_name: Separation model; for "VR" it selects the pre-processing
            class and the ``.pth`` under ``weight_uvr5_root``, for "MDX" it is
            passed to ``id_to_ptm``.
        inp_root: Folder scanned for input audio when *paths* is None.
        save_root_vocal: Output folder for vocals.
        paths: Optional iterable of objects with a ``.name`` attribute.
        save_root_ins: Output folder for instrumentals.
        agg: Converted with ``int()`` and passed as ``agg`` to the VR model.
        format0: Output format forwarded to the separation routines.
        architecture: "VR" or "MDX"; any other value does nothing further.

    Returns:
        ``(i18n("Finished"), vocal_path, instrumental_path)`` when the cleanup
        in the ``finally`` clause succeeds; otherwise None.
    """
    # Empty the download folder left over from a previous run.
    carpeta_a_eliminar = "yt_downloads"
    if os.path.exists(carpeta_a_eliminar) and os.path.isdir(carpeta_a_eliminar):
        for archivo in os.listdir(carpeta_a_eliminar):
            ruta_archivo = os.path.join(carpeta_a_eliminar, archivo)
            if os.path.isfile(ruta_archivo):
                os.remove(ruta_archivo)
            elif os.path.isdir(ruta_archivo):
                shutil.rmtree(ruta_archivo)
    ydl_opts = {
        'no-windows-filenames': True,
        'restrict-filenames': True,
        'extract_audio': True,
        'format': 'bestaudio',
        'quiet': True,
        'no-warnings': True,
    }
    try:
        print(i18n("Downloading audio from the video..."))
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # First query the metadata only, to derive the output file name.
            info_dict = ydl.extract_info(input_url, download=False)
            formatted_title = format_title(info_dict.get('title', 'default_title'))
            formatted_outtmpl = output_path + '/' + formatted_title + '.wav'
            ydl_opts['outtmpl'] = formatted_outtmpl
            # A second downloader is created so the new 'outtmpl' is used.
            ydl = yt_dlp.YoutubeDL(ydl_opts)
            ydl.download([input_url])
        print(i18n("Audio downloaded!"))
    except Exception as error:
        print(i18n("An error occurred:"), error)
    # NOTE(review): formatted_title is only assigned inside the try above; if
    # the download step fails before that assignment, the f-strings below
    # raise NameError.
    actual_directory = os.path.dirname(__file__)
    vocal_directory = os.path.join(actual_directory, save_root_vocal)
    instrumental_directory = os.path.join(actual_directory, save_root_ins)
    # Expected output file names for the VR architecture.
    vocal_formatted = f"vocal_{formatted_title}.wav.reformatted.wav_10.wav"
    instrumental_formatted = f"instrument_{formatted_title}.wav.reformatted.wav_10.wav"
    vocal_audio_path = os.path.join(vocal_directory, vocal_formatted)
    instrumental_audio_path = os.path.join(instrumental_directory, instrumental_formatted)
    # Expected output file names for the MDX architecture.
    vocal_formatted_mdx = f"{formatted_title}_vocal_.wav"
    instrumental_formatted_mdx = f"{formatted_title}_instrument_.wav"
    vocal_audio_path_mdx = os.path.join(vocal_directory, vocal_formatted_mdx)
    instrumental_audio_path_mdx = os.path.join(instrumental_directory, instrumental_formatted_mdx)
    if architecture == "VR":
        try:
            print(i18n("Starting audio conversion... (This might take a moment)"))
            # Strip surrounding spaces, quotes and newlines from the folders.
            inp_root, save_root_vocal, save_root_ins = [x.strip(" ").strip('"').strip("\n").strip('"').strip(" ") for x in [inp_root, save_root_vocal, save_root_ins]]
            usable_files = [os.path.join(inp_root, file)
                            for file in os.listdir(inp_root)
                            if file.endswith(tuple(sup_audioext))]
            # Pick the pre-processing class from the model name.
            pre_fun = MDXNetDereverb(15) if model_name == "onnx_dereverb_By_FoxJoy" else (_audio_pre_ if "DeEcho" not in model_name else _audio_pre_new)(
                agg=int(agg),
                model_path=os.path.join(weight_uvr5_root, model_name + ".pth"),
                device=config.device,
                is_half=config.is_half,
            )
            try:
                if paths != None:
                    paths = [path.name for path in paths]
                else:
                    paths = usable_files
            except:
                # Fall back to the files found in inp_root.
                traceback.print_exc()
                paths = usable_files
            print(paths)
            for path in paths:
                inp_path = os.path.join(inp_root, path)
                need_reformat, done = 1, 0
                try:
                    # Inputs that are already 2-channel / 44100 Hz are
                    # processed directly; everything else is converted first.
                    info = ffmpeg.probe(inp_path, cmd="ffprobe")
                    if info["streams"][0]["channels"] == 2 and info["streams"][0]["sample_rate"] == "44100":
                        need_reformat = 0
                        pre_fun._path_audio_(inp_path, save_root_ins, save_root_vocal, format0)
                        done = 1
                except:
                    traceback.print_exc()
                if need_reformat:
                    tmp_path = f"{tmp}/{os.path.basename(inp_path)}.reformatted.wav"
                    # NOTE(review): the command line is built by string
                    # interpolation; paths containing spaces or shell
                    # metacharacters will break or alter the command.
                    os.system(f"ffmpeg -i {inp_path} -vn -acodec pcm_s16le -ac 2 -ar 44100 {tmp_path} -y")
                    inp_path = tmp_path
                try:
                    if not done:
                        pre_fun._path_audio_(inp_path, save_root_ins, save_root_vocal, format0)
                    print(f"{os.path.basename(inp_path)}->Success")
                except:
                    print(f"{os.path.basename(inp_path)}->{traceback.format_exc()}")
        except:
            traceback.print_exc()
        finally:
            try:
                if model_name == "onnx_dereverb_By_FoxJoy":
                    del pre_fun.pred.model
                    del pre_fun.pred.model_
                else:
                    del pre_fun.model
                del pre_fun
                # Returning from inside `finally` discards any exception still
                # in flight and skips the cache clearing below.
                return i18n("Finished"), vocal_audio_path, instrumental_audio_path
            except: traceback.print_exc()
            # Only reached when the cleanup above raised.
            if torch.cuda.is_available(): torch.cuda.empty_cache()
    elif architecture == "MDX":
        try:
            print(i18n("Starting audio conversion... (This might take a moment)"))
            # Strip surrounding spaces, quotes and newlines from the folders.
            inp_root, save_root_vocal, save_root_ins = [x.strip(" ").strip('"').strip("\n").strip('"').strip(" ") for x in [inp_root, save_root_vocal, save_root_ins]]
            usable_files = [os.path.join(inp_root, file)
                            for file in os.listdir(inp_root)
                            if file.endswith(tuple(sup_audioext))]
            try:
                if paths != None:
                    paths = [path.name for path in paths]
                else:
                    paths = usable_files
            except:
                # Fall back to the files found in inp_root.
                traceback.print_exc()
                paths = usable_files
            print(paths)
            # Fixed MDX settings used for every run.
            invert=True
            denoise=True
            use_custom_parameter=True
            dim_f=2048
            dim_t=256
            n_fft=7680
            use_custom_compensation=True
            compensation=1.025
            suffix = "vocal_" #@param ["Vocals", "Drums", "Bass", "Other"]{allow-input: true}
            suffix_invert = "instrument_" #@param ["Instrumental", "Drumless", "Bassless", "Instruments"]{allow-input: true}
            print_settings = True # @param{type:"boolean"}
            onnx = id_to_ptm(model_name)
            compensation = compensation if use_custom_compensation or use_custom_parameter else None
            mdx_model = prepare_mdx(onnx,use_custom_parameter, dim_f, dim_t, n_fft, compensation=compensation)
            for path in paths:
                #inp_path = os.path.join(inp_root, path)
                suffix_naming = suffix if use_custom_parameter else None
                diff_suffix_naming = suffix_invert if use_custom_parameter else None
                run_mdx(onnx, mdx_model, path, format0, diff=invert,suffix=suffix_naming,diff_suffix=diff_suffix_naming,denoise=denoise)
            if print_settings:
                print()
                print('[MDX-Net_Colab settings used]')
                print(f'Model used: {onnx}')
                print(f'Model MD5: {mdx.MDX.get_hash(onnx)}')
                print(f'Model parameters:')
                print(f' -dim_f: {mdx_model.dim_f}')
                print(f' -dim_t: {mdx_model.dim_t}')
                print(f' -n_fft: {mdx_model.n_fft}')
                print(f' -compensation: {mdx_model.compensation}')
                print()
                print('[Input file]')
                print('filename(s): ')
                for filename in paths:
                    print(f' -{filename}')
                    print(f"{os.path.basename(filename)}->Success")
        except:
            traceback.print_exc()
        finally:
            try:
                del mdx_model
                # Returning from inside `finally` discards any exception still
                # in flight and skips the cache clearing below.
                return i18n("Finished"), vocal_audio_path_mdx, instrumental_audio_path_mdx
            except: traceback.print_exc()
            # Only reached when the cleanup above raised.
            print("clean_empty_cache")
            if torch.cuda.is_available(): torch.cuda.empty_cache()
# Audio file extensions (without the leading dot) accepted as input.
sup_audioext = {
    'wav',
    'mp3',
    'flac',
    'ogg',
    'opus',
    'm4a',
    'mp4',
    'aac',
    'alac',
    'wma',
    'aiff',
    'webm',
    'ac3',
}
def load_downloaded_audio(url):
    """Download an audio file from *url* into ``<root>/audios``.

    Generator: yields the status text to display after each step. The file is
    fetched with ``download_from_url`` into ``<root>/zips``; every file there
    whose extension is in ``sup_audioext`` is then moved to ``<root>/audios``.

    Args:
        url: Location of the audio file.

    Yields:
        str: Progress or error text.
    """
    parent_path = find_folder_parent(".", "pretrained_v2")
    try:
        infos = []
        audios_path = os.path.join(parent_path, 'audios')
        zips_path = os.path.join(parent_path, 'zips')

        if not os.path.exists(audios_path):
            os.mkdir(audios_path)
        # The download lands in zips and the folder is listed below, so make
        # sure it exists; nothing else creates it on this code path.
        os.makedirs(zips_path, exist_ok=True)

        download_file = download_from_url(url)
        if not download_file:
            print(i18n("The file could not be downloaded."))
            infos.append(i18n("The file could not be downloaded."))
            yield "\n".join(infos)
        elif download_file == "downloaded":
            print(i18n("It has been downloaded successfully."))
            infos.append(i18n("It has been downloaded successfully."))
            yield "\n".join(infos)
        elif download_file == "too much use":
            # Raise the token itself: the handler below matches on it. Raising
            # the translated sentence made the specific messages unreachable.
            raise Exception("too much use")
        elif download_file == "private link":
            raise Exception("private link")

        for filename in os.listdir(zips_path):
            item_path = os.path.join(zips_path, filename)
            if item_path.split('.')[-1] in sup_audioext:
                if os.path.exists(item_path):
                    shutil.move(item_path, audios_path)

        result = ""
        print(i18n("Audio files have been moved to the 'audios' folder."))
        infos.append(i18n("Audio files have been moved to the 'audios' folder."))
        yield "\n".join(infos)
        os.chdir(parent_path)
        return result
    except Exception as e:
        # Restore the working directory before yielding: the generator may
        # stay suspended at the yield, delaying the finally clause.
        os.chdir(parent_path)
        if "too much use" in str(e):
            print(i18n("Too many users have recently viewed or downloaded this file"))
            yield i18n("Too many users have recently viewed or downloaded this file")
        elif "private link" in str(e):
            print(i18n("Cannot get file from this private link"))
            yield i18n("Cannot get file from this private link")
        else:
            print(e)
            yield i18n("An error occurred downloading")
    finally:
        os.chdir(parent_path)
class error_message(Exception):
    """Exception that also exposes its message text as ``mensaje``."""

    def __init__(self, mensaje):
        super().__init__(mensaje)
        self.mensaje = mensaje
def get_vc(sid, to_return_protect0, to_return_protect1):
    """Load the voice model named *sid*, or unload the current one.

    NOTE: an identical definition of ``get_vc`` appears earlier in this file;
    being the later one, this definition is the one in effect after import.

    Args:
        sid: File name of the checkpoint inside ``weight_root``. An empty
            string or empty list requests unloading.
        to_return_protect0: Value placed in the first "protect" update when
            the checkpoint has f0 enabled.
        to_return_protect1: Value placed in the second "protect" update when
            the checkpoint has f0 enabled.

    Returns:
        A 3-tuple of dicts carrying ``"__type__": "update"`` (presumably
        Gradio component updates - confirm against the caller).

    Side effects:
        Rebinds the module globals n_spk, tgt_sr, net_g, vc, cpt, version
        (and hubert_model when unloading).
    """
    global n_spk, tgt_sr, net_g, vc, cpt, version
    if sid == "" or sid == []:
        # NOTE(review): hubert_model is not assigned anywhere in the visible
        # part of this file; it must exist as a module global before this runs.
        global hubert_model
        if hubert_model is not None:
            print("clean_empty_cache")
            del net_g, n_spk, vc, hubert_model, tgt_sr
            hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # The network is rebuilt from the still-loaded checkpoint and then
            # deleted again before the second cache flush.
            # NOTE(review): looks like a workaround to get memory released
            # fully - confirm.
            if_f0 = cpt.get("f0", 1)
            version = cpt.get("version", "v1")
            if version == "v1":
                if if_f0 == 1:
                    net_g = SynthesizerTrnMs256NSFsid(
                        *cpt["config"], is_half=config.is_half
                    )
                else:
                    net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
            elif version == "v2":
                if if_f0 == 1:
                    net_g = SynthesizerTrnMs768NSFsid(
                        *cpt["config"], is_half=config.is_half
                    )
                else:
                    net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
            del net_g, cpt
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            cpt = None
        # Nothing selected: all three updates set visible=False.
        return (
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
            {"visible": False, "__type__": "update"},
        )
    person = "%s/%s" % (weight_root, sid)
    print("loading %s" % person)
    cpt = torch.load(person, map_location="cpu")
    tgt_sr = cpt["config"][-1]
    # config[-3] is overwritten with the first dimension of the speaker
    # embedding weight; it is read back below as n_spk.
    cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]
    if_f0 = cpt.get("f0", 1)
    if if_f0 == 0:
        # Without f0 both protect updates are hidden and set to 0.5.
        to_return_protect0 = to_return_protect1 = {
            "visible": False,
            "value": 0.5,
            "__type__": "update",
        }
    else:
        to_return_protect0 = {
            "visible": True,
            "value": to_return_protect0,
            "__type__": "update",
        }
        to_return_protect1 = {
            "visible": True,
            "value": to_return_protect1,
            "__type__": "update",
        }
    # The synthesizer class is selected by checkpoint version and f0 flag.
    version = cpt.get("version", "v1")
    if version == "v1":
        if if_f0 == 1:
            net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half)
        else:
            net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
    elif version == "v2":
        if if_f0 == 1:
            net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half)
        else:
            net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
    # enc_q is removed before the weights are loaded (non-strict load).
    # NOTE(review): presumably not needed for inference - confirm.
    del net_g.enc_q
    print(net_g.load_state_dict(cpt["weight"], strict=False))
    net_g.eval().to(config.device)
    if config.is_half:
        net_g = net_g.half()
    else:
        net_g = net_g.float()
    vc = VC(tgt_sr, config)
    n_spk = cpt["config"][-3]
    return (
        {"visible": True, "maximum": n_spk, "__type__": "update"},
        to_return_protect0,
        to_return_protect1,
    )
def update_model_choices(select_value):
    """Return a choices update for the model selector of an architecture.

    Args:
        select_value: "VR" or "MDX".

    Returns:
        An update dict listing ``uvr5_names`` for "VR" or the ids from
        ``get_model_list()`` for "MDX"; None for any other value.
    """
    mdx_choices = list(get_model_list())
    if select_value == "VR":
        selected = uvr5_names
    elif select_value == "MDX":
        selected = mdx_choices
    else:
        return None
    return {"choices": selected, "__type__": "update"}
def download_model():
    """Create the "Download Model" controls and wire up the download button.

    Creates two Markdown headers, a URL textbox, a status textbox and a
    button. Clicking the button calls ``load_downloaded_model`` with the URL
    textbox as its input and the status textbox as its output.
    Presumably called from inside an active Gradio layout -- confirm at the
    call site.
    """
    title_markdown = "# " + i18n("Download Model")
    gr.Markdown(value=title_markdown)
    gr.Markdown(value=i18n("It is used to download your inference models."))

    with gr.Row():
        url_box = gr.Textbox(label=i18n("Url:"))
    with gr.Row():
        status_box = gr.Textbox(label=i18n("Status:"))
    with gr.Row():
        start_button = gr.Button(i18n("Download"))

    start_button.click(
        fn=load_downloaded_model,
        inputs=[url_box],
        outputs=[status_box],
    )
def download_backup():
    """Create the "Download Backup" controls and wire up the download button.

    Creates two Markdown headers, a URL textbox, a status textbox and a
    button. Clicking the button calls ``load_downloaded_backup`` with the URL
    textbox as its input and the status textbox as its output.
    Presumably called from inside an active Gradio layout -- confirm at the
    call site.
    """
    title_markdown = "# " + i18n("Download Backup")
    gr.Markdown(value=title_markdown)
    gr.Markdown(value=i18n("It is used to download your training backups."))

    with gr.Row():
        url_box = gr.Textbox(label=i18n("Url:"))
    with gr.Row():
        status_box = gr.Textbox(label=i18n("Status:"))
    with gr.Row():
        start_button = gr.Button(i18n("Download"))

    start_button.click(
        fn=load_downloaded_backup,
        inputs=[url_box],
        outputs=[status_box],
    )
def update_dataset_list(name):
    """Rebuild the dataset dropdown choices from the ``./datasets`` directory.

    Args:
        name: Value of the component that triggered the refresh; it is not
            used, the directory is always re-scanned.

    Returns:
        A ``gr.Dropdown.update`` whose choices are the paths of all entries in
        ``./datasets`` whose name contains no ".", each joined onto
        ``<find_folder_parent(".", "pretrained")>/datasets``.

    Raises:
        OSError: propagated from ``os.listdir`` when ``./datasets`` cannot be
            listed.
    """
    # Entries containing a dot are skipped (the original filter, kept as-is).
    folder_names = [
        entry for entry in os.listdir("./datasets") if "." not in entry
    ]

    new_datasets = []
    if folder_names:
        # The parent lookup does not depend on the folder being processed, so
        # resolve it once instead of once per dataset; it is still skipped
        # entirely when there is nothing to list.
        datasets_root = os.path.join(
            find_folder_parent(".", "pretrained"), "datasets"
        )
        new_datasets = [
            os.path.join(datasets_root, folder) for folder in folder_names
        ]
    return gr.Dropdown.update(choices=new_datasets)
def download_dataset(trainset_dir4):
    """Create the "Download Dataset" controls and wire up their events.

    Args:
        trainset_dir4: Component used as the output of the dataset-list
            refresh that runs whenever the status textbox changes.

    Clicking the button calls ``load_dowloaded_dataset`` with the URL textbox
    as its input and the status textbox as its output. A change of the status
    textbox then calls ``update_dataset_list`` with the URL textbox as input
    and ``trainset_dir4`` as output.
    """
    title_markdown = "# " + i18n("Download Dataset")
    gr.Markdown(value=title_markdown)
    gr.Markdown(
        value=i18n("Download the dataset with the audios in a compatible format (.wav/.flac) to train your model.")
    )

    with gr.Row():
        url_box = gr.Textbox(label=i18n("Url:"))
    with gr.Row():
        status_box = gr.Textbox(label=i18n("Status:"))
    with gr.Row():
        start_button = gr.Button(i18n("Download"))

    start_button.click(
        fn=load_dowloaded_dataset,
        inputs=[url_box],
        outputs=[status_box],
    )
    # Every status change triggers a refresh of the dataset component.
    status_box.change(update_dataset_list, url_box, trainset_dir4)
def download_audio():
    """Create the "Download Audio" controls and wire up the download button.

    Creates two Markdown headers, a URL textbox, a status textbox and a
    button. Clicking the button calls ``load_downloaded_audio`` with the URL
    textbox as its input and the status textbox as its output.
    Presumably called from inside an active Gradio layout -- confirm at the
    call site.
    """
    title_markdown = "# " + i18n("Download Audio")
    gr.Markdown(value=title_markdown)
    gr.Markdown(
        value=i18n("Download audios of any format for use in inference (recommended for mobile users).")
    )

    with gr.Row():
        url_box = gr.Textbox(label=i18n("Url:"))
    with gr.Row():
        status_box = gr.Textbox(label=i18n("Status:"))
    with gr.Row():
        start_button = gr.Button(i18n("Download"))

    start_button.click(
        fn=load_downloaded_audio,
        inputs=[url_box],
        outputs=[status_box],
    )
def youtube_separator():
# Builds the "Separate YouTube tracks" controls: a link textbox, an
# "Advanced Settings" section, status/audio outputs, and the button that
# calls `uvr`. The function has no return statement; it only creates Gradio
# components and registers event handlers.
gr.Markdown(value="# " + i18n("Separate YouTube tracks"))
gr.Markdown(value=i18n("Download audio from a YouTube video and automatically separate the vocal and instrumental tracks"))
with gr.Row():
# NOTE(review): this is the only textbox here created through `gr.inputs`;
# the others use `gr.Textbox`. Presumably the legacy namespace - confirm
# against the installed Gradio version before changing it.
input_url = gr.inputs.Textbox(label=i18n("Enter the YouTube link:"))
# Hidden textbox (visible=False). Its default value is the current working
# directory with backslashes replaced by "/" plus "/yt_downloads".
output_path = gr.Textbox(
label=i18n("Enter the path of the audio folder to be processed (copy it from the address bar of the file manager):"),
value=os.path.abspath(os.getcwd()).replace('\\', '/') + "/yt_downloads",
visible=False,
)
# Drives the visibility of `advanced_settings` through the change handler
# registered at the end of this function.
advanced_settings_checkbox = gr.Checkbox(
value=False,
label=i18n("Advanced Settings"),
interactive=True,
)
# Created with visible=False; bound to `advanced_settings` so the checkbox
# handler can target it as an output.
with gr.Row(label = i18n("Advanced Settings"), visible=False, variant='compact') as advanced_settings:
with gr.Column():
model_select = gr.Radio(
label=i18n("Model Architecture:"),
choices=["VR", "MDX"],
value="VR",
interactive=True,
)
# NOTE(review): the default "HP5_only_main_vocal" is presumably expected to
# be one of `uvr5_names` - confirm.
model_choose = gr.Dropdown(label=i18n("Model: (Be aware that in some models the named vocal will be the instrumental)"),
choices=uvr5_names,
value="HP5_only_main_vocal"
)
with gr.Row():
agg = gr.Slider(
minimum=0,
maximum=20,
step=1,
label=i18n("Vocal Extraction Aggressive"),
value=10,
interactive=True,
)
with gr.Row():
opt_vocal_root = gr.Textbox(
label=i18n("Specify the output folder for vocals:"), value="audios",
)
opt_ins_root = gr.Textbox(
label=i18n("Specify the output folder for accompaniment:"), value="audio-others",
)
# The next three components are created with visible=False but are still
# passed to `uvr` as inputs by the click handler below.
dir_wav_input = gr.Textbox(
label=i18n("Enter the path of the audio folder to be processed:"),
value=((os.getcwd()).replace('\\', '/') + "/yt_downloads"),
visible=False,
)
format0 = gr.Radio(
label=i18n("Export file format"),
choices=["wav", "flac", "mp3", "m4a"],
value="wav",
visible=False,
interactive=True,
)
wav_inputs = gr.File(
file_count="multiple", label=i18n("You can also input audio files in batches. Choose one of the two options. Priority is given to reading from the folder."),
visible=False,
)
# Changing the architecture radio calls `update_model_choices` with the radio
# as input and the model dropdown as output.
model_select.change(
fn=update_model_choices,
inputs=model_select,
outputs=model_choose,
)
with gr.Row():
vc_output4 = gr.Textbox(label=i18n("Status:"))
vc_output5 = gr.Audio(label=i18n("Vocal"), type='filepath')
vc_output6 = gr.Audio(label=i18n("Instrumental"), type='filepath')
with gr.Row():
but2 = gr.Button(i18n("Download and Separate"))
# `uvr` is called with the ten inputs below, in this order, and its results
# are routed to the status textbox and the two audio components.
but2.click(
uvr,
[
input_url,
output_path,
model_choose,
dir_wav_input,
opt_vocal_root,
wav_inputs,
opt_ins_root,
agg,
format0,
model_select
],
[vc_output4, vc_output5, vc_output6],
)
# Returns an update dict whose "visible" field is the checkbox value.
def toggle_advanced_settings(checkbox):
return {"visible": checkbox, "__type__": "update"}
advanced_settings_checkbox.change(
fn=toggle_advanced_settings,
inputs=[advanced_settings_checkbox],
outputs=[advanced_settings]
)
def get_bark_voice():
    """Return the Bark speaker presets as ``"<preset>-<gender>"`` strings.

    The presets come from the table embedded below, one row per speaker with
    the columns preset id, language and gender.

    Returns:
        list[str]: One entry per table row, in table order, for example
        ``"v2/en_speaker_0-Male"``.
    """
    voice_table = """
v2/en_speaker_0 English Male
v2/en_speaker_1 English Male
v2/en_speaker_2 English Male
v2/en_speaker_3 English Male
v2/en_speaker_4 English Male
v2/en_speaker_5 English Male
v2/en_speaker_6 English Male
v2/en_speaker_7 English Male
v2/en_speaker_8 English Male
v2/en_speaker_9 English Female
v2/zh_speaker_0 Chinese (Simplified) Male
v2/zh_speaker_1 Chinese (Simplified) Male
v2/zh_speaker_2 Chinese (Simplified) Male
v2/zh_speaker_3 Chinese (Simplified) Male
v2/zh_speaker_4 Chinese (Simplified) Female
v2/zh_speaker_5 Chinese (Simplified) Male
v2/zh_speaker_6 Chinese (Simplified) Female
v2/zh_speaker_7 Chinese (Simplified) Female
v2/zh_speaker_8 Chinese (Simplified) Male
v2/zh_speaker_9 Chinese (Simplified) Female
v2/fr_speaker_0 French Male
v2/fr_speaker_1 French Female
v2/fr_speaker_2 French Female
v2/fr_speaker_3 French Male
v2/fr_speaker_4 French Male
v2/fr_speaker_5 French Female
v2/fr_speaker_6 French Male
v2/fr_speaker_7 French Male
v2/fr_speaker_8 French Male
v2/fr_speaker_9 French Male
v2/de_speaker_0 German Male
v2/de_speaker_1 German Male
v2/de_speaker_2 German Male
v2/de_speaker_3 German Female
v2/de_speaker_4 German Male
v2/de_speaker_5 German Male
v2/de_speaker_6 German Male
v2/de_speaker_7 German Male
v2/de_speaker_8 German Female
v2/de_speaker_9 German Male
v2/hi_speaker_0 Hindi Female
v2/hi_speaker_1 Hindi Female
v2/hi_speaker_2 Hindi Male
v2/hi_speaker_3 Hindi Female
v2/hi_speaker_4 Hindi Female
v2/hi_speaker_5 Hindi Male
v2/hi_speaker_6 Hindi Male
v2/hi_speaker_7 Hindi Male
v2/hi_speaker_8 Hindi Male
v2/hi_speaker_9 Hindi Female
v2/it_speaker_0 Italian Male
v2/it_speaker_1 Italian Male
v2/it_speaker_2 Italian Female
v2/it_speaker_3 Italian Male
v2/it_speaker_4 Italian Male
v2/it_speaker_5 Italian Male
v2/it_speaker_6 Italian Male
v2/it_speaker_7 Italian Female
v2/it_speaker_8 Italian Male
v2/it_speaker_9 Italian Female
v2/ja_speaker_0 Japanese Female
v2/ja_speaker_1 Japanese Female
v2/ja_speaker_2 Japanese Male
v2/ja_speaker_3 Japanese Female
v2/ja_speaker_4 Japanese Female
v2/ja_speaker_5 Japanese Female
v2/ja_speaker_6 Japanese Male
v2/ja_speaker_7 Japanese Female
v2/ja_speaker_8 Japanese Female
v2/ja_speaker_9 Japanese Female
v2/ko_speaker_0 Korean Female
v2/ko_speaker_1 Korean Male
v2/ko_speaker_2 Korean Male
v2/ko_speaker_3 Korean Male
v2/ko_speaker_4 Korean Male
v2/ko_speaker_5 Korean Male
v2/ko_speaker_6 Korean Male
v2/ko_speaker_7 Korean Male
v2/ko_speaker_8 Korean Male
v2/ko_speaker_9 Korean Male
v2/pl_speaker_0 Polish Male
v2/pl_speaker_1 Polish Male
v2/pl_speaker_2 Polish Male
v2/pl_speaker_3 Polish Male
v2/pl_speaker_4 Polish Female
v2/pl_speaker_5 Polish Male
v2/pl_speaker_6 Polish Female
v2/pl_speaker_7 Polish Male
v2/pl_speaker_8 Polish Male
v2/pl_speaker_9 Polish Female
v2/pt_speaker_0 Portuguese Male
v2/pt_speaker_1 Portuguese Male
v2/pt_speaker_2 Portuguese Male
v2/pt_speaker_3 Portuguese Male
v2/pt_speaker_4 Portuguese Male
v2/pt_speaker_5 Portuguese Male
v2/pt_speaker_6 Portuguese Male
v2/pt_speaker_7 Portuguese Male
v2/pt_speaker_8 Portuguese Male
v2/pt_speaker_9 Portuguese Male
v2/ru_speaker_0 Russian Male
v2/ru_speaker_1 Russian Male
v2/ru_speaker_2 Russian Male
v2/ru_speaker_3 Russian Male
v2/ru_speaker_4 Russian Male
v2/ru_speaker_5 Russian Female
v2/ru_speaker_6 Russian Female
v2/ru_speaker_7 Russian Male
v2/ru_speaker_8 Russian Male
v2/ru_speaker_9 Russian Female
v2/es_speaker_0 Spanish Male
v2/es_speaker_1 Spanish Male
v2/es_speaker_2 Spanish Male
v2/es_speaker_3 Spanish Male
v2/es_speaker_4 Spanish Male
v2/es_speaker_5 Spanish Male
v2/es_speaker_6 Spanish Male
v2/es_speaker_7 Spanish Male
v2/es_speaker_8 Spanish Female
v2/es_speaker_9 Spanish Female
v2/tr_speaker_0 Turkish Male
v2/tr_speaker_1 Turkish Male
v2/tr_speaker_2 Turkish Male
v2/tr_speaker_3 Turkish Male
v2/tr_speaker_4 Turkish Female
v2/tr_speaker_5 Turkish Female
v2/tr_speaker_6 Turkish Male
v2/tr_speaker_7 Turkish Male
v2/tr_speaker_8 Turkish Male
v2/tr_speaker_9 Turkish Male
"""
    voices = []
    for row in voice_table.splitlines():
        # Split on any run of whitespace instead of requiring exactly three
        # tab-separated fields: the table then parses whether its columns are
        # separated by tabs or by spaces. The language column may itself
        # contain spaces ("Chinese (Simplified)"), so only the first field
        # (preset id) and the last field (gender) are read.
        fields = row.split()
        if len(fields) < 3:
            continue  # blank or incomplete row
        voices.append(f"{fields[0]}-{fields[-1]}")
    return voices
def get_edge_voice():
    """Return the voices listed by ``edge-tts -l`` as ``"<name>-<gender>"``.

    Runs ``edge-tts -l`` and scans its standard output for lines starting
    with ``"Name: "`` and ``"Gender: "``. Each ``"Name: "`` line starts a new
    record; a following ``"Gender: "`` line is attached to that record.

    Returns:
        list[str]: One ``"<name>-<gender>"`` string per record, in output order.

    Raises:
        KeyError: if a collected record lacks a "Name" or "Gender" field.
    """
    listing = subprocess.run(
        ["edge-tts", "-l"], capture_output=True, text=True
    ).stdout.strip()

    records = []
    record = {}
    for row in listing.split("\n"):
        if row.startswith("Name: "):
            # A new "Name:" row closes the record collected so far.
            if record:
                records.append(record)
            record = {"Name": row.split(": ")[1]}
        elif row.startswith("Gender: "):
            record["Gender"] = row.split(": ")[1]
    # Flush the last record, which no later "Name:" row will close.
    if record:
        records.append(record)

    return [f'{item["Name"]}-{item["Gender"]}' for item in records]
#print(set_tts_voice)