"""Streamlit app: voice-based sentiment, emotion and sarcasm analysis.

Audio (uploaded or recorded) is transcribed with OpenAI Whisper, then the
transcript is scored by a DistilBERT emotion classifier and a RoBERTa irony
classifier. Sentiment is derived from the dominant emotion.
"""

import base64
import io
import logging
import os
import tempfile
import time
import warnings

import plotly.express as px
import streamlit as st
import streamlit.components.v1 as components
import torch
import transformers
import whisper
from pydub import AudioSegment
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer

# Suppress warnings for a clean console
logging.getLogger("torch").setLevel(logging.CRITICAL)
logging.getLogger("transformers").setLevel(logging.CRITICAL)
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Check if CUDA is available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Set Streamlit app layout
st.set_page_config(layout="wide", page_title="Voice Based Sentiment Analysis")

# Interface design
st.title("🎙 Voice Based Sentiment Analysis")
st.write("Detect emotions, sentiment, and sarcasm from your voice with state-of-the-art accuracy using OpenAI Whisper.")

EMOTION_MODEL_NAME = "bhadresh-savani/distilbert-base-uncased-emotion"
SARCASM_MODEL_NAME = "cardiffnlp/twitter-roberta-base-irony"
WHISPER_MODEL_NAME = "large-v3"

# Scores at or below this value are ignored when picking the dominant emotion.
EMOTION_SCORE_THRESHOLD = 0.01
# Whisper works on 16 kHz mono audio internally, so export in that format.
WHISPER_SAMPLE_RATE = 16000
SUPPORTED_UPLOAD_FORMATS = ("wav", "mp3", "ogg")

EMOTION_ICONS = {
    "joy": "😊",
    "love": "❤️",
    "anger": "😡",
    "disgust": "🤢",
    "fear": "😨",
    "sadness": "😭",
    "surprise": "😲",
}
POSITIVE_EMOTIONS = ("joy", "love")
NEGATIVE_EMOTIONS = ("anger", "disgust", "fear", "sadness")
NEUTRAL_EMOTIONS = ("surprise",)

# Labels that mean "ironic": the generic id-based name and the named variant.
_IRONY_LABELS = frozenset({"label_1", "irony"})

# Returned by perform_emotion_detection whenever no analysis is possible.
_NO_EMOTION_RESULT = ({}, "neutral", {}, "NEUTRAL")


def _remove_file(path):
    """Delete *path* if it exists; log instead of raising on failure."""
    if not path:
        return
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as e:
        print(f"Could not remove temporary file {path}: {e}")


def _pipeline_device():
    """Return the device index expected by ``transformers.pipeline``."""
    return 0 if torch.cuda.is_available() else -1


