myTest01 / feature_extraction /generate_ddc_features.py
meng2003's picture
Upload 357 files
2d5fdd1
import numpy as np
from pathlib import Path
import json
import os.path
import sys
import argparse
import time
import json, pickle
import torch
from math import ceil
from scipy import signal
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.abspath(os.path.join(THIS_DIR, os.pardir))
DATA_DIR = os.path.join(ROOT_DIR, 'data')
sys.path.append(ROOT_DIR)
PAD_STATE = 0
START_STATE = 1
END_STATE = 2
EMPTY_STATE = 3
NUM_SPECIAL_STATES = 4
HUMAN_DELTA = 0.125 # minimum block separation that we are happy with
#from models import create_model
from models.ddc_model import DDCModel
from feature_extraction.utils import distribute_tasks,ResampleLinear1D
parser = argparse.ArgumentParser(description='Get DDC features from song features')
parser.add_argument("data_path", type=str, help="Directory contining Beat Saber level folders")
parser.add_argument('--checkpoints_dir', type=str)
parser.add_argument('--audio_format', type=str, default="wav")
parser.add_argument('--experiment_name', type=str)
parser.add_argument('--peak_threshold', type=float, default=0.0148)
parser.add_argument('--fps', type=float, default=60)
parser.add_argument('--checkpoint', type=str, default="latest")
parser.add_argument('--temperature', type=float, default=1.00)
parser.add_argument('--cuda', action="store_true")
parser.add_argument("--step_size", metavar='', type=float, default=0.01666666666)
parser.add_argument("--replace_existing", action="store_true")
args = parser.parse_args()
# makes arugments into global variables of the same name, used later in the code
globals().update(vars(args))
data_path = Path(data_path)
## distributing tasks accross nodes ##
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print(rank)
# print("creating {} of size {}".format(feature_name, feature_size))
experiment_name = args.experiment_name+"/"
checkpoint = args.checkpoint
temperature=args.temperature
from pathlib import Path
''' LOAD MODEL, OPTS, AND WEIGHTS'''
#%%
##loading opt object from experiment
opt = json.loads(open(ROOT_DIR.__str__()+"/feature_extraction/"+experiment_name+"opt.json","r").read())
# we assume we have 1 GPU in generating machine :P
if args.cuda:
opt["gpu_ids"] = [0]
else:
opt["gpu_ids"] = []
opt["checkpoints_dir"] = args.checkpoints_dir
# print(opt["checkpoints_dir"])
opt["load_iter"] = int(checkpoint)
if args.cuda:
opt["cuda"] = True
else:
opt["cuda"] = False
opt["experiment_name"] = args.experiment_name.split("/")[0]
if "dropout" not in opt: #for older experiments
opt["dropout"] = 0.0
# construct opt Struct object
class Struct:
def __init__(self, **entries):
self.__dict__.update(entries)
opt = Struct(**opt)
assert opt.binarized
#model = create_model(opt)
model = DDCModel(opt)
#model.setup()
receptive_field = 1
checkpoint = "iter_"+checkpoint
model.load_networks(checkpoint)
#assuming mp3 for now. TODO: generalize
candidate_feature_files = sorted(data_path.glob('**/*'+audio_format+'_multi_mel_80.npy'), key=lambda path: path.parent.__str__())
tasks = distribute_tasks(candidate_feature_files,rank,size)
for i in tasks:
path = candidate_feature_files[i]
features_file = str(path)+"_"+"ddc_hidden"+".npy"
if replace_existing or not os.path.isfile(features_file):
print(path)
sr = opt.sampling_rate
hop = int(opt.step_size*sr)
features = np.load(path)
#generate level
# first_samples basically works as a padding, for the first few outputs, which don't have any "past part" of the song to look at.
first_samples = torch.full((1,opt.output_channels,receptive_field//2),START_STATE,dtype=torch.float)
print(features.shape)
features, peak_probs = model.generate_features(features)
peak_probs = peak_probs[0,:,-1].cpu().detach().numpy()
features = features.cpu().detach().numpy()
features = features[0]
features = ResampleLinear1D(features,int(np.floor(features.shape[0]*0.01*fps)))
# features = downsample_signal(features[0], 0.01666666666667/0.01)
print(features.shape)
np.save(features_file,features)
window = signal.hamming(ceil(HUMAN_DELTA/opt.step_size))
smoothed_peaks = np.convolve(peak_probs,window,mode='same')
thresholded_peaks = smoothed_peaks*(smoothed_peaks>args.peak_threshold)
peaks = signal.find_peaks(thresholded_peaks)[0]
print("number of peaks", len(peaks))