| import os |
| import subprocess |
| import cv2 |
| import json |
| import math |
| import torch |
| import librosa |
| import ffmpeg |
| import numpy as np |
| import soundfile as sf |
| import mediapipe as mp |
| from PIL import Image |
| from transformers import AutoImageProcessor, AutoModelForImageClassification, pipeline |
| from sentence_transformers import SentenceTransformer, CrossEncoder |
| from sklearn.metrics.pairwise import cosine_similarity |
| from mediapipe.tasks import python |
| from mediapipe.tasks.python import vision |
|
|
| |
| import warnings |
# Suppress every UserWarning and FutureWarning raised after this point.
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)


# Integer code reported as "tone_of_voice" for each dominant tone label.
# NOTE(review): "Natural" and "Excited" share code 3 -- confirm this is intended.
TONE_MAPPING = dict(
    Hesitant=0,
    Confident=1,
    Unstable=2,
    Natural=3,
    Excited=3,
)


# Torch device used for the face-emotion model: CUDA when available, else CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
|
|
| |
# Location of the MediaPipe face-landmarker model bundle (relative to the
# current working directory).
MODEL_PATH = "face_landmarker.task"
if not os.path.exists(MODEL_PATH):
    # Download the model once. The command is passed as an argument list so no
    # shell is involved.
    try:
        _download_status = subprocess.run(
            [
                "wget",
                "-O", MODEL_PATH,
                "-q",
                "https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task",
            ],
            check=False,
        ).returncode
    except OSError:
        # wget is missing or cannot be executed.
        _download_status = -1

    if _download_status != 0:
        # `wget -O` creates the output file even when the transfer fails; remove
        # it so the existence check above retries the download on the next run
        # instead of accepting a truncated model.
        if os.path.exists(MODEL_PATH):
            os.remove(MODEL_PATH)
        warnings.warn(
            f"Could not download {MODEL_PATH}; face landmark detection will be unavailable.",
            RuntimeWarning,
        )
|
|
| |
# Whisper speech-recognition pipeline: device index 0 when CUDA is available,
# otherwise -1.
asr = pipeline(
    task="automatic-speech-recognition",
    model="openai/whisper-small",
    device=0 if torch.cuda.is_available() else -1,
)

# Sentence-embedding model used by compute_similarity_score.
semantic_model = SentenceTransformer("all-MiniLM-L6-v2")

# Cross-encoder used by compute_relevance_score.
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


# Facial-emotion image classifier and its matching pre-processor.
FACE_MODEL_NAME = "dima806/facial_emotions_image_detection"
face_processor = AutoImageProcessor.from_pretrained(FACE_MODEL_NAME)
face_model = AutoModelForImageClassification.from_pretrained(FACE_MODEL_NAME)
face_model = face_model.to(device).eval()  # inference mode on the selected device
|
|
| |
# Emotion label -> (valence, arousal) pair, consumed by
# compute_valence_arousal_from_probs.
# NOTE(review): these coordinates do not line up with the angles used in
# EMOTION_RING below (e.g. "surprise" has negative arousal here but sits at 45
# degrees on the ring) -- confirm which table is authoritative.
emotion_va = {
    "happy": (0.8, 0.2),
    "fear": (0.2, 0.8),
    "angry": (-0.7, 0.65),
    "sad": (-0.65, -0.55),
    "surprise": (0.1, -0.75),
    "disgust": (0.6, -0.4),
    "neutral": (0.0, 0.0),
}

# Labels drawn around the emotion wheel: (label, angle in degrees, fraction of
# the wheel radius at which the label is centred).
EMOTION_RING = [
    ("Happy", 0, 0.84),
    ("Surprise", 45, 0.84),
    ("Fear", 100, 0.84),
    ("Sad", 160, 0.84),
    ("Disgust", 215, 0.84),
    ("Angry", 270, 0.84),
]
|
|
| |
|
|
def normalize(v, mn, mx):
    """Rescale ``v`` from the range ``[mn, mx]`` to ``[0, 1]``, clamping the result.

    Returns ``0.0`` when the range is empty (``mx == mn``).
    """
    span = mx - mn
    if span == 0:
        return 0.0
    return np.clip((v - mn) / span, 0, 1)
|
|
def extract_audio(v_in, a_out):
    """Write the audio of ``v_in`` to ``a_out`` as mono, 16 kHz audio via ffmpeg.

    An existing ``a_out`` is overwritten; ffmpeg output is suppressed.
    """
    source = ffmpeg.input(v_in)
    job = source.output(a_out, ac=1, ar=16000)
    job = job.overwrite_output()
    job.run(quiet=True)
