import math
import numpy as np
from collections import defaultdict
from loguru import logger


def sample_from_clip(
lmdb_manager, audio_file, audio_each_file, pose_each_file, trans_each_file,
trans_v_each_file, shape_each_file, facial_each_file, word_each_file,
vid_each_file, emo_each_file, sem_each_file, args, ori_stride, ori_length,
disable_filtering, clean_first_seconds, clean_final_seconds, is_test,
n_out_samples):
"""Sample clips from the data according to specified parameters."""
round_seconds_skeleton = pose_each_file.shape[0] // args.pose_fps
# Calculate timing information
timing_info = calculate_timing_info(
audio_each_file, facial_each_file, round_seconds_skeleton,
args.audio_fps, args.pose_fps, args.audio_sr, args.audio_rep
)
round_seconds_skeleton = timing_info['final_seconds']
# Calculate clip boundaries
clip_info = calculate_clip_boundaries(
round_seconds_skeleton, clean_first_seconds, clean_final_seconds,
args.audio_fps, args.pose_fps
)
n_filtered_out = defaultdict(int)
# Process each training length ratio
for ratio in args.multi_length_training:
processed_data = process_data_with_ratio(
ori_stride, ori_length, ratio, clip_info, args, is_test,
audio_each_file, pose_each_file, trans_each_file, trans_v_each_file,
shape_each_file, facial_each_file, word_each_file, vid_each_file,
emo_each_file, sem_each_file, audio_file,
lmdb_manager, n_out_samples
)
for type_key, count in processed_data['filtered_counts'].items():
n_filtered_out[type_key] += count
n_out_samples = processed_data['n_out_samples']
return n_filtered_out, n_out_samples
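
# A minimal usage sketch (illustrative only; the arrays and `args` values here are
# hypothetical, and `lmdb_manager` is assumed to be the project's LMDB writer with
# the `add_sample(list)` method used below):
#
#     n_filtered_out, n_out_samples = sample_from_clip(
#         lmdb_manager, "speaker_1_clip_0.wav",
#         audio_each_file=audio, pose_each_file=pose, trans_each_file=trans,
#         trans_v_each_file=trans_v, shape_each_file=shape, facial_each_file=facial,
#         word_each_file=word, vid_each_file=vid, emo_each_file=emo,
#         sem_each_file=sem, args=args, ori_stride=20, ori_length=64,
#         disable_filtering=False, clean_first_seconds=0, clean_final_seconds=0,
#         is_test=False, n_out_samples=0,
#     )
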
def calculate_timing_info(audio_data, facial_data, round_seconds_skeleton,
audio_fps, pose_fps, audio_sr, audio_rep):
"""Calculate timing information for the data."""
    if audio_data is not None:
        if audio_rep == "wave16k":
            # raw waveform: length is measured in samples at audio_sr
            round_seconds_audio = audio_data.shape[0] // audio_sr
        else:
            # frame-level features (e.g. mfcc): length is measured in frames at audio_fps
            round_seconds_audio = len(audio_data) // audio_fps
        if facial_data is not None:
            round_seconds_facial = facial_data.shape[0] // pose_fps
            logger.info(f"audio: {round_seconds_audio}s, pose: {round_seconds_skeleton}s, facial: {round_seconds_facial}s")
            final_seconds = min(round_seconds_audio, round_seconds_skeleton, round_seconds_facial)
            max_round = max(round_seconds_audio, round_seconds_skeleton, round_seconds_facial)
        else:
            logger.info(f"pose: {round_seconds_skeleton}s, audio: {round_seconds_audio}s")
            final_seconds = min(round_seconds_audio, round_seconds_skeleton)
            max_round = max(round_seconds_audio, round_seconds_skeleton)
        if final_seconds != max_round:
            logger.warning(f"reduce to {final_seconds}s, ignore {max_round - final_seconds}s")
else:
final_seconds = round_seconds_skeleton
return {
'final_seconds': final_seconds
}
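
# Worked example (hypothetical numbers): a "wave16k" waveform of 960_000 samples at
# audio_sr=16_000 spans 60 s, 1_830 pose frames at pose_fps=30 span 61 s, and 1_800
# facial frames at the same fps span 60 s; final_seconds is the minimum, 60 s, and
# the warning above reports the 1 s of pose that gets ignored.
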
def calculate_clip_boundaries(round_seconds, clean_first_seconds, clean_final_seconds,
audio_fps, pose_fps):
"""Calculate the boundaries for clip sampling."""
clip_s_t = clean_first_seconds
clip_e_t = round_seconds - clean_final_seconds
return {
'clip_s_t': clip_s_t,
'clip_e_t': clip_e_t,
'clip_s_f_audio': audio_fps * clip_s_t,
'clip_e_f_audio': clip_e_t * audio_fps,
'clip_s_f_pose': clip_s_t * pose_fps,
'clip_e_f_pose': clip_e_t * pose_fps
}
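
# Worked example (hypothetical numbers): round_seconds=60, clean_first_seconds=2,
# clean_final_seconds=2, pose_fps=30 and audio_fps=30 yield the usable window
# [2 s, 58 s], i.e. pose frames [60, 1740) and audio frames [60, 1740).
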
def process_data_with_ratio(ori_stride, ori_length, ratio, clip_info, args, is_test,
audio_data, pose_data, trans_data, trans_v_data,
shape_data, facial_data, word_data, vid_data,
emo_data, sem_data, audio_file,
lmdb_manager, n_out_samples):
"""Process data with a specific training length ratio."""
    if is_test and not args.test_clip:
        # at test time (without test_clip), use the whole usable range as one sample
        cut_length = clip_info['clip_e_f_pose'] - clip_info['clip_s_f_pose']
        args.stride = cut_length
    else:
        # scale window length and stride by the current training-length ratio
        args.stride = int(ratio * ori_stride)
        cut_length = int(ori_length * ratio)
    num_subdivision = math.floor(
        (clip_info['clip_e_f_pose'] - clip_info['clip_s_f_pose'] - cut_length) / args.stride
    ) + 1
    logger.info(f"pose from frame {clip_info['clip_s_f_pose']} to {clip_info['clip_e_f_pose']}, length {cut_length}")
    logger.info(f"{num_subdivision} clips are expected with stride {args.stride}")
    if audio_data is not None:
        # a pose window of cut_length frames corresponds to this many audio frames
        audio_short_length = math.floor(cut_length / args.pose_fps * args.audio_fps)
        logger.info(f"audio from frame {clip_info['clip_s_f_audio']} to {clip_info['clip_e_f_audio']}, length {audio_short_length}")
    else:
        # no audio stream; extract_sample_data stores a placeholder instead
        audio_short_length = 0
# Process subdivisions
filtered_counts = defaultdict(int)
for i in range(num_subdivision):
sample_data = extract_sample_data(
i, clip_info, cut_length, args,
audio_data, pose_data, trans_data, trans_v_data,
shape_data, facial_data, word_data, vid_data,
emo_data, sem_data, audio_file,
audio_short_length
)
        # keep the window only if its pose slice is non-empty
        if sample_data['pose'].size > 0:
lmdb_manager.add_sample([
sample_data['pose'], sample_data['audio'], sample_data['facial'],
sample_data['shape'], sample_data['word'], sample_data['emo'],
sample_data['sem'], sample_data['vid'], sample_data['trans'],
sample_data['trans_v'], sample_data['audio_name']
])
n_out_samples += 1
return {
'filtered_counts': filtered_counts,
'n_out_samples': n_out_samples
}
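
# Sliding-window arithmetic (hypothetical numbers): a usable range of 1680 pose
# frames (frames 60..1740) with ori_length=64, ori_stride=20 and ratio=1.0 gives
# cut_length=64, stride=20 and num_subdivision = floor((1680 - 64) / 20) + 1 = 81
# overlapping windows for this ratio.
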
def extract_sample_data(idx, clip_info, cut_length, args,
audio_data, pose_data, trans_data, trans_v_data,
shape_data, facial_data, word_data, vid_data,
emo_data, sem_data, audio_file,
audio_short_length):
"""Extract a single sample from the data."""
start_idx = clip_info['clip_s_f_pose'] + idx * args.stride
fin_idx = start_idx + cut_length
    # optional modalities fall back to an np.array([-1]) placeholder when their rep is disabled
    sample_data = {
'pose': pose_data[start_idx:fin_idx],
'trans': trans_data[start_idx:fin_idx],
'trans_v': trans_v_data[start_idx:fin_idx],
'shape': shape_data[start_idx:fin_idx],
'facial': facial_data[start_idx:fin_idx] if args.facial_rep is not None else np.array([-1]),
'word': word_data[start_idx:fin_idx] if args.word_rep is not None else np.array([-1]),
'emo': emo_data[start_idx:fin_idx] if args.emo_rep is not None else np.array([-1]),
'sem': sem_data[start_idx:fin_idx] if args.sem_rep is not None else np.array([-1]),
'vid': vid_data[start_idx:fin_idx] if args.id_rep is not None else np.array([-1]),
'audio_name': audio_file
}
    if audio_data is not None:
        # map this window's pose-frame offset to the matching audio-frame offset
        audio_start = clip_info['clip_s_f_audio'] + math.floor(idx * args.stride * args.audio_fps / args.pose_fps)
audio_end = audio_start + audio_short_length
sample_data['audio'] = audio_data[audio_start:audio_end]
else:
sample_data['audio'] = np.array([-1])
return sample_data
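
# Index-mapping example (hypothetical numbers): for window idx=3 with
# clip_s_f_pose=60, stride=20 and cut_length=64, the pose slice is [120, 184).
# With pose_fps == audio_fps == 30, the audio slice starts at
# clip_s_f_audio + floor(3 * 20 * 30 / 30) = 60 + 60 = 120 and spans
# audio_short_length = floor(64 / 30 * 30) = 64 frames, so audio and pose stay
# time-aligned.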