import gc
import re
import os
import sys
import time
import torch
import faiss
import shutil
import codecs
import pyworld
import librosa
import logging
import argparse
import warnings
import traceback
import torchcrepe
import subprocess
import parselmouth
import logging.handlers

import numpy as np
import soundfile as sf
import noisereduce as nr
import torch.nn.functional as F
import torch.multiprocessing as mp

from tqdm import tqdm
from scipy import signal
from torch import Tensor
from scipy.io import wavfile
from audio_upscaler import upscale
from distutils.util import strtobool
from fairseq import checkpoint_utils
from pydub import AudioSegment, silence

now_dir = os.getcwd()
sys.path.append(now_dir)

from main.configs.config import Config
from main.library.predictors.FCPE import FCPE
from main.library.predictors.RMVPE import RMVPE
from main.library.algorithm.synthesizers import Synthesizer

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

logging.getLogger("wget").setLevel(logging.ERROR)
logging.getLogger("torch").setLevel(logging.ERROR)
logging.getLogger("faiss").setLevel(logging.ERROR)
logging.getLogger("httpx").setLevel(logging.ERROR)
logging.getLogger("fairseq").setLevel(logging.ERROR)
logging.getLogger("httpcore").setLevel(logging.ERROR)
logging.getLogger("faiss.loader").setLevel(logging.ERROR)

FILTER_ORDER = 5
CUTOFF_FREQUENCY = 48
SAMPLE_RATE = 16000
bh, ah = signal.butter(N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)
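# bh, ah: coefficients of a 5th-order Butterworth high-pass at 48 Hz (fs = 16 kHz), applied with
# signal.filtfilt in VC.pipeline() to remove DC offset and low-frequency rumble before F0 extraction.
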
input_audio_path2wav = {}
log_file = os.path.join("assets", "logs", "convert.log")
logger = logging.getLogger(__name__)
logger.propagate = False
translations = Config().translations

if logger.hasHandlers(): logger.handlers.clear()
else:
    console_handler = logging.StreamHandler()
    console_formatter = logging.Formatter(fmt="\n%(asctime)s.%(msecs)03d | %(levelname)s | %(module)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    console_handler.setFormatter(console_formatter)
    console_handler.setLevel(logging.INFO)
    file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=5*1024*1024, backupCount=3, encoding='utf-8')
    file_formatter = logging.Formatter(fmt="\n%(asctime)s.%(msecs)03d | %(levelname)s | %(module)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)

def parse_arguments() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--pitch", type=int, default=0)
    parser.add_argument("--filter_radius", type=int, default=3)
    parser.add_argument("--index_rate", type=float, default=0.5)
    parser.add_argument("--volume_envelope", type=float, default=1)
    parser.add_argument("--protect", type=float, default=0.33)
    parser.add_argument("--hop_length", type=int, default=64)
    parser.add_argument("--f0_method", type=str, default="rmvpe")
    parser.add_argument("--input_path", type=str, required=True)
    parser.add_argument("--output_path", type=str, default="./audios/output.wav")
    parser.add_argument("--pth_path", type=str, required=True)
    parser.add_argument("--index_path", type=str, required=True)
    parser.add_argument("--f0_autotune", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--f0_autotune_strength", type=float, default=1)
    parser.add_argument("--clean_audio", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--clean_strength", type=float, default=0.7)
    parser.add_argument("--export_format", type=str, default="wav")
    parser.add_argument("--embedder_model", type=str, default="contentvec_base")
    parser.add_argument("--upscale_audio", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--resample_sr", type=int, default=0)
    parser.add_argument("--batch_process", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--batch_size", type=int, default=2)
    parser.add_argument("--split_audio", type=lambda x: bool(strtobool(x)), default=False)
    return parser.parse_args()

def main():
    args = parse_arguments()
    pitch = args.pitch
    filter_radius = args.filter_radius
    index_rate = args.index_rate
    volume_envelope = args.volume_envelope
    protect = args.protect
    hop_length = args.hop_length
    f0_method = args.f0_method
    input_path = args.input_path
    output_path = args.output_path
    pth_path = args.pth_path
    index_path = args.index_path
    f0_autotune = args.f0_autotune
    f0_autotune_strength = args.f0_autotune_strength
    clean_audio = args.clean_audio
    clean_strength = args.clean_strength
    export_format = args.export_format
    embedder_model = args.embedder_model
    upscale_audio = args.upscale_audio
    resample_sr = args.resample_sr
    batch_process = args.batch_process
    batch_size = args.batch_size
    split_audio = args.split_audio
    logger.debug(f"{translations['pitch']}: {pitch}")
    logger.debug(f"{translations['filter_radius']}: {filter_radius}")
    logger.debug(f"{translations['index_strength']}: {index_rate}")
    logger.debug(f"{translations['volume_envelope']}: {volume_envelope}")
    logger.debug(f"{translations['protect']}: {protect}")
    if f0_method == "crepe" or f0_method == "crepe-tiny": logger.debug(f"Hop length: {hop_length}")
    logger.debug(f"{translations['f0_method']}: {f0_method}")
    logger.debug(f"{translations['audio_path']}: {input_path}")
    logger.debug(f"{translations['output_path']}: {output_path.replace('.wav', f'.{export_format}')}")
    logger.debug(f"{translations['model_path']}: {pth_path}")
    logger.debug(f"{translations['indexpath']}: {index_path}")
    logger.debug(f"{translations['autotune']}: {f0_autotune}")
    logger.debug(f"{translations['clear_audio']}: {clean_audio}")
    if clean_audio: logger.debug(f"{translations['clean_strength']}: {clean_strength}")
    logger.debug(f"{translations['export_format']}: {export_format}")
    logger.debug(f"{translations['hubert_model']}: {embedder_model}")
    logger.debug(f"{translations['upscale_audio']}: {upscale_audio}")
    if resample_sr != 0: logger.debug(f"{translations['sample_rate']}: {resample_sr}")
    if split_audio: logger.debug(f"{translations['batch_process']}: {batch_process}")
    if batch_process and split_audio: logger.debug(f"{translations['batch_size']}: {batch_size}")
    logger.debug(f"{translations['split_audio']}: {split_audio}")
    if f0_autotune: logger.debug(f"{translations['autotune_rate_info']}: {f0_autotune_strength}")
    check_rmvpe_fcpe(f0_method)
    check_hubert(embedder_model)
    run_convert_script(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, input_path=input_path, output_path=output_path, pth_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, embedder_model=embedder_model, upscale_audio=upscale_audio, resample_sr=resample_sr, batch_process=batch_process, batch_size=batch_size, split_audio=split_audio)
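# The helpers below lazily download the missing pitch predictors (rmvpe.pt / fcpe.pt) and the
# HuBERT-style embedder weights via wget; the base URL is stored ROT13-encoded and is decoded
# at call time with codecs.decode(..., "rot13").
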
def check_rmvpe_fcpe(method):
    def download_rmvpe():
        if not os.path.exists(os.path.join("assets", "model", "predictors", "rmvpe.pt")): subprocess.run(["wget", "-q", "--show-progress", "--no-check-certificate", codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Pbyno_EIP_Cebwrpg_2/erfbyir/znva/", "rot13") + "rmvpe.pt", "-P", os.path.join("assets", "model", "predictors")], check=True)
    def download_fcpe():
        if not os.path.exists(os.path.join("assets", "model", "predictors", "fcpe.pt")): subprocess.run(["wget", "-q", "--show-progress", "--no-check-certificate", codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Pbyno_EIP_Cebwrpg_2/erfbyir/znva/", "rot13") + "fcpe.pt", "-P", os.path.join("assets", "model", "predictors")], check=True)
    if method == "rmvpe": download_rmvpe()
    elif method == "fcpe": download_fcpe()
    elif "hybrid" in method:
        methods_str = re.search(r"hybrid\[(.+)\]", method)
        methods = [m.strip() for m in methods_str.group(1).split("+")] if methods_str else []
        for m in methods:
            if m == "rmvpe": download_rmvpe()
            elif m == "fcpe": download_fcpe()

def check_hubert(hubert):
    if hubert in ("contentvec_base", "hubert_base", "japanese_hubert_base", "korean_hubert_base", "chinese_hubert_base"):
        model_path = os.path.join(now_dir, "assets", "model", "embedders", hubert + '.pt')
        if not os.path.exists(model_path): subprocess.run(["wget", "-q", "--show-progress", "--no-check-certificate", codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Pbyno_EIP_Cebwrpg_2/erfbyir/znva/", "rot13") + f"{hubert}.pt", "-P", os.path.join("assets", "model", "embedders")], check=True)

def load_audio_infer(file, sample_rate):
    try:
        file = file.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
        if not os.path.isfile(file): raise FileNotFoundError(translations["not_found"].format(name=file))
        audio, sr = sf.read(file)
        if len(audio.shape) > 1: audio = librosa.to_mono(audio.T)
        if sr != sample_rate: audio = librosa.resample(audio, orig_sr=sr, target_sr=sample_rate)
    except Exception as e:
        raise RuntimeError(f"{translations['errors_loading_audio']}: {e}")
    return audio.flatten()
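
# process_audio splits the input on silent gaps (>= 750 ms quieter than -70 dBFS), exports every
# non-silent chunk of at least 30 ms to output_path/chunk{i}.wav, and returns the chunk paths
# together with their original (start_ms, end_ms) positions.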
def process_audio(file_path, output_path):
    try:
        song = AudioSegment.from_file(file_path)
        nonsilent_parts = silence.detect_nonsilent(song, min_silence_len=750, silence_thresh=-70)
        cut_files = []
        time_stamps = []
        min_chunk_duration = 30
        for i, (start_i, end_i) in enumerate(nonsilent_parts):
            chunk = song[start_i:end_i]
            if len(chunk) >= min_chunk_duration:
                chunk_file_path = os.path.join(output_path, f"chunk{i}.wav")
                if os.path.exists(chunk_file_path): os.remove(chunk_file_path)
                chunk.export(chunk_file_path, format="wav")
                cut_files.append(chunk_file_path)
                time_stamps.append((start_i, end_i))
            else: logger.debug(translations["skip_file"].format(i=i, chunk=len(chunk)))
        logger.info(f"{translations['split_total']}: {len(cut_files)}")
        return cut_files, time_stamps
    except Exception as e:
        raise RuntimeError(f"{translations['process_audio_error']}: {e}")
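
# merge_audio reassembles the converted segments in order, re-inserting silence between the
# recorded time stamps (and at the tail) so the result matches the source duration.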
def merge_audio(files_list, time_stamps, original_file_path, output_path, format):
    try:
        def extract_number(filename):
            match = re.search(r'_(\d+)', filename)
            return int(match.group(1)) if match else 0
        files_list = sorted(files_list, key=extract_number)
        total_duration = len(AudioSegment.from_file(original_file_path))
        combined = AudioSegment.empty()
        current_position = 0
        for file, (start_i, end_i) in zip(files_list, time_stamps):
            if start_i > current_position:
                silence_duration = start_i - current_position
                combined += AudioSegment.silent(duration=silence_duration)
            combined += AudioSegment.from_file(file)
            current_position = end_i
        if current_position < total_duration: combined += AudioSegment.silent(duration=total_duration - current_position)
        combined.export(output_path, format=format)
        return output_path
    except Exception as e:
        raise RuntimeError(f"{translations['merge_error']}: {e}")

def run_batch_convert(params):
    cvt = VoiceConverter()
    path = params["path"]
    audio_temp = params["audio_temp"]
    export_format = params["export_format"]
    cut_files = params["cut_files"]
    pitch = params["pitch"]
    filter_radius = params["filter_radius"]
    index_rate = params["index_rate"]
    volume_envelope = params["volume_envelope"]
    protect = params["protect"]
    hop_length = params["hop_length"]
    f0_method = params["f0_method"]
    pth_path = params["pth_path"]
    index_path = params["index_path"]
    f0_autotune = params["f0_autotune"]
    f0_autotune_strength = params["f0_autotune_strength"]
    clean_audio = params["clean_audio"]
    clean_strength = params["clean_strength"]
    upscale_audio = params["upscale_audio"]
    embedder_model = params["embedder_model"]
    resample_sr = params["resample_sr"]
    segment_output_path = os.path.join(audio_temp, f"output_{cut_files.index(path)}.{export_format}")
    if os.path.exists(segment_output_path): os.remove(segment_output_path)
    cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=path, audio_output_path=segment_output_path, model_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, upscale_audio=upscale_audio, embedder_model=embedder_model, resample_sr=resample_sr)
    os.remove(path)
    if os.path.exists(segment_output_path): return segment_output_path
    else:
        logger.warning(f"{translations['not_found_convert_file']}: {segment_output_path}")
        sys.exit(1)

def run_convert_script(pitch, filter_radius, index_rate, volume_envelope, protect, hop_length, f0_method, input_path, output_path, pth_path, index_path, f0_autotune, f0_autotune_strength, clean_audio, clean_strength, export_format, upscale_audio, embedder_model, resample_sr, batch_process, batch_size, split_audio):
    cvt = VoiceConverter()
    start_time = time.time()
    if not pth_path or not os.path.exists(pth_path) or os.path.isdir(pth_path) or not pth_path.endswith(".pth"):
        logger.warning(translations["provide_file"].format(filename=translations["model"]))
        sys.exit(1)
    if not index_path or not os.path.exists(index_path) or os.path.isdir(index_path) or not index_path.endswith(".index"):
        logger.warning(translations["provide_file"].format(filename=translations["index"]))
        sys.exit(1)
    output_dir = os.path.dirname(output_path)
    output_dir = output_path if not output_dir else output_dir
    if output_dir is None: output_dir = "audios"
    if not os.path.exists(output_dir): os.makedirs(output_dir, exist_ok=True)
    audio_temp = os.path.join("audios_temp")
    if not os.path.exists(audio_temp) and split_audio: os.makedirs(audio_temp, exist_ok=True)
    processed_segments = []
    if os.path.isdir(input_path):
        try:
            logger.info(translations["convert_batch"])
            audio_files = [f for f in os.listdir(input_path) if f.endswith(("wav", "mp3", "flac", "ogg", "opus", "m4a", "mp4", "aac", "alac", "wma", "aiff", "webm", "ac3"))]
            if not audio_files:
                logger.warning(translations["not_found_audio"])
                sys.exit(1)
            logger.info(translations["found_audio"].format(audio_files=len(audio_files)))
            for audio in audio_files:
                audio_path = os.path.join(input_path, audio)
                output_audio = os.path.join(input_path, os.path.splitext(audio)[0] + f"_output.{export_format}")
                if split_audio:
                    try:
                        cut_files, time_stamps = process_audio(audio_path, audio_temp)
                        num_threads = min(batch_size, len(cut_files))
                        params_list = [{
                            "path": path, "audio_temp": audio_temp, "export_format": export_format, "cut_files": cut_files,
                            "pitch": pitch, "filter_radius": filter_radius, "index_rate": index_rate, "volume_envelope": volume_envelope,
                            "protect": protect, "hop_length": hop_length, "f0_method": f0_method, "pth_path": pth_path,
                            "index_path": index_path, "f0_autotune": f0_autotune, "f0_autotune_strength": f0_autotune_strength,
                            "clean_audio": clean_audio, "clean_strength": clean_strength, "upscale_audio": upscale_audio,
                            "embedder_model": embedder_model, "resample_sr": resample_sr
                        } for path in cut_files]
                        if batch_process:
                            with mp.Pool(processes=num_threads) as pool:
                                with tqdm(total=len(params_list), desc=translations["convert_audio"]) as pbar:
                                    for results in pool.imap_unordered(run_batch_convert, params_list):
                                        processed_segments.append(results)
                                        pbar.update(1)
                        else:
                            for params in tqdm(params_list, desc=translations["convert_audio"]):
                                # collect the converted segment paths so merge_audio can reassemble them
                                processed_segments.append(run_batch_convert(params))
                        merge_audio(processed_segments, time_stamps, audio_path, output_audio, export_format)
                    except Exception as e:
                        logger.error(translations["error_convert_batch"].format(e=e))
                    finally:
                        if os.path.exists(audio_temp): shutil.rmtree(audio_temp, ignore_errors=True)
                else:
                    try:
                        logger.info(f"{translations['convert_audio']} '{audio_path}'...")
                        if os.path.exists(output_audio): os.remove(output_audio)
                        with tqdm(total=1, desc=translations["convert_audio"]) as pbar:
                            cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=audio_path, audio_output_path=output_audio, model_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, upscale_audio=upscale_audio, embedder_model=embedder_model, resample_sr=resample_sr)
                            pbar.update(1)
                    except Exception as e:
                        logger.error(translations["error_convert"].format(e=e))
            elapsed_time = time.time() - start_time
            logger.info(translations["convert_batch_success"].format(elapsed_time=f"{elapsed_time:.2f}", output_path=output_path.replace('.wav', f'.{export_format}')))
        except Exception as e:
            logger.error(translations["error_convert_batch_2"].format(e=e))
    else:
        logger.info(f"{translations['convert_audio']} '{input_path}'...")
        if not os.path.exists(input_path):
            logger.warning(translations["not_found_audio"])
            sys.exit(1)
        if os.path.isdir(output_path): output_path = os.path.join(output_path, f"output.{export_format}")
        if os.path.exists(output_path): os.remove(output_path)
        if split_audio:
            try:
                cut_files, time_stamps = process_audio(input_path, audio_temp)
                num_threads = min(batch_size, len(cut_files))
                params_list = [{
                    "path": path, "audio_temp": audio_temp, "export_format": export_format, "cut_files": cut_files,
                    "pitch": pitch, "filter_radius": filter_radius, "index_rate": index_rate, "volume_envelope": volume_envelope,
                    "protect": protect, "hop_length": hop_length, "f0_method": f0_method, "pth_path": pth_path,
                    "index_path": index_path, "f0_autotune": f0_autotune, "f0_autotune_strength": f0_autotune_strength,
                    "clean_audio": clean_audio, "clean_strength": clean_strength, "upscale_audio": upscale_audio,
                    "embedder_model": embedder_model, "resample_sr": resample_sr
                } for path in cut_files]
                if batch_process:
                    with mp.Pool(processes=num_threads) as pool:
                        with tqdm(total=len(params_list), desc=translations["convert_audio"]) as pbar:
                            for results in pool.imap_unordered(run_batch_convert, params_list):
                                processed_segments.append(results)
                                pbar.update(1)
                else:
                    for params in tqdm(params_list, desc=translations["convert_audio"]):
                        # collect the converted segment paths so merge_audio can reassemble them
                        processed_segments.append(run_batch_convert(params))
                merge_audio(processed_segments, time_stamps, input_path, output_path.replace(".wav", f".{export_format}"), export_format)
            except Exception as e:
                logger.error(translations["error_convert_batch"].format(e=e))
            finally:
                if os.path.exists(audio_temp): shutil.rmtree(audio_temp, ignore_errors=True)
        else:
            try:
                with tqdm(total=1, desc=translations["convert_audio"]) as pbar:
                    cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=input_path, audio_output_path=output_path, model_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, upscale_audio=upscale_audio, embedder_model=embedder_model, resample_sr=resample_sr)
                    pbar.update(1)
            except Exception as e:
                logger.error(translations["error_convert"].format(e=e))
        elapsed_time = time.time() - start_time
        logger.info(translations["convert_audio_success"].format(input_path=input_path, elapsed_time=f"{elapsed_time:.2f}", output_path=output_path.replace('.wav', f'.{export_format}')))
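
# change_rms blends the source loudness contour into the converted audio:
# output = target * rms_source^(1 - rate) * rms_target^(rate - 1), where rate is the
# --volume_envelope value: rate = 1 keeps the converted dynamics unchanged, rate = 0 fully
# re-imposes the source envelope.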
def change_rms(source_audio: np.ndarray, source_rate: int, target_audio: np.ndarray, target_rate: int, rate: float) -> np.ndarray:
    rms1 = librosa.feature.rms(y=source_audio, frame_length=source_rate // 2 * 2, hop_length=source_rate // 2)
    rms2 = librosa.feature.rms(y=target_audio, frame_length=target_rate // 2 * 2, hop_length=target_rate // 2)
    rms1 = F.interpolate(torch.from_numpy(rms1).float().unsqueeze(0), size=target_audio.shape[0], mode="linear").squeeze()
    rms2 = F.interpolate(torch.from_numpy(rms2).float().unsqueeze(0), size=target_audio.shape[0], mode="linear").squeeze()
    rms2 = torch.maximum(rms2, torch.zeros_like(rms2) + 1e-6)
    adjusted_audio = target_audio * (torch.pow(rms1, 1 - rate) * torch.pow(rms2, rate - 1)).numpy()
    return adjusted_audio
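
# Autotune snaps each F0 frame towards the nearest reference note; strength 1 lands exactly on
# the note, strength 0 leaves the pitch untouched.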
class Autotune:
    def __init__(self, ref_freqs):
        self.ref_freqs = ref_freqs
        self.note_dict = self.ref_freqs

    def autotune_f0(self, f0, f0_autotune_strength):
        autotuned_f0 = np.zeros_like(f0)
        for i, freq in enumerate(f0):
            closest_note = min(self.note_dict, key=lambda x: abs(x - freq))
            autotuned_f0[i] = freq + (closest_note - freq) * f0_autotune_strength
        return autotuned_f0

class VC:
    def __init__(self, tgt_sr, config):
        self.x_pad = config.x_pad
        self.x_query = config.x_query
        self.x_center = config.x_center
        self.x_max = config.x_max
        self.is_half = config.is_half
        self.sample_rate = 16000
        self.window = 160
        self.t_pad = self.sample_rate * self.x_pad
        self.t_pad_tgt = tgt_sr * self.x_pad
        self.t_pad2 = self.t_pad * 2
        self.t_query = self.sample_rate * self.x_query
        self.t_center = self.sample_rate * self.x_center
        self.t_max = self.sample_rate * self.x_max
        self.time_step = self.window / self.sample_rate * 1000
        self.f0_min = 50
        self.f0_max = 1100
        self.f0_mel_min = 1127 * np.log(1 + self.f0_min / 700)
        self.f0_mel_max = 1127 * np.log(1 + self.f0_max / 700)
        self.device = config.device
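        # equal-tempered note frequencies (G1 through C6) used as autotune snapping targets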
        self.ref_freqs = [
            49.00, 51.91, 55.00, 58.27, 61.74, 65.41, 69.30, 73.42, 77.78, 82.41, 87.31, 92.50,
            98.00, 103.83, 110.00, 116.54, 123.47, 130.81, 138.59, 146.83, 155.56, 164.81, 174.61, 185.00,
            196.00, 207.65, 220.00, 233.08, 246.94, 261.63, 277.18, 293.66, 311.13, 329.63, 349.23, 369.99,
            392.00, 415.30, 440.00, 466.16, 493.88, 523.25, 554.37, 587.33, 622.25, 659.25, 698.46, 739.99,
            783.99, 830.61, 880.00, 932.33, 987.77, 1046.50
        ]
        self.autotune = Autotune(self.ref_freqs)
        self.note_dict = self.autotune.note_dict

    def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"):
        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)
        audio = torch.from_numpy(x).to(self.device, copy=True)
        audio = torch.unsqueeze(audio, dim=0)
        if audio.ndim == 2 and audio.shape[0] > 1: audio = torch.mean(audio, dim=0, keepdim=True).detach()
        audio = audio.detach()
        pitch: Tensor = torchcrepe.predict(audio, self.sample_rate, hop_length, f0_min, f0_max, model, batch_size=hop_length * 2, device=self.device, pad=True)
        p_len = p_len or x.shape[0] // hop_length
        source = np.array(pitch.squeeze(0).cpu().float().numpy())
        source[source < 0.001] = np.nan
        target = np.interp(np.arange(0, len(source) * p_len, len(source)) / p_len, np.arange(0, len(source)), source)
        f0 = np.nan_to_num(target)
        return f0
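
    # get_f0_hybrid runs every estimator named in "hybrid[a+b+...]", resamples each F0 track to
    # p_len frames, and returns the per-frame median so outliers from any single method are suppressed.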
    def get_f0_hybrid(self, methods_str, x, f0_min, f0_max, p_len, hop_length, filter_radius):
        methods_match = re.search(r"hybrid\[(.+)\]", methods_str)
        methods = [method.strip() for method in methods_match.group(1).split("+")] if methods_match else []
        f0_computation_stack = []
        logger.debug(translations["hybrid_methods"].format(methods=methods))
        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)
        for method in methods:
            f0 = None
            if method == "pm":
                f0 = (parselmouth.Sound(x, self.sample_rate).to_pitch_ac(time_step=self.window / self.sample_rate * 1000 / 1000, voicing_threshold=0.6, pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array["frequency"])
                pad_size = (p_len - len(f0) + 1) // 2
                if pad_size > 0 or p_len - len(f0) - pad_size > 0: f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
            elif method == "dio":
                f0, t = pyworld.dio(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
                f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)
                f0 = signal.medfilt(f0, 3)
            elif method == "crepe-tiny":
                f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length), "tiny")
            elif method == "crepe":
                f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length))
            elif method == "fcpe":
                self.model_fcpe = FCPE(os.path.join("assets", "model", "predictors", "fcpe.pt"), hop_length=int(hop_length), f0_min=int(f0_min), f0_max=int(f0_max), dtype=torch.float32, device=self.device, sample_rate=self.sample_rate, threshold=0.03)
                f0 = self.model_fcpe.compute_f0(x, p_len=p_len)
                del self.model_fcpe
                gc.collect()
            elif method == "rmvpe":
                f0 = RMVPE(os.path.join("assets", "model", "predictors", "rmvpe.pt"), is_half=self.is_half, device=self.device).infer_from_audio(x, thred=0.03)
                f0 = f0[1:]
            elif method == "harvest":
                f0, t = pyworld.harvest(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
                f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)
                if filter_radius > 2: f0 = signal.medfilt(f0, 3)
            else: raise ValueError(translations["method_not_valid"])
            f0_computation_stack.append(f0)
        resampled_stack = []
        for f0 in f0_computation_stack:
            resampled_f0 = np.interp(np.linspace(0, len(f0), p_len), np.arange(len(f0)), f0)
            resampled_stack.append(resampled_f0)
        f0_median_hybrid = resampled_stack[0] if len(resampled_stack) == 1 else np.nanmedian(np.vstack(resampled_stack), axis=0)
        return f0_median_hybrid

    def get_f0(self, input_audio_path, x, p_len, pitch, f0_method, filter_radius, hop_length, f0_autotune, f0_autotune_strength):
        global input_audio_path2wav
        if f0_method == "pm":
            f0 = (parselmouth.Sound(x, self.sample_rate).to_pitch_ac(time_step=self.window / self.sample_rate * 1000 / 1000, voicing_threshold=0.6, pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array["frequency"])
            pad_size = (p_len - len(f0) + 1) // 2
            if pad_size > 0 or p_len - len(f0) - pad_size > 0: f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
        elif f0_method == "dio":
            f0, t = pyworld.dio(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
            f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)
            f0 = signal.medfilt(f0, 3)
        elif f0_method == "crepe-tiny":
            f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length), "tiny")
        elif f0_method == "crepe":
            f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length))
        elif f0_method == "fcpe":
            self.model_fcpe = FCPE(os.path.join("assets", "model", "predictors", "fcpe.pt"), hop_length=int(hop_length), f0_min=int(self.f0_min), f0_max=int(self.f0_max), dtype=torch.float32, device=self.device, sample_rate=self.sample_rate, threshold=0.03)
            f0 = self.model_fcpe.compute_f0(x, p_len=p_len)
            del self.model_fcpe
            gc.collect()
        elif f0_method == "rmvpe":
            f0 = RMVPE(os.path.join("assets", "model", "predictors", "rmvpe.pt"), is_half=self.is_half, device=self.device).infer_from_audio(x, thred=0.03)
        elif f0_method == "harvest":
            f0, t = pyworld.harvest(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
            f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)
            if filter_radius > 2: f0 = signal.medfilt(f0, 3)
        elif "hybrid" in f0_method:
            input_audio_path2wav[input_audio_path] = x.astype(np.double)
            f0 = self.get_f0_hybrid(f0_method, x, self.f0_min, self.f0_max, p_len, hop_length, filter_radius)
        else: raise ValueError(translations["method_not_valid"])
        if f0_autotune: f0 = self.autotune.autotune_f0(f0, f0_autotune_strength)
        f0 *= pow(2, pitch / 12)
        f0bak = f0.copy()
        # quantize F0 to 255 coarse mel-scale bins for the pitch embedding; keep the Hz copy (f0bak) for the synthesizer
        f0_mel = 1127 * np.log(1 + f0 / 700)
        f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - self.f0_mel_min) * 254 / (self.f0_mel_max - self.f0_mel_min) + 1
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        f0_coarse = np.rint(f0_mel).astype(np.int32)
        return f0_coarse, f0bak
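
    # voice_conversion extracts HuBERT features for one segment, optionally blends in the nearest
    # FAISS index vectors (index_rate), optionally guards unvoiced frames when protect < 0.5, and
    # runs the synthesizer to produce the converted waveform for that segment.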
    def voice_conversion(self, model, net_g, sid, audio0, pitch, pitchf, index, big_npy, index_rate, version, protect):
        pitch_guidance = pitch is not None and pitchf is not None
        feats = (torch.from_numpy(audio0).half() if self.is_half else torch.from_numpy(audio0).float())
        if feats.dim() == 2: feats = feats.mean(-1)
        assert feats.dim() == 1, feats.dim()
        feats = feats.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)
        inputs = {"source": feats.to(self.device), "padding_mask": padding_mask, "output_layer": 9 if version == "v1" else 12}
        with torch.no_grad():
            logits = model.extract_features(**inputs)
            feats = model.final_proj(logits[0]) if version == "v1" else logits[0]
        if protect < 0.5 and pitch_guidance: feats0 = feats.clone()
        if (not isinstance(index, type(None)) and not isinstance(big_npy, type(None)) and index_rate != 0):
            npy = feats[0].cpu().numpy()
            if self.is_half: npy = npy.astype("float32")
            score, ix = index.search(npy, k=8)
            weight = np.square(1 / score)
            weight /= weight.sum(axis=1, keepdims=True)
            npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
            if self.is_half: npy = npy.astype("float16")
            feats = (torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate + (1 - index_rate) * feats)
        feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        if protect < 0.5 and pitch_guidance: feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        p_len = audio0.shape[0] // self.window
        if feats.shape[1] < p_len:
            p_len = feats.shape[1]
            if pitch_guidance:
                pitch = pitch[:, :p_len]
                pitchf = pitchf[:, :p_len]
        if protect < 0.5 and pitch_guidance:
            pitchff = pitchf.clone()
            pitchff[pitchf > 0] = 1
            pitchff[pitchf < 1] = protect
            pitchff = pitchff.unsqueeze(-1)
            feats = feats * pitchff + feats0 * (1 - pitchff)
            feats = feats.to(feats0.dtype)
        p_len = torch.tensor([p_len], device=self.device).long()
        with torch.no_grad():
            audio1 = ((net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0]).data.cpu().float().numpy()) if pitch_guidance else ((net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy())
        del feats, p_len, padding_mask
        if torch.cuda.is_available(): torch.cuda.empty_cache()
        return audio1
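
    # pipeline applies the high-pass filter, finds low-energy points (opt_ts) at which long audio
    # can be cut, converts each padded segment with voice_conversion, then concatenates the pieces,
    # applies the volume envelope, optionally resamples, and returns int16 PCM.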
    def pipeline(self, model, net_g, sid, audio, input_audio_path, pitch, f0_method, file_index, index_rate, pitch_guidance, filter_radius, tgt_sr, resample_sr, volume_envelope, version, protect, hop_length, f0_autotune, f0_autotune_strength):
        if file_index != "" and os.path.exists(file_index) and index_rate != 0:
            try:
                index = faiss.read_index(file_index)
                big_npy = index.reconstruct_n(0, index.ntotal)
            except Exception as e:
                logger.error(translations["read_faiss_index_error"].format(e=e))
                index = big_npy = None
        else: index = big_npy = None
        audio = signal.filtfilt(bh, ah, audio)
        audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
        opt_ts = []
        if audio_pad.shape[0] > self.t_max:
            audio_sum = np.zeros_like(audio)
            for i in range(self.window):
                audio_sum += audio_pad[i : i - self.window]
            for t in range(self.t_center, audio.shape[0], self.t_center):
                opt_ts.append(t - self.t_query + np.where(np.abs(audio_sum[t - self.t_query : t + self.t_query]) == np.abs(audio_sum[t - self.t_query : t + self.t_query]).min())[0][0])
        s = 0
        audio_opt = []
        t = None
        audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
        p_len = audio_pad.shape[0] // self.window
        sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()
        if pitch_guidance:
            pitch, pitchf = self.get_f0(input_audio_path, audio_pad, p_len, pitch, f0_method, filter_radius, hop_length, f0_autotune, f0_autotune_strength)
            pitch = pitch[:p_len]
            pitchf = pitchf[:p_len]
            if self.device == "mps": pitchf = pitchf.astype(np.float32)
            pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long()
            pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()
        for t in opt_ts:
            t = t // self.window * self.window
            if pitch_guidance: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[s : t + self.t_pad2 + self.window], pitch[:, s // self.window : (t + self.t_pad2) // self.window], pitchf[:, s // self.window : (t + self.t_pad2) // self.window], index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
            else: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[s : t + self.t_pad2 + self.window], None, None, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
            s = t
        if pitch_guidance: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[t:], pitch[:, t // self.window :] if t is not None else pitch, pitchf[:, t // self.window :] if t is not None else pitchf, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
        else: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[t:], None, None, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
        audio_opt = np.concatenate(audio_opt)
        if volume_envelope != 1: audio_opt = change_rms(audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope)
        if resample_sr >= self.sample_rate and tgt_sr != resample_sr: audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr)
        audio_max = np.abs(audio_opt).max() / 0.99
        max_int16 = 32768
        if audio_max > 1: max_int16 /= audio_max
        audio_opt = (audio_opt * max_int16).astype(np.int16)
        if pitch_guidance: del pitch, pitchf
        del sid
        if torch.cuda.is_available(): torch.cuda.empty_cache()
        return audio_opt
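
# VoiceConverter owns the loaded checkpoint, the HuBERT embedder and the VC pipeline, and exposes
# convert_audio() as the single entry point used by the CLI and batch helpers above.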
class VoiceConverter:
    def __init__(self):
        self.config = Config()
        self.hubert_model = None
        self.tgt_sr = None
        self.net_g = None
        self.vc = None
        self.cpt = None
        self.version = None
        self.n_spk = None
        self.use_f0 = None
        self.loaded_model = None

    def load_hubert(self, embedder_model):
        try:
            models, _, _ = checkpoint_utils.load_model_ensemble_and_task([os.path.join(now_dir, "assets", "model", "embedders", embedder_model + '.pt')], suffix="")
        except Exception as e:
            raise ImportError(translations["read_model_error"].format(e=e))
        self.hubert_model = models[0].to(self.config.device)
        self.hubert_model = (self.hubert_model.half() if self.config.is_half else self.hubert_model.float())
        self.hubert_model.eval()

    def remove_audio_noise(self, input_audio_path, reduction_strength=0.7):
        try:
            rate, data = wavfile.read(input_audio_path)
            reduced_noise = nr.reduce_noise(y=data, sr=rate, prop_decrease=reduction_strength)
            return reduced_noise
        except Exception as e:
            logger.error(translations["denoise_error"].format(e=e))
            return None

    def convert_audio_format(self, input_path, output_path, output_format):
        try:
            if output_format != "wav":
                logger.debug(translations["change_format"].format(output_format=output_format))
                audio, sample_rate = sf.read(input_path)
                common_sample_rates = [8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000]
                target_sr = min(common_sample_rates, key=lambda x: abs(x - sample_rate))
                audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=target_sr)
                sf.write(output_path, audio, target_sr, format=output_format)
            return output_path
        except Exception as e:
            raise RuntimeError(translations["change_format_error"].format(e=e))

    def convert_audio(self, audio_input_path, audio_output_path, model_path, index_path, embedder_model, pitch, f0_method, index_rate, volume_envelope, protect, hop_length, f0_autotune, f0_autotune_strength, filter_radius, clean_audio, clean_strength, export_format, upscale_audio, resample_sr = 0, sid = 0):
        self.get_vc(model_path, sid)
        try:
            if upscale_audio: upscale(audio_input_path, audio_input_path)
            audio = load_audio_infer(audio_input_path, 16000)
            audio_max = np.abs(audio).max() / 0.95
            if audio_max > 1: audio /= audio_max
            if not self.hubert_model:
                if not os.path.exists(os.path.join(now_dir, "assets", "model", "embedders", embedder_model + '.pt')): raise FileNotFoundError(f"Model not found: {embedder_model}")
                self.load_hubert(embedder_model)
            if resample_sr >= 16000 and self.tgt_sr != resample_sr: self.tgt_sr = resample_sr
            file_index = (index_path.strip().strip('"').strip("\n").strip('"').strip().replace("trained", "added"))
            audio_opt = self.vc.pipeline(model=self.hubert_model, net_g=self.net_g, sid=sid, audio=audio, input_audio_path=audio_input_path, pitch=pitch, f0_method=f0_method, file_index=file_index, index_rate=index_rate, pitch_guidance=self.use_f0, filter_radius=filter_radius, tgt_sr=self.tgt_sr, resample_sr=resample_sr, volume_envelope=volume_envelope, version=self.version, protect=protect, hop_length=hop_length, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength)
            if audio_output_path: sf.write(audio_output_path, audio_opt, self.tgt_sr, format="wav")
            if clean_audio:
                cleaned_audio = self.remove_audio_noise(audio_output_path, clean_strength)
                if cleaned_audio is not None: sf.write(audio_output_path, cleaned_audio, self.tgt_sr, format="wav")
            output_path_format = audio_output_path.replace(".wav", f".{export_format}")
            audio_output_path = self.convert_audio_format(audio_output_path, output_path_format, export_format)
        except Exception as e:
            logger.error(translations["error_convert"].format(e=e))
            logger.error(traceback.format_exc())

    def get_vc(self, weight_root, sid):
        if sid == "" or sid == []:
            self.cleanup_model()
            if torch.cuda.is_available(): torch.cuda.empty_cache()
        if not self.loaded_model or self.loaded_model != weight_root:
            self.load_model(weight_root)
            if self.cpt is not None:
                self.setup_network()
                self.setup_vc_instance()
            self.loaded_model = weight_root

    def cleanup_model(self):
        if self.hubert_model is not None:
            del self.net_g, self.n_spk, self.vc, self.hubert_model, self.tgt_sr
            self.hubert_model = self.net_g = self.n_spk = self.vc = self.tgt_sr = None
            if torch.cuda.is_available(): torch.cuda.empty_cache()
        del self.net_g, self.cpt
        if torch.cuda.is_available(): torch.cuda.empty_cache()
        self.cpt = None

    def load_model(self, weight_root):
        self.cpt = (torch.load(weight_root, map_location="cpu") if os.path.isfile(weight_root) else None)

    def setup_network(self):
        if self.cpt is not None:
            self.tgt_sr = self.cpt["config"][-1]
            self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0]
            self.use_f0 = self.cpt.get("f0", 1)
            self.version = self.cpt.get("version", "v1")
            self.text_enc_hidden_dim = 768 if self.version == "v2" else 256
            self.net_g = Synthesizer(*self.cpt["config"], use_f0=self.use_f0, text_enc_hidden_dim=self.text_enc_hidden_dim, is_half=self.config.is_half)
            del self.net_g.enc_q
            self.net_g.load_state_dict(self.cpt["weight"], strict=False)
            self.net_g.eval().to(self.config.device)
            self.net_g = (self.net_g.half() if self.config.is_half else self.net_g.float())

    def setup_vc_instance(self):
        if self.cpt is not None:
            self.vc = VC(self.tgt_sr, self.config)
            self.n_spk = self.cpt["config"][-3]

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    main()