import random
import codecs as cs
from os.path import join as pjoin

import numpy as np
import torch
from torch.utils import data
from torch.utils.data._utils.collate import default_collate
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm

import utils.paramUtil as paramUtil
import options.option_transformer as option_trans

# Note: this parses CLI options at import time (a side effect for anything
# importing this module); `args` is not used elsewhere in this file.
args = option_trans.get_args_parser()


def collate_fn(batch):
    # Sort a batch of (caption, caption_perb, m_tokens, m_tokens_len) tuples
    # by token length, longest first, before the default collation.
    batch.sort(key=lambda x: x[3], reverse=True)
    return default_collate(batch)
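
# Sketch of what collate_fn produces (it is currently commented out in the
# loaders below): x[3] is the m_tokens_len returned by
# Text2MotionDataset.__getitem__, so batches come out sorted longest-first,
# as packed-sequence utilities expect. Values here are illustrative only:
#
#   batch = [('a person walks', 'a person strolls', np.zeros(51, dtype=int), 8),
#            ('a person jumps', 'a person hops',    np.zeros(51, dtype=int), 3)]
#   caps, caps_perb, toks, lens = collate_fn(batch)   # lens -> tensor([8, 3])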


class Text2MotionDataset(data.Dataset):
    '''Dataset of VQ motion-token sequences paired with captions, for training
    the text-to-motion generative model.'''

    def __init__(self, dataset_name, feat_bias=5, unit_length=4, codebook_size=1024, tokenizer_name=None, method=None):
        self.max_length = 64
        self.pointer = 0
        self.dataset_name = dataset_name
        self.unit_length = unit_length
        # Token vocabulary: indices [0, codebook_size) are VQ codes; the two
        # extra indices are the end-of-motion and padding tokens.
        # self.mot_start_idx = codebook_size
        self.mot_end_idx = codebook_size
        self.mot_pad_idx = codebook_size + 1
        self.method = method
        if dataset_name == 't2m':
            self.data_root = './dataset/HumanML3D'
            self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
            self.text_dir = pjoin(self.data_root, 'texts')
            self.joints_num = 22
            radius = 4
            fps = 20
            self.max_motion_length = 26 if unit_length == 8 else 51
            dim_pose = 263
            kinematic_chain = paramUtil.t2m_kinematic_chain
        elif dataset_name == 'kit':
            self.data_root = './dataset/KIT-ML'
            self.motion_dir = pjoin(self.data_root, 'new_joint_vecs')
            self.text_dir = pjoin(self.data_root, 'texts')
            self.joints_num = 21
            radius = 240 * 8
            fps = 12.5
            dim_pose = 251
            self.max_motion_length = 26 if unit_length == 8 else 51
            kinematic_chain = paramUtil.kit_kinematic_chain
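        # Note: 26 and 51 match 196 // unit_length + 2 for unit_length 8 and 4,
        # i.e. (what appears to be) the longest 196-frame clip in tokens, plus
        # slots for the end token and padding.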
        split_file = pjoin(self.data_root, 'train.txt')

        id_list = []
        with cs.open(split_file, 'r') as f:
            for line in f.readlines():
                id_list.append(line.strip())

        new_name_list = []
        data_dict = {}
        if self.method == 'adv':
            # Placeholder for adversarial-training-specific preprocessing.
            pass
        for name in tqdm(id_list):
            try:
                m_token_list = np.load(pjoin(self.data_root, tokenizer_name, '%s.npy' % name))

                # Read the paired text annotations. Each line is expected to be
                # '#'-separated: caption # tokens # from_tag # to_tag # perturbed caption.
                with cs.open(pjoin(self.text_dir, name + '.txt')) as f:
                    text_data = []
                    flag = False
                    lines = f.readlines()
                    for line in lines:
                        try:
                            text_dict = {}
                            line_split = line.strip().split('#')
                            caption = line_split[0]
                            txt_perb = line_split[-1]
                            t_tokens = line_split[1].split(' ')
                            f_tag = float(line_split[2])
                            to_tag = float(line_split[3])
                            f_tag = 0.0 if np.isnan(f_tag) else f_tag
                            to_tag = 0.0 if np.isnan(to_tag) else to_tag

                            text_dict['caption'] = caption
                            text_dict['tokens'] = t_tokens
                            text_dict['caption_perb'] = txt_perb
                            if f_tag == 0.0 and to_tag == 0.0:
                                flag = True
                                text_data.append(text_dict)
                            else:
                                # Crop the token sequences to the annotated sub-segment.
                                m_token_list_new = [tokens[int(f_tag * fps / unit_length): int(to_tag * fps / unit_length)]
                                                    for tokens in m_token_list
                                                    if int(f_tag * fps / unit_length) < int(to_tag * fps / unit_length)]
                                if len(m_token_list_new) == 0:
                                    continue
                                new_name = '%s_%f_%f' % (name, f_tag, to_tag)
                                data_dict[new_name] = {'m_token_list': m_token_list_new,
                                                       'text': [text_dict]}
                                new_name_list.append(new_name)
                        except Exception:
                            # Skip malformed annotation lines.
                            pass

                if flag:
                    data_dict[name] = {'m_token_list': m_token_list,
                                       'text': text_data}
                    new_name_list.append(name)
            except Exception:
                # Skip samples whose token file or text file is missing or corrupt.
                pass
        self.data_dict = data_dict
        self.name_list = new_name_list
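        # After loading, each entry has this shape (illustrative values):
        #
        #   data_dict['000001'] = {
        #       'm_token_list': [...],   # arrays of VQ code indices
        #       'text': [{'caption': 'a person walks forward',
        #                 'tokens': ['a/OTHER', 'person/NOUN', ...],
        #                 'caption_perb': 'a person strolls forward'}],
        #   }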

    def __len__(self):
        return len(self.data_dict)
    def __getitem__(self, item):
        data = self.data_dict[self.name_list[item]]
        m_token_list, text_list = data['m_token_list'], data['text']

        m_tokens = random.choice(m_token_list)
        text_data = random.choice(text_list)
        caption, caption_perb = text_data['caption'], text_data['caption_perb']

        # Augmentation: with probability 1/3, drop one token at the head or tail.
        coin = np.random.choice([False, False, True])
        if coin:
            coin2 = np.random.choice([True, False])
            if coin2:
                m_tokens = m_tokens[:-1]
            else:
                m_tokens = m_tokens[1:]
        m_tokens_len = m_tokens.shape[0]

        # Append the end token, then pad with the pad token up to max_motion_length.
        if m_tokens_len + 1 < self.max_motion_length:
            m_tokens = np.concatenate([m_tokens,
                                       np.ones((1), dtype=int) * self.mot_end_idx,
                                       np.ones((self.max_motion_length - 1 - m_tokens_len), dtype=int) * self.mot_pad_idx], axis=0)
        else:
            m_tokens = np.concatenate([m_tokens, np.ones((1), dtype=int) * self.mot_end_idx], axis=0)

        return caption, caption_perb, m_tokens.reshape(-1), m_tokens_len
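
# Layout of the m_tokens sequence returned above (illustrative):
#
#   [t_1, ..., t_L, mot_end_idx, mot_pad_idx, ..., mot_pad_idx]
#
# In practice (assuming token sequences fit within the cap) every sample comes
# out at the fixed length max_motion_length, so default_collate can stack a
# batch without a custom collate_fn.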


def DATALoader(dataset_name,
               batch_size, codebook_size, tokenizer_name, unit_length=4,
               num_workers=0):
    train_loader = torch.utils.data.DataLoader(Text2MotionDataset(dataset_name, codebook_size=codebook_size, tokenizer_name=tokenizer_name, unit_length=unit_length),
                                               batch_size,
                                               shuffle=False,  # note: samples are served in split-file order
                                               num_workers=num_workers,
                                               # collate_fn=collate_fn,
                                               drop_last=True)
    return train_loader
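
# Single-process usage sketch (argument values are placeholders; tokenizer_name
# names the folder of precomputed token .npy files under data_root):
#
#   train_loader = DATALoader('t2m', 128, codebook_size=512, tokenizer_name='VQVAE')
#   for caption, caption_perb, m_tokens, m_tokens_len in train_loader:
#       pass  # m_tokens: LongTensor of code indices, one row per sample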


def DATALoader_ddp(dataset_name,
                   batch_size, codebook_size, tokenizer_name, unit_length=4,
                   num_workers=0):
    dataset = Text2MotionDataset(dataset_name, codebook_size=codebook_size, tokenizer_name=tokenizer_name, unit_length=unit_length)
    train_sampler = DistributedSampler(dataset)
    train_loader = torch.utils.data.DataLoader(dataset,
                                               batch_size,
                                               num_workers=num_workers,
                                               # collate_fn=collate_fn,
                                               drop_last=True,
                                               sampler=train_sampler)
    return train_loader
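
# DDP usage sketch (placeholder values; assumes torch.distributed is already
# initialised, e.g. under torchrun). DistributedSampler shards the dataset
# across ranks, and set_epoch() should be called every epoch so each rank
# reshuffles consistently:
#
#   loader = DATALoader_ddp('t2m', 128, codebook_size=512, tokenizer_name='VQVAE')
#   for epoch in range(num_epochs):
#       loader.sampler.set_epoch(epoch)
#       for caption, caption_perb, m_tokens, m_tokens_len in loader:
#           ...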


def cycle(iterable):
    '''Yield items from `iterable` forever, restarting it when exhausted.'''
    while True:
        for x in iterable:
            yield x
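
# cycle() turns a finite DataLoader into an endless stream for iteration-based
# (rather than epoch-based) training loops:
#
#   train_loader_iter = cycle(train_loader)
#   for nb_iter in range(total_iters):
#       caption, caption_perb, m_tokens, m_tokens_len = next(train_loader_iter)
#
# Caveat: with DATALoader_ddp, cycle() never calls sampler.set_epoch(), so the
# shuffling order repeats on every pass through the data.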