Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import zipfile | |
| import os | |
| import shutil | |
| from pyannote.audio import Pipeline | |
| import torch | |
# Scratch directory used while sorting the uploaded audio files.
TEMP_DIR = "temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)

# Load the pretrained pyannote.audio diarization pipeline once at start-up.
# The Hugging Face access token is read from the HF_TOKEN environment variable.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=os.getenv("HF_TOKEN"),
)

# NOTE(review): pyannote.audio 3.x reports a failed download (missing/invalid
# token, model terms not accepted) by returning None instead of raising --
# confirm against the installed version. Fail fast with an actionable message
# rather than with an AttributeError / "NoneType is not callable" later on.
if pipeline is None:
    raise RuntimeError(
        "Could not load 'pyannote/speaker-diarization-3.1'. Check that the "
        "HF_TOKEN environment variable holds a valid Hugging Face token and "
        "that the model's user conditions have been accepted."
    )

# Move pipeline to GPU if available
if torch.cuda.is_available():
    pipeline.to(torch.device("cuda"))
def process_audio_zip(file_info):
    """Sort the WAV files of an uploaded ZIP into two per-speaker ZIP archives.

    Every ``.wav`` file at the top level of the archive is run through the
    module-level ``pipeline``. The file is then placed in the "speaker1" or
    "speaker2" folder depending on which diarization label accumulates the
    most speech time in that file, and each folder is zipped.

    NOTE(review): diarization labels are presumably assigned independently for
    each file, so "speaker1" means "the first label of that file dominates",
    not necessarily the same person across files -- confirm this matches the
    intended use.

    Args:
        file_info: Path to the uploaded ZIP archive (anything accepted by
            ``zipfile.ZipFile``).

    Returns:
        A ``(speaker1_zip, speaker2_zip)`` tuple with the paths of the two
        archives, written to the current working directory.

    Raises:
        zipfile.BadZipFile: If ``file_info`` is not a valid ZIP archive.
    """
    speaker1_zip = "speaker1.zip"
    speaker2_zip = "speaker2.zip"

    def zipdir(path, ziph):
        """Add every file below ``path`` to ``ziph``, keeping ``path``'s folder name."""
        parent = os.path.join(path, '..')
        for root, _dirs, files in os.walk(path):
            for file in files:
                full_path = os.path.join(root, file)
                ziph.write(full_path, os.path.relpath(full_path, parent))

    # Start from an empty scratch directory so that files left behind by an
    # earlier, failed request are never mistaken for part of this upload.
    shutil.rmtree(TEMP_DIR, ignore_errors=True)

    # The archive is extracted into its own sub-directory: folders inside the
    # upload that happen to be called "speaker1"/"speaker2" must not end up in
    # the result archives.
    extract_dir = os.path.join(TEMP_DIR, "extracted")
    speaker1_dir = os.path.join(TEMP_DIR, "speaker1")
    speaker2_dir = os.path.join(TEMP_DIR, "speaker2")

    try:
        for directory in (extract_dir, speaker1_dir, speaker2_dir):
            os.makedirs(directory, exist_ok=True)

        # Unzip the uploaded file
        with zipfile.ZipFile(file_info, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        # Process each audio file at the top level of the extracted archive
        for filename in sorted(os.listdir(extract_dir)):
            if not filename.lower().endswith(".wav"):
                continue
            file_path = os.path.join(extract_dir, filename)
            if not os.path.isfile(file_path):
                continue

            # Run the diarization pipeline
            diarization = pipeline(file_path)

            # Total speech time per label. Labels are whatever the pipeline
            # yields (strings such as "SPEAKER_00" for pyannote), so they are
            # collected dynamically instead of assuming the keys 1 and 2.
            durations = {}
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                durations[speaker] = durations.get(speaker, 0.0) + turn.duration

            # Iterating the labels in sorted order makes max() resolve ties in
            # favour of the first label, i.e. ties go to speaker 1 as before.
            labels = sorted(durations, key=str)
            if not labels or max(labels, key=durations.get) == labels[0]:
                # No speech detected, or the first label dominates.
                target_dir = speaker1_dir
            else:
                target_dir = speaker2_dir
            shutil.move(file_path, os.path.join(target_dir, filename))

        # Zip the results
        with zipfile.ZipFile(speaker1_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipdir(speaker1_dir, zipf)
        with zipfile.ZipFile(speaker2_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipdir(speaker2_dir, zipf)
    finally:
        # Clean up the temporary directory, also when processing failed
        shutil.rmtree(TEMP_DIR, ignore_errors=True)

    return speaker1_zip, speaker2_zip
# Gradio interface: one uploaded ZIP in, one downloadable ZIP per speaker out.
_zip_upload = gr.File(type="filepath")
_speaker_downloads = [
    gr.File(label="Speaker 1 Audio"),
    gr.File(label="Speaker 2 Audio"),
]

iface = gr.Interface(
    fn=process_audio_zip,
    inputs=_zip_upload,
    outputs=_speaker_downloads,
    title="Speaker Diarization",
    description="Upload a ZIP file containing audio files, and this will return two ZIP files containing diarized audio for each speaker.",
)

# Start the web app.
iface.launch()