import pathlib
import soundfile as sf
import numpy as np
import json
import multiprocessing
import argparse
import tqdm
import gzip
import time
import os
import random
from collections import defaultdict
from tokenizer import TextTokenizer, tokenize_text
def write_jsonl(data, fn):
    with open(fn, "w") as file:
        for entry in data:
            file.write(json.dumps(entry, ensure_ascii=False) + "\n")

def read_jsonl(file_path):
    cur_data = []
    with open(file_path, 'r', encoding='utf-8-sig') as file:
        for line in file:
            cur_data.append(json.loads(line.strip()))
    return cur_data
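# Illustrative round trip through the two helpers above (file name is made up):
#   entries = [{"file_id": "a.flac", "duration": 3.2}]
#   write_jsonl(entries, "/tmp/example.jsonl")
#   assert read_jsonl("/tmp/example.jsonl") == entries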
def save_audio(seq, fn):
    # audio is assumed to be 16 kHz mono throughout this script
    os.makedirs(os.path.dirname(fn), exist_ok=True)
    sf.write(fn, seq, samplerate=16000)

def save_text(text, fn):
    os.makedirs(os.path.dirname(fn), exist_ok=True)
    with open(fn, "w") as wwf:
        wwf.write(text)
def phonemize_and_save(text, fn):
    # relies on the module-level `text_tokenizer` created in __main__; worker processes
    # inherit it on platforms whose default multiprocessing start method is fork
    phn = tokenize_text(text_tokenizer, text)
    os.makedirs(os.path.dirname(fn), exist_ok=True)
    with open(fn, "w") as f:
        f.write(' '.join(phn))
    return set(phn)
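# `tokenize_text` comes from the local tokenizer module; the code above assumes it
# returns a list of phoneme strings, e.g. (hypothetical output):
#   tokenize_text(text_tokenizer, "hello world") -> ["HH", "AH0", "L", "OW1", ...]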
def cut_sequence(task):
    in_audio_fn, output_dir, metadata = task
    if not os.path.isfile(in_audio_fn):
        # print("missing: ", in_audio_fn)
        return None
    data, samplerate = sf.read(in_audio_fn)
    assert len(data.shape) == 1, "expect mono audio"
    assert samplerate == 16000, "expect 16 kHz audio"
    all_phns = set()
    for item in metadata:
        out_fn = item['file_id']
        out_audio_fn = os.path.join(output_dir, "audio", out_fn)
        out_text_fn = os.path.join(output_dir, "audio", out_fn.replace(".flac", ".txt"))
        out_phn_fn = os.path.join(output_dir, "phoneme", out_fn.replace(".flac", ".txt"))
        # slice the long recording at the VAD boundaries (given in seconds)
        save_audio(data[int(item['vad'][0]*samplerate):int(item['vad'][1]*samplerate)], out_audio_fn)
        save_text(item['text'], out_text_fn)
        phns = phonemize_and_save(item['text'], out_phn_fn)
        all_phns.update(phns)
    return all_phns
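# Illustrative task and outputs (paths and values are made up): for
#   task = ("/audio/medium/100/book/rec.flac", "/out",
#           [{"file_id": "rec_seg1_12.30_15.70.flac", "vad": (12.3, 15.7), "text": "..."}])
# cut_sequence writes /out/audio/rec_seg1_12.30_15.70.flac, the transcript
# /out/audio/rec_seg1_12.30_15.70.txt, and the phonemes /out/phoneme/rec_seg1_12.30_15.70.txt,
# then returns the set of phonemes seen in this recording.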
# Function to create a defaultdict recursively
def nested_defaultdict(levels, inner_type):
    if levels <= 1:
        return defaultdict(inner_type)
    return defaultdict(lambda: nested_defaultdict(levels-1, inner_type))
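# Usage note: nested_defaultdict(4, list) (used in `cut` below) builds a 4-level mapping,
# so organized_data[sizeSplit][spk][book][recording_id] springs into existence as an
# empty list the first time it is accessed.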
def open_mani(fn):
    print("load segmentation and transcription metadata...")
    stime = time.time()
    data = []
    with gzip.open(fn, 'rt', encoding='utf-8') as f:
        for line in f:
            data.append(json.loads(line))
    print(f"loading done, took {time.time() - stime:.4f} seconds")
    return data
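# Note: open_mani reads the gzipped manifests (*.jsonl.gz) directly; the pipeline in
# `cut` below currently calls read_jsonl on pre-decompressed .jsonl files instead.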
def cut(split,
        audio_dir,
        mani_dir,
        output_dir,
        n_process=32,
        percent=0.5):
    split2manifest = {
        "train": [
            "libriheavy_long_cuts_small.jsonl",
            "libriheavy_long_cuts_medium.jsonl",
            "libriheavy_long_cuts_large.jsonl",
            "libriheavy_cuts_small.jsonl",
            "libriheavy_cuts_medium.jsonl",
            "libriheavy_cuts_large.jsonl",
        ],
        "valid": [
            "libriheavy_cuts_dev.jsonl",
            "libriheavy_long_cuts_dev.jsonl"
        ],
        "test": [
            "libriheavy_cuts_test_clean.jsonl",
            "libriheavy_cuts_test_other.jsonl",
            "libriheavy_long_cuts_test_clean.jsonl",
            "libriheavy_long_cuts_test_other.jsonl"
        ]
    }

    print("organize data by recording_id (i.e. the original big .flac file name)...")
    stime = time.time()
    organized_data = nested_defaultdict(4, list)
    manifest_fn = os.path.join(output_dir, "manifest_mimi", split + ".txt")
    os.makedirs(os.path.join(output_dir, "manifest_mimi"), exist_ok=True)
    with open(manifest_fn, "w") as wf:
        for mani_fn in split2manifest[split]:
            # data = open_mani(os.path.join(mani_dir, mani_fn))
            data = read_jsonl(os.path.join(mani_dir, mani_fn))
            for item in data:
                file_id = item['supervisions'][0]['id'] + '.flac'
                recording_id = item['recording']['id'] + '.flac'
                # e.g. 'medium/100/emerald_city_librivox_64kb_mp3/emeraldcity_01_baum_64kb'
                sizeSplit, spk, book, flac = recording_id.split("/")
                if os.path.isfile(os.path.join(audio_dir, recording_id)):
                    vad = (item['start'], item['start'] + item['duration'])
                    text = item['supervisions'][0]['custom']['texts'][0]
                    file_id = file_id.replace(".flac", "") + f"_{vad[0]:.2f}_{vad[1]:.2f}.flac"
                    organized_data[sizeSplit][spk][book][recording_id].append({"file_id": file_id, "vad": vad, "text": text})
                    wf.write(f"{file_id}\t{item['duration']}\n")

    #### take only a subset of the tasks
    tasks = [
        (os.path.join(audio_dir, recording_id), output_dir, organized_data[sizeSplit][spk][book][recording_id], spk)
        for sizeSplit in organized_data
        for spk in organized_data[sizeSplit]
        for book in organized_data[sizeSplit][spk]
        for recording_id in organized_data[sizeSplit][spk][book]
    ]
    ntasks = len(tasks)
    spk2tasks = defaultdict(list)
    for task in tasks:
        spk2tasks[task[3]].append(task)
    # randomly shuffle each speaker's task list
    for spk in spk2tasks:
        random.shuffle(spk2tasks[spk])
    # take only `percent` of the tasks, sampled roughly uniformly across speakers:
    # repeatedly pick a random speaker, then pop one of that speaker's tasks
    tasks = []
    while len(tasks) < ntasks * percent:
        spk = random.choice(list(spk2tasks.keys()))
        if len(spk2tasks[spk]) == 0:
            continue
        tasks.append(spk2tasks[spk].pop()[:-1])  # drop the speaker field; cut_sequence expects a 3-tuple
    print(f"take only {percent*100:.2f}% of the tasks, {len(tasks)} out of {ntasks} tasks")
    #### end of taking a subset of the tasks
    print(f"organizing done, took {time.time() - stime:.4f} seconds")

    print(f"Launching {n_process} processes")
    phn_vocab = set()
    cnt = 0
    with multiprocessing.Pool(processes=n_process) as pool:
        for phns in tqdm.tqdm(pool.imap_unordered(cut_sequence, tasks), total=len(tasks)):
            cnt += 1
            if phns is not None:
                phn_vocab.update(phns)
    # save the phoneme vocabulary (train split only)
    if split == "train":
        vocab_fn = os.path.join(output_dir, "vocab.txt")
        with open(vocab_fn, "w") as f:
            for i, phn in enumerate(list(phn_vocab)):
                if i < len(phn_vocab) - 1:
                    f.write(f"{i}\t{phn}\n")
                else:
                    f.write(f"{i}\t{phn}")
def parse_args():
    parser = argparse.ArgumentParser(description="Cut a dataset into small sequences using the VAD boundaries stored in the Libriheavy manifests")
    parser.add_argument('--split', type=str, default='train', choices=['train', 'valid', 'test'],
                        help="train = libriheavy_cuts_{small,medium,large}.jsonl, valid = libriheavy_cuts_dev.jsonl, test = libriheavy_cuts_test_{clean,other}.jsonl (plus the corresponding long_cuts manifests)")
    parser.add_argument('--audio_dir', type=str, default="/data/scratch/pyp/datasets/librilight_example",
                        help="Path to the audio directory")
    parser.add_argument('--manifest_dir', type=str, default="/data/scratch/pyp/datasets/librilight/libriheavy",
                        help="Path to the directory containing the transcription manifests, which can be downloaded from https://huggingface.co/datasets/pkufool/libriheavy/tree/main/v0.1")
    parser.add_argument('--output_dir', type=str, default="/data/scratch/pyp/datasets/librilight/librilight_example_preprocessed",
                        help="Path to the output directory")
    parser.add_argument('--n_workers', type=int, default=16,
                        help="Number of parallel worker processes")
    parser.add_argument('--percent', type=float, default=0.5,
                        help="Take only this fraction (0-1) of the tasks, randomly sampled from each speaker")
    return parser.parse_args()
if __name__ == "__main__": | |
args = parse_args() | |
pathlib.Path(args.output_dir).mkdir(exist_ok=True, parents=True) | |
text_tokenizer = TextTokenizer() | |
cut(args.split, args.audio_dir, args.manifest_dir, args.output_dir, args.n_workers, args.percent) |
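# Example invocation (script name is illustrative; the paths shown are the argparse defaults):
#   python cut_by_vad.py --split train \
#       --audio_dir /data/scratch/pyp/datasets/librilight_example \
#       --manifest_dir /data/scratch/pyp/datasets/librilight/libriheavy \
#       --output_dir /data/scratch/pyp/datasets/librilight/librilight_example_preprocessed \
#       --n_workers 16 --percent 0.5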