import hashlib
import os
import urllib.request
from collections.abc import Callable

import numpy as np
import pandas as pd
import torch
from pyannote.audio import Model, Pipeline
from pyannote.audio.core.io import AudioFile
from pyannote.audio.pipelines import VoiceActivityDetection
from pyannote.audio.pipelines.utils import PipelineModel
from pyannote.core import Annotation, Segment, SlidingWindowFeature
from tqdm import tqdm

VAD_SEGMENTATION_URL = "https://whisperx.s3.eu-west-2.amazonaws.com/model_weights/segmentation/0b5b3216d60a2d32fc086b47ea8c67589aaeb26b7e07fcbe620d6d0b83e209ea/pytorch_model.bin"

pipeline = None
pipeline_name = "pyannote/voice-activity-detection"

def detect_voice_activity(waveform, pipe=None):
    """Run voice activity detection on a 16 kHz mono waveform.

    Returns a list of (start, end) tuples in seconds.
    """
    global pipeline

    # pyannote expects a (channel, time) float tensor
    waveform = waveform.flatten().float()[None]

    if pipe is not None:
        pipeline = pipe
    elif pipeline is None:
        pipeline = Pipeline.from_pretrained(pipeline_name)
        initial_params = {
            "onset": 0.8,
            "offset": 0.5,
            "min_duration_on": 0,
            "min_duration_off": 0.0,
        }
        pipeline.instantiate(initial_params)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        pipeline = pipeline.to(device)

    vad = pipeline({"waveform": waveform, "sample_rate": 16000})
    segments = [
        (segment.start, segment.end) for segment in vad.get_timeline().support()
    ]
    return segments
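
# Usage sketch (illustrative, not part of the original module): run the pyannote
# VAD pipeline on 16 kHz audio. Whether "pyannote/voice-activity-detection"
# needs a Hugging Face auth token depends on your account; the waveform here is
# random noise used only as a placeholder.
#
#   audio = torch.randn(16000 * 10)           # 10 s of 16 kHz audio
#   speech = detect_voice_activity(audio)     # [(start_s, end_s), ...]
#   for start, end in speech:
#       print(f"speech from {start:.2f}s to {end:.2f}s")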

def load_vad_model(
    device,
    vad_onset=0.500,
    vad_offset=0.363,
    use_auth_token=None,
    model_fp=None,
    batch_size=32,
):
    model_dir = torch.hub._get_torch_home()
    os.makedirs(model_dir, exist_ok=True)
    if model_fp is None:
        model_fp = os.path.join(model_dir, "whisperx-vad-segmentation.bin")
    if os.path.exists(model_fp) and not os.path.isfile(model_fp):
        raise RuntimeError(f"{model_fp} exists and is not a regular file")

    # download the segmentation checkpoint on first use
    if not os.path.isfile(model_fp):
        with (
            urllib.request.urlopen(VAD_SEGMENTATION_URL) as source,
            open(model_fp, "wb") as output,
        ):
            with tqdm(
                total=int(source.info().get("Content-Length")),
                ncols=80,
                unit="iB",
                unit_scale=True,
                unit_divisor=1024,
            ) as loop:
                while True:
                    buffer = source.read(8192)
                    if not buffer:
                        break
                    output.write(buffer)
                    loop.update(len(buffer))

    model_bytes = open(model_fp, "rb").read()
    if hashlib.sha256(model_bytes).hexdigest() != VAD_SEGMENTATION_URL.split("/")[-2]:
        raise RuntimeError(
            "Model has been downloaded but the SHA256 checksum does not match. Please retry loading the model."
        )

    vad_model = Model.from_pretrained(model_fp, use_auth_token=use_auth_token)
    hyperparameters = {
        "onset": vad_onset,
        "offset": vad_offset,
        "min_duration_on": 0.1,
        "min_duration_off": 0.1,
    }
    vad_pipeline = VoiceActivitySegmentation(
        segmentation=vad_model, device=torch.device(device), batch_size=batch_size
    )
    vad_pipeline.instantiate(hyperparameters)
    return vad_pipeline
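
# Usage sketch (illustrative): download (on first use) and load the WhisperX
# segmentation checkpoint with the default hysteresis thresholds.
#
#   device = "cuda" if torch.cuda.is_available() else "cpu"
#   vad_model = load_vad_model(device, vad_onset=0.5, vad_offset=0.363)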

class Binarize:
    """Binarize detection scores using hysteresis thresholding, with a min-cut
    operation to ensure that no segment is longer than max_duration.

    Parameters
    ----------
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset`.
    min_duration_on : float, optional
        Remove active regions shorter than that many seconds. Defaults to 0s.
    min_duration_off : float, optional
        Fill inactive regions shorter than that many seconds. Defaults to 0s.
    pad_onset : float, optional
        Extend active regions by moving their start time by that many seconds.
        Defaults to 0s.
    pad_offset : float, optional
        Extend active regions by moving their end time by that many seconds.
        Defaults to 0s.
    max_duration : float
        Maximum length of an active segment; longer segments are divided at the
        timestamp with the lowest score.

    Reference
    ---------
    Gregory Gelly and Jean-Luc Gauvain. "Minimum Word Error Training of
    RNN-based Voice Activity Detection", InterSpeech 2015.

    Modified by Max Bain to include WhisperX's min-cut operation
    (https://arxiv.org/abs/2303.00747).

    Adapted from pyannote-audio.
    """

    def __init__(
        self,
        onset: float = 0.5,
        offset: float | None = None,
        min_duration_on: float = 0.0,
        min_duration_off: float = 0.0,
        pad_onset: float = 0.0,
        pad_offset: float = 0.0,
        max_duration: float = float("inf"),
    ):
        super().__init__()
        self.onset = onset
        self.offset = offset or onset
        self.pad_onset = pad_onset
        self.pad_offset = pad_offset
        self.min_duration_on = min_duration_on
        self.min_duration_off = min_duration_off
        self.max_duration = max_duration

    def __call__(self, scores: SlidingWindowFeature) -> Annotation:
        """Binarize detection scores.

        Parameters
        ----------
        scores : SlidingWindowFeature
            Detection scores.

        Returns
        -------
        active : Annotation
            Binarized scores.
        """
        num_frames, num_classes = scores.data.shape
        frames = scores.sliding_window
        timestamps = [frames[i].middle for i in range(num_frames)]

        # annotation meant to store 'active' regions
        active = Annotation()
        for k, k_scores in enumerate(scores.data.T):
            label = k if scores.labels is None else scores.labels[k]

            # initial state
            start = timestamps[0]
            is_active = k_scores[0] > self.onset
            curr_scores = [k_scores[0]]
            curr_timestamps = [start]
            t = start
            for t, y in zip(timestamps[1:], k_scores[1:], strict=False):
                # currently active
                if is_active:
                    curr_duration = t - start
                    if curr_duration > self.max_duration:
                        search_after = len(curr_scores) // 2
                        # divide segment
                        min_score_div_idx = search_after + np.argmin(
                            curr_scores[search_after:]
                        )
                        min_score_t = curr_timestamps[min_score_div_idx]
                        region = Segment(
                            start - self.pad_onset, min_score_t + self.pad_offset
                        )
                        active[region, k] = label
                        start = curr_timestamps[min_score_div_idx]
                        curr_scores = curr_scores[min_score_div_idx + 1 :]
                        curr_timestamps = curr_timestamps[min_score_div_idx + 1 :]
                    # switching from active to inactive
                    elif y < self.offset:
                        region = Segment(start - self.pad_onset, t + self.pad_offset)
                        active[region, k] = label
                        start = t
                        is_active = False
                        curr_scores = []
                        curr_timestamps = []
                    curr_scores.append(y)
                    curr_timestamps.append(t)
                # currently inactive
                else:
                    # switching from inactive to active
                    if y > self.onset:
                        start = t
                        is_active = True

            # if active at the end, add final region
            if is_active:
                region = Segment(start - self.pad_onset, t + self.pad_offset)
                active[region, k] = label

        # because of padding, some active regions might be overlapping: merge them.
        # also: fill same speaker gaps shorter than min_duration_off
        if self.pad_offset > 0.0 or self.pad_onset > 0.0 or self.min_duration_off > 0.0:
            if self.max_duration < float("inf"):
                raise NotImplementedError("This would break current max_duration param")
            active = active.support(collar=self.min_duration_off)

        # remove tracks shorter than min_duration_on
        if self.min_duration_on > 0:
            for segment, track in list(active.itertracks()):
                if segment.duration < self.min_duration_on:
                    del active[segment, track]

        return active
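
# Usage sketch (illustrative): binarize synthetic frame-level scores. The
# 0.02 s sliding-window step is an arbitrary choice for this example.
#
#   from pyannote.core import SlidingWindow
#   data = np.zeros((500, 1))
#   data[100:300, 0] = 0.9   # roughly 2 s of "speech"
#   scores = SlidingWindowFeature(
#       data, SlidingWindow(start=0.0, duration=0.02, step=0.02)
#   )
#   speech = Binarize(onset=0.5, max_duration=15.0)(scores)
#   print(speech)  # Annotation with one ~2 s speech region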

class VoiceActivitySegmentation(VoiceActivityDetection):
    def __init__(
        self,
        segmentation: PipelineModel = "pyannote/segmentation",
        fscore: bool = False,
        use_auth_token: str | None = None,
        **inference_kwargs,
    ):
        super().__init__(
            segmentation=segmentation,
            fscore=fscore,
            use_auth_token=use_auth_token,
            **inference_kwargs,
        )

    def apply(self, file: AudioFile, hook: Callable | None = None) -> Annotation:
        """Apply voice activity detection.

        Parameters
        ----------
        file : AudioFile
            Processed file.
        hook : callable, optional
            Hook called after each major step of the pipeline with the following
            signature: hook("step_name", step_artefact, file=file)

        Returns
        -------
        speech : Annotation
            Speech regions.
        """
        # setup hook (e.g. for debugging purposes)
        hook = self.setup_hook(file, hook=hook)

        # apply segmentation model (only if needed)
        # output shape is (num_chunks, num_frames, 1)
        if self.training:
            if self.CACHED_SEGMENTATION in file:
                segmentations = file[self.CACHED_SEGMENTATION]
            else:
                segmentations = self._segmentation(file)
                file[self.CACHED_SEGMENTATION] = segmentations
        else:
            segmentations: SlidingWindowFeature = self._segmentation(file)

        return segmentations
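
# Note: unlike the parent VoiceActivityDetection pipeline, apply() above returns
# the raw frame-level scores (a SlidingWindowFeature) rather than an Annotation;
# thresholding is deferred to Binarize / merge_chunks below.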

def merge_vad(
    vad_arr, pad_onset=0.0, pad_offset=0.0, min_duration_off=0.0, min_duration_on=0.0
):
    active = Annotation()
    for k, vad_t in enumerate(vad_arr):
        region = Segment(vad_t[0] - pad_onset, vad_t[1] + pad_offset)
        active[region, k] = 1

    if pad_offset > 0.0 or pad_onset > 0.0 or min_duration_off > 0.0:
        active = active.support(collar=min_duration_off)

    # remove tracks shorter than min_duration_on
    if min_duration_on > 0:
        for segment, track in list(active.itertracks()):
            if segment.duration < min_duration_on:
                del active[segment, track]

    active = active.for_json()
    active_segs = pd.DataFrame([x["segment"] for x in active["content"]])
    return active_segs
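
# Usage sketch (illustrative): merge raw (start, end) pairs, e.g. the output of
# detect_voice_activity above, bridging pauses shorter than 0.5 s.
#
#   segs = [(0.0, 1.2), (1.5, 3.0), (10.0, 12.0)]
#   df = merge_vad(segs, min_duration_off=0.5)  # DataFrame with start/end columns
#   print(df)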

def merge_chunks(
    segments,
    chunk_size,
    onset: float = 0.5,
    offset: float | None = None,
):
    """
    Merge VAD segments into chunks of at most chunk_size seconds
    (the merge operation described in the WhisperX paper).
    """
    curr_end = 0
    merged_segments = []
    seg_idxs = []

    assert chunk_size > 0
    binarize = Binarize(max_duration=chunk_size, onset=onset, offset=offset)
    segments = binarize(segments)
    segments_list = []
    for speech_turn in segments.get_timeline():
        segments_list.append(Segment(speech_turn.start, speech_turn.end))

    if len(segments_list) == 0:
        print("No active speech found in audio")
        return []
    # assert segments_list, "segments_list is empty."

    # Make sure the starting point is the start of the segment.
    curr_start = segments_list[0].start

    for seg in segments_list:
        if seg.end - curr_start > chunk_size and curr_end - curr_start > 0:
            merged_segments.append(
                {
                    "start": curr_start,
                    "end": curr_end,
                }
            )
            curr_start = seg.start
            seg_idxs = []
        curr_end = seg.end
        seg_idxs.append((seg.start, seg.end))
    # add the final chunk
    merged_segments.append(
        {
            "start": curr_start,
            "end": curr_end,
        }
    )
    return merged_segments
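
# Usage sketch (illustrative): full pipeline on an in-memory waveform, cutting
# the audio into <= 30 s windows for transcription. The 16000 sample rate is an
# assumption matching the rest of this module, and the random waveform is a
# placeholder for real audio.
#
#   vad_model = load_vad_model("cpu")
#   waveform = torch.randn(1, 16000 * 120)   # 2 minutes, shape (channel, time)
#   scores = vad_model({"waveform": waveform, "sample_rate": 16000})
#   for chunk in merge_chunks(scores, chunk_size=30):
#       print(chunk["start"], chunk["end"])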