| """ | |
| This utils script contains PORTAGE of wai-core ops methods for MapAnything. | |
| """ | |

import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image


def to_numpy(
    data: torch.Tensor | np.ndarray | int | float,
    dtype: np.dtype | str | type = np.float32,
) -> np.ndarray:
    """
    Convert data to a NumPy array with the specified dtype (default: float32).
    This function handles conversion from NumPy arrays and PyTorch tensors to a NumPy array.
    Args:
        data: Input data (torch.Tensor, np.ndarray, or scalar)
        dtype: Target data type (NumPy dtype, str, or type). Default: np.float32.
    Returns:
        Converted data as NumPy array with specified dtype.
    """
    # Validate and normalize the target dtype
    assert dtype is not None, "dtype cannot be None"
    dtype = np.dtype(dtype)
    # Handle torch.Tensor
    if isinstance(data, torch.Tensor):
        return data.detach().cpu().numpy().astype(dtype)
    # Handle numpy.ndarray
    if isinstance(data, np.ndarray):
        return data.astype(dtype)
    # Handle scalar values
    if isinstance(data, (int, float)):
        return np.array(data, dtype=dtype)
    raise NotImplementedError(f"Unsupported data type: {type(data)}")
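

# Illustrative usage sketch for to_numpy:
#   to_numpy(torch.ones(2, 3))              -> float32 array of shape (2, 3)
#   to_numpy(np.arange(4), dtype=np.int64)  -> int64 array
#   to_numpy(3.5)                           -> 0-d float32 array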


def get_dtype_device(
    data: torch.Tensor | np.ndarray | dict | list,
) -> tuple[torch.dtype | np.dtype | None, torch.device | str | type | None]:
    """
    Determine the data type and device of the input data.
    This function recursively inspects the input data and determines its data type
    and device. It handles PyTorch tensors, NumPy arrays, dictionaries, and lists.
    Args:
        data: Input data (torch.Tensor, np.ndarray, dict, list, or other)
    Returns:
        tuple: (dtype, device) where:
            - dtype: The data type (torch.dtype or np.dtype)
            - device: The device (torch.device, 'cpu', 'cuda:X', or np.ndarray)
    Raises:
        ValueError: If tensors in a dictionary are on different CUDA devices
    """
    if isinstance(data, torch.Tensor):
        return data.dtype, data.device
    if isinstance(data, np.ndarray):
        return data.dtype, np.ndarray
    if isinstance(data, dict):
        dtypes = {get_dtype_device(v)[0] for v in data.values()}
        devices = {get_dtype_device(v)[1] for v in data.values()}
        cuda_devices = {device for device in devices if str(device).startswith("cuda")}
        cpu_devices = {device for device in devices if str(device).startswith("cpu")}
        if (len(cuda_devices) > 0) or (len(cpu_devices) > 0):
            # torch.Tensor values: fall back to float unless every entry is half
            dtype = torch.float
            if all(d == torch.half for d in dtypes):
                dtype = torch.half
            device = None
            if len(cuda_devices) > 1:
                raise ValueError("All tensors must be on the same device")
            if len(cuda_devices) == 1:
                device = list(cuda_devices)[0]
            if (device is None) and (len(cpu_devices) == 1):
                device = list(cpu_devices)[0]
        else:
            # np.ndarray values: fall back to float32 unless every entry is float16
            dtype = np.float32
            if all(d == np.float16 for d in dtypes):
                dtype = np.float16
            device = np.ndarray
    elif isinstance(data, list):
        if not data:  # Handle empty list case
            return None, None
        dtype, device = get_dtype_device(data[0])
    else:
        return np.float32, np.ndarray
    return dtype, device
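

# Illustrative usage sketch for get_dtype_device (NumPy data reports np.ndarray in place of a device):
#   get_dtype_device(torch.zeros(2))                       -> (torch.float32, device(type='cpu'))
#   get_dtype_device({"a": np.zeros(2), "b": np.ones(3)})  -> (np.float32, np.ndarray)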


def crop(
    data: np.ndarray | torch.Tensor | Image.Image,
    bbox: tuple[int, int, int, int] | tuple[int, int],
) -> np.ndarray | torch.Tensor | Image.Image:
    """
    Crop data of different formats (numpy arrays, PyTorch tensors, PIL Images) to a target size.
    Args:
        data: Input data to crop (numpy.ndarray, torch.Tensor, or PIL.Image.Image)
        bbox: Bounding box as tuple (offset_height, offset_width, height, width) or tuple (height, width)
    Returns:
        Cropped data in the same format as the input
    """
    if len(bbox) == 4:
        offset_height, offset_width, target_height, target_width = bbox
    elif len(bbox) == 2:
        target_height, target_width = bbox
        offset_height, offset_width = 0, 0
    else:
        raise ValueError(f"Unsupported size length {len(bbox)}.")
    end_height = offset_height + target_height
    end_width = offset_width + target_width
    if any(sz < 0 for sz in bbox):
        raise ValueError("Bounding box can't have negative values.")
    # Handle numpy arrays
    if isinstance(data, np.ndarray):
        if (
            max(offset_height, end_height) > data.shape[0]
            or max(offset_width, end_width) > data.shape[1]
        ):
            raise ValueError("Invalid bounding box.")
        cropped_data = data[offset_height:end_height, offset_width:end_width, ...]
        return cropped_data
    # Handle PIL images
    elif isinstance(data, Image.Image):
        if (
            max(offset_height, end_height) > data.size[1]
            or max(offset_width, end_width) > data.size[0]
        ):
            raise ValueError("Invalid bounding box.")
        return data.crop((offset_width, offset_height, end_width, end_height))
    # Handle PyTorch tensors
    elif isinstance(data, torch.Tensor):
        if data.is_nested:
            # Special handling for nested tensors
            return torch.stack([crop(nested_tensor, bbox) for nested_tensor in data])
        if (
            max(offset_height, end_height) > data.shape[-2]
            or max(offset_width, end_width) > data.shape[-1]
        ):
            raise ValueError("Invalid bounding box.")
        cropped_data = data[..., offset_height:end_height, offset_width:end_width]
        return cropped_data
    else:
        raise TypeError(f"Unsupported data type '{type(data)}'.")
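

# Illustrative usage sketch for crop (a 2-tuple bbox crops from the top-left corner):
#   crop(np.zeros((480, 640, 3)), (100, 200, 240, 320)).shape  -> (240, 320, 3)
#   crop(torch.zeros(3, 480, 640), (240, 320)).shape           -> torch.Size([3, 240, 320])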


def stack(
    data: list[
        dict[str, torch.Tensor | np.ndarray]
        | list[torch.Tensor | np.ndarray]
        | tuple[torch.Tensor | np.ndarray]
    ],
) -> dict[str, torch.Tensor | np.ndarray] | list[torch.Tensor | np.ndarray]:
    """
    Stack a list of dictionaries into a single dictionary with stacked values.
    Or, when given a list of sublists, stack each sublist using torch or numpy stack
    if the items are of equal size, or nested tensors if the items are PyTorch tensors
    of different sizes.
    This utility function is similar to PyTorch's collate function, but specifically
    designed for stacking dictionaries of numpy arrays or PyTorch tensors.
    Args:
        data (list): A list of dictionaries with the same keys, where values are
            either numpy arrays or PyTorch tensors.
            OR
            A list of sublists, where the values of the sublists are PyTorch tensors
            or numpy arrays.
    Returns:
        dict: A dictionary with the same keys as the input dictionaries, but with values
            stacked along a new first dimension.
            OR
        list: If the input was a list of sublists, a list with one stacked
            output per original input sublist.
    Raises:
        ValueError: If dictionaries in the list have inconsistent keys.
        NotImplementedError: If input is not a list or contains non-dictionary elements.
    """
    if not isinstance(data, list):
        raise NotImplementedError(f"Stack: Data type not supported: {data}")
    if len(data) == 0:
        return data
    if all(isinstance(entry, dict) for entry in data):
        stacked_data = {}
        keys = list(data[0].keys())
        if any(set(entry.keys()) != set(keys) for entry in data):
            raise ValueError("Data not consistent for stacking")
        for key in keys:
            stacked_data[key] = []
            for entry in data:
                stacked_data[key].append(entry[key])
            # Stack according to the data format
            if all(isinstance(v, np.ndarray) for v in stacked_data[key]):
                stacked_data[key] = np.stack(stacked_data[key])
            elif all(isinstance(v, torch.Tensor) for v in stacked_data[key]):
                # Check if all tensors have the same shape
                first_shape = stacked_data[key][0].shape
                if all(tensor.shape == first_shape for tensor in stacked_data[key]):
                    stacked_data[key] = torch.stack(stacked_data[key])
                else:
                    # Use nested tensors if shapes are not consistent
                    stacked_data[key] = torch.nested.nested_tensor(stacked_data[key])
        return stacked_data
    if all(isinstance(entry, list) for entry in data):
        # The stacked output is a list with one stacked entry per input sublist
        stacked_data = []
        for sublist in data:
            # Stack according to the data format
            if all(isinstance(v, np.ndarray) for v in sublist):
                stacked_data.append(np.stack(sublist))
            elif all(isinstance(v, torch.Tensor) for v in sublist):
                # Check if all tensors have the same shape
                first_shape = sublist[0].shape
                if all(tensor.shape == first_shape for tensor in sublist):
                    stacked_data.append(torch.stack(sublist))
                else:
                    # Use nested tensors if shapes are not consistent
                    stacked_data.append(torch.nested.nested_tensor(sublist))
        return stacked_data
    raise NotImplementedError(f"Stack: Data type not supported: {data}")
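

# Illustrative usage sketch for stack (same-shape tensors gain a new leading batch dimension):
#   batch = stack([{"img": torch.zeros(3, 4)}, {"img": torch.ones(3, 4)}])
#   batch["img"].shape                           -> torch.Size([2, 3, 4])
#   stack([[np.zeros(5), np.ones(5)]])[0].shape  -> (2, 5)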


def resize(
    data: np.ndarray | torch.Tensor | Image.Image,
    size: tuple[int, int] | int | None = None,
    scale: float | None = None,
    modality_format: str | None = None,
) -> np.ndarray | torch.Tensor | Image.Image:
    """
    Resize data of different formats (numpy arrays, PyTorch tensors, PIL Images) to a target size.
    Args:
        data: Input data to resize (numpy.ndarray, torch.Tensor, or PIL.Image.Image)
        size: Target size as tuple (height, width) or single int for long-side scaling
        scale: Scale factor to apply to the original dimensions
        modality_format: Type of data being resized ('depth', 'normals', or None).
            Affects the interpolation method used.
    Returns:
        Resized data in the same format as the input
    Raises:
        ValueError: If neither size nor scale is provided, or if both are provided
        TypeError: If data is not a supported type
    """
    # Validate input parameters
    if size is not None and scale is not None:
        raise ValueError("Only one of size or scale should be provided.")
    # Calculate size from scale if needed
    if size is None:
        if scale is None:
            raise ValueError("Either size or scale must be provided.")
        if isinstance(data, (np.ndarray, torch.Tensor)):
            size = (int(data.shape[-2] * scale), int(data.shape[-1] * scale))
        elif isinstance(data, Image.Image):
            size = (int(data.size[1] * scale), int(data.size[0] * scale))
        else:
            raise TypeError(f"Unsupported data type '{type(data)}'.")
    # Handle long-side scaling when size is a single integer
    elif isinstance(size, int):
        long_side = size
        if isinstance(data, (np.ndarray, torch.Tensor)):
            if isinstance(data, torch.Tensor) and data.is_nested:
                raise ValueError(
                    "Long-side scaling is not supported for nested tensors, use a fixed size instead."
                )
            h, w = data.shape[-2], data.shape[-1]
        elif isinstance(data, Image.Image):
            w, h = data.size
        else:
            raise TypeError(f"Unsupported data type '{type(data)}'.")
        if h > w:
            size = (long_side, int(w * long_side / h))
        else:
            size = (int(h * long_side / w), long_side)
    target_height, target_width = size
    # Set interpolation method based on modality
    if modality_format in ["depth", "normals"]:
        interpolation = Image.Resampling.NEAREST
        torch_interpolation = "nearest"
    else:
        interpolation = Image.Resampling.LANCZOS
        torch_interpolation = "bilinear"
    # Handle numpy arrays
    if isinstance(data, np.ndarray):
        pil_image = Image.fromarray(data)
        resized_image = pil_image.resize((target_width, target_height), interpolation)
        return np.array(resized_image)
    # Handle PIL images
    elif isinstance(data, Image.Image):
        return data.resize((target_width, target_height), interpolation)
    # Handle PyTorch tensors
    elif isinstance(data, torch.Tensor):
        if data.is_nested:
            # Special handling for nested tensors
            return torch.stack(
                [
                    resize(nested_tensor, size, scale, modality_format)
                    for nested_tensor in data
                ]
            )
        original_dim = data.ndim
        if original_dim == 2:  # (H, W)
            data = data.unsqueeze(0).unsqueeze(0)  # Add channel and batch dimensions
        elif original_dim == 3:  # (C/B, H, W)
            if modality_format == "depth":
                data = data.unsqueeze(1)  # Add channel dimension
            else:
                data = data.unsqueeze(0)  # Add batch dimension
        resized_tensor = F.interpolate(
            data,
            size=(target_height, target_width),
            mode=torch_interpolation,
            align_corners=False if torch_interpolation != "nearest" else None,
        )
        if original_dim == 2:
            # Remove batch and channel dimensions
            return resized_tensor.squeeze(0).squeeze(0)
        elif original_dim == 3:
            if modality_format == "depth":
                return resized_tensor.squeeze(1)  # Remove channel dimension
            return resized_tensor.squeeze(0)  # Remove batch dimension
        else:
            return resized_tensor
    else:
        raise TypeError(f"Unsupported data type '{type(data)}'.")
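

# Illustrative usage sketch for resize (depth/normals use nearest-neighbor; other data
# uses LANCZOS for PIL/NumPy inputs and bilinear for torch tensors):
#   resize(torch.zeros(3, 480, 640), size=(240, 320)).shape                 -> torch.Size([3, 240, 320])
#   resize(torch.zeros(480, 640), size=512, modality_format="depth").shape  -> torch.Size([384, 512])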