Spaces:
Running
Running
# Patch missing HfFolder before gradio imports it.
import sys
from unittest.mock import MagicMock

try:
    # Imported only to probe whether the installed huggingface_hub exports it.
    from huggingface_hub import HfFolder  # noqa: F401
except ImportError:
    import huggingface_hub

    # The name bound by `import huggingface_hub` is the same module object
    # that lives in sys.modules["huggingface_hub"], so one assignment is
    # enough; assigning twice only replaced the first stand-in with another.
    huggingface_hub.HfFolder = MagicMock()
| import gradio as gr | |
| import assemblyai as aai | |
| import librosa | |
| import soundfile as sf | |
| import torch | |
| import json | |
| import csv | |
| import os | |
| import tempfile | |
| import warnings | |
| from datetime import datetime | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| import torch.nn.functional as F | |
| from docx import Document | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph | |
| from reportlab.lib.styles import getSampleStyleSheet | |
# Globally ignore these warning categories. They are registered in the same
# order as separate filterwarnings() calls would register them.
for _ignored_category in (FutureWarning, UserWarning, RuntimeWarning):
    warnings.filterwarnings("ignore", category=_ignored_category)
del _ignored_category
| # ========================= | |
| # CONFIG | |
| # ========================= | |
# The AssemblyAI key comes from the environment; it is None when the variable
# is not set.
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")

# Run the classifier on CUDA when torch reports it as available, else on CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Tokenizer and classifier are loaded from the same checkpoint.
_EMOTION_MODEL_ID = "j-hartmann/emotion-english-distilroberta-base"
tokenizer = AutoTokenizer.from_pretrained(_EMOTION_MODEL_ID)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
    _EMOTION_MODEL_ID
)
sentiment_model.to(device)
# Switch to evaluation mode; the model is only used for inference here.
sentiment_model.eval()
# Maps the model's 7 emotion class indices to business-friendly
# (emoji, label) pairs. Several emotions share one sentiment bucket, so each
# bucket is defined once and reused.
_NEGATIVE = ("π΄", "Negative")
_POSITIVE = ("π’", "Positive")
_NEUTRAL = ("π‘", "Neutral")

EMOTION_LABELS = {
    0: _NEGATIVE,  # Anger
    1: _NEGATIVE,  # Disgust
    2: _NEGATIVE,  # Fear
    3: _POSITIVE,  # Joy
    4: _NEUTRAL,  # Neutral
    5: _NEGATIVE,  # Sadness
    6: _POSITIVE,  # Surprise
}
| # ========================= | |
| # HELPERS | |
| # ========================= | |
def format_time(ms):
    """Render a millisecond offset as a zero-padded ``MM:SS`` string.

    Fractional seconds are truncated. Minutes are not folded into hours, so
    an offset of one hour is rendered as ``60:00``.
    """
    minutes, seconds = divmod(ms / 1000, 60)
    return f"{int(minutes):02d}:{int(seconds):02d}"
