|
from typing import Iterator |
|
from typing import List |
|
from typing import Tuple |
|
from typing import Union |
|
|
|
import numpy as np |
|
from typeguard import check_argument_types |
|
|
|
from espnet2.fileio.read_text import load_num_sequence_text |
|
from espnet2.samplers.abs_sampler import AbsSampler |
|
|
|
|
|
class NumElementsBatchSampler(AbsSampler): |
|
def __init__( |
|
self, |
|
batch_bins: int, |
|
shape_files: Union[Tuple[str, ...], List[str]], |
|
min_batch_size: int = 1, |
|
sort_in_batch: str = "descending", |
|
sort_batch: str = "ascending", |
|
drop_last: bool = False, |
|
padding: bool = True, |
|
): |
|
assert check_argument_types() |
|
assert batch_bins > 0 |
|
if sort_batch != "ascending" and sort_batch != "descending": |
|
raise ValueError( |
|
f"sort_batch must be ascending or descending: {sort_batch}" |
|
) |
|
if sort_in_batch != "descending" and sort_in_batch != "ascending": |
|
raise ValueError( |
|
f"sort_in_batch must be ascending or descending: {sort_in_batch}" |
|
) |
|
|
|
self.batch_bins = batch_bins |
|
self.shape_files = shape_files |
|
self.sort_in_batch = sort_in_batch |
|
self.sort_batch = sort_batch |
|
self.drop_last = drop_last |
|
|
|
|
|
|
|
|
|
utt2shapes = [ |
|
load_num_sequence_text(s, loader_type="csv_int") for s in shape_files |
|
] |
|
|
|
first_utt2shape = utt2shapes[0] |
|
for s, d in zip(shape_files, utt2shapes): |
|
if set(d) != set(first_utt2shape): |
|
raise RuntimeError( |
|
f"keys are mismatched between {s} != {shape_files[0]}" |
|
) |
|
|
|
|
|
|
|
keys = sorted(first_utt2shape, key=lambda k: first_utt2shape[k][0]) |
|
if len(keys) == 0: |
|
raise RuntimeError(f"0 lines found: {shape_files[0]}") |
|
if padding: |
|
|
|
|
|
feat_dims = [np.prod(d[keys[0]][1:]) for d in utt2shapes] |
|
else: |
|
feat_dims = None |
|
|
|
|
|
batch_sizes = [] |
|
current_batch_keys = [] |
|
for key in keys: |
|
current_batch_keys.append(key) |
|
|
|
if padding: |
|
for d, s in zip(utt2shapes, shape_files): |
|
if tuple(d[key][1:]) != tuple(d[keys[0]][1:]): |
|
raise RuntimeError( |
|
"If padding=True, the " |
|
f"feature dimension must be unified: {s}", |
|
) |
|
bins = sum( |
|
len(current_batch_keys) * sh[key][0] * d |
|
for sh, d in zip(utt2shapes, feat_dims) |
|
) |
|
else: |
|
bins = sum( |
|
np.prod(d[k]) for k in current_batch_keys for d in utt2shapes |
|
) |
|
|
|
if bins > batch_bins and len(current_batch_keys) >= min_batch_size: |
|
batch_sizes.append(len(current_batch_keys)) |
|
current_batch_keys = [] |
|
else: |
|
if len(current_batch_keys) != 0 and ( |
|
not self.drop_last or len(batch_sizes) == 0 |
|
): |
|
batch_sizes.append(len(current_batch_keys)) |
|
|
|
if len(batch_sizes) == 0: |
|
|
|
raise RuntimeError("0 batches") |
|
|
|
|
|
|
|
if len(batch_sizes) > 1 and batch_sizes[-1] < min_batch_size: |
|
for i in range(batch_sizes.pop(-1)): |
|
batch_sizes[-(i % len(batch_sizes)) - 1] += 1 |
|
|
|
if not self.drop_last: |
|
|
|
assert sum(batch_sizes) == len(keys), f"{sum(batch_sizes)} != {len(keys)}" |
|
|
|
|
|
self.batch_list = [] |
|
iter_bs = iter(batch_sizes) |
|
bs = next(iter_bs) |
|
minibatch_keys = [] |
|
for key in keys: |
|
minibatch_keys.append(key) |
|
if len(minibatch_keys) == bs: |
|
if sort_in_batch == "descending": |
|
minibatch_keys.reverse() |
|
elif sort_in_batch == "ascending": |
|
|
|
pass |
|
else: |
|
raise ValueError( |
|
"sort_in_batch must be ascending" |
|
f" or descending: {sort_in_batch}" |
|
) |
|
|
|
self.batch_list.append(tuple(minibatch_keys)) |
|
minibatch_keys = [] |
|
try: |
|
bs = next(iter_bs) |
|
except StopIteration: |
|
break |
|
|
|
if sort_batch == "ascending": |
|
pass |
|
elif sort_batch == "descending": |
|
self.batch_list.reverse() |
|
else: |
|
raise ValueError( |
|
f"sort_batch must be ascending or descending: {sort_batch}" |
|
) |
|
|
|
def __repr__(self): |
|
return ( |
|
f"{self.__class__.__name__}(" |
|
f"N-batch={len(self)}, " |
|
f"batch_bins={self.batch_bins}, " |
|
f"sort_in_batch={self.sort_in_batch}, " |
|
f"sort_batch={self.sort_batch})" |
|
) |
|
|
|
def __len__(self): |
|
return len(self.batch_list) |
|
|
|
def __iter__(self) -> Iterator[Tuple[str, ...]]: |
|
return iter(self.batch_list) |
|
|