"""Real-time RVC voice-conversion GUI.

Captures audio from an input device, converts it block by block with an RVC
model (HuBERT features -> optional faiss index blending -> synthesizer), and
plays the result on an output device.  Settings are persisted in
``values1.json`` in the current working directory.
"""
import os, sys, traceback, re
import json

now_dir = os.getcwd()
# Project-local packages (configs, lib, i18n) are resolved relative to the cwd.
sys.path.append(now_dir)
from configs.config import Config

# NOTE: rebinding the class name to an instance is kept on purpose; the rest of
# the file reads ``Config.is_half`` from this instance.
Config = Config()
import PySimpleGUI as sg
import sounddevice as sd
import noisereduce as nr
import numpy as np
from fairseq import checkpoint_utils
import librosa, torch, pyworld, faiss, time, threading
import torch.nn.functional as F
import torchaudio.transforms as tat
import scipy.signal as signal
import torchcrepe

from lib.infer_pack.models import (
    SynthesizerTrnMs256NSFsid,
    SynthesizerTrnMs256NSFsid_nono,
    SynthesizerTrnMs768NSFsid,
    SynthesizerTrnMs768NSFsid_nono,
)
from i18n import I18nAuto

i18n = I18nAuto()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
current_dir = os.getcwd()


def _cuda_sync():
    """Wait for pending CUDA work; do nothing when CUDA is unavailable.

    ``torch.cuda.synchronize()`` raises on machines without CUDA, while this
    script explicitly supports a CPU fallback via ``device``.
    """
    if torch.cuda.is_available():
        torch.cuda.synchronize()


