|
from __future__ import annotations |
|
from typing import TYPE_CHECKING |
|
from demucs.apply import apply_model, demucs_segments |
|
from demucs.hdemucs import HDemucs |
|
from demucs.model_v2 import auto_load_demucs_model_v2 |
|
from demucs.pretrained import get_model as _gm |
|
from demucs.utils import apply_model_v1 |
|
from demucs.utils import apply_model_v2 |
|
from lib_v5.tfc_tdf_v3 import TFC_TDF_net, STFT |
|
from lib_v5 import spec_utils |
|
from lib_v5.vr_network import nets |
|
from lib_v5.vr_network import nets_new |
|
from lib_v5.vr_network.model_param_init import ModelParameters |
|
from pathlib import Path |
|
from gui_data.constants import * |
|
from gui_data.error_handling import * |
|
from scipy import signal |
|
import audioread |
|
import gzip |
|
import librosa |
|
import math |
|
import numpy as np |
|
import onnxruntime as ort |
|
import os |
|
import torch |
|
import warnings |
|
import pydub |
|
import soundfile as sf |
|
import lib_v5.mdxnet as MdxnetSet |
|
import math |
|
|
|
from onnx import load |
|
from onnx2pytorch import ConvertModel |
|
import gc |
|
|
|
if TYPE_CHECKING: |
|
from UVR import ModelData |
|
|
|
|
|
|
|
|
|
mps_available = torch.backends.mps.is_available() if is_macos else False |
|
cuda_available = torch.cuda.is_available() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clear_gpu_cache(): |
|
gc.collect() |
|
if is_macos: |
|
torch.mps.empty_cache() |
|
else: |
|
torch.cuda.empty_cache() |
|
|
|
warnings.filterwarnings("ignore") |
|
cpu = torch.device('cpu') |
|
|
|
class SeperateAttributes: |
|
def __init__(self, model_data: ModelData, |
|
process_data: dict, |
|
main_model_primary_stem_4_stem=None, |
|
main_process_method=None, |
|
is_return_dual=True, |
|
main_model_primary=None, |
|
vocal_stem_path=None, |
|
master_inst_source=None, |
|
master_vocal_source=None): |
|
|
|
self.list_all_models: list |
|
self.process_data = process_data |
|
self.progress_value = 0 |
|
self.set_progress_bar = process_data['set_progress_bar'] |
|
self.write_to_console = process_data['write_to_console'] |
|
if vocal_stem_path: |
|
self.audio_file, self.audio_file_base = vocal_stem_path |
|
self.audio_file_base_voc_split = lambda stem, split:os.path.join(self.export_path, f'{self.audio_file_base.replace("_(Vocals)", "")}_({stem}_{split}).wav') |
|
else: |
|
self.audio_file = process_data['audio_file'] |
|
self.audio_file_base = process_data['audio_file_base'] |
|
self.audio_file_base_voc_split = None |
|
self.export_path = process_data['export_path'] |
|
self.cached_source_callback = process_data['cached_source_callback'] |
|
self.cached_model_source_holder = process_data['cached_model_source_holder'] |
|
self.is_4_stem_ensemble = process_data['is_4_stem_ensemble'] |
|
self.list_all_models = process_data['list_all_models'] |
|
self.process_iteration = process_data['process_iteration'] |
|
self.is_return_dual = is_return_dual |
|
self.is_pitch_change = model_data.is_pitch_change |
|
self.semitone_shift = model_data.semitone_shift |
|
self.is_match_frequency_pitch = model_data.is_match_frequency_pitch |
|
self.overlap = model_data.overlap |
|
self.overlap_mdx = model_data.overlap_mdx |
|
self.overlap_mdx23 = model_data.overlap_mdx23 |
|
self.is_mdx_combine_stems = model_data.is_mdx_combine_stems |
|
self.is_mdx_c = model_data.is_mdx_c |
|
self.mdx_c_configs = model_data.mdx_c_configs |
|
self.mdxnet_stem_select = model_data.mdxnet_stem_select |
|
self.mixer_path = model_data.mixer_path |
|
self.model_samplerate = model_data.model_samplerate |
|
self.model_capacity = model_data.model_capacity |
|
self.is_vr_51_model = model_data.is_vr_51_model |
|
self.is_pre_proc_model = model_data.is_pre_proc_model |
|
self.is_secondary_model_activated = model_data.is_secondary_model_activated if not self.is_pre_proc_model else False |
|
self.is_secondary_model = model_data.is_secondary_model if not self.is_pre_proc_model else True |
|
self.process_method = model_data.process_method |
|
self.model_path = model_data.model_path |
|
self.model_name = model_data.model_name |
|
self.model_basename = model_data.model_basename |
|
self.wav_type_set = model_data.wav_type_set |
|
self.mp3_bit_set = model_data.mp3_bit_set |
|
self.save_format = model_data.save_format |
|
self.is_gpu_conversion = model_data.is_gpu_conversion |
|
self.is_normalization = model_data.is_normalization |
|
self.is_primary_stem_only = model_data.is_primary_stem_only if not self.is_secondary_model else model_data.is_primary_model_primary_stem_only |
|
self.is_secondary_stem_only = model_data.is_secondary_stem_only if not self.is_secondary_model else model_data.is_primary_model_secondary_stem_only |
|
self.is_ensemble_mode = model_data.is_ensemble_mode |
|
self.secondary_model = model_data.secondary_model |
|
self.primary_model_primary_stem = model_data.primary_model_primary_stem |
|
self.primary_stem_native = model_data.primary_stem_native |
|
self.primary_stem = model_data.primary_stem |
|
self.secondary_stem = model_data.secondary_stem |
|
self.is_invert_spec = model_data.is_invert_spec |
|
self.is_deverb_vocals = model_data.is_deverb_vocals |
|
self.is_mixer_mode = model_data.is_mixer_mode |
|
self.secondary_model_scale = model_data.secondary_model_scale |
|
self.is_demucs_pre_proc_model_inst_mix = model_data.is_demucs_pre_proc_model_inst_mix |
|
self.primary_source_map = {} |
|
self.secondary_source_map = {} |
|
self.primary_source = None |
|
self.secondary_source = None |
|
self.secondary_source_primary = None |
|
self.secondary_source_secondary = None |
|
self.main_model_primary_stem_4_stem = main_model_primary_stem_4_stem |
|
self.main_model_primary = main_model_primary |
|
self.ensemble_primary_stem = model_data.ensemble_primary_stem |
|
self.is_multi_stem_ensemble = model_data.is_multi_stem_ensemble |
|
self.is_other_gpu = False |
|
self.is_deverb = True |
|
self.DENOISER_MODEL = model_data.DENOISER_MODEL |
|
self.DEVERBER_MODEL = model_data.DEVERBER_MODEL |
|
self.is_source_swap = False |
|
self.vocal_split_model = model_data.vocal_split_model |
|
self.is_vocal_split_model = model_data.is_vocal_split_model |
|
self.master_vocal_path = None |
|
self.set_master_inst_source = None |
|
self.master_inst_source = master_inst_source |
|
self.master_vocal_source = master_vocal_source |
|
self.is_save_inst_vocal_splitter = isinstance(master_inst_source, np.ndarray) and model_data.is_save_inst_vocal_splitter |
|
self.is_inst_only_voc_splitter = model_data.is_inst_only_voc_splitter |
|
self.is_karaoke = model_data.is_karaoke |
|
self.is_bv_model = model_data.is_bv_model |
|
self.is_bv_model_rebalenced = model_data.bv_model_rebalance and self.is_vocal_split_model |
|
self.is_sec_bv_rebalance = model_data.is_sec_bv_rebalance |
|
self.stem_path_init = os.path.join(self.export_path, f'{self.audio_file_base}_({self.secondary_stem}).wav') |
|
self.deverb_vocal_opt = model_data.deverb_vocal_opt |
|
self.is_save_vocal_only = model_data.is_save_vocal_only |
|
self.device = cpu |
|
self.run_type = ['CPUExecutionProvider'] |
|
self.is_opencl = False |
|
self.device_set = model_data.device_set |
|
self.is_use_opencl = model_data.is_use_opencl |
|
|
|
if self.is_inst_only_voc_splitter or self.is_sec_bv_rebalance: |
|
self.is_primary_stem_only = False |
|
self.is_secondary_stem_only = False |
|
|
|
if main_model_primary and self.is_multi_stem_ensemble: |
|
self.primary_stem, self.secondary_stem = main_model_primary, secondary_stem(main_model_primary) |
|
|
|
if self.is_gpu_conversion >= 0: |
|
if mps_available: |
|
self.device, self.is_other_gpu = 'mps', True |
|
else: |
|
device_prefix = None |
|
if self.device_set != DEFAULT: |
|
device_prefix = CUDA_DEVICE |
|
|
|
|
|
|
|
|
|
if cuda_available: |
|
self.device = CUDA_DEVICE if not device_prefix else f'{device_prefix}:{self.device_set}' |
|
self.run_type = ['CUDAExecutionProvider'] |
|
|
|
if model_data.process_method == MDX_ARCH_TYPE: |
|
self.is_mdx_ckpt = model_data.is_mdx_ckpt |
|
self.primary_model_name, self.primary_sources = self.cached_source_callback(MDX_ARCH_TYPE, model_name=self.model_basename) |
|
self.is_denoise = model_data.is_denoise |
|
self.is_denoise_model = model_data.is_denoise_model |
|
self.is_mdx_c_seg_def = model_data.is_mdx_c_seg_def |
|
self.mdx_batch_size = model_data.mdx_batch_size |
|
self.compensate = model_data.compensate |
|
self.mdx_segment_size = model_data.mdx_segment_size |
|
|
|
if self.is_mdx_c: |
|
if not self.is_4_stem_ensemble: |
|
self.primary_stem = model_data.ensemble_primary_stem if process_data['is_ensemble_master'] else model_data.primary_stem |
|
self.secondary_stem = model_data.ensemble_secondary_stem if process_data['is_ensemble_master'] else model_data.secondary_stem |
|
else: |
|
self.dim_f, self.dim_t = model_data.mdx_dim_f_set, 2**model_data.mdx_dim_t_set |
|
|
|
self.check_label_secondary_stem_runs() |
|
self.n_fft = model_data.mdx_n_fft_scale_set |
|
self.chunks = model_data.chunks |
|
self.margin = model_data.margin |
|
self.adjust = 1 |
|
self.dim_c = 4 |
|
self.hop = 1024 |
|
|
|
if model_data.process_method == DEMUCS_ARCH_TYPE: |
|
self.demucs_stems = model_data.demucs_stems if not main_process_method in [MDX_ARCH_TYPE, VR_ARCH_TYPE] else None |
|
self.secondary_model_4_stem = model_data.secondary_model_4_stem |
|
self.secondary_model_4_stem_scale = model_data.secondary_model_4_stem_scale |
|
self.is_chunk_demucs = model_data.is_chunk_demucs |
|
self.segment = model_data.segment |
|
self.demucs_version = model_data.demucs_version |
|
self.demucs_source_list = model_data.demucs_source_list |
|
self.demucs_source_map = model_data.demucs_source_map |
|
self.is_demucs_combine_stems = model_data.is_demucs_combine_stems |
|
self.demucs_stem_count = model_data.demucs_stem_count |
|
self.pre_proc_model = model_data.pre_proc_model |
|
self.device = cpu if self.is_other_gpu and not self.demucs_version in [DEMUCS_V3, DEMUCS_V4] else self.device |
|
|
|
self.primary_stem = model_data.ensemble_primary_stem if process_data['is_ensemble_master'] else model_data.primary_stem |
|
self.secondary_stem = model_data.ensemble_secondary_stem if process_data['is_ensemble_master'] else model_data.secondary_stem |
|
|
|
if (self.is_multi_stem_ensemble or self.is_4_stem_ensemble) and not self.is_secondary_model: |
|
self.is_return_dual = False |
|
|
|
if self.is_multi_stem_ensemble and main_model_primary: |
|
self.is_4_stem_ensemble = False |
|
if main_model_primary in self.demucs_source_map.keys(): |
|
self.primary_stem = main_model_primary |
|
self.secondary_stem = secondary_stem(main_model_primary) |
|
elif secondary_stem(main_model_primary) in self.demucs_source_map.keys(): |
|
self.primary_stem = secondary_stem(main_model_primary) |
|
self.secondary_stem = main_model_primary |
|
|
|
if self.is_secondary_model and not process_data['is_ensemble_master']: |
|
if not self.demucs_stem_count == 2 and model_data.primary_model_primary_stem == INST_STEM: |
|
self.primary_stem = VOCAL_STEM |
|
self.secondary_stem = INST_STEM |
|
else: |
|
self.primary_stem = model_data.primary_model_primary_stem |
|
self.secondary_stem = secondary_stem(self.primary_stem) |
|
|
|
self.shifts = model_data.shifts |
|
self.is_split_mode = model_data.is_split_mode if not self.demucs_version == DEMUCS_V4 else True |
|
self.primary_model_name, self.primary_sources = self.cached_source_callback(DEMUCS_ARCH_TYPE, model_name=self.model_basename) |
|
|
|
if model_data.process_method == VR_ARCH_TYPE: |
|
self.check_label_secondary_stem_runs() |
|
self.primary_model_name, self.primary_sources = self.cached_source_callback(VR_ARCH_TYPE, model_name=self.model_basename) |
|
self.mp = model_data.vr_model_param |
|
self.high_end_process = model_data.is_high_end_process |
|
self.is_tta = model_data.is_tta |
|
self.is_post_process = model_data.is_post_process |
|
self.is_gpu_conversion = model_data.is_gpu_conversion |
|
self.batch_size = model_data.batch_size |
|
self.window_size = model_data.window_size |
|
self.input_high_end_h = None |
|
self.input_high_end = None |
|
self.post_process_threshold = model_data.post_process_threshold |
|
self.aggressiveness = {'value': model_data.aggression_setting, |
|
'split_bin': self.mp.param['band'][1]['crop_stop'], |
|
'aggr_correction': self.mp.param.get('aggr_correction')} |
|
|
|
def check_label_secondary_stem_runs(self): |
|
|
|
|
|
if self.process_data['is_ensemble_master'] and not self.is_4_stem_ensemble and not self.is_mdx_c: |
|
if self.ensemble_primary_stem != self.primary_stem: |
|
self.is_primary_stem_only, self.is_secondary_stem_only = self.is_secondary_stem_only, self.is_primary_stem_only |
|
|
|
|
|
if self.is_pre_proc_model or self.is_secondary_model: |
|
self.is_primary_stem_only = False |
|
self.is_secondary_stem_only = False |
|
|
|
def start_inference_console_write(self): |
|
if self.is_secondary_model and not self.is_pre_proc_model and not self.is_vocal_split_model: |
|
self.write_to_console(INFERENCE_STEP_2_SEC(self.process_method, self.model_basename)) |
|
|
|
if self.is_pre_proc_model: |
|
self.write_to_console(INFERENCE_STEP_2_PRE(self.process_method, self.model_basename)) |
|
|
|
if self.is_vocal_split_model: |
|
self.write_to_console(INFERENCE_STEP_2_VOC_S(self.process_method, self.model_basename)) |
|
|
|
def running_inference_console_write(self, is_no_write=False): |
|
self.write_to_console(DONE, base_text='') if not is_no_write else None |
|
self.set_progress_bar(0.05) if not is_no_write else None |
|
|
|
if self.is_secondary_model and not self.is_pre_proc_model and not self.is_vocal_split_model: |
|
self.write_to_console(INFERENCE_STEP_1_SEC) |
|
elif self.is_pre_proc_model: |
|
self.write_to_console(INFERENCE_STEP_1_PRE) |
|
elif self.is_vocal_split_model: |
|
self.write_to_console(INFERENCE_STEP_1_VOC_S) |
|
else: |
|
self.write_to_console(INFERENCE_STEP_1) |
|
|
|
def running_inference_progress_bar(self, length, is_match_mix=False): |
|
if not is_match_mix: |
|
self.progress_value += 1 |
|
|
|
if (0.8/length*self.progress_value) >= 0.8: |
|
length = self.progress_value + 1 |
|
|
|
self.set_progress_bar(0.1, (0.8/length*self.progress_value)) |
|
|
|
def load_cached_sources(self): |
|
|
|
if self.is_secondary_model and not self.is_pre_proc_model: |
|
self.write_to_console(INFERENCE_STEP_2_SEC_CACHED_MODOEL(self.process_method, self.model_basename)) |
|
elif self.is_pre_proc_model: |
|
self.write_to_console(INFERENCE_STEP_2_PRE_CACHED_MODOEL(self.process_method, self.model_basename)) |
|
else: |
|
self.write_to_console(INFERENCE_STEP_2_PRIMARY_CACHED, "") |
|
|
|
def cache_source(self, secondary_sources): |
|
|
|
model_occurrences = self.list_all_models.count(self.model_basename) |
|
|
|
if not model_occurrences <= 1: |
|
if self.process_method == MDX_ARCH_TYPE: |
|
self.cached_model_source_holder(MDX_ARCH_TYPE, secondary_sources, self.model_basename) |
|
|
|
if self.process_method == VR_ARCH_TYPE: |
|
self.cached_model_source_holder(VR_ARCH_TYPE, secondary_sources, self.model_basename) |
|
|
|
if self.process_method == DEMUCS_ARCH_TYPE: |
|
self.cached_model_source_holder(DEMUCS_ARCH_TYPE, secondary_sources, self.model_basename) |
|
|
|
def process_vocal_split_chain(self, sources: dict): |
|
|
|
def is_valid_vocal_split_condition(master_vocal_source): |
|
"""Checks if conditions for vocal split processing are met.""" |
|
conditions = [ |
|
isinstance(master_vocal_source, np.ndarray), |
|
self.vocal_split_model, |
|
not self.is_ensemble_mode, |
|
not self.is_karaoke, |
|
not self.is_bv_model |
|
] |
|
return all(conditions) |
|
|
|
|
|
master_inst_source = sources.get(INST_STEM, None) |
|
master_vocal_source = sources.get(VOCAL_STEM, None) |
|
|
|
|
|
if is_valid_vocal_split_condition(master_vocal_source): |
|
process_chain_model( |
|
self.vocal_split_model, |
|
self.process_data, |
|
vocal_stem_path=self.master_vocal_path, |
|
master_vocal_source=master_vocal_source, |
|
master_inst_source=master_inst_source |
|
) |
|
|
|
def process_secondary_stem(self, stem_source, secondary_model_source=None, model_scale=None): |
|
if not self.is_secondary_model: |
|
if self.is_secondary_model_activated and isinstance(secondary_model_source, np.ndarray): |
|
secondary_model_scale = model_scale if model_scale else self.secondary_model_scale |
|
stem_source = spec_utils.average_dual_sources(stem_source, secondary_model_source, secondary_model_scale) |
|
|
|
return stem_source |
|
|
|
def final_process(self, stem_path, source, secondary_source, stem_name, samplerate): |
|
source = self.process_secondary_stem(source, secondary_source) |
|
self.write_audio(stem_path, source, samplerate, stem_name=stem_name) |
|
|
|
return {stem_name: source} |
|
|
|
def write_audio(self, stem_path: str, stem_source, samplerate, stem_name=None): |
|
|
|
def save_audio_file(path, source): |
|
source = spec_utils.normalize(source, self.is_normalization) |
|
sf.write(path, source, samplerate, subtype=self.wav_type_set) |
|
|
|
if is_not_ensemble: |
|
save_format(path, self.save_format, self.mp3_bit_set) |
|
|
|
def save_voc_split_instrumental(stem_name, stem_source, is_inst_invert=False): |
|
inst_stem_name = "Instrumental (With Lead Vocals)" if stem_name == LEAD_VOCAL_STEM else "Instrumental (With Backing Vocals)" |
|
inst_stem_path_name = LEAD_VOCAL_STEM_I if stem_name == LEAD_VOCAL_STEM else BV_VOCAL_STEM_I |
|
inst_stem_path = self.audio_file_base_voc_split(INST_STEM, inst_stem_path_name) |
|
stem_source = -stem_source if is_inst_invert else stem_source |
|
inst_stem_source = spec_utils.combine_arrarys([self.master_inst_source, stem_source], is_swap=True) |
|
save_with_message(inst_stem_path, inst_stem_name, inst_stem_source) |
|
|
|
def save_voc_split_vocal(stem_name, stem_source): |
|
voc_split_stem_name = LEAD_VOCAL_STEM_LABEL if stem_name == LEAD_VOCAL_STEM else BV_VOCAL_STEM_LABEL |
|
voc_split_stem_path = self.audio_file_base_voc_split(VOCAL_STEM, stem_name) |
|
save_with_message(voc_split_stem_path, voc_split_stem_name, stem_source) |
|
|
|
def save_with_message(stem_path, stem_name, stem_source): |
|
is_deverb = self.is_deverb_vocals and ( |
|
self.deverb_vocal_opt == stem_name or |
|
(self.deverb_vocal_opt == 'ALL' and |
|
(stem_name == VOCAL_STEM or stem_name == LEAD_VOCAL_STEM_LABEL or stem_name == BV_VOCAL_STEM_LABEL))) |
|
|
|
self.write_to_console(f'{SAVING_STEM[0]}{stem_name}{SAVING_STEM[1]}') |
|
|
|
if is_deverb and is_not_ensemble: |
|
deverb_vocals(stem_path, stem_source) |
|
|
|
save_audio_file(stem_path, stem_source) |
|
self.write_to_console(DONE, base_text='') |
|
|
|
def deverb_vocals(stem_path:str, stem_source): |
|
self.write_to_console(INFERENCE_STEP_DEVERBING, base_text='') |
|
stem_source_deverbed, stem_source_2 = vr_denoiser(stem_source, self.device, is_deverber=True, model_path=self.DEVERBER_MODEL) |
|
save_audio_file(stem_path.replace(".wav", "_deverbed.wav"), stem_source_deverbed) |
|
save_audio_file(stem_path.replace(".wav", "_reverb_only.wav"), stem_source_2) |
|
|
|
is_bv_model_lead = (self.is_bv_model_rebalenced and self.is_vocal_split_model and stem_name == LEAD_VOCAL_STEM) |
|
is_bv_rebalance_lead = (self.is_bv_model_rebalenced and self.is_vocal_split_model and stem_name == BV_VOCAL_STEM) |
|
is_no_vocal_save = self.is_inst_only_voc_splitter and (stem_name == VOCAL_STEM or stem_name == BV_VOCAL_STEM or stem_name == LEAD_VOCAL_STEM) or is_bv_model_lead |
|
is_not_ensemble = (not self.is_ensemble_mode or self.is_vocal_split_model) |
|
is_do_not_save_inst = (self.is_save_vocal_only and self.is_sec_bv_rebalance and stem_name == INST_STEM) |
|
|
|
if is_bv_rebalance_lead: |
|
master_voc_source = spec_utils.match_array_shapes(self.master_vocal_source, stem_source, is_swap=True) |
|
bv_rebalance_lead_source = stem_source-master_voc_source |
|
|
|
if not is_bv_model_lead and not is_do_not_save_inst: |
|
if self.is_vocal_split_model or not self.is_secondary_model: |
|
if self.is_vocal_split_model and not self.is_inst_only_voc_splitter: |
|
save_voc_split_vocal(stem_name, stem_source) |
|
if is_bv_rebalance_lead: |
|
save_voc_split_vocal(LEAD_VOCAL_STEM, bv_rebalance_lead_source) |
|
else: |
|
if not is_no_vocal_save: |
|
save_with_message(stem_path, stem_name, stem_source) |
|
|
|
if self.is_save_inst_vocal_splitter and not self.is_save_vocal_only: |
|
save_voc_split_instrumental(stem_name, stem_source) |
|
if is_bv_rebalance_lead: |
|
save_voc_split_instrumental(LEAD_VOCAL_STEM, bv_rebalance_lead_source, is_inst_invert=True) |
|
|
|
self.set_progress_bar(0.95) |
|
|
|
if stem_name == VOCAL_STEM: |
|
self.master_vocal_path = stem_path |
|
|
|
def pitch_fix(self, source, sr_pitched, org_mix): |
|
semitone_shift = self.semitone_shift |
|
source = spec_utils.change_pitch_semitones(source, sr_pitched, semitone_shift=semitone_shift)[0] |
|
source = spec_utils.match_array_shapes(source, org_mix) |
|
return source |
|
|
|
def match_frequency_pitch(self, mix): |
|
source = mix |
|
if self.is_match_frequency_pitch and self.is_pitch_change: |
|
source, sr_pitched = spec_utils.change_pitch_semitones(mix, 44100, semitone_shift=-self.semitone_shift) |
|
source = self.pitch_fix(source, sr_pitched, mix) |
|
|
|
return source |
|
|
|
class SeperateMDX(SeperateAttributes): |
|
|
|
def seperate(self): |
|
samplerate = 44100 |
|
|
|
if self.primary_model_name == self.model_basename and isinstance(self.primary_sources, tuple): |
|
mix, source = self.primary_sources |
|
self.load_cached_sources() |
|
else: |
|
self.start_inference_console_write() |
|
|
|
if self.is_mdx_ckpt: |
|
model_params = torch.load(self.model_path, map_location=lambda storage, loc: storage)['hyper_parameters'] |
|
self.dim_c, self.hop = model_params['dim_c'], model_params['hop_length'] |
|
separator = MdxnetSet.ConvTDFNet(**model_params) |
|
self.model_run = separator.load_from_checkpoint(self.model_path).to(self.device).eval() |
|
else: |
|
if self.mdx_segment_size == self.dim_t and not self.is_other_gpu: |
|
ort_ = ort.InferenceSession(self.model_path, providers=self.run_type) |
|
self.model_run = lambda spek:ort_.run(None, {'input': spek.cpu().numpy()})[0] |
|
else: |
|
self.model_run = ConvertModel(load(self.model_path)) |
|
self.model_run.to(self.device).eval() |
|
|
|
self.running_inference_console_write() |
|
mix = prepare_mix(self.audio_file) |
|
|
|
source = self.demix(mix) |
|
|
|
if not self.is_vocal_split_model: |
|
self.cache_source((mix, source)) |
|
self.write_to_console(DONE, base_text='') |
|
|
|
mdx_net_cut = True if self.primary_stem in MDX_NET_FREQ_CUT and self.is_match_frequency_pitch else False |
|
|
|
if self.is_secondary_model_activated and self.secondary_model: |
|
self.secondary_source_primary, self.secondary_source_secondary = process_secondary_model(self.secondary_model, self.process_data, main_process_method=self.process_method, main_model_primary=self.primary_stem) |
|
|
|
if not self.is_primary_stem_only: |
|
secondary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({self.secondary_stem}).wav') |
|
if not isinstance(self.secondary_source, np.ndarray): |
|
raw_mix = self.demix(self.match_frequency_pitch(mix), is_match_mix=True) if mdx_net_cut else self.match_frequency_pitch(mix) |
|
self.secondary_source = spec_utils.invert_stem(raw_mix, source) if self.is_invert_spec else mix.T-source.T |
|
|
|
self.secondary_source_map = self.final_process(secondary_stem_path, self.secondary_source, self.secondary_source_secondary, self.secondary_stem, samplerate) |
|
|
|
if not self.is_secondary_stem_only: |
|
primary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({self.primary_stem}).wav') |
|
|
|
if not isinstance(self.primary_source, np.ndarray): |
|
self.primary_source = source.T |
|
|
|
self.primary_source_map = self.final_process(primary_stem_path, self.primary_source, self.secondary_source_primary, self.primary_stem, samplerate) |
|
|
|
clear_gpu_cache() |
|
|
|
secondary_sources = {**self.primary_source_map, **self.secondary_source_map} |
|
|
|
self.process_vocal_split_chain(secondary_sources) |
|
|
|
if self.is_secondary_model or self.is_pre_proc_model: |
|
return secondary_sources |
|
|
|
def initialize_model_settings(self): |
|
self.n_bins = self.n_fft//2+1 |
|
self.trim = self.n_fft//2 |
|
self.chunk_size = self.hop * (self.mdx_segment_size-1) |
|
self.gen_size = self.chunk_size-2*self.trim |
|
self.stft = STFT(self.n_fft, self.hop, self.dim_f, self.device) |
|
|
|
def demix(self, mix, is_match_mix=False): |
|
self.initialize_model_settings() |
|
|
|
org_mix = mix |
|
tar_waves_ = [] |
|
|
|
if is_match_mix: |
|
chunk_size = self.hop * (256-1) |
|
overlap = 0.02 |
|
else: |
|
chunk_size = self.chunk_size |
|
overlap = self.overlap_mdx |
|
|
|
if self.is_pitch_change: |
|
mix, sr_pitched = spec_utils.change_pitch_semitones(mix, 44100, semitone_shift=-self.semitone_shift) |
|
|
|
gen_size = chunk_size-2*self.trim |
|
|
|
pad = gen_size + self.trim - ((mix.shape[-1]) % gen_size) |
|
mixture = np.concatenate((np.zeros((2, self.trim), dtype='float32'), mix, np.zeros((2, pad), dtype='float32')), 1) |
|
|
|
step = self.chunk_size - self.n_fft if overlap == DEFAULT else int((1 - overlap) * chunk_size) |
|
result = np.zeros((1, 2, mixture.shape[-1]), dtype=np.float32) |
|
divider = np.zeros((1, 2, mixture.shape[-1]), dtype=np.float32) |
|
total = 0 |
|
total_chunks = (mixture.shape[-1] + step - 1) // step |
|
|
|
for i in range(0, mixture.shape[-1], step): |
|
total += 1 |
|
start = i |
|
end = min(i + chunk_size, mixture.shape[-1]) |
|
|
|
chunk_size_actual = end - start |
|
|
|
if overlap == 0: |
|
window = None |
|
else: |
|
window = np.hanning(chunk_size_actual) |
|
window = np.tile(window[None, None, :], (1, 2, 1)) |
|
|
|
mix_part_ = mixture[:, start:end] |
|
if end != i + chunk_size: |
|
pad_size = (i + chunk_size) - end |
|
mix_part_ = np.concatenate((mix_part_, np.zeros((2, pad_size), dtype='float32')), axis=-1) |
|
|
|
mix_part = torch.tensor([mix_part_], dtype=torch.float32).to(self.device) |
|
mix_waves = mix_part.split(self.mdx_batch_size) |
|
|
|
with torch.no_grad(): |
|
for mix_wave in mix_waves: |
|
self.running_inference_progress_bar(total_chunks, is_match_mix=is_match_mix) |
|
|
|
tar_waves = self.run_model(mix_wave, is_match_mix=is_match_mix) |
|
|
|
if window is not None: |
|
tar_waves[..., :chunk_size_actual] *= window |
|
divider[..., start:end] += window |
|
else: |
|
divider[..., start:end] += 1 |
|
|
|
result[..., start:end] += tar_waves[..., :end-start] |
|
|
|
tar_waves = result / divider |
|
tar_waves_.append(tar_waves) |
|
|
|
tar_waves_ = np.vstack(tar_waves_)[:, :, self.trim:-self.trim] |
|
tar_waves = np.concatenate(tar_waves_, axis=-1)[:, :mix.shape[-1]] |
|
|
|
source = tar_waves[:,0:None] |
|
|
|
if self.is_pitch_change and not is_match_mix: |
|
source = self.pitch_fix(source, sr_pitched, org_mix) |
|
|
|
source = source if is_match_mix else source*self.compensate |
|
|
|
if self.is_denoise_model and not is_match_mix: |
|
if NO_STEM in self.primary_stem_native or self.primary_stem_native == INST_STEM: |
|
if org_mix.shape[1] != source.shape[1]: |
|
source = spec_utils.match_array_shapes(source, org_mix) |
|
source = org_mix - vr_denoiser(org_mix-source, self.device, model_path=self.DENOISER_MODEL) |
|
else: |
|
source = vr_denoiser(source, self.device, model_path=self.DENOISER_MODEL) |
|
|
|
return source |
|
|
|
def run_model(self, mix, is_match_mix=False): |
|
|
|
spek = self.stft(mix.to(self.device))*self.adjust |
|
spek[:, :, :3, :] *= 0 |
|
|
|
if is_match_mix: |
|
spec_pred = spek.cpu().numpy() |
|
else: |
|
spec_pred = -self.model_run(-spek)*0.5+self.model_run(spek)*0.5 if self.is_denoise else self.model_run(spek) |
|
|
|
return self.stft.inverse(torch.tensor(spec_pred).to(self.device)).cpu().detach().numpy() |
|
|
|
class SeperateMDXC(SeperateAttributes): |
|
|
|
def seperate(self): |
|
samplerate = 44100 |
|
sources = None |
|
|
|
if self.primary_model_name == self.model_basename and isinstance(self.primary_sources, tuple): |
|
mix, sources = self.primary_sources |
|
self.load_cached_sources() |
|
else: |
|
self.start_inference_console_write() |
|
self.running_inference_console_write() |
|
mix = prepare_mix(self.audio_file) |
|
sources = self.demix(mix) |
|
if not self.is_vocal_split_model: |
|
self.cache_source((mix, sources)) |
|
self.write_to_console(DONE, base_text='') |
|
|
|
stem_list = [self.mdx_c_configs.training.target_instrument] if self.mdx_c_configs.training.target_instrument else [i for i in self.mdx_c_configs.training.instruments] |
|
|
|
if self.is_secondary_model: |
|
if self.is_pre_proc_model: |
|
self.mdxnet_stem_select = stem_list[0] |
|
else: |
|
self.mdxnet_stem_select = self.main_model_primary_stem_4_stem if self.main_model_primary_stem_4_stem else self.primary_model_primary_stem |
|
self.primary_stem = self.mdxnet_stem_select |
|
self.secondary_stem = secondary_stem(self.mdxnet_stem_select) |
|
self.is_primary_stem_only, self.is_secondary_stem_only = False, False |
|
|
|
is_all_stems = self.mdxnet_stem_select == ALL_STEMS |
|
is_not_ensemble_master = not self.process_data['is_ensemble_master'] |
|
is_not_single_stem = not len(stem_list) <= 2 |
|
is_not_secondary_model = not self.is_secondary_model |
|
is_ensemble_4_stem = self.is_4_stem_ensemble and is_not_single_stem |
|
|
|
if (is_all_stems and is_not_ensemble_master and is_not_single_stem and is_not_secondary_model) or is_ensemble_4_stem and not self.is_pre_proc_model: |
|
for stem in stem_list: |
|
primary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({stem}).wav') |
|
self.primary_source = sources[stem].T |
|
self.write_audio(primary_stem_path, self.primary_source, samplerate, stem_name=stem) |
|
|
|
if stem == VOCAL_STEM and not self.is_sec_bv_rebalance: |
|
self.process_vocal_split_chain({VOCAL_STEM:stem}) |
|
else: |
|
if len(stem_list) == 1: |
|
source_primary = sources |
|
else: |
|
source_primary = sources[stem_list[0]] if self.is_multi_stem_ensemble and len(stem_list) == 2 else sources[self.mdxnet_stem_select] |
|
if self.is_secondary_model_activated and self.secondary_model: |
|
self.secondary_source_primary, self.secondary_source_secondary = process_secondary_model(self.secondary_model, |
|
self.process_data, |
|
main_process_method=self.process_method, |
|
main_model_primary=self.primary_stem) |
|
|
|
if not self.is_primary_stem_only: |
|
secondary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({self.secondary_stem}).wav') |
|
if not isinstance(self.secondary_source, np.ndarray): |
|
|
|
if self.is_mdx_combine_stems and len(stem_list) >= 2: |
|
if len(stem_list) == 2: |
|
secondary_source = sources[self.secondary_stem] |
|
else: |
|
sources.pop(self.primary_stem) |
|
next_stem = next(iter(sources)) |
|
secondary_source = np.zeros_like(sources[next_stem]) |
|
for v in sources.values(): |
|
secondary_source += v |
|
|
|
self.secondary_source = secondary_source.T |
|
else: |
|
self.secondary_source, raw_mix = source_primary, self.match_frequency_pitch(mix) |
|
self.secondary_source = spec_utils.to_shape(self.secondary_source, raw_mix.shape) |
|
|
|
if self.is_invert_spec: |
|
self.secondary_source = spec_utils.invert_stem(raw_mix, self.secondary_source) |
|
else: |
|
self.secondary_source = (-self.secondary_source.T+raw_mix.T) |
|
|
|
self.secondary_source_map = self.final_process(secondary_stem_path, self.secondary_source, self.secondary_source_secondary, self.secondary_stem, samplerate) |
|
|
|
if not self.is_secondary_stem_only: |
|
primary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({self.primary_stem}).wav') |
|
if not isinstance(self.primary_source, np.ndarray): |
|
self.primary_source = source_primary.T |
|
|
|
self.primary_source_map = self.final_process(primary_stem_path, self.primary_source, self.secondary_source_primary, self.primary_stem, samplerate) |
|
|
|
clear_gpu_cache() |
|
|
|
secondary_sources = {**self.primary_source_map, **self.secondary_source_map} |
|
self.process_vocal_split_chain(secondary_sources) |
|
|
|
if self.is_secondary_model or self.is_pre_proc_model: |
|
return secondary_sources |
|
|
|
def demix(self, mix): |
|
sr_pitched = 441000 |
|
org_mix = mix |
|
if self.is_pitch_change: |
|
mix, sr_pitched = spec_utils.change_pitch_semitones(mix, 44100, semitone_shift=-self.semitone_shift) |
|
|
|
model = TFC_TDF_net(self.mdx_c_configs, device=self.device) |
|
model.load_state_dict(torch.load(self.model_path, map_location=cpu)) |
|
model.to(self.device).eval() |
|
mix = torch.tensor(mix, dtype=torch.float32) |
|
|
|
try: |
|
S = model.num_target_instruments |
|
except Exception as e: |
|
S = model.module.num_target_instruments |
|
|
|
mdx_segment_size = self.mdx_c_configs.inference.dim_t if self.is_mdx_c_seg_def else self.mdx_segment_size |
|
|
|
batch_size = self.mdx_batch_size |
|
chunk_size = self.mdx_c_configs.audio.hop_length * (mdx_segment_size - 1) |
|
overlap = self.overlap_mdx23 |
|
|
|
hop_size = chunk_size // overlap |
|
mix_shape = mix.shape[1] |
|
pad_size = hop_size - (mix_shape - chunk_size) % hop_size |
|
mix = torch.cat([torch.zeros(2, chunk_size - hop_size), mix, torch.zeros(2, pad_size + chunk_size - hop_size)], 1) |
|
|
|
chunks = mix.unfold(1, chunk_size, hop_size).transpose(0, 1) |
|
batches = [chunks[i : i + batch_size] for i in range(0, len(chunks), batch_size)] |
|
|
|
X = torch.zeros(S, *mix.shape) if S > 1 else torch.zeros_like(mix) |
|
X = X.to(self.device) |
|
|
|
with torch.no_grad(): |
|
cnt = 0 |
|
for batch in batches: |
|
self.running_inference_progress_bar(len(batches)) |
|
x = model(batch.to(self.device)) |
|
|
|
for w in x: |
|
X[..., cnt * hop_size : cnt * hop_size + chunk_size] += w |
|
cnt += 1 |
|
|
|
estimated_sources = X[..., chunk_size - hop_size:-(pad_size + chunk_size - hop_size)] / overlap |
|
del X |
|
pitch_fix = lambda s:self.pitch_fix(s, sr_pitched, org_mix) |
|
|
|
if S > 1: |
|
sources = {k: pitch_fix(v) if self.is_pitch_change else v for k, v in zip(self.mdx_c_configs.training.instruments, estimated_sources.cpu().detach().numpy())} |
|
del estimated_sources |
|
if self.is_denoise_model: |
|
if VOCAL_STEM in sources.keys() and INST_STEM in sources.keys(): |
|
sources[VOCAL_STEM] = vr_denoiser(sources[VOCAL_STEM], self.device, model_path=self.DENOISER_MODEL) |
|
if sources[VOCAL_STEM].shape[1] != org_mix.shape[1]: |
|
sources[VOCAL_STEM] = spec_utils.match_array_shapes(sources[VOCAL_STEM], org_mix) |
|
sources[INST_STEM] = org_mix - sources[VOCAL_STEM] |
|
|
|
return sources |
|
else: |
|
est_s = estimated_sources.cpu().detach().numpy() |
|
del estimated_sources |
|
return pitch_fix(est_s) if self.is_pitch_change else est_s |
|
|
|
class SeperateDemucs(SeperateAttributes): |
|
def seperate(self): |
|
samplerate = 44100 |
|
source = None |
|
model_scale = None |
|
stem_source = None |
|
stem_source_secondary = None |
|
inst_mix = None |
|
inst_source = None |
|
is_no_write = False |
|
is_no_piano_guitar = False |
|
is_no_cache = False |
|
|
|
if self.primary_model_name == self.model_basename and isinstance(self.primary_sources, np.ndarray) and not self.pre_proc_model: |
|
source = self.primary_sources |
|
self.load_cached_sources() |
|
else: |
|
self.start_inference_console_write() |
|
is_no_cache = True |
|
|
|
mix = prepare_mix(self.audio_file) |
|
|
|
if is_no_cache: |
|
if self.demucs_version == DEMUCS_V1: |
|
if str(self.model_path).endswith(".gz"): |
|
self.model_path = gzip.open(self.model_path, "rb") |
|
klass, args, kwargs, state = torch.load(self.model_path) |
|
self.demucs = klass(*args, **kwargs) |
|
self.demucs.to(self.device) |
|
self.demucs.load_state_dict(state) |
|
elif self.demucs_version == DEMUCS_V2: |
|
self.demucs = auto_load_demucs_model_v2(self.demucs_source_list, self.model_path) |
|
self.demucs.to(self.device) |
|
self.demucs.load_state_dict(torch.load(self.model_path)) |
|
self.demucs.eval() |
|
else: |
|
self.demucs = HDemucs(sources=self.demucs_source_list) |
|
self.demucs = _gm(name=os.path.splitext(os.path.basename(self.model_path))[0], |
|
repo=Path(os.path.dirname(self.model_path))) |
|
self.demucs = demucs_segments(self.segment, self.demucs) |
|
self.demucs.to(self.device) |
|
self.demucs.eval() |
|
|
|
if self.pre_proc_model: |
|
if self.primary_stem not in [VOCAL_STEM, INST_STEM]: |
|
is_no_write = True |
|
self.write_to_console(DONE, base_text='') |
|
mix_no_voc = process_secondary_model(self.pre_proc_model, self.process_data, is_pre_proc_model=True) |
|
inst_mix = prepare_mix(mix_no_voc[INST_STEM]) |
|
self.process_iteration() |
|
self.running_inference_console_write(is_no_write=is_no_write) |
|
inst_source = self.demix_demucs(inst_mix) |
|
self.process_iteration() |
|
|
|
self.running_inference_console_write(is_no_write=is_no_write) if not self.pre_proc_model else None |
|
|
|
if self.primary_model_name == self.model_basename and isinstance(self.primary_sources, np.ndarray) and self.pre_proc_model: |
|
source = self.primary_sources |
|
else: |
|
source = self.demix_demucs(mix) |
|
|
|
self.write_to_console(DONE, base_text='') |
|
|
|
del self.demucs |
|
clear_gpu_cache() |
|
|
|
if isinstance(inst_source, np.ndarray): |
|
source_reshape = spec_utils.reshape_sources(inst_source[self.demucs_source_map[VOCAL_STEM]], source[self.demucs_source_map[VOCAL_STEM]]) |
|
inst_source[self.demucs_source_map[VOCAL_STEM]] = source_reshape |
|
source = inst_source |
|
|
|
if isinstance(source, np.ndarray): |
|
|
|
if len(source) == 2: |
|
self.demucs_source_map = DEMUCS_2_SOURCE_MAPPER |
|
else: |
|
self.demucs_source_map = DEMUCS_6_SOURCE_MAPPER if len(source) == 6 else DEMUCS_4_SOURCE_MAPPER |
|
|
|
if len(source) == 6 and self.process_data['is_ensemble_master'] or len(source) == 6 and self.is_secondary_model: |
|
is_no_piano_guitar = True |
|
six_stem_other_source = list(source) |
|
six_stem_other_source = [i for n, i in enumerate(source) if n in [self.demucs_source_map[OTHER_STEM], self.demucs_source_map[GUITAR_STEM], self.demucs_source_map[PIANO_STEM]]] |
|
other_source = np.zeros_like(six_stem_other_source[0]) |
|
for i in six_stem_other_source: |
|
other_source += i |
|
source_reshape = spec_utils.reshape_sources(source[self.demucs_source_map[OTHER_STEM]], other_source) |
|
source[self.demucs_source_map[OTHER_STEM]] = source_reshape |
|
|
|
if not self.is_vocal_split_model: |
|
self.cache_source(source) |
|
|
|
if (self.demucs_stems == ALL_STEMS and not self.process_data['is_ensemble_master']) or self.is_4_stem_ensemble and not self.is_return_dual: |
|
for stem_name, stem_value in self.demucs_source_map.items(): |
|
if self.is_secondary_model_activated and not self.is_secondary_model and not stem_value >= 4: |
|
if self.secondary_model_4_stem[stem_value]: |
|
model_scale = self.secondary_model_4_stem_scale[stem_value] |
|
stem_source_secondary = process_secondary_model(self.secondary_model_4_stem[stem_value], self.process_data, main_model_primary_stem_4_stem=stem_name, is_source_load=True, is_return_dual=False) |
|
if isinstance(stem_source_secondary, np.ndarray): |
|
stem_source_secondary = stem_source_secondary[1 if self.secondary_model_4_stem[stem_value].demucs_stem_count == 2 else stem_value].T |
|
elif type(stem_source_secondary) is dict: |
|
stem_source_secondary = stem_source_secondary[stem_name] |
|
|
|
stem_source_secondary = None if stem_value >= 4 else stem_source_secondary |
|
stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({stem_name}).wav') |
|
stem_source = source[stem_value].T |
|
|
|
stem_source = self.process_secondary_stem(stem_source, secondary_model_source=stem_source_secondary, model_scale=model_scale) |
|
self.write_audio(stem_path, stem_source, samplerate, stem_name=stem_name) |
|
|
|
if stem_name == VOCAL_STEM and not self.is_sec_bv_rebalance: |
|
self.process_vocal_split_chain({VOCAL_STEM:stem_source}) |
|
|
|
if self.is_secondary_model: |
|
return source |
|
else: |
|
if self.is_secondary_model_activated and self.secondary_model: |
|
self.secondary_source_primary, self.secondary_source_secondary = process_secondary_model(self.secondary_model, self.process_data, main_process_method=self.process_method) |
|
|
|
if not self.is_primary_stem_only: |
|
def secondary_save(sec_stem_name, source, raw_mixture=None, is_inst_mixture=False): |
|
secondary_source = self.secondary_source if not is_inst_mixture else None |
|
secondary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({sec_stem_name}).wav') |
|
secondary_source_secondary = None |
|
|
|
if not isinstance(secondary_source, np.ndarray): |
|
if self.is_demucs_combine_stems: |
|
source = list(source) |
|
if is_inst_mixture: |
|
source = [i for n, i in enumerate(source) if not n in [self.demucs_source_map[self.primary_stem], self.demucs_source_map[VOCAL_STEM]]] |
|
else: |
|
source.pop(self.demucs_source_map[self.primary_stem]) |
|
|
|
source = source[:len(source) - 2] if is_no_piano_guitar else source |
|
secondary_source = np.zeros_like(source[0]) |
|
for i in source: |
|
secondary_source += i |
|
secondary_source = secondary_source.T |
|
else: |
|
if not isinstance(raw_mixture, np.ndarray): |
|
raw_mixture = prepare_mix(self.audio_file) |
|
|
|
secondary_source = source[self.demucs_source_map[self.primary_stem]] |
|
|
|
if self.is_invert_spec: |
|
secondary_source = spec_utils.invert_stem(raw_mixture, secondary_source) |
|
else: |
|
raw_mixture = spec_utils.reshape_sources(secondary_source, raw_mixture) |
|
secondary_source = (-secondary_source.T+raw_mixture.T) |
|
|
|
if not is_inst_mixture: |
|
self.secondary_source = secondary_source |
|
secondary_source_secondary = self.secondary_source_secondary |
|
self.secondary_source = self.process_secondary_stem(secondary_source, secondary_source_secondary) |
|
self.secondary_source_map = {self.secondary_stem: self.secondary_source} |
|
|
|
self.write_audio(secondary_stem_path, secondary_source, samplerate, stem_name=sec_stem_name) |
|
|
|
secondary_save(self.secondary_stem, source, raw_mixture=mix) |
|
|
|
if self.is_demucs_pre_proc_model_inst_mix and self.pre_proc_model and not self.is_4_stem_ensemble: |
|
secondary_save(f"{self.secondary_stem} {INST_STEM}", source, raw_mixture=inst_mix, is_inst_mixture=True) |
|
|
|
if not self.is_secondary_stem_only: |
|
primary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({self.primary_stem}).wav') |
|
if not isinstance(self.primary_source, np.ndarray): |
|
self.primary_source = source[self.demucs_source_map[self.primary_stem]].T |
|
|
|
self.primary_source_map = self.final_process(primary_stem_path, self.primary_source, self.secondary_source_primary, self.primary_stem, samplerate) |
|
|
|
secondary_sources = {**self.primary_source_map, **self.secondary_source_map} |
|
|
|
self.process_vocal_split_chain(secondary_sources) |
|
|
|
if self.is_secondary_model: |
|
return secondary_sources |
|
|
|
def demix_demucs(self, mix): |
|
|
|
org_mix = mix |
|
|
|
if self.is_pitch_change: |
|
mix, sr_pitched = spec_utils.change_pitch_semitones(mix, 44100, semitone_shift=-self.semitone_shift) |
|
|
|
processed = {} |
|
mix = torch.tensor(mix, dtype=torch.float32) |
|
ref = mix.mean(0) |
|
mix = (mix - ref.mean()) / ref.std() |
|
mix_infer = mix |
|
|
|
with torch.no_grad(): |
|
if self.demucs_version == DEMUCS_V1: |
|
sources = apply_model_v1(self.demucs, |
|
mix_infer.to(self.device), |
|
self.shifts, |
|
self.is_split_mode, |
|
set_progress_bar=self.set_progress_bar) |
|
elif self.demucs_version == DEMUCS_V2: |
|
sources = apply_model_v2(self.demucs, |
|
mix_infer.to(self.device), |
|
self.shifts, |
|
self.is_split_mode, |
|
self.overlap, |
|
set_progress_bar=self.set_progress_bar) |
|
else: |
|
sources = apply_model(self.demucs, |
|
mix_infer[None], |
|
self.shifts, |
|
self.is_split_mode, |
|
self.overlap, |
|
static_shifts=1 if self.shifts == 0 else self.shifts, |
|
set_progress_bar=self.set_progress_bar, |
|
device=self.device)[0] |
|
|
|
sources = (sources * ref.std() + ref.mean()).cpu().numpy() |
|
sources[[0,1]] = sources[[1,0]] |
|
processed[mix] = sources[:,:,0:None].copy() |
|
sources = list(processed.values()) |
|
sources = [s[:,:,0:None] for s in sources] |
|
|
|
sources = np.concatenate(sources, axis=-1) |
|
|
|
if self.is_pitch_change: |
|
sources = np.stack([self.pitch_fix(stem, sr_pitched, org_mix) for stem in sources]) |
|
|
|
return sources |
|
|
|
class SeperateVR(SeperateAttributes): |
|
|
|
def seperate(self): |
|
if self.primary_model_name == self.model_basename and isinstance(self.primary_sources, tuple): |
|
y_spec, v_spec = self.primary_sources |
|
self.load_cached_sources() |
|
else: |
|
self.start_inference_console_write() |
|
|
|
device = self.device |
|
|
|
nn_arch_sizes = [ |
|
31191, |
|
33966, 56817, 123821, 123812, 129605, 218409, 537238, 537227] |
|
vr_5_1_models = [56817, 218409] |
|
model_size = math.ceil(os.stat(self.model_path).st_size / 1024) |
|
nn_arch_size = min(nn_arch_sizes, key=lambda x:abs(x-model_size)) |
|
|
|
if nn_arch_size in vr_5_1_models or self.is_vr_51_model: |
|
self.model_run = nets_new.CascadedNet(self.mp.param['bins'] * 2, |
|
nn_arch_size, |
|
nout=self.model_capacity[0], |
|
nout_lstm=self.model_capacity[1]) |
|
self.is_vr_51_model = True |
|
else: |
|
self.model_run = nets.determine_model_capacity(self.mp.param['bins'] * 2, nn_arch_size) |
|
|
|
self.model_run.load_state_dict(torch.load(self.model_path, map_location=cpu)) |
|
self.model_run.to(device) |
|
|
|
self.running_inference_console_write() |
|
|
|
y_spec, v_spec = self.inference_vr(self.loading_mix(), device, self.aggressiveness) |
|
if not self.is_vocal_split_model: |
|
self.cache_source((y_spec, v_spec)) |
|
self.write_to_console(DONE, base_text='') |
|
|
|
if self.is_secondary_model_activated and self.secondary_model: |
|
self.secondary_source_primary, self.secondary_source_secondary = process_secondary_model(self.secondary_model, self.process_data, main_process_method=self.process_method, main_model_primary=self.primary_stem) |
|
|
|
if not self.is_secondary_stem_only: |
|
primary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({self.primary_stem}).wav') |
|
if not isinstance(self.primary_source, np.ndarray): |
|
self.primary_source = self.spec_to_wav(y_spec).T |
|
if not self.model_samplerate == 44100: |
|
self.primary_source = librosa.resample(self.primary_source.T, orig_sr=self.model_samplerate, target_sr=44100).T |
|
|
|
self.primary_source_map = self.final_process(primary_stem_path, self.primary_source, self.secondary_source_primary, self.primary_stem, 44100) |
|
|
|
if not self.is_primary_stem_only: |
|
secondary_stem_path = os.path.join(self.export_path, f'{self.audio_file_base}_({self.secondary_stem}).wav') |
|
if not isinstance(self.secondary_source, np.ndarray): |
|
self.secondary_source = self.spec_to_wav(v_spec).T |
|
if not self.model_samplerate == 44100: |
|
self.secondary_source = librosa.resample(self.secondary_source.T, orig_sr=self.model_samplerate, target_sr=44100).T |
|
|
|
self.secondary_source_map = self.final_process(secondary_stem_path, self.secondary_source, self.secondary_source_secondary, self.secondary_stem, 44100) |
|
|
|
clear_gpu_cache() |
|
secondary_sources = {**self.primary_source_map, **self.secondary_source_map} |
|
|
|
self.process_vocal_split_chain(secondary_sources) |
|
|
|
if self.is_secondary_model: |
|
return secondary_sources |
|
|
|
def loading_mix(self): |
|
|
|
X_wave, X_spec_s = {}, {} |
|
|
|
bands_n = len(self.mp.param['band']) |
|
|
|
audio_file = spec_utils.write_array_to_mem(self.audio_file, subtype=self.wav_type_set) |
|
is_mp3 = audio_file.endswith('.mp3') if isinstance(audio_file, str) else False |
|
|
|
for d in range(bands_n, 0, -1): |
|
bp = self.mp.param['band'][d] |
|
|
|
if OPERATING_SYSTEM == 'Darwin': |
|
wav_resolution = 'polyphase' if SYSTEM_PROC == ARM or ARM in SYSTEM_ARCH else bp['res_type'] |
|
else: |
|
wav_resolution = bp['res_type'] |
|
|
|
if d == bands_n: |
|
X_wave[d], _ = librosa.load(audio_file, bp['sr'], False, dtype=np.float32, res_type=wav_resolution) |
|
X_spec_s[d] = spec_utils.wave_to_spectrogram(X_wave[d], bp['hl'], bp['n_fft'], self.mp, band=d, is_v51_model=self.is_vr_51_model) |
|
|
|
if not np.any(X_wave[d]) and is_mp3: |
|
X_wave[d] = rerun_mp3(audio_file, bp['sr']) |
|
|
|
if X_wave[d].ndim == 1: |
|
X_wave[d] = np.asarray([X_wave[d], X_wave[d]]) |
|
else: |
|
X_wave[d] = librosa.resample(X_wave[d+1], self.mp.param['band'][d+1]['sr'], bp['sr'], res_type=wav_resolution) |
|
X_spec_s[d] = spec_utils.wave_to_spectrogram(X_wave[d], bp['hl'], bp['n_fft'], self.mp, band=d, is_v51_model=self.is_vr_51_model) |
|
|
|
if d == bands_n and self.high_end_process != 'none': |
|
self.input_high_end_h = (bp['n_fft']//2 - bp['crop_stop']) + (self.mp.param['pre_filter_stop'] - self.mp.param['pre_filter_start']) |
|
self.input_high_end = X_spec_s[d][:, bp['n_fft']//2-self.input_high_end_h:bp['n_fft']//2, :] |
|
|
|
X_spec = spec_utils.combine_spectrograms(X_spec_s, self.mp, is_v51_model=self.is_vr_51_model) |
|
|
|
del X_wave, X_spec_s, audio_file |
|
|
|
return X_spec |
|
|
|
def inference_vr(self, X_spec, device, aggressiveness): |
|
def _execute(X_mag_pad, roi_size): |
|
X_dataset = [] |
|
patches = (X_mag_pad.shape[2] - 2 * self.model_run.offset) // roi_size |
|
total_iterations = patches//self.batch_size if not self.is_tta else (patches//self.batch_size)*2 |
|
for i in range(patches): |
|
start = i * roi_size |
|
X_mag_window = X_mag_pad[:, :, start:start + self.window_size] |
|
X_dataset.append(X_mag_window) |
|
|
|
X_dataset = np.asarray(X_dataset) |
|
self.model_run.eval() |
|
with torch.no_grad(): |
|
mask = [] |
|
for i in range(0, patches, self.batch_size): |
|
self.progress_value += 1 |
|
if self.progress_value >= total_iterations: |
|
self.progress_value = total_iterations |
|
self.set_progress_bar(0.1, 0.8/total_iterations*self.progress_value) |
|
X_batch = X_dataset[i: i + self.batch_size] |
|
X_batch = torch.from_numpy(X_batch).to(device) |
|
pred = self.model_run.predict_mask(X_batch) |
|
if not pred.size()[3] > 0: |
|
raise Exception(ERROR_MAPPER[WINDOW_SIZE_ERROR]) |
|
pred = pred.detach().cpu().numpy() |
|
pred = np.concatenate(pred, axis=2) |
|
mask.append(pred) |
|
if len(mask) == 0: |
|
raise Exception(ERROR_MAPPER[WINDOW_SIZE_ERROR]) |
|
|
|
mask = np.concatenate(mask, axis=2) |
|
return mask |
|
|
|
def postprocess(mask, X_mag, X_phase): |
|
is_non_accom_stem = False |
|
for stem in NON_ACCOM_STEMS: |
|
if stem == self.primary_stem: |
|
is_non_accom_stem = True |
|
|
|
mask = spec_utils.adjust_aggr(mask, is_non_accom_stem, aggressiveness) |
|
|
|
if self.is_post_process: |
|
mask = spec_utils.merge_artifacts(mask, thres=self.post_process_threshold) |
|
|
|
y_spec = mask * X_mag * np.exp(1.j * X_phase) |
|
v_spec = (1 - mask) * X_mag * np.exp(1.j * X_phase) |
|
|
|
return y_spec, v_spec |
|
|
|
X_mag, X_phase = spec_utils.preprocess(X_spec) |
|
n_frame = X_mag.shape[2] |
|
pad_l, pad_r, roi_size = spec_utils.make_padding(n_frame, self.window_size, self.model_run.offset) |
|
X_mag_pad = np.pad(X_mag, ((0, 0), (0, 0), (pad_l, pad_r)), mode='constant') |
|
X_mag_pad /= X_mag_pad.max() |
|
mask = _execute(X_mag_pad, roi_size) |
|
|
|
if self.is_tta: |
|
pad_l += roi_size // 2 |
|
pad_r += roi_size // 2 |
|
X_mag_pad = np.pad(X_mag, ((0, 0), (0, 0), (pad_l, pad_r)), mode='constant') |
|
X_mag_pad /= X_mag_pad.max() |
|
mask_tta = _execute(X_mag_pad, roi_size) |
|
mask_tta = mask_tta[:, :, roi_size // 2:] |
|
mask = (mask[:, :, :n_frame] + mask_tta[:, :, :n_frame]) * 0.5 |
|
else: |
|
mask = mask[:, :, :n_frame] |
|
|
|
y_spec, v_spec = postprocess(mask, X_mag, X_phase) |
|
|
|
return y_spec, v_spec |
|
|
|
def spec_to_wav(self, spec): |
|
if self.high_end_process.startswith('mirroring') and isinstance(self.input_high_end, np.ndarray) and self.input_high_end_h: |
|
input_high_end_ = spec_utils.mirroring(self.high_end_process, spec, self.input_high_end, self.mp) |
|
wav = spec_utils.cmb_spectrogram_to_wave(spec, self.mp, self.input_high_end_h, input_high_end_, is_v51_model=self.is_vr_51_model) |
|
else: |
|
wav = spec_utils.cmb_spectrogram_to_wave(spec, self.mp, is_v51_model=self.is_vr_51_model) |
|
|
|
return wav |
|
|
|
def process_secondary_model(secondary_model: ModelData, |
|
process_data, |
|
main_model_primary_stem_4_stem=None, |
|
is_source_load=False, |
|
main_process_method=None, |
|
is_pre_proc_model=False, |
|
is_return_dual=True, |
|
main_model_primary=None): |
|
|
|
if not is_pre_proc_model: |
|
process_iteration = process_data['process_iteration'] |
|
process_iteration() |
|
|
|
if secondary_model.process_method == VR_ARCH_TYPE: |
|
seperator = SeperateVR(secondary_model, process_data, main_model_primary_stem_4_stem=main_model_primary_stem_4_stem, main_process_method=main_process_method, main_model_primary=main_model_primary) |
|
if secondary_model.process_method == MDX_ARCH_TYPE: |
|
if secondary_model.is_mdx_c: |
|
seperator = SeperateMDXC(secondary_model, process_data, main_model_primary_stem_4_stem=main_model_primary_stem_4_stem, main_process_method=main_process_method, is_return_dual=is_return_dual, main_model_primary=main_model_primary) |
|
else: |
|
seperator = SeperateMDX(secondary_model, process_data, main_model_primary_stem_4_stem=main_model_primary_stem_4_stem, main_process_method=main_process_method, main_model_primary=main_model_primary) |
|
if secondary_model.process_method == DEMUCS_ARCH_TYPE: |
|
seperator = SeperateDemucs(secondary_model, process_data, main_model_primary_stem_4_stem=main_model_primary_stem_4_stem, main_process_method=main_process_method, is_return_dual=is_return_dual, main_model_primary=main_model_primary) |
|
|
|
secondary_sources = seperator.seperate() |
|
|
|
if type(secondary_sources) is dict and not is_source_load and not is_pre_proc_model: |
|
return gather_sources(secondary_model.primary_model_primary_stem, secondary_stem(secondary_model.primary_model_primary_stem), secondary_sources) |
|
else: |
|
return secondary_sources |
|
|
|
def process_chain_model(secondary_model: ModelData, |
|
process_data, |
|
vocal_stem_path, |
|
master_vocal_source, |
|
master_inst_source=None): |
|
|
|
process_iteration = process_data['process_iteration'] |
|
process_iteration() |
|
|
|
if secondary_model.bv_model_rebalance: |
|
vocal_source = spec_utils.reduce_mix_bv(master_inst_source, master_vocal_source, reduction_rate=secondary_model.bv_model_rebalance) |
|
else: |
|
vocal_source = master_vocal_source |
|
|
|
vocal_stem_path = [vocal_source, os.path.splitext(os.path.basename(vocal_stem_path))[0]] |
|
|
|
if secondary_model.process_method == VR_ARCH_TYPE: |
|
seperator = SeperateVR(secondary_model, process_data, vocal_stem_path=vocal_stem_path, master_inst_source=master_inst_source, master_vocal_source=master_vocal_source) |
|
if secondary_model.process_method == MDX_ARCH_TYPE: |
|
if secondary_model.is_mdx_c: |
|
seperator = SeperateMDXC(secondary_model, process_data, vocal_stem_path=vocal_stem_path, master_inst_source=master_inst_source, master_vocal_source=master_vocal_source) |
|
else: |
|
seperator = SeperateMDX(secondary_model, process_data, vocal_stem_path=vocal_stem_path, master_inst_source=master_inst_source, master_vocal_source=master_vocal_source) |
|
if secondary_model.process_method == DEMUCS_ARCH_TYPE: |
|
seperator = SeperateDemucs(secondary_model, process_data, vocal_stem_path=vocal_stem_path, master_inst_source=master_inst_source, master_vocal_source=master_vocal_source) |
|
|
|
secondary_sources = seperator.seperate() |
|
|
|
if type(secondary_sources) is dict: |
|
return secondary_sources |
|
else: |
|
return None |
|
|
|
def gather_sources(primary_stem_name, secondary_stem_name, secondary_sources: dict): |
|
|
|
source_primary = False |
|
source_secondary = False |
|
|
|
for key, value in secondary_sources.items(): |
|
if key in primary_stem_name: |
|
source_primary = value |
|
if key in secondary_stem_name: |
|
source_secondary = value |
|
|
|
return source_primary, source_secondary |
|
|
|
def prepare_mix(mix): |
|
|
|
audio_path = mix |
|
|
|
if not isinstance(mix, np.ndarray): |
|
mix, sr = librosa.load(mix, mono=False, sr=44100) |
|
else: |
|
mix = mix.T |
|
|
|
if isinstance(audio_path, str): |
|
if not np.any(mix) and audio_path.endswith('.mp3'): |
|
mix = rerun_mp3(audio_path) |
|
|
|
if mix.ndim == 1: |
|
mix = np.asfortranarray([mix,mix]) |
|
|
|
return mix |
|
|
|
def rerun_mp3(audio_file, sample_rate=44100): |
|
|
|
with audioread.audio_open(audio_file) as f: |
|
track_length = int(f.duration) |
|
|
|
return librosa.load(audio_file, duration=track_length, mono=False, sr=sample_rate)[0] |
|
|
|
def save_format(audio_path, save_format, mp3_bit_set): |
|
|
|
if not save_format == WAV: |
|
|
|
if OPERATING_SYSTEM == 'Darwin': |
|
FFMPEG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'ffmpeg') |
|
pydub.AudioSegment.converter = FFMPEG_PATH |
|
|
|
musfile = pydub.AudioSegment.from_wav(audio_path) |
|
|
|
if save_format == FLAC: |
|
audio_path_flac = audio_path.replace(".wav", ".flac") |
|
musfile.export(audio_path_flac, format="flac") |
|
|
|
if save_format == MP3: |
|
audio_path_mp3 = audio_path.replace(".wav", ".mp3") |
|
try: |
|
musfile.export(audio_path_mp3, format="mp3", bitrate=mp3_bit_set, codec="libmp3lame") |
|
except Exception as e: |
|
print(e) |
|
musfile.export(audio_path_mp3, format="mp3", bitrate=mp3_bit_set) |
|
|
|
try: |
|
os.remove(audio_path) |
|
except Exception as e: |
|
print(e) |
|
|
|
def pitch_shift(mix): |
|
new_sr = 31183 |
|
|
|
|
|
resampled_audio = signal.resample_poly(mix, new_sr, 44100) |
|
|
|
return resampled_audio |
|
|
|
def list_to_dictionary(lst): |
|
dictionary = {item: index for index, item in enumerate(lst)} |
|
return dictionary |
|
|
|
def vr_denoiser(X, device, hop_length=1024, n_fft=2048, cropsize=256, is_deverber=False, model_path=None): |
|
batchsize = 4 |
|
|
|
if is_deverber: |
|
nout, nout_lstm = 64, 128 |
|
mp = ModelParameters(os.path.join('lib_v5', 'vr_network', 'modelparams', '4band_v3.json')) |
|
n_fft = mp.param['bins'] * 2 |
|
else: |
|
mp = None |
|
hop_length=1024 |
|
nout, nout_lstm = 16, 128 |
|
|
|
model = nets_new.CascadedNet(n_fft, nout=nout, nout_lstm=nout_lstm) |
|
model.load_state_dict(torch.load(model_path, map_location=cpu)) |
|
model.to(device) |
|
|
|
if mp is None: |
|
X_spec = spec_utils.wave_to_spectrogram_old(X, hop_length, n_fft) |
|
else: |
|
X_spec = loading_mix(X.T, mp) |
|
|
|
|
|
X_mag = np.abs(X_spec) |
|
X_phase = np.angle(X_spec) |
|
|
|
|
|
n_frame = X_mag.shape[2] |
|
pad_l, pad_r, roi_size = spec_utils.make_padding(n_frame, cropsize, model.offset) |
|
X_mag_pad = np.pad(X_mag, ((0, 0), (0, 0), (pad_l, pad_r)), mode='constant') |
|
X_mag_pad /= X_mag_pad.max() |
|
|
|
X_dataset = [] |
|
patches = (X_mag_pad.shape[2] - 2 * model.offset) // roi_size |
|
for i in range(patches): |
|
start = i * roi_size |
|
X_mag_crop = X_mag_pad[:, :, start:start + cropsize] |
|
X_dataset.append(X_mag_crop) |
|
|
|
X_dataset = np.asarray(X_dataset) |
|
|
|
model.eval() |
|
|
|
with torch.no_grad(): |
|
mask = [] |
|
|
|
for i in range(0, patches, batchsize): |
|
X_batch = X_dataset[i: i + batchsize] |
|
X_batch = torch.from_numpy(X_batch).to(device) |
|
|
|
pred = model.predict_mask(X_batch) |
|
|
|
pred = pred.detach().cpu().numpy() |
|
pred = np.concatenate(pred, axis=2) |
|
mask.append(pred) |
|
|
|
mask = np.concatenate(mask, axis=2) |
|
|
|
mask = mask[:, :, :n_frame] |
|
|
|
|
|
if is_deverber: |
|
v_spec = mask * X_mag * np.exp(1.j * X_phase) |
|
y_spec = (1 - mask) * X_mag * np.exp(1.j * X_phase) |
|
else: |
|
v_spec = (1 - mask) * X_mag * np.exp(1.j * X_phase) |
|
|
|
if mp is None: |
|
wave = spec_utils.spectrogram_to_wave_old(v_spec, hop_length=1024) |
|
else: |
|
wave = spec_utils.cmb_spectrogram_to_wave(v_spec, mp, is_v51_model=True).T |
|
|
|
wave = spec_utils.match_array_shapes(wave, X) |
|
|
|
if is_deverber: |
|
wave_2 = spec_utils.cmb_spectrogram_to_wave(y_spec, mp, is_v51_model=True).T |
|
wave_2 = spec_utils.match_array_shapes(wave_2, X) |
|
return wave, wave_2 |
|
else: |
|
return wave |
|
|
|
def loading_mix(X, mp): |
|
|
|
X_wave, X_spec_s = {}, {} |
|
|
|
bands_n = len(mp.param['band']) |
|
|
|
for d in range(bands_n, 0, -1): |
|
bp = mp.param['band'][d] |
|
|
|
if OPERATING_SYSTEM == 'Darwin': |
|
wav_resolution = 'polyphase' if SYSTEM_PROC == ARM or ARM in SYSTEM_ARCH else bp['res_type'] |
|
else: |
|
wav_resolution = 'polyphase' |
|
|
|
if d == bands_n: |
|
X_wave[d] = X |
|
|
|
else: |
|
X_wave[d] = librosa.resample(X_wave[d+1], mp.param['band'][d+1]['sr'], bp['sr'], res_type=wav_resolution) |
|
|
|
X_spec_s[d] = spec_utils.wave_to_spectrogram(X_wave[d], bp['hl'], bp['n_fft'], mp, band=d, is_v51_model=True) |
|
|
|
|
|
|
|
|
|
|
|
X_spec = spec_utils.combine_spectrograms(X_spec_s, mp) |
|
|
|
del X_wave, X_spec_s |
|
|
|
return X_spec |
|
|