import io
import logging
import os
import shutil
import subprocess
import time

import librosa
import maad
import numpy as np
import soundfile
import torch
import torchaudio

from sovits import hubert_model
from sovits import utils
from sovits.mel_processing import spectrogram_torch
from sovits.models import SynthesizerTrn
from sovits.preprocess_wave import FeatureInput

logging.getLogger('matplotlib').setLevel(logging.WARNING)


def timeit(func):
    def run(*args, **kwargs):
        t = time.time()
        res = func(*args, **kwargs)
        print('executing \'%s\' took %.3fs' % (func.__name__, time.time() - t))
        return res

    return run


def cut_wav(raw_audio_path, out_audio_name, input_wav_path, cut_time):
    # Slice long recordings with slicer.py; short ones are copied through unchanged.
    raw_audio, raw_sr = torchaudio.load(raw_audio_path)
    if raw_audio.shape[-1] / raw_sr > cut_time:
        subprocess.Popen(
            f"python ./sovits/slicer.py {raw_audio_path} --out_name {out_audio_name} "
            f"--out {input_wav_path} --db_thresh -30",
            shell=True).wait()
    else:
        shutil.copy(raw_audio_path, f"{input_wav_path}/{out_audio_name}-00.wav")


def get_end_file(dir_path, end):
    # Recursively collect files with the given suffix, skipping hidden files and dirs.
    file_lists = []
    for root, dirs, files in os.walk(dir_path):
        files = [f for f in files if f[0] != '.']
        dirs[:] = [d for d in dirs if d[0] != '.']
        for f_file in files:
            if f_file.endswith(end):
                file_lists.append(os.path.join(root, f_file).replace("\\", "/"))
    return file_lists


def resize2d_f0(x, target_len):
    # Linearly interpolate an F0 curve to target_len frames; near-zero (unvoiced)
    # frames are treated as NaN during interpolation, then reset to 0.
    source = np.array(x)
    source[source < 0.001] = np.nan
    target = np.interp(np.arange(0, len(source) * target_len, len(source)) / target_len,
                       np.arange(0, len(source)), source)
    res = np.nan_to_num(target)
    return res


def clean_pitch(input_pitch):
    # If more than 90% of frames are unvoiced (coarse pitch == 1),
    # flatten the whole curve to suppress spurious pitch.
    num_nan = np.sum(input_pitch == 1)
    if num_nan / len(input_pitch) > 0.9:
        input_pitch[input_pitch != 1] = 1
    return input_pitch


def plt_pitch(input_pitch):
    # Replace unvoiced frames with NaN so they are skipped when plotting.
    input_pitch = input_pitch.astype(float)
    input_pitch[input_pitch == 1] = np.nan
    return input_pitch


def f0_to_pitch(ff):
    # Convert frequency in Hz to a MIDI note number (A4 = 440 Hz = note 69).
    f0_pitch = 69 + 12 * np.log2(ff / 440)
    return f0_pitch


def del_temp_wav(path_data):
    for i in get_end_file(path_data, "wav"):
        os.remove(i)


def fill_a_to_b(a, b):
    # Pad list a with copies of its first element until it matches len(b).
    if len(a) < len(b):
        for _ in range(0, len(b) - len(a)):
            a.append(a[0])


def mkdir(paths: list):
    for path in paths:
        if not os.path.exists(path):
            os.mkdir(path)
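
# A quick illustration of how the pitch helpers above behave (input values
# are made up for this sketch):
#
#   f0_to_pitch(440.0)                   # -> 69.0, i.e. MIDI note A4
#   f0_to_pitch(880.0)                   # -> 81.0, one octave (12 semitones) up
#   resize2d_f0([220.0, 0.0, 440.0], 6)  # 3 F0 frames stretched to 6; the
#                                        # unvoiced 0.0 frame stays a gap (0.0)
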
class Svc(object):
    def __init__(self, model_path, config_path, device="cpu"):
        self.model_path = model_path
        self.dev = torch.device(device)
        self.net_g_ms = None
        self.hps_ms = utils.get_hparams_from_file(config_path)
        self.target_sample = self.hps_ms.data.sampling_rate
        self.speakers = self.hps_ms.speakers
        # Load the HuBERT content encoder from the first .pt checkpoint under ./pth.
        self.hubert_soft = hubert_model.hubert_soft(get_end_file("./pth", "pt")[0])
        self.feature_input = FeatureInput(self.hps_ms.data.sampling_rate, self.hps_ms.data.hop_length)
        self.load_model()

    def load_model(self):
        # Build the synthesizer from the model hyperparameters.
        self.net_g_ms = SynthesizerTrn(
            178,
            self.hps_ms.data.filter_length // 2 + 1,
            self.hps_ms.train.segment_size // self.hps_ms.data.hop_length,
            n_speakers=self.hps_ms.data.n_speakers,
            **self.hps_ms.model)
        _ = utils.load_checkpoint(self.model_path, self.net_g_ms, None)
        if "half" in self.model_path and torch.cuda.is_available():
            _ = self.net_g_ms.half().eval().to(self.dev)
        else:
            _ = self.net_g_ms.eval().to(self.dev)

    def calc_error(self, in_path, out_path, tran):
        # Compare input and output F0 curves and report the mean absolute pitch
        # error in semitones (after applying the transposition) and its spread.
        a, s = torchaudio.load(in_path)
        input_pitch = self.feature_input.compute_f0(a.cpu().numpy()[0], s)
        a, s = torchaudio.load(out_path)
        output_pitch = self.feature_input.compute_f0(a.cpu().numpy()[0], s)
        sum_y = []
        if np.sum(input_pitch == 0) / len(input_pitch) > 0.9:
            mistake, var_take = 0, 0
        else:
            for i in range(min(len(input_pitch), len(output_pitch))):
                if input_pitch[i] > 0 and output_pitch[i] > 0:
                    sum_y.append(
                        abs(f0_to_pitch(output_pitch[i]) - (f0_to_pitch(input_pitch[i]) + tran)))
            num_y = 0
            for x in sum_y:
                num_y += x
            len_y = len(sum_y) if len(sum_y) else 1
            mistake = round(float(num_y / len_y), 2)
            var_take = round(float(np.std(sum_y, ddof=1)), 2)
        return mistake, var_take

    def get_units(self, source, sr):
        # Resample to 16 kHz, downmix to mono, and extract HuBERT soft units.
        source = torchaudio.functional.resample(source, sr, 16000)
        if source.ndim == 2 and source.shape[0] > 1:  # (channels, samples): downmix multi-channel
            source = torch.mean(source, dim=0).unsqueeze(0)
        source = source.unsqueeze(0).to(self.dev)
        with torch.inference_mode():
            units = self.hubert_soft.units(source)
            return units

    def transcribe(self, source, sr, length, transform):
        # Compute F0, transpose by `transform` semitones, resample to `length`
        # frames, and quantize to coarse pitch bins.
        feature_pit = self.feature_input.compute_f0(source, sr)
        feature_pit = feature_pit * 2 ** (transform / 12)
        feature_pit = resize2d_f0(feature_pit, length)
        coarse_pit = self.feature_input.coarse_f0(feature_pit)
        return coarse_pit

    def get_unit_pitch(self, in_path, tran):
        source, sr = torchaudio.load(in_path)
        soft = self.get_units(source, sr).squeeze(0).cpu().numpy()
        input_pitch = self.transcribe(source.cpu().numpy()[0], sr, soft.shape[0], tran)
        return soft, input_pitch

    def infer(self, speaker_id, tran, raw_path):
        sid = torch.LongTensor([int(speaker_id)]).to(self.dev)
        soft, pitch = self.get_unit_pitch(raw_path, tran)
        pitch = torch.LongTensor(clean_pitch(pitch)).unsqueeze(0).to(self.dev)
        if "half" in self.model_path and torch.cuda.is_available():
            stn_tst = torch.HalfTensor(soft)
        else:
            stn_tst = torch.FloatTensor(soft)
        with torch.no_grad():
            x_tst = stn_tst.unsqueeze(0).to(self.dev)
            x_tst_lengths = torch.LongTensor([stn_tst.size(0)]).to(self.dev)
            audio = self.net_g_ms.infer(x_tst, x_tst_lengths, pitch, sid=sid, noise_scale=0.3,
                                        noise_scale_w=0.5, length_scale=1)[0][0, 0].data.float()
        return audio, audio.shape[-1]

    def load_audio_to_torch(self, full_path):
        audio, sampling_rate = librosa.load(full_path, sr=self.target_sample, mono=True)
        return torch.FloatTensor(audio.astype(np.float32))

    def vc(self, origin_id, target_id, raw_path):
        # Speaker-to-speaker conversion via the model's voice_conversion path.
        audio = self.load_audio_to_torch(raw_path)
        y = audio.unsqueeze(0).to(self.dev)
        spec = spectrogram_torch(y, self.hps_ms.data.filter_length,
                                 self.hps_ms.data.sampling_rate, self.hps_ms.data.hop_length,
                                 self.hps_ms.data.win_length, center=False)
        spec_lengths = torch.LongTensor([spec.size(-1)]).to(self.dev)
        sid_src = torch.LongTensor([origin_id]).to(self.dev)
        with torch.no_grad():
            sid_tgt = torch.LongTensor([target_id]).to(self.dev)
            audio = self.net_g_ms.voice_conversion(spec, spec_lengths, sid_src=sid_src,
                                                   sid_tgt=sid_tgt)[0][0, 0].data.float()
        return audio, audio.shape[-1]

    def format_wav(self, audio_path):
        raw_audio, raw_sample_rate = torchaudio.load(audio_path)
        if raw_audio.ndim == 2 and raw_audio.shape[0] > 1:  # downmix multi-channel audio
            raw_audio = torch.mean(raw_audio, dim=0).unsqueeze(0)
        tar_audio = torchaudio.functional.resample(raw_audio, raw_sample_rate, self.target_sample)
        torchaudio.save(audio_path[:-4] + ".wav", tar_audio, self.target_sample)
        return tar_audio, self.target_sample

    def flask_format_wav(self, input_wav_path, daw_sample):
        raw_audio, raw_sample_rate = torchaudio.load(input_wav_path)
        tar_audio = torchaudio.functional.resample(raw_audio, daw_sample, self.target_sample)
        if tar_audio.ndim == 2 and tar_audio.shape[0] > 1:  # downmix multi-channel audio
            tar_audio = torch.mean(tar_audio, dim=0).unsqueeze(0)
        return tar_audio.cpu().numpy(), self.target_sample
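
# A minimal, illustrative use of Svc for one-shot inference. The model,
# config, and wav paths below are placeholders, not files shipped with
# this repo:
#
#   svc = Svc("./pth/G_xxx.pth", "./configs/config.json", device="cpu")
#   audio, length = svc.infer(speaker_id=0, tran=0, raw_path="./raw/test-00.wav")
#   soundfile.write("./results/test_out.wav", audio.numpy(), svc.target_sample)
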
"""输入输出都是1维numpy 音频波形数组""" def process(self, svc_model, speaker_id, f_pitch_change, input_wav_path): audio, sr = torchaudio.load(input_wav_path) audio = audio.cpu().numpy()[0] temp_wav = io.BytesIO() if self.last_chunk is None: input_wav_path.seek(0) audio, sr = svc_model.infer(speaker_id, f_pitch_change, input_wav_path) audio = audio.cpu().numpy() self.last_chunk = audio[-self.pre_len:] self.last_o = audio return audio[-self.chunk_len:] else: audio = np.concatenate([self.last_chunk, audio]) soundfile.write(temp_wav, audio, sr, format="wav") temp_wav.seek(0) audio, sr = svc_model.infer(speaker_id, f_pitch_change, temp_wav) audio = audio.cpu().numpy() ret = maad.util.crossfade(self.last_o, audio, self.pre_len) self.last_chunk = audio[-self.pre_len:] self.last_o = audio return ret[self.chunk_len:2 * self.chunk_len]