# EdAi-gamestyle / EdAixml.py
# Hugging Face file-view header (not part of the program): ERGS's picture,
# "Upload 2 files", commit a27b4e4, raw / history / blame, 23.2 kB.
import os
import cv2
import pickle
import numpy as np
from keras.models import load_model
from moviepy.editor import VideoFileClip
import librosa
from sklearn.preprocessing import LabelEncoder
import xml.etree.ElementTree as ET
import random
class VideoAudioFeatureExtractor:
    """Classify a video's editing style and write a matching xmeml timeline.

    The first ``num_frames`` frames of the video and MFCC features of its
    audio are fed to a Keras model. The predicted label selects a list of
    clip-item dictionaries (loaded from a pickle) which are re-targeted at
    the input video and serialized as an ``xmeml`` project containing a
    master clip and a sequence with one video and two audio tracks.
    """

    def __init__(self, video_path, output_path,
                 model_path="EdAi-gamestyle.h5",
                 label_encoder_path="label_encoder.pkl",
                 clip_items_path="clip_items.pkl"):
        """Load the model and lookup tables.

        Args:
            video_path: Path of the video to analyse.
            output_path: Stored on the instance only. ``process_video``
                does not read it; it derives its own output location.
            model_path: Keras model file passed to ``load_model``.
            label_encoder_path: Pickle holding the fitted label encoder.
            clip_items_path: Pickle mapping a label to its clip items.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.num_frames = 15
        self.height = 224
        self.width = 224
        self.channels = 6
        self.audio_feature_dim = 20
        self.num_actions = 3
        self.classification_model = load_model(model_path)
        # SECURITY: pickle.load can execute arbitrary code. Only point these
        # paths at files that were produced by a trusted training run.
        with open(label_encoder_path, "rb") as f:
            self.label_encoder = pickle.load(f)
        with open(clip_items_path, "rb") as f:
            self.all_clip_items = pickle.load(f)

    # ------------------------------------------------------------------
    # Small XML helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _xml_text(value):
        """Return *value* as element text, mapping ``None`` to ``"none"``."""
        return "none" if value is None else str(value)

    @staticmethod
    def _append_rate(parent, timebase_first=False):
        """Append the fixed 30 fps NTSC ``<rate>`` element to *parent*."""
        rate = ET.SubElement(parent, "rate")
        children = [("ntsc", "TRUE"), ("timebase", "30")]
        if timebase_first:
            children.reverse()
        for tag, text in children:
            ET.SubElement(rate, tag).text = text
        return rate

    @staticmethod
    def _append_audio_filter(clipitem, effect_name, effect_id, param_name,
                             param_id, value_min, value_max, value):
        """Append one single-parameter audio ``<filter>`` to *clipitem*."""
        filter_elem = ET.SubElement(clipitem, "filter")
        effect = ET.SubElement(filter_elem, "effect")
        ET.SubElement(effect, "name").text = effect_name
        for tag in ("effectid", "effectcategory", "effecttype"):
            ET.SubElement(effect, tag).text = effect_id
        ET.SubElement(effect, "mediatype").text = "audio"
        parameter = ET.SubElement(effect, "parameter")
        ET.SubElement(parameter, "name").text = param_name
        ET.SubElement(parameter, "parameterid").text = param_id
        ET.SubElement(parameter, "valuemin").text = value_min
        ET.SubElement(parameter, "valuemax").text = value_max
        ET.SubElement(parameter, "value").text = value
        return filter_elem

    def _default_file_id(self):
        """Return the video's base name without its final extension."""
        return os.path.splitext(os.path.basename(self.video_path))[0]

    def _resolve_audio_file_id(self, idx, video_clip_item, video_file_elements):
        """Pick the ``<file>`` id an audio clip item should reference.

        Preference order: the id of the matching video ``<file>`` element,
        then an id stored in the clip-item dictionary, then the video's
        base name without extension.
        """
        if video_file_elements and idx < len(video_file_elements):
            file_elem = video_file_elements[idx]
            if file_elem is not None and file_elem.get("id") is not None:
                return file_elem.get("id")
        file_entry = video_clip_item.get("file")
        if isinstance(file_entry, dict) and file_entry.get("id") is not None:
            return str(file_entry["id"])
        return self._default_file_id()

    def _new_unique_id(self, video_name, used_ids):
        """Generate a clip-item id that is not already in *used_ids*."""
        candidate = self.generate_clipitem_id(video_name)
        while candidate in used_ids:
            candidate = self.generate_clipitem_id(video_name)
        used_ids.add(candidate)
        return candidate

    # ------------------------------------------------------------------
    # Feature extraction
    # ------------------------------------------------------------------
    def generate_clipitem_id(self, video_name, num_digits=10):
        """Return *video_name* followed by *num_digits* random decimal digits."""
        random_number = ''.join(random.choices('0123456789', k=num_digits))
        return f"{video_name}{random_number}"

    def combine_features(self, video_features, audio_features):
        """Stack audio features behind the video frames as extra channels.

        The audio feature matrix is resized to one frame-sized plane,
        repeated into three channels, repeated for every frame and then
        concatenated to *video_features* along the last axis.

        Args:
            video_features: Frame array; its leading dimensions must match
                ``(num_frames, height, width)`` for the concatenation.
            audio_features: 2-D float array accepted by ``cv2.resize``.

        Returns:
            The concatenated array.
        """
        # cv2.resize expects dsize as (width, height); the frame planes are
        # produced the same way in preprocess_frame.
        audio_plane = cv2.resize(audio_features, (self.width, self.height))
        audio_rgb = np.repeat(audio_plane[:, :, np.newaxis], 3, axis=2)
        audio_stack = np.repeat(audio_rgb[np.newaxis, :, :, :], self.num_frames, axis=0)
        return np.concatenate([video_features, audio_stack], axis=-1)

    def extract_audio(self, video_path):
        """Decode the video's audio at 48 kHz as a mono float32 signal.

        Raises:
            ValueError: If the video has no audio track.
        """
        clip = VideoFileClip(video_path)
        try:
            audio = clip.audio
            if audio is None:
                raise ValueError(f"No audio track found in video: {video_path}")
            audio_signal = audio.to_soundarray(fps=48000).astype(np.float32)
        finally:
            # Release the reader processes held by the clip.
            clip.close()
        if audio_signal.ndim == 2:
            audio_signal = librosa.to_mono(audio_signal.T)
        return audio_signal

    def process_audio(self, audio_signal_segment):
        """Compute MFCCs and fix their last axis to ``num_frames`` columns.

        Returns:
            The transposed, length-fixed MFCC matrix.
        """
        mfccs = librosa.feature.mfcc(y=audio_signal_segment, sr=48000, n_mfcc=self.audio_feature_dim)
        mfccs_fixed_length = librosa.util.fix_length(mfccs, size=self.num_frames)
        return mfccs_fixed_length.T

    def preprocess_frame(self, frame):
        """Resize *frame* to ``width`` x ``height`` and scale it to [0, 1] float32."""
        frame_resized = cv2.resize(frame, (self.width, self.height))
        return (frame_resized / 255.0).astype(np.float32)

    def extract_clip(self, video, audio_features, decision):
        """Read up to ``num_frames`` preprocessed frames of one decision.

        Args:
            video: An opened ``cv2.VideoCapture``.
            audio_features: Returned unchanged alongside the frames.
            decision: Mapping with ``"in"`` and ``"out"`` frame numbers.

        Returns:
            ``(clip_frames, audio_features)`` where ``clip_frames`` is
            zero-padded or truncated to exactly ``num_frames`` frames.
        """
        start_frame = int(decision["in"])
        end_frame = int(decision["out"])
        video.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        clip_frames = []
        # NOTE(review): the stride of 2 halves the number of reads but the
        # reads themselves are consecutive frames; confirm whether every
        # second frame was meant to be sampled instead.
        for _ in range(start_frame, end_frame, 2):
            ret, frame = video.read()
            if not ret:
                break
            clip_frames.append(self.preprocess_frame(frame))
        while len(clip_frames) < self.num_frames:
            clip_frames.append(np.zeros((self.height, self.width, 3), dtype=np.float32))
        clip_frames = np.array(clip_frames[:self.num_frames])
        return clip_frames, audio_features

    # ------------------------------------------------------------------
    # XML construction
    # ------------------------------------------------------------------
    def create_audio_track(self, clip_item_ids, pan_value, link_ids, video_clip_items,
                           video_file_elements, track_index=None):
        """Build one audio ``<track>`` mirroring the video clip items.

        Args:
            clip_item_ids: Id for each audio clip item, in timeline order.
            pan_value: Value written to the "Audio Pan" filter.
            link_ids: Per clip item, the ids to emit as ``<link>`` entries.
            video_clip_items: Clip-item dictionaries providing ``name``,
                ``duration``, ``in``, ``out``, ``start`` and ``end``.
            video_file_elements: ``<file>`` elements of the video clip
                items; their ids are reused so audio and video reference
                the same file. May be empty or ``None``.
            track_index: Source audio track written to ``<trackindex>``.
                When ``None`` the legacy value ``idx + 1`` (the clip's
                position in the list) is used.

        Returns:
            The populated ``<track>`` element.
        """
        track = ET.Element("track")
        master_clip_id = os.path.basename(self.video_path)
        for idx, clipitem_id in enumerate(clip_item_ids):
            video_clip_item = video_clip_items[idx]
            clipitem_element = ET.SubElement(track, "clipitem", id=clipitem_id)
            # str() keeps the tree serializable when the pickled values
            # are numbers rather than strings.
            ET.SubElement(clipitem_element, "name").text = str(video_clip_item['name'])
            ET.SubElement(clipitem_element, "duration").text = str(video_clip_item['duration'])
            self._append_rate(clipitem_element)
            for key in ("in", "out", "start", "end"):
                ET.SubElement(clipitem_element, key).text = str(video_clip_item[key])
            ET.SubElement(clipitem_element, "masterclipid").text = master_clip_id
            sourcetrack = ET.SubElement(clipitem_element, "sourcetrack")
            ET.SubElement(sourcetrack, "mediatype").text = "audio"
            source_index = idx + 1 if track_index is None else track_index
            ET.SubElement(sourcetrack, "trackindex").text = str(source_index)
            file_id = self._resolve_audio_file_id(idx, video_clip_item, video_file_elements)
            ET.SubElement(clipitem_element, "file", id=file_id)
            self._append_audio_filter(clipitem_element, "Audio Levels", "audiolevels",
                                      "Level", "level", "0", "3.98109", "1")
            self._append_audio_filter(clipitem_element, "Audio Pan", "audiopan",
                                      "Pan", "pan", "-1", "1", str(pan_value))
            for link_id in link_ids[idx]:
                link_elem = ET.SubElement(clipitem_element, "link")
                ET.SubElement(link_elem, "linkclipref").text = link_id
        # One pass after all items exist gives the same result as
        # re-adjusting after every append.
        self.adjust_clipitem_start_end(track.findall('clipitem'))
        return track

    def create_clip_structure(self, video_name, video_path, total_duration):
        """Build the master ``<clip>`` element for the source video.

        Args:
            video_name: File name of the video, used as clip id and name.
            video_path: Path embedded in the ``<pathurl>`` elements.
            total_duration: Duration written to every ``<duration>``,
                ``<out>`` and ``<end>`` of the clip.

        Returns:
            The ``<clip>`` element.
        """
        duration_text = str(total_duration)
        # Same rule as process_video so every reference to the file agrees,
        # including names that contain more than one dot.
        file_id = os.path.splitext(video_name)[0]
        path_url = f"file://localhost/{video_path}"

        clip = ET.Element("clip", id=video_name)
        ET.SubElement(clip, "updatebehavior").text = "add"
        ET.SubElement(clip, "name").text = video_name
        ET.SubElement(clip, "duration").text = duration_text
        self._append_rate(clip)
        ET.SubElement(clip, "in").text = "-1"
        ET.SubElement(clip, "out").text = "-1"
        ET.SubElement(clip, "masterclipid").text = video_name
        ET.SubElement(clip, "ismasterclip").text = "TRUE"

        # Logging info
        logginginfo = ET.SubElement(clip, "logginginfo")
        for tag in ("scene", "shottake", "lognote"):
            ET.SubElement(logginginfo, tag)
        ET.SubElement(logginginfo, "good").text = "FALSE"

        # Labels
        labels = ET.SubElement(clip, "labels")
        ET.SubElement(labels, "label2")

        # Comments
        comments = ET.SubElement(clip, "comments")
        for number in range(1, 5):
            ET.SubElement(comments, f"mastercomment{number}")

        # Media: a single video track holding the whole source
        clip_media = ET.SubElement(clip, "media")
        clip_video = ET.SubElement(clip_media, "video")
        video_track = ET.SubElement(clip_video, "track")
        clipitem = ET.SubElement(video_track, "clipitem", id=f"{video_name}1")
        ET.SubElement(clipitem, "name").text = video_name
        ET.SubElement(clipitem, "duration").text = duration_text
        self._append_rate(clipitem)
        ET.SubElement(clipitem, "in").text = "0"
        ET.SubElement(clipitem, "out").text = duration_text
        ET.SubElement(clipitem, "start").text = "0"
        ET.SubElement(clipitem, "end").text = duration_text
        ET.SubElement(clipitem, "pixelaspectratio").text = "Square"
        ET.SubElement(clipitem, "anamorphic").text = "FALSE"
        ET.SubElement(clipitem, "alphatype").text = "none"
        ET.SubElement(clipitem, "masterclipid").text = video_name

        # File: the id must be an attribute so other clip items can refer
        # to it; writing it as element text produced an unreferenceable file.
        file_elem = ET.SubElement(clipitem, "file", id=file_id)
        ET.SubElement(file_elem, "name").text = video_name
        ET.SubElement(file_elem, "pathurl").text = path_url
        self._append_rate(file_elem, timebase_first=True)
        ET.SubElement(file_elem, "duration").text = duration_text
        timecode = ET.SubElement(file_elem, "timecode")
        ET.SubElement(timecode, "string").text = "00:00:00:00"
        ET.SubElement(timecode, "frame").text = "0"
        ET.SubElement(timecode, "rate").text = "30"
        ET.SubElement(timecode, "displayformat").text = "NDF"
        file_media = ET.SubElement(file_elem, "media")
        ET.SubElement(file_media, "video")
        file_audio = ET.SubElement(file_media, "audio")

        # Audio: two tracks, as in the original example.
        # NOTE(review): these tracks are attached to the <file>'s media, as
        # the original code did; confirm whether they were meant to live
        # under the clip's own <media> instead.
        for i in range(2):
            audio_track = ET.SubElement(file_audio, "track")
            audio_item = ET.SubElement(audio_track, "clipitem", id=f"{video_name}{i + 2}")
            ET.SubElement(audio_item, "name").text = video_name
            ET.SubElement(audio_item, "duration").text = duration_text
            self._append_rate(audio_item)
            ET.SubElement(audio_item, "in").text = "0"
            ET.SubElement(audio_item, "out").text = duration_text
            ET.SubElement(audio_item, "start").text = "0"
            ET.SubElement(audio_item, "end").text = duration_text
            ET.SubElement(audio_item, "masterclipid").text = video_name
            audio_file = ET.SubElement(audio_item, "file", id=file_id)
            ET.SubElement(audio_file, "name").text = video_name
            ET.SubElement(audio_file, "pathurl").text = path_url
            sourcetrack = ET.SubElement(audio_item, "sourcetrack")
            ET.SubElement(sourcetrack, "mediatype").text = "audio"
            ET.SubElement(sourcetrack, "trackindex").text = str(i + 1)
        return clip

    def adjust_clipitem_start_end(self, clipitems):
        """Lay clip items end to end, rewriting ``<start>`` and ``<end>``.

        Each item starts where the previous one ended and lasts
        ``out - in`` frames; the first item starts at 0.
        """
        cursor = 0
        for clipitem in clipitems:
            length = int(clipitem.find('out').text) - int(clipitem.find('in').text)
            clipitem.find('start').text = str(cursor)
            cursor += length
            clipitem.find('end').text = str(cursor)

    def dict_to_xml(self, tag, dictionary):
        """Convert a nested dictionary into an element tree.

        ``dictionary["attributes"]`` (if present) supplies XML attributes.
        For ``clipitem`` and ``file`` tags the ``"id"`` entry becomes the
        ``id`` attribute. Nested dicts become child elements, lists become
        repeated child elements, everything else becomes element text and
        ``None`` is written as ``"none"``. The input is not modified.

        Args:
            tag: Name of the element to create.
            dictionary: Mapping describing the element.

        Returns:
            The new element.
        """
        # Work on copies: the clip items are shared, long-lived data and
        # popping from them corrupted every later use.
        fields = dict(dictionary)
        attributes = dict(fields.pop('attributes', None) or {})
        if tag in ("clipitem", "file"):
            attributes["id"] = fields.pop("id", attributes.get("id"))
        # Attribute values must be strings for ElementTree to serialize them.
        attrib = {str(key): self._xml_text(val) for key, val in attributes.items()}
        elem = ET.Element(tag, attrib)
        for key, val in fields.items():
            if isinstance(val, dict):
                elem.append(self.dict_to_xml(key, val))
            elif isinstance(val, list):
                for item in val:
                    if isinstance(item, dict):
                        elem.append(self.dict_to_xml(key, item))
                    else:
                        ET.SubElement(elem, key).text = self._xml_text(item)
            else:
                ET.SubElement(elem, key).text = self._xml_text(val)
        return elem

    def create_sequence_structure(self, video_name, total_duration):
        """Build the empty ``<sequence>`` that will hold the timeline.

        Args:
            video_name: Accepted for interface compatibility; not used.
            total_duration: Value written to the sequence ``<duration>``.

        Returns:
            ``(sequence, track, media)``: the sequence element, its empty
            video track and its ``<media>`` element.
        """
        sequence = ET.Element("sequence", id="Sequence 1")
        ET.SubElement(sequence, "updatebehavior").text = "add"
        ET.SubElement(sequence, "name").text = "Sequence 1"
        ET.SubElement(sequence, "duration").text = str(total_duration)
        self._append_rate(sequence)

        timecode = ET.SubElement(sequence, "timecode")
        self._append_rate(timecode)
        ET.SubElement(timecode, "frame").text = "107891"
        ET.SubElement(timecode, "source").text = "source"
        ET.SubElement(timecode, "displayformat").text = "DF"

        ET.SubElement(sequence, "in").text = "-1"
        ET.SubElement(sequence, "out").text = "-1"

        media = ET.SubElement(sequence, "media")
        video = ET.SubElement(media, "video")
        format_ = ET.SubElement(video, "format")
        samplecharacteristics = ET.SubElement(format_, "samplecharacteristics")
        for child_tag, child_text in (
            ("width", "1920"),
            ("height", "1080"),
            ("pixelaspectratio", "Square"),
            ("anamorphic", "FALSE"),
            ("fielddominance", "none"),
        ):
            ET.SubElement(samplecharacteristics, child_tag).text = child_text
        self._append_rate(samplecharacteristics)
        ET.SubElement(samplecharacteristics, "colordepth").text = "24"
        track = ET.SubElement(video, "track")
        return sequence, track, media

    # ------------------------------------------------------------------
    # Pipeline
    # ------------------------------------------------------------------
    def _read_leading_frames(self, video_path):
        """Return the first ``num_frames`` preprocessed frames, or ``None``.

        ``None`` means not a single frame could be read. Shorter videos are
        zero-padded so the result always has ``num_frames`` frames.
        """
        video = cv2.VideoCapture(video_path)
        try:
            frames = []
            while len(frames) < self.num_frames:
                ret, frame = video.read()
                if not ret:
                    break
                frames.append(self.preprocess_frame(frame))
        finally:
            # Released on every path, including the unreadable-video one.
            video.release()
        if not frames:
            return None
        while len(frames) < self.num_frames:
            frames.append(np.zeros((self.height, self.width, 3), dtype=np.float32))
        return np.array(frames)

    def _build_video_clipitem(self, clip_item, link_ids, file_id):
        """Turn one clip-item dictionary into a video ``<clipitem>``.

        Existing ``<link>`` children are replaced by *link_ids*, the
        ``<file>`` child is given *file_id*, and ``<fielddominance>`` (if
        any) is moved after the links.

        Returns:
            ``(clipitem_element, file_element)``.
        """
        element = self.dict_to_xml("clipitem", clip_item)
        for stale_link in element.findall("link"):
            element.remove(stale_link)

        file_element = element.find("file")
        if file_element is None:
            file_element = ET.SubElement(element, "file", id=file_id)
        else:
            file_element.set("id", file_id)
            file_element.text = None

        fielddominance = element.find("fielddominance")
        if fielddominance is not None:
            element.remove(fielddominance)
        for link_id in link_ids:
            link = ET.SubElement(element, "link")
            ET.SubElement(link, "linkclipref").text = link_id
        if fielddominance is not None:
            element.append(fielddominance)
        return element, file_element

    def process_video(self):
        """Classify ``self.video_path`` and write the predicted timeline.

        The XML is written next to the video as
        ``<video file name>_predicted.xml``; ``self.output_path`` is not
        consulted. Nothing is written when the video cannot be read or
        when no clip items exist for the predicted label.
        """
        video_path = self.video_path

        # Extract audio features
        audio_signal = self.extract_audio(video_path)
        audio_frame_features = self.process_audio(audio_signal)

        # Extract video features
        clip_frames = self._read_leading_frames(video_path)
        if clip_frames is None:
            print(f"Failed to read video: {video_path}")
            return

        # Combine video and audio features and make a prediction
        combined_features_for_prediction = self.combine_features(clip_frames, audio_frame_features)
        prediction = self.classification_model.predict(
            [np.array([combined_features_for_prediction]), np.array([audio_frame_features])])
        predicted_label = self.label_encoder.inverse_transform([np.argmax(prediction)])[0]

        if predicted_label not in self.all_clip_items:
            print(f"Predicted label for {video_path}: {predicted_label} (No clip items found)")
            return

        clip_items_for_predicted_label = self.all_clip_items[predicted_label]
        print("Clip items for predicted label:", clip_items_for_predicted_label)
        total_duration = sum(int(clip_item["duration"]) for clip_item in clip_items_for_predicted_label)

        # Project skeleton: root, project, an empty bin, master clip, sequence
        root = ET.Element("xmeml", version="5")
        project = ET.SubElement(root, "project")
        ET.SubElement(project, "name").text = "Untitled Project 1"
        children = ET.SubElement(project, "children")

        bin_element = ET.SubElement(children, "bin")
        ET.SubElement(bin_element, "updatebehavior").text = "add"
        ET.SubElement(bin_element, "name").text = "Custom Bins"
        ET.SubElement(bin_element, "children")

        video_name_with_extension = os.path.basename(video_path)
        children.append(
            self.create_clip_structure(video_name_with_extension, video_path, total_duration))
        sequence, track, media = self.create_sequence_structure(video_name_with_extension, total_duration)
        children.append(sequence)

        file_id_value = os.path.splitext(video_name_with_extension)[0]
        used_ids = set()
        link_ids_list = []          # [video id, left audio id, right audio id] per clip item
        video_clip_items_list = []  # retargeted copies of the clip-item dictionaries
        video_file_elements = []    # <file> element of each video <clipitem>
        for source_item in clip_items_for_predicted_label:
            # Copy so the loaded clip items stay untouched between runs.
            clip_item = dict(source_item)
            link_ids = [self._new_unique_id(video_name_with_extension, used_ids) for _ in range(3)]
            clip_item['id'] = link_ids[0]
            clip_item['name'] = video_name_with_extension
            clipitem_element, file_element = self._build_video_clipitem(
                clip_item, link_ids, file_id_value)
            track.append(clipitem_element)
            link_ids_list.append(link_ids)
            video_clip_items_list.append(clip_item)
            video_file_elements.append(file_element)
        self.adjust_clipitem_start_end(track.findall('clipitem'))

        # Left and right audio tracks read source channels 1 and 2.
        audio_track_left = self.create_audio_track(
            [ids[1] for ids in link_ids_list], -1, link_ids_list,
            video_clip_items_list, video_file_elements, track_index=1)
        audio_track_right = self.create_audio_track(
            [ids[2] for ids in link_ids_list], 1, link_ids_list,
            video_clip_items_list, video_file_elements, track_index=2)
        audio = ET.SubElement(media, "audio")
        audio.append(audio_track_left)
        audio.append(audio_track_right)

        # Save the entire XML tree after processing all clip items
        output_path = os.path.join(os.path.dirname(video_path), f"{os.path.basename(video_path)}_predicted.xml")
        tree = ET.ElementTree(root)
        with open(output_path, 'wb') as f:
            tree.write(f, encoding='utf-8', xml_declaration=True)
# Example usage of the class
# extractor = VideoAudioFeatureExtractor("path_to_video.mp4", "output_path.xml")
# extractor.process_video()