from typing import Dict import webdataset as wds import numpy as np from omegaconf import DictConfig, ListConfig import torch from torch.utils.data import Dataset from pathlib import Path import json from PIL import Image from torchvision import transforms import torchvision from einops import rearrange from ..util import instantiate_from_config from datasets import load_dataset import pytorch_lightning as pl import copy import csv import cv2 import random import matplotlib.pyplot as plt from torch.utils.data import DataLoader import json import os import webdataset as wds import math from torch.utils.data.distributed import DistributedSampler # Some hacky things to make experimentation easier def make_transform_multi_folder_data(paths, caption_files=None, **kwargs): ds = make_multi_folder_data(paths, caption_files, **kwargs) return TransformDataset(ds) def make_nfp_data(base_path): dirs = list(Path(base_path).glob("*/")) print(f"Found {len(dirs)} folders") print(dirs) tforms = [transforms.Resize(512), transforms.CenterCrop(512)] datasets = [NfpDataset(x, image_transforms=copy.copy(tforms), default_caption="A view from a train window") for x in dirs] return torch.utils.data.ConcatDataset(datasets) class VideoDataset(Dataset): def __init__(self, root_dir, image_transforms, caption_file, offset=8, n=2): self.root_dir = Path(root_dir) self.caption_file = caption_file self.n = n ext = "mp4" self.paths = sorted(list(self.root_dir.rglob(f"*.{ext}"))) self.offset = offset if isinstance(image_transforms, ListConfig): image_transforms = [instantiate_from_config(tt) for tt in image_transforms] image_transforms.extend([transforms.ToTensor(), transforms.Lambda(lambda x: rearrange(x * 2. - 1., 'c h w -> h w c'))]) image_transforms = transforms.Compose(image_transforms) self.tform = image_transforms with open(self.caption_file) as f: reader = csv.reader(f) rows = [row for row in reader] self.captions = dict(rows) def __len__(self): return len(self.paths) def __getitem__(self, index): for i in range(10): try: return self._load_sample(index) except Exception: # Not really good enough but... print("uh oh") def _load_sample(self, index): n = self.n filename = self.paths[index] min_frame = 2*self.offset + 2 vid = cv2.VideoCapture(str(filename)) max_frames = int(vid.get(cv2.CAP_PROP_FRAME_COUNT)) curr_frame_n = random.randint(min_frame, max_frames) vid.set(cv2.CAP_PROP_POS_FRAMES,curr_frame_n) _, curr_frame = vid.read() prev_frames = [] for i in range(n): prev_frame_n = curr_frame_n - (i+1)*self.offset vid.set(cv2.CAP_PROP_POS_FRAMES,prev_frame_n) _, prev_frame = vid.read() prev_frame = self.tform(Image.fromarray(prev_frame[...,::-1])) prev_frames.append(prev_frame) vid.release() caption = self.captions[filename.name] data = { "image": self.tform(Image.fromarray(curr_frame[...,::-1])), "prev": torch.cat(prev_frames, dim=-1), "txt": caption } return data # end hacky things def make_tranforms(image_transforms): # if isinstance(image_transforms, ListConfig): # image_transforms = [instantiate_from_config(tt) for tt in image_transforms] image_transforms = [] image_transforms.extend([transforms.ToTensor(), transforms.Lambda(lambda x: rearrange(x * 2. - 1., 'c h w -> h w c'))]) image_transforms = transforms.Compose(image_transforms) return image_transforms def make_multi_folder_data(paths, caption_files=None, **kwargs): """Make a concat dataset from multiple folders Don't suport captions yet If paths is a list, that's ok, if it's a Dict interpret it as: k=folder v=n_times to repeat that """ list_of_paths = [] if isinstance(paths, (Dict, DictConfig)): assert caption_files is None, \ "Caption files not yet supported for repeats" for folder_path, repeats in paths.items(): list_of_paths.extend([folder_path]*repeats) paths = list_of_paths if caption_files is not None: datasets = [FolderData(p, caption_file=c, **kwargs) for (p, c) in zip(paths, caption_files)] else: datasets = [FolderData(p, **kwargs) for p in paths] return torch.utils.data.ConcatDataset(datasets) class NfpDataset(Dataset): def __init__(self, root_dir, image_transforms=[], ext="jpg", default_caption="", ) -> None: """assume sequential frames and a deterministic transform""" self.root_dir = Path(root_dir) self.default_caption = default_caption self.paths = sorted(list(self.root_dir.rglob(f"*.{ext}"))) self.tform = make_tranforms(image_transforms) def __len__(self): return len(self.paths) - 1 def __getitem__(self, index): prev = self.paths[index] curr = self.paths[index+1] data = {} data["image"] = self._load_im(curr) data["prev"] = self._load_im(prev) data["txt"] = self.default_caption return data def _load_im(self, filename): im = Image.open(filename).convert("RGB") return self.tform(im) class ObjaverseDataModuleFromConfig(pl.LightningDataModule): def __init__(self, root_dir, batch_size, total_view, train=None, validation=None, test=None, num_workers=4, **kwargs): super().__init__(self) self.root_dir = root_dir self.batch_size = batch_size self.num_workers = num_workers self.total_view = total_view if train is not None: dataset_config = train if validation is not None: dataset_config = validation if 'image_transforms' in dataset_config: image_transforms = [torchvision.transforms.Resize(dataset_config.image_transforms.size)] else: image_transforms = [] image_transforms.extend([transforms.ToTensor(), transforms.Lambda(lambda x: rearrange(x * 2. - 1., 'c h w -> h w c'))]) self.image_transforms = torchvision.transforms.Compose(image_transforms) def train_dataloader(self): dataset = ObjaverseData(root_dir=self.root_dir, total_view=self.total_view, validation=False, \ image_transforms=self.image_transforms) sampler = DistributedSampler(dataset) return wds.WebLoader(dataset, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False, sampler=sampler) def val_dataloader(self): dataset = ObjaverseData(root_dir=self.root_dir, total_view=self.total_view, validation=True, \ image_transforms=self.image_transforms) sampler = DistributedSampler(dataset) return wds.WebLoader(dataset, batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False) def test_dataloader(self): return wds.WebLoader(ObjaverseData(root_dir=self.root_dir, total_view=self.total_view, validation=self.validation),\ batch_size=self.batch_size, num_workers=self.num_workers, shuffle=False) class ObjaverseData(Dataset): def __init__(self, root_dir='.objaverse/hf-objaverse-v1/views', image_transforms=[], ext="png", default_trans=torch.zeros(3), postprocess=None, return_paths=False, total_view=4, validation=False ) -> None: """Create a dataset from a folder of images. If you pass in a root directory it will be searched for images ending in ext (ext can be a list) """ self.root_dir = Path(root_dir) self.default_trans = default_trans self.return_paths = return_paths if isinstance(postprocess, DictConfig): postprocess = instantiate_from_config(postprocess) self.postprocess = postprocess self.total_view = total_view if not isinstance(ext, (tuple, list, ListConfig)): ext = [ext] with open(os.path.join(root_dir, 'valid_paths.json')) as f: self.paths = json.load(f) total_objects = len(self.paths) if validation: self.paths = self.paths[math.floor(total_objects / 100. * 99.):] # used last 1% as validation else: self.paths = self.paths[:math.floor(total_objects / 100. * 99.)] # used first 99% as training print('============= length of dataset %d =============' % len(self.paths)) self.tform = image_transforms def __len__(self): return len(self.paths) def cartesian_to_spherical(self, xyz): ptsnew = np.hstack((xyz, np.zeros(xyz.shape))) xy = xyz[:,0]**2 + xyz[:,1]**2 z = np.sqrt(xy + xyz[:,2]**2) theta = np.arctan2(np.sqrt(xy), xyz[:,2]) # for elevation angle defined from Z-axis down #ptsnew[:,4] = np.arctan2(xyz[:,2], np.sqrt(xy)) # for elevation angle defined from XY-plane up azimuth = np.arctan2(xyz[:,1], xyz[:,0]) return np.array([theta, azimuth, z]) def get_T(self, target_RT, cond_RT): R, T = target_RT[:3, :3], target_RT[:, -1] T_target = -R.T @ T R, T = cond_RT[:3, :3], cond_RT[:, -1] T_cond = -R.T @ T theta_cond, azimuth_cond, z_cond = self.cartesian_to_spherical(T_cond[None, :]) theta_target, azimuth_target, z_target = self.cartesian_to_spherical(T_target[None, :]) d_theta = theta_target - theta_cond d_azimuth = (azimuth_target - azimuth_cond) % (2 * math.pi) d_z = z_target - z_cond d_T = torch.tensor([d_theta.item(), math.sin(d_azimuth.item()), math.cos(d_azimuth.item()), d_z.item()]) return d_T def load_im(self, path, color): ''' replace background pixel with random color in rendering ''' try: img = plt.imread(path) except: print(path) sys.exit() img[img[:, :, -1] == 0.] = color img = Image.fromarray(np.uint8(img[:, :, :3] * 255.)) return img def __getitem__(self, index): data = {} if self.paths[index][-2:] == '_1': # dirty fix for rendering dataset twice total_view = 8 else: total_view = 4 index_target, index_cond = random.sample(range(total_view), 2) # without replacement filename = os.path.join(self.root_dir, self.paths[index]) # print(self.paths[index]) if self.return_paths: data["path"] = str(filename) color = [1., 1., 1., 1.] try: target_im = self.process_im(self.load_im(os.path.join(filename, '%03d.png' % index_target), color)) cond_im = self.process_im(self.load_im(os.path.join(filename, '%03d.png' % index_cond), color)) target_RT = np.load(os.path.join(filename, '%03d.npy' % index_target)) cond_RT = np.load(os.path.join(filename, '%03d.npy' % index_cond)) except: # very hacky solution, sorry about this filename = os.path.join(self.root_dir, '692db5f2d3a04bb286cb977a7dba903e_1') # this one we know is valid target_im = self.process_im(self.load_im(os.path.join(filename, '%03d.png' % index_target), color)) cond_im = self.process_im(self.load_im(os.path.join(filename, '%03d.png' % index_cond), color)) target_RT = np.load(os.path.join(filename, '%03d.npy' % index_target)) cond_RT = np.load(os.path.join(filename, '%03d.npy' % index_cond)) target_im = torch.zeros_like(target_im) cond_im = torch.zeros_like(cond_im) data["image_target"] = target_im data["image_cond"] = cond_im data["T"] = self.get_T(target_RT, cond_RT) if self.postprocess is not None: data = self.postprocess(data) return data def process_im(self, im): im = im.convert("RGB") return self.tform(im) class FolderData(Dataset): def __init__(self, root_dir, caption_file=None, image_transforms=[], ext="jpg", default_caption="", postprocess=None, return_paths=False, ) -> None: """Create a dataset from a folder of images. If you pass in a root directory it will be searched for images ending in ext (ext can be a list) """ self.root_dir = Path(root_dir) self.default_caption = default_caption self.return_paths = return_paths if isinstance(postprocess, DictConfig): postprocess = instantiate_from_config(postprocess) self.postprocess = postprocess if caption_file is not None: with open(caption_file, "rt") as f: ext = Path(caption_file).suffix.lower() if ext == ".json": captions = json.load(f) elif ext == ".jsonl": lines = f.readlines() lines = [json.loads(x) for x in lines] captions = {x["file_name"]: x["text"].strip("\n") for x in lines} else: raise ValueError(f"Unrecognised format: {ext}") self.captions = captions else: self.captions = None if not isinstance(ext, (tuple, list, ListConfig)): ext = [ext] # Only used if there is no caption file self.paths = [] for e in ext: self.paths.extend(sorted(list(self.root_dir.rglob(f"*.{e}")))) self.tform = make_tranforms(image_transforms) def __len__(self): if self.captions is not None: return len(self.captions.keys()) else: return len(self.paths) def __getitem__(self, index): data = {} if self.captions is not None: chosen = list(self.captions.keys())[index] caption = self.captions.get(chosen, None) if caption is None: caption = self.default_caption filename = self.root_dir/chosen else: filename = self.paths[index] if self.return_paths: data["path"] = str(filename) im = Image.open(filename).convert("RGB") im = self.process_im(im) data["image"] = im if self.captions is not None: data["txt"] = caption else: data["txt"] = self.default_caption if self.postprocess is not None: data = self.postprocess(data) return data def process_im(self, im): im = im.convert("RGB") return self.tform(im) class TransformDataset(): def __init__(self, ds, extra_label="sksbspic"): self.ds = ds self.extra_label = extra_label self.transforms = { "align": transforms.Resize(768), "centerzoom": transforms.CenterCrop(768), "randzoom": transforms.RandomCrop(768), } def __getitem__(self, index): data = self.ds[index] im = data['image'] im = im.permute(2,0,1) # In case data is smaller than expected im = transforms.Resize(1024)(im) tform_name = random.choice(list(self.transforms.keys())) im = self.transforms[tform_name](im) im = im.permute(1,2,0) data['image'] = im data['txt'] = data['txt'] + f" {self.extra_label} {tform_name}" return data def __len__(self): return len(self.ds) def hf_dataset( name, image_transforms=[], image_column="image", text_column="text", split='train', image_key='image', caption_key='txt', ): """Make huggingface dataset with appropriate list of transforms applied """ ds = load_dataset(name, split=split) tform = make_tranforms(image_transforms) assert image_column in ds.column_names, f"Didn't find column {image_column} in {ds.column_names}" assert text_column in ds.column_names, f"Didn't find column {text_column} in {ds.column_names}" def pre_process(examples): processed = {} processed[image_key] = [tform(im) for im in examples[image_column]] processed[caption_key] = examples[text_column] return processed ds.set_transform(pre_process) return ds class TextOnly(Dataset): def __init__(self, captions, output_size, image_key="image", caption_key="txt", n_gpus=1): """Returns only captions with dummy images""" self.output_size = output_size self.image_key = image_key self.caption_key = caption_key if isinstance(captions, Path): self.captions = self._load_caption_file(captions) else: self.captions = captions if n_gpus > 1: # hack to make sure that all the captions appear on each gpu repeated = [n_gpus*[x] for x in self.captions] self.captions = [] [self.captions.extend(x) for x in repeated] def __len__(self): return len(self.captions) def __getitem__(self, index): dummy_im = torch.zeros(3, self.output_size, self.output_size) dummy_im = rearrange(dummy_im * 2. - 1., 'c h w -> h w c') return {self.image_key: dummy_im, self.caption_key: self.captions[index]} def _load_caption_file(self, filename): with open(filename, 'rt') as f: captions = f.readlines() return [x.strip('\n') for x in captions] class IdRetreivalDataset(FolderData): def __init__(self, ret_file, *args, **kwargs): super().__init__(*args, **kwargs) with open(ret_file, "rt") as f: self.ret = json.load(f) def __getitem__(self, index): data = super().__getitem__(index) key = self.paths[index].name matches = self.ret[key] if len(matches) > 0: retreived = random.choice(matches) else: retreived = key filename = self.root_dir/retreived im = Image.open(filename).convert("RGB") im = self.process_im(im) # data["match"] = im data["match"] = torch.cat((data["image"], im), dim=-1) return data