|
|
|
import csv
|
|
import fnmatch
|
|
import glob
|
|
import json
|
|
import os
|
|
import os.path as osp
|
|
|
|
|
|
def parse_directory(path,
                    rgb_prefix='img_',
                    flow_x_prefix='flow_x_',
                    flow_y_prefix='flow_y_',
                    level=1):
    """Parse directories holding extracted frames from standard benchmarks.

    Args:
        path (str): Directory path to parse frames.
        rgb_prefix (str): Prefix of generated rgb frames name.
            default: 'img_'.
        flow_x_prefix (str): Prefix of generated flow x name.
            default: `flow_x_`.
        flow_y_prefix (str): Prefix of generated flow y name.
            default: `flow_y_`.
        level (int): Directory level for glob searching. Options are 1 and 2.
            default: 1.

    Returns:
        dict: frame info dict with video id as key and tuple(path(str),
            rgb_num(int), flow_x_num(int)) as value.
    """
    print(f'parse frames under directory {path}')

    # Choose the glob pattern and the video-id extractor for the given level.
    if level == 1:
        frame_dirs = glob.glob(osp.join(path, '*'))

        def locate_directory(directory):
            return osp.basename(directory)
    elif level == 2:
        frame_dirs = glob.glob(osp.join(path, '*', '*'))

        def locate_directory(directory):
            parent = osp.basename(osp.dirname(directory))
            return osp.join(parent, osp.basename(directory))
    else:
        raise ValueError('level can be only 1 or 2')

    def count_files(directory, prefix_list):
        """Count file number with a given directory and prefix.

        Args:
            directory (str): Data directory to be search.
            prefix_list (list): List or prefix.

        Returns:
            list (int): Number list of the file with the prefix.
        """
        entries = os.listdir(directory)
        return [len(fnmatch.filter(entries, prefix + '*'))
                for prefix in prefix_list]

    frame_dict = {}
    for idx, frame_dir in enumerate(frame_dirs):
        rgb_cnt, flow_x_cnt, flow_y_cnt = count_files(
            frame_dir, (rgb_prefix, flow_x_prefix, flow_y_prefix))

        # Flow frames come in x/y pairs, so their counts must agree.
        if flow_x_cnt != flow_y_cnt:
            raise ValueError(f'x and y direction have different number '
                             f'of flow images in video directory: {frame_dir}')
        if idx % 200 == 0:
            print(f'{idx} videos parsed')

        frame_dict[locate_directory(frame_dir)] = (frame_dir, rgb_cnt,
                                                   flow_x_cnt)

    print('frame directory analysis done')
    return frame_dict
|
|
|
|
|
|
def parse_ucf101_splits(level):
    """Parse UCF-101 dataset into "train", "val", "test" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of UCF-101.
    """
    class_index_file = 'data/ucf101/annotations/classInd.txt'
    train_file_template = 'data/ucf101/annotations/trainlist{:02d}.txt'
    test_file_template = 'data/ucf101/annotations/testlist{:02d}.txt'

    with open(class_index_file, 'r') as fin:
        class_index = [x.strip().split() for x in fin]
    # classInd.txt uses 1-based ids; convert to 0-based labels.
    class_mapping = {x[1]: int(x[0]) - 1 for x in class_index}

    def line_to_map(line):
        """A function to map line string to video and label.

        Args:
            line (str): A long directory path, which is a text path.

        Returns:
            tuple[str, int]: (video, label), video is the video id,
                label is the 0-based class index of the video.
        """
        items = line.strip().split()
        video = osp.splitext(items[0])[0]
        # The annotation path always starts with the class directory name, so
        # the label can be resolved from it at either level. (Previously the
        # level-1 branch returned the raw path string as the label, which was
        # inconsistent with level 2 and with parse_hmdb51_split.)
        label = class_mapping[osp.dirname(items[0])]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            video = osp.join(
                osp.basename(osp.dirname(video)), osp.basename(video))
        return video, label

    splits = []
    for i in range(1, 4):
        with open(train_file_template.format(i), 'r') as fin:
            train_list = [line_to_map(x) for x in fin]

        with open(test_file_template.format(i), 'r') as fin:
            test_list = [line_to_map(x) for x in fin]
        splits.append((train_list, test_list))

    return splits
