# Page-header residue captured with this file (Hugging Face Spaces status):
# "Spaces: Runtime error"
import os
import tempfile
from typing import Optional

import gradio as gr
import scipy.io.wavfile
import torchaudio
from huggingface_hub import HfApi
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
# Hugging Face access token that `perform_separation` passes to
# `Pipeline.from_pretrained`. It stays None until a user has authenticated.
# NOTE: this is one module-level value, so it is shared by every session
# served by this process rather than being stored per user.
HUGGINGFACE_ACCESS_TOKEN = None
# Sample rate (Hz) the separated sources are written at. The input is
# resampled to this rate before inference, so the rate handed to the WAV
# writer always matches the audio the pipeline actually worked on.
_PIPELINE_SAMPLE_RATE = 16000


def perform_separation(audio_file_path: str):
    """Separate the speakers of an audio file and diarize it.

    Args:
        audio_file_path: Path of the audio file to process.

    Returns:
        A ``(output_file_paths, message)`` tuple. On success,
        ``output_file_paths`` holds one ``<speaker label>.wav`` path per
        separated speaker and ``message`` is the diarization in RTTM format.
        On any failure the list is empty and ``message`` describes the
        problem; no exception is raised for loading or inference errors.
    """
    if not HUGGINGFACE_ACCESS_TOKEN:
        return [], "Please log in with your HuggingFace account first."
    if not audio_file_path:
        return [], "Please provide an audio file first."

    # Instantiate the pipeline.
    try:
        pipeline = Pipeline.from_pretrained(
            "pyannote/speech-separation-ami-1.0",
            use_auth_token=HUGGINGFACE_ACCESS_TOKEN,
        )
    except Exception as e:
        return [], f"Error loading pipeline: {str(e)}"
    if pipeline is None:
        # from_pretrained can return None instead of raising when a gated
        # model cannot be downloaded with the given token.
        return [], (
            "Error loading pipeline: the model could not be downloaded. "
            "Make sure your account has accepted its user conditions."
        )

    # Load the audio and bring it to the rate used for the output files.
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
        if sample_rate != _PIPELINE_SAMPLE_RATE:
            waveform = torchaudio.functional.resample(
                waveform, sample_rate, _PIPELINE_SAMPLE_RATE
            )
            sample_rate = _PIPELINE_SAMPLE_RATE
    except Exception as e:
        return [], f"Error loading audio: {e}"

    # Run the pipeline.
    try:
        with ProgressHook() as hook:
            diarization, sources = pipeline(
                {"waveform": waveform, "sample_rate": sample_rate}, hook=hook
            )
    except Exception as e:
        return [], f"Error running pipeline: {e}"

    # Save separated sources as SPEAKER_XX.wav files in a directory that is
    # private to this call, so separate requests never overwrite each other.
    output_dir = tempfile.mkdtemp(prefix="speech_separation_")
    source_data = sources.data
    number_of_separated_sources = source_data.shape[1]
    output_file_paths = []
    # zip() stops at the shorter of the two, i.e. a label without a matching
    # source column is skipped.
    for s, speaker in zip(
        range(number_of_separated_sources), diarization.labels()
    ):
        samples = source_data[:, s]
        # `sources.data` is normally already a NumPy array, which has no
        # `.numpy()` method; only convert when given a tensor-like object.
        if hasattr(samples, "numpy"):
            samples = samples.numpy()
        output_file_path = os.path.join(output_dir, f"{speaker}.wav")
        scipy.io.wavfile.write(output_file_path, sample_rate, samples)
        output_file_paths.append(output_file_path)

    # Generate RTTM content.
    rttm_content = diarization.to_rttm()
    return output_file_paths, rttm_content
# Number of per-speaker audio players in the UI. `gradio_wrapper` must return
# exactly this many audio values, followed by the RTTM text.
MAX_SPEAKERS = 10


def gradio_wrapper(
    audio_file_path: str,
    request: gr.Request,
    oauth_token: Optional[gr.OAuthToken] = None,
):
    """Run the separation for the UI and shape the result for its outputs.

    Args:
        audio_file_path: Path of the uploaded audio file.
        request: Current Gradio request. Unused; kept so existing callers
            that pass it keep working.
        oauth_token: Hugging Face OAuth token of the signed-in user, filled
            in by Gradio from the type annotation; None when nobody is
            signed in.

    Returns:
        A list of ``MAX_SPEAKERS`` audio values (file paths, or None for
        unused players) followed by one string: the RTTM text on success,
        otherwise a message for the user.
    """
    global HUGGINGFACE_ACCESS_TOKEN
    no_audio = [None] * MAX_SPEAKERS
    # Decide on the caller's own token rather than on the shared global, so a
    # signed-out visitor can never run on somebody else's credentials.
    if oauth_token is None:
        return no_audio + ["Please log in with your HuggingFace account first."]

    # `perform_separation` reads the token from the module-level variable.
    # NOTE: that variable is process-wide; handlers running concurrently
    # would overwrite each other's value.
    HUGGINGFACE_ACCESS_TOKEN = oauth_token.token
    output_file_paths, rttm_content = perform_separation(audio_file_path)

    # Never return more values than there are output components, and pad the
    # unused players with None (an empty string is not a valid file path).
    shown = list(output_file_paths[:MAX_SPEAKERS])
    return shown + [None] * (MAX_SPEAKERS - len(shown)) + [rttm_content]


def login(request: gr.Request, profile: Optional[gr.OAuthProfile] = None):
    """Return the login status line shown at the top of the page.

    Args:
        request: Current Gradio request. Unused; kept for compatibility with
            the previous signature.
        profile: Hugging Face profile of the signed-in user, filled in by
            Gradio from the type annotation; None when nobody is signed in.

    Returns:
        A welcome message for a signed-in user, otherwise a prompt to log in.
    """
    if profile is None:
        return "Please log in with your HuggingFace account to use this app."
    return f"Welcome, {profile.username}! You are now logged in."


with gr.Blocks() as demo:
    gr.Markdown("## Speech Separation and Diarization")
    gr.Markdown("Please log in with your HuggingFace account to use this app.")
    # "Sign in with Hugging Face" button. Its presence is what makes Gradio
    # supply the OAuth profile/token parameters of the handlers above.
    # NOTE: OAuth must also be enabled for the Space itself
    # (`hf_oauth: true` in the Space's README metadata).
    gr.LoginButton()
    login_status = gr.Markdown()
    with gr.Row():
        input_audio = gr.Audio(label="Input Audio", type="filepath")
    with gr.Row():
        submit_button = gr.Button("Process Audio")
    max_speakers = MAX_SPEAKERS
    outputs = [
        gr.Audio(label=f"Speaker {i+1}", type="filepath")
        for i in range(max_speakers)
    ]
    rttm_output = gr.Textbox(label="RTTM Output")
    # Refresh the status line on every page load, including the reload that
    # follows a successful sign-in.
    demo.load(login, inputs=None, outputs=login_status)
    submit_button.click(
        gradio_wrapper, inputs=[input_audio], outputs=outputs + [rttm_output]
    )

# `launch(auth=...)` only accepts credentials or a callable for password
# login; Hugging Face OAuth is handled by the LoginButton instead.
demo.launch()