# MIT License
#
# Copyright (c) 2023 CNRS
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
import threading
import time

import torch
from pyannote.audio import Pipeline, Audio


class EndpointHandler:
    """Callable endpoint handler that runs a pyannote speaker-diarization pipeline.

    The pipeline is loaded once in ``__init__`` and reused for every request
    handled by ``__call__``.
    """

    # Identifier of the pretrained pipeline loaded at construction time.
    MODEL_ID = "collinbarnwell/pyannote-speaker-diarization-31"

    # Maximum number of seconds ``__call__`` waits for the worker thread.
    # NOTE(review): presumably chosen to stay just under a 300 s request
    # limit of the serving platform -- confirm.
    TIMEOUT_SECONDS = 298

    def __init__(self, path=""):
        """Load the pretrained pipeline and move it to the GPU when available.

        Args:
            path: Directory whose content is printed for debugging purposes.
                It is not used to load the pipeline.
        """
        print("-----------------------------------")
        print(f"\nPATH: {path}\n")
        # The listing is debug output only: a missing or unreadable directory
        # (including the default empty path) must not prevent start-up.
        try:
            listing = os.listdir(path)
        except OSError as exc:
            listing = f"<unavailable: {exc}>"
        print(f"\nls {path}: {listing}")
        print("-----------------------------------")

        # initialize pretrained pipeline
        self._pipeline = Pipeline.from_pretrained(self.MODEL_ID)

        HYPER_PARAMETERS = {
            "segmentation": {
                "min_duration_off": 3.0,
            }
        }
        self._pipeline.instantiate(HYPER_PARAMETERS)

        # send pipeline to GPU if available
        if torch.cuda.is_available():
            self._pipeline.to(torch.device("cuda"))

        # initialize audio reader
        self._io = Audio()

    def __call__(self, data):
        """Diarize one in-memory waveform.

        Args:
            data: Request payload. ``data["inputs"]`` (or ``data`` itself when
                that key is absent) must provide ``"waveform"`` and
                ``"sample_rate"``. The optional ``data["parameters"]`` mapping
                is forwarded to the pipeline as keyword arguments. The
                ``"inputs"`` and ``"parameters"`` keys are popped from
                ``data``.
                NOTE(review): the waveform is presumably a nested list shaped
                (channel, time) -- confirm against the caller.

        Returns:
            ``{"diarization": [...]}`` where each item is a dict with the
            keys ``"speaker"``, ``"start"`` and ``"end"``; the two times are
            strings formatted with three decimals.

        Raises:
            TimeoutError: The pipeline did not finish within
                ``TIMEOUT_SECONDS`` seconds.
            Exception: Any exception raised by the pipeline is re-raised
                unchanged.
        """
        inputs = data.pop("inputs", data)
        waveform = torch.tensor(inputs["waveform"])
        sample_rate = inputs["sample_rate"]

        # An explicit ``"parameters": None`` is treated like a missing key.
        parameters = data.pop("parameters", None) or {}

        # Filled in by the worker thread: either "segments" or "error".
        outcome = {}

        def diarize():
            try:
                diarization = self._pipeline(
                    {"waveform": waveform, "sample_rate": sample_rate},
                    **parameters,
                )
                outcome["segments"] = [
                    {
                        "speaker": speaker,
                        "start": f"{turn.start:.3f}",
                        "end": f"{turn.end:.3f}",
                    }
                    for turn, _, speaker in diarization.itertracks(yield_label=True)
                ]
            except Exception as exc:
                # Hand the failure over to the calling thread; otherwise it
                # would die with this thread and the caller would return an
                # empty result as if diarization had succeeded.
                outcome["error"] = exc

        # Running diarization in a separate thread
        diarization_thread = threading.Thread(target=diarize)
        diarization_thread.start()

        # Wait for the diarization to complete or timeout
        diarization_thread.join(timeout=self.TIMEOUT_SECONDS)

        # A thread that is still alive after join() means the wait timed out.
        # join() only stops waiting: the worker is not cancelled and keeps
        # running in the background.
        if diarization_thread.is_alive():
            print("Diarization timed out")
            raise TimeoutError("Diarization process exceeded time limit.")

        if "error" in outcome:
            raise outcome["error"]

        return {"diarization": outcome["segments"]}