import os
import subprocess
import sys

import cv2
import numpy as np
import torch
import torchaudio
import whisper
from transformers import AutoTokenizer

from models import MultimodalSentimentModel
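

# Class-index-to-label maps; the index order must match the label
# encoding used when the model was trained.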
EMOTION_MAP = {0: "anger", 1: "disgust", 2: "fear",
               3: "joy", 4: "neutral", 5: "sadness", 6: "surprise"}
SENTIMENT_MAP = {0: "negative", 1: "neutral", 2: "positive"}
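

# Best-effort FFmpeg setup: upgrade pip tooling, install the
# ffmpeg-python wrapper, drop in a static ffmpeg binary, then verify
# that an ffmpeg executable is actually callable.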
def install_ffmpeg():
    print("Starting FFmpeg installation...")

    subprocess.check_call([sys.executable, "-m", "pip",
                           "install", "--upgrade", "pip"])
    subprocess.check_call([sys.executable, "-m", "pip",
                           "install", "--upgrade", "setuptools"])

    # ffmpeg-python is only a Python wrapper; the static binary installed
    # below is what provides the actual ffmpeg executable.
    try:
        subprocess.check_call([sys.executable, "-m", "pip",
                               "install", "ffmpeg-python"])
        print("Installed ffmpeg-python successfully")
    except subprocess.CalledProcessError:
        print("Failed to install ffmpeg-python via pip")

    try:
        subprocess.check_call([
            "wget",
            "https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz",
            "-O", "/tmp/ffmpeg.tar.xz"
        ])
        subprocess.check_call([
            "tar", "-xf", "/tmp/ffmpeg.tar.xz", "-C", "/tmp/"
        ])

        result = subprocess.run(
            ["find", "/tmp", "-name", "ffmpeg", "-type", "f"],
            capture_output=True,
            text=True
        )
        # find may return several matches (one per line); take the first.
        ffmpeg_path = result.stdout.strip().split("\n")[0]

        subprocess.check_call(["cp", ffmpeg_path, "/usr/local/bin/ffmpeg"])
        subprocess.check_call(["chmod", "+x", "/usr/local/bin/ffmpeg"])

        print("Installed static FFmpeg binary successfully")
    except Exception as e:
        print(f"Failed to install static FFmpeg: {e}")

    try:
        result = subprocess.run(["ffmpeg", "-version"],
                                capture_output=True, text=True, check=True)
        print("FFmpeg version:")
        print(result.stdout)
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("FFmpeg installation verification failed")
        return False
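

# Samples up to 30 frames from a video, resizes them to 224x224,
# scales pixel values to [0, 1], and returns a [30, 3, 224, 224] tensor.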
class VideoProcessor:
    def process_video(self, video_path):
        cap = cv2.VideoCapture(video_path)
        frames = []

        try:
            if not cap.isOpened():
                raise ValueError(f"Video not found: {video_path}")

            # Confirm the first frame is decodable, then rewind to frame 0.
            ret, frame = cap.read()
            if not ret or frame is None:
                raise ValueError(f"Video has no readable frames: {video_path}")

            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

            while len(frames) < 30 and cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Resize to the model's input resolution and scale to [0, 1].
                frame = cv2.resize(frame, (224, 224))
                frame = frame / 255.0
                frames.append(frame)

        except Exception as e:
            raise ValueError(f"Video error: {str(e)}")
        finally:
            cap.release()

        if not frames:
            raise ValueError("No frames could be extracted")

        # Pad short clips with black frames, or truncate to exactly 30.
        if len(frames) < 30:
            frames += [np.zeros_like(frames[0])] * (30 - len(frames))
        else:
            frames = frames[:30]

        # [frames, height, width, channels] -> [frames, channels, height, width]
        return torch.FloatTensor(np.array(frames)).permute(0, 3, 1, 2)
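

# Converts a clip's audio track into a normalized 64-band mel
# spectrogram, padded or truncated to max_length frames along time.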
class AudioProcessor:
    def extract_features(self, video_path, max_length=300):
        audio_path = video_path.replace('.mp4', '.wav')

        try:
            # Extract the audio track as 16 kHz mono PCM.
            subprocess.run([
                'ffmpeg',
                '-i', video_path,
                '-vn',
                '-acodec', 'pcm_s16le',
                '-ar', '16000',
                '-ac', '1',
                audio_path
            ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            waveform, sample_rate = torchaudio.load(audio_path)

            if sample_rate != 16000:
                resampler = torchaudio.transforms.Resample(sample_rate, 16000)
                waveform = resampler(waveform)

            mel_spectrogram = torchaudio.transforms.MelSpectrogram(
                sample_rate=16000,
                n_mels=64,
                n_fft=1024,
                hop_length=512
            )
            mel_spec = mel_spectrogram(waveform)

            # Normalize to zero mean and unit variance.
            mel_spec = (mel_spec - mel_spec.mean()) / mel_spec.std()

            # Pad or truncate the time axis to max_length frames.
            if mel_spec.size(2) < max_length:
                padding = max_length - mel_spec.size(2)
                mel_spec = torch.nn.functional.pad(mel_spec, (0, padding))
            else:
                mel_spec = mel_spec[:, :, :max_length]

            return mel_spec

        except subprocess.CalledProcessError as e:
            raise ValueError(f"Audio extraction error: {str(e)}")
        except Exception as e:
            raise ValueError(f"Audio error: {str(e)}")
        finally:
            if os.path.exists(audio_path):
                os.remove(audio_path)
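

# Bundles the per-modality processors and cuts utterance-level
# segments out of the source video with FFmpeg.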
class VideoUtteranceProcessor:
    def __init__(self):
        self.video_processor = VideoProcessor()
        self.audio_processor = AudioProcessor()

    def extract_segment(self, video_path, start_time, end_time, temp_dir="/tmp"):
        os.makedirs(temp_dir, exist_ok=True)
        segment_path = os.path.join(
            temp_dir, f"segment_{start_time}_{end_time}.mp4")

        # Placing -ss/-to after -i trades speed for frame-accurate cuts.
        subprocess.run([
            "ffmpeg", "-i", video_path,
            "-ss", str(start_time),
            "-to", str(end_time),
            "-c:v", "libx264",
            "-c:a", "aac",
            "-y",
            segment_path
        ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        if not os.path.exists(segment_path) or os.path.getsize(segment_path) == 0:
            raise ValueError("Segment extraction failed: " + segment_path)

        return segment_path
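

# Model-loading entry point. The model_fn/predict_fn pair follows the
# SageMaker PyTorch inference handler convention, so this script can
# also be used as a custom inference handler.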
def model_fn(model_dir):
    if not install_ffmpeg():
        raise RuntimeError(
            "FFmpeg installation failed - required for inference")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = MultimodalSentimentModel().to(device)

    # Prefer model.pth; fall back to the training checkpoint layout.
    model_path = os.path.join(model_dir, 'model.pth')
    if not os.path.exists(model_path):
        model_path = os.path.join(model_dir, "saved_models", 'checkpoint.pth')
        if not os.path.exists(model_path):
            raise FileNotFoundError(
                "Model file not found in path " + model_path)

    print("Loading model from path: " + model_path)
    model.load_state_dict(torch.load(
        model_path, map_location=device, weights_only=True))
    model.eval()

    return {
        'model': model,
        'tokenizer': AutoTokenizer.from_pretrained('bert-base-uncased'),
        'transcriber': whisper.load_model("base", device=device),
        'device': device
    }
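

# Transcribes the video with Whisper, then runs multimodal inference
# on each transcript segment, returning top-3 emotions and sentiments.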
def predict_fn(input_data, model_dict):
    model = model_dict['model']
    tokenizer = model_dict['tokenizer']
    device = model_dict['device']
    video_path = input_data['video_path']

    # Transcribe with word timestamps so each utterance can be cut out
    # of the video and scored separately.
    result = model_dict['transcriber'].transcribe(
        video_path, word_timestamps=True)

    utterance_processor = VideoUtteranceProcessor()
    predictions = []

    for segment in result["segments"]:
        segment_path = None
        try:
            segment_path = utterance_processor.extract_segment(
                video_path,
                segment["start"],
                segment["end"]
            )

            video_frames = utterance_processor.video_processor.process_video(
                segment_path)
            audio_features = utterance_processor.audio_processor.extract_features(
                segment_path)
            text_inputs = tokenizer(
                segment["text"],
                padding="max_length",
                truncation=True,
                max_length=128,
                return_tensors="pt"
            )

            # Move all three modalities to the model's device; video and
            # audio also gain a batch dimension.
            text_inputs = {k: v.to(device) for k, v in text_inputs.items()}
            video_frames = video_frames.unsqueeze(0).to(device)
            audio_features = audio_features.unsqueeze(0).to(device)

            with torch.inference_mode():
                outputs = model(text_inputs, video_frames, audio_features)
                emotion_probs = torch.softmax(outputs["emotions"], dim=1)[0]
                sentiment_probs = torch.softmax(
                    outputs["sentiments"], dim=1)[0]

            emotion_values, emotion_indices = torch.topk(emotion_probs, 3)
            sentiment_values, sentiment_indices = torch.topk(
                sentiment_probs, 3)

            predictions.append({
                "start_time": segment["start"],
                "end_time": segment["end"],
                "text": segment["text"],
                "emotions": [
                    {"label": EMOTION_MAP[idx.item()],
                     "confidence": conf.item()}
                    for idx, conf in zip(emotion_indices, emotion_values)
                ],
                "sentiments": [
                    {"label": SENTIMENT_MAP[idx.item()],
                     "confidence": conf.item()}
                    for idx, conf in zip(sentiment_indices, sentiment_values)
                ]
            })

        except Exception as e:
            print("Segment failed inference: " + str(e))
        finally:
            # extract_segment may have raised before assigning a path.
            if segment_path and os.path.exists(segment_path):
                os.remove(segment_path)

    return {"utterances": predictions}
def process_local_video(video_path, model_dir="."):
    model_dict = model_fn(model_dir)
    input_data = {'video_path': video_path}
    predictions = predict_fn(input_data, model_dict)

    for utterance in predictions["utterances"]:
        print("\nUtterance:")
        print(f"Start: {utterance['start_time']}s, "
              f"End: {utterance['end_time']}s")
        print(f"Text: {utterance['text']}")
        print("\n Top Emotions:")
        for emotion in utterance['emotions']:
            print(f"{emotion['label']}: {emotion['confidence']:.2f}")
        print("\n Top Sentiments:")
        for sentiment in utterance['sentiments']:
            print(f"{sentiment['label']}: {sentiment['confidence']:.2f}")
        print("-" * 50)


if __name__ == "__main__":
    process_local_video("./dia2_utt3.mp4")