|
import PySimpleGUI as sg |
|
import sounddevice as sd |
|
import torch,librosa,threading,time |
|
from enhancer import Enhancer |
|
import numpy as np |
|
from ddsp.vocoder import load_model, F0_Extractor, Volume_Extractor, Units_Encoder |
|
from ddsp.core import upsample |
|
|
|
|
|
class SvcDDSP:
    """Bundle a DDSP conversion model with the feature extractors it needs.

    Construction loads the checkpoint, builds the units encoder described by
    the checkpoint's config and, when requested, the vocoder-based enhancer.
    ``infer`` then converts one buffer of audio.
    """

    def __init__(self, model_path, vocoder_based_enhancer, enhancer_adaptive_key, input_pitch_extractor,
                 f0_min, f0_max, threhold, spk_id, spk_mix_dict, enable_spk_id_cover):
        """Store the conversion settings and load the model components.

        Args:
            model_path: Checkpoint path handed to ``load_model``.
            vocoder_based_enhancer: When truthy, an ``Enhancer`` is created and
                applied to the model output in ``infer``.
            enhancer_adaptive_key: Forwarded to the enhancer as ``adaptive_key``.
            input_pitch_extractor: Extractor name forwarded to ``F0_Extractor``.
            f0_min: Lower f0 bound forwarded to ``F0_Extractor`` (cast to float).
            f0_max: Upper f0 bound forwarded to ``F0_Extractor`` (cast to float).
            threhold: Volume gate level; ``infer`` converts it with
                ``10 ** (threhold / 20)`` before comparing against the volume.
            spk_id: Speaker id used when ``enable_spk_id_cover`` is truthy.
            spk_mix_dict: Forwarded unchanged to the model call.
            enable_spk_id_cover: When truthy, ``spk_id`` overrides the
                ``speaker_id`` argument of ``infer``.
        """
        # Plain configuration.
        self.model_path = model_path
        self.input_pitch_extractor = input_pitch_extractor
        self.f0_min = f0_min
        self.f0_max = f0_max
        self.threhold = threhold
        self.vocoder_based_enhancer = vocoder_based_enhancer
        self.enhancer_adaptive_key = enhancer_adaptive_key
        self.spk_id = spk_id
        self.spk_mix_dict = spk_mix_dict
        self.enable_spk_id_cover = enable_spk_id_cover
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # Model weights plus the configuration stored alongside them.
        self.model, self.args = load_model(self.model_path, device=self.device)

        # Units encoder, configured from the checkpoint's data section.
        data_cfg = self.args.data
        self.units_encoder = Units_Encoder(
            data_cfg.encoder,
            data_cfg.encoder_ckpt,
            data_cfg.encoder_sample_rate,
            data_cfg.encoder_hop_size,
            device=self.device,
        )

        # The enhancer is optional; the attribute only exists when enabled.
        if self.vocoder_based_enhancer:
            enhancer_cfg = self.args.enhancer
            self.enhancer = Enhancer(enhancer_cfg.type, enhancer_cfg.ckpt, device=self.device)

    def infer(self, pitch_adjust, speaker_id, safe_prefix_pad_length, audio, sample_rate):
        """Convert one audio buffer and return ``(samples, sample_rate)``.

        Args:
            pitch_adjust: Pitch shift; f0 is scaled by ``2 ** (pitch_adjust / 12)``.
            speaker_id: Speaker id used unless ``enable_spk_id_cover`` is set.
            safe_prefix_pad_length: Length of the leading context; everything
                except its last 0.03 is passed on as ``silence_front``.
            audio: NumPy array holding the input samples.
            sample_rate: Sample rate of ``audio``.

        Returns:
            The output as a NumPy array and the sample rate it is in (the
            enhancer's rate when enabled, otherwise the model's sampling rate).
        """
        print("Infering...")

        data_cfg = self.args.data

        # Model block size expressed in samples of the incoming audio.
        hop_size = data_cfg.block_size * sample_rate / data_cfg.sampling_rate

        # Keep a 0.03 margin of the prefix; the rest is reported as silence_front.
        silence_front = safe_prefix_pad_length - 0.03 if safe_prefix_pad_length > 0.03 else 0

        # --- f0 ---------------------------------------------------------
        f0_extractor = F0_Extractor(
            self.input_pitch_extractor,
            sample_rate,
            hop_size,
            float(self.f0_min),
            float(self.f0_max),
        )
        f0_frames = f0_extractor.extract(
            audio, uv_interp=True, device=self.device, silence_front=silence_front)
        f0 = torch.from_numpy(f0_frames).float().to(self.device).unsqueeze(-1).unsqueeze(0)
        f0 = f0 * 2 ** (float(pitch_adjust) / 12)

        # --- volume and gate mask ---------------------------------------
        volume_frames = Volume_Extractor(hop_size).extract(audio)
        gate = (volume_frames > 10 ** (float(self.threhold) / 20)).astype('float')
        # Widen the gate: edge-pad by 4, then take the max over each 9-frame window.
        gate = np.pad(gate, (4, 4), constant_values=(gate[0], gate[-1]))
        gate = np.array([np.max(gate[start:start + 9]) for start in range(len(gate) - 8)])
        mask = torch.from_numpy(gate).float().to(self.device).unsqueeze(-1).unsqueeze(0)
        mask = upsample(mask, data_cfg.block_size).squeeze(-1)
        volume = torch.from_numpy(volume_frames).float().to(self.device).unsqueeze(-1).unsqueeze(0)

        # --- units --------------------------------------------------------
        audio_t = torch.from_numpy(audio).float().unsqueeze(0).to(self.device)
        units = self.units_encoder.encode(audio_t, sample_rate, hop_size)

        # --- speaker ------------------------------------------------------
        chosen_speaker = self.spk_id if self.enable_spk_id_cover else speaker_id
        spk_id = torch.LongTensor(np.array([[chosen_speaker]])).to(self.device)

        # --- synthesis ------------------------------------------------------
        with torch.no_grad():
            output, _, (_s_h, _s_n) = self.model(
                units, f0, volume, spk_id=spk_id, spk_mix_dict=self.spk_mix_dict)
            output *= mask

            if self.vocoder_based_enhancer:
                output, output_sample_rate = self.enhancer.enhance(
                    output,
                    data_cfg.sampling_rate,
                    f0,
                    data_cfg.block_size,
                    adaptive_key=self.enhancer_adaptive_key,
                    silence_front=silence_front,
                )
            else:
                output_sample_rate = data_cfg.sampling_rate

            output = output.squeeze().cpu().numpy()
            return output, output_sample_rate
