# Spaces: Running
import os
import platform
import re
import uuid

import numpy as np
import torch
from flask import Flask, request, jsonify
from flask_cors import CORS
from transformers import pipeline
from werkzeug.utils import secure_filename
# Set Transformers Cache Directory
# Choose a per-platform location for the Transformers cache.
if platform.system() == "Windows":
    print("Windows detected. Assigning cache directory to Transformers in AppData\\Local.")
    # LOCALAPPDATA may be unset; os.path.join(None, ...) would raise TypeError,
    # so fall back to the user's home directory in that case.
    local_app_data = os.getenv('LOCALAPPDATA') or os.path.expanduser('~')
    transformers_cache_directory = os.path.join(local_app_data, 'transformers_cache')
else:
    print("Non-Windows system detected. Assigning cache directory to /tmp/transformers_cache.")
    transformers_cache_directory = '/tmp/transformers_cache'

# Ensure the directory exists
# isdir (rather than exists) so that a plain file squatting on the path is
# reported as an error by makedirs instead of being accepted as a directory.
if os.path.isdir(transformers_cache_directory):
    print(f"Directory '{transformers_cache_directory}' already exists.")
else:
    try:
        os.makedirs(transformers_cache_directory, exist_ok=True)
        print(f"Directory '{transformers_cache_directory}' created successfully.")
    except OSError as e:
        # Best effort: report the problem and carry on with the configured path.
        print(f"Error creating directory '{transformers_cache_directory}': {e}")

# Set the TRANSFORMERS_CACHE environment variable
# NOTE(review): `transformers` is imported at the top of this file, before this
# assignment runs. Confirm the library still honours the variable when it is
# set this late; setting it before the import is the safe order.
os.environ['TRANSFORMERS_CACHE'] = transformers_cache_directory
print(f"Environment variable TRANSFORMERS_CACHE set to '{transformers_cache_directory}'.")
class Config:
    """Flask settings, loaded with ``app.config.from_object(Config)``."""

    # '/tmp/uploads' is an absolute path, so os.path.join drops the module
    # directory given as the first argument; on POSIX this is '/tmp/uploads'.
    # The expression is kept unchanged so the resolved location stays the same.
    UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '/tmp/uploads')
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16MB max file size
    CORS_HEADERS = 'Content-Type'
class DialogueSentimentAnalyzer:
    """Score a speaker-labelled transcript for sentiment and engagement.

    Every utterance is run through two pipelines: a ``text-classification``
    model whose score is reported as "engagement", and a
    ``sentiment-analysis`` model whose score is then nudged by keyword counts.
    """

    # Substrings that move an utterance's sentiment score by 0.1 per net hit.
    POSITIVE_PHRASES = (
        'thank you', 'thanks', 'appreciate', 'great', 'perfect',
        'looking forward', 'flexible', 'competitive',
    )
    NEGATIVE_PHRASES = (
        'concerned', 'worry', 'issue', 'problem', 'difficult',
        'unfortunately', 'sorry',
    )
    # A line opens a new utterance when it starts with "<speaker>:".
    SPEAKER_PATTERN = re.compile(r'^([^:]+):')

    def __init__(self, model_name: str = "microsoft/DialogRPT-updown"):
        """Load both pipelines.

        Args:
            model_name: Checkpoint used for the dialogue (engagement) model.
        """
        # First CUDA device when available, otherwise -1 (CPU).
        self.device = 0 if torch.cuda.is_available() else -1
        self.dialogue_model = pipeline(
            'text-classification',
            # The checkpoint used to be hard-coded here, which silently
            # ignored the constructor argument.
            model=model_name,
            device=self.device,
        )
        self.sentiment_model = pipeline(
            'sentiment-analysis',
            model="distilbert-base-uncased-finetuned-sst-2-english",
            device=self.device,
        )
        # Token limit passed (with truncation) on every model call.
        self.max_length = 512

    def parse_dialogue(self, text: str):
        """Split a transcript into utterances.

        A line starting with ``<speaker>:`` begins a new utterance; later lines
        without such a prefix are appended to it. Blank lines and any text
        before the first speaker label are skipped.

        Returns:
            A list of ``{'speaker': str, 'text': str}`` dicts in input order.
        """
        dialogue = []
        current_speaker = None
        current_text = []
        for raw_line in text.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            speaker_match = self.SPEAKER_PATTERN.match(line)
            if speaker_match:
                # A new speaker label closes the utterance collected so far.
                if current_speaker and current_text:
                    dialogue.append({'speaker': current_speaker, 'text': ' '.join(current_text)})
                current_speaker = speaker_match.group(1)
                # Drop the "<speaker>:" prefix; the rest is the first text chunk.
                current_text = [line[len(current_speaker) + 1:].strip()]
            elif current_speaker:
                current_text.append(line)
        if current_speaker and current_text:
            dialogue.append({'speaker': current_speaker, 'text': ' '.join(current_text)})
        return dialogue

    def analyze_utterance(self, utterance):
        """Score one utterance.

        Args:
            utterance: Dict with ``'speaker'`` and ``'text'`` keys.

        Returns:
            Dict with the speaker, the text, ``sentiment_score`` (clamped to
            [0, 1] after the keyword adjustment), ``engagement_score`` and the
            positive / negative phrase counts.
        """
        text = utterance['text']
        # Truncate so that an over-long utterance cannot exceed the limit
        # stored in max_length (which was previously set but never applied).
        dialogue_score = self.dialogue_model(text, truncation=True, max_length=self.max_length)[0]
        sentiment = self.sentiment_model(text, truncation=True, max_length=self.max_length)[0]

        text_lower = text.lower()
        positive_count = sum(1 for phrase in self.POSITIVE_PHRASES if phrase in text_lower)
        negative_count = sum(1 for phrase in self.NEGATIVE_PHRASES if phrase in text_lower)

        # Express the result as "how positive": flip the score of NEGATIVE labels.
        sentiment_score = float(sentiment['score'])
        if sentiment['label'] == 'NEGATIVE':
            sentiment_score = 1 - sentiment_score

        # Shift by 0.1 per net keyword hit, keeping the result inside [0, 1].
        final_score = sentiment_score
        if positive_count > negative_count:
            final_score = min(1.0, final_score + 0.1 * (positive_count - negative_count))
        elif negative_count > positive_count:
            final_score = max(0.0, final_score - 0.1 * (negative_count - positive_count))

        return {
            'speaker': utterance['speaker'],
            'text': text,
            'sentiment_score': final_score,
            'engagement_score': float(dialogue_score['score']),
            'positive_phrases': positive_count,
            'negative_phrases': negative_count,
        }

    def analyze_dialogue(self, text: str):
        """Summarise a whole transcript.

        Returns:
            A list of ``{'label': str, 'score': float}`` dicts: overall
            sentiment, confidence (1 minus the standard deviation of the
            utterance sentiments, floored at 0), engagement, then one
            ``'<speaker> Sentiment'`` entry per speaker in order of first
            appearance. An empty list is returned when the text contains no
            speaker-labelled utterance.
        """
        dialogue = self.parse_dialogue(text)
        if not dialogue:
            # Averaging an empty list would yield NaN, which is not valid JSON.
            return []

        utterance_results = [self.analyze_utterance(utterance) for utterance in dialogue]
        sentiment_scores = [r['sentiment_score'] for r in utterance_results]
        overall_sentiment = np.mean(sentiment_scores)
        overall_engagement = np.mean([r['engagement_score'] for r in utterance_results])
        confidence = max(0.0, 1.0 - np.std(sentiment_scores))

        speaker_sentiments = {}
        for result in utterance_results:
            speaker_sentiments.setdefault(result['speaker'], []).append(result['sentiment_score'])

        summary = [
            {'label': 'Overall Sentiment', 'score': float(overall_sentiment)},
            {'label': 'Confidence', 'score': float(confidence)},
            {'label': 'Engagement', 'score': float(overall_engagement)},
        ]
        summary.extend(
            {'label': f'{speaker} Sentiment', 'score': float(np.mean(scores))}
            for speaker, scores in speaker_sentiments.items()
        )
        return summary
