# NOTE(review): the three lines below are Hugging Face web-page residue
# (avatar alt text, upload note, commit hash), not Python — commented out
# so the file parses; kept for provenance.
# ameerazam08's picture
# Upload folder using huggingface_hub
# 79cf5f5 verified
import pathlib
import click
import librosa
import numpy as np
import parselmouth as pm
import textgrid as tg
import tqdm
@click.command(help='Enhance and finish the TextGrids')
@click.option('--wavs', required=True, help='Path to the segments directory')
@click.option('--dictionary', required=True, help='Path to the dictionary file')
@click.option('--src', required=True, help='Path to the raw TextGrids directory')
@click.option('--dst', required=True, help='Path to the final TextGrids directory')
@click.option('--f0_min', type=float, default=40., show_default=True, help='Minimum value of pitch')
@click.option('--f0_max', type=float, default=1100., show_default=True, help='Maximum value of pitch')
@click.option('--br_len', type=float, default=0.1, show_default=True,
              help='Minimum length of breath in seconds')
@click.option('--br_db', type=float, default=-60., show_default=True,
              help='Threshold of RMS in dB for detecting breath')
@click.option('--br_centroid', type=float, default=2000., show_default=True,
              help='Threshold of spectral centroid in Hz for detecting breath')
@click.option('--time_step', type=float, default=0.005, show_default=True,
              help='Time step for feature extraction')
@click.option('--min_space', type=float, default=0.04, show_default=True,
              help='Minimum length of space in seconds')
@click.option('--voicing_thresh_vowel', type=float, default=0.45, show_default=True,
              help='Threshold of voicing for fixing long utterances')
@click.option('--voicing_thresh_breath', type=float, default=0.6, show_default=True,
              help='Threshold of voicing for detecting breath')
@click.option('--br_win_sz', type=float, default=0.05, show_default=True,
              help='Size of sliding window in seconds for detecting breath')
