import os import cv2 import pickle import numpy as np from keras.models import load_model from moviepy.editor import VideoFileClip import librosa from sklearn.preprocessing import LabelEncoder import xml.etree.ElementTree as ET import random class VideoAudioFeatureExtractor: def __init__(self, video_path, output_path): self.video_path = video_path self.output_path = output_path self.num_frames = 15 self.height = 224 self.width = 224 self.channels = 6 self.audio_feature_dim = 20 self.num_actions = 3 self.classification_model = load_model("EdAi-gamestyle.h5") with open("label_encoder.pkl", "rb") as f: self.label_encoder = pickle.load(f) with open("clip_items.pkl", "rb") as f: self.all_clip_items = pickle.load(f) def generate_clipitem_id(self, video_name, num_digits=10): random_number = ''.join(random.choices('0123456789', k=num_digits)) return f"{video_name}{random_number}" def combine_features(self, video_features, audio_features): audio_features_resized = cv2.resize(audio_features, (self.height, self.width)) audio_features_reshaped = np.repeat(audio_features_resized[:, :, np.newaxis], 3, axis=2) audio_features_reshaped = np.repeat(audio_features_reshaped[np.newaxis, :, :, :], self.num_frames, axis=0) combined_features = np.concatenate([video_features, audio_features_reshaped], axis=-1) return combined_features def extract_audio(self, video_path): clip = VideoFileClip(video_path) audio = clip.audio audio_signal = audio.to_soundarray(fps=48000) audio_signal = audio_signal.astype(np.float32) if len(audio_signal.shape) == 2: audio_signal = librosa.to_mono(audio_signal.T) return audio_signal def process_audio(self, audio_signal_segment): mfccs = librosa.feature.mfcc(y=audio_signal_segment, sr=48000, n_mfcc=self.audio_feature_dim) mfccs_fixed_length = librosa.util.fix_length(mfccs, size=self.num_frames) return mfccs_fixed_length.T def preprocess_frame(self, frame): frame_resized = cv2.resize(frame, (self.width, self.height)) frame_normalized = (frame_resized / 255.0).astype(np.float32) return frame_normalized def extract_clip(self, video, audio_features, decision): start_frame = int(decision["in"]) end_frame = int(decision["out"]) video.set(cv2.CAP_PROP_POS_FRAMES, start_frame) clip_frames = [] for i in range(start_frame, end_frame, 2): ret, frame = video.read() if not ret: break frame_processed = self.preprocess_frame(frame) clip_frames.append(frame_processed) while len(clip_frames) < self.num_frames: clip_frames.append(np.zeros((self.height, self.width, 3), dtype=np.float32)) clip_frames = np.array(clip_frames[:self.num_frames]) return clip_frames, audio_features def create_audio_track(self, clip_item_ids, pan_value, link_ids, video_clip_items, video_file_elements): track = ET.Element("track") for idx, clipitem_id in enumerate(clip_item_ids): clipitem_element = ET.Element("clipitem", id=clipitem_id) video_clip_item = video_clip_items[idx] in_value = video_clip_item['in'] out_value = video_clip_item['out'] start_value = video_clip_item['start'] end_value = video_clip_item['end'] ET.SubElement(clipitem_element, "name").text = video_clip_item['name'] ET.SubElement(clipitem_element, "duration").text = video_clip_item['duration'] rate_elem = ET.SubElement(clipitem_element, "rate") ET.SubElement(rate_elem, "ntsc").text = "TRUE" ET.SubElement(rate_elem, "timebase").text = "30" ET.SubElement(clipitem_element, "in").text = in_value ET.SubElement(clipitem_element, "out").text = out_value ET.SubElement(clipitem_element, "start").text = start_value ET.SubElement(clipitem_element, "end").text = end_value ET.SubElement(clipitem_element, "masterclipid").text = os.path.basename(self.video_path) sourcetrack = ET.SubElement(clipitem_element, "sourcetrack") ET.SubElement(sourcetrack, "mediatype").text = "audio" ET.SubElement(sourcetrack, "trackindex").text = str(idx + 1) video_file_elem = video_clip_item.get("file") if video_file_elem is not None: ET.SubElement(clipitem_element, "file", id=video_file_elem.get("id")) else: ET.SubElement(clipitem_element, "file", id=os.path.basename(self.video_path).split('.')[0]) filter_elem = ET.SubElement(clipitem_element, "filter") effect_elem = ET.SubElement(filter_elem, "effect") ET.SubElement(effect_elem, "name").text = "Audio Levels" ET.SubElement(effect_elem, "effectid").text = "audiolevels" ET.SubElement(effect_elem, "effectcategory").text = "audiolevels" ET.SubElement(effect_elem, "effecttype").text = "audiolevels" ET.SubElement(effect_elem, "mediatype").text = "audio" parameter_elem = ET.SubElement(effect_elem, "parameter") ET.SubElement(parameter_elem, "name").text = "Level" ET.SubElement(parameter_elem, "parameterid").text = "level" ET.SubElement(parameter_elem, "valuemin").text = "0" ET.SubElement(parameter_elem, "valuemax").text = "3.98109" ET.SubElement(parameter_elem, "value").text = "1" filter_elem = ET.SubElement(clipitem_element, "filter") effect_elem = ET.SubElement(filter_elem, "effect") ET.SubElement(effect_elem, "name").text = "Audio Pan" ET.SubElement(effect_elem, "effectid").text = "audiopan" ET.SubElement(effect_elem, "effectcategory").text = "audiopan" ET.SubElement(effect_elem, "effecttype").text = "audiopan" ET.SubElement(effect_elem, "mediatype").text = "audio" parameter_elem = ET.SubElement(effect_elem, "parameter") ET.SubElement(parameter_elem, "name").text = "Pan" ET.SubElement(parameter_elem, "parameterid").text = "pan" ET.SubElement(parameter_elem, "valuemin").text = "-1" ET.SubElement(parameter_elem, "valuemax").text = "1" ET.SubElement(parameter_elem, "value").text = str(pan_value) for link_id in link_ids[idx]: link_elem = ET.SubElement(clipitem_element, "link") ET.SubElement(link_elem, "linkclipref").text = link_id track.append(clipitem_element) self.adjust_clipitem_start_end(track.findall('clipitem')) return track def create_clip_structure(self, video_name, video_path, total_duration): clip = ET.Element("clip", id=video_name) ET.SubElement(clip, "updatebehavior").text = "add" ET.SubElement(clip, "name").text = video_name ET.SubElement(clip, "duration").text = str(total_duration) rate = ET.SubElement(clip, "rate") ET.SubElement(rate, "ntsc").text = "TRUE" ET.SubElement(rate, "timebase").text = "30" ET.SubElement(clip, "in").text = "-1" ET.SubElement(clip, "out").text = "-1" ET.SubElement(clip, "masterclipid").text = video_name ET.SubElement(clip, "ismasterclip").text = "TRUE" # Logging info logginginfo = ET.SubElement(clip, "logginginfo") ET.SubElement(logginginfo, "scene") ET.SubElement(logginginfo, "shottake") ET.SubElement(logginginfo, "lognote") ET.SubElement(logginginfo, "good").text = "FALSE" # Labels labels = ET.SubElement(clip, "labels") ET.SubElement(labels, "label2") # Comments comments = ET.SubElement(clip, "comments") ET.SubElement(comments, "mastercomment1") ET.SubElement(comments, "mastercomment2") ET.SubElement(comments, "mastercomment3") ET.SubElement(comments, "mastercomment4") # Media media = ET.SubElement(clip, "media") video = ET.SubElement(media, "video") track = ET.SubElement(video, "track") clipitem = ET.SubElement(track, "clipitem", id=f"{video_name}1") ET.SubElement(clipitem, "name").text = video_name ET.SubElement(clipitem, "duration").text = str(total_duration) rate = ET.SubElement(clipitem, "rate") ET.SubElement(rate, "ntsc").text = "TRUE" ET.SubElement(rate, "timebase").text = "30" ET.SubElement(clipitem, "in").text = "0" ET.SubElement(clipitem, "out").text = str(total_duration) ET.SubElement(clipitem, "start").text = "0" ET.SubElement(clipitem, "end").text = str(total_duration) ET.SubElement(clipitem, "pixelaspectratio").text = "Square" ET.SubElement(clipitem, "anamorphic").text = "FALSE" ET.SubElement(clipitem, "alphatype").text = "none" ET.SubElement(clipitem, "masterclipid").text = video_name # File file_id = video_name.split('.')[0] # Extract filename without extension file = ET.SubElement(clipitem, "file") file.text = f'file id="{file_id}"' # Set the text of the tag to the file ID without the extension ET.SubElement(file, "name").text = video_name ET.SubElement(file, "pathurl").text = f"file://localhost/{video_path}" rate = ET.SubElement(file, "rate") ET.SubElement(rate, "timebase").text = "30" ET.SubElement(rate, "ntsc").text = "TRUE" ET.SubElement(file, "duration").text = str(total_duration) timecode = ET.SubElement(file, "timecode") ET.SubElement(timecode, "string").text = "00:00:00:00" ET.SubElement(timecode, "frame").text = "0" ET.SubElement(timecode, "rate").text = "30" ET.SubElement(timecode, "displayformat").text = "NDF" media = ET.SubElement(file, "media") video = ET.SubElement(media, "video") audio = ET.SubElement(media, "audio") # Audio for i in range(2): # Assuming there are 2 audio tracks as per the example track = ET.SubElement(audio, "track") clipitem = ET.SubElement(track, "clipitem", id=f"{video_name}{i + 2}") ET.SubElement(clipitem, "name").text = video_name ET.SubElement(clipitem, "duration").text = str(total_duration) rate = ET.SubElement(clipitem, "rate") ET.SubElement(rate, "ntsc").text = "TRUE" ET.SubElement(rate, "timebase").text = "30" ET.SubElement(clipitem, "in").text = "0" ET.SubElement(clipitem, "out").text = str(total_duration) ET.SubElement(clipitem, "start").text = "0" ET.SubElement(clipitem, "end").text = str(total_duration) ET.SubElement(clipitem, "masterclipid").text = video_name file_id = video_name.split('.')[0] # Extract filename without extension file_elem = ET.SubElement(clipitem, "file", id=file_id) ET.SubElement(file_elem, "name").text = video_name ET.SubElement(file_elem, "pathurl").text = f"file://localhost/{video_path}" sourcetrack = ET.SubElement(clipitem, "sourcetrack") ET.SubElement(sourcetrack, "mediatype").text = "audio" ET.SubElement(sourcetrack, "trackindex").text = str(i + 1) return clip def adjust_clipitem_start_end(self, clipitems): last_end = 0 for clipitem in clipitems: in_val = int(clipitem.find('in').text) out_val = int(clipitem.find('out').text) clipitem.find('start').text = str(last_end) clipitem.find('end').text = str(last_end + (out_val - in_val)) last_end = int(clipitem.find('end').text) def dict_to_xml(self, tag, dictionary): attributes = dictionary.pop('attributes', {}) if tag == "clipitem" or tag == "file": attributes["id"] = dictionary.pop("id", None) for key, val in attributes.items(): if val is None: attributes[key] = "none" elem = ET.Element(tag, **attributes) for key, val in dictionary.items(): if val is None: val = "none" if isinstance(val, dict): child = self.dict_to_xml(key, val) elem.append(child) elif isinstance(val, list): for item in val: child = self.dict_to_xml(key, item) elem.append(child) else: child = ET.Element(key) child.text = str(val) if val is not None else "none" elem.append(child) return elem def create_sequence_structure(self, video_name, total_duration): sequence = ET.Element("sequence", id="Sequence 1") ET.SubElement(sequence, "updatebehavior").text = "add" ET.SubElement(sequence, "name").text = "Sequence 1" ET.SubElement(sequence, "duration").text = str(total_duration) rate = ET.SubElement(sequence, "rate") ET.SubElement(rate, "ntsc").text = "TRUE" ET.SubElement(rate, "timebase").text = "30" timecode = ET.SubElement(sequence, "timecode") rate_timecode = ET.SubElement(timecode, "rate") ET.SubElement(rate_timecode, "ntsc").text = "TRUE" ET.SubElement(rate_timecode, "timebase").text = "30" ET.SubElement(timecode, "frame").text = "107891" ET.SubElement(timecode, "source").text = "source" ET.SubElement(timecode, "displayformat").text = "DF" ET.SubElement(sequence, "in").text = "-1" ET.SubElement(sequence, "out").text = "-1" media = ET.SubElement(sequence, "media") video = ET.SubElement(media, "video") format_ = ET.SubElement(video, "format") samplecharacteristics = ET.SubElement(format_, "samplecharacteristics") ET.SubElement(samplecharacteristics, "width").text = "1920" ET.SubElement(samplecharacteristics, "height").text = "1080" ET.SubElement(samplecharacteristics, "pixelaspectratio").text = "Square" ET.SubElement(samplecharacteristics, "anamorphic").text = "FALSE" ET.SubElement(samplecharacteristics, "fielddominance").text = "none" rate_sample = ET.SubElement(samplecharacteristics, "rate") ET.SubElement(rate_sample, "ntsc").text = "TRUE" ET.SubElement(rate_sample, "timebase").text = "30" ET.SubElement(samplecharacteristics, "colordepth").text = "24" track = ET.SubElement(video, "track") return sequence, track, media def process_video(self): for video_path in [self.video_path]: # Extract audio features audio_signal = self.extract_audio(video_path) audio_frame_features = self.process_audio(audio_signal) # Extract video features video = cv2.VideoCapture(video_path) ret, frame = video.read() if not ret: print(f"Failed to read video: {video_path}") continue clip_frames = [] while len(clip_frames) < self.num_frames: frame_processed = self.preprocess_frame(frame) clip_frames.append(frame_processed) ret, frame = video.read() if not ret: break clip_frames = np.array(clip_frames[:self.num_frames]) video.release() # Combine video and audio features for prediction combined_features_for_prediction = self.combine_features(clip_frames, audio_frame_features) # Make a prediction prediction = self.classification_model.predict( [np.array([combined_features_for_prediction]), np.array([audio_frame_features])]) predicted_label = self.label_encoder.inverse_transform([np.argmax(prediction)])[0] # Retrieve clip items for the predicted label if predicted_label in self.all_clip_items: clip_items_for_predicted_label = self.all_clip_items[predicted_label] print("Clip items for predicted label:", clip_items_for_predicted_label) # Calculate the total duration total_duration = sum([int(clip_item["duration"]) for clip_item in clip_items_for_predicted_label]) # Create the root element root = ET.Element("xmeml", version="5") # Create the project structure project = ET.SubElement(root, "project") ET.SubElement(project, "name").text = "Untitled Project 1" children = ET.SubElement(project, "children") # Create and append the bin element to children bin_element = ET.Element("bin") ET.SubElement(bin_element, "updatebehavior").text = "add" ET.SubElement(bin_element, "name").text = "Custom Bins" ET.SubElement(bin_element, "children") # Empty children element for the bin children.append(bin_element) # Define video_name_with_extension here video_name_with_extension = os.path.basename(video_path) # Append the clip structure directly to children (not inside bin) clip_structure = self.create_clip_structure(video_name_with_extension, video_path, total_duration) # <-- Added total_duration here children.append(clip_structure) # Append the sequence structure to children sequence, track, media = self.create_sequence_structure(video_name_with_extension, total_duration) children.append(sequence) clip_item_ids = [] # List to store generated IDs link_ids_list = [] # List to store link IDs for each clip item video_file_elements = [] # List to store elements for each video for clip_item in clip_items_for_predicted_label: # Generate three unique IDs: one for video and two for audio video_clip_item_id = self.generate_clipitem_id(video_name_with_extension) audio_clip_item_id_1 = self.generate_clipitem_id(video_name_with_extension) audio_clip_item_id_2 = self.generate_clipitem_id(video_name_with_extension) # Store the generated IDs clip_item_ids.append(video_clip_item_id) link_ids_list.append([video_clip_item_id, audio_clip_item_id_1, audio_clip_item_id_2]) clip_item['id'] = video_clip_item_id clip_item['name'] = video_name_with_extension # Convert the modified clip item dictionary to XML and append to track clipitem_element = self.dict_to_xml("clipitem", clip_item) # Remove all existing elements from the video for link in clipitem_element.findall("link"): clipitem_element.remove(link) # Modify the section inside the file_element = clipitem_element.find("file") file_id_value = os.path.splitext(os.path.basename(video_path))[ 0] # Use the video name without extension if file_element is not None: file_element.set("id", file_id_value) # Set the 'id' attribute file_element.text = None # Ensure there's no text inside the element else: # If element doesn't exist, create one with the 'id' attribute ET.SubElement(clipitem_element, "file", id=file_id_value) # Temporarily extract the element fielddominance_elem = clipitem_element.find("fielddominance") if fielddominance_elem is not None: clipitem_element.remove(fielddominance_elem) # Add the correct elements link_elem_self = ET.SubElement(clipitem_element, "link") ET.SubElement(link_elem_self, "linkclipref").text = video_clip_item_id link_elem_audio1 = ET.SubElement(clipitem_element, "link") ET.SubElement(link_elem_audio1, "linkclipref").text = audio_clip_item_id_1 link_elem_audio2 = ET.SubElement(clipitem_element, "link") ET.SubElement(link_elem_audio2, "linkclipref").text = audio_clip_item_id_2 # Re-append the element to the end if fielddominance_elem is not None: clipitem_element.append(fielddominance_elem) track.append(clipitem_element) # Call the adjust_clipitem_start_end function here self.adjust_clipitem_start_end(track.findall('clipitem')) # Extract the element and store it in the video_file_elements list file_element = clipitem_element.find("file") if file_element is not None: video_file_elements.append(file_element) # Move the audio track creation outside the clip items loop video_clip_items_list = clip_items_for_predicted_label # Pass the video_file_elements list to the create_audio_track function audio_track_left = self.create_audio_track([id_list[1] for id_list in link_ids_list], -1, link_ids_list, video_clip_items_list, video_file_elements) audio_track_right = self.create_audio_track([id_list[2] for id_list in link_ids_list], 1, link_ids_list, video_clip_items_list, video_file_elements) audio = ET.SubElement(media, "audio") audio.append(audio_track_left) audio.append(audio_track_right) # Save the entire XML tree to a file after processing all clip items output_path = os.path.join(os.path.dirname(video_path), f"{os.path.basename(video_path)}_predicted.xml") tree = ET.ElementTree(root) with open(output_path, 'wb') as f: tree.write(f, encoding='utf-8', xml_declaration=True) else: print(f"Predicted label for {video_path}: {predicted_label} (No clip items found)") # Example usage of the class # extractor = VideoAudioFeatureExtractor("path_to_video.mp4", "output_path.xml") # extractor.process_video()