| import streamlit as st |
| import whisper |
| from pyannote.audio import Pipeline |
| import time |
| import os |
| import logging |
| os.environ["TRANSFORMERS_CACHE"] = "C:\\Users\\Admin\\.cache\\Documents\\huggingface_cache" |
| |
| UPLOAD_FOLDER = 'upload' |
|
|
| |
| logging.basicConfig(filename="audio_transcription.log", |
| level=logging.INFO, |
| format="%(asctime)s - %(levelname)s - %(message)s") |
| @st.cache_resource |
| def load_models(): |
| try: |
| |
| whisper_model = whisper.load_model("medium") |
| if whisper_model is None: |
| raise ValueError("Whisper model failed to load.") |
|
|
| |
| diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1", |
| use_auth_token="YOUR_TOKEN") |
|
|
| |
| if diarization_pipeline is None: |
| raise ValueError("Diarization model failed to load.") |
|
|
| return whisper_model, diarization_pipeline |
|
|
| except Exception as e: |
| print(f"Error loading models: {e}") |
| st.error(f"Error loading models: {e}") |
| return None, None |
|
|
| |
| whisper_model, diarization_pipeline = load_models() |
|
|
| |
| def save_uploaded_file(uploaded_file): |
| |
| timestamp = time.strftime("%Y%m%d-%H%M%S") |
| file_extension = uploaded_file.name.split('.')[-1] |
| file_name = f"{timestamp}_{uploaded_file.name}" |
| |
| |
| file_path = os.path.join(UPLOAD_FOLDER, file_name) |
| with open(file_path, "wb") as f: |
| f.write(uploaded_file.getbuffer()) |
| |
| return file_path |
|
|
|
|
| |
| def process_audio(audio_file_path): |
| if whisper_model is None or diarization_pipeline is None: |
| st.error("Models are not loaded properly. Please check the logs.") |
| return None, None |
|
|
| try: |
| |
| logging.info(f"Started processing audio file: {audio_file_path}") |
| |
| audio = whisper.load_audio(audio_file_path) |
| audio = whisper.pad_or_trim(audio) |
|
|
| |
| transcription = whisper_model.transcribe(audio, word_timestamps=True) |
| text = transcription["text"] |
| word_timestamps = transcription["segments"] |
| detected_language = transcription["language"] |
| |
| |
| logging.info(f"Detected language: {detected_language}") |
|
|
| |
| diarization = diarization_pipeline({"uri": "audio", "audio": audio_file_path}) |
|
|
| |
| labeled_text = [] |
| |
| |
| current_word_index = 0 |
| for segment, _, speaker in diarization.itertracks(yield_label=True): |
| start = segment.start |
| end = segment.end |
| labeled_segment = f"[{speaker}] " |
|
|
| |
| while current_word_index < len(word_timestamps): |
| word_info = word_timestamps[current_word_index] |
| word_start = word_info["start"] |
| word_end = word_info["end"] |
| word_text = word_info["text"] |
|
|
| |
| if word_end <= end: |
| labeled_segment += word_text + " " |
| current_word_index += 1 |
| else: |
| break |
| |
| labeled_text.append(labeled_segment.strip()) |
| |
| logging.info(f"Speaker {speaker} spoke: {labeled_segment.strip()}") |
| |
| logging.info(f"Processing completed for: {audio_file_path}") |
|
|
| return labeled_text, detected_language |
|
|
| except Exception as e: |
| st.error(f"Error processing audio: {e}") |
| |
| logging.error(f"Error processing audio {audio_file_path}: {e}") |
| print(f"Error processing audio: {e}") |
| return None, None |
|
|
|
|
| |
| st.title("Multilingual Audio Transcription with Speaker Labels") |
| st.write("Select an audio file from the 'upload' folder to transcribe and detect speakers.") |
|
|
| |
| uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "m4a"]) |
| if uploaded_file is not None: |
| |
| audio_file_path = save_uploaded_file(uploaded_file) |
| |
| |
| st.write(f"File uploaded successfully: {audio_file_path}") |
|
|
|
|
| st.audio(audio_file_path, format="audio/wav") |
| |
| with st.spinner("Processing audio..."): |
| try: |
| labeled_text, detected_language = process_audio(audio_file_path) |
| if labeled_text is not None: |
| st.success("Processing complete!") |
|
|
| |
| st.subheader("Detected Language") |
| st.write(f"**{detected_language}**") |
|
|
| |
| st.subheader("Transcription with Speaker Labels") |
| for line in labeled_text: |
| st.write(line) |
| except Exception as e: |
| st.error(f"An error occurred: {e}") |
|
|
| |
| st.markdown("---") |
| st.markdown( |
| "Developed with using [Whisper](https://github.com/openai/whisper) and " |
| "[PyAnnote](https://github.com/pyannote/pyannote-audio)." |
| ) |
|
|