|
|
def merge_audio_video(v_in, a_in, v_out):
    """Combine the video stream of ``v_in`` with the audio stream of ``a_in``.

    The result is written to ``v_out`` (overwritten if present), encoded with
    libx264 video and AAC audio; ffmpeg output is suppressed.
    """
    video_stream = ffmpeg.input(v_in).video
    audio_stream = ffmpeg.input(a_in).audio
    job = ffmpeg.output(
        video_stream,
        audio_stream,
        v_out,
        vcodec="libx264",
        acodec="aac",
    )
    job.overwrite_output().run(quiet=True)
|
|
def draw_face_box(frame, x, y, w, h, emotion_name=""):
    """Draw a thin face rectangle with thick corner accents and an optional label.

    NOTE: a later ``draw_face_box`` definition in this module rebinds the name,
    so this version is shadowed once the module has been fully imported.

    Args:
        frame: Image drawn on in place.
        x, y: Top-left corner of the box in pixels.
        w, h: Box width and height in pixels.
        emotion_name: Text drawn upper-cased above the box; skipped when empty.

    Returns:
        The same ``frame`` object.
    """
    color = (0, 255, 100)
    corner_len = 20
    right, bottom = x + w, y + h

    cv2.rectangle(frame, (x, y), (right, bottom), color, 1)

    if emotion_name:
        label_origin = (x + 10, y - 15)
        cv2.putText(frame, emotion_name.upper(), label_origin,
                    cv2.FONT_HERSHEY_DUPLEX, 0.7, (0, 255, 100), 2, cv2.LINE_AA)

    # Two short strokes per corner: (start x, start y, delta x, delta y).
    corner_strokes = (
        (x, y, corner_len, 0), (x, y, 0, corner_len),
        (right, y, -corner_len, 0), (right, y, 0, corner_len),
        (x, bottom, corner_len, 0), (x, bottom, 0, -corner_len),
        (right, bottom, -corner_len, 0), (right, bottom, 0, -corner_len),
    )
    for start_x, start_y, delta_x, delta_y in corner_strokes:
        cv2.line(frame, (start_x, start_y),
                 (start_x + delta_x, start_y + delta_y), color, 5)

    return frame
|
|
def compute_eye_contact_ratio(frame, landmarks):
    """Return an eye-openness score in ``[0, 1]`` from face landmarks.

    The score is three times the mean eye aspect ratio of both eyes, clamped to
    ``[0, 1]``. Landmark coordinates are scaled by the frame width and height.

    NOTE: a later ``compute_eye_contact_ratio`` definition in this module
    rebinds the name, so this version is shadowed after import.
    """
    frame_h, frame_w, _ = frame.shape

    def eye_aspect_ratio(indices):
        # Pixel coordinates of the six landmarks outlining one eye.
        pts = []
        for index in indices:
            point = landmarks[index]
            pts.append(np.array([point.x * frame_w, point.y * frame_h]))
        vertical = np.linalg.norm(pts[1] - pts[5]) + np.linalg.norm(pts[2] - pts[4])
        horizontal = np.linalg.norm(pts[0] - pts[3])
        return vertical / (2.0 * horizontal)

    left_ratio = eye_aspect_ratio([33, 160, 158, 133, 153, 144])
    right_ratio = eye_aspect_ratio([362, 385, 387, 263, 373, 380])
    mean_ratio = (left_ratio + right_ratio) / 2.0
    return min(max(mean_ratio * 3, 0), 1)
|
|
def analyze_face_emotion(frame):
    """Classify the emotion shown in a BGR frame.

    NOTE: a later ``analyze_face_emotion`` definition in this module rebinds
    the name, so this version is shadowed after import.

    Returns:
        Dict mapping each lower-cased model label to its softmax probability.
    """
    rgb_pixels = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(rgb_pixels)
    model_inputs = face_processor(images=pil_image, return_tensors="pt").to(device)

    with torch.no_grad():
        logits = face_model(**model_inputs).logits

    probabilities = torch.nn.functional.softmax(logits, dim=-1)[0]
    id2label = face_model.config.id2label

    scores = {}
    for class_id, probability in enumerate(probabilities.tolist()):
        scores[id2label[class_id].lower()] = probability
    return scores