|
|
|
|
|
|
def parse_jester_splits(level):
    """Parse Jester into "train", "val" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of Jester dataset.
    """
    class_index_file = 'data/jester/annotations/jester-v1-labels.csv'
    train_file = 'data/jester/annotations/jester-v1-train.csv'
    val_file = 'data/jester/annotations/jester-v1-validation.csv'
    test_file = 'data/jester/annotations/jester-v1-test.csv'

    # Class ids follow the line order of the label file (0-based).
    with open(class_index_file, 'r') as fin:
        class_mapping = {line.strip(): idx for idx, line in enumerate(fin)}

    def line_to_map(line, test_mode=False):
        """Map one `;`-separated annotation line to a video id (test) or a
        (video, label) tuple (train/val)."""
        fields = line.strip().split(';')
        video = fields[0]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            video = osp.join(
                osp.basename(osp.dirname(video)), osp.basename(video))
        if test_mode:
            # Test annotations carry no label column.
            return video
        return video, class_mapping[fields[1]]

    with open(train_file, 'r') as fin:
        train_list = [line_to_map(line) for line in fin]

    with open(val_file, 'r') as fin:
        val_list = [line_to_map(line) for line in fin]

    with open(test_file, 'r') as fin:
        test_list = [line_to_map(line, test_mode=True) for line in fin]

    return ((train_list, val_list, test_list), )
|
|
|
|
|
|
def parse_sthv1_splits(level):
    """Parse Something-Something dataset V1 into "train", "val" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of Something-Something V1 dataset.
    """
    class_index_file = 'data/sthv1/annotations/something-something-v1-labels.csv'
    train_file = 'data/sthv1/annotations/something-something-v1-train.csv'
    val_file = 'data/sthv1/annotations/something-something-v1-validation.csv'
    test_file = 'data/sthv1/annotations/something-something-v1-test.csv'

    # Class ids follow the line order of the label file (0-based).
    with open(class_index_file, 'r') as fin:
        class_mapping = {line.strip(): idx for idx, line in enumerate(fin)}

    def line_to_map(line, test_mode=False):
        """Map one `;`-separated annotation line to a video id (test) or a
        (video, label) tuple (train/val)."""
        fields = line.strip().split(';')
        video = fields[0]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            video = osp.join(
                osp.basename(osp.dirname(video)), osp.basename(video))
        if test_mode:
            # Test annotations carry no label column.
            return video
        return video, class_mapping[fields[1]]

    with open(train_file, 'r') as fin:
        train_list = [line_to_map(line) for line in fin]

    with open(val_file, 'r') as fin:
        val_list = [line_to_map(line) for line in fin]

    with open(test_file, 'r') as fin:
        test_list = [line_to_map(line, test_mode=True) for line in fin]

    return ((train_list, val_list, test_list), )
|
|
|
|
|
|
def parse_sthv2_splits(level):
    """Parse Something-Something dataset V2 into "train", "val" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: "train", "val", "test" splits of Something-Something V2 dataset.
    """
    class_index_file = 'data/sthv2/annotations/something-something-v2-labels.json'
    train_file = 'data/sthv2/annotations/something-something-v2-train.json'
    val_file = 'data/sthv2/annotations/something-something-v2-validation.json'
    test_file = 'data/sthv2/annotations/something-something-v2-test.json'

    # The label file maps template strings (without brackets) to string ids.
    with open(class_index_file, 'r') as fin:
        class_mapping = json.load(fin)

    def item_to_map(item, test_mode=False):
        """Map one annotation dict to a video id (test) or a (video, label)
        tuple (train/val)."""
        video = item['id']
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            video = osp.join(
                osp.basename(osp.dirname(video)), osp.basename(video))
        if test_mode:
            # Test annotations carry no template/label.
            return video

        # Strip the placeholder brackets so the template matches the keys of
        # the label file.
        template = item['template'].replace('[', '').replace(']', '')
        return video, int(class_mapping[template])

    with open(train_file, 'r') as fin:
        train_list = [item_to_map(item) for item in json.load(fin)]

    with open(val_file, 'r') as fin:
        val_list = [item_to_map(item) for item in json.load(fin)]

    with open(test_file, 'r') as fin:
        test_list = [item_to_map(item, test_mode=True)
                     for item in json.load(fin)]

    return ((train_list, val_list, test_list), )
|
|
|
|
|
|
def parse_mmit_splits():
    """Parse Multi-Moments in Time dataset into "train", "val" splits.

    Returns:
        list: "train", "val", "test" splits of Multi-Moments in Time.
    """

    def line_to_map(x):
        """Map one csv row to (video_id, labels).

        The first column is the relative video path; every remaining column
        is a 0-based label id (multi-label dataset).
        """
        video = osp.splitext(x[0])[0]
        labels = [int(digit) for digit in x[1:]]
        return video, labels

    # Use context managers so the annotation files are closed
    # deterministically (the files opened inline were never closed before).
    with open('data/mmit/annotations/trainingSet.csv') as fin:
        train_list = [line_to_map(x) for x in csv.reader(fin)]

    with open('data/mmit/annotations/validationSet.csv') as fin:
        val_list = [line_to_map(x) for x in csv.reader(fin)]

    # No test annotations are provided; reuse the validation split.
    test_list = val_list

    splits = ((train_list, val_list, test_list), )
    return splits
