# Adapted from https://github.com/SYSTRAN/faster-whisper/blob/master/faster_whisper/vad.py

from faster_whisper.vad import VadOptions, get_vad_model
import numpy as np
from typing import BinaryIO, Union, List, Optional, Tuple
import warnings
import faster_whisper
from faster_whisper.transcribe import SpeechTimestampsMap, Segment
import gradio as gr

class SileroVAD:
    def __init__(self):
        self.sampling_rate = 16000
        self.window_size_samples = 512
        self.model = None

    def run(self,
            audio: Union[str, BinaryIO, np.ndarray],
            vad_parameters: VadOptions,
            progress: gr.Progress = gr.Progress()
            ) -> Tuple[np.ndarray, List[dict]]:
| """ | |
| Run VAD | |
| Parameters | |
| ---------- | |
| audio: Union[str, BinaryIO, np.ndarray] | |
| Audio path or file binary or Audio numpy array | |
| vad_parameters: | |
| Options for VAD processing. | |
| progress: gr.Progress | |
| Indicator to show progress directly in gradio. | |
| Returns | |
| ---------- | |
| np.ndarray | |
| Pre-processed audio with VAD | |
| List[dict] | |
| Chunks of speeches to be used to restore the timestamps later | |
| """ | |
        sampling_rate = self.sampling_rate

        if not isinstance(audio, np.ndarray):
            audio = faster_whisper.decode_audio(audio, sampling_rate=sampling_rate)

        duration = audio.shape[0] / sampling_rate
        duration_after_vad = duration

        if vad_parameters is None:
            vad_parameters = VadOptions()
        elif isinstance(vad_parameters, dict):
            vad_parameters = VadOptions(**vad_parameters)

        speech_chunks = self.get_speech_timestamps(
            audio=audio,
            vad_options=vad_parameters,
            progress=progress
        )
        audio = self.collect_chunks(audio, speech_chunks)
        duration_after_vad = audio.shape[0] / sampling_rate

        return audio, speech_chunks
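    # Illustrative usage sketch (not part of the original file; the file name and
    # threshold value below are assumptions):
    #
    #     vad = SileroVAD()
    #     trimmed_audio, chunks = vad.run("example.wav", VadOptions(threshold=0.5))
    #
    # `trimmed_audio` is the speech-only waveform; `chunks` is later passed to
    # restore_speech_timestamps() to map results back to the original timeline.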
    def get_speech_timestamps(
        self,
        audio: np.ndarray,
        vad_options: Optional[VadOptions] = None,
        progress: gr.Progress = gr.Progress(),
        **kwargs,
    ) -> List[dict]:
        """This method is used for splitting long audios into speech chunks using silero VAD.

        Args:
          audio: One dimensional float array.
          vad_options: Options for VAD processing.
          kwargs: VAD options passed as keyword arguments for backward compatibility.
          progress: Gradio progress to indicate progress.

        Returns:
          List of dicts containing begin and end samples of each speech chunk.
        """
        if self.model is None:
            self.update_model()

        if vad_options is None:
            vad_options = VadOptions(**kwargs)

        threshold = vad_options.threshold
        min_speech_duration_ms = vad_options.min_speech_duration_ms
        max_speech_duration_s = vad_options.max_speech_duration_s
        min_silence_duration_ms = vad_options.min_silence_duration_ms
        window_size_samples = self.window_size_samples
        speech_pad_ms = vad_options.speech_pad_ms
        sampling_rate = 16000
        min_speech_samples = sampling_rate * min_speech_duration_ms / 1000
        speech_pad_samples = sampling_rate * speech_pad_ms / 1000
        max_speech_samples = (
            sampling_rate * max_speech_duration_s
            - window_size_samples
            - 2 * speech_pad_samples
        )
        min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
        min_silence_samples_at_max_speech = sampling_rate * 98 / 1000
        audio_length_samples = len(audio)

        state, context = self.model.get_initial_states(batch_size=1)

        # Run the Silero model over fixed-size windows to get one speech probability per window.
        speech_probs = []
        for current_start_sample in range(0, audio_length_samples, window_size_samples):
            progress(current_start_sample / audio_length_samples, desc="Detecting speeches only using VAD...")

            chunk = audio[current_start_sample: current_start_sample + window_size_samples]
            if len(chunk) < window_size_samples:
                chunk = np.pad(chunk, (0, int(window_size_samples - len(chunk))))
            speech_prob, state, context = self.model(chunk, state, context, sampling_rate)
            speech_probs.append(speech_prob)
        triggered = False
        speeches = []
        current_speech = {}
        neg_threshold = threshold - 0.15

        # to save potential segment end (and tolerate some silence)
        temp_end = 0
        # to save potential segment limits in case of maximum segment size reached
        prev_end = next_start = 0

        for i, speech_prob in enumerate(speech_probs):
            if (speech_prob >= threshold) and temp_end:
                temp_end = 0
                if next_start < prev_end:
                    next_start = window_size_samples * i

            if (speech_prob >= threshold) and not triggered:
                triggered = True
                current_speech["start"] = window_size_samples * i
                continue

            if (
                triggered
                and (window_size_samples * i) - current_speech["start"] > max_speech_samples
            ):
                if prev_end:
                    current_speech["end"] = prev_end
                    speeches.append(current_speech)
                    current_speech = {}
                    # previously reached silence (< neg_thres) and is still not speech (< thres)
                    if next_start < prev_end:
                        triggered = False
                    else:
                        current_speech["start"] = next_start
                    prev_end = next_start = temp_end = 0
                else:
                    current_speech["end"] = window_size_samples * i
                    speeches.append(current_speech)
                    current_speech = {}
                    prev_end = next_start = temp_end = 0
                    triggered = False
                    continue

            if (speech_prob < neg_threshold) and triggered:
                if not temp_end:
                    temp_end = window_size_samples * i
                # condition to avoid cutting in very short silence
                if (window_size_samples * i) - temp_end > min_silence_samples_at_max_speech:
                    prev_end = temp_end
                if (window_size_samples * i) - temp_end < min_silence_samples:
                    continue
                else:
                    current_speech["end"] = temp_end
                    if (
                        current_speech["end"] - current_speech["start"]
                    ) > min_speech_samples:
                        speeches.append(current_speech)
                    current_speech = {}
                    prev_end = next_start = temp_end = 0
                    triggered = False
                    continue
        if (
            current_speech
            and (audio_length_samples - current_speech["start"]) > min_speech_samples
        ):
            current_speech["end"] = audio_length_samples
            speeches.append(current_speech)

        # Pad each detected speech chunk, sharing the available silence between neighbors.
        for i, speech in enumerate(speeches):
            if i == 0:
                speech["start"] = int(max(0, speech["start"] - speech_pad_samples))
            if i != len(speeches) - 1:
                silence_duration = speeches[i + 1]["start"] - speech["end"]
                if silence_duration < 2 * speech_pad_samples:
                    speech["end"] += int(silence_duration // 2)
                    speeches[i + 1]["start"] = int(
                        max(0, speeches[i + 1]["start"] - silence_duration // 2)
                    )
                else:
                    speech["end"] = int(
                        min(audio_length_samples, speech["end"] + speech_pad_samples)
                    )
                    speeches[i + 1]["start"] = int(
                        max(0, speeches[i + 1]["start"] - speech_pad_samples)
                    )
            else:
                speech["end"] = int(
                    min(audio_length_samples, speech["end"] + speech_pad_samples)
                )

        return speeches
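    # For reference, the returned list has the shape below (sample offsets at
    # 16 kHz; the numbers are illustrative, not taken from any real run):
    #
    #     [{"start": 48000, "end": 160000}, {"start": 208000, "end": 320000}]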
    def update_model(self):
        self.model = get_vad_model()
    @staticmethod
    def collect_chunks(audio: np.ndarray, chunks: List[dict]) -> np.ndarray:
        """Collects and concatenates audio chunks."""
        if not chunks:
            return np.array([], dtype=np.float32)

        return np.concatenate([audio[chunk["start"]: chunk["end"]] for chunk in chunks])
    @staticmethod
    def format_timestamp(
        seconds: float,
        always_include_hours: bool = False,
        decimal_marker: str = ".",
    ) -> str:
        """Format a timestamp in seconds as [HH:]MM:SS.mmm (e.g. 83.5 -> "01:23.500")."""
        assert seconds >= 0, "non-negative timestamp expected"
        milliseconds = round(seconds * 1000.0)

        hours = milliseconds // 3_600_000
        milliseconds -= hours * 3_600_000

        minutes = milliseconds // 60_000
        milliseconds -= minutes * 60_000

        seconds = milliseconds // 1_000
        milliseconds -= seconds * 1_000

        hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else ""
        return (
            f"{hours_marker}{minutes:02d}:{seconds:02d}{decimal_marker}{milliseconds:03d}"
        )
    def restore_speech_timestamps(
        self,
        segments: List[dict],
        speech_chunks: List[dict],
        sampling_rate: Optional[int] = None,
    ) -> List[dict]:
        """Map segment timestamps from the VAD-trimmed audio back to the original audio timeline."""
        if sampling_rate is None:
            sampling_rate = self.sampling_rate

        ts_map = SpeechTimestampsMap(speech_chunks, sampling_rate)

        for segment in segments:
            segment["start"] = ts_map.get_original_time(segment["start"])
            segment["end"] = ts_map.get_original_time(segment["end"])

        return segments
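
# Minimal end-to-end sketch (not part of the original module): the chunk offsets
# and segment dicts below are illustrative assumptions, used only to show how
# restore_speech_timestamps() and format_timestamp() fit together.
if __name__ == "__main__":
    vad = SileroVAD()

    # Pretend run() kept two speech chunks (sample offsets at 16 kHz).
    speech_chunks = [{"start": 16000, "end": 80000}, {"start": 160000, "end": 240000}]

    # Pretend transcription produced segments timed against the trimmed audio (seconds).
    segments = [{"start": 0.0, "end": 2.0, "text": "first phrase"},
                {"start": 4.5, "end": 6.0, "text": "second phrase"}]

    # Map the segment times back onto the original, untrimmed audio and print them.
    for seg in vad.restore_speech_timestamps(segments, speech_chunks):
        print(SileroVAD.format_timestamp(seg["start"]),
              "->",
              SileroVAD.format_timestamp(seg["end"]),
              seg["text"])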