|
{ |
|
"python.pythonPath": "C:\\Users\\BiGCARE\\anaconda3\\envs\\sv2tts_korean\\python.exe" |
|
} |
|
from encoder.data_objects.speaker_verification_dataset import SpeakerVerificationDataset |
|
from encoder.data_objects.speaker_verification_dataset import SpeakerVerificationDataLoader |
|
|
|
import random |
|
|
|
class RandomCycler:
    """
    Creates an internal copy of a sequence and allows access to its items in a constrained random
    order. For a source sequence of n items and one or several consecutive queries of a total
    of m items, the following guarantees hold (one implies the other):
        - Each item will be returned between m // n and ((m - 1) // n) + 1 times.
        - Between two appearances of the same item, there may be at most 2 * (n - 1) other items.
    """

    def __init__(self, source):
        """
        :param source: a non-empty sequence; its items are copied internally, so later
            mutation of <source> does not affect the cycler.
        :raises Exception: if <source> is empty.
        """
        if len(source) == 0:
            raise Exception("Can't create RandomCycler from an empty collection")
        # Full pool of items; never mutated after construction.
        self.all_items = list(source)
        # Remainder of the current shuffled pass; refilled when exhausted.
        self.next_items = []

    def sample(self, count: int):
        """
        Return <count> items drawn under the guarantees documented on the class.

        :param count: number of items to return; may exceed the pool size, in which
            case whole shuffled passes over the pool are emitted first.
        :return: a list of <count> items.
        """
        def shuffle(items):
            # random.sample with the full length returns a shuffled copy and
            # leaves the input untouched, so no defensive list() copy is needed.
            return random.sample(items, len(items))

        out = []
        while count > 0:
            # Fast path: emit one full shuffled pass of the pool at a time.
            if count >= len(self.all_items):
                out.extend(shuffle(self.all_items))
                count -= len(self.all_items)
                continue
            # Take what remains of the current partial pass...
            n = min(count, len(self.next_items))
            out.extend(self.next_items[:n])
            count -= n
            self.next_items = self.next_items[n:]
            # ...and start a fresh shuffled pass once it runs out.
            if len(self.next_items) == 0:
                self.next_items = shuffle(self.all_items)
        return out

    def __next__(self):
        """Return a single item; equivalent to sample(1)[0]."""
        return self.sample(1)[0]
|
|
|
import numpy as np |
|
from typing import List |
|
from encoder.data_objects.speaker import Speaker |
|
|
|
class SpeakerBatch:
    """A training batch: fixed-length partial utterances from a set of speakers."""

    def __init__(self, speakers: List[Speaker], utterances_per_speaker: int, n_frames: int):
        self.speakers = speakers
        # One entry per speaker: the (utterance, frames, range) tuples sampled from it.
        partials = {}
        for speaker in speakers:
            partials[speaker] = speaker.random_partial(utterances_per_speaker, n_frames)
        self.partials = partials

        # Array of shape (n_speakers * n_utterances, n_frames, mel_n), e.g. for 3 speakers with
        # 4 utterances each of 160 frames of 40 mel coefficients: (12, 160, 40)
        all_frames = []
        for speaker in speakers:
            for _, frames, _ in partials[speaker]:
                all_frames.append(frames)
        self.data = np.array(all_frames)
