# MIT License | |
# | |
# Copyright (c) 2023 CNRS | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
from pyannote.audio import Pipeline, Audio | |
import torch | |
import os | |
import threading | |
import time | |
class EndpointHandler: | |
def __init__(self, path=""): | |
# initialize pretrained pipeline | |
print("-----------------------------------") | |
print(f"\nPATH: {path}\n") | |
print(f"\nls {path}: {os.listdir(path)}") | |
print("-----------------------------------") | |
self._pipeline = Pipeline.from_pretrained("collinbarnwell/pyannote-speaker-diarization-31") | |
HYPER_PARAMETERS = { | |
"segmentation": { | |
"min_duration_off": 3.0, | |
} | |
} | |
self._pipeline.instantiate(HYPER_PARAMETERS) | |
# send pipeline to GPU if available | |
if torch.cuda.is_available(): | |
self._pipeline.to(torch.device("cuda")) | |
# initialize audio reader | |
self._io = Audio() | |
def __call__(self, data): | |
inputs = data.pop("inputs", data) | |
waveform = torch.tensor(inputs["waveform"]) | |
sample_rate = inputs["sample_rate"] | |
parameters = data.pop("parameters", dict()) | |
# Container for storing diarization result | |
diarization_result = {} | |
def diarize(): | |
nonlocal diarization_result | |
diarization = self._pipeline( | |
{"waveform": waveform, "sample_rate": sample_rate}, **parameters | |
) | |
diarization_result = [ | |
{ | |
"speaker": speaker, | |
"start": f"{turn.start:.3f}", | |
"end": f"{turn.end:.3f}", | |
} | |
for turn, _, speaker in diarization.itertracks(yield_label=True) | |
] | |
# Running diarization in a separate thread | |
diarization_thread = threading.Thread(target=diarize) | |
diarization_thread.start() | |
# Wait for the diarization to complete or timeout | |
diarization_thread.join(timeout=298) | |
# Check if the thread is still alive (indicating a timeout occurred) | |
if diarization_thread.is_alive(): | |
print("Diarization timed out") | |
# Handle the timeout case, maybe by raising an error or a warning | |
raise TimeoutError("Diarization process exceeded time limit.") | |
return {"diarization": diarization_result} |