def enhance_tg(
        wavs, dictionary, src, dst,
        f0_min, f0_max, br_len, br_db, br_centroid,
        time_step, min_space, voicing_thresh_vowel, voicing_thresh_breath, br_win_sz
):
    """Refine raw forced-alignment TextGrids and write finished copies.

    For each ``*.wav`` in *wavs*, the TextGrid with the same stem is read
    from *src* and three passes are applied before writing to *dst*:

    1. "Fix long utterances": while the pitch track (vowel voicing
       threshold) is still voiced at the start of a space interval, the
       preceding word/phone boundary is pushed forward one ``time_step``
       at a time, shrinking the space.
    2. "Detect aspiration": inside each sufficiently long space, sliding
       windows that are fully unvoiced (breath voicing threshold) and
       louder than ``br_db`` are merged into candidate spans; spans at
       least ``br_len`` long with mean spectral centroid >= ``br_centroid``
       are inserted as ``'AP'`` intervals on both tiers.
    3. "Remove short spaces": remaining empty intervals at least
       ``min_space`` long are labelled ``'SP'``; shorter ones are removed
       and their duration absorbed by neighbouring intervals.

    NOTE(review): assumes tier 0 is the word tier and tier 1 the phone
    tier, and that every non-empty word mark is a key of the dictionary
    file — verify against the upstream aligner's output format.
    """
    wavs = pathlib.Path(wavs)
    dict_path = pathlib.Path(dictionary)
    src = pathlib.Path(src)
    dst = pathlib.Path(dst)
    dst.mkdir(parents=True, exist_ok=True)
    # Dictionary file format: one rule per line, "word<TAB>ph1 ph2 ...".
    with open(dict_path, 'r', encoding='utf8') as f:
        rules = [ln.strip().split('\t') for ln in f.readlines()]
    dictionary = {}      # word -> list of its phonemes
    phoneme_set = set()  # all phonemes seen (collected but not used below)
    for r in rules:
        phonemes = r[1].split()
        dictionary[r[0]] = phonemes
        phoneme_set.update(phonemes)
    filelist = list(wavs.glob('*.wav'))
    for wavfile in tqdm.tqdm(filelist):
        tgfile = src / wavfile.with_suffix('.TextGrid').name
        textgrid = tg.TextGrid()
        textgrid.read(str(tgfile))
        words = textgrid[0]   # word tier
        phones = textgrid[1]  # phone tier
        sound = pm.Sound(str(wavfile))
        # Two Praat pitch tracks over the same time grid, differing only in
        # voicing threshold: a stricter one for breath detection and a more
        # permissive one for extending voiced word tails.  Unvoiced frames
        # come back as 0 Hz, hence the "< f0_min" tests below.
        f0_voicing_breath = sound.to_pitch_ac(
            time_step=time_step,
            voicing_threshold=voicing_thresh_breath,
            pitch_floor=f0_min,
            pitch_ceiling=f0_max,
        ).selected_array['frequency']
        f0_voicing_vowel = sound.to_pitch_ac(
            time_step=time_step,
            voicing_threshold=voicing_thresh_vowel,
            pitch_floor=f0_min,
            pitch_ceiling=f0_max,
        ).selected_array['frequency']
        # Spectral centroid on a 24 kHz mono resample; hop chosen so that
        # frame index ~= time / time_step, matching the pitch arrays.
        # NOTE(review): Praat pitch frames carry their own start offset, so
        # int(t / time_step) indexing is only approximate — confirm this
        # tolerance is acceptable.
        y, sr = librosa.load(wavfile, sr=24000, mono=True)
        hop_size = int(time_step * sr)
        spectral_centroid = librosa.feature.spectral_centroid(y=y, sr=sr, n_fft=2048, hop_length=hop_size).squeeze(0)
        # Fix long utterances
        # i walks the word tier, j the phone tier; for real words j jumps
        # by the word's phoneme count so both cursors stay aligned.
        i = j = 0
        while i < len(words):
            word = words[i]
            phone = phones[j]
            if word.mark is not None and word.mark != '':
                # Real word: skip it and all of its phones.
                i += 1
                j += len(dictionary[word.mark])
                continue
            if i == 0:
                # Leading space: no previous word to extend into it.
                i += 1
                j += 1
                continue
            prev_word = words[i - 1]
            prev_phone = phones[j - 1]
            # Extend length of long utterances: while this space still
            # starts voiced, hand one time_step of it to the previous
            # word/phone (boundaries move in lock-step on both tiers).
            while word.minTime < word.maxTime - time_step:
                pos = min(f0_voicing_vowel.shape[0] - 1, int(word.minTime / time_step))
                if f0_voicing_vowel[pos] < f0_min:
                    break  # unvoiced frame: the voiced tail ends here
                prev_word.maxTime += time_step
                prev_phone.maxTime += time_step
                word.minTime += time_step
                phone.minTime += time_step
            i += 1
            j += 1
        # Detect aspiration
        i = j = 0
        while i < len(words):
            word = words[i]
            phone = phones[j]
            if word.mark is not None and word.mark != '':
                i += 1
                j += len(dictionary[word.mark])
                continue
            if word.maxTime - word.minTime < br_len:
                # Space too short to contain a breath.
                i += 1
                j += 1
                continue
            ap_ranges = []   # detected (start, end) breath spans in seconds
            br_start = None  # start of the currently open candidate span
            win_pos = word.minTime
            while win_pos + br_win_sz <= word.maxTime:
                # A window is a breath candidate when every pitch frame in
                # it is unvoiced AND its RMS exceeds the br_db floor.
                all_noisy = (f0_voicing_breath[
                    int(win_pos / time_step): int((win_pos + br_win_sz) / time_step)] < f0_min).all()
                rms_db = 20 * np.log10(
                    np.clip(sound.get_rms(from_time=win_pos, to_time=win_pos + br_win_sz), a_min=1e-12, a_max=1))
                # print(win_pos, win_pos + br_win_sz, all_noisy, rms_db)
                if all_noisy and rms_db >= br_db:
                    if br_start is None:
                        br_start = win_pos
                else:
                    if br_start is not None:
                        # Candidate span just closed: keep it only if it is
                        # long enough and spectrally bright enough.
                        br_end = win_pos + br_win_sz - time_step
                        if br_end - br_start >= br_len:
                            centroid = spectral_centroid[int(br_start / time_step): int(br_end / time_step)].mean()
                            if centroid >= br_centroid:
                                ap_ranges.append((br_start, br_end))
                        br_start = None
                        win_pos = br_end  # resume scanning after the span
                win_pos += time_step
            if br_start is not None:
                # Close a candidate span that runs to the end of the space.
                br_end = win_pos + br_win_sz - time_step
                if br_end - br_start >= br_len:
                    centroid = spectral_centroid[int(br_start / time_step): int(br_end / time_step)].mean()
                    if centroid >= br_centroid:
                        ap_ranges.append((br_start, br_end))
            # print(ap_ranges)
            if len(ap_ranges) == 0:
                i += 1
                j += 1
                continue
            # Replace the space interval with an alternation of unlabelled
            # gaps and 'AP' intervals, mirrored on both tiers.
            words.removeInterval(word)
            phones.removeInterval(phone)
            if word.minTime < ap_ranges[0][0]:
                words.add(minTime=word.minTime, maxTime=ap_ranges[0][0], mark=None)
                phones.add(minTime=phone.minTime, maxTime=ap_ranges[0][0], mark=None)
                i += 1
                j += 1
            for k, ap in enumerate(ap_ranges):
                if k > 0:
                    # Gap between two consecutive breaths.
                    words.add(minTime=ap_ranges[k - 1][1], maxTime=ap[0], mark=None)
                    phones.add(minTime=ap_ranges[k - 1][1], maxTime=ap[0], mark=None)
                    i += 1
                    j += 1
                # Clip the breath end to the original interval boundary.
                words.add(minTime=ap[0], maxTime=min(word.maxTime, ap[1]), mark='AP')
                phones.add(minTime=ap[0], maxTime=min(word.maxTime, ap[1]), mark='AP')
                i += 1
                j += 1
            if ap_ranges[-1][1] < word.maxTime:
                words.add(minTime=ap_ranges[-1][1], maxTime=word.maxTime, mark=None)
                phones.add(minTime=ap_ranges[-1][1], maxTime=phone.maxTime, mark=None)
                i += 1
                j += 1
        # Remove short spaces
        i = j = 0
        while i < len(words):
            word = words[i]
            phone = phones[j]
            if word.mark is not None and word.mark != '':
                # 'AP' spans exactly one phone; dictionary words span their
                # full phoneme expansion.
                i += 1
                j += (1 if word.mark == 'AP' else len(dictionary[word.mark]))
                continue
            if word.maxTime - word.minTime >= min_space:
                # Long enough: keep as an explicit silence ('SP').
                word.mark = 'SP'
                phone.mark = 'SP'
                i += 1
                j += 1
                continue
            # Too short: delete it and give its time to the neighbours.
            # (No cursor advance after removal — the next interval shifts
            # into position i/j.)
            if i == 0:
                if len(words) >= 2:
                    words[i + 1].minTime = word.minTime
                    phones[j + 1].minTime = phone.minTime
                    words.removeInterval(word)
                    phones.removeInterval(phone)
                else:
                    break  # only interval on the tier — nothing to merge into
            elif i == len(words) - 1:
                if len(words) >= 2:
                    words[i - 1].maxTime = word.maxTime
                    phones[j - 1].maxTime = phone.maxTime
                    words.removeInterval(word)
                    phones.removeInterval(phone)
                else:
                    break
            else:
                # Interior short space: split its duration at the midpoint
                # between the two neighbouring intervals.
                words[i - 1].maxTime = words[i + 1].minTime = (word.minTime + word.maxTime) / 2
                phones[j - 1].maxTime = phones[j + 1].minTime = (phone.minTime + phone.maxTime) / 2
                words.removeInterval(word)
                phones.removeInterval(phone)
        textgrid.write(str(dst / tgfile.name))
# Script entry point: dispatch to the click command (click parses argv).
if __name__ == '__main__':
    enhance_tg()