File size: 4,321 Bytes
7bc29af |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# F0 feature extraction script: module-level imports and one-time setup.
import os
import sys
import traceback
import parselmouth
# Put the current working directory on sys.path before the project-local
# `infer` package is imported below.
now_dir = os.getcwd()
sys.path.append(now_dir)
import logging
import numpy as np
import pyworld
from infer.lib.audio import load_audio
# Only let WARNING and above through from the "numba" logger.
logging.getLogger("numba").setLevel(logging.WARNING)
# Experiment directory, read from the first command-line argument at import
# time (raises IndexError when no argument is given).
exp_dir = sys.argv[1]
import torch_directml
# Default DirectML device; handed to the RMVPE model constructor later on.
device = torch_directml.device(torch_directml.default_device())
# Shared log file opened in append mode and written by printt(); it is not
# closed anywhere in the code visible here.
f = open("%s/extract_f0_feature.log" % exp_dir, "a+")
def printt(strr):
    """Print *strr* to stdout and append it as one line to the log file ``f``.

    Args:
        strr: Any object; it is rendered with ``%s`` formatting.
    """
    print(strr)
    # Wrap the value in a 1-tuple so that tuple arguments are logged verbatim
    # instead of being unpacked as multiple format arguments (which raised
    # TypeError for any tuple whose length is not 1).
    f.write("%s\n" % (strr,))
    # Flush right away so the log can be followed while the script runs.
    f.flush()