|
|
|
|
|
|
def parse_kinetics_splits(level, dataset):
    """Parse Kinetics dataset into "train", "val", "test" splits.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.
        dataset (str): Denotes the version of Kinetics that needs to be parsed,
            choices are "kinetics400", "kinetics600" and "kinetics700".

    Returns:
        list: "train", "val", "test" splits of Kinetics.
    """

    def convert_label(s, keep_whitespaces=False):
        """Convert label name to a formal string.

        Remove redundant '"' and convert whitespace to '_'.

        Args:
            s (str): String to be converted.
            keep_whitespaces(bool): Whether to keep whitespace. Default: False.

        Returns:
            str: Converted string.
        """
        if not keep_whitespaces:
            return s.replace('"', '').replace(' ', '_')

        return s.replace('"', '')

    def line_to_map(x, test=False):
        """A function to map line string to video and label.

        Args:
            x (list[str]): A parsed row from a Kinetics csv file.
            test (bool): Indicate whether the line comes from test
                annotation file.

        Returns:
            tuple[str, int]: (video, label), video is the video id,
                label is the video label (-1 for test rows).
        """
        # Video id is "<youtube_id>_<start:06d>_<end:06d>".
        video = f'{x[1]}_{int(float(x[2])):06d}_{int(float(x[3])):06d}'
        if test:
            # Test annotations carry no label.
            return video, -1

        if level == 2:
            video = f'{convert_label(x[0])}/{video}'
        else:
            assert level == 1
        label = class_mapping[convert_label(x[0])]
        return video, label

    train_file = f'data/{dataset}/annotations/kinetics_train.csv'
    val_file = f'data/{dataset}/annotations/kinetics_val.csv'
    test_file = f'data/{dataset}/annotations/kinetics_test.csv'

    # First pass over the train csv: collect the sorted set of class names to
    # derive a deterministic {class_name: 0-based id} mapping.
    # Context managers ensure every file handle is closed (previously the
    # csv files were opened inline and never closed).
    with open(train_file) as fin:
        reader = csv.reader(fin)
        next(reader)  # skip the csv header
        labels_sorted = sorted({convert_label(row[0]) for row in reader})
    class_mapping = {label: i for i, label in enumerate(labels_sorted)}

    with open(train_file) as fin:
        reader = csv.reader(fin)
        next(reader)
        train_list = [line_to_map(x) for x in reader]

    with open(val_file) as fin:
        reader = csv.reader(fin)
        next(reader)
        val_list = [line_to_map(x) for x in reader]

    with open(test_file) as fin:
        reader = csv.reader(fin)
        next(reader)
        test_list = [line_to_map(x, test=True) for x in reader]

    splits = ((train_list, val_list, test_list), )
    return splits
|
|
|
|
|
|
def parse_mit_splits():
    """Parse Moments in Time dataset into "train", "val" splits.

    Returns:
        list: "train", "val", "test" splits of Moments in Time.
    """
    # Build {category_name: class_id} from the official category file
    # (lines of the form "<category>,<id>").
    class_mapping = {}
    with open('data/mit/annotations/moments_categories.txt') as f_cat:
        for line in f_cat:
            cat, digit = line.rstrip().split(',')
            class_mapping[cat] = int(digit)

    def line_to_map(x):
        """Map one csv row to (video_id, label).

        The leading directory of the relative video path is the category
        name, which keys into ``class_mapping``.
        """
        video = osp.splitext(x[0])[0]
        label = class_mapping[osp.dirname(x[0])]
        return video, label

    # Use context managers so the annotation files are closed
    # deterministically (the files opened inline were never closed before).
    with open('data/mit/annotations/trainingSet.csv') as fin:
        train_list = [line_to_map(x) for x in csv.reader(fin)]

    with open('data/mit/annotations/validationSet.csv') as fin:
        val_list = [line_to_map(x) for x in csv.reader(fin)]

    # No test annotations are provided; reuse the validation split.
    test_list = val_list

    splits = ((train_list, val_list, test_list), )
    return splits
