import os
import re
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

import gradio as gr
import nltk
import numpy as np
import torch
from neo4j import GraphDatabase
from nltk.tokenize import sent_tokenize
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    pipeline
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# [All the existing classes from the first script remain the same]

@dataclass
class SentimentResult:
    """Structured class to hold sentiment analysis results"""
    text: str
    sentiment: str
    score: float
    confidence: float
    intensity: str
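
# Illustrative only (not executed): the shape of a SentimentResult as built in
# OptimizedSentimentAnalyzer.analyze_transcript() below, assuming a model with
# a "positive"/"negative"/"neutral" label set. The example values are made up.
#
#   SentimentResult(
#       text="Thanks, that fixed the problem for me.",
#       sentiment="positive",
#       score=0.97,
#       confidence=0.97,      # mirrors `score` in the current implementation
#       intensity="moderate"  # currently always set to "moderate"
#   )
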
class OptimizedSentimentAnalyzer:
    def __init__(
        self,
        models: List[str] = None,
        device: str = None,
        max_length: int = 512
    ):
        """
        Initialize sentiment analyzer with robust sequence handling

        Args:
            models: List of Hugging Face model names
            device: Processing device (cuda/cpu)
            max_length: Maximum token sequence length
        """
        # Ensure NLTK resources are downloaded
        try:
            nltk.download('punkt', quiet=True)
            # Newer NLTK releases ship the sentence tokenizer data as 'punkt_tab'
            nltk.download('punkt_tab', quiet=True)
        except Exception as e:
            logger.warning(f"Could not download NLTK resources: {e}")

        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_configs = models or [
            'cardiffnlp/twitter-roberta-base-sentiment-latest',
            'finiteautomata/bertweet-base-sentiment-analysis'
        ]

        # Configuration parameters
        self.max_length = max_length

        # Load models
        self._load_models()

    def _load_models(self):
        """Efficiently load sentiment analysis models with robust tokenization"""
        self.analyzers = []
        self.tokenizers = []

        for model_name in self.model_configs:
            try:
                tokenizer = AutoTokenizer.from_pretrained(model_name)
                model = AutoModelForSequenceClassification.from_pretrained(model_name)

                self.tokenizers.append(tokenizer)
                sentiment_pipeline = pipeline(
                    task="sentiment-analysis",
                    model=model,
                    tokenizer=tokenizer,
                    device=0 if self.device == 'cuda' else -1
                )
                self.analyzers.append(sentiment_pipeline)
                logger.info(f"Successfully loaded model: {model_name}")
            except Exception as e:
                logger.warning(f"Could not load model {model_name}: {e}")

        if not self.analyzers:
            raise RuntimeError("No sentiment models could be loaded")

    def _truncate_text(self, text: str, tokenizer) -> str:
        """
        Intelligently truncate text to fit the model's max length

        Args:
            text: Input text to truncate
            tokenizer: Tokenizer for the specific model

        Returns:
            Truncated text
        """
        # Tokenize and truncate
        tokens = tokenizer.encode(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            truncation=True
        )
        # Decode back to text
        return tokenizer.decode(tokens, skip_special_tokens=True)

    def _process_long_text(self, text: str) -> List[str]:
        """
        Split long text into manageable chunks

        Args:
            text: Full input text

        Returns:
            List of text chunks
        """
        # Tokenize into sentences
        sentences = sent_tokenize(text)

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            # Estimate token length (rough approximation)
            sentence_tokens = len(sentence.split())

            # If adding this sentence would exceed max length, start a new chunk
            if current_length + sentence_tokens > self.max_length - 50:  # Leave room for special tokens
                # Avoid emitting an empty chunk when the very first sentence is oversized
                if current_chunk:
                    chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_length = sentence_tokens
            else:
                current_chunk.append(sentence)
                current_length += sentence_tokens

        # Add final chunk
        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    def analyze_transcript(self, transcript: str) -> Dict:
        """
        Analyze sentiment of a transcript with robust processing

        Args:
            transcript: Full transcript text

        Returns:
            Comprehensive sentiment analysis results
        """
        # Validate input
        if not transcript or not isinstance(transcript, str):
            raise ValueError("Invalid transcript input")

        # Process long text into chunks
        text_chunks = self._process_long_text(transcript)

        # Analyze each chunk
        chunk_results = []
        for chunk in text_chunks:
            # Use the first tokenizer for truncation
            truncated_chunk = self._truncate_text(chunk, self.tokenizers[0])

            # Ensemble sentiment across multiple models
            chunk_sentiments = []
            for analyzer in self.analyzers:
                try:
                    sentiment = analyzer(truncated_chunk)[0]
                    chunk_sentiments.append(sentiment)
                except Exception as e:
                    logger.warning(f"Model sentiment analysis failed: {e}")

            # Aggregate sentiments
            if chunk_sentiments:
                # Keep the prediction with the highest confidence across models
                dominant_sentiment = max(
                    chunk_sentiments,
                    key=lambda x: x['score']
                )

                chunk_results.append(SentimentResult(
                    text=truncated_chunk,
                    sentiment=dominant_sentiment['label'],
                    score=dominant_sentiment['score'],
                    confidence=dominant_sentiment['score'],
                    intensity='moderate'
                ))

        # Overall transcript analysis
        if chunk_results:
            overall_sentiment_score = np.mean([
                result.score for result in chunk_results
            ])

            return {
                'overall_sentiment_score': float(overall_sentiment_score),
                'chunk_results': [
                    {
                        'text': result.text,
                        'sentiment': result.sentiment,
                        'score': result.score
                    }
                    for result in chunk_results
                ]
            }

        raise ValueError("Could not perform sentiment analysis")


class TranscriptParser:
    """Parse and extract conversations from transcript files"""

    @staticmethod
    def parse_timestamp(time_str: str) -> str:
        """
        Convert a timestamp from MM:SS format to a full datetime

        Args:
            time_str: Timestamp in MM:SS format

        Returns:
            ISO formatted datetime
        """
        try:
            minutes, seconds = map(int, time_str.split(':'))
            base_date = datetime.now()
            return base_date.replace(
                hour=0,
                minute=minutes,
                second=seconds,
                microsecond=0
            ).isoformat()
        except Exception as e:
            logger.warning(f"Timestamp parsing failed: {e}")
            return datetime.now().isoformat()

    @classmethod
    def extract_conversations(cls, text: str) -> List[Dict]:
        """
        Extract structured conversations from transcript text

        Args:
            text: Full transcript text

        Returns:
            List of conversation dictionaries
        """
        conversations = []
        lines = text.strip().split('\n')

        for line in lines:
            if not line.strip():
                continue

            # Extract timestamp
            timestamp_match = re.search(r'\[([0-9]{2}:[0-9]{2})\]', line)
            if timestamp_match:
                timestamp = timestamp_match.group(1)
                # Remove timestamp from line
                line = line.replace(f'[{timestamp}]', '').strip()

                # Split speaker and content
                parts = line.split(' ', 1)
                if len(parts) > 1:
                    speaker, content = parts
                    conversations.append({
                        'timestamp': cls.parse_timestamp(timestamp),
                        'speaker': speaker,
                        'content': content
                    })

        return conversations
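
# Expected transcript line format (an assumption inferred from the regex and
# split logic above, not a spec from the original author):
#
#   [00:15] Agent Hello, thanks for calling support today.
#   [00:22] Customer Hi, my order still has not arrived.
#
# Each line carries a "[MM:SS]" timestamp followed by a single-word speaker
# label and the utterance; lines without a timestamp are skipped by
# TranscriptParser.extract_conversations().
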
class TranscriptProcessor:
    """
    Process and store transcript sentiment analysis results

    Handles database interaction and sentiment analysis
    """

    def __init__(
        self,
        neo4j_uri: str = None,
        neo4j_username: str = None,
        neo4j_password: str = None
    ):
        """
        Initialize transcript processor

        Args:
            neo4j_uri: Neo4j database URI
            neo4j_username: Database username
            neo4j_password: Database password
        """
        # Use environment variables or fall back to defaults
        self.neo4j_uri = neo4j_uri or os.getenv('NEO4J_URI', 'neo4j+s://5cbd784c.databases.neo4j.io')
        self.neo4j_username = neo4j_username or os.getenv('NEO4J_USERNAME', 'neo4j')
        self.neo4j_password = neo4j_password or os.getenv('NEO4J_PASSWORD', 'xwsXwnfCdaXWoEbf8uAQGM-F8lq-cLw1ZRkXORcErUQ')

        # Initialize components
        self.sentiment_analyzer = OptimizedSentimentAnalyzer()

        try:
            # Establish database connection
            self.driver = GraphDatabase.driver(
                self.neo4j_uri,
                auth=(self.neo4j_username, self.neo4j_password)
            )
            logger.info("Successfully connected to Neo4j database")
        except Exception as e:
            logger.error(f"Database connection failed: {e}")
            raise

    def process_transcript(self, transcript_path: str):
        """
        Comprehensive transcript processing workflow

        Args:
            transcript_path: Path to the transcript file
        """
        try:
            # Read transcript file
            with open(transcript_path, 'r', encoding='utf-8') as file:
                transcript_text = file.read()

            # Extract conversations
            conversations = TranscriptParser.extract_conversations(transcript_text)

            # Analyze full transcript sentiment
            sentiment_analysis = self.sentiment_analyzer.analyze_transcript(transcript_text)

            # Store results in Neo4j
            with self.driver.session() as session:
                # Create transcript node
                session.run(
                    """
                    CREATE (t:Transcript {
                        overall_sentiment_score: $score,
                        processed_at: datetime(),
                        file_path: $path
                    })
                    """,
                    score=sentiment_analysis['overall_sentiment_score'],
                    path=transcript_path
                )

                # Store conversation segments
                for conversation in conversations:
                    session.run(
                        """
                        MATCH (t:Transcript {file_path: $path})
                        CREATE (c:Conversation {
                            speaker: $speaker,
                            content: $content,
                            timestamp: $timestamp
                        })
                        CREATE (t)-[:HAS_CONVERSATION]->(c)
                        """,
                        path=transcript_path,
                        **conversation
                    )

                # Store sentiment chunks
                for chunk in sentiment_analysis.get('chunk_results', []):
                    session.run(
                        """
                        MATCH (t:Transcript {file_path: $path})
                        CREATE (s:SentimentChunk {
                            text: $text,
                            sentiment: $sentiment,
                            score: $score
                        })
                        CREATE (t)-[:HAS_SENTIMENT_CHUNK]->(s)
                        """,
                        path=transcript_path,
                        **chunk
                    )

            logger.info(f"Successfully processed transcript: {transcript_path}")
            return sentiment_analysis

        except Exception as e:
            logger.error(f"Transcript processing failed: {e}")
            raise

    def close(self):
        """Close the database connection"""
        if hasattr(self, 'driver'):
            self.driver.close()
            logger.info("Database connection closed")
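
# Minimal programmatic usage sketch (illustrative; "meeting.txt" and the
# NEO4J_* environment variables are placeholders, not files or settings that
# ship with this script):
#
#   processor = TranscriptProcessor()
#   try:
#       summary = processor.process_transcript("meeting.txt")
#       print(f"Overall sentiment: {summary['overall_sentiment_score']:.2f}")
#   finally:
#       processor.close()
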
def create_gradio_interface():
    """
    Create and launch the Gradio interface for transcript sentiment analysis.
    """
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    def analyze_transcript(transcript_file):
        """
        Wrapper function for Gradio to process the transcript and format results.

        Args:
            transcript_file (str): Path to the uploaded transcript file

        Returns:
            str: Formatted sentiment analysis results
        """
        try:
            # Ensure a file was uploaded
            if transcript_file is None:
                raise gr.Error("Please upload a transcript file.")

            # Initialize the transcript processor
            processor = TranscriptProcessor()

            # Process the transcript (gr.File(type="filepath") passes a plain path string)
            results = processor.process_transcript(transcript_file)

            # Close database connection
            processor.close()

            # Format output
            output = []

            # Overall sentiment
            output.append(f"🌈 Overall Sentiment Score: {results.get('overall_sentiment_score', 'N/A'):.2f}")
            output.append("\n📊 Sentiment Chunks Analysis:\n")

            # Individual chunk results
            for idx, chunk in enumerate(results.get('chunk_results', []), 1):
                output.append(f"Chunk {idx}:")
                output.append(f"  • Sentiment: {chunk.get('sentiment', 'N/A')}")
                output.append(f"  • Score: {chunk.get('score', 'N/A'):.2f}")
                output.append(f"  • Text: {chunk.get('text', 'N/A')[:100]}...\n")

            return "\n".join(output)

        except Exception as e:
            return f"Error: {str(e)}"

    # Ensure NLTK resources are downloaded
    try:
        nltk.download('punkt', quiet=True)
    except Exception as e:
        logger.warning(f"NLTK download failed: {e}")

    # Create Gradio interface
    with gr.Blocks(title="Transcript Sentiment Analyzer") as demo:
        gr.Markdown("# 📊 Transcript Sentiment Analysis")
        gr.Markdown("Upload a conversation transcript to analyze its sentiment.")

        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    type="filepath",
                    label="Upload Transcript",
                    file_types=['.txt']
                )
                analyze_btn = gr.Button("Analyze Transcript", variant="primary")

            with gr.Column():
                output = gr.Textbox(
                    label="Sentiment Analysis Results",
                    lines=15,
                    placeholder="Results will appear here..."
                )

        # Connect components
        analyze_btn.click(
            fn=analyze_transcript,
            inputs=file_input,
            outputs=output
        )

    # Launch the interface
    demo.launch(
        debug=True,
        show_error=True,
        server_port=7860,
        share=True  # Creates a public link
    )


if __name__ == "__main__":
    # Ensure NLTK resources are downloaded
    try:
        nltk.download('punkt', quiet=True)
    except Exception as e:
        logger.warning(f"NLTK download failed: {e}")

    # Launch the Gradio interface
    create_gradio_interface()
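
# Illustrative Cypher (run in the Neo4j Browser, not part of this script) for
# inspecting what process_transcript() writes; a sketch of the stored
# properties, not an exhaustive schema:
#
#   MATCH (t:Transcript)-[:HAS_SENTIMENT_CHUNK]->(s:SentimentChunk)
#   RETURN t.file_path, t.overall_sentiment_score, s.sentiment, s.score
#   ORDER BY t.processed_at DESC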