class FeatureInput(object):
    """Extract F0 contours from audio files and quantise them into coarse bins.

    Attributes:
        fs: Sample rate passed to ``load_audio``.
        hop: Hop size; stored but not used by the methods of this class.
        f0_bin: Number of coarse pitch bins (256).
        f0_min, f0_max: Lower/upper frequency bounds (presumably Hz) that
            define the quantisation range.
        f0_mel_min, f0_mel_max: The same bounds mapped through
            ``1127 * ln(1 + f / 700)``.
    """

    def __init__(self, samplerate=16000, hop_size=160):
        self.fs = samplerate
        self.hop = hop_size

        self.f0_bin = 256
        self.f0_max = 1100.0
        self.f0_min = 50.0
        self.f0_mel_min = 1127 * np.log(1 + self.f0_min / 700)
        self.f0_mel_max = 1127 * np.log(1 + self.f0_max / 700)

    def compute_f0(self, path, f0_method):
        """Return the F0 contour for the audio file at *path*.

        Args:
            path: Audio file handed to ``load_audio`` together with ``self.fs``.
            f0_method: Extraction backend; only ``"rmvpe"`` is implemented.

        Returns:
            The result of ``RMVPE.infer_from_audio`` for the loaded signal.

        Raises:
            ValueError: If *f0_method* is not ``"rmvpe"``.
        """
        # Reject unknown methods before doing any work. Previously such a call
        # loaded the audio and then failed on ``return f0`` with an
        # UnboundLocalError that did not name the real problem.
        if f0_method != "rmvpe":
            raise ValueError("unsupported f0 method: %r" % (f0_method,))

        x = load_audio(path, self.fs)
        if not hasattr(self, "model_rmvpe"):
            # Build the model on first use and cache it on the instance so
            # later files reuse it.
            from infer.lib.rmvpe import RMVPE

            print("Loading rmvpe model")
            self.model_rmvpe = RMVPE(
                "assets/rmvpe/rmvpe.pt", is_half=False, device=device
            )
        return self.model_rmvpe.infer_from_audio(x, thred=0.03)

    def coarse_f0(self, f0):
        """Quantise an F0 contour into integer bins in ``[1, f0_bin - 1]``.

        Args:
            f0: Array-like of F0 values; values that map to a non-positive
                mel value (e.g. 0) end up in bin 1.

        Returns:
            Integer ndarray with the same shape as *f0*.
        """
        f0_mel = 1127 * np.log(1 + np.asarray(f0) / 700)
        # Only frames with a positive mel value are rescaled; they are mapped
        # linearly so that f0_mel_min -> 1 and f0_mel_max -> f0_bin - 1.
        voiced = f0_mel > 0
        f0_mel[voiced] = (f0_mel[voiced] - self.f0_mel_min) * (
            self.f0_bin - 2
        ) / (self.f0_mel_max - self.f0_mel_min) + 1

        # Clamp everything into the valid bin range.
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > self.f0_bin - 1] = self.f0_bin - 1
        f0_coarse = np.rint(f0_mel).astype(int)
        # Sanity check on the final bin range.
        assert f0_coarse.max() <= self.f0_bin - 1 and f0_coarse.min() >= 1, (
            f0_coarse.max(),
            f0_coarse.min(),
        )
        return f0_coarse

    def go(self, paths, f0_method):
        """Extract and save F0 features for every entry in *paths*.

        Args:
            paths: Sequence of ``(inp_path, opt_path1, opt_path2)`` triples.
                The raw contour is saved with ``np.save`` to ``opt_path2`` and
                the coarse bins to ``opt_path1``.
            f0_method: Forwarded to :meth:`compute_f0`.

        Entries whose two ``.npy`` outputs already exist are skipped. A failure
        on one file is logged through ``printt`` and does not stop the loop.
        """
        if len(paths) == 0:
            printt("no-f0-todo")
            return

        printt("todo-f0-%s" % len(paths))
        # Progress is logged every n-th file, i.e. roughly five lines per run.
        n = max(len(paths) // 5, 1)
        for idx, (inp_path, opt_path1, opt_path2) in enumerate(paths):
            try:
                if idx % n == 0:
                    printt("f0ing,now-%s,all-%s,-%s" % (idx, len(paths), inp_path))
                # Skip files for which both outputs are already on disk.
                if os.path.exists(opt_path1 + ".npy") and os.path.exists(
                    opt_path2 + ".npy"
                ):
                    continue
                featur_pit = self.compute_f0(inp_path, f0_method)
                np.save(
                    opt_path2,
                    featur_pit,
                    allow_pickle=False,
                )  # nsf
                coarse_pit = self.coarse_f0(featur_pit)
                np.save(
                    opt_path1,
                    coarse_pit,
                    allow_pickle=False,
                )  # ori
            except Exception:
                # Catch Exception rather than everything so that
                # KeyboardInterrupt / SystemExit can still stop the run.
                printt("f0fail-%s-%s-%s" % (idx, inp_path, traceback.format_exc()))
if __name__ == "__main__":
    # Record the command line in the log.
    printt(sys.argv)
    featureInput = FeatureInput()

    # Input wavs and the two output folders all live under the experiment dir.
    inp_root = "%s/1_16k_wavs" % (exp_dir)
    opt_root1 = "%s/2a_f0" % (exp_dir)
    opt_root2 = "%s/2b-f0nsf" % (exp_dir)
    os.makedirs(opt_root1, exist_ok=True)
    os.makedirs(opt_root2, exist_ok=True)

    paths = []
    for name in sorted(os.listdir(inp_root)):
        # Filter on the file name only. Testing the full path skipped every
        # input whenever the experiment directory itself contained "spec".
        if "spec" in name:
            continue
        inp_path = "%s/%s" % (inp_root, name)
        opt_path1 = "%s/%s" % (opt_root1, name)
        opt_path2 = "%s/%s" % (opt_root2, name)
        paths.append([inp_path, opt_path1, opt_path2])

    # NOTE: extraction runs in this single process; an earlier multi-process
    # fan-out (one worker per paths[i::n_p] shard) is disabled.
    try:
        featureInput.go(paths, "rmvpe")
    except Exception:
        # Log unexpected failures, but let KeyboardInterrupt / SystemExit
        # terminate the script normally.
        printt("f0_all_fail-%s" % (traceback.format_exc()))
|