|
|
| |
|
|
def extract_audio_features(y, sr):
    """Compute prosodic features from an audio signal.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.

    Returns:
        Dict with keys "pitch_std", "jitter", "energy_std", "pause_ratio" and
        "speech_rate". All values are 0 when the signal has zero duration.
    """
    duration = librosa.get_duration(y=y, sr=sr)
    if duration == 0:
        return {"pitch_std": 0, "jitter": 0, "energy_std": 0, "pause_ratio": 0, "speech_rate": 0}

    # Pitch track (YIN, 75-300 Hz) with any NaN frames dropped.
    pitch = librosa.yin(y, fmin=75, fmax=300, sr=sr)
    pitch = pitch[~np.isnan(pitch)]

    if len(pitch):
        pitch_std = np.std(pitch)
    else:
        pitch_std = 0

    if len(pitch) > 1:
        # Mean relative change between consecutive pitch frames.
        relative_steps = np.abs(np.diff(pitch)) / np.maximum(pitch[:-1], 1e-6)
        jitter = np.mean(relative_steps)
    else:
        jitter = 0

    # Spread of the frame-wise RMS energy.
    energy_std = np.std(librosa.feature.rms(y=y)[0])

    # Fraction of the clip outside the non-silent intervals (20 dB threshold).
    voiced_intervals = librosa.effects.split(y, top_db=20)
    voiced_seconds = sum((end - start) for start, end in voiced_intervals) / sr
    pause_ratio = 1 - (voiced_seconds / duration)

    # Detected onsets per second of audio.
    onset_envelope = librosa.onset.onset_strength(y=y, sr=sr)
    onset_frames = librosa.onset.onset_detect(onset_envelope=onset_envelope, sr=sr)
    speech_rate = len(onset_frames) / duration

    return {
        "pitch_std": pitch_std,
        "jitter": jitter,
        "energy_std": energy_std,
        "pause_ratio": pause_ratio,
        "speech_rate": speech_rate,
    }
|
|
|
|
def compute_audio_scores(features, baseline=None):
    """Score audio features on a 0-100 scale relative to a baseline.

    Args:
        features: Mapping with keys "pitch_std", "jitter", "energy_std",
            "pause_ratio" and "speech_rate".
        baseline: Optional mapping of reference values. The keys read are
            "pitch_std", "energy_std", "speech_rate" and "pause_ratio"; any key
            that is missing falls back to the built-in default, so a partial
            baseline is accepted. ``None`` uses the defaults for everything.

    Returns:
        Dict with "confidence_audio", "clarity", "stress" (each clipped to
        0-100 and rounded to 2 decimals), "pauses" (pause ratio as a
        percentage) and "tone_of_voice" (integer code from TONE_MAPPING).
    """
    default_baseline = {
        "pitch_std": 30.0,
        "energy_std": 0.05,
        "jitter": 0.02,
        "pause_ratio": 0.2,
        "speech_rate": 4.0,
    }
    # Overlay the caller's values on the defaults so a missing key cannot
    # raise KeyError further down.
    baseline = {**default_baseline, **(baseline or {})}

    # Ratios against the baseline; the floor avoids division by zero.
    pitch_ratio = features["pitch_std"] / max(baseline["pitch_std"], 1e-6)
    energy_ratio = features["energy_std"] / max(baseline["energy_std"], 1e-6)
    rate_ratio = features["speech_rate"] / max(baseline["speech_rate"], 1e-6)

    # Stress: deviation of pitch and energy from baseline plus jitter.
    pitch_dev = abs(1 - pitch_ratio)
    energy_dev = abs(1 - energy_ratio)
    stress_val = (pitch_dev * 0.4 + energy_dev * 0.4 + features["jitter"] * 0.2) * 150
    stress = np.clip(stress_val + 20, 0, 100)

    # Clarity: penalise only pausing beyond the baseline, and jitter.
    pause_dev = max(0, features["pause_ratio"] - baseline["pause_ratio"])
    clarity = 100 - (pause_dev * 120 + features["jitter"] * 400)

    # Confidence: penalise rate and energy deviation and the raw pause ratio.
    rate_dev = abs(1 - rate_ratio)
    confidence_audio = 100 - (rate_dev * 40 + energy_dev * 30 + features["pause_ratio"] * 50)

    # Candidate tones; the highest (unclipped) score is the dominant tone.
    tones = {
        "Confident": confidence_audio,
        "Hesitant": features["pause_ratio"] * 150,
        "Excited": (energy_ratio - 1) * 100 if energy_ratio > 1 else 0,
        "Unstable": stress,
        "Natural": 100 - (pitch_dev * 60 + rate_dev * 40),
    }
    dominant_tone = max(tones, key=tones.get)

    return {
        "confidence_audio": round(float(np.clip(confidence_audio, 0, 100)), 2),
        "clarity": round(float(np.clip(clarity, 0, 100)), 2),
        "stress": round(float(stress), 2),
        "pauses": round(float(features["pause_ratio"] * 100), 2),
        "tone_of_voice": TONE_MAPPING.get(dominant_tone, 3),
    }
