# -*- coding: utf-8 -*-
"""ITI110_Final.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1wAe1__d6108Sb-qIL2rOlwhLXhE3B_Yo
"""
# Install and import necessary libraries to access Groq.
import subprocess
import sys

# Install required packages (seaborn and opencv-python are imported later in this notebook;
# datetime is part of the standard library and does not need to be installed)
def install_packages():
    packages = ["groq", "gradio", "ultralytics", "moviepy", "requests", "soundfile", "pandas", "openai",
                "pydub", "matplotlib", "numpy", "fpdf", "seaborn", "opencv-python"]
    subprocess.check_call([sys.executable, "-m", "pip", "install"] + packages)

install_packages()  # Call function to install packages

import os
os.system("pip uninstall -y moviepy && pip install --no-cache-dir moviepy")
# FOR SENTIMENT ANALYSIS - SETYANI
# Install and import necessary libraries to access Groq
#!pip install groq gradio opencv-python moviepy requests soundfile pydub matplotlib numpy fpdf
import os
import groq
from groq import Groq
import gradio as gr
import numpy as np
import tempfile
import requests
from moviepy import VideoFileClip
from pydub import AudioSegment
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap  # used to build the red/yellow/green sentiment colormap
import time
import seaborn as sns
from collections import Counter
from fpdf import FPDF
# Global Variables
sentiment_scores = {"positive": 1, "neutral": 0, "negative": -1}
sentiment_history = []
transcribed_text = "Listening..."
report_path = "sentiment_report.pdf"
sentiment_trend_path = "sentiment_trend.png"
sentiment_heatmap_path = "sentiment_heatmap.png"
sentiment_pie_chart_path = "sentiment_pie_chart.png"
emotion_trend_path = "emotion_trend.png"
emotion_heatmap_path = "emotion_heatmap.png"
emotion_pie_chart_path = "emotion_pie_chart.png"
# Get the key to access Groq
API_KEY = os.environ.get("GROQ_API_KEY", "No Key Found")

# Initialize Groq Client
groq_client = groq.Groq(api_key=API_KEY)
# MAIN function to convert audio into text using Groq Whisper speech-to-text service
def transcribe_audio(audio_file_path):
    # Open the audio file
    with open(audio_file_path, "rb") as file:
        # Create an audio transcription using the groq_client API
        transcription = groq_client.audio.transcriptions.create(
            file=(audio_file_path, file.read()),   # Read the audio file from the specified path and send it as input
            model="whisper-large-v3",              # chosen Whisper model to be used for transcription
            #model="whisper-large-v3-turbo",       # tested another Whisper model
            #model="distil-whisper-large-v3-en",   # tested another Whisper model
            prompt="Specify context or spelling",  # Optional prompt to provide context or spelling preferences
            response_format="json",                # Specify the format of the response (JSON format in this case)
            language="en",                         # Specify the language of the audio (English in this case)
            temperature=0.0                        # Control the randomness of the output (0.0 means deterministic output)
        )
    return transcription.text
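
# Minimal usage sketch for transcribe_audio (the file name "sample_clip.wav" is only an
# illustrative placeholder; this assumes a valid GROQ_API_KEY is set in the environment).
if os.path.exists("sample_clip.wav"):
    print(transcribe_audio("sample_clip.wav"))  # prints the recognised English speech as plain text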
# MAIN function to do sentiment analysis using Groq LLM model llama3-8b-8192
def analyze_sentiment(text):
    # Create a completion using the groq_client API
    response = groq_client.chat.completions.create(
        model="llama3-8b-8192",  # Specify the model to be used for generating the completion
        messages=[
            {"role": "system", "content": "You are an expert in text sentiment analysis. Analyze the sentiment of this text and return only 'Positive', 'Negative', or 'Neutral'."},
            {"role": "user", "content": text}
        ],
        temperature=0.0,  # Control the randomness of the output (0.0 means deterministic output)
        max_tokens=200    # Limit the response length to 200 tokens
    )
    sentiment = response.choices[0].message.content
    #print(sentiment)
    # Map the label to a numeric score; strip whitespace so a response like "Positive\n" still matches
    sentiment_history.append(sentiment_scores.get(sentiment.strip().lower(), 0))
    print(sentiment_history)
    return sentiment
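
# Example usage (kept commented out to avoid an extra API call; the sample sentence is illustrative only)
#print(analyze_sentiment("The support agent was very helpful and resolved my issue quickly."))  # would typically return 'Positive'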
# Integrated and tested AZURE services for Speech-to-text using Whisper and
# Azure Sentiment Analysis using gpt-35-turbo-16k vs Azure LANGUAGE service for text analytics
#!pip install azure-cognitiveservices-speech azure-ai-textanalytics azure-core azure-identity
# Removed the Azure code here to protect the keys; it is only included in the project submission
# CLEANUP transcribed text before doing Sentiment Analysis
import re  # used for regular expressions

# Helper function to remove suffixes from numbers in the input text.
def remove_suffixes(text):
    # Regular expression to find numbers followed by common suffixes
    pattern = r'(\d+)(st|nd|rd|th)'
    # Replace the matched pattern with just the number (capture group 1)
    cleaned_text = re.sub(pattern, r'\1', text)
    return cleaned_text  # Return the cleaned text without suffixes
# Helper function to remove repeated phrases in the transcript text, which sometimes appear due to transcription errors
def remove_repeated_phrases(text):
    # Regular expression to find repeated phrases up to 3 words long
    pattern = r'\b(\w+\s+\w+\s+\w+|\w+\s+\w+|\w+)\s+\1\b'
    prev_text = ''
    while prev_text != text:
        prev_text = text  # Store previous version for comparison
        text = re.sub(pattern, r'\1 \1', text, flags=re.IGNORECASE)  # Keep only two instances for a genuine repeat, e.g. "bye. bye."
    return text  # Return the cleaned text without repeated phrases

# Example Usage
#text = "hello world hello world hello world test test test again again again"
#cleaned_text = remove_repeated_phrases(text)
#print(cleaned_text)  # Output: "hello world hello world test test again again"
# Helper function for text preprocessing before calculating WER
def preprocess_text(text):
    text = remove_repeated_phrases(text)        # remove repeated phrases caused by transcription errors
    text = text.replace('\n', ' ')              # replace newlines with spaces
    text = text.lower()                         # convert text to lower case
    text = text.replace('-', '')                # remove hyphens
    text = re.sub(r'[^a-z\s0-9!?]', ' ', text)  # replace anything that is NOT a lowercase letter, whitespace, digit, '!' or '?' with a space
    text = re.sub(r'\b(okay)\b', 'ok', text)    # replace okay with ok to standardize the format
    text = re.sub(r'\b(yeah)\b', 'yes', text)   # replace yeah with yes to standardize the format
    text = re.sub(r'\b(um)\b', '', text)        # remove the filler word um
    text = re.sub(r'\b(uh)\b', '', text)        # remove the filler word uh
    text = remove_suffixes(text)                # remove suffixes behind numbers like st, nd, rd, th
    text = re.sub(r'\s+', ' ', text).strip()    # remove extra spaces, including leading, trailing, and multiple spaces between words
    return text  # Return the cleaned text after preprocessing
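
# The preprocessing above is described as preparation for calculating WER, but the WER
# evaluation code itself is not included in this notebook. The function below is only a
# minimal sketch of how WER could be computed (word-level edit distance divided by the
# reference length); it is an assumption, not the project's actual evaluation code.
def word_error_rate(reference, hypothesis):
    ref_words = preprocess_text(reference).split()
    hyp_words = preprocess_text(hypothesis).split()
    # Classic dynamic-programming edit distance over words
    d = [[0] * (len(hyp_words) + 1) for _ in range(len(ref_words) + 1)]
    for i in range(len(ref_words) + 1):
        d[i][0] = i
    for j in range(len(hyp_words) + 1):
        d[0][j] = j
    for i in range(1, len(ref_words) + 1):
        for j in range(1, len(hyp_words) + 1):
            cost = 0 if ref_words[i - 1] == hyp_words[j - 1] else 1
            d[i][j] = min(d[i - 1][j] + 1,         # deletion
                          d[i][j - 1] + 1,         # insertion
                          d[i - 1][j - 1] + cost)  # substitution
    return d[len(ref_words)][len(hyp_words)] / max(len(ref_words), 1)

# Example: word_error_rate("hello world", "hello word") == 0.5 (one substitution over two reference words)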
# HELPER function for Display Output of Sentiment Analysis
# Update the Sentiment Trend Over Time real-time graph
def update_plot():
    plt.clf()
    # Generate timestamps
    timestamps = list(range(len(sentiment_history)))
    # Define color mapping for sentiment scores
    colors = ["red" if s < -0.3 else "yellow" if -0.3 <= s <= 0.3 else "green" for s in sentiment_history]

    plt.figure(figsize=(8, 4))
    # Plot sentiment scores with colored markers
    for i in range(len(sentiment_history)):
        plt.plot(timestamps[i], sentiment_history[i], marker="o", color=colors[i], markersize=8)
    # Plot line segments with the color of the next point
    for i in range(len(sentiment_history) - 1):
        plt.plot(timestamps[i:i+2], sentiment_history[i:i+2], linestyle="-", color=colors[i+1], linewidth=2)

    plt.title("Sentiment Trend Over Time")
    plt.xlabel("Time (Speech Segments)")
    plt.ylabel("Sentiment Score")
    plt.ylim([-1, 1])
    plt.yticks([-1, 0, 1], ["Negative", "Neutral", "Positive"])
    plt.savefig(sentiment_trend_path)  # Save the plot as an image
    plt.close()
# Generate the sentiment heatmap using red, yellow, and green colors.
def generate_sentiment_heatmap():
    plt.clf()
    #if not sentiment_history:
    #    return

    # Convert sentiment scores to a single-row matrix for the heatmap
    heatmap_data = np.array(sentiment_history).reshape(1, -1)
    #print(heatmap_data)

    # Define color mapping for sentiment scores (a plain list of colors must be wrapped in a ListedColormap)
    color_mapping = ListedColormap(["red", "yellow", "green"])
    plt.figure(figsize=(6, 3))
    ax = sns.heatmap(heatmap_data, annot=True, cmap=color_mapping, xticklabels=False,
                     yticklabels=["Sentiment"], cbar=True, vmin=-1, vmax=1)

    # Customize color bar labels
    colorbar = ax.collections[0].colorbar
    colorbar.set_ticks([-1, 0, 1])
    colorbar.set_ticklabels(["Negative", "Neutral", "Positive"])
    plt.title("Sentiment Heatmap")  # (Red = Negative, Yellow = Neutral, Green = Positive)
    plt.savefig(sentiment_heatmap_path)  # save before plt.show() so the figure is not cleared first
    plt.show()
    plt.close()
# Generate a Pie Chart for Sentiment Distribution.
def generate_sentiment_pie_chart():
    plt.clf()
    #if not sentiment_history:
    #    return

    # Count occurrences of each sentiment category
    sentiment_labels = ["Negative", "Neutral", "Positive"]
    sentiment_counts = Counter(["Negative" if s < -0.3 else "Neutral" if -0.3 <= s <= 0.3 else "Positive" for s in sentiment_history])
    # Extract count values
    counts = [sentiment_counts[label] for label in sentiment_labels]
    # Define colors
    colors = ["red", "yellow", "green"]

    # Plot pie chart
    plt.figure(figsize=(4, 4))
    plt.pie(counts, labels=sentiment_labels, autopct="%1.1f%%", colors=colors, startangle=140)
    plt.title("Sentiment Distribution")
    plt.savefig(sentiment_pie_chart_path)
    plt.close()
# Create and save a PDF report with transcription and sentiment analysis graphs.
def generate_pdf_report(text):
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()

    # Title
    pdf.set_font("Arial", style='B', size=16)
    pdf.cell(200, 10, "Sentiment Analysis Report", ln=True, align="C")
    pdf.ln(10)

    # Transcribed Text
    pdf.set_font("Arial", size=12)
    pdf.multi_cell(0, 10, f"Transcribed Text:\n\n{text}")
    pdf.ln(10)

    # Add images
    for img_path in [sentiment_trend_path, sentiment_heatmap_path, sentiment_pie_chart_path, emotion_trend_path, emotion_heatmap_path, emotion_pie_chart_path]:
        if os.path.exists(img_path):
            pdf.add_page()
            pdf.image(img_path, x=10, w=180)

    pdf.output(report_path)
    return report_path
# FOR FACE EMOTION ANALYSIS - SONG MING
#!pip install gradio ultralytics pandas matplotlib datetime
import gradio as gr
from ultralytics import YOLO
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import logging
import cv2
from datetime import datetime
import os

# Configure logging (optional)
logging.basicConfig(filename='emotion_analysis.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load model (outside the function)
try:
    model = YOLO('yolo11m_affectnet_best.pt')  # Replace with your model path. Download this model first!
except Exception as e:
    logging.error(f"Error loading YOLO model: {e}. Make sure the path is correct.")
    print(f"Error loading YOLO model: {e}. Make sure the path is correct.")
    model = None

emotion_labels = ["neutral", "happy", "sad", "angry", "fearful", "disgusted", "surprised", "not_detected"]

# Initialize an empty global DataFrame
combined_df = pd.DataFrame(columns=['Emotion', 'Confidence', 'Frame', 'Class', 'Timestamp'])
def analyze_video(video_file, interval_seconds=5, confidence=30, iou=30):
    if model is None:
        return "<p>YOLO model failed to load. Check the logs.</p>"
    model.conf = confidence / 100.0
    model.iou = iou / 100.0

    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print(f"Error opening video file: {video_file}")
        return "<p>Error opening video file.</p>"

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    all_emotions_data = []
    current_frame = 0
    interval_frames = int(fps * interval_seconds)

    while current_frame < total_frame_count:
        cap.set(cv2.CAP_PROP_POS_FRAMES, current_frame)
        ret, frame = cap.read()
        if not ret:
            current_frame += interval_frames  # skip the unreadable frame; otherwise the loop never advances
            continue
        analyze_emotion(frame, current_frame, all_emotions_data)
        current_frame += interval_frames  # Move to the next frame in the next interval
        print(f"Finished Processing : {current_frame}")
    cap.release()
    print("Finished Processing all frames")

    all_emotions_df = pd.DataFrame(all_emotions_data)
    if all_emotions_df.empty:
        return "No emotions detected in the video."
    combined_df = all_emotions_df.groupby(['Frame', 'Emotion'], as_index=False).agg({'Confidence': 'mean', 'Class': 'first', 'Timestamp': 'first'})

    # Line plot
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=combined_df, x='Frame', y='Confidence', hue='Emotion', marker='o')
    plt.title('Emotion Detections Over Time')
    plt.xlabel('Frame')
    plt.ylabel('Confidence')
    #line_plot_path = os.path.abspath('line_plot.png')
    plt.savefig(emotion_trend_path)
    plt.close()

    # Pie chart
    pie_data = combined_df['Emotion'].value_counts()
    plt.figure(figsize=(20, 12))
    plt.pie(pie_data, labels=pie_data.index, autopct='%1.1f%%', startangle=90)
    plt.title('Emotion Distribution')
    #pie_chart_path = os.path.abspath('pie_chart.png')
    plt.savefig(emotion_pie_chart_path)
    plt.close()

    # Heatmap
    plt.figure(figsize=(10, 6))
    heatmap_data = pd.pivot_table(combined_df, values='Confidence', index='Frame', columns='Emotion', fill_value=0)
    sns.heatmap(heatmap_data, cmap='YlGnBu', cbar_kws={'label': 'Confidence'})
    plt.title('Emotion Heatmap')
    plt.xlabel('Emotion')
    plt.ylabel('Frame')
    #heatmap_path = os.path.abspath('heatmap.png')
    plt.savefig(emotion_heatmap_path)
    plt.close()
def analyze_emotion(frame, frame_index, all_emotions_data):
    if model is None:
        return
    results = model(frame)
    for result in results:
        boxes = result.boxes
        for box in boxes:
            conf = float(box.conf)
            cls = int(box.cls.item())
            if cls < len(emotion_labels):
                predicted_emotion = emotion_labels[cls]
            else:
                predicted_emotion = 'not_detected'
                logging.warning(f"Predicted class {cls} out of range. Setting to 'not_detected'.")
                conf = 0.0
            if conf > model.conf:
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
                all_emotions_data.append({
                    'Emotion': predicted_emotion,
                    'Confidence': conf,
                    'Frame': frame_index,
                    'Class': cls,
                    'Timestamp': timestamp
                })
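
# Hedged single-frame sketch of analyze_emotion (kept commented out; "sample_face.jpg" is only a placeholder file name):
#frame = cv2.imread("sample_face.jpg")
#detections = []
#analyze_emotion(frame, frame_index=0, all_emotions_data=detections)
#print(detections)  # a list of dicts such as {'Emotion': 'happy', 'Confidence': 0.87, ...}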
# MAIN FUNCTIONS FOR GRADIO APPLICATION - SETYANI
# 17/2 video file sentiment analysis working
# 21/2 fixed heatmap display, added button click handlers for clear and download report
# 23/2 integrated Azure Whisper, GPT and Language services created by Thim Wai; however, the performance was too slow, so we switched back to Groq
# 25/2 integrated Face Emotion analysis from SongMing
#==========================================================================================================================================
# MAIN function to process uploaded video from Gradio User Interface
def process_video_gradio(video_path):
    global sentiment_history
    sentiment_history = []  # Reset sentiment history
    if not os.path.exists(video_path):
        raise ValueError("File not found.")

    clear_function()  # clear the previous analysis files if they exist
    video_clip = VideoFileClip(video_path)  # extract video
    audio_clip = video_clip.audio  # extract audio
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
        audio_clip.write_audiofile(temp_audio.name)
        full_audio_path = temp_audio.name

    audio = AudioSegment.from_wav(full_audio_path)
    segment_length = 5000  # 5 seconds per segment
    num_segments = len(audio) // segment_length
    transcribed_text = ""

    for i in range(num_segments):
        segment = audio[i * segment_length: (i + 1) * segment_length]  # split audio into segments of 5 sec each to be analysed
        segment_path = f"temp_segment_{i}.wav"
        segment.export(segment_path, format="wav")
        segment_text = transcribe_audio(segment_path)  # CALL transcribe audio using Groq Whisper
        #segment_text = transcribe_audio_azure(segment_path)  # CALL transcribe audio using Azure Whisper

        # Insert segment number inside the text for easy comparison with Sentiment Trend
        segment_text = f"[{i}] {segment_text}"
        transcribed_text += segment_text + "\n"  # added new line for display purpose
        cleaned_text = preprocess_text(segment_text)  # cleaned copy kept for WER evaluation; sentiment uses the raw segment
        sentiment = analyze_sentiment(segment_text)  # CALL analyze sentiment using Groq Llama
        #sentiment = analyze_sentiment_gpt(segment_text)  # CALL analyze sentiment using Azure GPT
        #text_analytics_client = authenticate_text_analytics_client()  # CALL analyze sentiment using Azure Language Service
        #sentiment = analyze_sentiment_azure(text_analytics_client, segment_text)  # CALL analyze sentiment using Azure Language Service
        os.remove(segment_path)  # Cleanup segment files
        update_plot()  # Update plot after processing each segment
        yield transcribed_text, sentiment_trend_path, sentiment_heatmap_path, sentiment_pie_chart_path, emotion_trend_path, emotion_heatmap_path, emotion_pie_chart_path

    os.remove(full_audio_path)  # Cleanup full audio file
    generate_sentiment_heatmap()
    generate_sentiment_pie_chart()
    yield transcribed_text, sentiment_trend_path, sentiment_heatmap_path, sentiment_pie_chart_path, emotion_trend_path, emotion_heatmap_path, emotion_pie_chart_path

    analyze_video(video_path)
    generate_pdf_report(transcribed_text)  # writes the PDF to the global report_path
    # update final heatmap and pie chart before finishing
    yield transcribed_text, sentiment_trend_path, sentiment_heatmap_path, sentiment_pie_chart_path, emotion_trend_path, emotion_heatmap_path, emotion_pie_chart_path
    return transcribed_text, sentiment_trend_path, sentiment_heatmap_path, sentiment_pie_chart_path, emotion_trend_path, emotion_heatmap_path, emotion_pie_chart_path
# Function to handle the 'Download Report' button
def download_report_function():
    if not os.path.exists(report_path):
        raise ValueError("Please upload a video file for report analysis.")
    return report_path
# Function to handle the 'Clear' button
def clear_function():
    if os.path.isfile(sentiment_trend_path):  # Ensure it is a file before attempting to delete
        os.remove(sentiment_trend_path)
    if os.path.isfile(sentiment_heatmap_path):
        os.remove(sentiment_heatmap_path)
    if os.path.isfile(sentiment_pie_chart_path):
        os.remove(sentiment_pie_chart_path)
    if os.path.isfile(emotion_trend_path):
        os.remove(emotion_trend_path)
    if os.path.isfile(emotion_heatmap_path):
        os.remove(emotion_heatmap_path)
    if os.path.isfile(emotion_pie_chart_path):
        os.remove(emotion_pie_chart_path)
    #if os.path.isfile(report_path):
    #    os.remove(report_path)
    #return gr.update(value=None, interactive=True), gr.update(value="", interactive=False), gr.update(value=""), gr.update(value=""), gr.update(value=""), gr.update(value=""), gr.update(value=""), gr.update(value="")
    return None, None, None, None, None, None, None, None
# Simple gr.Interface version (superseded by the gr.Blocks layout defined below)
iface = gr.Interface(
    fn=process_video_gradio,
    inputs=gr.Video(label="Video"),
    outputs=[
        gr.Textbox(label="Transcribed Text"),
        gr.Image(label="Sentiment Trend Over Time"),
        gr.Image(label="Sentiment Heatmap"),
        gr.Image(label="Sentiment Distribution Pie Chart"),
        gr.Image(label="Emotion Trend Over Time"),
        gr.Image(label="Emotion Heatmap"),
        gr.Image(label="Emotion Distribution Pie Chart")
    ],
    allow_flagging="never",  # Disable flag button
    title="Real-Time Video Sentiment Analysis",
    description="Upload a video file or use your webcam for live video streaming to analyze speech sentiment dynamically.",
    live=True  # Enable live updates for streaming
)
with gr.Blocks() as iface:
    with gr.Row():
        video_input = gr.Video(label="Video", scale=1, interactive=True)  # Video box takes more space
        transcribed_text = gr.Textbox(label="Transcribed Text", lines=15, max_lines=15, interactive=False, scale=1)
    with gr.Row():
        sentiment_trend = gr.Image(label="Sentiment Trend Over Time", scale=2)
        sentiment_heatmap = gr.Image(label="Sentiment Heatmap", scale=1)
        sentiment_pie_chart = gr.Image(label="Sentiment Distribution Pie Chart", scale=1)
    with gr.Row():
        emotion_trend = gr.Image(label="Emotion Trend Over Time", scale=2)
        emotion_heatmap = gr.Image(label="Emotion Heatmap", scale=1)
        emotion_pie_chart = gr.Image(label="Emotion Distribution Pie Chart", scale=1)
    with gr.Row():
        # Buttons for manual control
        download_button = gr.Button("Download Report")
        clear_button = gr.Button("Clear")

    video_input.change(fn=process_video_gradio, inputs=video_input,
                       outputs=[transcribed_text,
                                sentiment_trend, sentiment_heatmap, sentiment_pie_chart,
                                emotion_trend, emotion_heatmap, emotion_pie_chart])

    # Add custom JavaScript to trigger the play button after uploading
    instructions = gr.HTML("""
    <script>
        document.querySelector('input[type="file"]').addEventListener('change', function() {
            var intervalId = setInterval(function() {
                var videoPlayer = document.querySelector('video');
                if (videoPlayer) {
                    videoPlayer.play();
                    clearInterval(intervalId);
                }
            }, 500);
        });
    </script>
    """)

    # Link the button clicks to the functions that handle them
    download_button.click(fn=download_report_function, inputs=[], outputs=gr.File())
    clear_button.click(
        fn=clear_function,
        inputs=[],
        outputs=[video_input, transcribed_text, sentiment_trend, sentiment_heatmap, sentiment_pie_chart, emotion_trend, emotion_heatmap, emotion_pie_chart])

iface.launch(inline=False, share=True)