"""Gradio app: speaker-labelled transcription with per-chunk sentiment.

Audio is re-encoded to a temporary WAV, transcribed with AssemblyAI
(speaker labels enabled), split into short text chunks, and each chunk is
classified with an emotion model whose classes are folded into
Positive / Neutral / Negative.  Results can be exported as TXT, JSON, CSV,
DOCX or PDF.
"""

import csv
import json
import os
import sys
import tempfile
import warnings
from datetime import datetime
from unittest.mock import MagicMock
from xml.sax.saxutils import escape

# Patch missing HfFolder before gradio imports it.  This must run before any
# third-party import below, which is why the imports are not all grouped.
try:
    from huggingface_hub import HfFolder  # noqa: F401
except ImportError:
    import huggingface_hub  # noqa: F401  (makes sure the module is loaded)

    sys.modules["huggingface_hub"].HfFolder = MagicMock()

import assemblyai as aai
import gradio as gr
import librosa
import soundfile as sf
import torch
import torch.nn.functional as F
from docx import Document
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, SimpleDocTemplate
from transformers import AutoModelForSequenceClassification, AutoTokenizer

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)

# =========================
# CONFIG
# =========================

aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")

device = "cuda" if torch.cuda.is_available() else "cpu"

MODEL_NAME = "j-hartmann/emotion-english-distilroberta-base"

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
sentiment_model.to(device)
sentiment_model.eval()

# Maps model's 7 emotion classes to business-friendly labels.
# NOTE(review): the index -> emotion order is taken from the comments below;
# confirm it against the model's ``config.id2label`` if the model changes.
EMOTION_LABELS = {
    0: ("🔴", "Negative"),  # Anger
    1: ("🔴", "Negative"),  # Disgust
    2: ("🔴", "Negative"),  # Fear
    3: ("🟢", "Positive"),  # Joy
    4: ("🟡", "Neutral"),   # Neutral
    5: ("🔴", "Negative"),  # Sadness
    6: ("🟢", "Positive"),  # Surprise
}

# File extension used for each export format offered in the UI.
EXPORT_EXTENSIONS = {
    "TXT": ".txt",
    "JSON": ".json",
    "CSV": ".csv",
    "WORD": ".docx",
    "PDF": ".pdf",
}

CSV_FIELDS = ["speaker", "start", "end", "chunk", "text", "sentiment"]


# =========================
# HELPERS
# =========================

def format_time(ms):
    """Format a millisecond offset as zero-padded ``MM:SS``.

    Minutes are not wrapped at 60, so 75 minutes renders as ``75:00``.
    """
    s = ms / 1000
    return f"{int(s // 60):02d}:{int(s % 60):02d}"