|
|
def analyze_audio_segment(audio_path, baseline=None):
    """Load an audio file at 16 kHz and return its audio scores.

    ``baseline`` is forwarded unchanged to ``compute_audio_scores``.
    """
    samples, sample_rate = librosa.load(audio_path, sr=16000)
    return compute_audio_scores(
        extract_audio_features(samples, sample_rate),
        baseline,
    )
|
|
|
|
| |
|
|
def get_user_answer(audio_path):
    """Return the Whisper transcription of ``audio_path`` with outer whitespace removed.

    The audio is processed in 20-second chunks.
    """
    transcription = asr(audio_path, chunk_length_s=20)
    text = transcription["text"]
    return text.strip()
|
|
|
|
def compute_similarity_score(user_answer, ideal_answer):
    """Return the cosine similarity of the two answers as a percentage.

    Both texts are embedded with ``semantic_model``; negative similarities are
    reported as 0 and the result is rounded to 2 decimals.
    """
    embeddings = semantic_model.encode([user_answer, ideal_answer])
    user_vec, ideal_vec = embeddings[0], embeddings[1]
    similarity = cosine_similarity([user_vec], [ideal_vec])[0][0]
    percent = float(similarity * 100)
    return round(max(0, percent), 2)
|
|
def compute_relevance_score(question, user_answer):
    """Return how relevant ``user_answer`` is to ``question`` as a percentage.

    The cross-encoder's raw score is passed through a sigmoid, scaled to
    0-100 and rounded to 2 decimals.
    """
    predictions = cross_encoder.predict([(question, user_answer)])
    logit = predictions[0]
    probability = 1 / (1 + np.exp(-logit))
    percent = float(probability * 100)
    return round(max(0, percent), 2)
|
|
| |
|
|
| |
# Landmark indices outlining each eye, in the order
# [corner, upper, upper, opposite corner, lower, lower] expected by the
# aspect-ratio computation below.
LEFT_EYE = [33, 160, 158, 133, 153, 144]
RIGHT_EYE = [362, 385, 387, 263, 373, 380]


def compute_eye_contact_ratio(frame, landmarks):
    """Compute an eye-openness score in ``[0, 1]`` from face landmarks.

    The score is three times the mean eye aspect ratio (EAR) of both eyes,
    clamped to ``[0, 1]``.

    Args:
        frame: Image array; only its height and width (``shape[:2]``) are used
            to scale the landmark coordinates to pixels.
        landmarks: Indexable sequence of objects with ``x`` and ``y``
            attributes, or a falsy value when no face was detected.

    Returns:
        A float in ``[0, 1]``; ``0.5`` when ``landmarks`` is empty or ``None``.
    """
    if not landmarks:
        return 0.5

    frame_h, frame_w = frame.shape[:2]

    def ear(indices):
        points = [
            np.array([landmarks[i].x * frame_w, landmarks[i].y * frame_h])
            for i in indices
        ]
        h_dist = np.linalg.norm(points[0] - points[3])
        if h_dist == 0:
            # Degenerate eye (both corners coincide): report a closed eye
            # instead of dividing by zero and propagating NaN.
            return 0.0
        v1 = np.linalg.norm(points[1] - points[5])
        v2 = np.linalg.norm(points[2] - points[4])
        return (v1 + v2) / (2.0 * h_dist)

    avg_ear = (ear(LEFT_EYE) + ear(RIGHT_EYE)) / 2.0

    # Scale so a typical open-eye EAR maps near 1, then clamp.
    return float(min(max(avg_ear * 3, 0), 1))
|
|
def analyze_face_emotion(frame):
    """Return the emotion probabilities predicted for a single BGR frame.

    The frame is converted to RGB, run through the face-emotion classifier
    without gradient tracking, and the logits are soft-maxed.

    Returns:
        Dict mapping each lower-cased class label to its probability.
    """
    pil_frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    batch = face_processor(images=pil_frame, return_tensors="pt").to(device)

    with torch.no_grad():
        logits = face_model(**batch).logits

    class_probs = torch.nn.functional.softmax(logits, dim=-1)[0]
    id2label = face_model.config.id2label

    return {
        id2label[class_id].lower(): float(prob)
        for class_id, prob in enumerate(class_probs)
    }
