"""
Copyright (c) 2022, salesforce.com, inc.
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
For full license text, see the LICENSE_Lavis file in the repo root or https://opensource.org/licenses/BSD-3-Clause
"""

import time
import random

import torch
from torch.utils.data import DataLoader

from minigpt4.datasets.data_utils import move_to_cuda


class MultiIterLoader:
    """
    A simple wrapper for iterating over multiple iterators.

    Args:
        loaders (List[Loader]): List of iterator loaders.
        ratios (List[float]): List of ratios to sample from each loader.
            If None, all loaders are sampled uniformly.
    """

    def __init__(self, loaders, ratios=None):
        # Every loader must itself be an iterator.
        for loader in loaders:
            assert hasattr(
                loader, "__next__"
            ), "Loader {} has no __next__ method.".format(loader)

        if ratios is None:
            ratios = [1.0] * len(loaders)
        else:
            assert len(ratios) == len(loaders)
            # Normalize so the ratios sum to 1.
            ratios = [float(ratio) / sum(ratios) for ratio in ratios]

        self.loaders = loaders
        self.ratios = ratios

    def __next__(self):
        # Pick a loader with probability proportional to its ratio,
        # then draw the next batch from it.
        loader_idx = random.choices(range(len(self.loaders)), self.ratios, k=1)[0]
        return next(self.loaders[loader_idx])
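

# Usage sketch (illustrative only; `loader_a` and `loader_b` stand in for any
# iterators with __next__, e.g. instances of IterLoader defined below):
#
#     loader = MultiIterLoader([loader_a, loader_b], ratios=[2.0, 1.0])
#     batch = next(loader)  # ~2/3 of batches come from loader_a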


class PrefetchLoader(object):
    """
    Modified from https://github.com/ChenRocks/UNITER.

    Overlaps compute and CUDA data transfer by prefetching the next batch
    on a separate CUDA stream (copied and then modified from NVIDIA Apex).
    """

    def __init__(self, loader):
        self.loader = loader
        self.stream = torch.cuda.Stream()

    def __iter__(self):
        loader_it = iter(self.loader)
        self.preload(loader_it)
        batch = self.next(loader_it)
        while batch is not None:
            if isinstance(batch, tuple):
                task, batch = batch
                yield task, batch
            else:
                yield batch
            batch = self.next(loader_it)

    def __len__(self):
        return len(self.loader)

    def preload(self, it):
        try:
            self.batch = next(it)
        except StopIteration:
            self.batch = None
            return
        # Copy the batch to the GPU on the side stream so the host-to-device
        # transfer overlaps with compute running on the default stream.
        with torch.cuda.stream(self.stream):
            self.batch = move_to_cuda(self.batch)

    def next(self, it):
        # Wait for the prefetch copy to finish before handing out the batch,
        # then immediately start prefetching the following one.
        torch.cuda.current_stream().wait_stream(self.stream)
        batch = self.batch
        if batch is not None:
            record_cuda_stream(batch)
        self.preload(it)
        return batch

    def __getattr__(self, name):
        # Delegate everything else (e.g. .dataset, .sampler) to the wrapped loader.
        return self.loader.__getattribute__(name)
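

# Usage sketch (illustrative only; assumes a CUDA device and a map-style
# `dataset`):
#
#     base = DataLoader(dataset, batch_size=32, num_workers=4, pin_memory=True)
#     for batch in PrefetchLoader(base):
#         ...  # tensors in `batch` are already on the GPU
#
# pin_memory=True matters here: asynchronous host-to-device copies only
# overlap with compute when the source tensors live in pinned memory.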


def record_cuda_stream(batch):
    # Mark each tensor as in use on the current stream so the CUDA caching
    # allocator does not reuse its memory while another stream still owns
    # the underlying allocation.
    if isinstance(batch, torch.Tensor):
        batch.record_stream(torch.cuda.current_stream())
    elif isinstance(batch, (list, tuple)):
        for t in batch:
            record_cuda_stream(t)
    elif isinstance(batch, dict):
        for t in batch.values():
            record_cuda_stream(t)
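

# Why record_stream is needed (illustrative sketch, not part of this module):
#
#     copy_stream = torch.cuda.Stream()
#     with torch.cuda.stream(copy_stream):
#         gpu_batch = cpu_batch.to("cuda", non_blocking=True)
#     gpu_batch.record_stream(torch.cuda.current_stream())
#
# Without the record_stream call, a tensor allocated on one stream but
# consumed on another can have its memory returned to the caching allocator
# and reused while a kernel on the consuming stream is still reading it.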


class IterLoader:
    """
    A wrapper to convert a DataLoader into an infinite iterator.

    Modified from:
        https://github.com/open-mmlab/mmcv/blob/master/mmcv/runner/iter_based_runner.py
    """

    def __init__(self, dataloader: DataLoader, use_distributed: bool = False):
        self._dataloader = dataloader
        self.iter_loader = iter(self._dataloader)
        self._use_distributed = use_distributed
        self._epoch = 0

    @property
    def epoch(self) -> int:
        return self._epoch

    def __next__(self):
        try:
            data = next(self.iter_loader)
        except StopIteration:
            self._epoch += 1
            # Re-seed the distributed sampler so each epoch gets a new shuffle.
            if hasattr(self._dataloader.sampler, "set_epoch") and self._use_distributed:
                self._dataloader.sampler.set_epoch(self._epoch)
            time.sleep(2)  # give worker processes time to shut down cleanly
            self.iter_loader = iter(self._dataloader)
            data = next(self.iter_loader)

        return data

    def __iter__(self):
        return self

    def __len__(self):
        return len(self._dataloader)
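

# Usage sketch (illustrative only; `dataset` and `max_steps` are placeholders):
#
#     train_loader = IterLoader(DataLoader(dataset, batch_size=32), use_distributed=True)
#     for step in range(max_steps):
#         batch = next(train_loader)  # restarts transparently at epoch boundaries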