|
import hashlib |
|
import io |
|
import json |
|
import logging |
|
import os |
|
import time |
|
from pathlib import Path |
|
from inference import slicer |
|
|
|
import librosa |
|
import numpy as np |
|
|
|
import parselmouth |
|
import soundfile |
|
import torch |
|
import torchaudio |
|
|
|
import cluster |
|
from hubert import hubert_model |
|
import utils |
|
from models import SynthesizerTrn |
|
|
|
logging.getLogger('matplotlib').setLevel(logging.WARNING) |
|
|
|
|
|
def read_temp(file_name): |
|
if not os.path.exists(file_name): |
|
with open(file_name, "w") as f: |
|
f.write(json.dumps({"info": "temp_dict"})) |
|
return {} |
|
else: |
|
try: |
|
with open(file_name, "r") as f: |
|
data = f.read() |
|
data_dict = json.loads(data) |
|
if os.path.getsize(file_name) > 50 * 1024 * 1024: |
|
f_name = file_name.replace("\\", "/").split("/")[-1] |
|
print(f"clean {f_name}") |
|
for wav_hash in list(data_dict.keys()): |
|
if int(time.time()) - int(data_dict[wav_hash]["time"]) > 14 * 24 * 3600: |
|
del data_dict[wav_hash] |
|
except Exception as e: |
|
print(e) |
|
print(f"{file_name} error,auto rebuild file") |
|
data_dict = {"info": "temp_dict"} |
|
return data_dict |
|
|
|
|
|
def write_temp(file_name, data): |
|
with open(file_name, "w") as f: |
|
f.write(json.dumps(data)) |
|
|
|
|
|
def timeit(func): |
|
def run(*args, **kwargs): |
|
t = time.time() |
|
res = func(*args, **kwargs) |
|
print('executing \'%s\' costed %.3fs' % (func.__name__, time.time() - t)) |
|
return res |
|
|
|
return run |
|
|
|
|
|
def format_wav(audio_path): |
|
if Path(audio_path).suffix == '.wav': |
|
return |
|
raw_audio, raw_sample_rate = librosa.load(audio_path, mono=True, sr=None) |
|
soundfile.write(Path(audio_path).with_suffix(".wav"), raw_audio, raw_sample_rate) |
|
|
|
|
|
def get_end_file(dir_path, end): |
|
file_lists = [] |
|
for root, dirs, files in os.walk(dir_path): |
|
files = [f for f in files if f[0] != '.'] |
|
dirs[:] = [d for d in dirs if d[0] != '.'] |
|
for f_file in files: |
|
if f_file.endswith(end): |
|
file_lists.append(os.path.join(root, f_file).replace("\\", "/")) |
|
return file_lists |
|
|
|
|
|
def get_md5(content): |
|
return hashlib.new("md5", content).hexdigest() |
|
|
|
def fill_a_to_b(a, b): |
|
if len(a) < len(b): |
|
for _ in range(0, len(b) - len(a)): |
|
a.append(a[0]) |
|
|
|
def mkdir(paths: list): |
|
for path in paths: |
|
if not os.path.exists(path): |
|
os.mkdir(path) |
|
|
|
|
|
class Svc(object): |
|
def __init__(self, net_g_path, config_path, |
|
device=None, |
|
cluster_model_path="logs/44k/kmeans_10000.pt"): |
|
self.net_g_path = net_g_path |
|
if device is None: |
|
self.dev = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
else: |
|
self.dev = torch.device(device) |
|
self.net_g_ms = None |
|
self.hps_ms = utils.get_hparams_from_file(config_path) |
|
self.target_sample = self.hps_ms.data.sampling_rate |
|
self.hop_size = self.hps_ms.data.hop_length |
|
self.spk2id = self.hps_ms.spk |
|
|
|
self.hubert_model = utils.get_hubert_model().to(self.dev) |
|
self.load_model() |
|
if os.path.exists(cluster_model_path): |
|
self.cluster_model = cluster.get_cluster_model(cluster_model_path) |
|
|
|
def load_model(self): |
|
|
|
self.net_g_ms = SynthesizerTrn( |
|
self.hps_ms.data.filter_length // 2 + 1, |
|
self.hps_ms.train.segment_size // self.hps_ms.data.hop_length, |
|
**self.hps_ms.model) |
|
_ = utils.load_checkpoint(self.net_g_path, self.net_g_ms, None) |
|
if "half" in self.net_g_path and torch.cuda.is_available(): |
|
_ = self.net_g_ms.half().eval().to(self.dev) |
|
else: |
|
_ = self.net_g_ms.eval().to(self.dev) |
|
|
|
|
|
|
|
def get_unit_f0(self, in_path, tran, cluster_infer_ratio, speaker): |
|
|
|
wav, sr = librosa.load(in_path, sr=self.target_sample) |
|
|
|
f0 = utils.compute_f0_parselmouth(wav, sampling_rate=self.target_sample, hop_length=self.hop_size) |
|
f0, uv = utils.interpolate_f0(f0) |
|
f0 = torch.FloatTensor(f0) |
|
uv = torch.FloatTensor(uv) |
|
f0 = f0 * 2 ** (tran / 12) |
|
f0 = f0.unsqueeze(0).to(self.dev) |
|
uv = uv.unsqueeze(0).to(self.dev) |
|
|
|
wav16k = librosa.resample(wav, orig_sr=self.target_sample, target_sr=16000) |
|
wav16k = torch.from_numpy(wav16k).to(self.dev) |
|
c = utils.get_hubert_content(self.hubert_model, wav_16k_tensor=wav16k) |
|
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1]) |
|
|
|
if cluster_infer_ratio !=0: |
|
cluster_c = cluster.get_cluster_center_result(self.cluster_model, c.numpy().T, speaker).T |
|
cluster_c = torch.FloatTensor(cluster_c) |
|
c = cluster_infer_ratio * cluster_c + (1 - cluster_infer_ratio) * c |
|
|
|
c = c.unsqueeze(0) |
|
return c, f0, uv |
|
|
|
def infer(self, speaker, tran, raw_path, |
|
cluster_infer_ratio=0, |
|
auto_predict_f0=False, |
|
noice_scale=0.4): |
|
speaker_id = self.spk2id[speaker] |
|
sid = torch.LongTensor([int(speaker_id)]).to(self.dev).unsqueeze(0) |
|
c, f0, uv = self.get_unit_f0(raw_path, tran, cluster_infer_ratio, speaker) |
|
if "half" in self.net_g_path and torch.cuda.is_available(): |
|
c = c.half() |
|
with torch.no_grad(): |
|
start = time.time() |
|
audio = self.net_g_ms.infer(c, f0=f0, g=sid, uv=uv, predict_f0=auto_predict_f0, noice_scale=noice_scale)[0,0].data.float() |
|
use_time = time.time() - start |
|
print("vits use time:{}".format(use_time)) |
|
return audio, audio.shape[-1] |
|
|
|
def slice_inference(self,raw_audio_path, spk, tran, slice_db,cluster_infer_ratio, auto_predict_f0,noice_scale, pad_seconds=0.5): |
|
wav_path = raw_audio_path |
|
chunks = slicer.cut(wav_path, db_thresh=slice_db) |
|
audio_data, audio_sr = slicer.chunks2audio(wav_path, chunks) |
|
|
|
audio = [] |
|
for (slice_tag, data) in audio_data: |
|
print(f'#=====segment start, {round(len(data) / audio_sr, 3)}s======') |
|
|
|
pad_len = int(audio_sr * pad_seconds) |
|
data = np.concatenate([np.zeros([pad_len]), data, np.zeros([pad_len])]) |
|
length = int(np.ceil(len(data) / audio_sr * self.target_sample)) |
|
raw_path = io.BytesIO() |
|
soundfile.write(raw_path, data, audio_sr, format="wav") |
|
raw_path.seek(0) |
|
if slice_tag: |
|
print('jump empty segment') |
|
_audio = np.zeros(length) |
|
else: |
|
out_audio, out_sr = self.infer(spk, tran, raw_path, |
|
cluster_infer_ratio=cluster_infer_ratio, |
|
auto_predict_f0=auto_predict_f0, |
|
noice_scale=noice_scale |
|
) |
|
_audio = out_audio.cpu().numpy() |
|
|
|
pad_len = int(self.target_sample * pad_seconds) |
|
_audio = _audio[pad_len:-pad_len] |
|
audio.extend(list(_audio)) |
|
return np.array(audio) |
|
|
|
|
|
class RealTimeVC: |
|
def __init__(self): |
|
self.last_chunk = None |
|
self.last_o = None |
|
self.chunk_len = 16000 |
|
self.pre_len = 3840 |
|
|
|
"""输入输出都是1维numpy 音频波形数组""" |
|
|
|
def process(self, svc_model, speaker_id, f_pitch_change, input_wav_path): |
|
import maad |
|
audio, sr = torchaudio.load(input_wav_path) |
|
audio = audio.cpu().numpy()[0] |
|
temp_wav = io.BytesIO() |
|
if self.last_chunk is None: |
|
input_wav_path.seek(0) |
|
audio, sr = svc_model.infer(speaker_id, f_pitch_change, input_wav_path) |
|
audio = audio.cpu().numpy() |
|
self.last_chunk = audio[-self.pre_len:] |
|
self.last_o = audio |
|
return audio[-self.chunk_len:] |
|
else: |
|
audio = np.concatenate([self.last_chunk, audio]) |
|
soundfile.write(temp_wav, audio, sr, format="wav") |
|
temp_wav.seek(0) |
|
audio, sr = svc_model.infer(speaker_id, f_pitch_change, temp_wav) |
|
audio = audio.cpu().numpy() |
|
ret = maad.util.crossfade(self.last_o, audio, self.pre_len) |
|
self.last_chunk = audio[-self.pre_len:] |
|
self.last_o = audio |
|
return ret[self.chunk_len:2 * self.chunk_len] |
|
|