|
|
|
|
|
import warnings |
|
warnings.filterwarnings("ignore", message="Failed to initialize NumPy: _ARRAY_API not found") |
|
|
|
import os |
|
import torch |
|
import numpy as np |
|
import torchaudio |
|
from huggingface_hub import hf_hub_download |
|
|
|
|
|
# The network definition lives in the sibling asteroid_test module; the error
# message asks the user to make sure it matches the one used for training.
try:
    from . import asteroid_test
except ImportError as e:
    raise ImportError("無法載入 asteroid_test 模組,請確認該模組與訓練時相同") from e

# Select the sox_io backend where the API still exists. set_audio_backend is
# deprecated and absent from newer torchaudio releases, where an unconditional
# call would raise AttributeError while importing this module.
if hasattr(torchaudio, "set_audio_backend"):
    torchaudio.set_audio_backend("sox_io")
|
|
|
|
|
def get_conf():
    """Return the model hyper-parameters.

    Returns:
        A ``(filterbank_conf, masknet_conf)`` tuple of freshly built dicts:
        the first holds the filterbank settings, the second the masking
        network settings.
    """
    filterbank = dict(n_filters=64, kernel_size=16, stride=8)

    masknet = dict(
        in_chan=64,
        n_src=2,
        out_chan=64,
        ff_hid=256,
        ff_activation="relu",
        norm_type="gLN",
        chunk_size=100,
        hop_size=50,
        n_repeats=2,
        mask_act="sigmoid",
        bidirectional=True,
        dropout=0,
    )

    return filterbank, masknet
|
|
|
|
|
def load_dpt_model():
    """Download the separation checkpoint and build the quantized DPTNet.

    The Hugging Face access token is read from the ``SpeechSeparation``
    environment variable. The network is constructed from
    ``asteroid_test.DPTNet`` with the settings returned by ``get_conf()``,
    dynamically quantized (LSTM and Linear layers, qint8), and then populated
    from the downloaded checkpoint. Only checkpoint entries whose key exists
    in the model and whose tensor shape matches are loaded; everything else
    is reported on stdout and skipped.

    Returns:
        The model, switched to eval mode.

    Raises:
        EnvironmentError: If ``SpeechSeparation`` is unset or empty.
        RuntimeError: If the DPTNet class cannot be found or instantiated.
    """
    print('Load Separation Model...')

    speech_sep_token = os.getenv("SpeechSeparation")
    if not speech_sep_token:
        raise EnvironmentError("環境變數 SpeechSeparation 未設定!")

    model_path = hf_hub_download(
        repo_id="DeepLearning101/speech-separation",
        filename="train_dptnet_aishell_partOverlap_B2_300epoch_quan-int8.p",
        token=speech_sep_token,
    )

    conf_filterbank, conf_masknet = get_conf()

    # Both a missing DPTNet attribute and a constructor failure are surfaced
    # as the same "model structure" error.
    try:
        model = asteroid_test.DPTNet(**conf_filterbank, **conf_masknet)
    except Exception as e:
        raise RuntimeError("模型結構錯誤:請確認 asteroid_test.py 是否與訓練時相同") from e

    # Quantize before loading so the module layout matches the int8 checkpoint.
    model = torch.quantization.quantize_dynamic(
        model,
        {torch.nn.LSTM, torch.nn.Linear},
        dtype=torch.qint8,
    )

    # NOTE(review): torch.load unpickles the file, which can execute arbitrary
    # code; only point this at a repository you trust.
    state_dict = torch.load(model_path, map_location="cpu")
    own_state = model.state_dict()

    filtered_state_dict = {}
    # Checkpoint keys the model does not have. They never reach
    # load_state_dict, so they are tracked here to be reported below instead
    # of being dropped silently.
    absent_keys = []
    for k, v in state_dict.items():
        if k not in own_state:
            absent_keys.append(k)
            continue
        if not (isinstance(v, torch.Tensor) and isinstance(own_state[k], torch.Tensor)):
            # NOTE(review): dynamically quantized layers may keep their packed
            # weights as non-tensor entries; those are skipped here. Confirm
            # the quantized weights are really restored (watch "Missing keys").
            print(f"Skip '{k}': not a tensor")
            continue
        if v.shape != own_state[k].shape:
            print(f"Skip '{k}': shape mismatch")
            continue
        filtered_state_dict[k] = v

    missing_keys, unexpected_keys = model.load_state_dict(filtered_state_dict, strict=False)

    if missing_keys:
        print("⚠️ Missing keys:", missing_keys)

    unexpected = list(unexpected_keys) + absent_keys
    if unexpected:
        print("ℹ️ Unexpected keys:", unexpected)

    model.eval()
    return model
|
|
|
|
|
def _peak_normalize(source, target_peak):
    """Scale ``source`` so its absolute peak equals ``target_peak``."""
    peak = source.abs().max().item()
    if peak == 0:
        # A silent estimate has nothing to rescale; dividing would yield NaN.
        return source
    return source * target_peak / peak


def _suffixed_path(path, suffix):
    """Insert ``suffix`` between the stem and the extension of ``path``."""
    root, ext = os.path.splitext(path)
    # Fall back to ".wav" when the path carries no extension at all.
    return f"{root}{suffix}{ext or '.wav'}"


def dpt_sep_process(wav_path, model=None, outfilename=None):
    """Separate a two-speaker mixture and write the estimates to disk.

    Args:
        wav_path: Path of the mixture audio file to load.
        model: Separation model to run; when ``None`` a new one is created
            with ``load_dpt_model()``.
        outfilename: Optional base path for the outputs. When given, the
            files ``<stem>_sep1<ext>``, ``<stem>_sep2<ext>`` and
            ``<stem>_mix<ext>`` are written. When ``None``, only the two
            ``_sep`` files are written, derived from ``wav_path``.

    Returns:
        None. The results are the files written with ``torchaudio.save``.
    """
    if model is None:
        model = load_dpt_model()

    x, sr = torchaudio.load(wav_path)
    x = x.cpu()

    with torch.no_grad():
        est_sources = model(x)

    # Assumes a mono input so the leading dimension is 1 and exactly two
    # estimated sources remain after the squeeze -- TODO confirm for
    # multi-channel files.
    est_sources = est_sources.squeeze(0)
    sep_1, sep_2 = est_sources

    # Bring each estimate back to the peak level of the mixture's first channel.
    max_abs = x[0].abs().max().item()
    sep_1 = _peak_normalize(sep_1, max_abs).unsqueeze(0)
    sep_2 = _peak_normalize(sep_2, max_abs).unsqueeze(0)

    # Build output names from the path stem rather than str.replace('.wav'):
    # replace() leaves a path with any other extension unchanged (so outputs
    # would overwrite each other or the input) and rewrites every '.wav'
    # occurrence in the path.
    if outfilename is not None:
        torchaudio.save(_suffixed_path(outfilename, '_sep1'), sep_1, sr)
        torchaudio.save(_suffixed_path(outfilename, '_sep2'), sep_2, sr)
        torchaudio.save(_suffixed_path(outfilename, '_mix'), x, sr)
    else:
        torchaudio.save(_suffixed_path(wav_path, '_sep1'), sep_1, sr)
        torchaudio.save(_suffixed_path(wav_path, '_sep2'), sep_2, sr)
|
|
|
|
|
# Running this file directly only prints a usage hint; the module is meant to
# be imported by a web front end.
if __name__ == "__main__":
    print("This module should be used via Flask or Gradio.")