import math import os from io import BytesIO import gradio as gr import cv2 from PIL import Image import requests from transformers import pipeline from pydub import AudioSegment from faster_whisper import WhisperModel import joblib import mediapipe as mp import numpy as np import pandas as pd import moviepy.editor as mpe import time theme = gr.themes.Base( primary_hue="cyan", secondary_hue="blue", neutral_hue="slate", ) model = WhisperModel("small", device="cpu", compute_type="int8") body_lang_model = joblib.load('body_language.pkl') mp_holistic = mp.solutions.holistic holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) mp_face_mesh = mp.solutions.face_mesh face_mesh = mp_face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5) API_KEY = os.getenv('HF_API_KEY') pipe1 = pipeline("image-classification", model="dima806/facial_emotions_image_detection") pipe2 = pipeline("text-classification", model="SamLowe/roberta-base-go_emotions") AUDIO_API_URL = "https://api-inference.huggingface.co/models/ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition" headers = {"Authorization": "Bearer " + API_KEY + ""} def extract_frames(video_path): clip = mpe.VideoFileClip(video_path) clip.write_videofile('mp4file.mp4', fps=60) cap = cv2.VideoCapture('mp4file.mp4') fps = int(cap.get(cv2.CAP_PROP_FPS)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) interval = int(fps/2) print(interval, total_frames) result = [] distract_count = 0 total_count = 0 output_list = [] for i in range(0, total_frames, interval): total_count += 1 cap.set(cv2.CAP_PROP_POS_FRAMES, i) ret, frame = cap.read() if ret: image = cv2.cvtColor(cv2.flip(frame, 1), cv2.COLOR_BGR2RGB) image.flags.writeable = False results = face_mesh.process(image) image.flags.writeable = True image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) img_h, img_w, img_c = image.shape face_3d = [] face_2d = [] flag = False if results.multi_face_landmarks: for face_landmarks in results.multi_face_landmarks: for idx, lm in enumerate(face_landmarks.landmark): if idx == 33 or idx == 263 or idx == 1 or idx == 61 or idx == 291 or idx == 199: if idx == 1: nose_2d = (lm.x * img_w, lm.y * img_h) nose_3d = (lm.x * img_w, lm.y * img_h, lm.z * 3000) x, y = int(lm.x * img_w), int(lm.y * img_h) face_2d.append([x, y]) face_3d.append([x, y, lm.z]) face_2d = np.array(face_2d, dtype=np.float64) face_3d = np.array(face_3d, dtype=np.float64) focal_length = 1 * img_w cam_matrix = np.array([ [focal_length, 0, img_h / 2], [0, focal_length, img_w / 2], [0, 0, 1]]) dist_matrix = np.zeros((4, 1), dtype=np.float64) success, rot_vec, trans_vec = cv2.solvePnP(face_3d, face_2d, cam_matrix, dist_matrix) rmat, jac = cv2.Rodrigues(rot_vec) angles, mtxR, mtxQ, Qx, Qy, Qz = cv2.RQDecomp3x3(rmat) x = angles[0] * 360 y = angles[1] * 360 z = angles[2] * 360 if y < -7 or y > 7 or x < -7 or x > 7: flag = True else: flag = False if flag == True: distract_count += 1 image2 = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results2 = holistic.process(image2) pose = results2.pose_landmarks.landmark pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten()) face = results2.face_landmarks.landmark face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten()) row = pose_row+face_row X = pd.DataFrame([row]) body_language_class = body_lang_model.predict(X)[0] body_language_prob = body_lang_model.predict_proba(X)[0] output_dict = {} for class_name, prob in zip(body_lang_model.classes_, body_language_prob): output_dict[class_name] = prob output_list.append(output_dict) pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) response = pipe1(pil_image) temp = {} for ele in response: label, score = ele.values() temp[label] = score result.append(temp) distraction_rate = distract_count/total_count total_bad_prob = 0 total_good_prob = 0 for output_dict in output_list: total_bad_prob += output_dict['Bad'] total_good_prob += output_dict['Good'] num_frames = len(output_list) avg_bad_prob = total_bad_prob / num_frames avg_good_prob = total_good_prob / num_frames final_output = {'Bad': avg_bad_prob, 'Good': avg_good_prob} cap.release() video_emotion_totals = {} emotion_totals = { 'admiration': 0.0, 'amusement': 0.0, 'angry': 0.0, 'annoyance': 0.0, 'approval': 0.0, 'caring': 0.0, 'confusion': 0.0, 'curiosity': 0.0, 'desire': 0.0, 'disappointment': 0.0, 'disapproval': 0.0, 'disgust': 0.0, 'embarrassment': 0.0, 'excitement': 0.0, 'fear': 0.0, 'gratitude': 0.0, 'grief': 0.0, 'happy': 0.0, 'love': 0.0, 'nervousness': 0.0, 'optimism': 0.0, 'pride': 0.0, 'realization': 0.0, 'relief': 0.0, 'remorse': 0.0, 'sad': 0.0, 'surprise': 0.0, 'neutral': 0.0 } counter = 0 for ele in result: for emotion in ele.keys(): emotion_totals[emotion] += ele.get(emotion) counter += 1 for emotion in emotion_totals: emotion_totals[emotion] /= counter if (emotion_totals[emotion]) > 0.0: video_emotion_totals[emotion] = emotion_totals[emotion] return video_emotion_totals, result, final_output, distraction_rate def analyze_sentiment(text): response = pipe2(text) sentiment_results = {} for ele in response: label, score = ele.values() sentiment_results[label] = score return sentiment_results def video_to_audio(input_video): temp = requests.get('https://parthcodes-flask-tester.hf.space/useridping') user_id = temp.json()['current'] print(user_id) video_emotion_totals, frames_sentiments, body_language, distraction_rate = extract_frames(input_video) print("Total Video Emotions ... Done") print("Video Frame Sentiment ... Done") print("Body Language ... Done") print("Distraction Rate ... Done") cap = cv2.VideoCapture(input_video) fps = int(cap.get(cv2.CAP_PROP_FPS)) audio = AudioSegment.from_file(input_video) audio_binary = audio.export(format="wav").read() audio_bytesio2 = BytesIO(audio_binary) flag = False while flag == False: audio_bytesio = BytesIO(audio_binary) response = requests.post(AUDIO_API_URL, headers=headers, data=audio_bytesio) if type(response.json()) == type({}): print(response.json()) time.sleep(30) print("Retrying for Speech Emotions") else: flag = True formatted_response = {} for ele in response.json(): score, label = ele.values() formatted_response[label] = score print("Speech Sentiments ... Done") segments, info = model.transcribe(audio_bytesio2, beam_size=5) transcript = '' video_sentiment_final = [] final_output = [] for segment in segments: transcript = transcript + segment.text + " " transcript_segment_sentiment = analyze_sentiment(segment.text) emotion_totals = { 'admiration': 0.0, 'amusement': 0.0, 'angry': 0.0, 'annoyance': 0.0, 'approval': 0.0, 'caring': 0.0, 'confusion': 0.0, 'curiosity': 0.0, 'desire': 0.0, 'disappointment': 0.0, 'disapproval': 0.0, 'disgust': 0.0, 'embarrassment': 0.0, 'excitement': 0.0, 'fear': 0.0, 'gratitude': 0.0, 'grief': 0.0, 'happy': 0.0, 'love': 0.0, 'nervousness': 0.0, 'optimism': 0.0, 'pride': 0.0, 'realization': 0.0, 'relief': 0.0, 'remorse': 0.0, 'sad': 0.0, 'surprise': 0.0, 'neutral': 0.0 } counter = 0 for i in range(math.ceil(segment.start), math.floor(segment.end)): for emotion in frames_sentiments[i].keys(): emotion_totals[emotion] += frames_sentiments[i].get(emotion) counter += 1 for emotion in emotion_totals: emotion_totals[emotion] /= counter video_sentiment_final.append(emotion_totals) video_segment_sentiment = {key: value for key, value in emotion_totals.items() if value != 0.0} segment_finals = {segment.id: (segment.text, segment.start, segment.end, transcript_segment_sentiment, video_segment_sentiment)} final_output.append(segment_finals) total_transcript_sentiment = {key: value for key, value in analyze_sentiment(transcript).items() if value >= 0.01} print("Full Transcript Sentiments ... Done") emotion_finals = { 'admiration': 0.0, 'amusement': 0.0, 'angry': 0.0, 'annoyance': 0.0, 'approval': 0.0, 'caring': 0.0, 'confusion': 0.0, 'curiosity': 0.0, 'desire': 0.0, 'disappointment': 0.0, 'disapproval': 0.0, 'disgust': 0.0, 'embarrassment': 0.0, 'excitement': 0.0, 'fear': 0.0, 'gratitude': 0.0, 'grief': 0.0, 'happy': 0.0, 'love': 0.0, 'nervousness': 0.0, 'optimism': 0.0, 'pride': 0.0, 'realization': 0.0, 'relief': 0.0, 'remorse': 0.0, 'sad': 0.0, 'surprise': 0.0, 'neutral': 0.0 } for i in range(0, video_sentiment_final.__len__()-1): for emotion in video_sentiment_final[i].keys(): emotion_finals[emotion] += video_sentiment_final[i].get(emotion) for emotion in emotion_finals: emotion_finals[emotion] /= video_sentiment_final.__len__() emotion_finals = {key: value for key, value in emotion_finals.items() if value != 0.0} print("Video Frame (Mapping & AVG.) ... Done") print("\nProcessing Completed!!\n") payload = { 'from': 'gradio', 'user_id': user_id, 'total_video_emotions': video_emotion_totals, 'emotions_final': emotion_finals, 'body_language': body_language, 'distraction_rate': distraction_rate, 'formatted_response': formatted_response, 'total_transcript_sentiment': total_transcript_sentiment } print(payload) response = requests.post('https://parthcodes-test-flask-deploy.hf.space/interview', json=payload) with gr.Blocks(theme=theme, css=".gradio-container { background: rgba(0, 0, 0, 0.4) !important; box-shadow: 0 8px 32px 0 rgba( 31, 38, 135, 0.37 ) !important; backdrop-filter: blur( 10px ) !important; -webkit-backdrop-filter: blur( 10px ) !important; border-radius: 12px !important;}") as Video: input_video = gr.Video(sources=["webcam", "upload"], format='mp4') input_video.stop_recording(fn=video_to_audio, inputs=input_video) input_video.upload(fn=video_to_audio, inputs=input_video) Video.launch()