import numpy as np
from pathlib import Path
import json
import os.path
import sys
import argparse
import time
import pickle
import torch
from math import ceil
from scipy import signal
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.abspath(os.path.join(THIS_DIR, os.pardir))
DATA_DIR = os.path.join(ROOT_DIR, 'data')
sys.path.append(ROOT_DIR)
PAD_STATE = 0
START_STATE = 1
END_STATE = 2
EMPTY_STATE = 3
NUM_SPECIAL_STATES = 4
HUMAN_DELTA = 0.125 # minimum block separation that we are happy with
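# e.g. with a 0.01 s step size, the smoothing window built below spans ceil(0.125/0.01) = 13 frames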
#from models import create_model
from models.ddc_model import DDCModel
from feature_extraction.utils import distribute_tasks, ResampleLinear1D
parser = argparse.ArgumentParser(description='Get DDC features from song features')
parser.add_argument("data_path", type=str, help="Directory contining Beat Saber level folders")
parser.add_argument('--checkpoints_dir', type=str)
parser.add_argument('--audio_format', type=str, default="wav")
parser.add_argument('--experiment_name', type=str)
parser.add_argument('--peak_threshold', type=float, default=0.0148)
parser.add_argument('--fps', type=float, default=60)
parser.add_argument('--checkpoint', type=str, default="latest")
parser.add_argument('--temperature', type=float, default=1.00)
parser.add_argument('--cuda', action="store_true")
parser.add_argument("--step_size", metavar='', type=float, default=0.01666666666)
parser.add_argument("--replace_existing", action="store_true")
args = parser.parse_args()
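# Example invocation (script and path names are illustrative), run with one MPI process per task slot:
#   mpirun -n 4 python extract_ddc_features.py /path/to/levels \
#       --experiment_name my_ddc_experiment --checkpoints_dir checkpoints --checkpoint 100000 --cuda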
# makes arguments into global variables of the same name, used later in the code
globals().update(vars(args))
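# e.g. `fps` and `audio_format` can now be referenced directly instead of as args.fps / args.audio_format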
data_path = Path(data_path)
## distributing tasks across nodes ##
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
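# rank is this process's index and size the total number of MPI processes;
# distribute_tasks below uses them to split the file list across processes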
print("MPI rank:", rank)
# print("creating {} of size {}".format(feature_name, feature_size))
experiment_name = args.experiment_name + "/"
checkpoint = args.checkpoint
temperature = args.temperature
''' LOAD MODEL, OPTS, AND WEIGHTS '''
#%%
## loading opt object from experiment
opt_path = os.path.join(ROOT_DIR, "feature_extraction", experiment_name, "opt.json")
with open(opt_path, "r") as f:
    opt = json.load(f)
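# opt.json stores the training-time options; the keys consumed below include
# sampling_rate, step_size, output_channels and binarized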
# we assume we have 1 GPU in the generating machine :P
opt["gpu_ids"] = [0] if args.cuda else []
opt["checkpoints_dir"] = args.checkpoints_dir
opt["load_iter"] = int(checkpoint)  # note: expects a numeric --checkpoint; the "latest" default would fail int()
opt["cuda"] = args.cuda
opt["experiment_name"] = args.experiment_name.split("/")[0]
if "dropout" not in opt: #for older experiments
opt["dropout"] = 0.0
# construct opt Struct object
class Struct:
    def __init__(self, **entries):
        self.__dict__.update(entries)

opt = Struct(**opt)
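# e.g. Struct(**{"sampling_rate": 16000}).sampling_rate == 16000 (value illustrative)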
assert opt.binarized
#model = create_model(opt)
model = DDCModel(opt)
#model.setup()
receptive_field = 1
checkpoint = "iter_"+checkpoint
model.load_networks(checkpoint)
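# the loaded model's generate_features(...) returns per-frame hidden features
# together with peak probabilities; both are consumed in the loop below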
# the mel features are named <song>.<audio_format>_multi_mel_80.npy by the earlier feature extraction step
candidate_feature_files = sorted(data_path.glob('**/*' + audio_format + '_multi_mel_80.npy'), key=lambda path: str(path.parent))
tasks = distribute_tasks(candidate_feature_files, rank, size)
for i in tasks:
    path = candidate_feature_files[i]
    features_file = str(path) + "_ddc_hidden.npy"
    if replace_existing or not os.path.isfile(features_file):
        print(path)
        sr = opt.sampling_rate
        hop = int(opt.step_size * sr)
        features = np.load(path)
        # generate level
        # first_samples works as padding for the first few outputs, which have no "past part" of the song to look at
        first_samples = torch.full((1, opt.output_channels, receptive_field // 2), START_STATE, dtype=torch.float)
        print(features.shape)
        features, peak_probs = model.generate_features(features)
        peak_probs = peak_probs[0, :, -1].cpu().detach().numpy()
        features = features.cpu().detach().numpy()
        features = features[0]
        # resample the hidden features to the requested fps (the 0.01 factor assumes 10 ms model frames)
        features = ResampleLinear1D(features, int(np.floor(features.shape[0] * 0.01 * fps)))
        # features = downsample_signal(features[0], 0.01666666666667/0.01)
        print(features.shape)
        np.save(features_file, features)
        # smooth the peak probabilities over a Hamming window spanning HUMAN_DELTA, so that
        # peaks closer together than the minimum block separation merge into one
        window = signal.windows.hamming(ceil(HUMAN_DELTA / opt.step_size))  # signal.hamming was removed from recent SciPy
        smoothed_peaks = np.convolve(peak_probs, window, mode='same')
        thresholded_peaks = smoothed_peaks * (smoothed_peaks > args.peak_threshold)
        peaks = signal.find_peaks(thresholded_peaks)[0]
        print("number of peaks", len(peaks))