|
|
|
|
|
|
def parse_hmdb51_split(level):
    """Parse HMDB51 dataset into "train", "test" splits.

    Side effects: generates ``classInd.txt`` under
    ``data/hmdb51/annotations`` if it does not exist yet, and (re)writes the
    three ``trainlist0*.txt`` / ``testlist0*.txt`` files on every call.

    Args:
        level (int): Directory level of data. 1 for the single-level directory,
            2 for the two-level directory.

    Returns:
        list: three (train_list, test_list) tuples, one per official split,
            where each list holds (video, label) pairs with 0-based labels.
    """
    train_file_template = 'data/hmdb51/annotations/trainlist{:02d}.txt'
    test_file_template = 'data/hmdb51/annotations/testlist{:02d}.txt'
    class_index_file = 'data/hmdb51/annotations/classInd.txt'

    def generate_class_index_file():
        """This function will generate a `ClassInd.txt` for HMDB51 in a format
        like UCF101, where class id starts with 1."""
        video_path = 'data/hmdb51/videos'
        annotation_dir = 'data/hmdb51/annotations'

        # Class ids follow the sorted order of the class directory names.
        class_list = sorted(os.listdir(video_path))
        class_dict = dict()
        if not osp.exists(class_index_file):
            with open(class_index_file, 'w') as f:
                content = []
                for class_id, class_name in enumerate(class_list):
                    # class_id starts from 1 in the generated file
                    # (UCF101-style annotation).
                    class_dict[class_name] = class_id + 1
                    cur_line = ' '.join([str(class_id + 1), class_name])
                    content.append(cur_line)
                content = '\n'.join(content)
                f.write(content)
        else:
            print(f'{class_index_file} has been generated before.')
            # File already exists: rebuild the same mapping in memory
            # without rewriting it.
            class_dict = {
                class_name: class_id + 1
                for class_id, class_name in enumerate(class_list)
            }

        # Convert the official per-class "*_test_split{i}.txt" files into
        # UCF101-style train/test lists. Marker '1' = train, '2' = test,
        # '0' = unused. Note this always rewrites the list files.
        for i in range(1, 4):
            train_content = []
            test_content = []
            for class_name in class_dict:
                filename = class_name + f'_test_split{i}.txt'
                filename_path = osp.join(annotation_dir, filename)
                with open(filename_path, 'r') as fin:
                    for line in fin:
                        video_info = line.strip().split()
                        video_name = video_info[0]
                        if video_info[1] == '1':
                            target_line = ' '.join([
                                osp.join(class_name, video_name),
                                str(class_dict[class_name])
                            ])
                            train_content.append(target_line)
                        elif video_info[1] == '2':
                            target_line = ' '.join([
                                osp.join(class_name, video_name),
                                str(class_dict[class_name])
                            ])
                            test_content.append(target_line)
            train_content = '\n'.join(train_content)
            test_content = '\n'.join(test_content)
            with open(train_file_template.format(i), 'w') as fout:
                fout.write(train_content)
            with open(test_file_template.format(i), 'w') as fout:
                fout.write(test_content)

    generate_class_index_file()

    with open(class_index_file, 'r') as fin:
        class_index = [x.strip().split() for x in fin]
    # classInd.txt uses 1-based ids; convert to 0-based labels.
    class_mapping = {x[1]: int(x[0]) - 1 for x in class_index}

    def line_to_map(line):
        """Map one "class/video.ext label" line to (video, 0-based label)."""
        items = line.strip().split()
        video = osp.splitext(items[0])[0]
        if level == 1:
            video = osp.basename(video)
        elif level == 2:
            video = osp.join(
                osp.basename(osp.dirname(video)), osp.basename(video))
        # The leading directory of the annotation path is the class name.
        label = class_mapping[osp.dirname(items[0])]
        return video, label

    splits = []
    for i in range(1, 4):
        with open(train_file_template.format(i), 'r') as fin:
            train_list = [line_to_map(x) for x in fin]

        with open(test_file_template.format(i), 'r') as fin:
            test_list = [line_to_map(x) for x in fin]
        splits.append((train_list, test_list))

    return splits
|
|
|
|
|
|
def parse_diving48_splits():
    """Parse Diving48 dataset into "train", "test" splits.

    Returns:
        list: a single ("train", "test") split of Diving48, where each list
            holds (vid_name, label) pairs.
    """
    train_file = 'data/diving48/annotations/Diving48_V2_train.json'
    test_file = 'data/diving48/annotations/Diving48_V2_test.json'

    # Use context managers so the annotation files are closed
    # deterministically (json.load(open(...)) leaked the handles before).
    with open(train_file) as fin:
        train = json.load(fin)
    with open(test_file) as fin:
        test = json.load(fin)

    # Each annotation item carries the video name and its 0-based label.
    train_list = [(item['vid_name'], item['label']) for item in train]
    test_list = [(item['vid_name'], item['label']) for item in test]

    splits = ((train_list, test_list), )
    return splits
|
|
|