import copy
import pickle
from typing import Any, Dict, List, Tuple, Union

import numpy as np
import torch
import torch.distributed as dist
from datasets import Dataset

from .cocoeval import COCOeval

# Typings
_TYPING_BOX = Tuple[float, float, float, float]
_TYPING_SCORES = List[float]
_TYPING_LABELS = List[int]
_TYPING_BOXES = List[_TYPING_BOX]
_TYPING_PRED_REF = Union[_TYPING_SCORES, _TYPING_LABELS, _TYPING_BOXES]
_TYPING_PREDICTION = Dict[str, _TYPING_PRED_REF]
_TYPING_REFERENCE = Dict[str, _TYPING_PRED_REF]
_TYPING_PREDICTIONS = Dict[int, _TYPING_PREDICTION]


def convert_to_xywh(boxes: torch.Tensor) -> torch.Tensor:
    """
    Convert bounding boxes from (xmin, ymin, xmax, ymax) format to
    (x, y, width, height) format.

    Args:
        boxes (torch.Tensor): Tensor of shape (N, 4) representing bounding boxes
            in (xmin, ymin, xmax, ymax) format.

    Returns:
        torch.Tensor: Tensor of shape (N, 4) representing bounding boxes in
            (x, y, width, height) format.
    """
    xmin, ymin, xmax, ymax = boxes.unbind(1)
    return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=1)


def create_common_coco_eval(
    coco_eval: COCOeval, img_ids: List[int], eval_imgs: np.ndarray
) -> None:
    """
    Merge per-process image IDs and evaluation results and store them on the
    ``coco_eval`` object.

    The function mutates ``coco_eval`` in place: it overwrites ``evalImgs`` and
    ``params.imgIds`` and refreshes ``_paramsEval`` with a deep copy of
    ``params``.

    Args:
        coco_eval: COCOeval evaluation object to update.
        img_ids (List[int]): Image IDs evaluated by the current process.
        eval_imgs (np.ndarray): Per-image evaluation results of the current
            process.
    """
    merged_ids, merged_evals = merge(img_ids, eval_imgs)

    coco_eval.evalImgs = list(merged_evals.flatten())
    coco_eval.params.imgIds = list(merged_ids)
    coco_eval._paramsEval = copy.deepcopy(coco_eval.params)


def merge(img_ids: List[int], eval_imgs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Merge image IDs and evaluation images from different processes.

    Args:
        img_ids (List[int]): Image IDs evaluated by the current process.
        eval_imgs (np.ndarray): Evaluation images of the current process. They
            are concatenated along axis 2 and indexed by image on the last
            axis, so presumably 3-D with images last -- confirm against the
            caller.

    Returns:
        Tuple[np.ndarray, np.ndarray]: Merged image IDs (unique, sorted) and the
            evaluation images re-ordered to match them.
    """
    all_img_ids = all_gather(img_ids)
    all_eval_imgs = all_gather(eval_imgs)

    merged_img_ids = np.array(
        [img_id for rank_ids in all_img_ids for img_id in rank_ids]
    )
    merged_eval_imgs = np.concatenate(all_eval_imgs, 2)

    # keep only unique (and in sorted order) images; `idx` points at the first
    # occurrence of every ID, so duplicated images are dropped consistently
    merged_img_ids, idx = np.unique(merged_img_ids, return_index=True)
    merged_eval_imgs = merged_eval_imgs[..., idx]

    return merged_img_ids, merged_eval_imgs


def all_gather(data: Any) -> List[Any]:
    """
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data (Any): any picklable object

    Returns:
        List[Any]: list of data gathered from each rank. When the world size is
            1, this is ``[data]`` and no communication happens.
    """
    world_size = get_world_size()
    if world_size == 1:
        return [data]

    # NCCL only communicates CUDA tensors; other backends (e.g. gloo) are
    # served with CPU tensors so the gather also works without a GPU.
    device = torch.device("cuda" if dist.get_backend() == "nccl" else "cpu")

    # serialized to a Tensor
    buffer = pickle.dumps(data)
    storage = torch.ByteStorage.from_buffer(buffer)
    tensor = torch.ByteTensor(storage).to(device)

    # obtain Tensor size of each rank
    local_size = tensor.numel()
    size_list = [torch.tensor([0], device=device) for _ in range(world_size)]
    dist.all_gather(size_list, torch.tensor([local_size], device=device))
    sizes = [int(size.item()) for size in size_list]
    max_size = max(sizes)

    # receiving Tensor from all ranks
    # we pad the tensor because torch all_gather does not support
    # gathering tensors of different shapes
    tensor_list = [
        torch.empty((max_size,), dtype=torch.uint8, device=device) for _ in sizes
    ]
    if local_size != max_size:
        # padding content is irrelevant: it is sliced off again before unpickling
        padding = torch.empty(
            size=(max_size - local_size,), dtype=torch.uint8, device=device
        )
        tensor = torch.cat((tensor, padding), dim=0)
    dist.all_gather(tensor_list, tensor)

    # NOTE(security): pickle.loads runs on bytes received from the other ranks;
    # this is only safe because every rank of the job is trusted.
    return [
        pickle.loads(gathered.cpu().numpy().tobytes()[:size])
        for size, gathered in zip(sizes, tensor_list)
    ]


def get_world_size() -> int:
    """
    Get the number of processes in the distributed environment.

    Returns:
        int: Number of processes; 1 when torch.distributed is unavailable or
            not initialized.
    """
    if not is_dist_avail_and_initialized():
        return 1
    return dist.get_world_size()


def is_dist_avail_and_initialized() -> bool:
    """
    Check if distributed environment is available and initialized.

    Returns:
        bool: True if distributed environment is available and initialized,
            False otherwise.
    """
    return dist.is_available() and dist.is_initialized()