import PySimpleGUI as sg import sounddevice as sd import torch,librosa,threading,time from enhancer import Enhancer import numpy as np from ddsp.vocoder import load_model, F0_Extractor, Volume_Extractor, Units_Encoder from ddsp.core import upsample class SvcDDSP: def __init__(self, model_path, vocoder_based_enhancer, enhancer_adaptive_key, input_pitch_extractor, f0_min, f0_max, threhold, spk_id, spk_mix_dict, enable_spk_id_cover): self.model_path = model_path self.vocoder_based_enhancer = vocoder_based_enhancer self.enhancer_adaptive_key = enhancer_adaptive_key self.input_pitch_extractor = input_pitch_extractor self.f0_min = f0_min self.f0_max = f0_max self.threhold = threhold self.device = 'cuda' if torch.cuda.is_available() else 'cpu' self.spk_id = spk_id self.spk_mix_dict = spk_mix_dict self.enable_spk_id_cover = enable_spk_id_cover # load ddsp model self.model, self.args = load_model(self.model_path, device=self.device) # load units encoder self.units_encoder = Units_Encoder( self.args.data.encoder, self.args.data.encoder_ckpt, self.args.data.encoder_sample_rate, self.args.data.encoder_hop_size, device=self.device) # load enhancer if self.vocoder_based_enhancer: self.enhancer = Enhancer(self.args.enhancer.type, self.args.enhancer.ckpt, device=self.device) def infer(self, pitch_adjust, speaker_id, safe_prefix_pad_length,audio,sample_rate): print("Infering...") # load input #audio, sample_rate = librosa.load(input_wav, sr=None, mono=True) hop_size = self.args.data.block_size * sample_rate / self.args.data.sampling_rate # safe front silence if safe_prefix_pad_length > 0.03: silence_front = safe_prefix_pad_length - 0.03 else: silence_front = 0 # extract f0 pitch_extractor = F0_Extractor( self.input_pitch_extractor, sample_rate, hop_size, float(self.f0_min), float(self.f0_max)) f0 = pitch_extractor.extract(audio, uv_interp=True, device=self.device, silence_front=silence_front) f0 = torch.from_numpy(f0).float().to(self.device).unsqueeze(-1).unsqueeze(0) f0 = f0 * 2 ** (float(pitch_adjust) / 12) # extract volume volume_extractor = Volume_Extractor(hop_size) volume = volume_extractor.extract(audio) mask = (volume > 10 ** (float(self.threhold) / 20)).astype('float') mask = np.pad(mask, (4, 4), constant_values=(mask[0], mask[-1])) mask = np.array([np.max(mask[n : n + 9]) for n in range(len(mask) - 8)]) mask = torch.from_numpy(mask).float().to(self.device).unsqueeze(-1).unsqueeze(0) mask = upsample(mask, self.args.data.block_size).squeeze(-1) volume = torch.from_numpy(volume).float().to(self.device).unsqueeze(-1).unsqueeze(0) # extract units audio_t = torch.from_numpy(audio).float().unsqueeze(0).to(self.device) units = self.units_encoder.encode(audio_t, sample_rate, hop_size) # spk_id or spk_mix_dict if self.enable_spk_id_cover: spk_id = self.spk_id else: spk_id = speaker_id spk_id = torch.LongTensor(np.array([[spk_id]])).to(self.device) # forward and return the output with torch.no_grad(): output, _, (s_h, s_n) = self.model(units, f0, volume, spk_id = spk_id, spk_mix_dict = self.spk_mix_dict) output *= mask if self.vocoder_based_enhancer: output, output_sample_rate = self.enhancer.enhance( output, self.args.data.sampling_rate, f0, self.args.data.block_size, adaptive_key = self.enhancer_adaptive_key, silence_front = silence_front) else: output_sample_rate = self.args.data.sampling_rate output = output.squeeze().cpu().numpy() return output, output_sample_rate class GUI: def __init__(self) -> None: self.flag_vc:bool=False#变声线程flag self.samplerate=44100#Hz self.block_time=1.5#s self.block_frame=0 self.crossfade_frame=0 self.fade_in_window:np.ndarray=None#crossfade计算用numpy数组 self.fade_out_window:np.ndarray=None#crossfade计算用numpy数组 self.f_safe_prefix_pad_length:float = 1.0 self.input_wav:np.ndarray=None#输入音频规范化后的保存地址 self.output_wav:np.ndarray=None#输出音频规范化后的保存地址 self.temp_wav:np.ndarray=None#包含crossfade和输出音频的缓存区 self.f_pitch_change:float = 0.0#float(request_form.get("fPitchChange", 0)) self.crossfade_last:np.ndarray=None#保存上一个output的crossfade self.f0_mode=["parselmouth", "dio", "harvest", "crepe"]#F0预测器 self.spk_id = 1# 默认说话人。 self.svc_model:SvcDDSP = None self.launcher()#start # 混合说话人字典(捏音色功能) # 设置为非 None 字典会覆盖 spk_id self.spk_mix_dict = None # {1:0.5, 2:0.5} 表示1号说话人和2号说话人的音色按照0.5:0.5的比例混合 self.use_vocoder_based_enhancer = True def launcher(self): '''窗口加载''' input_devices,output_devices,_, _=self.get_devices() sg.theme('DarkAmber') # 设置主题 # 界面布局 layout = [ [ sg.Frame(layout=[ [sg.Input(key='sg_model',default_text='exp\\model_chino.pt'),sg.FileBrowse('选择模型文件')] ],title='模型.pt格式(自动识别同目录下config.yaml)') ], [ sg.Frame(layout=[ [sg.Text("输入设备"),sg.Combo(input_devices,key='sg_input_device',default_value=input_devices[sd.default.device[0]])], [sg.Text("输出设备"),sg.Combo(output_devices,key='sg_output_device',default_value=output_devices[sd.default.device[1]])] ],title='音频设备') ], [ sg.Frame(layout=[ [sg.Text("说话人id"),sg.Input(key='spk_id',default_text='1')], [sg.Text("响应阈值"),sg.Slider(range=(-60,0),orientation='h',key='noise',resolution=1,default_value=-35)], [sg.Text("变调"),sg.Slider(range=(-24,24),orientation='h',key='pitch',resolution=1,default_value=12)], [sg.Text("采样率"),sg.Input(key='samplerate',default_text='44100')], [sg.Checkbox(text='启用捏音色功能',default=False,key='spk_mix'),sg.Button("设置混合音色",key='set_spk_mix')] ],title='普通设置'), sg.Frame(layout=[ [sg.Text("音频切分大小"),sg.Slider(range=(0.1,3.0),orientation='h',key='block',resolution=0.05,default_value=0.5)], [sg.Text("交叉淡化时长"),sg.Slider(range=(0.02,0.1),orientation='h',key='crossfade',resolution=0.01)], [sg.Text("使用历史区块数量"),sg.Slider(range=(1,10),orientation='h',key='buffernum',resolution=1,default_value=2)], [sg.Text("f0预测模式"),sg.Combo(values=self.f0_mode,key='f0_mode',default_value=self.f0_mode[2])], [sg.Checkbox(text='启用增强器',default=True,key='use_enhancer')] ],title='性能设置'), ], [sg.Button("开始音频转换",key="start_vc"),sg.Button("停止音频转换",key="stop_vc")] ] # 创造窗口 window = sg.Window('DDSP - GUI by INT16', layout) self.event_handler(window=window) def event_handler(self,window): '''事件处理''' while True:#事件处理循环 event, values = window.read() if event ==sg.WINDOW_CLOSED: # 如果用户关闭窗口 self.flag_vc=False exit() if event=='start_vc' and self.flag_vc==False: #set values 和界面布局layout顺序一一对应 checkpoint_path = values['sg_model'] self.set_devices(values["sg_input_device"],values['sg_output_device']) self.spk_id=int(values['spk_id']) threhold = values['noise'] self.f_pitch_change = values['pitch'] self.samplerate=int(values['samplerate']) block_time = float(values['block']) crossfade_time = values['crossfade'] buffer_num = int(values['buffernum']) select_pitch_extractor=values['f0_mode'] self.use_vocoder_based_enhancer=values['use_enhancer'] if not values['spk_mix']: self.spk_mix_dict=None self.block_frame=int(block_time*self.samplerate) self.crossfade_frame=int(crossfade_time*self.samplerate) self.f_safe_prefix_pad_length=block_time*(buffer_num)-crossfade_time*2 print('crossfade_time:'+str(crossfade_time)) print("buffer_num:"+str(buffer_num)) print("samplerate:"+str(self.samplerate)) print('block_time:'+str(block_time)) print("prefix_pad_length:"+str(self.f_safe_prefix_pad_length)) print("mix_mode:"+str(self.spk_mix_dict)) print("enhancer:"+str(self.use_vocoder_based_enhancer)) self.start_vc(checkpoint_path,select_pitch_extractor,threhold,buffer_num) if event=='stop_vc'and self.flag_vc==True: self.flag_vc = False if event=='set_spk_mix' and self.flag_vc==False: spk_mix = sg.popup_get_text(message='示例:1:0.3,2:0.5,3:0.2',title="设置混合音色,支持多人") if spk_mix != None: self.spk_mix_dict=eval("{"+spk_mix.replace(',',',').replace(':',':')+"}") def start_vc(self,checkpoint_path,select_pitch_extractor,threhold,buffer_num): '''开始音频转换''' self.flag_vc = True # 是否使用预训练的基于声码器的增强器增强输出,但对硬件要求更高。 enhancer_adaptive_key = 0 # f0范围限制(Hz) limit_f0_min = 50 limit_f0_max = 1100 enable_spk_id_cover = True #初始化一下各个ndarray self.input_wav=np.zeros(int((1+buffer_num)*self.block_frame),dtype='float32') self.output_wav=np.zeros(self.block_frame,dtype='float32') self.temp_wav=np.zeros(self.block_frame+self.crossfade_frame,dtype='float32') self.crossfade_last=np.zeros(self.crossfade_frame,dtype='float32') self.fade_in_window = np.linspace(0, 1,self.crossfade_frame) self.fade_out_window = np.linspace(1, 0,self.crossfade_frame) self.svc_model = SvcDDSP(checkpoint_path, self.use_vocoder_based_enhancer, enhancer_adaptive_key, select_pitch_extractor,limit_f0_min, limit_f0_max, threhold, self.spk_id, self.spk_mix_dict, enable_spk_id_cover) thread_vc=threading.Thread(target=self.soundinput) thread_vc.start() def soundinput(self): ''' 接受音频输入 ''' with sd.Stream(callback=self.audio_callback, blocksize=self.block_frame,samplerate=self.samplerate,dtype='float32'): while self.flag_vc: time.sleep(self.block_time) print('Audio block passed.') print('ENDing VC') def audio_callback(self,indata,outdata, frames, time, status): ''' 音频处理 ''' print("Realtime VCing...") self.input_wav[:]=np.roll(self.input_wav,-self.block_frame) self.input_wav[-self.block_frame:]=librosa.to_mono(indata.T) print('input_wav.shape:'+str(self.input_wav.shape)) _audio, _model_sr = self.svc_model.infer( self.f_pitch_change, self.spk_id, self.f_safe_prefix_pad_length,self.input_wav,self.samplerate) self.temp_wav[:] = librosa.resample(_audio, orig_sr=_model_sr, target_sr=self.samplerate)[-self.block_frame-self.crossfade_frame:] #cross-fade output_wav's start with last crossfade self.output_wav[:]=self.temp_wav[:self.block_frame] self.output_wav[:self.crossfade_frame]*=self.fade_in_window self.output_wav[:self.crossfade_frame]+=self.crossfade_last self.crossfade_last[:]=self.temp_wav[-self.crossfade_frame:] self.crossfade_last[:]*=self.fade_out_window print("infered _audio.shape:"+str(_audio.shape)) outdata[:] = np.array([self.output_wav, self.output_wav]).T print('Outputed.') def get_devices(self,update: bool = True): '''获取设备列表''' if update: sd._terminate() sd._initialize() devices = sd.query_devices() hostapis = sd.query_hostapis() for hostapi in hostapis: for device_idx in hostapi["devices"]: devices[device_idx]["hostapi_name"] = hostapi["name"] input_devices = [ f"{d['name']} ({d['hostapi_name']})" for d in devices if d["max_input_channels"] > 0 ] output_devices = [ f"{d['name']} ({d['hostapi_name']})" for d in devices if d["max_output_channels"] > 0 ] input_devices_indices = [d["index"] for d in devices if d["max_input_channels"] > 0] output_devices_indices = [ d["index"] for d in devices if d["max_output_channels"] > 0 ] return input_devices, output_devices, input_devices_indices, output_devices_indices def set_devices(self,input_device,output_device): '''设置输出设备''' input_devices,output_devices,input_device_indices, output_device_indices=self.get_devices() sd.default.device[0]=input_device_indices[input_devices.index(input_device)] sd.default.device[1]=output_device_indices[output_devices.index(output_device)] print("input device:"+str(sd.default.device[0])+":"+str(input_device)) print("output device:"+str(sd.default.device[1])+":"+str(output_device)) if __name__ == "__main__": gui=GUI()