File size: 6,267 Bytes
640ea28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
{
"python.pythonPath": "C:\\Users\\BiGCARE\\anaconda3\\envs\\sv2tts_korean\\python.exe"
}
from encoder.data_objects.speaker_verification_dataset import SpeakerVerificationDataset
from encoder.data_objects.speaker_verification_dataset import SpeakerVerificationDataLoader
import random
class RandomCycler:
    """
    Creates an internal copy of a sequence and allows access to its items in a constrained random
    order. For a source sequence of n items and one or several consecutive queries of a total
    of m items, the following guarantees hold (one implies the other):
        - Each item will be returned between m // n and ((m - 1) // n) + 1 times.
        - Between two appearances of the same item, there may be at most 2 * (n - 1) other items.
    """

    def __init__(self, source):
        """
        :param source: the items to cycle through. Any iterable is accepted (including
        generators), since the items are copied into an internal list.
        :raises ValueError: if <source> contains no item.
        """
        # Copy before checking for emptiness so that iterables without __len__ are supported.
        self.all_items = list(source)
        if not self.all_items:
            raise ValueError("Can't create RandomCycler from an empty collection")
        # Items left over from the current shuffled pass, consumed by partial queries.
        self.next_items = []

    def _shuffled(self):
        """Returns a new list holding every item exactly once, in a random order."""
        return random.sample(self.all_items, len(self.all_items))

    def sample(self, count: int):
        """
        Draws the next <count> items from the cycler.

        :param count: the number of items to return. A value of 0 or less yields an empty list.
        :return: a list of <count> items.
        """
        out = []
        n_items = len(self.all_items)
        while count > 0:
            # A request for at least one full pass is served with a fresh shuffle of all the
            # items, without touching the pending items of the current pass.
            if count >= n_items:
                out.extend(self._shuffled())
                count -= n_items
                continue
            # Otherwise, take as many items as possible from the current pass...
            n = min(count, len(self.next_items))
            out.extend(self.next_items[:n])
            count -= n
            self.next_items = self.next_items[n:]
            # ...and start a new pass as soon as the current one is exhausted.
            if not self.next_items:
                self.next_items = self._shuffled()
        return out

    def __next__(self):
        """Returns a single item, equivalent to sample(1)[0]."""
        return self.sample(1)[0]
import numpy as np
from typing import List
from encoder.data_objects.speaker import Speaker
class SpeakerBatch:
    """Groups the partial utterances sampled from a list of speakers into a single array."""

    def __init__(self, speakers: List[Speaker], utterances_per_speaker: int, n_frames: int):
        """
        :param speakers: the speakers to sample partial utterances from.
        :param utterances_per_speaker: the number of partial utterances requested per speaker.
        :param n_frames: the number of frames requested for each partial utterance.
        """
        self.speakers = speakers

        # Maps each speaker to the partials returned by its random_partial() method.
        sampled = {}
        for speaker in speakers:
            sampled[speaker] = speaker.random_partial(utterances_per_speaker, n_frames)
        self.partials = sampled

        # Array of shape (n_speakers * n_utterances, n_frames, mel_n), e.g. for 3 speakers with
        # 4 utterances each of 160 frames of 40 mel coefficients: (12, 160, 40)
        all_frames = []
        for speaker in speakers:
            for _, frames, _ in self.partials[speaker]:
                all_frames.append(frames)
        self.data = np.array(all_frames)
from encoder.data_objects.random_cycler import RandomCycler
from encoder.data_objects.speaker_batch import SpeakerBatch
from encoder.data_objects.speaker import Speaker
from encoder.params_data import partials_n_frames
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
# TODO: improve with a pool of speakers for data efficiency
class SpeakerVerificationDataset(Dataset):
    """
    Dataset that yields speakers picked by a RandomCycler, regardless of the index requested.
    """

    def __init__(self, datasets_root: Path):
        """
        :param datasets_root: the directory whose subdirectories each hold one speaker.
        :raises Exception: if <datasets_root> has no subdirectory.
        """
        self.root = datasets_root

        found_dirs = []
        for entry in self.root.glob("*"):
            if entry.is_dir():
                found_dirs.append(entry)
        if not found_dirs:
            raise Exception("No speakers found. Make sure you are pointing to the directory "
                            "containing all preprocessed speaker directories.")

        self.speakers = list(map(Speaker, found_dirs))
        self.speaker_cycler = RandomCycler(self.speakers)

    def __len__(self):
        # Arbitrarily large constant: items come from the cycler, not from an index.
        return 10 ** 10

    def __getitem__(self, index):
        # The index is ignored; the cycler decides which speaker comes next.
        return next(self.speaker_cycler)

    def get_logs(self):
        """
        :return: the contents of every .txt file found directly in the root directory,
        concatenated into a single string (empty if there is none).
        """
        chunks = []
        for txt_fpath in self.root.glob("*.txt"):
            with txt_fpath.open("r") as txt_file:
                chunks.append(txt_file.read())
        return "".join(chunks)