def split_into_chunks(text, chunk_size=200):
    """Split text into chunks of at most ``chunk_size`` characters.

    Each chunk is cut at the last space found before the limit so words stay
    intact; a run with no space inside the limit is cut hard at
    ``chunk_size``. Chunks are stripped of surrounding whitespace, so they
    are not necessarily of equal length.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk; must be >= 1.

    Returns:
        A list of chunks. Text that already fits (including the empty
        string) is returned as a single-element list.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    # A non-positive limit would never consume any text and loop forever.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size!r}")

    text = text.strip()
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    while len(text) > chunk_size:
        split_at = text.rfind(" ", 0, chunk_size)
        if split_at == -1:
            # No space inside the window: fall back to a hard cut.
            split_at = chunk_size
        chunks.append(text[:split_at].strip())
        text = text[split_at:].strip()
    if text:
        chunks.append(text)
    return chunks
def analyze_sentiment(text):
    """Classify ``text`` and return the index of the highest-scoring class.

    Uses the module-level ``tokenizer``, ``sentiment_model`` and ``device``.
    Input longer than 512 tokens is truncated by the tokenizer before
    classification.

    Returns:
        The winning class index as a plain Python int.
    """
    encoded = tokenizer(
        text,
        padding=True,
        truncation=True,
        max_length=512,
        return_tensors="pt",
    )
    encoded = encoded.to(device)

    # Inference only: skip gradient tracking for the forward pass.
    with torch.no_grad():
        output = sentiment_model(**encoded)

    # A single text was encoded, so row 0 holds its class probabilities.
    probabilities = F.softmax(output.logits, dim=-1)
    best_class = torch.argmax(probabilities[0])
    return best_class.item()
def build_segments(transcript):
    """Convert a transcript's utterances into a list of plain segment dicts.

    Speakers are renumbered 1, 2, 3, ... in order of first appearance,
    whatever label the transcript uses for them.

    Args:
        transcript: Object exposing ``utterances``; each utterance provides
            ``speaker``, ``start``, ``end`` and ``text``. ``start`` and
            ``end`` are passed to ``format_time`` and a missing value is
            treated as 0.

    Returns:
        A list of dicts with keys ``speaker``, ``start``, ``end`` and
        ``text``. Empty when the transcript has no utterances.
    """
    speaker_numbers = {}
    segments = []
    # `utterances` may be None when nothing was diarized; treat it as empty
    # rather than failing with "NoneType is not iterable".
    for utterance in transcript.utterances or []:
        raw_label = str(utterance.speaker)
        # First sighting of a label gets the next sequential number.
        number = speaker_numbers.setdefault(raw_label, len(speaker_numbers) + 1)
        segments.append({
            "speaker": number,
            "start": format_time(utterance.start or 0),
            "end": format_time(utterance.end or 0),
            "text": utterance.text,
        })
    return segments
| # ========================= | |
| # MAIN PROCESS | |
| # ========================= | |
def process_audio(file, speakers, language, state):
    """Transcribe an audio file, label each chunk's sentiment, and render it.

    The audio is re-encoded to a temporary mono WAV, transcribed with speaker
    labels, and every utterance is split into chunks that are classified one
    by one.

    Args:
        file: Path of the audio file, or None when nothing was provided.
        speakers: Expected number of speakers; None, 0 or a negative value
            leaves the count to the transcription service.
        language: Language code, or "auto" to pass no language code.
        state: Current session state, returned unchanged on any failure.

    Returns:
        A 4-tuple ``(status, conversation_text, info, state)``. On success
        ``state`` is a new dict with ``segments`` (export rows) and
        ``conversation`` (the rendered text).
    """
    if file is None:
        return "β No audio provided", "", "", state

    temp_wav = None
    try:
        audio, sr = librosa.load(file, sr=None, mono=True)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
            # Record the path before writing so the `finally` clause can
            # remove the file even if the write itself fails.
            temp_wav = tmp.name
            sf.write(temp_wav, audio, sr)

        # The UI may hand over None when the number field is empty; anything
        # that is not a positive count means "let the service decide".
        speaker_hint = int(speakers) if speakers else 0
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            speakers_expected=speaker_hint if speaker_hint > 0 else None,
            language_code=None if language == "auto" else language,
            speech_model=aai.SpeechModel.best,
        )
        transcript = aai.Transcriber().transcribe(temp_wav, config)
        if transcript.error:
            return f"β {transcript.error}", "", "", state

        segments = build_segments(transcript)
        speaker_count = len({seg["speaker"] for seg in segments})

        # Collect rendered entries and join once instead of growing a string.
        rendered = []
        export_segments = []
        for i, seg in enumerate(segments, start=1):
            chunks = split_into_chunks(seg["text"])
            show_chunk_number = len(chunks) > 1
            for c_idx, chunk in enumerate(chunks, start=1):
                emotion_idx = analyze_sentiment(chunk)
                emoji, label = EMOTION_LABELS.get(emotion_idx, ("βͺ", "Unknown"))
                chunk_label = f" | Chunk {c_idx}" if show_chunk_number else ""
                rendered.append(
                    f"Speaker {seg['speaker']} | Utterance {i}{chunk_label}\n"
                    f"({seg['start']} - {seg['end']})\n"
                    f"{emoji} {label}: {chunk}\n\n"
                )
                export_segments.append({
                    "speaker": seg["speaker"],
                    "start": seg["start"],
                    "end": seg["end"],
                    "chunk": c_idx,
                    "text": chunk,
                    "sentiment": label,
                })
        conversation = "".join(rendered)

        new_state = {"segments": export_segments, "conversation": conversation}
        return (
            "β Done",
            conversation,
            f"Speakers: {speaker_count} | Utterances: {len(segments)}",
            new_state,
        )
    except Exception as e:
        # UI boundary: report the failure in the status box instead of
        # letting the exception propagate.
        return f"β Error: {str(e)}", "", "", state
    finally:
        if temp_wav and os.path.exists(temp_wav):
            os.remove(temp_wav)
| # ========================= | |
| # EXPORT | |
| # ========================= | |
def export_file(format_type, state):
    """Write the current analysis to a temporary file and return its path.

    Args:
        format_type: One of "TXT", "JSON", "CSV", "WORD" or "PDF".
        state: Session dict holding ``segments`` (list of row dicts) and
            ``conversation`` (the rendered transcript text).

    Returns:
        The path of the written file, or None when there is nothing to
        export or ``format_type`` is not recognised.
    """
    # Function-scope import so this block does not depend on a new
    # module-level import; only the PDF branch uses it.
    from xml.sax.saxutils import escape

    state = state or {}
    segments = state.get("segments", [])
    conversation = state.get("conversation", "")
    if not conversation and not segments:
        return None

    suffixes = {
        "TXT": ".txt",
        "JSON": ".json",
        "CSV": ".csv",
        "WORD": ".docx",
        "PDF": ".pdf",
    }
    suffix = suffixes.get(format_type)
    if suffix is None:
        return None

    # mkstemp yields a unique name in the platform temp directory, so two
    # exports within the same second can no longer overwrite each other.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    fd, path = tempfile.mkstemp(prefix=f"conversation_{timestamp}_", suffix=suffix)
    os.close(fd)

    try:
        if format_type == "TXT":
            with open(path, "w", encoding="utf-8") as f:
                f.write(conversation)
        elif format_type == "JSON":
            with open(path, "w", encoding="utf-8") as f:
                # The file is UTF-8, so keep non-ASCII transcript text
                # readable instead of \u-escaping it.
                json.dump(segments, f, indent=4, ensure_ascii=False)
        elif format_type == "CSV":
            with open(path, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(
                    f,
                    fieldnames=["speaker", "start", "end", "chunk", "text", "sentiment"],
                )
                writer.writeheader()
                writer.writerows(segments)
        elif format_type == "WORD":
            doc = Document()
            doc.add_heading("Conversation Transcript", 0)
            doc.add_paragraph(conversation)
            doc.save(path)
        else:  # "PDF"
            doc = SimpleDocTemplate(path)
            styles = getSampleStyleSheet()
            # Paragraph text is parsed as markup: escape the transcript
            # first so "<" or "&" in speech cannot break or alter the
            # document, then add the line-break tags.
            markup = escape(conversation).replace("\n", "<br/>")
            doc.build([Paragraph(markup, styles["Normal"])])
    except Exception:
        # Do not leave a half-written file behind.
        if os.path.exists(path):
            os.remove(path)
        raise
    return path
| # ========================= | |
| # UI | |
| # ========================= | |
with gr.Blocks(title="AI Conversation Sentiment Analyzer", theme=gr.themes.Soft()) as app:
    gr.Markdown(value="# π AI Conversation Sentiment Analyzer")

    # Holds the latest analysis result so the export handler can reuse it.
    state = gr.State(value={"segments": [], "conversation": ""})

    # --- Input ---
    with gr.Group():
        gr.Markdown(value="### π Input Audio")
        audio = gr.Audio(type="filepath", sources=["upload", "microphone"])

    # --- Settings ---
    with gr.Group():
        gr.Markdown(value="### β Settings")
        with gr.Row():
            speakers = gr.Number(label="Speakers (0 = auto-detect)", value=0)
            language = gr.Dropdown(
                choices=["auto", "en", "fr", "es", "de"],
                label="Language",
                value="auto",
            )

    analyze_btn = gr.Button(value="π Analyze", variant="primary")

    # --- Results ---
    with gr.Group():
        gr.Markdown(value="### π¬ Conversation Output")
        status = gr.Textbox(label="Status")
        conversation_box = gr.Textbox(label="Conversation + Sentiment", lines=18)
        info = gr.Textbox(label="Info")

    # --- Export ---
    with gr.Group():
        gr.Markdown(value="### π Export")
        with gr.Row():
            export_format = gr.Dropdown(
                choices=["TXT", "JSON", "CSV", "WORD", "PDF"],
                label="Format",
                value="TXT",
            )
            export_btn = gr.Button(value="β¬ Export")
        download = gr.File()

    # Event wiring: analysis fills the three text boxes and replaces the
    # stored state; export reads that state and offers the resulting file.
    analyze_btn.click(
        fn=process_audio,
        inputs=[audio, speakers, language, state],
        outputs=[status, conversation_box, info, state],
    )
    export_btn.click(
        fn=export_file,
        inputs=[export_format, state],
        outputs=[download],
    )

if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    app.launch(server_port=7860, server_name="0.0.0.0")