Spaces:
Running
Running
import copy | |
import pickle | |
from typing import Dict, List, Tuple, Union | |
import numpy as np | |
import torch | |
import torch.distributed as dist | |
from datasets import Dataset | |
from .cocoeval import COCOeval | |
# Typings
# One bounding box as four floats (the coordinate convention is not fixed by the alias itself).
_TYPING_BOX = Tuple[float, float, float, float]
# Sequence of float scores -- presumably one confidence per detection; confirm against callers.
_TYPING_SCORES = List[float]
# Sequence of integer labels -- presumably one class id per detection; confirm against callers.
_TYPING_LABELS = List[int]
# Sequence of bounding boxes.
_TYPING_BOXES = List[_TYPING_BOX]
# Any value a prediction/reference dict may hold: scores, labels or boxes.
_TYPING_PRED_REF = Union[_TYPING_SCORES, _TYPING_LABELS, _TYPING_BOXES]
# A single prediction / a single reference: string field name -> scores, labels or boxes.
_TYPING_PREDICTION = Dict[str, _TYPING_PRED_REF]
_TYPING_REFERENCE = Dict[str, _TYPING_PRED_REF]
# Predictions keyed by an integer -- presumably the image id; confirm against callers.
_TYPING_PREDICTIONS = Dict[int, _TYPING_PREDICTION]
def convert_to_xywh(boxes: torch.Tensor) -> torch.Tensor:
    """
    Convert bounding boxes from (xmin, ymin, xmax, ymax) format to (x, y, width, height) format.

    The four coordinates are read from the last dimension, so in addition to the usual
    (N, 4) input a single box of shape (4,) or a batched tensor of shape (..., 4) is accepted.

    Args:
        boxes (torch.Tensor): Tensor of shape (N, 4) -- or, more generally, (..., 4) --
            representing bounding boxes in (xmin, ymin, xmax, ymax) format.

    Returns:
        torch.Tensor: Tensor of the same shape as ``boxes`` representing the bounding boxes in
            (x, y, width, height) format.

    Raises:
        ValueError: If the last dimension of ``boxes`` does not hold exactly four values.
    """
    # Split along the last axis so any number of leading (batch) dimensions is preserved.
    xmin, ymin, xmax, ymax = boxes.unbind(-1)
    return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=-1)
def create_common_coco_eval(
    coco_eval: COCOeval, img_ids: List[int], eval_imgs: np.ndarray
) -> None:
    """
    Store the merged image IDs and per-image evaluation results on ``coco_eval``.

    The local ``img_ids`` / ``eval_imgs`` are first combined with those of the other
    processes through ``merge``; the results are then written onto ``coco_eval`` in place.

    Args:
        coco_eval: COCOeval evaluation object that is updated in place.
        img_ids (List[int]): Image IDs evaluated by this process.
        eval_imgs (np.ndarray): Evaluation results produced by this process.
    """
    merged_ids, merged_evals = merge(img_ids, eval_imgs)

    # The evaluation results are stored as a flat list.
    coco_eval.evalImgs = list(merged_evals.flatten())
    coco_eval.params.imgIds = list(merged_ids)
    # Snapshot the parameters only after imgIds was replaced, so the copy holds the merged IDs.
    coco_eval._paramsEval = copy.deepcopy(coco_eval.params)
def merge(img_ids: List[int], eval_imgs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Combine the image IDs and evaluation results of every process and drop duplicates.

    Both inputs are gathered from all ranks with ``all_gather``. The IDs are flattened into a
    single array and the evaluation arrays are concatenated along axis 2.
    NOTE(review): this presumes ``eval_imgs`` is 3-D with its last axis indexed in the same
    order as ``img_ids`` -- confirm against the caller.

    Args:
        img_ids (List[int]): Image IDs evaluated by this process.
        eval_imgs (np.ndarray): Evaluation results produced by this process.

    Returns:
        Tuple[np.ndarray, np.ndarray]: The sorted unique image IDs and the evaluation results
            restricted (along the last axis) to the first occurrence of each of those IDs.
    """
    gathered_ids = all_gather(img_ids)
    gathered_evals = all_gather(eval_imgs)

    flat_ids = np.array([img_id for rank_ids in gathered_ids for img_id in rank_ids])
    stacked_evals = np.concatenate(list(gathered_evals), 2)

    # np.unique returns the IDs sorted, plus the index of each ID's first occurrence;
    # those indices select the matching evaluation entries.
    unique_ids, first_idx = np.unique(flat_ids, return_index=True)
    return unique_ids, stacked_evals[..., first_idx]
def all_gather(data: List[int]) -> List[List[int]]:
    """
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    The object is pickled into a uint8 tensor, padded to the largest payload size among the
    ranks (``dist.all_gather`` needs equally shaped tensors), exchanged, and unpickled again.
    The tensors are placed on CUDA only when the process group uses the NCCL backend; for any
    other backend they stay on the CPU, so the function also works without a GPU.

    Args:
        data (List[int]): any picklable object

    Returns:
        List[List[int]]: list of data gathered from each rank
    """
    world_size = get_world_size()
    if world_size == 1:
        return [data]

    # NCCL can only communicate CUDA tensors; other backends are handed CPU tensors.
    device = torch.device("cuda" if str(dist.get_backend()) == "nccl" else "cpu")

    # serialize to a uint8 tensor (copy: np.frombuffer on bytes yields a read-only array)
    payload = pickle.dumps(data)
    tensor = torch.from_numpy(np.frombuffer(payload, dtype=np.uint8).copy()).to(device)

    # obtain the payload size of each rank; keep the local size as a plain int so it can be
    # compared with / subtracted from ``max_size`` without tensor-vs-int mixing
    local_size = tensor.numel()
    size_list = [torch.tensor([0], device=device) for _ in range(world_size)]
    dist.all_gather(size_list, torch.tensor([local_size], device=device))
    sizes = [int(size.item()) for size in size_list]
    max_size = max(sizes)

    # receiving Tensor from all ranks
    # we pad the tensor because torch all_gather does not support
    # gathering tensors of different shapes
    tensor_list = [
        torch.empty((max_size,), dtype=torch.uint8, device=device) for _ in sizes
    ]
    if local_size != max_size:
        # The padding content is irrelevant: receivers cut each payload back to its real size.
        padding = torch.empty((max_size - local_size,), dtype=torch.uint8, device=device)
        tensor = torch.cat((tensor, padding), dim=0)
    dist.all_gather(tensor_list, tensor)

    # NOTE: pickle.loads can execute arbitrary code; this is acceptable only because the bytes
    # come from the other ranks of the same job, never from an external source.
    return [
        pickle.loads(received.cpu().numpy().tobytes()[:size])
        for size, received in zip(sizes, tensor_list)
    ]
def get_world_size() -> int:
    """
    Return how many processes take part in the distributed run.

    Returns:
        int: The world size reported by ``torch.distributed``, or 1 when the distributed
            environment is unavailable or has not been initialized.
    """
    if is_dist_avail_and_initialized():
        return dist.get_world_size()
    # No usable process group: behave like a single-process run.
    return 1
def is_dist_avail_and_initialized() -> bool:
    """
    Tell whether ``torch.distributed`` can be used right now.

    Returns:
        bool: True only if the distributed package is available and a process group has been
            initialized, False otherwise.
    """
    # Check availability first; initialization is only queried when the package is usable.
    if not dist.is_available():
        return False
    return dist.is_initialized()