class SpeakerVerificationDataLoader(DataLoader):
    """
    DataLoader whose batches are SpeakerBatch objects built from the speakers of the dataset.
    """

    def __init__(self, dataset, speakers_per_batch, utterances_per_speaker, sampler=None,
                 batch_sampler=None, num_workers=0, pin_memory=False, timeout=0,
                 worker_init_fn=None):
        """
        :param dataset: the dataset to draw speakers from.
        :param speakers_per_batch: forwarded to DataLoader as its batch_size.
        :param utterances_per_speaker: the number of partial utterances requested per speaker
        when a batch is collated.

        The remaining parameters are forwarded unchanged to DataLoader.
        NOTE(review): torch documents batch_sampler as mutually exclusive with batch_size,
        shuffle, sampler and drop_last, while batch_size is always passed here -- confirm
        before relying on batch_sampler.
        """
        self.utterances_per_speaker = utterances_per_speaker

        # Options imposed by this loader, as opposed to those forwarded from the caller.
        forced_options = dict(
            batch_size=speakers_per_batch,
            shuffle=False,
            drop_last=False,
            collate_fn=self.collate,
        )
        super().__init__(
            dataset=dataset,
            sampler=sampler,
            batch_sampler=batch_sampler,
            num_workers=num_workers,
            pin_memory=pin_memory,
            timeout=timeout,
            worker_init_fn=worker_init_fn,
            **forced_options
        )

    def collate(self, speakers):
        """Builds a SpeakerBatch from the speakers gathered for one batch."""
        return SpeakerBatch(speakers, self.utterances_per_speaker, partials_n_frames)
from encoder.data_objects.random_cycler import RandomCycler
from encoder.data_objects.utterance import Utterance
from pathlib import Path
# Contains the set of utterances of a single speaker
class Speaker:
    """
    Contains the set of utterances of a single speaker. The utterances are only loaded from the
    speaker directory the first time a partial is requested.
    """

    def __init__(self, root: Path):
        """
        :param root: the directory of the speaker. Its name is used as the speaker name.
        """
        self.root = root
        self.name = root.name
        # Both are populated by _load_utterances() on first use.
        self.utterances = None
        self.utterance_cycler = None

    @staticmethod
    def _parse_sources(lines):
        """
        Parses the lines of a _sources.txt file, each of the form "<frames_fname>,<wave_fpath>".

        :param lines: an iterable of lines, with or without their line terminator.
        :return: a dict mapping each frames filename to its wave filepath. If a frames filename
        appears several times, the last entry wins.
        :raises ValueError: if a non-blank line contains no comma.
        """
        sources = {}
        for line in lines:
            # Drop the line terminator so it does not end up inside the wave filepath.
            line = line.rstrip("\r\n")
            if not line.strip():
                continue
            # Split on the first comma only: the wave filepath may itself contain commas.
            frames_fname, comma, wave_fpath = line.partition(",")
            if not comma:
                raise ValueError("Malformed line in _sources.txt: %r" % line)
            sources[frames_fname] = wave_fpath
        return sources

    def _load_utterances(self):
        """
        Reads _sources.txt from the speaker directory, then creates one Utterance per entry
        and a RandomCycler over these utterances.
        """
        with self.root.joinpath("_sources.txt").open("r") as sources_file:
            sources = self._parse_sources(sources_file)
        self.utterances = [Utterance(self.root.joinpath(f), w) for f, w in sources.items()]
        self.utterance_cycler = RandomCycler(self.utterances)

    def random_partial(self, count, n_frames):
        """
        Samples a batch of <count> unique partial utterances from the disk in a way that all
        utterances come up at least once every two cycles and in a random order every time.

        :param count: The number of partial utterances to sample from the set of utterances from
        that speaker. Utterances are guaranteed not to be repeated if <count> is not larger than
        the number of utterances available.
        :param n_frames: The number of frames in the partial utterance.
        :return: A list of tuples (utterance, frames, range) where utterance is an Utterance,
        frames are the frames of the partial utterances and range is the range of the partial
        utterance with regard to the complete utterance.
        """
        if self.utterances is None:
            self._load_utterances()

        utterances = self.utterance_cycler.sample(count)

        # Prepend each utterance to the tuple returned by its own random_partial().
        return [(u,) + u.random_partial(n_frames) for u in utterances]
|