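"""Generate the WSJ0-REVERB dataset: reverberant/anechoic speech pairs simulated with pyroomacoustics.

For each WSJ0 utterance, a random shoebox room is drawn (dimensions, target T60,
source and microphone positions) and the utterance is auralized twice: once in the
reverberant room and once in a highly absorbent "dry" copy of the same geometry.

Example invocation (the script filename is an assumption here):

    python create_wsj0_reverb.py --wsj0_dir /path/to/wsj0 --target_dir /path/to/wsj0_reverb
"""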
import os
import shutil
import argparse
import numpy as np
import soundfile as sf
import pyroomacoustics as pra
from glob import glob
from tqdm import tqdm
SEED = 100
np.random.seed(SEED)
T60_RANGE = [0.4, 1.0]  # Reverberation time range in seconds
SNR_RANGE = [0, 20]  # SNR range in dB (not used in this script)
DIM_RANGE = [5, 15, 5, 15, 2, 6]  # Room dimension ranges [x_min, x_max, y_min, y_max, z_min, z_max] in meters
MIN_DISTANCE_TO_WALL = 1  # Minimum source/microphone distance to any wall, in meters
MIC_ARRAY_RADIUS = 0.16  # Circular microphone array radius in meters
TARGET_T60_SHAPE = {"CI": 0.10, "HA": 0.2}  # (not used in this script)
TARGETS_CROP = {"CI": 16e-3, "HA": 40e-3}  # (not used in this script)
NB_SAMPLES_PER_ROOM = 1  # Number of utterances simulated in each random room
CHANNELS = 1  # Number of microphones in the array
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--wsj0_dir', type=str, required=True,
        help='Path to the WSJ0 directory, which should contain the subdirectories "si_dt_05", "si_tr_s" and "si_et_05".')
    parser.add_argument('--target_dir', type=str, required=True,
        help='Path to the target directory for saving WSJ0-REVERB.')
    args = parser.parse_args()

    def obtain_clean_file(speech_list, i_sample, sample_rate=16000):
        """Load the i-th clean utterance and return (signal, sample rate, basename without extension)."""
        speech, speech_sr = sf.read(speech_list[i_sample])
        speech_basename = os.path.basename(speech_list[i_sample])
        assert speech_sr == sample_rate, f"Wrong speech sampling rate: expected {sample_rate}, got {speech_sr}"
        return speech.squeeze(), speech_sr, speech_basename[: -4]
    splits = ['valid', 'train', 'test']
    dic_split = {"valid": "si_dt_05", "train": "si_tr_s", "test": "si_et_05"}
    speech_lists = {
        split: sorted(glob(f"{os.path.join(args.wsj0_dir, dic_split[split])}/**/*.wav", recursive=True))
        for split in splits
    }
    sample_rate = 16000
    output_dir = args.target_dir
    if os.path.exists(output_dir):  # Start from a clean target directory
        shutil.rmtree(output_dir)
    for i_split, split in enumerate(splits):
        print("Processing split no. {}: {}...".format(i_split + 1, split))

        reverberant_output_dir = os.path.join(output_dir, "audio", split, "reverb")
        dry_output_dir = os.path.join(output_dir, "audio", split, "anechoic")
        noisy_reverberant_output_dir = os.path.join(output_dir, "audio", split, "noisy_reverb")  # defined but not written to below
        if split == "test":
            unauralized_output_dir = os.path.join(output_dir, "audio", split, "unauralized")

        os.makedirs(reverberant_output_dir, exist_ok=True)
        os.makedirs(dry_output_dir, exist_ok=True)
        if split == "test":
            os.makedirs(unauralized_output_dir, exist_ok=True)

        speech_list = speech_lists[split]
        real_nb_samples = len(speech_list)
        for i_sample in tqdm(range(real_nb_samples)):
            if not i_sample % NB_SAMPLES_PER_ROOM:  # Generate a new room
                t60 = np.random.uniform(T60_RANGE[0], T60_RANGE[1])  # Draw target T60
                room_dim = np.array([np.random.uniform(DIM_RANGE[2*n], DIM_RANGE[2*n+1]) for n in range(3)])  # Draw room dimensions
                center_mic_position = np.array([np.random.uniform(MIN_DISTANCE_TO_WALL, room_dim[n] - MIN_DISTANCE_TO_WALL) for n in range(3)])  # Draw mic array center
                source_position = np.array([np.random.uniform(MIN_DISTANCE_TO_WALL, room_dim[n] - MIN_DISTANCE_TO_WALL) for n in range(3)])  # Draw source position
                mic_array_2d = pra.beamforming.circular_2D_array(center_mic_position[:-1], CHANNELS, phi0=0, radius=MIC_ARRAY_RADIUS)  # Compute microphone array (x, y)
                mic_array = np.pad(mic_array_2d, ((0, 1), (0, 0)), mode="constant", constant_values=center_mic_position[-1])  # Append the z coordinate
            ### Reverberant room
            e_absorption, max_order = pra.inverse_sabine(t60, room_dim)  # Absorption coefficient and ISM order for the target T60
            reverberant_room = pra.ShoeBox(
                room_dim, fs=16000, materials=pra.Material(e_absorption), max_order=min(3, max_order)
            )  # Low ISM order, since ray tracing handles the late reverberation
            reverberant_room.set_ray_tracing()
            reverberant_room.add_microphone_array(mic_array)

            # Load the clean (unauralized) utterance
            speech, speech_sr, speech_basename = obtain_clean_file(speech_list, i_sample, sample_rate=sample_rate)

            # Simulate the reverberant recording
            reverberant_room.add_source(source_position, signal=speech)
            reverberant_room.compute_rir()
            reverberant_room.simulate()
            t60_real = np.mean(reverberant_room.measure_rt60()).squeeze()
            reverberant = np.stack(reverberant_room.mic_array.signals).swapaxes(0, 1)  # (samples, channels)
            ### Dry (anechoic) room: same geometry, but highly absorbent walls
            e_absorption_dry = 0.99  # Fine for neural network targets, but clearly not for WPE
            dry_room = pra.ShoeBox(
                room_dim, fs=16000, materials=pra.Material(e_absorption_dry), max_order=0
            )
            dry_room.add_microphone_array(mic_array)

            # Simulate the dry recording
            dry_room.add_source(source_position, signal=speech)
            dry_room.compute_rir()
            dry_room.simulate()
            t60_real_dry = np.mean(dry_room.measure_rt60()).squeeze()
            rir_dry = dry_room.rir
            dry = np.stack(dry_room.mic_array.signals).swapaxes(0, 1)
            # Pad 0.5 s of silence after the dry signal (it decays very fast) so the
            # reverberant tail is not cut and both signals can share one length
            dry = np.pad(dry, ((0, int(.5 * sample_rate)), (0, 0)), mode="constant", constant_values=0)
            # Crop both signals to a common length and scale so the reverberant peak sits at 0.9
            min_len_sample = min(reverberant.shape[0], dry.shape[0])
            dry = dry[:min_len_sample]
            reverberant = reverberant[:min_len_sample]
            output_scaling = np.max(reverberant) / .9

            drr = 10 * np.log10(np.mean(dry**2) / (np.mean(reverberant**2) + 1e-8) + 1e-8)  # Dry-to-reverberant energy ratio in dB, used in the filename
            output_filename = f"{speech_basename}_{i_sample//NB_SAMPLES_PER_ROOM}_{t60_real:.2f}_{drr:.1f}.wav"

            sf.write(os.path.join(dry_output_dir, output_filename), 1/output_scaling * dry, samplerate=sample_rate)
            sf.write(os.path.join(reverberant_output_dir, output_filename), 1/output_scaling * reverberant, samplerate=sample_rate)
            if split == "test":
                sf.write(os.path.join(unauralized_output_dir, output_filename), speech, samplerate=sample_rate)
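# Resulting layout under --target_dir, as produced by the writes above:
#   audio/<split>/anechoic/<utterance>_<room>_<t60>_<drr>.wav
#   audio/<split>/reverb/<utterance>_<room>_<t60>_<drr>.wav
#   audio/test/unauralized/<utterance>_<room>_<t60>_<drr>.wav   (test split only)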