import gradio as gr
import pandas as pd
import cv2
import torch
import tempfile
import os
import librosa
from fer import FER
from transformers import AutoModelForAudioClassification, pipeline
from moviepy.editor import VideoFileClip, AudioFileClip
import numpy as np
from torch.nn.functional import softmax
import whisper_timestamped as whisper
from translate import Translator

# Load pre-trained models
audio_model = AutoModelForAudioClassification.from_pretrained(
    "3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True
)
face_detector = FER(mtcnn=True)
classifier = pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    top_k=None,
)

# Set mean and std for audio model normalization
mean = audio_model.config.mean
std = audio_model.config.std


# Function to extract audio from video for audio emotion analysis
def extract_audio_from_video(video_path):
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        audio_clip.write_audiofile(temp_audio_file.name, codec="pcm_s16le")
        return temp_audio_file.name


# Function to perform audio emotion detection on a single audio segment
def process_audio_and_detect_emotions(audio_clip):
    audio_np = np.array(audio_clip)
    mask = torch.ones(1, len(audio_np))
    wavs = torch.tensor(audio_np).unsqueeze(0)
    with torch.no_grad():
        pred = audio_model(wavs, mask)
        logits = pred.logits if hasattr(pred, 'logits') else pred[0]
    # Keep only these class indices (index 6 is skipped) and renormalize the
    # remaining probabilities so they sum to 1
    labels = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
    probabilities = softmax(logits, dim=-1).squeeze(0)[[0, 1, 2, 3, 4, 5, 7]]
    probabilities = probabilities / probabilities.sum()
    df = pd.DataFrame([probabilities.numpy()], columns=labels.values())
    return df


# Function to analyze audio emotions second by second
def analyze_audio_emotions(video_path):
    temp_audio_path = None
    try:
        temp_audio_path = extract_audio_from_video(video_path)
        raw_wav, _ = librosa.load(temp_audio_path, sr=audio_model.config.sampling_rate)
        norm_wav = (raw_wav - mean) / (std + 0.000001)
        times = []
        emotions_dfs = []
        # Slice the normalized waveform into one-second windows
        for start_time in range(0, len(norm_wav), audio_model.config.sampling_rate):
            audio_segment = norm_wav[start_time:start_time + audio_model.config.sampling_rate]
            df = process_audio_and_detect_emotions(audio_segment)
            times.append(start_time / audio_model.config.sampling_rate)
            emotions_dfs.append(df)
        emotions_df = pd.concat(emotions_dfs, ignore_index=True)
        emotions_df.insert(0, "Time(s)", times)
        emotion_rename_map = {
            'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise',
            'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'
        }
        emotions_df.rename(columns=emotion_rename_map, inplace=True)
        emotions_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        emotions_df.to_excel(emotions_xlsx_path, index=False)
        return "Audio emotion detection completed successfully.", emotions_df, emotions_xlsx_path
    except Exception as e:
        return f"Error during audio emotion detection: {str(e)}", None, None
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

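
# Minimal standalone sketch (kept as comments so it is not executed on import) of how
# the audio pipeline above could be exercised outside the Gradio UI, e.g. while
# debugging; "sample.mp4" is a placeholder path, not a file shipped with this script:
#
#   status, audio_emotions_df, xlsx_path = analyze_audio_emotions("sample.mp4")
#   print(status)
#   if audio_emotions_df is not None:
#       print(audio_emotions_df.head())
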

# Function to detect facial emotions frame by frame and render them onto the video
def detect_faces_and_emotions(video_path):
    temp_video_path = None
    temp_audio_path = None
    output_video_path = None
    emotions_data = []
    try:
        temp_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        temp_video_path = temp_video.name
        temp_audio = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
        temp_audio_path = temp_audio.name
        output_xlsx = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False)
        output_xlsx_path = output_xlsx.name

        # Keep the original audio track so it can be re-attached to the annotated video
        original_video = VideoFileClip(video_path)
        original_audio = original_video.audio
        original_audio.write_audiofile(temp_audio_path)

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise Exception("Error: Could not open video file.")
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_video_path, fourcc, fps, (frame_width, frame_height))

        frame_number = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame is None:
                continue
            time_seconds = round(frame_number / fps)
            result = face_detector.detect_emotions(frame)
            for face in result:
                bounding_box = face["box"]
                emotions = face["emotions"]
                cv2.rectangle(
                    frame,
                    (bounding_box[0], bounding_box[1]),
                    (bounding_box[0] + bounding_box[2], bounding_box[1] + bounding_box[3]),
                    (0, 155, 255), 2
                )
                for index, (emotion_name, score) in enumerate(emotions.items()):
                    color = (211, 211, 211) if score < 0.01 else (255, 0, 0)
                    emotion_score = "{}: {:.2f}".format(emotion_name, score)
                    cv2.putText(
                        frame, emotion_score,
                        (bounding_box[0], bounding_box[1] + bounding_box[3] + 30 + index * 15),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA
                    )
                # Record the timestamp after drawing so "Time(s)" is not rendered as an emotion label
                emotions["Time(s)"] = time_seconds
                emotions_data.append(emotions)
            out.write(frame)
            frame_number += 1
        cap.release()
        out.release()

        # Average the per-frame scores into one row per second and fill gaps with zeros
        emotions_df = pd.DataFrame(emotions_data)
        emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
        max_time = emotions_df['Time(s)'].max()
        all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
        avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
        df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left')
        df_merged.fillna(0, inplace=True)
        df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
        df_merged.to_excel(output_xlsx_path, index=False)

        # Re-attach the original audio to the annotated frames
        processed_video = VideoFileClip(temp_video_path)
        audio = AudioFileClip(temp_audio_path)
        final_video = processed_video.set_audio(audio)
        output_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        output_video_path = output_video.name
        final_video.write_videofile(output_video_path, codec='libx264')
        return "Face and emotion detection completed successfully.", df_merged, output_xlsx_path, output_video_path
    except Exception as e:
        return f"Error during face and emotion detection: {str(e)}", None, None, None
    finally:
        if temp_video_path and os.path.exists(temp_video_path):
            os.remove(temp_video_path)
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)


# Function to transcribe speech and analyze text emotions
def process_video_text(video_path):
    temp_audio_path = None
    try:
        video_clip = VideoFileClip(video_path)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_path = temp_audio_file.name
            video_clip.audio.write_audiofile(temp_audio_path)

        # Transcribe with word-level timestamps
        audio = whisper.load_audio(temp_audio_path)
        model = whisper.load_model("medium", device="cpu")
        result = whisper.transcribe(model, audio)

        # Create lists to store word-level data with timestamps
        word_texts = []
        word_starts = []
        word_ends = []
        word_confidences = []
        for segment in result['segments']:
            for word in segment['words']:
                word_texts.append(word['text'])
                word_starts.append(word['start'])
                word_ends.append(word['end'])
                word_confidences.append(word['confidence'])

        # Create segments DataFrame
        segments_data = [
            {'text': seg['text'], 'start': seg['start'], 'end': seg['end'], 'confidence': seg['confidence']}
            for seg in result['segments']
        ]
        segments_df = pd.DataFrame(segments_data)

        # Translate from Korean to English
        translator = Translator(from_lang='ko', to_lang='en')
        segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: translator.translate(x))

        # Apply the sentiment analysis model to the translated text
        segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(
            lambda x: {entry['label']: entry['score'] for entry in classifier(x)[0]}
        )

        # Split the sentiment scores into individual columns
        sentiment_df = segments_df['Sentiment_Scores'].apply(pd.Series)
        sentiment_df = pd.concat([segments_df, sentiment_df], axis=1)

        # Create words DataFrame
        words_data = {
            'text': word_texts,
            'start': word_starts,
            'end': word_ends,
            'confidence': word_confidences
        }
        words_df = pd.DataFrame(words_data)

        # Round up the start time to the next second
        words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))

        # Group words by second, concatenating words that belong to the same second
        words_grouped = words_df.groupby('second').agg({
            'text': lambda x: ' '.join(x),
            'start': 'min',
            'end': 'max',
            'confidence': 'mean'
        }).reset_index()

        # Fill in missing seconds
        max_second = int(video_clip.duration)  # The last second in the video
        all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})  # Start from 0 and go to the maximum second
        words_grouped = all_seconds.merge(words_grouped, on='second', how='left')

        # Fill missing values with blanks or zeros
        words_grouped['text'] = words_grouped['text'].fillna('')
        words_grouped.fillna(0, inplace=True)

        # Initialize emotion columns with NaN values
        emotion_columns = sentiment_df.columns.difference(
            ['text', 'start', 'end', 'confidence', 'Translated_Text', 'Sentiment_Scores']
        )
        for col in emotion_columns:
            words_grouped[col] = np.nan

        # For each second, find the corresponding segment and copy its emotion scores
        for i, row in words_grouped.iterrows():
            matching_segment = sentiment_df[
                (sentiment_df['start'] <= row['start']) & (sentiment_df['end'] >= row['end'])
            ]
            if not matching_segment.empty:
                for emotion in emotion_columns:
                    words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]

        # Replace any NaN values in emotion columns with 0
        words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)

        # Save DataFrames to XLSX files
        segments_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        words_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        sentiment_df.to_excel(segments_xlsx_path, index=False)
        words_grouped.to_excel(words_xlsx_path, index=False)

        return words_grouped, sentiment_df, words_xlsx_path, segments_xlsx_path, "Text emotion processing completed successfully!"
    except Exception as e:
        return None, None, None, None, f"Error during text emotion processing: {str(e)}"
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

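
# Minimal sketch (kept as comments so it is not executed on import) of how the three
# per-second outputs built above could be aligned on a shared integer second for
# side-by-side inspection; "sample.mp4" and the column prefixes are illustrative only:
#
#   _, audio_df, _ = analyze_audio_emotions("sample.mp4")
#   _, face_df, _, _ = detect_faces_and_emotions("sample.mp4")
#   words_df, _, _, _, _ = process_video_text("sample.mp4")
#
#   audio_df["second"] = audio_df["Time(s)"].astype(int)
#   face_df["second"] = face_df["Time(s)"].str.replace(" sec", "").astype(int)
#   combined = (
#       audio_df.add_prefix("audio_").rename(columns={"audio_second": "second"})
#       .merge(face_df.add_prefix("face_").rename(columns={"face_second": "second"}), on="second", how="outer")
#       .merge(words_df.add_prefix("text_").rename(columns={"text_second": "second"}), on="second", how="outer")
#   )
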

# Gradio App
def gradio_app():
    interface = gr.Blocks()
    with interface:
        gr.Markdown("## I-MEQ: Emotion Monitoring System")
        video_input = gr.Video(label="Upload your video for analysis", height=600)
        with gr.Row():
            analyze_audio_button = gr.Button("Analyze Audio Emotions")
            analyze_fer_button = gr.Button("Analyze Facial Emotions")
            analyze_text_button = gr.Button("Transcribe & Analyze Textual Emotions")
        with gr.Row():
            with gr.Column():
                audio_analysis_status = gr.Textbox(label="Audio Emotion Analysis Status")
                audio_emotions_dataframe = gr.Dataframe(label="Audio Emotions DataFrame", interactive=False)
                audio_emotions_xlsx_download = gr.File(label="Download Audio Emotions XLSX")
            with gr.Column():
                fer_analysis_status = gr.Textbox(label="Facial Emotion Analysis Status")
                fer_emotions_dataframe = gr.Dataframe(label="Facial Emotions DataFrame", interactive=False)
                fer_emotions_xlsx_download = gr.File(label="Download Facial Emotions XLSX")
                processed_video_download = gr.File(label="Download Processed Video")
            with gr.Column():
                text_analysis_status = gr.Textbox(label="Text Sentiment Analysis Status")
                words_dataframe = gr.Dataframe(label="Words DataFrame", interactive=False)
                segments_dataframe = gr.Dataframe(label="Segments DataFrame", interactive=False)
                words_xlsx_download = gr.File(label="Download Words XLSX")
                segments_xlsx_download = gr.File(label="Download Segments XLSX")

        analyze_audio_button.click(
            analyze_audio_emotions,
            inputs=video_input,
            outputs=[audio_analysis_status, audio_emotions_dataframe, audio_emotions_xlsx_download]
        )
        analyze_fer_button.click(
            detect_faces_and_emotions,
            inputs=video_input,
            outputs=[fer_analysis_status, fer_emotions_dataframe, fer_emotions_xlsx_download, processed_video_download]
        )
        analyze_text_button.click(
            process_video_text,
            inputs=video_input,
            outputs=[words_dataframe, segments_dataframe, words_xlsx_download, segments_xlsx_download, text_analysis_status]
        )
    interface.launch()


# Start the Gradio app
gradio_app()
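
# Assumed runtime dependencies, inferred from the imports above; package names are
# best-effort guesses and may need pinning to the versions this script was written
# against (e.g. a moviepy release that still provides moviepy.editor, and openpyxl
# for the DataFrame.to_excel calls):
#
#   pip install gradio pandas opencv-python torch librosa fer transformers \
#       moviepy numpy whisper-timestamped translate openpyxl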