|
|
def draw_face_box(frame, x, y, w, h, emotion_label="Neutral"):
    """Draw a face bounding box with corner accents and a centred emotion label.

    The label is normally placed just above the box. When there is no room
    above it (box touching the top of the frame) the label is drawn just inside
    the top edge of the box instead, and its horizontal position is clamped so
    it does not start outside the frame.

    Args:
        frame: Image array drawn on in place.
        x, y: Top-left corner of the box in pixels.
        w, h: Box width and height in pixels.
        emotion_label: Text shown with the box (capitalised before drawing).

    Returns:
        The same ``frame`` object.
    """
    color = (0, 255, 0)
    thickness = 2
    corner_len = 22
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.7
    font_thickness = 2

    right, bottom = x + w, y + h
    cv2.rectangle(frame, (x, y), (right, bottom), color, thickness)

    # Two thicker strokes per corner: (start x, start y, delta x, delta y).
    corner_strokes = (
        (x, y, corner_len, 0), (x, y, 0, corner_len),
        (right, y, -corner_len, 0), (right, y, 0, corner_len),
        (x, bottom, corner_len, 0), (x, bottom, 0, -corner_len),
        (right, bottom, -corner_len, 0), (right, bottom, 0, -corner_len),
    )
    for px, py, dx, dy in corner_strokes:
        cv2.line(frame, (px, py), (px + dx, py + dy), color, 4)

    label_text = emotion_label.capitalize()
    (tw, th), _ = cv2.getTextSize(label_text, font, font_scale, font_thickness)

    # Centre the label over the box, then keep it inside the frame horizontally.
    frame_w = frame.shape[1]
    text_x = x + (w - tw) // 2
    text_x = max(0, min(text_x, frame_w - tw))

    # putText takes the text baseline; fall back to inside the box when the
    # text would be cut off by the top of the frame.
    text_y = y - 10
    if text_y - th < 0:
        text_y = y + th + 10

    cv2.putText(frame, label_text, (text_x, text_y), font, font_scale,
                (0, 255, 0), font_thickness, cv2.LINE_AA)

    return frame
|
|
def compute_valence_arousal_from_probs(emotion_probs):
    """Return the probability-weighted mean (valence, arousal) of the emotions.

    Labels are matched case-insensitively against ``emotion_va``; unknown
    labels are ignored. Returns ``(0.0, 0.0)`` when the matched probabilities
    sum to zero.
    """
    valence_sum = 0.0
    arousal_sum = 0.0
    weight_sum = 0.0

    for label, probability in emotion_probs.items():
        coords = emotion_va.get(label.lower())
        if coords is None:
            continue
        valence_sum += coords[0] * probability
        arousal_sum += coords[1] * probability
        weight_sum += probability

    if weight_sum == 0:
        return 0.0, 0.0

    return valence_sum / weight_sum, arousal_sum / weight_sum