|
|
|
from encoder.data_objects.random_cycler import RandomCycler |
|
from encoder.data_objects.speaker_batch import SpeakerBatch |
|
from encoder.data_objects.speaker import Speaker |
|
from encoder.params_data import partials_n_frames |
|
from torch.utils.data import Dataset, DataLoader |
|
from pathlib import Path |
|
|
|
# TODO: improve with a pool of speakers for data efficiency |
|
|
|
class SpeakerVerificationDataset(Dataset):
    """
    Dataset over preprocessed speaker directories. Indexing ignores the index and
    yields the next speaker from a RandomCycler, so speakers are visited in a
    constrained random order (see RandomCycler's guarantees).
    """

    def __init__(self, datasets_root: Path):
        """
        :param datasets_root: directory containing one subdirectory per preprocessed speaker.
        :raises Exception: if <datasets_root> contains no subdirectories.
        """
        self.root = datasets_root
        speaker_dirs = [f for f in self.root.glob("*") if f.is_dir()]
        if len(speaker_dirs) == 0:
            raise Exception("No speakers found. Make sure you are pointing to the directory "
                            "containing all preprocessed speaker directories.")
        self.speakers = [Speaker(speaker_dir) for speaker_dir in speaker_dirs]
        self.speaker_cycler = RandomCycler(self.speakers)

    def __len__(self):
        # Effectively infinite: the dataset cycles through its speakers forever.
        return int(1e10)

    def __getitem__(self, index):
        # The index is ignored; the cycler enforces the sampling guarantees instead.
        return next(self.speaker_cycler)

    def get_logs(self):
        """Concatenate and return the contents of all *.txt preprocessing logs in the root."""
        # Read each file whole and join once: avoids quadratic += string building
        # and the redundant readlines()-then-join of each file.
        return "".join(log_fpath.read_text() for log_fpath in self.root.glob("*.txt"))
|
|
|
|
|
class SpeakerVerificationDataLoader(DataLoader):
    """DataLoader whose batches are SpeakerBatch objects built by collate()."""

    def __init__(self, dataset, speakers_per_batch, utterances_per_speaker, sampler=None,
                 batch_sampler=None, num_workers=0, pin_memory=False, timeout=0,
                 worker_init_fn=None):
        # Kept on the instance so collate() can size each speaker's sample.
        self.utterances_per_speaker = utterances_per_speaker

        loader_kwargs = dict(
            dataset=dataset,
            batch_size=speakers_per_batch,
            shuffle=False,
            sampler=sampler,
            batch_sampler=batch_sampler,
            num_workers=num_workers,
            collate_fn=self.collate,
            pin_memory=pin_memory,
            drop_last=False,
            timeout=timeout,
            worker_init_fn=worker_init_fn,
        )
        super().__init__(**loader_kwargs)

    def collate(self, speakers):
        # Group the sampled speakers into one SpeakerBatch of fixed-length partials.
        return SpeakerBatch(speakers, self.utterances_per_speaker, partials_n_frames)
|
|
|
from encoder.data_objects.random_cycler import RandomCycler |
|
from encoder.data_objects.utterance import Utterance |
|
from pathlib import Path |
|
|
|
# Contains the set of utterances of a single speaker |
|
# Contains the set of utterances of a single speaker
class Speaker:
    def __init__(self, root: Path):
        """
        :param root: directory holding this speaker's preprocessed utterances and
            the _sources.txt index file.
        """
        self.root = root
        self.name = root.name
        # Both loaded lazily on the first call to random_partial().
        self.utterances = None
        self.utterance_cycler = None

    def _load_utterances(self):
        # _sources.txt maps each frames file to the wave file it was extracted
        # from, one "frames_fname,wave_fpath" pair per line.
        with self.root.joinpath("_sources.txt").open("r") as sources_file:
            # strip() drops the trailing newline that would otherwise be kept in
            # wave_fpath; skipping blank lines tolerates a trailing empty line.
            sources = [line.strip().split(",") for line in sources_file if line.strip()]
        sources = {frames_fname: wave_fpath for frames_fname, wave_fpath in sources}
        self.utterances = [Utterance(self.root.joinpath(f), w) for f, w in sources.items()]
        self.utterance_cycler = RandomCycler(self.utterances)

    def random_partial(self, count, n_frames):
        """
        Samples a batch of <count> unique partial utterances from the disk in a way that all
        utterances come up at least once every two cycles and in a random order every time.

        :param count: The number of partial utterances to sample from the set of utterances from
        that speaker. Utterances are guaranteed not to be repeated if <count> is not larger than
        the number of utterances available.
        :param n_frames: The number of frames in the partial utterance.
        :return: A list of tuples (utterance, frames, range) where utterance is an Utterance,
        frames are the frames of the partial utterances and range is the range of the partial
        utterance with regard to the complete utterance.
        """
        if self.utterances is None:
            self._load_utterances()

        utterances = self.utterance_cycler.sample(count)

        a = [(u,) + u.random_partial(n_frames) for u in utterances]

        return a
|
|