def split_into_chunks(text, chunk_size=200):
    """Split text into chunks of at most ``chunk_size`` characters.

    Each chunk is cut at the last space before the limit so words are not
    split; a run with no space inside the window is cut hard at
    ``chunk_size``.  Text that already fits is returned as a single chunk
    (including the empty string).

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1 (a non-positive size
            would never consume any text).
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    text = text.strip()
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    while len(text) > chunk_size:
        split_at = text.rfind(" ", 0, chunk_size)
        if split_at == -1:
            # No space in the window: fall back to a hard cut.
            split_at = chunk_size
        chunks.append(text[:split_at].strip())
        text = text[split_at:].strip()

    if text:
        chunks.append(text)
    return chunks


def analyze_sentiment(text):
    """Return the index of the highest-probability class for ``text``.

    The input is truncated to 512 tokens before being passed to the model.
    """
    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=512,
        padding=True,
    ).to(device)

    with torch.no_grad():
        logits = sentiment_model(**inputs).logits

    probs = F.softmax(logits, dim=-1)[0]
    return torch.argmax(probs).item()


def build_segments(transcript):
    """Convert transcript utterances into plain dict segments.

    Speaker identifiers are renumbered 1, 2, 3... in order of first
    appearance.  Each segment has the keys ``speaker``, ``start``, ``end``
    (both ``MM:SS`` strings) and ``text``.  A transcript without utterances
    yields an empty list.
    """
    speaker_map = {}
    segments = []

    # ``utterances`` may be None (e.g. nothing was transcribed).
    for u in transcript.utterances or []:
        raw = str(u.speaker)
        if raw not in speaker_map:
            speaker_map[raw] = len(speaker_map) + 1

        segments.append({
            "speaker": speaker_map[raw],
            "start": format_time(u.start or 0),
            "end": format_time(u.end or 0),
            "text": u.text,
        })

    return segments


# =========================
# MAIN PROCESS
# =========================

def process_audio(file, speakers, language, state):
    """Transcribe an audio file and label every text chunk with a sentiment.

    Args:
        file: path of the audio file, or None when nothing was provided.
        speakers: expected speaker count; 0, negative or None means the
            count is not passed to the transcription service.
        language: language code, or ``"auto"`` to request language detection.
        state: current session state; returned unchanged on failure.

    Returns:
        A 4-tuple ``(status, conversation_text, info_text, state)`` matching
        the UI outputs.  On success the state holds the exportable
        ``segments`` list and the rendered ``conversation`` string.
    """
    if file is None:
        return "❌ No audio provided", "", "", state

    temp_wav = None
    try:
        audio, sr = librosa.load(file, sr=None, mono=True)

        # Record the path before writing so ``finally`` can always clean up,
        # and close the handle first so the file can be reopened by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
            temp_wav = tmp.name
        sf.write(temp_wav, audio, sr)

        auto_language = language == "auto"
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            # The number field yields None when cleared.
            speakers_expected=int(speakers) if speakers and speakers > 0 else None,
            language_code=None if auto_language else language,
            # Without this flag "auto" would just use the service default.
            language_detection=True if auto_language else None,
            speech_model=aai.SpeechModel.best,
        )

        transcript = aai.Transcriber().transcribe(temp_wav, config)
        if transcript.error:
            return f"❌ {transcript.error}", "", "", state

        segments = build_segments(transcript)
        speaker_count = len({s["speaker"] for s in segments})

        lines = []
        export_segments = []
        for i, seg in enumerate(segments, start=1):
            chunks = split_into_chunks(seg["text"])
            for c_idx, chunk in enumerate(chunks, start=1):
                emotion_idx = analyze_sentiment(chunk)
                emoji, label = EMOTION_LABELS.get(emotion_idx, ("⚪", "Unknown"))

                # Only show the chunk number when the utterance was split.
                chunk_label = f" | Chunk {c_idx}" if len(chunks) > 1 else ""
                lines.append(
                    f"Speaker {seg['speaker']} | Utterance {i}{chunk_label}\n"
                    f"({seg['start']} - {seg['end']})\n"
                    f"{emoji} {label}: {chunk}\n\n"
                )
                export_segments.append({
                    "speaker": seg["speaker"],
                    "start": seg["start"],
                    "end": seg["end"],
                    "chunk": c_idx,
                    "text": chunk,
                    "sentiment": label,
                })

        conversation = "".join(lines)
        new_state = {"segments": export_segments, "conversation": conversation}
        return (
            "✅ Done",
            conversation,
            f"Speakers: {speaker_count} | Utterances: {len(segments)}",
            new_state,
        )
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return f"❌ Error: {str(e)}", "", "", state
    finally:
        if temp_wav and os.path.exists(temp_wav):
            os.remove(temp_wav)


# =========================
# EXPORT
# =========================

def export_file(format_type, state):
    """Write the current results to a file and return its path.

    Args:
        format_type: one of ``"TXT"``, ``"JSON"``, ``"CSV"``, ``"WORD"``,
            ``"PDF"``.
        state: session state with ``segments`` and ``conversation``.

    Returns:
        Path of the written file, or None when there is nothing to export or
        the format is not recognised.
    """
    state = state or {}
    segments = state.get("segments", [])
    conversation = state.get("conversation", "")
    if not conversation and not segments:
        return None

    extension = EXPORT_EXTENSIONS.get(format_type)
    if extension is None:
        return None

    # A fresh directory per export keeps the readable file name while
    # avoiding clashes between exports made within the same second.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_dir = tempfile.mkdtemp(prefix="conversation_export_")
    path = os.path.join(out_dir, f"conversation_{timestamp}{extension}")

    if format_type == "TXT":
        with open(path, "w", encoding="utf-8") as f:
            f.write(conversation)
    elif format_type == "JSON":
        with open(path, "w", encoding="utf-8") as f:
            # Keep accented characters readable instead of \uXXXX escapes.
            json.dump(segments, f, indent=4, ensure_ascii=False)
    elif format_type == "CSV":
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=CSV_FIELDS)
            writer.writeheader()
            writer.writerows(segments)
    elif format_type == "WORD":
        doc = Document()
        doc.add_heading("Conversation Transcript", 0)
        doc.add_paragraph(conversation)
        doc.save(path)
    elif format_type == "PDF":
        # Paragraph parses XML-like markup: escape the transcript first,
        # then turn newlines into explicit line breaks.
        markup = escape(conversation).replace("\n", "<br/>")
        styles = getSampleStyleSheet()
        SimpleDocTemplate(path).build([Paragraph(markup, styles["Normal"])])

    return path


# =========================
# UI
# =========================

with gr.Blocks(title="AI Conversation Sentiment Analyzer", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🎙 AI Conversation Sentiment Analyzer")

    # Per-session results shared between the analyze and export handlers.
    state = gr.State({"segments": [], "conversation": ""})

    with gr.Group():
        gr.Markdown("### 🎙 Input Audio")
        audio = gr.Audio(sources=["upload", "microphone"], type="filepath")

    with gr.Group():
        gr.Markdown("### ⚙ Settings")
        with gr.Row():
            speakers = gr.Number(value=0, label="Speakers (0 = auto-detect)")
            language = gr.Dropdown(
                ["auto", "en", "fr", "es", "de"],
                value="auto",
                label="Language",
            )

    analyze_btn = gr.Button("🚀 Analyze", variant="primary")

    with gr.Group():
        gr.Markdown("### 💬 Conversation Output")
        status = gr.Textbox(label="Status")
        conversation_box = gr.Textbox(lines=18, label="Conversation + Sentiment")
        info = gr.Textbox(label="Info")

    with gr.Group():
        gr.Markdown("### 📁 Export")
        with gr.Row():
            export_format = gr.Dropdown(
                ["TXT", "JSON", "CSV", "WORD", "PDF"],
                value="TXT",
                label="Format",
            )
            export_btn = gr.Button("⬇ Export")
        download = gr.File()

    analyze_btn.click(
        process_audio,
        inputs=[audio, speakers, language, state],
        outputs=[status, conversation_box, info, state],
    )
    export_btn.click(
        export_file,
        inputs=[export_format, state],
        outputs=[download],
    )


if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)