|
|
def draw_full_emotion_wheel(panel, center, radius, valence, arousal,
                            dominant_emotion="neutral"):
    """Draw the emotion wheel with labels and a valence/arousal marker.

    Args:
        panel: Image array drawn on in place.
        center: ``(x, y)`` pixel centre of the wheel.
        radius: Wheel radius in pixels.
        valence: Horizontal marker position; clamped to ``[-1, 1]``.
        arousal: Vertical marker position (positive is up); clamped to
            ``[-1, 1]``.
        dominant_emotion: Label to highlight, compared case-insensitively.
            ``None`` is treated as "neutral".

    Returns:
        The same ``panel`` object.
    """
    cx, cy = center
    # Normalise once so ring labels and the centre "Neutral" label use the same
    # case-insensitive comparison.
    dominant = (dominant_emotion or "neutral").lower()
    # Keep the marker inside the wheel even for out-of-range inputs.
    valence = float(np.clip(valence, -1.0, 1.0))
    arousal = float(np.clip(arousal, -1.0, 1.0))

    # Background disc, outer rim and two inner guide rings.
    cv2.circle(panel, center, radius + 5, (15, 15, 25), -1)
    cv2.circle(panel, center, radius, (60, 60, 85), 2)
    for ring_fraction in (0.33, 0.66):
        cv2.circle(panel, center, int(radius * ring_fraction), (35, 35, 50), 1)

    # Spokes every 60 degrees; image y grows downwards, hence the minus sign.
    for angle_deg in range(0, 360, 60):
        rad = math.radians(angle_deg)
        x1 = int(cx + radius * math.cos(rad))
        y1 = int(cy - radius * math.sin(rad))
        cv2.line(panel, (cx, cy), (x1, y1), (40, 40, 60), 1)

    # Emotion labels around the ring; the dominant one is larger and brighter.
    font, scale, thick = cv2.FONT_HERSHEY_SIMPLEX, 0.40, 1
    for label, angle_deg, ring_fraction in EMOTION_RING:
        if angle_deg is None:
            continue
        rad = math.radians(angle_deg)
        lx = int(cx + ring_fraction * radius * math.cos(rad))
        ly = int(cy - ring_fraction * radius * math.sin(rad))
        (tw, th), _ = cv2.getTextSize(label, font, scale, thick)
        tx, ty = lx - tw // 2, ly + th // 2

        if label.lower() == dominant:
            cv2.putText(panel, label, (tx, ty), font, scale + 0.08,
                        (0, 255, 200), 2, cv2.LINE_AA)
        else:
            cv2.putText(panel, label, (tx, ty), font, scale,
                        (190, 190, 255), thick, cv2.LINE_AA)

    # "Neutral" sits at the centre and is highlighted when dominant.
    neutral_color = (0, 255, 200) if dominant == "neutral" else (160, 160, 160)
    (tw, th), _ = cv2.getTextSize("Neutral", font, scale, thick)
    cv2.putText(panel, "Neutral", (cx - tw // 2, cy + th // 2), font, scale,
                neutral_color, thick, cv2.LINE_AA)

    # Marker: three concentric filled discs at the valence/arousal position.
    dot_x = int(cx + valence * radius * 0.88)
    dot_y = int(cy - arousal * radius * 0.88)
    cv2.circle(panel, (dot_x, dot_y), 15, (160, 120, 0), -1)
    cv2.circle(panel, (dot_x, dot_y), 11, (220, 180, 0), -1)
    cv2.circle(panel, (dot_x, dot_y), 7, (255, 230, 60), -1)

    return panel
|
|
# One entry per metric bar, in drawing order:
# (label, fill colour, background colour). The colour tuples are passed
# unchanged to the cv2 drawing calls in draw_metric_bars.
BAR_CONFIGS = [
    (
        "Confidence",
        (70, 180, 255),
        (30, 50, 100),
    ),
    (
        "Clarity",
        (100, 220, 150),
        (25, 70, 50),
    ),
    (
        "Stress",
        (255, 120, 100),
        (100, 40, 30),
    ),
]
|
|
def draw_metric_bars(panel,
                     bars_x_start,
                     bar_y_top,
                     bar_height,
                     bar_width,
                     bar_gap,
                     confidence,
                     clarity,
                     stress):
    """Draw three horizontal metric bars, each with its label above it.

    Args:
        panel: Image array drawn on in place.
        bars_x_start: Left edge of the bars in pixels.
        bar_y_top: Baseline y of the first label.
        bar_height: Height of each bar in pixels.
        bar_width: Width of a full (100 %) bar in pixels.
        bar_gap: Extra vertical gap between consecutive rows.
        confidence, clarity, stress: Percent values; each is clamped to
            ``[0, 100]`` so the filled part never leaves the bar background.

    Returns:
        The same ``panel`` object.
    """
    label_space = 20  # vertical room reserved for the label above each bar
    row_pitch = bar_height + label_space + bar_gap
    x_right = bars_x_start + bar_width

    rows = zip(BAR_CONFIGS, (confidence, clarity, stress))
    for row, ((label, fill_color, bg_color), raw_value) in enumerate(rows):
        value = min(max(raw_value, 0), 100)
        filled = int((value / 100) * bar_width)

        y = bar_y_top + row * row_pitch
        bar_y = y + 8  # bar starts slightly below the label baseline

        cv2.putText(panel, label, (bars_x_start, y),
                    cv2.FONT_HERSHEY_DUPLEX, 0.6, (230, 230, 230), 1, cv2.LINE_AA)

        # Background track, then the filled portion on top of it.
        cv2.rectangle(panel, (bars_x_start, bar_y),
                      (x_right, bar_y + bar_height), bg_color, -1)
        cv2.rectangle(panel, (bars_x_start, bar_y),
                      (bars_x_start + filled, bar_y + bar_height), fill_color, -1)

        # Numeric value drawn inside the bar.
        cv2.putText(panel, f"{int(value)}%",
                    (bars_x_start + 12, bar_y + bar_height - 6),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA)

    return panel
|
|
| |
|
|
def process_full_video(video_path, output_dir, questions_config, audio_results_map=None,
                       baseline=None):
    """Annotate every frame of a video and write the result as an MP4 without audio.

    Each output frame gets a face box with the dominant emotion, the emotion
    wheel, three audio metric bars computed over a trailing 3-second window,
    and -- while no answer interval is active -- the next question's text.

    Args:
        video_path: Input video; also loaded as 16 kHz audio.
        output_dir: Directory for "annotated_full_raw.mp4" (created if missing).
        questions_config: Re-iterable collection of dicts with "start_time",
            "end_time" (seconds) and "question_text".
        audio_results_map: Accepted for compatibility; not used.
        baseline: Optional baseline forwarded to ``compute_audio_scores``;
            ``None`` uses that function's defaults.

    Returns:
        Path of the annotated video file.

    Raises:
        RuntimeError: If the video cannot be opened or reports no valid frame
            rate.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Could not open video: {video_path}")

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # A zero/NaN frame rate would make every timestamp below undefined.
        if not fps > 0:
            raise RuntimeError(f"Video reports an invalid frame rate ({fps}): {video_path}")
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        full_audio, sr = librosa.load(video_path, sr=16000)

        os.makedirs(output_dir, exist_ok=True)
        temp_output = os.path.join(output_dir, "annotated_full_raw.mp4")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height))

        options = vision.FaceLandmarkerOptions(
            base_options=python.BaseOptions(model_asset_path=MODEL_PATH),
            running_mode=vision.RunningMode.VIDEO,
            num_faces=1,
        )

        frame_idx = 0
        smooth_v, smooth_a = 0.0, 0.0
        dom_emo = "neutral"
        last_landmarks = None

        # Exponentially smoothed audio scores shown in the metric bars.
        live_scores = {"confidence_audio": 0.0, "clarity": 0.0, "stress": 0.0}
        smoothing_factor = 0.15

        with vision.FaceLandmarker.create_from_options(options) as landmarker:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                current_time = frame_idx / fps
                active_answer = next(
                    (q for q in questions_config
                     if q["start_time"] <= current_time <= q["end_time"]),
                    None,
                )
                next_q = next(
                    (q for q in questions_config if current_time < q["start_time"]),
                    None,
                )
                next_text = f"Q) {next_q['question_text']}" if next_q else "Preparing..."

                # Audio metrics: refresh every 10th frame from the last 3 seconds.
                if frame_idx % 10 == 0:
                    start_sample = max(0, int((current_time - 3) * sr))
                    end_sample = int(current_time * sr)
                    audio_segment = full_audio[start_sample:end_sample]

                    # Skip windows shorter than half a second.
                    if len(audio_segment) > sr * 0.5:
                        feats = extract_audio_features(audio_segment, sr)
                        instant_scores = compute_audio_scores(feats, baseline=baseline)
                        for key in live_scores:
                            live_scores[key] += smoothing_factor * (
                                instant_scores[key] - live_scores[key]
                            )

                # Face landmarks and emotion: refresh every 4th frame.
                if frame_idx % 4 == 0:
                    mp_image = mp.Image(
                        image_format=mp.ImageFormat.SRGB,
                        data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB),
                    )
                    results = landmarker.detect_for_video(mp_image, int(current_time * 1000))

                    if results.face_landmarks:
                        last_landmarks = results.face_landmarks[0]
                        emo_probs = analyze_face_emotion(frame)
                        dom_emo = max(emo_probs, key=emo_probs.get)
                        v_target, a_target = compute_valence_arousal_from_probs(emo_probs)
                        smooth_v += 0.15 * (v_target - smooth_v)
                        smooth_a += 0.15 * (a_target - smooth_a)

                # The most recent landmarks are reused on frames without detection.
                if last_landmarks:
                    xs = [lm.x * width for lm in last_landmarks]
                    ys = [lm.y * height for lm in last_landmarks]
                    draw_face_box(
                        frame,
                        int(min(xs)), int(min(ys)),
                        int(max(xs) - min(xs)), int(max(ys) - min(ys)),
                        dom_emo,
                    )

                draw_full_emotion_wheel(
                    frame, (width - 130, height - 100), 90, smooth_v, smooth_a, dom_emo
                )

                draw_metric_bars(
                    frame, 30, height - 160, 28, 200, 6,
                    live_scores["confidence_audio"],
                    live_scores["clarity"],
                    live_scores["stress"],
                )

                # Show the upcoming question only outside answer intervals.
                if not active_answer:
                    frame = draw_question_overlay(frame, next_text, width, height)

                out.write(frame)
                frame_idx += 1
    finally:
        # Release the capture and writer even when processing fails midway.
        cap.release()
        if out is not None:
            out.release()

    return temp_output
|
|
def draw_question_overlay(frame, text, width, height):
    """Draw ``text`` word-wrapped in a translucent box above the wheel and bars.

    Args:
        frame: Image array drawn on in place.
        text: Text to display; runs of whitespace are collapsed.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Returns:
        The same ``frame`` object (unchanged when ``text`` has no words).
    """
    font = cv2.FONT_HERSHEY_DUPLEX
    font_scale = 0.65
    thickness = 1
    side_margin = 50
    bottom_limit = height - 270  # keeps the box clear of the bottom widgets
    line_height = 35

    # Greedy word wrap. A word wider than the box gets a line of its own
    # rather than producing an empty line before it.
    max_w = width - (2 * side_margin)
    lines, current_line = [], ""
    for word in text.split():
        candidate = f"{current_line} {word}" if current_line else word
        (candidate_w, _), _ = cv2.getTextSize(candidate, font, font_scale, thickness)
        if candidate_w < max_w or not current_line:
            current_line = candidate
        else:
            lines.append(current_line)
            current_line = word
    if current_line:
        lines.append(current_line)

    if not lines:
        return frame

    rect_h = (len(lines) * line_height) + 20
    y2 = bottom_limit
    y1 = y2 - rect_h

    # Blend a dark rectangle at 70 % opacity onto the frame.
    overlay = frame.copy()
    cv2.rectangle(overlay, (side_margin - 10, y1), (width - side_margin + 10, y2),
                  (20, 20, 20), -1)
    cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame)

    # Each line is centred horizontally.
    for i, line in enumerate(lines):
        (tw, _), _ = cv2.getTextSize(line, font, font_scale, thickness)
        tx = (width - tw) // 2
        ty = y1 + 25 + (i * line_height)
        cv2.putText(frame, line, (tx, ty), font, font_scale, (255, 255, 255),
                    thickness, cv2.LINE_AA)

    return frame
|
|
|
|
| |
def run_intervision_pipeline(video_path, questions_config, output_dir):
    """Run the full Intervision pipeline and return its output paths.

    Steps:
        1. Extract the first 10 seconds of audio and compute its features.
        2. Produce the annotated video.
        3. Re-encode the annotated video with the original audio track.
        4. Write a small JSON report.

    Args:
        video_path: Input video file.
        questions_config: Question entries forwarded to ``process_full_video``.
        output_dir: Directory for all outputs (created if missing).

    Returns:
        Tuple ``(final_video_path, report_path)``.

    Raises:
        subprocess.CalledProcessError: If an ffmpeg invocation fails.
        Exception: If an expected output file is missing after its step.
    """
    os.makedirs(output_dir, exist_ok=True)

    print("[PIPELINE] Starting pipeline")
    print("[PIPELINE] Video path:", video_path)

    # ---- Step 1: baseline audio -------------------------------------------
    baseline_wav = os.path.join(output_dir, "baseline.wav")
    print("[PIPELINE] Extracting baseline audio")

    baseline_cmd = [
        "ffmpeg",
        "-y",
        "-i", video_path,
        "-t", "10",
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        baseline_wav,
    ]
    subprocess.run(baseline_cmd, check=True)
    if not os.path.exists(baseline_wav):
        raise Exception("Baseline audio extraction failed")

    baseline_samples, baseline_sr = librosa.load(baseline_wav, sr=16000)
    # NOTE(review): the baseline features are computed but not passed to any
    # later step -- confirm whether the annotation should use them.
    baseline_features = extract_audio_features(baseline_samples, baseline_sr)

    # ---- Step 2: annotated video ------------------------------------------
    print("[PIPELINE] Running video annotation")
    annotated_video_raw = process_full_video(video_path, output_dir, questions_config)
    if not os.path.exists(annotated_video_raw):
        raise Exception("Annotated video was not generated")

    # ---- Step 3: merge annotated video with the original audio -------------
    final_output = os.path.join(output_dir, "Intervision_Final_Report.mp4")
    print("[PIPELINE] Merging audio and annotated video")

    merge_cmd = [
        'ffmpeg', '-y',
        '-i', annotated_video_raw,
        '-i', video_path,
        '-map', '0:v:0',   # video from the annotated file
        '-map', '1:a:0',   # audio from the original file
        '-c:v', 'libx264',
        '-preset', 'veryfast',
        '-crf', '23',
        '-c:a', 'aac',
        '-b:a', '160k',
        '-shortest',
        final_output,
    ]
    subprocess.run(merge_cmd, check=True)
    if not os.path.exists(final_output):
        raise Exception("Final video merge failed")

    print("[PIPELINE] Final video created:", final_output)

    # ---- Step 4: report ----------------------------------------------------
    report_path = os.path.join(output_dir, "report.json")
    report = {
        "status": "completed",
        "questionsAnalyzed": len(questions_config),
    }
    with open(report_path, "w") as report_file:
        json.dump(report, report_file, indent=2)

    print("[PIPELINE] Report saved:", report_path)

    return final_output, report_path
|
|