""" Quick n Simple Image Folder, Tarfile based DataSet Hacked together by / Copyright 2020 Ross Wightman """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import torch.utils.data as data import os import re import torch import tarfile from PIL import Image IMG_EXTENSIONS = ['.png', '.jpg', '.jpeg'] def natural_key(string_): """See http://www.codinghorror.com/blog/archives/001018.html""" return [int(s) if s.isdigit() else s for s in re.split(r'(\d+)', string_.lower())] def find_images_and_targets(folder, types=IMG_EXTENSIONS, class_to_idx=None, leaf_name_only=True, sort=True): labels = [] filenames = [] for root, subdirs, files in os.walk(folder, topdown=False): rel_path = os.path.relpath(root, folder) if (root != folder) else '' label = os.path.basename(rel_path) if leaf_name_only else rel_path.replace(os.path.sep, '_') for f in files: base, ext = os.path.splitext(f) if ext.lower() in types: filenames.append(os.path.join(root, f)) labels.append(label) if class_to_idx is None: # building class index unique_labels = set(labels) sorted_labels = list(sorted(unique_labels, key=natural_key)) class_to_idx = {c: idx for idx, c in enumerate(sorted_labels)} images_and_targets = [(f, class_to_idx[l]) for f, l in zip(filenames, labels) if l in class_to_idx] if sort: images_and_targets = sorted(images_and_targets, key=lambda k: natural_key(k[0])) return images_and_targets, class_to_idx def load_class_map(filename, root=''): class_map_path = filename if not os.path.exists(class_map_path): class_map_path = os.path.join(root, filename) assert os.path.exists(class_map_path), 'Cannot locate specified class map file (%s)' % filename class_map_ext = os.path.splitext(filename)[-1].lower() if class_map_ext == '.txt': with open(class_map_path) as f: class_to_idx = {v.strip(): k for k, v in enumerate(f)} else: assert False, 'Unsupported class map extension' return class_to_idx class Dataset(data.Dataset): def __init__( self, root, load_bytes=False, transform=None, class_map=''): class_to_idx = None if class_map: class_to_idx = load_class_map(class_map, root) images, class_to_idx = find_images_and_targets(root, class_to_idx=class_to_idx) if len(images) == 0: raise RuntimeError(f'Found 0 images in subfolders of {root}. ' f'Supported image extensions are {", ".join(IMG_EXTENSIONS)}') self.root = root self.samples = images self.imgs = self.samples # torchvision ImageFolder compat self.class_to_idx = class_to_idx self.load_bytes = load_bytes self.transform = transform def __getitem__(self, index): path, target = self.samples[index] img = open(path, 'rb').read() if self.load_bytes else Image.open(path).convert('RGB') if self.transform is not None: img = self.transform(img) if target is None: target = torch.zeros(1).long() return img, target def __len__(self): return len(self.samples) def filename(self, index, basename=False, absolute=False): filename = self.samples[index][0] if basename: filename = os.path.basename(filename) elif not absolute: filename = os.path.relpath(filename, self.root) return filename def filenames(self, basename=False, absolute=False): fn = lambda x: x if basename: fn = os.path.basename elif not absolute: fn = lambda x: os.path.relpath(x, self.root) return [fn(x[0]) for x in self.samples] def _extract_tar_info(tarfile, class_to_idx=None, sort=True): files = [] labels = [] for ti in tarfile.getmembers(): if not ti.isfile(): continue dirname, basename = os.path.split(ti.path) label = os.path.basename(dirname) ext = os.path.splitext(basename)[1] if ext.lower() in IMG_EXTENSIONS: files.append(ti) labels.append(label) if class_to_idx is None: unique_labels = set(labels) sorted_labels = list(sorted(unique_labels, key=natural_key)) class_to_idx = {c: idx for idx, c in enumerate(sorted_labels)} tarinfo_and_targets = [(f, class_to_idx[l]) for f, l in zip(files, labels) if l in class_to_idx] if sort: tarinfo_and_targets = sorted(tarinfo_and_targets, key=lambda k: natural_key(k[0].path)) return tarinfo_and_targets, class_to_idx class DatasetTar(data.Dataset): def __init__(self, root, load_bytes=False, transform=None, class_map=''): class_to_idx = None if class_map: class_to_idx = load_class_map(class_map, root) assert os.path.isfile(root) self.root = root with tarfile.open(root) as tf: # cannot keep this open across processes, reopen later self.samples, self.class_to_idx = _extract_tar_info(tf, class_to_idx) self.imgs = self.samples self.tarfile = None # lazy init in __getitem__ self.load_bytes = load_bytes self.transform = transform def __getitem__(self, index): if self.tarfile is None: self.tarfile = tarfile.open(self.root) tarinfo, target = self.samples[index] iob = self.tarfile.extractfile(tarinfo) img = iob.read() if self.load_bytes else Image.open(iob).convert('RGB') if self.transform is not None: img = self.transform(img) if target is None: target = torch.zeros(1).long() return img, target def __len__(self): return len(self.samples) def filename(self, index, basename=False): filename = self.samples[index][0].name if basename: filename = os.path.basename(filename) return filename def filenames(self, basename=False): fn = os.path.basename if basename else lambda x: x return [fn(x[0].name) for x in self.samples] class AugMixDataset(torch.utils.data.Dataset): """Dataset wrapper to perform AugMix or other clean/augmentation mixes""" def __init__(self, dataset, num_splits=2): self.augmentation = None self.normalize = None self.dataset = dataset if self.dataset.transform is not None: self._set_transforms(self.dataset.transform) self.num_splits = num_splits def _set_transforms(self, x): assert isinstance(x, (list, tuple)) and len(x) == 3, 'Expecting a tuple/list of 3 transforms' self.dataset.transform = x[0] self.augmentation = x[1] self.normalize = x[2] @property def transform(self): return self.dataset.transform @transform.setter def transform(self, x): self._set_transforms(x) def _normalize(self, x): return x if self.normalize is None else self.normalize(x) def __getitem__(self, i): x, y = self.dataset[i] # all splits share the same dataset base transform x_list = [self._normalize(x)] # first split only normalizes (this is the 'clean' split) # run the full augmentation on the remaining splits for _ in range(self.num_splits - 1): x_list.append(self._normalize(self.augmentation(x))) return tuple(x_list), y def __len__(self): return len(self.dataset)