def save_uploaded_file(content, upload_folder):
    """Write ``content`` to a new, uniquely named ``.txt`` file.

    Args:
        content: Text to store; it is written as UTF-8.
        upload_folder: Directory for the file; created when missing.

    Returns:
        The path of the file that was written.
    """
    # Do not rely on the caller having created the folder beforehand.
    os.makedirs(upload_folder, exist_ok=True)
    # The name is built from a random UUID, not from user input;
    # secure_filename is kept as a cheap safety net.
    filename = secure_filename(f"{uuid.uuid4().hex}.txt")
    file_path = os.path.join(upload_folder, filename)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(content)
    return file_path
# Shared analyzer, created on first use so the models are loaded only once
# per process instead of on every call.
_analyzer = None


def analyze_sentiment(file_path: str):
    """Analyse the transcript stored at ``file_path``.

    Args:
        file_path: Path of a UTF-8 text file containing the transcript.

    Returns:
        The list produced by ``DialogueSentimentAnalyzer.analyze_dialogue``,
        or ``[{'label': 'Error', 'score': 0.5}]`` when anything goes wrong.
    """
    global _analyzer
    try:
        # Read first: an unreadable file then fails before any model is loaded.
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        if _analyzer is None:
            _analyzer = DialogueSentimentAnalyzer()
        return _analyzer.analyze_dialogue(text)
    except Exception as e:
        # Deliberate catch-all: callers receive a sentinel result rather than
        # an exception.
        print(f"Error in sentiment analysis: {str(e)}")
        return [{'label': 'Error', 'score': 0.5}]
def create_app():
    """Build and configure the Flask application.

    Returns:
        The Flask app with ``Config`` loaded, CORS applied, the upload folder
        created and the transcript endpoint registered.
    """
    app = Flask(__name__)
    app.config.from_object(Config)
    # CORS was imported and CORS_HEADERS configured, but it was never applied.
    CORS(app)
    # Ensure the uploads directory exists
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

    # NOTE(review): the view used to be defined without being registered, so
    # the app exposed no endpoint. The URL below is an assumption - confirm it
    # matches the path the client posts to.
    @app.route('/upload-transcript', methods=['POST'])
    def upload_transcript():
        """Analyse the ``transcript`` form field and return the result as JSON."""
        file_path = None
        try:
            transcript = request.form.get('transcript')
            if not transcript:
                return jsonify({'error': 'No transcript received'}), 400
            # Store under a unique name in the upload folder, written as
            # UTF-8 (the encoding analyze_sentiment reads with). A fixed
            # file in the working directory would be overwritten by
            # concurrent requests.
            file_path = save_uploaded_file(transcript, app.config['UPLOAD_FOLDER'])
            # Analyze sentiment
            sentiment_result = analyze_sentiment(file_path)
            return jsonify({'sentiment': sentiment_result}), 200
        except Exception as e:
            return jsonify({'error': str(e)}), 500
        finally:
            # Remove the temporary file on every path, including failures.
            if file_path is not None:
                try:
                    os.remove(file_path)
                except OSError as e:
                    print(f"Could not remove temporary file '{file_path}': {e}")

    return app
if __name__ == '__main__':
    # Script entry point: build the app and serve it on all interfaces
    # (0.0.0.0), port 5000.
    app = create_app()
    app.run(host="0.0.0.0", port=5000)