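"""Classify a gameplay video's editing style and generate an xmeml (Final Cut
Pro XML) edit sequence from the clip items associated with the predicted label.

Expects EdAi-gamestyle.h5, label_encoder.pkl, and clip_items.pkl to be present
in the working directory.
"""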
import os
import pickle
import random
import xml.etree.ElementTree as ET

import cv2
import librosa
import numpy as np
from keras.models import load_model
from moviepy.editor import VideoFileClip
from sklearn.preprocessing import LabelEncoder  # class label_encoder.pkl is expected to contain
|
class VideoAudioFeatureExtractor:

    def __init__(self, video_path, output_path):
        self.video_path = video_path
        self.output_path = output_path  # stored but currently unused; see process_video
        self.num_frames = 15            # frames sampled per clip
        self.height = 224               # model input height
        self.width = 224                # model input width
        self.channels = 6               # 3 RGB channels + 3 tiled audio channels
        self.audio_feature_dim = 20     # number of MFCC coefficients
        self.num_actions = 3
        self.classification_model = load_model("EdAi-gamestyle.h5")
        with open("label_encoder.pkl", "rb") as f:
            self.label_encoder = pickle.load(f)
        with open("clip_items.pkl", "rb") as f:
            self.all_clip_items = pickle.load(f)
|
    def generate_clipitem_id(self, video_name, num_digits=10):
        random_number = ''.join(random.choices('0123456789', k=num_digits))
        return f"{video_name}{random_number}"
|
    def combine_features(self, video_features, audio_features):
        """Tile the MFCC matrix to image size and stack it onto the RGB frames,
        producing (num_frames, height, width, 6) for the two-stream model."""
        audio_features_resized = cv2.resize(audio_features, (self.height, self.width))
        # Replicate the single-channel audio map to 3 channels, then across all frames.
        audio_features_reshaped = np.repeat(audio_features_resized[:, :, np.newaxis], 3, axis=2)
        audio_features_reshaped = np.repeat(audio_features_reshaped[np.newaxis, :, :, :], self.num_frames, axis=0)
        combined_features = np.concatenate([video_features, audio_features_reshaped], axis=-1)
        return combined_features
|
    def extract_audio(self, video_path):
        """Decode the video's audio track to a mono float32 signal at 48 kHz."""
        clip = VideoFileClip(video_path)
        audio = clip.audio
        audio_signal = audio.to_soundarray(fps=48000)
        audio_signal = audio_signal.astype(np.float32)
        if len(audio_signal.shape) == 2:  # stereo -> mono
            audio_signal = librosa.to_mono(audio_signal.T)
        return audio_signal
|
    def process_audio(self, audio_signal_segment):
        """Compute MFCCs and pad/trim the time axis to num_frames;
        returns an array of shape (num_frames, audio_feature_dim)."""
        mfccs = librosa.feature.mfcc(y=audio_signal_segment, sr=48000, n_mfcc=self.audio_feature_dim)
        mfccs_fixed_length = librosa.util.fix_length(mfccs, size=self.num_frames)
        return mfccs_fixed_length.T
|
    def preprocess_frame(self, frame):
        frame_resized = cv2.resize(frame, (self.width, self.height))
        frame_normalized = (frame_resized / 255.0).astype(np.float32)
        return frame_normalized
|
    def extract_clip(self, video, audio_features, decision):
        """Read frames for the [in, out) range of a decision and zero-pad to num_frames."""
        start_frame = int(decision["in"])
        end_frame = int(decision["out"])
        video.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        clip_frames = []
        # Note: video.read() always returns the next frame, so stepping this range
        # by 2 halves the number of frames read rather than skipping every other frame.
        for i in range(start_frame, end_frame, 2):
            ret, frame = video.read()
            if not ret:
                break
            frame_processed = self.preprocess_frame(frame)
            clip_frames.append(frame_processed)
        # Pad short clips with black frames, then truncate to exactly num_frames.
        while len(clip_frames) < self.num_frames:
            clip_frames.append(np.zeros((self.height, self.width, 3), dtype=np.float32))
        clip_frames = np.array(clip_frames[:self.num_frames])
        return clip_frames, audio_features
|
    def create_audio_track(self, clip_item_ids, pan_value, link_ids, video_clip_items, video_file_elements):
        """Build one audio <track> of clipitems mirroring the video clipitems,
        with level/pan filters and <link> references to the linked clips."""
        track = ET.Element("track")
        for idx, clipitem_id in enumerate(clip_item_ids):
            clipitem_element = ET.Element("clipitem", id=clipitem_id)
            video_clip_item = video_clip_items[idx]
            in_value = video_clip_item['in']
            out_value = video_clip_item['out']
            start_value = video_clip_item['start']
            end_value = video_clip_item['end']
            ET.SubElement(clipitem_element, "name").text = video_clip_item['name']
            ET.SubElement(clipitem_element, "duration").text = video_clip_item['duration']
            rate_elem = ET.SubElement(clipitem_element, "rate")
            ET.SubElement(rate_elem, "ntsc").text = "TRUE"
            ET.SubElement(rate_elem, "timebase").text = "30"
            ET.SubElement(clipitem_element, "in").text = in_value
            ET.SubElement(clipitem_element, "out").text = out_value
            ET.SubElement(clipitem_element, "start").text = start_value
            ET.SubElement(clipitem_element, "end").text = end_value
            ET.SubElement(clipitem_element, "masterclipid").text = os.path.basename(self.video_path)
            sourcetrack = ET.SubElement(clipitem_element, "sourcetrack")
            ET.SubElement(sourcetrack, "mediatype").text = "audio"
            ET.SubElement(sourcetrack, "trackindex").text = str(idx + 1)
            video_file_elem = video_clip_item.get("file")
            if video_file_elem is not None:
                ET.SubElement(clipitem_element, "file", id=video_file_elem.get("id"))
            else:
                ET.SubElement(clipitem_element, "file", id=os.path.basename(self.video_path).split('.')[0])
            # Audio Levels filter (unity gain).
            filter_elem = ET.SubElement(clipitem_element, "filter")
            effect_elem = ET.SubElement(filter_elem, "effect")
            ET.SubElement(effect_elem, "name").text = "Audio Levels"
            ET.SubElement(effect_elem, "effectid").text = "audiolevels"
            ET.SubElement(effect_elem, "effectcategory").text = "audiolevels"
            ET.SubElement(effect_elem, "effecttype").text = "audiolevels"
            ET.SubElement(effect_elem, "mediatype").text = "audio"
            parameter_elem = ET.SubElement(effect_elem, "parameter")
            ET.SubElement(parameter_elem, "name").text = "Level"
            ET.SubElement(parameter_elem, "parameterid").text = "level"
            ET.SubElement(parameter_elem, "valuemin").text = "0"
            ET.SubElement(parameter_elem, "valuemax").text = "3.98109"
            ET.SubElement(parameter_elem, "value").text = "1"
            # Audio Pan filter (-1 = hard left, 1 = hard right).
            filter_elem = ET.SubElement(clipitem_element, "filter")
            effect_elem = ET.SubElement(filter_elem, "effect")
            ET.SubElement(effect_elem, "name").text = "Audio Pan"
            ET.SubElement(effect_elem, "effectid").text = "audiopan"
            ET.SubElement(effect_elem, "effectcategory").text = "audiopan"
            ET.SubElement(effect_elem, "effecttype").text = "audiopan"
            ET.SubElement(effect_elem, "mediatype").text = "audio"
            parameter_elem = ET.SubElement(effect_elem, "parameter")
            ET.SubElement(parameter_elem, "name").text = "Pan"
            ET.SubElement(parameter_elem, "parameterid").text = "pan"
            ET.SubElement(parameter_elem, "valuemin").text = "-1"
            ET.SubElement(parameter_elem, "valuemax").text = "1"
            ET.SubElement(parameter_elem, "value").text = str(pan_value)
            for link_id in link_ids[idx]:
                link_elem = ET.SubElement(clipitem_element, "link")
                ET.SubElement(link_elem, "linkclipref").text = link_id
            track.append(clipitem_element)
        self.adjust_clipitem_start_end(track.findall('clipitem'))
        return track
|
    def create_clip_structure(self, video_name, video_path, total_duration):
        """Build the master <clip> element (one video track and two audio tracks)
        describing the source file."""
        clip = ET.Element("clip", id=video_name)
        ET.SubElement(clip, "updatebehavior").text = "add"
        ET.SubElement(clip, "name").text = video_name
        ET.SubElement(clip, "duration").text = str(total_duration)
        rate = ET.SubElement(clip, "rate")
        ET.SubElement(rate, "ntsc").text = "TRUE"
        ET.SubElement(rate, "timebase").text = "30"
        ET.SubElement(clip, "in").text = "-1"
        ET.SubElement(clip, "out").text = "-1"
        ET.SubElement(clip, "masterclipid").text = video_name
        ET.SubElement(clip, "ismasterclip").text = "TRUE"

        logginginfo = ET.SubElement(clip, "logginginfo")
        ET.SubElement(logginginfo, "scene")
        ET.SubElement(logginginfo, "shottake")
        ET.SubElement(logginginfo, "lognote")
        ET.SubElement(logginginfo, "good").text = "FALSE"

        labels = ET.SubElement(clip, "labels")
        ET.SubElement(labels, "label2")

        comments = ET.SubElement(clip, "comments")
        ET.SubElement(comments, "mastercomment1")
        ET.SubElement(comments, "mastercomment2")
        ET.SubElement(comments, "mastercomment3")
        ET.SubElement(comments, "mastercomment4")

        media = ET.SubElement(clip, "media")
        video = ET.SubElement(media, "video")
        track = ET.SubElement(video, "track")
        clipitem = ET.SubElement(track, "clipitem", id=f"{video_name}1")
        ET.SubElement(clipitem, "name").text = video_name
        ET.SubElement(clipitem, "duration").text = str(total_duration)
        rate = ET.SubElement(clipitem, "rate")
        ET.SubElement(rate, "ntsc").text = "TRUE"
        ET.SubElement(rate, "timebase").text = "30"
        ET.SubElement(clipitem, "in").text = "0"
        ET.SubElement(clipitem, "out").text = str(total_duration)
        ET.SubElement(clipitem, "start").text = "0"
        ET.SubElement(clipitem, "end").text = str(total_duration)
        ET.SubElement(clipitem, "pixelaspectratio").text = "Square"
        ET.SubElement(clipitem, "anamorphic").text = "FALSE"
        ET.SubElement(clipitem, "alphatype").text = "none"
        ET.SubElement(clipitem, "masterclipid").text = video_name

        file_id = video_name.split('.')[0]
        # Set the id as an XML attribute; writing 'file id="..."' into the
        # element text produced malformed output.
        file_elem = ET.SubElement(clipitem, "file", id=file_id)
        ET.SubElement(file_elem, "name").text = video_name
        ET.SubElement(file_elem, "pathurl").text = f"file://localhost/{video_path}"
        rate = ET.SubElement(file_elem, "rate")
        ET.SubElement(rate, "timebase").text = "30"
        ET.SubElement(rate, "ntsc").text = "TRUE"
        ET.SubElement(file_elem, "duration").text = str(total_duration)
        timecode = ET.SubElement(file_elem, "timecode")
        ET.SubElement(timecode, "string").text = "00:00:00:00"
        ET.SubElement(timecode, "frame").text = "0"
        ET.SubElement(timecode, "rate").text = "30"
        ET.SubElement(timecode, "displayformat").text = "NDF"
        media = ET.SubElement(file_elem, "media")
        video = ET.SubElement(media, "video")
        audio = ET.SubElement(media, "audio")

        # Two audio tracks (stereo pair) referencing the same source file.
        for i in range(2):
            track = ET.SubElement(audio, "track")
            clipitem = ET.SubElement(track, "clipitem", id=f"{video_name}{i + 2}")
            ET.SubElement(clipitem, "name").text = video_name
            ET.SubElement(clipitem, "duration").text = str(total_duration)
            rate = ET.SubElement(clipitem, "rate")
            ET.SubElement(rate, "ntsc").text = "TRUE"
            ET.SubElement(rate, "timebase").text = "30"
            ET.SubElement(clipitem, "in").text = "0"
            ET.SubElement(clipitem, "out").text = str(total_duration)
            ET.SubElement(clipitem, "start").text = "0"
            ET.SubElement(clipitem, "end").text = str(total_duration)
            ET.SubElement(clipitem, "masterclipid").text = video_name
            file_elem = ET.SubElement(clipitem, "file", id=file_id)
            ET.SubElement(file_elem, "name").text = video_name
            ET.SubElement(file_elem, "pathurl").text = f"file://localhost/{video_path}"
            sourcetrack = ET.SubElement(clipitem, "sourcetrack")
            ET.SubElement(sourcetrack, "mediatype").text = "audio"
            ET.SubElement(sourcetrack, "trackindex").text = str(i + 1)

        return clip
|
    def adjust_clipitem_start_end(self, clipitems):
        """Rewrite start/end so consecutive clipitems butt against each other on the timeline."""
        last_end = 0
        for clipitem in clipitems:
            in_val = int(clipitem.find('in').text)
            out_val = int(clipitem.find('out').text)
            clipitem.find('start').text = str(last_end)
            clipitem.find('end').text = str(last_end + (out_val - in_val))
            last_end = int(clipitem.find('end').text)
|
    def dict_to_xml(self, tag, dictionary):
        """Recursively convert a dict to an Element. The 'attributes' entry and
        (for clipitem/file tags) 'id' become XML attributes; None values are
        serialized as "none". Note: the pops mutate the dict passed in."""
        attributes = dictionary.pop('attributes', {})
        if tag == "clipitem" or tag == "file":
            attributes["id"] = dictionary.pop("id", None)
        for key, val in attributes.items():
            if val is None:
                attributes[key] = "none"
        elem = ET.Element(tag, **attributes)
        for key, val in dictionary.items():
            if val is None:
                val = "none"
            if isinstance(val, dict):
                child = self.dict_to_xml(key, val)
                elem.append(child)
            elif isinstance(val, list):
                for item in val:
                    child = self.dict_to_xml(key, item)
                    elem.append(child)
            else:
                child = ET.Element(key)
                child.text = str(val)  # val can no longer be None here
                elem.append(child)
        return elem
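
    # For reference, each clip-item dict fed through dict_to_xml is assumed
    # (based on the keys read here and in create_audio_track) to carry at least:
    #   {"name": ..., "duration": ..., "in": ..., "out": ..., "start": ...,
    #    "end": ..., "file": {"id": ...}}
    # The exact schema depends on how clip_items.pkl was produced.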
|
|
|
    def create_sequence_structure(self, video_name, total_duration):
        """Build the <sequence> skeleton (1080p, NTSC 30 fps) and return the
        sequence, its empty video track, and the media element for audio tracks."""
        sequence = ET.Element("sequence", id="Sequence 1")
        ET.SubElement(sequence, "updatebehavior").text = "add"
        ET.SubElement(sequence, "name").text = "Sequence 1"
        ET.SubElement(sequence, "duration").text = str(total_duration)
        rate = ET.SubElement(sequence, "rate")
        ET.SubElement(rate, "ntsc").text = "TRUE"
        ET.SubElement(rate, "timebase").text = "30"
        timecode = ET.SubElement(sequence, "timecode")
        rate_timecode = ET.SubElement(timecode, "rate")
        ET.SubElement(rate_timecode, "ntsc").text = "TRUE"
        ET.SubElement(rate_timecode, "timebase").text = "30"
        ET.SubElement(timecode, "frame").text = "107891"
        ET.SubElement(timecode, "source").text = "source"
        ET.SubElement(timecode, "displayformat").text = "DF"
        ET.SubElement(sequence, "in").text = "-1"
        ET.SubElement(sequence, "out").text = "-1"
        media = ET.SubElement(sequence, "media")
        video = ET.SubElement(media, "video")
        format_ = ET.SubElement(video, "format")
        samplecharacteristics = ET.SubElement(format_, "samplecharacteristics")
        ET.SubElement(samplecharacteristics, "width").text = "1920"
        ET.SubElement(samplecharacteristics, "height").text = "1080"
        ET.SubElement(samplecharacteristics, "pixelaspectratio").text = "Square"
        ET.SubElement(samplecharacteristics, "anamorphic").text = "FALSE"
        ET.SubElement(samplecharacteristics, "fielddominance").text = "none"
        rate_sample = ET.SubElement(samplecharacteristics, "rate")
        ET.SubElement(rate_sample, "ntsc").text = "TRUE"
        ET.SubElement(rate_sample, "timebase").text = "30"
        ET.SubElement(samplecharacteristics, "colordepth").text = "24"
        track = ET.SubElement(video, "track")
        return sequence, track, media
|
    def process_video(self):
        """Classify the video's style and, if clip items exist for the predicted
        label, write an xmeml project describing the suggested edit."""
        for video_path in [self.video_path]:
            # Audio features for the whole clip.
            audio_signal = self.extract_audio(video_path)
            audio_frame_features = self.process_audio(audio_signal)

            # Read the first num_frames frames as the video stream input.
            video = cv2.VideoCapture(video_path)
            ret, frame = video.read()
            if not ret:
                print(f"Failed to read video: {video_path}")
                continue
            clip_frames = []
            while len(clip_frames) < self.num_frames:
                frame_processed = self.preprocess_frame(frame)
                clip_frames.append(frame_processed)
                ret, frame = video.read()
                if not ret:
                    break
            clip_frames = np.array(clip_frames[:self.num_frames])
            video.release()

            combined_features_for_prediction = self.combine_features(clip_frames, audio_frame_features)

            prediction = self.classification_model.predict(
                [np.array([combined_features_for_prediction]), np.array([audio_frame_features])])
            predicted_label = self.label_encoder.inverse_transform([np.argmax(prediction)])[0]

            if predicted_label in self.all_clip_items:
                clip_items_for_predicted_label = self.all_clip_items[predicted_label]
                print("Clip items for predicted label:", clip_items_for_predicted_label)

                total_duration = sum(int(clip_item["duration"]) for clip_item in clip_items_for_predicted_label)

                root = ET.Element("xmeml", version="5")

                project = ET.SubElement(root, "project")
                ET.SubElement(project, "name").text = "Untitled Project 1"
                children = ET.SubElement(project, "children")

                bin_element = ET.Element("bin")
                ET.SubElement(bin_element, "updatebehavior").text = "add"
                ET.SubElement(bin_element, "name").text = "Custom Bins"
                ET.SubElement(bin_element, "children")
                children.append(bin_element)

                video_name_with_extension = os.path.basename(video_path)

                clip_structure = self.create_clip_structure(video_name_with_extension, video_path,
                                                            total_duration)
                children.append(clip_structure)

                sequence, track, media = self.create_sequence_structure(video_name_with_extension, total_duration)
                children.append(sequence)

                clip_item_ids = []
                link_ids_list = []
                video_file_elements = []

                for clip_item in clip_items_for_predicted_label:
                    # One video clipitem id plus two linked audio clipitem ids (L/R).
                    video_clip_item_id = self.generate_clipitem_id(video_name_with_extension)
                    audio_clip_item_id_1 = self.generate_clipitem_id(video_name_with_extension)
                    audio_clip_item_id_2 = self.generate_clipitem_id(video_name_with_extension)

                    clip_item_ids.append(video_clip_item_id)
                    link_ids_list.append([video_clip_item_id, audio_clip_item_id_1, audio_clip_item_id_2])

                    clip_item['id'] = video_clip_item_id
                    clip_item['name'] = video_name_with_extension

                    clipitem_element = self.dict_to_xml("clipitem", clip_item)

                    # Drop any stale links carried over from the pickled template.
                    for link in clipitem_element.findall("link"):
                        clipitem_element.remove(link)

                    # Ensure the <file> element carries the source file id.
                    file_element = clipitem_element.find("file")
                    file_id_value = os.path.splitext(os.path.basename(video_path))[0]
                    if file_element is not None:
                        file_element.set("id", file_id_value)
                        file_element.text = None
                    else:
                        ET.SubElement(clipitem_element, "file", id=file_id_value)

                    # Temporarily remove fielddominance so the links precede it.
                    fielddominance_elem = clipitem_element.find("fielddominance")
                    if fielddominance_elem is not None:
                        clipitem_element.remove(fielddominance_elem)

                    # Link the video clipitem to itself and its two audio clipitems.
                    link_elem_self = ET.SubElement(clipitem_element, "link")
                    ET.SubElement(link_elem_self, "linkclipref").text = video_clip_item_id
                    link_elem_audio1 = ET.SubElement(clipitem_element, "link")
                    ET.SubElement(link_elem_audio1, "linkclipref").text = audio_clip_item_id_1
                    link_elem_audio2 = ET.SubElement(clipitem_element, "link")
                    ET.SubElement(link_elem_audio2, "linkclipref").text = audio_clip_item_id_2

                    if fielddominance_elem is not None:
                        clipitem_element.append(fielddominance_elem)

                    track.append(clipitem_element)

                    self.adjust_clipitem_start_end(track.findall('clipitem'))

                    file_element = clipitem_element.find("file")
                    if file_element is not None:
                        video_file_elements.append(file_element)

                video_clip_items_list = clip_items_for_predicted_label

                # Hard-panned left/right audio tracks mirroring the video clipitems.
                audio_track_left = self.create_audio_track([id_list[1] for id_list in link_ids_list], -1,
                                                           link_ids_list, video_clip_items_list,
                                                           video_file_elements)
                audio_track_right = self.create_audio_track([id_list[2] for id_list in link_ids_list], 1,
                                                            link_ids_list, video_clip_items_list,
                                                            video_file_elements)

                audio = ET.SubElement(media, "audio")
                audio.append(audio_track_left)
                audio.append(audio_track_right)

                # Written next to the source video; self.output_path is currently unused.
                output_path = os.path.join(os.path.dirname(video_path),
                                           f"{os.path.basename(video_path)}_predicted.xml")
                tree = ET.ElementTree(root)
                with open(output_path, 'wb') as f:
                    tree.write(f, encoding='utf-8', xml_declaration=True)
            else:
                print(f"Predicted label for {video_path}: {predicted_label} (No clip items found)")