class RVC:
    """Wraps the HuBERT feature extractor and the RVC synthesizer."""

    def __init__(
        self, key, f0_method, hubert_path, pth_path, index_path, npy_path, index_rate
    ) -> None:
        """Load the models and the optional faiss index.

        Args:
            key: Pitch shift in semitones applied to the extracted f0.
            f0_method: One of "harvest", "reg-crepe", "reg-crepe-tiny".
            hubert_path: Path of the HuBERT checkpoint.
            pth_path: Path of the RVC ``.pth`` checkpoint.
            index_path: Path of the faiss ``.index`` file (read only when
                ``index_rate`` is non-zero).
            npy_path: Unused; kept for interface compatibility.
            index_rate: Blend factor between retrieved and extracted features.

        Any exception raised while loading is printed and swallowed, so the
        instance may be only partially initialised afterwards.
        """
        try:
            self.f0_up_key = key
            self.time_step = 160 / 16000 * 1000
            self.f0_min = 50
            self.f0_max = 1100
            self.f0_mel_min = 1127 * np.log(1 + self.f0_min / 700)
            self.f0_mel_max = 1127 * np.log(1 + self.f0_max / 700)
            self.f0_method = f0_method
            self.sr = 16000
            self.window = 160

            # Device used by torchcrepe for pitch extraction.
            if torch.cuda.is_available():
                self.torch_device = torch.device(
                    f"cuda:{0 % torch.cuda.device_count()}"
                )
            elif torch.backends.mps.is_available():
                self.torch_device = torch.device("mps")
            else:
                self.torch_device = torch.device("cpu")

            if index_rate != 0:
                self.index = faiss.read_index(index_path)
                # The feature matrix is rebuilt from the index instead of
                # being loaded from npy_path.
                self.big_npy = self.index.reconstruct_n(0, self.index.ntotal)
                print("index search enabled")
            self.index_rate = index_rate

            model_path = hubert_path
            print("load model(s) from {}".format(model_path))
            models, saved_cfg, task = checkpoint_utils.load_model_ensemble_and_task(
                [model_path],
                suffix="",
            )
            self.model = models[0]
            self.model = self.model.to(device)
            if Config.is_half:
                self.model = self.model.half()
            else:
                self.model = self.model.float()
            self.model.eval()

            # NOTE(security): torch.load unpickles the file; only load
            # checkpoints from trusted sources.
            cpt = torch.load(pth_path, map_location="cpu")
            self.tgt_sr = cpt["config"][-1]
            cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0]  # n_spk
            self.if_f0 = cpt.get("f0", 1)
            self.version = cpt.get("version", "v1")
            if self.version == "v1":
                if self.if_f0 == 1:
                    self.net_g = SynthesizerTrnMs256NSFsid(
                        *cpt["config"], is_half=Config.is_half
                    )
                else:
                    self.net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"])
            elif self.version == "v2":
                if self.if_f0 == 1:
                    self.net_g = SynthesizerTrnMs768NSFsid(
                        *cpt["config"], is_half=Config.is_half
                    )
                else:
                    self.net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"])
            del self.net_g.enc_q
            print(self.net_g.load_state_dict(cpt["weight"], strict=False))
            self.net_g.eval().to(device)
            if Config.is_half:
                self.net_g = self.net_g.half()
            else:
                self.net_g = self.net_g.float()
        except Exception:
            # Best-effort by design: report the failure and keep the GUI alive.
            print(traceback.format_exc())

    def get_regular_crepe_computation(self, x, f0_min, f0_max, model="full"):
        """Estimate f0 of *x* with torchcrepe and return it as a numpy array.

        Frames whose (median-filtered) periodicity is below 0.1 are set to 0.
        """
        batch_size = 512
        audio = torch.tensor(np.copy(x))[None].float()
        f0, pd = torchcrepe.predict(
            audio,
            self.sr,
            self.window,
            f0_min,
            f0_max,
            model,
            batch_size=batch_size,
            device=self.torch_device,
            return_periodicity=True,
        )
        pd = torchcrepe.filter.median(pd, 3)
        f0 = torchcrepe.filter.mean(f0, 3)
        f0[pd < 0.1] = 0
        f0 = f0[0].cpu().numpy()
        return f0

    def get_harvest_computation(self, x, f0_min, f0_max):
        """Estimate f0 of *x* with pyworld harvest + stonemask, median-filtered."""
        samples = x.astype(np.double)
        f0, t = pyworld.harvest(
            samples,
            fs=self.sr,
            f0_ceil=f0_max,
            f0_floor=f0_min,
            frame_period=10,
        )
        f0 = pyworld.stonemask(samples, f0, t, self.sr)
        f0 = signal.medfilt(f0, 3)
        return f0

    def get_f0(self, x, f0_up_key, inp_f0=None):
        """Compute the pitch contour of *x*.

        Args:
            x: Mono audio samples (numpy array) at ``self.sr``.
            f0_up_key: Pitch shift in semitones.
            inp_f0: Optional two-column array (first column is scaled by 100
                and used as the x-axis, second column holds the f0 values)
                that overrides part of the estimated contour.

        Returns:
            ``(f0_coarse, f0bak)``: the mel-quantised integer contour in
            ``[1, 255]`` and the shifted f0 values before quantisation.

        Raises:
            ValueError: If ``self.f0_method`` is not a supported method.
        """
        x_pad = 1
        f0_min = 50
        f0_max = 1100
        f0_mel_min = 1127 * np.log(1 + f0_min / 700)
        f0_mel_max = 1127 * np.log(1 + f0_max / 700)

        if self.f0_method == "harvest":
            f0 = self.get_harvest_computation(x, f0_min, f0_max)
        elif self.f0_method == "reg-crepe":
            f0 = self.get_regular_crepe_computation(x, f0_min, f0_max)
        elif self.f0_method == "reg-crepe-tiny":
            f0 = self.get_regular_crepe_computation(x, f0_min, f0_max, "tiny")
        else:
            # Previously fell through with a scalar 0 and failed later with an
            # unrelated AttributeError.
            raise ValueError("unsupported f0 method: {!r}".format(self.f0_method))

        f0 *= pow(2, f0_up_key / 12)
        tf0 = self.sr // self.window  # f0 points per second
        if inp_f0 is not None:
            delta_t = np.round(
                (inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
            ).astype("int16")
            replace_f0 = np.interp(
                list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1]
            )
            start = x_pad * tf0
            shape = f0[start : start + len(replace_f0)].shape[0]
            f0[start : start + len(replace_f0)] = replace_f0[:shape]

        f0bak = f0.copy()
        f0_mel = 1127 * np.log(1 + f0 / 700)
        voiced = f0_mel > 0
        f0_mel[voiced] = (f0_mel[voiced] - f0_mel_min) * 254 / (
            f0_mel_max - f0_mel_min
        ) + 1
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        # ``np.int`` was removed in NumPy 1.24; the builtin ``int`` is what the
        # alias always referred to.
        f0_coarse = np.rint(f0_mel).astype(int)
        return f0_coarse, f0bak

    def infer(self, feats: torch.Tensor) -> torch.Tensor:
        """Convert one chunk of 16 kHz mono audio.

        Args:
            feats: 1-D tensor of audio samples.

        Returns:
            1-D float CPU tensor with the synthesised audio.
        """
        audio = feats.clone().cpu().numpy()
        assert feats.dim() == 1, feats.dim()
        feats = feats.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).fill_(False)
        if Config.is_half:
            feats = feats.half()
        else:
            feats = feats.float()
        inputs = {
            "source": feats.to(device),
            "padding_mask": padding_mask.to(device),
            "output_layer": 9 if self.version == "v1" else 12,
        }
        _cuda_sync()
        with torch.no_grad():
            logits = self.model.extract_features(**inputs)
            feats = (
                self.model.final_proj(logits[0]) if self.version == "v1" else logits[0]
            )

        # Index retrieval: blend each frame with its 8 nearest index entries.
        try:
            if (
                hasattr(self, "index")
                and hasattr(self, "big_npy")
                and self.index_rate != 0
            ):
                npy = feats[0].cpu().numpy().astype("float32")
                score, ix = self.index.search(npy, k=8)
                weight = np.square(1 / score)
                weight /= weight.sum(axis=1, keepdims=True)
                npy = np.sum(self.big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
                if Config.is_half:
                    npy = npy.astype("float16")
                feats = (
                    torch.from_numpy(npy).unsqueeze(0).to(device) * self.index_rate
                    + (1 - self.index_rate) * feats
                )
            else:
                print("index search FAIL or disabled")
        except Exception:
            # Retrieval is optional; fall back to the raw features.
            traceback.print_exc()
            print("index search FAIL")

        feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        _cuda_sync()
        print(feats.shape)
        if self.if_f0 == 1:
            pitch, pitchf = self.get_f0(audio, self.f0_up_key)
            # Capped at 13000 frames: larger inputs exhaust GPU memory.
            p_len = min(feats.shape[1], 13000, pitch.shape[0])
        else:
            pitch, pitchf = None, None
            p_len = min(feats.shape[1], 13000)
        _cuda_sync()

        feats = feats[:, :p_len, :]
        if self.if_f0 == 1:
            pitch = pitch[:p_len]
            pitchf = pitchf[:p_len]
            pitch = torch.LongTensor(pitch).unsqueeze(0).to(device)
            pitchf = torch.FloatTensor(pitchf).unsqueeze(0).to(device)
        p_len = torch.LongTensor([p_len]).to(device)
        ii = 0  # speaker id
        sid = torch.LongTensor([ii]).to(device)
        with torch.no_grad():
            if self.if_f0 == 1:
                infered_audio = (
                    self.net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0]
                    .data.cpu()
                    .float()
                )
            else:
                infered_audio = (
                    self.net_g.infer(feats, p_len, sid)[0][0, 0].data.cpu().float()
                )
        _cuda_sync()
        return infered_audio


