import os
import re
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import gradio as gr
import nltk
import numpy as np
import torch
from neo4j import GraphDatabase
from nltk.tokenize import sent_tokenize
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    pipeline
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class SentimentResult:
    """Structured container for sentiment analysis results."""
    text: str
    sentiment: str
    score: float
    confidence: float
    intensity: str

class OptimizedSentimentAnalyzer:
    """Multi-model sentiment analyzer with robust handling of long inputs."""

    def __init__(
        self,
        models: Optional[List[str]] = None,
        device: Optional[str] = None,
        max_length: int = 512
    ):
        """
        Initialize the sentiment analyzer with robust sequence handling.

        Args:
            models: List of Hugging Face model names
            device: Processing device ('cuda' or 'cpu')
            max_length: Maximum token sequence length
        """
        try:
            nltk.download('punkt', quiet=True)
        except Exception as e:
            logger.warning(f"Could not download NLTK resources: {e}")

        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_configs = models or [
            'cardiffnlp/twitter-roberta-base-sentiment-latest',
            'finiteautomata/bertweet-base-sentiment-analysis'
        ]
        self.max_length = max_length

        self._load_models()

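    # The defaults can be overridden with any Hugging Face sequence-classification
    # sentiment models; an illustrative sketch (model name and settings are only
    # examples, not part of the app's configuration):
    #
    #     OptimizedSentimentAnalyzer(
    #         models=['distilbert-base-uncased-finetuned-sst-2-english'],
    #         device='cpu',
    #         max_length=256
    #     )
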
    def _load_models(self):
        """Load the configured sentiment models and their tokenizers."""
        self.analyzers = []
        self.tokenizers = []

        for model_name in self.model_configs:
            try:
                tokenizer = AutoTokenizer.from_pretrained(model_name)
                model = AutoModelForSequenceClassification.from_pretrained(model_name)

                sentiment_pipeline = pipeline(
                    task="sentiment-analysis",
                    model=model,
                    tokenizer=tokenizer,
                    device=0 if self.device == 'cuda' else -1
                )

                # Register the tokenizer and pipeline together so the two lists
                # stay aligned even if a later model fails to load.
                self.tokenizers.append(tokenizer)
                self.analyzers.append(sentiment_pipeline)
                logger.info(f"Successfully loaded model: {model_name}")

            except Exception as e:
                logger.warning(f"Could not load model {model_name}: {e}")

        if not self.analyzers:
            raise RuntimeError("No sentiment models could be loaded")

    def _truncate_text(self, text: str, tokenizer) -> str:
        """
        Truncate text so it fits within the model's maximum sequence length.

        Args:
            text: Input text to truncate
            tokenizer: Tokenizer for the specific model

        Returns:
            Truncated text
        """
        tokens = tokenizer.encode(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            truncation=True
        )
        return tokenizer.decode(tokens, skip_special_tokens=True)

    def _process_long_text(self, text: str) -> List[str]:
        """
        Split long text into chunks that fit within the model's context window.

        Args:
            text: Full input text

        Returns:
            List of text chunks
        """
        sentences = sent_tokenize(text)

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            # Whitespace-split word count is used as a cheap proxy for token count;
            # the 50-token margin leaves room for special tokens and subword splits.
            sentence_tokens = len(sentence.split())

            if current_chunk and current_length + sentence_tokens > self.max_length - 50:
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_length = sentence_tokens
            else:
                current_chunk.append(sentence)
                current_length += sentence_tokens

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

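    # Illustrative sketch of the chunking behaviour (not executed), assuming the
    # default max_length of 512: a long transcript is split on sentence boundaries
    # so each chunk stays below roughly max_length - 50 words, unless a single
    # sentence alone exceeds that budget.
    #
    #     analyzer = OptimizedSentimentAnalyzer()
    #     chunks = analyzer._process_long_text(long_transcript)  # `long_transcript` is a hypothetical string
    #     for c in chunks:
    #         print(len(c.split()))
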
    def analyze_transcript(self, transcript: str) -> Dict:
        """
        Analyze the sentiment of a transcript with chunked, multi-model processing.

        Args:
            transcript: Full transcript text

        Returns:
            Comprehensive sentiment analysis results
        """
        if not transcript or not isinstance(transcript, str):
            raise ValueError("Invalid transcript input")

        text_chunks = self._process_long_text(transcript)

        chunk_results = []
        for chunk in text_chunks:
            truncated_chunk = self._truncate_text(chunk, self.tokenizers[0])

            chunk_sentiments = []
            for analyzer in self.analyzers:
                try:
                    sentiment = analyzer(truncated_chunk)[0]
                    chunk_sentiments.append(sentiment)
                except Exception as e:
                    logger.warning(f"Model sentiment analysis failed: {e}")

            if chunk_sentiments:
                # Keep the prediction the ensemble is most confident about.
                dominant_sentiment = max(
                    chunk_sentiments,
                    key=lambda x: x['score']
                )
                chunk_results.append(SentimentResult(
                    text=truncated_chunk,
                    sentiment=dominant_sentiment['label'],
                    score=dominant_sentiment['score'],
                    confidence=dominant_sentiment['score'],
                    intensity='moderate'
                ))

        if chunk_results:
            # Note: this averages the winning label's confidence per chunk; it is
            # not a signed polarity score.
            overall_sentiment_score = np.mean([
                result.score for result in chunk_results
            ])

            return {
                'overall_sentiment_score': float(overall_sentiment_score),
                'chunk_results': [
                    {
                        'text': result.text,
                        'sentiment': result.sentiment,
                        'score': result.score
                    } for result in chunk_results
                ]
            }

        raise ValueError("Could not perform sentiment analysis")

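# Illustrative usage sketch for running the analyzer on its own (not wired into the
# app); it assumes the default Hugging Face models can be downloaded on first run:
#
#     analyzer = OptimizedSentimentAnalyzer()
#     report = analyzer.analyze_transcript("I love the new release. The install was painful, though.")
#     print(f"Overall score: {report['overall_sentiment_score']:.2f}")
#     for chunk in report['chunk_results']:
#         print(chunk['sentiment'], round(chunk['score'], 2))
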


class TranscriptParser:
    """Parse and extract conversations from transcript files."""

    @staticmethod
    def parse_timestamp(time_str: str) -> str:
        """
        Convert a timestamp from MM:SS format to a full datetime.

        Args:
            time_str: Timestamp in MM:SS format

        Returns:
            ISO formatted datetime string
        """
        try:
            minutes, seconds = map(int, time_str.split(':'))
            base_date = datetime.now()
            return base_date.replace(
                hour=0,
                minute=minutes,
                second=seconds,
                microsecond=0
            ).isoformat()
        except Exception as e:
            logger.warning(f"Timestamp parsing failed: {e}")
            return datetime.now().isoformat()

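    # Example (illustrative): parse_timestamp("12:34") yields today's date at
    # 00:12:34, e.g. "2024-01-01T00:12:34" on 1 January 2024; malformed input
    # falls back to the current time.
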
    @classmethod
    def extract_conversations(cls, text: str) -> List[Dict]:
        """
        Extract structured conversations from transcript text.

        Args:
            text: Full transcript text

        Returns:
            List of conversation dictionaries
        """
        conversations = []
        lines = text.strip().split('\n')

        for line in lines:
            if not line.strip():
                continue

            # Only lines carrying an [MM:SS] timestamp are treated as turns.
            timestamp_match = re.search(r'\[([0-9]{2}:[0-9]{2})\]', line)
            if timestamp_match:
                timestamp = timestamp_match.group(1)
                line = line.replace(f'[{timestamp}]', '').strip()

                # The first whitespace-separated token is the speaker; the rest
                # of the line is the utterance.
                parts = line.split(' ', 1)
                if len(parts) > 1:
                    speaker, content = parts
                    conversations.append({
                        'timestamp': cls.parse_timestamp(timestamp),
                        'speaker': speaker,
                        'content': content
                    })

        return conversations

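# Expected transcript line format (illustrative):
#
#     [00:15] Alice I think the proposal looks good.
#
# which is extracted as
#
#     {'timestamp': '<today>T00:00:15', 'speaker': 'Alice',
#      'content': 'I think the proposal looks good.'}
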


class TranscriptProcessor:
    """
    Process and store transcript sentiment analysis results.
    Handles database interaction and sentiment analysis.
    """

    def __init__(
        self,
        neo4j_uri: Optional[str] = None,
        neo4j_username: Optional[str] = None,
        neo4j_password: Optional[str] = None
    ):
        """
        Initialize the transcript processor.

        Args:
            neo4j_uri: Neo4j database URI
            neo4j_username: Database username
            neo4j_password: Database password
        """
        self.neo4j_uri = neo4j_uri or os.getenv('NEO4J_URI', 'neo4j+s://5cbd784c.databases.neo4j.io')
        self.neo4j_username = neo4j_username or os.getenv('NEO4J_USERNAME', 'neo4j')
        # Never hard-code credentials: the password must come from the argument
        # or the NEO4J_PASSWORD environment variable.
        self.neo4j_password = neo4j_password or os.getenv('NEO4J_PASSWORD')
        if not self.neo4j_password:
            raise ValueError("NEO4J_PASSWORD is not set")

        self.sentiment_analyzer = OptimizedSentimentAnalyzer()

        try:
            self.driver = GraphDatabase.driver(
                self.neo4j_uri,
                auth=(self.neo4j_username, self.neo4j_password)
            )
            logger.info("Successfully connected to Neo4j database")
        except Exception as e:
            logger.error(f"Database connection failed: {e}")
            raise

    def process_transcript(self, transcript_path: str):
        """
        Run the full transcript processing workflow.

        Args:
            transcript_path: Path to the transcript file
        """
        try:
            with open(transcript_path, 'r', encoding='utf-8') as file:
                transcript_text = file.read()

            conversations = TranscriptParser.extract_conversations(transcript_text)
            sentiment_analysis = self.sentiment_analyzer.analyze_transcript(transcript_text)

            with self.driver.session() as session:
                # Create the transcript node first, then attach conversations and
                # sentiment chunks to it.
                session.run(
                    """
                    CREATE (t:Transcript {
                        overall_sentiment_score: $score,
                        processed_at: datetime(),
                        file_path: $path
                    })
                    """,
                    score=sentiment_analysis['overall_sentiment_score'],
                    path=transcript_path
                )

                for conversation in conversations:
                    session.run(
                        """
                        MATCH (t:Transcript {file_path: $path})
                        CREATE (c:Conversation {
                            speaker: $speaker,
                            content: $content,
                            timestamp: $timestamp
                        })
                        CREATE (t)-[:HAS_CONVERSATION]->(c)
                        """,
                        path=transcript_path,
                        **conversation
                    )

                for chunk in sentiment_analysis.get('chunk_results', []):
                    session.run(
                        """
                        MATCH (t:Transcript {file_path: $path})
                        CREATE (s:SentimentChunk {
                            text: $text,
                            sentiment: $sentiment,
                            score: $score
                        })
                        CREATE (t)-[:HAS_SENTIMENT_CHUNK]->(s)
                        """,
                        path=transcript_path,
                        **chunk
                    )

            logger.info(f"Successfully processed transcript: {transcript_path}")
            return sentiment_analysis

        except Exception as e:
            logger.error(f"Transcript processing failed: {e}")
            raise

    def close(self):
        """Close the database connection."""
        if hasattr(self, 'driver'):
            self.driver.close()
            logger.info("Database connection closed")

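# Illustrative usage sketch (not executed at import time); it assumes NEO4J_URI,
# NEO4J_USERNAME and NEO4J_PASSWORD are set in the environment, and "meeting.txt"
# is a hypothetical transcript in the [MM:SS] Speaker text format shown above:
#
#     processor = TranscriptProcessor()
#     try:
#         report = processor.process_transcript("meeting.txt")
#         print(report['overall_sentiment_score'])
#     finally:
#         processor.close()
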


def create_gradio_interface():
    """
    Create and launch the Gradio interface for transcript sentiment analysis.
    """

    def analyze_transcript(transcript_file):
        """
        Gradio wrapper that processes a transcript and formats the results.

        Args:
            transcript_file (str): Path to the uploaded transcript file

        Returns:
            str: Formatted sentiment analysis results
        """
        try:
            if transcript_file is None:
                raise gr.Error("Please upload a transcript file.")

            processor = TranscriptProcessor()
            try:
                # With type="filepath" the File component passes a plain path string.
                results = processor.process_transcript(transcript_file)
            finally:
                processor.close()

            output = []
            output.append(f"🌈 Overall Sentiment Score: {results['overall_sentiment_score']:.2f}")
            output.append("\n📊 Sentiment Chunks Analysis:\n")

            for idx, chunk in enumerate(results.get('chunk_results', []), 1):
                output.append(f"Chunk {idx}:")
                output.append(f"  • Sentiment: {chunk.get('sentiment', 'N/A')}")
                output.append(f"  • Score: {chunk['score']:.2f}")
                output.append(f"  • Text: {chunk.get('text', 'N/A')[:100]}...\n")

            return "\n".join(output)

        except Exception as e:
            return f"Error: {str(e)}"

    with gr.Blocks(title="Transcript Sentiment Analyzer") as demo:
        gr.Markdown("# 📊 Transcript Sentiment Analysis")
        gr.Markdown("Upload a conversation transcript to analyze its sentiment.")

        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    type="filepath",
                    label="Upload Transcript",
                    file_types=['.txt']
                )
                analyze_btn = gr.Button("Analyze Transcript", variant="primary")

            with gr.Column():
                output = gr.Textbox(
                    label="Sentiment Analysis Results",
                    lines=15,
                    placeholder="Results will appear here..."
                )

        analyze_btn.click(
            fn=analyze_transcript,
            inputs=file_input,
            outputs=output
        )

    demo.launch(
        debug=True,
        show_error=True,
        server_port=7860,
        share=True
    )



if __name__ == "__main__":
    try:
        nltk.download('punkt', quiet=True)
    except Exception as e:
        logger.warning(f"NLTK download failed: {e}")

    create_gradio_interface()