# Emotion Detection Function
@st.cache_resource
def _load_emotion_classifier():
    """Build the emotion pipeline; raises on failure so nothing bad is cached."""
    tokenizer = AutoTokenizer.from_pretrained(EMOTION_MODEL_NAME, use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained(EMOTION_MODEL_NAME)
    model = model.to(device)
    classifier = pipeline("text-classification", model=model, tokenizer=tokenizer,
                          top_k=None, device=_pipeline_device())
    # Add a verification test to make sure the model is working
    test_result = classifier("I am happy today")
    print(f"Emotion classifier test: {test_result}")
    return classifier


def get_emotion_classifier():
    """Return the cached emotion classifier, or ``None`` if it cannot be loaded.

    The cached loader raises on failure, so a transient error (e.g. a failed
    download) is retried on the next call instead of being cached as ``None``.
    """
    try:
        return _load_emotion_classifier()
    except Exception as e:
        print(f"Error loading emotion model: {str(e)}")
        st.error("Failed to load emotion model. Please check logs.")
        return None


def _flatten_emotion_results(raw_results):
    """Normalize pipeline output to a flat list of ``{'label', 'score'}`` dicts.

    Accepts both the nested form ``[[{...}, ...]]`` and the flat form
    ``[{...}, ...]``; anything else yields an empty list.
    """
    if not raw_results or not isinstance(raw_results, list):
        return []
    first = raw_results[0]
    if isinstance(first, dict):
        return raw_results
    if isinstance(first, list):
        return first
    return []


def _sentiment_for(emotion):
    """Map a single emotion label to POSITIVE / NEGATIVE / NEUTRAL."""
    if emotion in POSITIVE_EMOTIONS:
        return "POSITIVE"
    if emotion in NEGATIVE_EMOTIONS:
        return "NEGATIVE"
    return "NEUTRAL"


def _resolve_top_emotion(scores):
    """Pick the dominant emotion and its sentiment from a label->score mapping.

    The highest-scoring emotion wins, except when it is a neutral emotion and
    the runner-up is non-neutral with more than 70% of the winner's score; in
    that case the runner-up is reported instead.
    """
    ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    top_emotion = ranked[0][0]
    sentiment = _sentiment_for(top_emotion)
    if sentiment != "NEUTRAL" or len(ranked) < 2:
        return top_emotion, sentiment

    (first_label, first_score), (second_label, second_score) = ranked[0], ranked[1]
    if (first_label in NEUTRAL_EMOTIONS
            and second_label not in NEUTRAL_EMOTIONS
            and second_score > 0.7 * first_score):
        return second_label, _sentiment_for(second_label)
    return top_emotion, "NEUTRAL"


def perform_emotion_detection(text):
    """Score *text* for emotions and derive an overall sentiment.

    Returns a 4-tuple ``(emotions_dict, top_emotion, emotion_map, sentiment)``:
    label->score mapping, dominant emotion label, label->emoji mapping and one
    of ``"POSITIVE"``, ``"NEGATIVE"``, ``"NEUTRAL"``. When the text is shorter
    than three non-blank characters, the model is unavailable, or anything
    fails, returns ``({}, "neutral", {}, "NEUTRAL")``.
    """
    try:
        if not text or len(text.strip()) < 3:
            return _NO_EMOTION_RESULT

        emotion_classifier = get_emotion_classifier()
        if emotion_classifier is None:
            st.error("Emotion classifier not available.")
            return _NO_EMOTION_RESULT

        # Truncate so long transcripts do not exceed the model's input limit.
        raw_results = emotion_classifier(text, truncation=True)
        print(f"Raw emotion classifier output: {raw_results}")

        emotion_results = _flatten_emotion_results(raw_results)
        if not emotion_results:
            st.error("Emotion classifier returned invalid or empty results.")
            return _NO_EMOTION_RESULT

        emotions_dict = {}
        for result in emotion_results:
            if isinstance(result, dict) and 'label' in result and 'score' in result:
                emotions_dict[result['label']] = result['score']
            else:
                print(f"Invalid result format: {result}")

        if not emotions_dict:
            st.error("No valid emotions detected.")
            return _NO_EMOTION_RESULT

        filtered_emotions = {
            label: score
            for label, score in emotions_dict.items()
            if score > EMOTION_SCORE_THRESHOLD
        }
        if not filtered_emotions:
            # Every score is negligible: fall back to the unfiltered scores.
            filtered_emotions = emotions_dict

        top_emotion, sentiment = _resolve_top_emotion(filtered_emotions)

        top_three = sorted(filtered_emotions.items(), key=lambda x: x[1], reverse=True)[:3]
        print(f"Text: {text[:50]}...")
        print(f"Top 3 emotions: {top_three}")
        print(f"Selected top emotion: {top_emotion} ({filtered_emotions.get(top_emotion, 0):.3f})")
        print(f"Sentiment determined: {sentiment}")
        print(f"All emotions detected: {emotions_dict}")
        print(f"Filtered emotions: {filtered_emotions}")
        print(f"Emotion classification threshold: {EMOTION_SCORE_THRESHOLD}")

        return emotions_dict, top_emotion, dict(EMOTION_ICONS), sentiment
    except Exception as e:
        st.error(f"Emotion detection failed: {str(e)}")
        print(f"Exception in emotion detection: {str(e)}")
        return _NO_EMOTION_RESULT


# Sarcasm Detection Function
@st.cache_resource
def _load_sarcasm_classifier():
    """Build the irony pipeline; raises on failure so nothing bad is cached."""
    tokenizer = AutoTokenizer.from_pretrained(SARCASM_MODEL_NAME, use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained(SARCASM_MODEL_NAME)
    model = model.to(device)
    classifier = pipeline("text-classification", model=model, tokenizer=tokenizer,
                          device=_pipeline_device())
    # Add a verification test to ensure the model is working
    test_result = classifier("This is totally amazing")
    print(f"Sarcasm classifier test: {test_result}")
    return classifier


def get_sarcasm_classifier():
    """Return the cached sarcasm classifier, or ``None`` if it cannot be loaded."""
    try:
        return _load_sarcasm_classifier()
    except Exception as e:
        print(f"Error loading sarcasm model: {str(e)}")
        st.error("Failed to load sarcasm model. Please check logs.")
        return None


def perform_sarcasm_detection(text):
    """Return ``(is_sarcastic, sarcasm_score)`` for *text*.

    ``sarcasm_score`` is the score of the "ironic" class: the reported score
    when the predicted label is ironic, otherwise ``1 - score``. Returns
    ``(False, 0.0)`` for text shorter than three non-blank characters, when
    the model is unavailable, or on any failure.
    """
    try:
        if not text or len(text.strip()) < 3:
            return False, 0.0

        sarcasm_classifier = get_sarcasm_classifier()
        if sarcasm_classifier is None:
            st.error("Sarcasm classifier not available.")
            return False, 0.0

        # Truncate so long transcripts do not exceed the model's input limit.
        result = sarcasm_classifier(text, truncation=True)[0]
        # Accept both the generic "LABEL_1" and a named "irony" label, since
        # the label text depends on the checkpoint's config.
        is_sarcastic = str(result['label']).lower() in _IRONY_LABELS
        sarcasm_score = result['score'] if is_sarcastic else 1 - result['score']
        return is_sarcastic, sarcasm_score
    except Exception as e:
        st.error(f"Sarcasm detection failed: {str(e)}")
        return False, 0.0


# Validate audio quality
def validate_audio(audio_path):
    """Return ``True`` if the audio is loud enough and at least one second long.

    Shows a Streamlit warning/error and returns ``False`` when the file is
    too quiet (below -55 dBFS), too short, or cannot be decoded.
    """
    try:
        sound = AudioSegment.from_file(audio_path)
    except Exception as e:
        # Decoding failures are reported to the user; details go to the console.
        print(f"Audio validation failed for {audio_path}: {e}")
        st.error("Invalid or corrupted audio file.")
        return False

    if sound.dBFS < -55:
        st.warning("Audio volume is too low. Please record or upload a louder audio.")
        return False
    if len(sound) < 1000:  # Less than 1 second (pydub lengths are milliseconds)
        st.warning("Audio is too short. Please record a longer audio.")
        return False
    return True


# Speech Recognition with Whisper
@st.cache_resource
def _load_whisper_model_cached():
    """Load the Whisper model; raises on failure so nothing bad is cached."""
    return whisper.load_model(WHISPER_MODEL_NAME)


def load_whisper_model():
    """Return the cached Whisper model, or ``None`` if it cannot be loaded."""
    try:
        return _load_whisper_model_cached()
    except Exception as e:
        print(f"Error loading Whisper model: {str(e)}")
        st.error("Failed to load Whisper model. Please check logs.")
        return None


def transcribe_audio(audio_path, show_alternative=False):
    """Transcribe the audio file at *audio_path* to English text.

    Returns the transcript string, or ``(transcript, alternatives)`` when
    *show_alternative* is true. Whisper provides no alternatives, so that
    list is always empty. On failure an error is shown and the empty
    equivalent (``""`` or ``("", [])``) is returned.
    """
    temp_wav_path = None
    try:
        st.write(f"Processing audio file: {audio_path}")
        sound = AudioSegment.from_file(audio_path)
        st.write(
            f"Audio duration: {len(sound) / 1000:.2f}s, Sample rate: {sound.frame_rate}, Channels: {sound.channels}")

        # Convert to WAV format (16kHz, mono) for Whisper. A unique temp file
        # keeps concurrent sessions from overwriting each other's audio.
        fd, temp_wav_path = tempfile.mkstemp(prefix="temp_converted_", suffix=".wav")
        os.close(fd)
        sound = sound.set_frame_rate(WHISPER_SAMPLE_RATE).set_channels(1)
        sound.export(temp_wav_path, format="wav")

        # Load Whisper model
        model = load_whisper_model()
        if model is None:
            raise RuntimeError("Whisper model is not available")

        # Transcribe audio
        result = model.transcribe(temp_wav_path, language="en")
        main_text = result["text"].strip()

        # Whisper doesn't provide alternatives, so return empty list
        if show_alternative:
            return main_text, []
        return main_text
    except Exception as e:
        st.error(f"Transcription failed: {str(e)}")
        # Parenthesized on purpose: without the parentheses the conditional
        # binds only to the second tuple element and a tuple is always returned.
        return ("", []) if show_alternative else ""
    finally:
        # Clean up the converted file on success and on failure alike.
        _remove_file(temp_wav_path)


# Function to handle uploaded audio files
def process_uploaded_audio(audio_file):
    """Save an uploaded file to a temp path and validate it.

    Returns the temp file path, or ``None`` when nothing was uploaded, the
    extension is unsupported, validation fails, or an error occurs. The
    caller is responsible for deleting the returned file.
    """
    if not audio_file:
        return None

    temp_file_path = None
    try:
        ext = os.path.splitext(audio_file.name)[1].lstrip('.').lower()
        if ext not in SUPPORTED_UPLOAD_FORMATS:
            st.error("Unsupported audio format. Please upload WAV, MP3, or OGG.")
            return None

        # Unique name: second-resolution timestamps collide across sessions.
        with tempfile.NamedTemporaryFile(delete=False, prefix="uploaded_audio_",
                                         suffix=f".{ext}") as f:
            temp_file_path = f.name
            f.write(audio_file.getvalue())

        if not validate_audio(temp_file_path):
            _remove_file(temp_file_path)
            return None
        return temp_file_path
    except Exception as e:
        _remove_file(temp_file_path)
        st.error(f"Error processing uploaded audio: {str(e)}")
        return None


# Show model information
def show_model_info():
    """Render the "About the Models" section in the sidebar."""
    st.sidebar.header("🧠 About the Models")
    model_tabs = st.sidebar.tabs(["Emotion", "Sarcasm", "Speech"])

    with model_tabs[0]:
        st.markdown("""
        *Emotion Model*: distilbert-base-uncased-emotion
        - Fine-tuned for six emotions (sadness, joy, love, anger, fear, surprise)
        - Architecture: DistilBERT base
        - High accuracy for basic emotion classification

        [🔍 Model Hub](https://huggingface.co/bhadresh-savani/distilbert-base-uncased-emotion)
        """)

    with model_tabs[1]:
        st.markdown("""
        *Sarcasm Model*: cardiffnlp/twitter-roberta-base-irony
        - Trained on SemEval-2018 Task 3 (Twitter irony dataset)
        - Architecture: RoBERTa base
        - F1-score: 0.705

        [🔍 Model Hub](https://huggingface.co/cardiffnlp/twitter-roberta-base-irony)
        """)

    with model_tabs[2]:
        st.markdown("""
        *Speech Recognition*: OpenAI Whisper (large-v3)
        - State-of-the-art model for speech-to-text
        - Accuracy: ~5-10% WER on clean English audio
        - Robust to noise, accents, and varied conditions
        - Runs locally, no internet required

        *Tips*: Use good mic, reduce noise, speak clearly

        [🔍 Model Details](https://github.com/openai/whisper)
        """)


# Custom audio recorder using HTML/JS
def custom_audio_recorder():
    """Render the browser-based recorder and return the component's result."""
    st.warning("Browser-based recording requires microphone access and a modern browser. If recording fails, try uploading an audio file instead.")
    # NOTE(review): the recorder markup is empty in this file, and
    # components.html presumably cannot pass recorded data back to Python --
    # confirm how the caller is meant to receive the base64 audio.
    audio_recorder_html = """
    """
    return components.html(audio_recorder_html, height=150)


# Function to display analysis results
def display_analysis_results(transcribed_text):
    """Run emotion and sarcasm analysis on *transcribed_text* and render it.

    Also appends short debug entries to ``st.session_state.debug_info``.
    """
    st.session_state.debug_info = st.session_state.get('debug_info', [])
    st.session_state.debug_info.append(f"Processing text: {transcribed_text[:50]}...")
    st.session_state.debug_info = st.session_state.debug_info[-100:]  # Keep last 100 entries

    emotions_dict, top_emotion, emotion_map, sentiment = perform_emotion_detection(transcribed_text)
    is_sarcastic, sarcasm_score = perform_sarcasm_detection(transcribed_text)

    # Add results to debug info
    st.session_state.debug_info.append(f"Top emotion: {top_emotion}, Sentiment: {sentiment}")
    st.session_state.debug_info.append(f"Sarcasm: {is_sarcastic}, Score: {sarcasm_score:.3f}")

    st.header("Transcribed Text")
    st.text_area("Text", transcribed_text, height=150, disabled=True, help="The audio converted to text.")

    # Heuristic only: grows with word count, clamped to the range [0.70, 0.95].
    confidence_score = min(0.95, max(0.70, len(transcribed_text.split()) / 50))
    st.caption(f"Estimated transcription confidence: {confidence_score:.2f} (based on text length)")

    st.header("Analysis Results")
    col1, col2 = st.columns([1, 2])

    with col1:
        st.subheader("Sentiment")
        if sentiment == "POSITIVE":
            sentiment_icon = "👍"
        elif sentiment == "NEGATIVE":
            sentiment_icon = "👎"
        else:
            sentiment_icon = "😐"
        # Bold markers must be balanced or Markdown prints the asterisks.
        st.markdown(f"**{sentiment_icon} {sentiment.capitalize()}** (Based on {top_emotion})")
        st.info("Sentiment reflects the dominant emotion's tone.")

        st.subheader("Sarcasm")
        sarcasm_icon = "😏" if is_sarcastic else "😐"
        sarcasm_text = "Detected" if is_sarcastic else "Not Detected"
        st.markdown(f"**{sarcasm_icon} {sarcasm_text}** (Score: {sarcasm_score:.3f})")
        st.info("Score indicates sarcasm confidence (0 to 1).")

    with col2:
        st.subheader("Emotions")
        if emotions_dict:
            st.markdown(
                f"*Dominant:* {emotion_map.get(top_emotion, '❓')} {top_emotion.capitalize()} (Score: {emotions_dict[top_emotion]:.3f})")
            sorted_emotions = sorted(emotions_dict.items(), key=lambda x: x[1], reverse=True)
            top_emotions = sorted_emotions[:8]
            emotions = [label for label, _ in top_emotions]
            scores = [score for _, score in top_emotions]
            fig = px.bar(x=emotions, y=scores, labels={'x': 'Emotion', 'y': 'Score'},
                         title="Top Emotions Distribution", color=emotions,
                         color_discrete_sequence=px.colors.qualitative.Bold)
            fig.update_layout(yaxis_range=[0, 1], showlegend=False, title_font_size=14)
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.write("No emotions detected.")

    with st.expander("Debug Information", expanded=False):
        st.write("Debugging information for troubleshooting:")
        for i, debug_line in enumerate(st.session_state.debug_info[-10:]):
            st.text(f"{i + 1}. {debug_line}")
        if emotions_dict:
            st.write("Raw emotion scores:")
            for emotion, score in sorted(emotions_dict.items(), key=lambda x: x[1], reverse=True):
                if score > EMOTION_SCORE_THRESHOLD:  # Only show non-negligible scores
                    st.text(f"{emotion}: {score:.4f}")

    with st.expander("Analysis Details", expanded=False):
        st.write("""
        *How this works:*
        1. *Speech Recognition*: Audio transcribed using OpenAI Whisper (large-v3)
        2. *Emotion Analysis*: DistilBERT model trained for six emotions
        3. *Sentiment Analysis*: Derived from dominant emotion
        4. *Sarcasm Detection*: RoBERTa model for irony detection

        *Accuracy depends on*:
        - Audio quality
        - Speech clarity
        - Background noise
        - Speech patterns
        """)


# Process base64 audio data
def process_base64_audio(base64_data):
    """Decode base64 audio into a temp file and validate it.

    *base64_data* may be a data URL (``"data:...;base64,<payload>"``) or a
    bare base64 string. Returns the temp file path, or ``None`` when the
    input is unusable, validation fails, or an error occurs. The caller is
    responsible for deleting the returned file.
    """
    temp_file_path = None
    try:
        if not isinstance(base64_data, str) or not base64_data.strip():
            st.error("Error processing audio data: no recorded audio was received.")
            return None

        # Data URLs carry the payload after the first comma; a bare base64
        # string has no comma and is used as-is.
        prefix, separator, payload = base64_data.partition(',')
        binary_data = base64.b64decode(payload if separator else prefix)

        # Unique name: second-resolution timestamps collide across sessions.
        with tempfile.NamedTemporaryFile(delete=False, prefix="recording_",
                                         suffix=".wav") as f:
            temp_file_path = f.name
            f.write(binary_data)

        if not validate_audio(temp_file_path):
            _remove_file(temp_file_path)
            return None
        return temp_file_path
    except Exception as e:
        _remove_file(temp_file_path)
        st.error(f"Error processing audio data: {str(e)}")
        return None


# Main App Logic
def main():
    """Build the page: upload tab, record tab with manual text input, sidebar."""
    if 'debug_info' not in st.session_state:
        st.session_state.debug_info = []

    tab1, tab2 = st.tabs(["📁 Upload Audio", "🎙 Record Audio"])

    with tab1:
        st.header("Upload an Audio File")
        audio_file = st.file_uploader("Choose an audio file", type=["wav", "mp3", "ogg"],
                                      help="Upload an audio file for analysis")
        if audio_file:
            st.audio(audio_file.getvalue())
            st.caption("🎧 Uploaded Audio Playback")
            upload_button = st.button("Analyze Upload", key="analyze_upload")
            if upload_button:
                with st.spinner('Analyzing audio with advanced precision...'):
                    temp_audio_path = process_uploaded_audio(audio_file)
                    if temp_audio_path:
                        try:
                            main_text, alternatives = transcribe_audio(temp_audio_path, show_alternative=True)
                            if main_text:
                                if alternatives:
                                    with st.expander("Alternative transcriptions detected", expanded=False):
                                        for i, alt in enumerate(alternatives[:3], 1):
                                            st.write(f"{i}. {alt}")
                                display_analysis_results(main_text)
                            else:
                                st.error("Could not transcribe the audio. Please try again with clearer audio.")
                        finally:
                            # Remove the temp file even if analysis raises.
                            _remove_file(temp_audio_path)

    with tab2:
        st.header("Record Your Voice")
        st.write("Use the recorder below to analyze your speech in real-time.")
        st.subheader("Browser-Based Recorder")
        st.write("Click the button below to start/stop recording.")
        audio_data = custom_audio_recorder()
        if audio_data:
            analyze_rec_button = st.button("Analyze Recording", key="analyze_rec")
            if analyze_rec_button:
                with st.spinner("Processing your recording..."):
                    temp_audio_path = process_base64_audio(audio_data)
                    if temp_audio_path:
                        try:
                            transcribed_text = transcribe_audio(temp_audio_path)
                            if transcribed_text:
                                display_analysis_results(transcribed_text)
                            else:
                                st.error("Could not transcribe the audio. Please try speaking more clearly.")
                        finally:
                            # Remove the temp file even if analysis raises.
                            _remove_file(temp_audio_path)

        st.subheader("Manual Text Input")
        st.write("If recording doesn't work, you can type your text here:")
        manual_text = st.text_area("Enter text to analyze:", placeholder="Type what you want to analyze...")
        analyze_text_button = st.button("Analyze Text", key="analyze_manual")
        if analyze_text_button and manual_text:
            display_analysis_results(manual_text)

    show_model_info()


if __name__ == "__main__":
    main()