class GUIConfig:
    """Runtime settings shared between the GUI and the audio pipeline."""

    def __init__(self) -> None:
        self.hubert_path: str = ""
        self.pth_path: str = ""
        self.index_path: str = ""
        self.npy_path: str = ""
        self.f0_method: str = ""
        self.pitch: int = 12
        self.samplerate: int = 44100
        self.block_time: float = 1.0  # seconds
        self.buffer_num: int = 1
        self.threhold: int = -30
        self.crossfade_time: float = 0.08
        self.extra_time: float = 0.04
        self.I_noise_reduce = False
        self.O_noise_reduce = False
        self.index_rate = 0.3


class GUI:
    """PySimpleGUI front end driving the real-time conversion."""

    _F0_METHODS = ("harvest", "reg-crepe", "reg-crepe-tiny")

    def __init__(self) -> None:
        """Create the configuration and immediately open the window."""
        self.config = GUIConfig()
        self.flag_vc = False
        self.launcher()

    @staticmethod
    def _default_device_name(names, indices, default_index):
        """Return the display name for *default_index*, or "" if it is not listed."""
        try:
            return names[indices.index(default_index)]
        except ValueError:
            return ""

    def load(self):
        """Return the saved settings from ``values1.json``.

        When the file is missing or unreadable, default settings are built,
        written to ``values1.json`` and returned.
        """
        (
            input_devices,
            output_devices,
            input_devices_indices,
            output_devices_indices,
        ) = self.get_devices()
        try:
            with open("values1.json", "r") as j:
                data = json.load(j)
        except (OSError, ValueError):  # missing file or invalid JSON
            data = {
                "pth_path": "",
                "index_path": "",
                "sg_input_device": self._default_device_name(
                    input_devices, input_devices_indices, sd.default.device[0]
                ),
                "sg_output_device": self._default_device_name(
                    output_devices, output_devices_indices, sd.default.device[1]
                ),
                "threhold": "-45",
                "pitch": "0",
                "index_rate": "0",
                "block_time": "1",
                "crossfade_length": "0.04",
                "extra_time": "1",
            }
            # The original opened the file for writing but never wrote to it,
            # leaving an empty (invalid) JSON file behind.
            with open("values1.json", "w") as j:
                json.dump(data, j)
        return data

    def launcher(self):
        """Build the window layout from the saved settings and run the event loop."""
        data = self.load()
        sg.theme("DarkTeal12")
        input_devices, output_devices, _, _ = self.get_devices()

        # Restore the previously saved f0 method (it is written to
        # values1.json on start but was never read back).
        saved_f0 = data.get("f0_method", "harvest")
        if saved_f0 not in self._F0_METHODS:
            saved_f0 = "harvest"

        model_frame = sg.Frame(
            title=i18n("Load model"),
            layout=[
                [
                    sg.Input(
                        default_text="hubert_base.pt",
                        key="hubert_path",
                        disabled=True,
                    ),
                    sg.FileBrowse(
                        i18n("Hubert Model"),
                        initial_folder=os.path.join(os.getcwd()),
                        file_types=(("pt files", "*.pt"),),
                    ),
                ],
                [
                    sg.Input(
                        default_text=data.get("pth_path", ""),
                        key="pth_path",
                    ),
                    sg.FileBrowse(
                        i18n("Select the .pth file"),
                        initial_folder=os.path.join(os.getcwd(), "weights"),
                        file_types=(("weight files", "*.pth"),),
                    ),
                ],
                [
                    sg.Input(
                        default_text=data.get("index_path", ""),
                        key="index_path",
                    ),
                    sg.FileBrowse(
                        i18n("Select the .index file"),
                        initial_folder=os.path.join(os.getcwd(), "logs"),
                        file_types=(("index files", "*.index"),),
                    ),
                ],
                [
                    sg.Input(
                        default_text="你不需要填写这个You don't need write this.",
                        key="npy_path",
                        disabled=True,
                    ),
                    sg.FileBrowse(
                        i18n("Select the .npy file"),
                        initial_folder=os.path.join(os.getcwd(), "logs"),
                        file_types=(("feature files", "*.npy"),),
                    ),
                ],
            ],
        )

        f0_frame = sg.Frame(
            layout=[
                [
                    sg.Radio(
                        "Harvest",
                        "f0_method",
                        key="harvest",
                        default=saved_f0 == "harvest",
                    ),
                    sg.Radio(
                        "Crepe",
                        "f0_method",
                        key="reg-crepe",
                        default=saved_f0 == "reg-crepe",
                    ),
                    sg.Radio(
                        "Crepe Tiny",
                        "f0_method",
                        key="reg-crepe-tiny",
                        default=saved_f0 == "reg-crepe-tiny",
                    ),
                ]
            ],
            title="Select an f0 Method",
        )

        device_frame = sg.Frame(
            layout=[
                [
                    sg.Text(i18n("Input device")),
                    sg.Combo(
                        input_devices,
                        key="sg_input_device",
                        default_value=data.get("sg_input_device", ""),
                    ),
                ],
                [
                    sg.Text(i18n("Output device")),
                    sg.Combo(
                        output_devices,
                        key="sg_output_device",
                        default_value=data.get("sg_output_device", ""),
                    ),
                ],
            ],
            title=i18n("Audio device (please use the same type of driver)"),
        )

        general_frame = sg.Frame(
            layout=[
                [
                    sg.Text(i18n("Response threshold")),
                    sg.Slider(
                        range=(-60, 0),
                        key="threhold",
                        resolution=1,
                        orientation="h",
                        default_value=data.get("threhold", ""),
                    ),
                ],
                [
                    sg.Text(i18n("Pitch settings")),
                    sg.Slider(
                        range=(-24, 24),
                        key="pitch",
                        resolution=1,
                        orientation="h",
                        default_value=data.get("pitch", ""),
                    ),
                ],
                [
                    sg.Text(i18n("Index Rate")),
                    sg.Slider(
                        range=(0.0, 1.0),
                        key="index_rate",
                        resolution=0.01,
                        orientation="h",
                        default_value=data.get("index_rate", ""),
                    ),
                ],
            ],
            title=i18n("General settings"),
        )

        performance_frame = sg.Frame(
            layout=[
                [
                    sg.Text(i18n("Sample length")),
                    sg.Slider(
                        range=(0.1, 3.0),
                        key="block_time",
                        resolution=0.1,
                        orientation="h",
                        default_value=data.get("block_time", ""),
                    ),
                ],
                [
                    sg.Text(i18n("Fade length")),
                    sg.Slider(
                        range=(0.01, 0.15),
                        key="crossfade_length",
                        resolution=0.01,
                        orientation="h",
                        default_value=data.get("crossfade_length", ""),
                    ),
                ],
                [
                    sg.Text(i18n("Extra推理时长")),
                    sg.Slider(
                        range=(0.05, 3.00),
                        key="extra_time",
                        resolution=0.01,
                        orientation="h",
                        default_value=data.get("extra_time", ""),
                    ),
                ],
                [
                    sg.Checkbox(i18n("Input noise reduction"), key="I_noise_reduce"),
                    sg.Checkbox(i18n("Output noise reduction"), key="O_noise_reduce"),
                ],
            ],
            title=i18n("Performance settings"),
        )

        layout = [
            [
                # sg.Frame requires a layout; the original call omitted it and
                # raised TypeError, so an empty one is supplied.
                sg.Frame(
                    title="Proudly forked by Mangio621",
                    layout=[[]],
                ),
                model_frame,
            ],
            [f0_frame],
            [device_frame],
            [general_frame, performance_frame],
            [
                sg.Button(i18n("开始音频Convert"), key="start_vc"),
                sg.Button(i18n("停止音频Convert"), key="stop_vc"),
                sg.Text(i18n("Inference time (ms):")),
                sg.Text("0", key="infer_time"),
            ],
        ]
        self.window = sg.Window("RVC - GUI", layout=layout)
        self.event_handler()

    def event_handler(self):
        """Process window events until the window is closed."""
        while True:
            event, values = self.window.read()
            if event == sg.WINDOW_CLOSED:
                # Clearing the flag lets the audio thread leave its loop.
                self.flag_vc = False
                sys.exit()
            if event == "start_vc" and not self.flag_vc:
                if self.set_values(values):
                    print("using_cuda:" + str(torch.cuda.is_available()))
                    self.start_vc()
                    settings = {
                        "pth_path": values["pth_path"],
                        "index_path": values["index_path"],
                        "f0_method": self.get_f0_method_from_radios(values),
                        "sg_input_device": values["sg_input_device"],
                        "sg_output_device": values["sg_output_device"],
                        "threhold": values["threhold"],
                        "pitch": values["pitch"],
                        "index_rate": values["index_rate"],
                        "block_time": values["block_time"],
                        "crossfade_length": values["crossfade_length"],
                        "extra_time": values["extra_time"],
                    }
                    with open("values1.json", "w") as j:
                        json.dump(settings, j)
            if event == "stop_vc" and self.flag_vc:
                self.flag_vc = False

    def get_f0_method_from_radios(self, values):
        """Return the name of the selected f0 radio button ("harvest" if none)."""
        for name in self._F0_METHODS:
            if values[name]:
                return name
        return "harvest"

    def set_values(self, values):
        """Validate the window values and copy them into ``self.config``.

        Returns:
            True when the values were accepted, False after showing a popup
            describing the first problem found.
        """
        if len(values["pth_path"].strip()) == 0:
            sg.popup(i18n("Select the pth file"))
            return False
        if len(values["index_path"].strip()) == 0:
            sg.popup(i18n("Select the index file"))
            return False
        # Paths must be pure ASCII.
        pattern = re.compile("[^\x00-\x7F]+")
        if pattern.findall(values["hubert_path"]):
            sg.popup(i18n("The hubert model path must not contain Chinese characters"))
            return False
        if pattern.findall(values["pth_path"]):
            sg.popup(i18n("The pth file path must not contain Chinese characters."))
            return False
        if pattern.findall(values["index_path"]):
            sg.popup(i18n("The index file path must not contain Chinese characters."))
            return False
        self.set_devices(values["sg_input_device"], values["sg_output_device"])
        cfg = self.config
        cfg.hubert_path = os.path.join(current_dir, "hubert_base.pt")
        cfg.pth_path = values["pth_path"]
        cfg.index_path = values["index_path"]
        cfg.npy_path = values["npy_path"]
        cfg.f0_method = self.get_f0_method_from_radios(values)
        cfg.threhold = values["threhold"]
        cfg.pitch = values["pitch"]
        cfg.block_time = values["block_time"]
        cfg.crossfade_time = values["crossfade_length"]
        cfg.extra_time = values["extra_time"]
        cfg.I_noise_reduce = values["I_noise_reduce"]
        cfg.O_noise_reduce = values["O_noise_reduce"]
        cfg.index_rate = values["index_rate"]
        return True

    def start_vc(self):
        """Load the model, allocate the buffers and start the audio thread."""
        torch.cuda.empty_cache()
        self.flag_vc = True
        samplerate = self.config.samplerate
        self.block_frame = int(self.config.block_time * samplerate)
        self.crossfade_frame = int(self.config.crossfade_time * samplerate)
        self.sola_search_frame = int(0.012 * samplerate)
        self.delay_frame = int(0.01 * samplerate)  # look-ahead reserve
        self.extra_frame = int(self.config.extra_time * samplerate)
        self.rvc = None
        self.rvc = RVC(
            self.config.pitch,
            self.config.f0_method,
            self.config.hubert_path,
            self.config.pth_path,
            self.config.index_path,
            self.config.npy_path,
            self.config.index_rate,
        )
        self.input_wav: np.ndarray = np.zeros(
            self.extra_frame
            + self.crossfade_frame
            + self.sola_search_frame
            + self.block_frame,
            dtype="float32",
        )
        self.output_wav: torch.Tensor = torch.zeros(
            self.block_frame, device=device, dtype=torch.float32
        )
        self.sola_buffer: torch.Tensor = torch.zeros(
            self.crossfade_frame, device=device, dtype=torch.float32
        )
        self.fade_in_window: torch.Tensor = torch.linspace(
            0.0, 1.0, steps=self.crossfade_frame, device=device, dtype=torch.float32
        )
        self.fade_out_window: torch.Tensor = 1 - self.fade_in_window
        # resampler1: device rate -> 16 kHz model input.
        self.resampler1 = tat.Resample(
            orig_freq=samplerate, new_freq=16000, dtype=torch.float32
        )
        # resampler2: model output rate -> device rate.
        self.resampler2 = tat.Resample(
            orig_freq=self.rvc.tgt_sr,
            new_freq=samplerate,
            dtype=torch.float32,
        )
        thread_vc = threading.Thread(target=self.soundinput)
        thread_vc.start()

    def soundinput(self):
        """Keep the duplex audio stream open while ``self.flag_vc`` is set."""
        with sd.Stream(
            channels=2,
            callback=self.audio_callback,
            blocksize=self.block_frame,
            samplerate=self.config.samplerate,
            dtype="float32",
        ):
            while self.flag_vc:
                time.sleep(self.config.block_time)
                print("Audio block passed.")
        print("ENDing VC")

    def audio_callback(
        self, indata: np.ndarray, outdata: np.ndarray, frames, times, status
    ):
        """Stream callback: convert one input block and write it to *outdata*."""
        start_time = time.perf_counter()
        indata = librosa.to_mono(indata.T)
        if self.config.I_noise_reduce:
            indata[:] = nr.reduce_noise(y=indata, sr=self.config.samplerate)

        # Noise gate: zero every hop whose RMS level is below the threshold.
        frame_length = 2048
        hop_length = 1024
        rms = librosa.feature.rms(
            y=indata, frame_length=frame_length, hop_length=hop_length
        )
        db_threhold = librosa.amplitude_to_db(rms, ref=1.0)[0] < self.config.threhold
        for i in range(db_threhold.shape[0]):
            if db_threhold[i]:
                indata[i * hop_length : (i + 1) * hop_length] = 0
        # Slide the input window forward by one block.
        self.input_wav[:] = np.append(self.input_wav[self.block_frame :], indata)

        # Inference: resample to 16 kHz, convert, resample back, keep the tail.
        print("input_wav:" + str(self.input_wav.shape))
        tail = self.crossfade_frame + self.sola_search_frame + self.block_frame
        infer_wav: torch.Tensor = self.resampler2(
            self.rvc.infer(self.resampler1(torch.from_numpy(self.input_wav)))
        )[-tail:].to(device)
        print("infer_wav:" + str(infer_wav.shape))

        # SOLA algorithm from https://github.com/yxlllc/DDSP-SVC
        search_span = self.crossfade_frame + self.sola_search_frame
        head = infer_wav[None, None, :search_span]
        cor_nom = F.conv1d(head, self.sola_buffer[None, None, :])
        cor_den = torch.sqrt(
            F.conv1d(
                head**2,
                torch.ones(1, 1, self.crossfade_frame, device=device),
            )
            + 1e-8
        )
        sola_offset = torch.argmax(cor_nom[0, 0] / cor_den[0, 0])
        print("sola offset: " + str(int(sola_offset)))

        # Crossfade the new block with the tail kept from the previous one.
        self.output_wav[:] = infer_wav[sola_offset : sola_offset + self.block_frame]
        self.output_wav[: self.crossfade_frame] *= self.fade_in_window
        self.output_wav[: self.crossfade_frame] += self.sola_buffer[:]
        if sola_offset < self.sola_search_frame:
            self.sola_buffer[:] = (
                infer_wav[
                    -self.sola_search_frame
                    - self.crossfade_frame
                    + sola_offset : -self.sola_search_frame
                    + sola_offset
                ]
                * self.fade_out_window
            )
        else:
            self.sola_buffer[:] = (
                infer_wav[-self.crossfade_frame :] * self.fade_out_window
            )

        # Duplicate the mono result onto both output channels.
        if self.config.O_noise_reduce:
            outdata[:] = np.tile(
                nr.reduce_noise(
                    y=self.output_wav[:].cpu().numpy(), sr=self.config.samplerate
                ),
                (2, 1),
            ).T
        else:
            outdata[:] = self.output_wav[:].repeat(2, 1).t().cpu().numpy()
        total_time = time.perf_counter() - start_time
        self.window["infer_time"].update(int(total_time * 1000))
        print("infer time:" + str(total_time))
        print("f0_method: " + str(self.config.f0_method))

    def get_devices(self, update: bool = True):
        """Return the available audio devices.

        Args:
            update: When True, re-initialise sounddevice first so the device
                list is refreshed.

        Returns:
            ``(input_names, output_names, input_indices, output_indices)``;
            names are formatted as ``"<device name> (<host API name>)"``.
        """
        if update:
            sd._terminate()
            sd._initialize()
        devices = sd.query_devices()
        hostapis = sd.query_hostapis()
        for hostapi in hostapis:
            for device_idx in hostapi["devices"]:
                devices[device_idx]["hostapi_name"] = hostapi["name"]
        inputs = [d for d in devices if d["max_input_channels"] > 0]
        outputs = [d for d in devices if d["max_output_channels"] > 0]
        input_devices = [f"{d['name']} ({d['hostapi_name']})" for d in inputs]
        output_devices = [f"{d['name']} ({d['hostapi_name']})" for d in outputs]
        input_devices_indices = [
            d["index"] if "index" in d else d["name"] for d in inputs
        ]
        output_devices_indices = [
            d["index"] if "index" in d else d["name"] for d in outputs
        ]
        return (
            input_devices,
            output_devices,
            input_devices_indices,
            output_devices_indices,
        )

    def set_devices(self, input_device, output_device):
        """Make the named devices the sounddevice default input and output."""
        (
            input_devices,
            output_devices,
            input_device_indices,
            output_device_indices,
        ) = self.get_devices()
        sd.default.device[0] = input_device_indices[input_devices.index(input_device)]
        sd.default.device[1] = output_device_indices[
            output_devices.index(output_device)
        ]
        print("input device:" + str(sd.default.device[0]) + ":" + str(input_device))
        print("output device:" + str(sd.default.device[1]) + ":" + str(output_device))


gui = GUI()