|
|
|
|
|
|
|
|
|
class GUI:
    """PySimpleGUI front end for real-time voice conversion.

    Constructing the object builds the settings window and enters the event
    loop, so ``GUI()`` blocks until the window is closed (at which point
    ``SystemExit`` is raised).
    """

    def __init__(self) -> None:
        self.flag_vc: bool = False  # True while the conversion stream should run
        self.samplerate = 44100
        self.block_time = 1.5  # polling interval used by soundinput()
        self.block_frame = 0  # samples per processed block
        self.crossfade_frame = 0  # samples in the crossfade region
        self.fade_in_window: np.ndarray = None
        self.fade_out_window: np.ndarray = None
        self.f_safe_prefix_pad_length: float = 1.0
        self.input_wav: np.ndarray = None
        self.output_wav: np.ndarray = None
        self.temp_wav: np.ndarray = None
        self.f_pitch_change: float = 0.0
        self.crossfade_last: np.ndarray = None
        self.f0_mode = ["parselmouth", "dio", "harvest", "crepe"]
        self.spk_id = 1
        self.svc_model: SvcDDSP = None

        # These two must exist BEFORE launcher() is called: launcher() blocks
        # inside the event loop, and the 'start_vc' handler reads
        # spk_mix_dict even when the user never opened the mix dialog.
        self.spk_mix_dict = None
        self.use_vocoder_based_enhancer = True

        self.launcher()

    @staticmethod
    def _default_device_name(names, indices, device_index):
        """Return the display name belonging to ``device_index``.

        ``names`` and ``indices`` are the parallel lists produced by
        ``get_devices``. Positions in those filtered lists are not device
        indices, so the lookup goes through ``indices``. When the index is
        not listed (for example -1), the first name is used; ``None`` is
        returned when there are no devices at all.
        """
        try:
            return names[indices.index(device_index)]
        except ValueError:
            return names[0] if names else None

    @staticmethod
    def _parse_spk_mix(text):
        """Parse ``"1:0.3,2:0.5"`` into ``{1: 0.3, 2: 0.5}``.

        Full-width comma and colon (U+FF0C, U+FF1A) are accepted as well.

        Raises:
            ValueError: If the text is not a list of literal ``key:value`` pairs.
        """
        # Function-scope import so this block does not depend on the file header.
        import ast

        normalized = text.replace('\uff0c', ',').replace('\uff1a', ':')
        # NOTE(security): the original ran eval() on this user-typed text.
        # literal_eval accepts the same literal "id:weight" pairs but cannot
        # execute arbitrary expressions.
        try:
            mix = ast.literal_eval("{" + normalized + "}")
        except (ValueError, SyntaxError, TypeError) as err:
            raise ValueError(f"invalid speaker mix: {text!r}") from err
        if not isinstance(mix, dict):
            raise ValueError(f"invalid speaker mix: {text!r}")
        return mix

    def launcher(self):
        """Build the settings window and run the event loop on it."""
        input_devices, output_devices, input_indices, output_indices = self.get_devices()
        default_input = self._default_device_name(
            input_devices, input_indices, sd.default.device[0])
        default_output = self._default_device_name(
            output_devices, output_indices, sd.default.device[1])
        sg.theme('DarkAmber')

        model_frame = sg.Frame(layout=[
            [sg.Input(key='sg_model', default_text='exp\\model_chino.pt'), sg.FileBrowse('选择模型文件')],
        ], title='模型.pt格式(自动识别同目录下config.yaml)')

        device_frame = sg.Frame(layout=[
            [sg.Text("输入设备"), sg.Combo(input_devices, key='sg_input_device', default_value=default_input)],
            [sg.Text("输出设备"), sg.Combo(output_devices, key='sg_output_device', default_value=default_output)],
        ], title='音频设备')

        general_frame = sg.Frame(layout=[
            [sg.Text("说话人id"), sg.Input(key='spk_id', default_text='1')],
            [sg.Text("响应阈值"), sg.Slider(range=(-60, 0), orientation='h', key='noise', resolution=1, default_value=-35)],
            [sg.Text("变调"), sg.Slider(range=(-24, 24), orientation='h', key='pitch', resolution=1, default_value=12)],
            [sg.Text("采样率"), sg.Input(key='samplerate', default_text='44100')],
            [sg.Checkbox(text='启用捏音色功能', default=False, key='spk_mix'), sg.Button("设置混合音色", key='set_spk_mix')],
        ], title='普通设置')

        performance_frame = sg.Frame(layout=[
            [sg.Text("音频切分大小"), sg.Slider(range=(0.1, 3.0), orientation='h', key='block', resolution=0.05, default_value=0.5)],
            [sg.Text("交叉淡化时长"), sg.Slider(range=(0.02, 0.1), orientation='h', key='crossfade', resolution=0.01)],
            [sg.Text("使用历史区块数量"), sg.Slider(range=(1, 10), orientation='h', key='buffernum', resolution=1, default_value=2)],
            [sg.Text("f0预测模式"), sg.Combo(values=self.f0_mode, key='f0_mode', default_value=self.f0_mode[2])],
            [sg.Checkbox(text='启用增强器', default=True, key='use_enhancer')],
        ], title='性能设置')

        layout = [
            [model_frame],
            [device_frame],
            [general_frame, performance_frame],
            [sg.Button("开始音频转换", key="start_vc"), sg.Button("停止音频转换", key="stop_vc")],
        ]

        window = sg.Window('DDSP - GUI by INT16', layout)
        self.event_handler(window=window)

    def event_handler(self, window):
        """Process window events until the window is closed.

        Raises:
            SystemExit: When the window is closed.
        """
        while True:
            event, values = window.read()

            if event == sg.WINDOW_CLOSED:
                # Let the audio thread leave its loop, release the window,
                # then terminate exactly as the former exit() call did
                # (without relying on the `site`-provided builtin).
                self.flag_vc = False
                window.close()
                raise SystemExit

            if event == 'start_vc' and not self.flag_vc:
                checkpoint_path = values['sg_model']
                self.set_devices(values["sg_input_device"], values['sg_output_device'])
                self.spk_id = int(values['spk_id'])
                threhold = values['noise']
                self.f_pitch_change = values['pitch']
                self.samplerate = int(values['samplerate'])
                block_time = float(values['block'])
                crossfade_time = values['crossfade']
                buffer_num = int(values['buffernum'])
                select_pitch_extractor = values['f0_mode']
                self.use_vocoder_based_enhancer = values['use_enhancer']
                if not values['spk_mix']:
                    self.spk_mix_dict = None

                # Keep soundinput()'s polling interval in step with the
                # configured block length instead of the fixed initial value.
                self.block_time = block_time
                self.block_frame = int(block_time * self.samplerate)
                self.crossfade_frame = int(crossfade_time * self.samplerate)
                self.f_safe_prefix_pad_length = block_time * buffer_num - crossfade_time * 2

                print(f'crossfade_time:{crossfade_time}')
                print(f"buffer_num:{buffer_num}")
                print(f"samplerate:{self.samplerate}")
                print(f'block_time:{block_time}')
                print(f"prefix_pad_length:{self.f_safe_prefix_pad_length}")
                print(f"mix_mode:{self.spk_mix_dict}")
                print(f"enhancer:{self.use_vocoder_based_enhancer}")
                self.start_vc(checkpoint_path, select_pitch_extractor, threhold, buffer_num)

            elif event == 'stop_vc' and self.flag_vc:
                self.flag_vc = False

            elif event == 'set_spk_mix' and not self.flag_vc:
                spk_mix = sg.popup_get_text(message='示例:1:0.3,2:0.5,3:0.2', title="设置混合音色,支持多人")
                if spk_mix is not None:
                    try:
                        self.spk_mix_dict = self._parse_spk_mix(spk_mix)
                    except ValueError:
                        # A typo must not take down the whole event loop.
                        sg.popup_error('混合音色格式错误,示例:1:0.3,2:0.5,3:0.2')

    def start_vc(self, checkpoint_path, select_pitch_extractor, threhold, buffer_num):
        """Allocate the audio buffers, load the model and start the audio thread.

        Args:
            checkpoint_path: Model checkpoint path passed to ``SvcDDSP``.
            select_pitch_extractor: f0 extractor name passed to ``SvcDDSP``.
            threhold: Volume gate level passed to ``SvcDDSP``.
            buffer_num: Number of history blocks kept in front of the current
                block in the input buffer.
        """
        enhancer_adaptive_key = 0
        limit_f0_min = 50
        limit_f0_max = 1100
        enable_spk_id_cover = True

        # Input buffer: buffer_num history blocks plus the current block.
        self.input_wav = np.zeros(int((1 + buffer_num) * self.block_frame), dtype='float32')
        self.output_wav = np.zeros(self.block_frame, dtype='float32')
        self.temp_wav = np.zeros(self.block_frame + self.crossfade_frame, dtype='float32')
        self.crossfade_last = np.zeros(self.crossfade_frame, dtype='float32')
        self.fade_in_window = np.linspace(0, 1, self.crossfade_frame)
        self.fade_out_window = np.linspace(1, 0, self.crossfade_frame)

        self.svc_model = SvcDDSP(
            checkpoint_path, self.use_vocoder_based_enhancer, enhancer_adaptive_key,
            select_pitch_extractor, limit_f0_min, limit_f0_max, threhold,
            self.spk_id, self.spk_mix_dict, enable_spk_id_cover)

        # Mark the conversion as running only once the model has loaded, so a
        # failed load does not leave the flag stuck at True. It must be set
        # before the thread starts because soundinput() loops on it.
        self.flag_vc = True
        thread_vc = threading.Thread(target=self.soundinput)
        thread_vc.start()

    def soundinput(self):
        """Keep the audio stream open until ``flag_vc`` is cleared.

        Runs on the thread created by ``start_vc``. The stream invokes
        ``audio_callback``; this loop only sleeps and polls the flag.
        """
        with sd.Stream(callback=self.audio_callback, blocksize=self.block_frame,
                       samplerate=self.samplerate, dtype='float32'):
            while self.flag_vc:
                time.sleep(self.block_time)
                print('Audio block passed.')
        print('ENDing VC')

    def audio_callback(self, indata, outdata, frames, time, status):
        """Stream callback: convert one input block and write it to ``outdata``.

        The parameter list is the one sounddevice passes to a ``Stream``
        callback. ``frames`` and ``time`` are not used.
        """
        print("Realtime VCing...")
        if status:
            # Surface stream problems reported by the audio backend.
            print(status)

        block = self.block_frame
        fade = self.crossfade_frame

        # Shift the history left by one block and append the new mono block.
        self.input_wav[:] = np.roll(self.input_wav, -block)
        self.input_wav[-block:] = librosa.to_mono(indata.T)
        print(f'input_wav.shape:{self.input_wav.shape}')

        _audio, _model_sr = self.svc_model.infer(
            self.f_pitch_change, self.spk_id, self.f_safe_prefix_pad_length,
            self.input_wav, self.samplerate)
        # Keep only the tail: one block plus the crossfade region.
        self.temp_wav[:] = librosa.resample(
            _audio, orig_sr=_model_sr, target_sr=self.samplerate)[-block - fade:]

        # Fade the new block in over the faded-out tail saved last time.
        self.output_wav[:] = self.temp_wav[:block]
        self.output_wav[:fade] *= self.fade_in_window
        self.output_wav[:fade] += self.crossfade_last
        self.crossfade_last[:] = self.temp_wav[-fade:]
        self.crossfade_last[:] *= self.fade_out_window
        print(f"infered _audio.shape:{_audio.shape}")

        # A column vector broadcasts across however many output channels the
        # stream has, rather than assuming exactly two.
        outdata[:] = self.output_wav[:, np.newaxis]
        print('Outputed.')

    def get_devices(self, update: bool = True):
        """Return the available audio devices.

        Args:
            update: When true, re-initialise sounddevice before querying.

        Returns:
            ``(input_names, output_names, input_indices, output_indices)``;
            each names list is parallel to its indices list.
        """
        if update:
            sd._terminate()
            sd._initialize()
        devices = sd.query_devices()
        # Attach the host API name to every device for the display label.
        for hostapi in sd.query_hostapis():
            for device_idx in hostapi["devices"]:
                devices[device_idx]["hostapi_name"] = hostapi["name"]

        inputs = [d for d in devices if d["max_input_channels"] > 0]
        outputs = [d for d in devices if d["max_output_channels"] > 0]
        input_devices = [f"{d['name']} ({d['hostapi_name']})" for d in inputs]
        output_devices = [f"{d['name']} ({d['hostapi_name']})" for d in outputs]
        input_devices_indices = [d["index"] for d in inputs]
        output_devices_indices = [d["index"] for d in outputs]
        return input_devices, output_devices, input_devices_indices, output_devices_indices

    def set_devices(self, input_device, output_device):
        """Make the named devices sounddevice's default input and output.

        Args:
            input_device: Display name as produced by ``get_devices``.
            output_device: Display name as produced by ``get_devices``.

        Raises:
            ValueError: If a name is not among the current devices.
        """
        input_devices, output_devices, input_device_indices, output_device_indices = self.get_devices()
        sd.default.device[0] = input_device_indices[input_devices.index(input_device)]
        sd.default.device[1] = output_device_indices[output_devices.index(output_device)]
        print(f"input device:{sd.default.device[0]}:{input_device}")
        print(f"output device:{sd.default.device[1]}:{output_device}")
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Constructing GUI builds the window and runs its event loop.
    gui = GUI()
|
|